Documentation ¶
Overview ¶
Package raft implements the Raft consensus algorithm.
Index ¶
- Constants
- Variables
- func Version() string
- type Actor
- type Benchmark
- type BlastBenchmark
- type Callback
- type Client
- type ComplexValidator
- type Config
- func (c *Config) GetLogLevel() zerolog.Level
- func (c *Config) GetName() (name string, err error)
- func (c *Config) GetPath() (string, error)
- func (c *Config) GetPeer() (peers.Peer, error)
- func (c *Config) GetRemotes() (remotes []peers.Peer, err error)
- func (c *Config) GetTick() (time.Duration, error)
- func (c *Config) GetTimeout() (time.Duration, error)
- func (c *Config) GetUptime() (time.Duration, error)
- func (c *Config) IsLeader() bool
- func (c *Config) Load() error
- func (c *Config) Update(o *Config) error
- func (c *Config) Validate() error
- type Election
- type Event
- type EventType
- type FixedInterval
- type Interval
- type Log
- func (l *Log) After(index uint64) ([]*pb.LogEntry, error)
- func (l *Log) Append(entries ...*pb.LogEntry) error
- func (l *Log) AsUpToDate(lastIndex, lastTerm uint64) bool
- func (l *Log) Commit(index uint64) error
- func (l *Log) CommitIndex() uint64
- func (l *Log) CommitTerm() uint64
- func (l *Log) Create(name string, value []byte, term uint64) (*pb.LogEntry, error)
- func (l *Log) Get(index uint64) (*pb.LogEntry, error)
- func (l *Log) LastApplied() uint64
- func (l *Log) LastCommit() *pb.LogEntry
- func (l *Log) LastEntry() *pb.LogEntry
- func (l *Log) LastTerm() uint64
- func (l *Log) Prev(index uint64) (*pb.LogEntry, error)
- func (l *Log) Truncate(index, term uint64) error
- type Metrics
- type RandomInterval
- type Remote
- type Replica
- func (r *Replica) AppendEntries(stream pb.Raft_AppendEntriesServer) (err error)
- func (r *Replica) CheckCommits() error
- func (r *Replica) CheckRPCTerm(term uint64) (updated bool, err error)
- func (r *Replica) Close() error
- func (r *Replica) Commit(ctx context.Context, in *pb.CommitRequest) (*pb.CommitReply, error)
- func (r *Replica) CommitEntry(entry *pb.LogEntry) error
- func (r *Replica) Dispatch(e Event) error
- func (r *Replica) DropEntry(entry *pb.LogEntry) error
- func (r *Replica) Handle(e Event) error
- func (r *Replica) Listen() error
- func (r *Replica) Quorum() *Election
- func (r *Replica) RequestVote(ctx context.Context, in *pb.VoteRequest) (*pb.VoteReply, error)
- type SimpleBenchmark
- type State
- type StateMachine
- type Ticker
Constants ¶
const (
	VersionMajor         = 1
	VersionMinor         = 0
	VersionPatch         = 0
	VersionReleaseLevel  = "dev"
	VersionReleaseNumber = 6
)
Version component constants for the current build.
const DefaultRetries = 3
DefaultRetries specifies the number of times to attempt a commit.
Variables ¶
var (
	ErrCommitIndex      = errors.New("commit index does not refer to an entry in the log")
	ErrAlreadyCommitted = errors.New("commit index precedes current commit index")
	ErrMissingCommit    = errors.New("cannot commit entry higher than found in log")
	ErrNotImplemented   = errors.New("functionality not implemented yet")
	ErrEventTypeError   = errors.New("captured event with wrong value type")
	ErrEventSourceError = errors.New("captured event with wrong source type")
	ErrUnknownState     = errors.New("raft in an unknown state")
	ErrNotListening     = errors.New("replica is not listening for events")
	ErrRetries          = errors.New("could not connect after several attempts")
	ErrNoNetwork        = errors.New("no network specified in the configuration")
	ErrBenchmarkMode    = errors.New("specify either fixed duration or maximum operations benchmark mode")
	ErrBenchmarkRun     = errors.New("benchmark has already been run")
)
Standard errors for primary operations.
var GitVersion string
Set the GitVersion via -ldflags="-X 'github.com/bbengfort/raft.GitVersion=$(git rev-parse --short HEAD)'"
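As a rough illustration of how a link-time value like this is usually consumed, the standalone program below declares its own GitVersion variable in package main. The versionString helper and its output format are hypothetical and are not taken from this package.

```go
package main

import "fmt"

// GitVersion stays empty unless the linker overrides it, for example:
//
//	go build -ldflags="-X 'main.GitVersion=abc1234'"
var GitVersion string

// versionString is a hypothetical helper: it joins the version components
// and appends the git revision only when one was injected at link time.
func versionString(major, minor, patch int, rev string) string {
	v := fmt.Sprintf("%d.%d.%d", major, minor, patch)
	if rev != "" {
		v += " (" + rev + ")"
	}
	return v
}

func main() {
	fmt.Println(versionString(1, 0, 0, GitVersion))
}
```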
Functions ¶
Types ¶
type Actor ¶
type Actor interface {
	Listen() error        // Run the actor model listen for events and handle them
	Close() error         // Stop the actor from receiving new events (handles remaining pending events)
	Dispatch(Event) error // Outside callers can dispatch events to the actor
	Handle(Event) error   // Handler method for each event in sequence
}
Actor objects listen for events (messages) and can then create more actors, send more messages, or make local decisions that modify their own private state. Actors implement lockless concurrent operation: none of the structs in this package use mutexes, and none are thread-safe on their own. Concurrency instead relies on a single actor being initialized and reading event objects one at a time off a buffered channel. As a result, all actor methods should be private so that they are not called from other threads.
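The single-reader pattern described here can be sketched as follows. This is a minimal standalone example, not the package's implementation: the event and counter types and the sumThroughActor helper are hypothetical stand-ins for Event and a concrete Actor.

```go
package main

import "fmt"

// event is a hypothetical stand-in for the package's Event interface.
type event struct{ value int }

// counter is a hypothetical actor. Its private state (total) is only ever
// modified by the single goroutine running listen, so no mutex is needed.
type counter struct {
	events chan event
	done   chan struct{}
	total  int
}

func newCounter(buffer int) *counter {
	return &counter{events: make(chan event, buffer), done: make(chan struct{})}
}

// listen handles events one at a time until the channel is closed.
func (c *counter) listen() {
	for e := range c.events {
		c.total += e.value
	}
	close(c.done)
}

// dispatch queues an event for the actor.
func (c *counter) dispatch(e event) { c.events <- e }

// shutdown stops accepting events and waits for pending ones to be handled.
func (c *counter) shutdown() {
	close(c.events)
	<-c.done
}

// sumThroughActor dispatches 1..n to a counter and returns its final state.
func sumThroughActor(n int) int {
	c := newCounter(16)
	go c.listen()
	for i := 1; i <= n; i++ {
		c.dispatch(event{value: i})
	}
	c.shutdown()
	return c.total
}

func main() {
	fmt.Println(sumThroughActor(10)) // prints 55
}
```

Reading total after shutdown is safe because the done channel is closed only after the listen loop has finished.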
type Benchmark ¶
type Benchmark interface {
	Run(addr string) error            // execute the benchmark, may return an error if already run
	CSV(header bool) (string, error)  // returns a CSV representation of the results
	JSON(indent int) ([]byte, error)  // returns a JSON representation of the results
}
Benchmark defines the interface for all benchmark runners, both for execution as well as the delivery of results. A single benchmark is executed once and stores its internal results to be saved to disk.
func NewBenchmark ¶
func NewBenchmark(options *Config, addr string, blast bool, N, S, C uint) (bench Benchmark, err error)
NewBenchmark creates either a blast or a simple benchmark depending on the blast boolean flag. In blast mode, N operations are executed simultaneously against the cluster, each putting a unique key with a random value of size S. In simple mode, C workers execute N requests each, putting a unique key with a random value of size S. Note that C is ignored in blast mode.
type BlastBenchmark ¶
type BlastBenchmark struct {
// contains filtered or unexported fields
}
BlastBenchmark implements Benchmark by sending n Put requests to the specified server each in its own thread. It then records the total time it takes to complete all n requests and uses that to compute the throughput. Additionally, each thread records the latency of each request, so that outlier requests can be removed from the blast computation.
Note: this benchmark Puts a unique key and short value to the server; its intent is to compute pedal-to-the-metal write throughput.
func (*BlastBenchmark) CSV ¶
CSV returns a results row delimited by commas as:
requests,failures,duration,throughput,version,benchmark
If header is specified, the string contains two rows with the header first.
func (*BlastBenchmark) Complete ¶
func (b *BlastBenchmark) Complete() bool
Complete returns true if requests and duration are both greater than 0.
func (*BlastBenchmark) JSON ¶
JSON returns a results row as a json object, formatted with or without the number of spaces specified by indent. Use no indent for JSON lines format.
func (*BlastBenchmark) Run ¶
func (b *BlastBenchmark) Run(addr string) (err error)
Run the blast benchmark against the system by putting a unique key and small value to the server as fast as possible and measuring the duration.
func (*BlastBenchmark) Throughput ¶
func (b *BlastBenchmark) Throughput() float64
Throughput divides the number of requests (excluding failures) by the total duration of the experiment, i.e. the operations per second.
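The calculation can be sketched as a standalone function. The throughput helper and its parameters are hypothetical; the package computes the value from unexported benchmark fields that are not shown here.

```go
package main

import (
	"fmt"
	"time"
)

// throughput is a hypothetical helper that returns successful requests per
// second. It returns 0 when the duration is not positive or when the failure
// count exceeds the request count, rather than dividing by zero.
func throughput(requests, failures uint64, duration time.Duration) float64 {
	if duration <= 0 || failures > requests {
		return 0
	}
	return float64(requests-failures) / duration.Seconds()
}

func main() {
	fmt.Println(throughput(1000, 0, 2*time.Second)) // prints 500
}
```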
type Client ¶
Client maintains network information embedded in the configuration to connect to a Raft consensus quorum and make commit requests.
type ComplexValidator ¶
type ComplexValidator struct {
TagName string
}
ComplexValidator validates complex types that multiconfig doesn't understand.
func (*ComplexValidator) Validate ¶
func (v *ComplexValidator) Validate(s interface{}) error
Validate implements the multiconfig.Validator interface.
type Config ¶
type Config struct {
	Name       string       `required:"false" json:"name,omitempty"`            // unique name of the local replica, hostname by default
	Seed       int64        `required:"false" json:"seed,omitempty"`            // random seed to initialize random generator
	Tick       string       `default:"1s" validate:"duration" json:"tick"`      // clock tick rate for timing (parseable duration)
	Timeout    string       `default:"500ms" validate:"duration" json:"timeout"` // timeout to wait for responses (parseable duration)
	Aggregate  bool         `default:"true" json:"aggregate"`                   // aggregate append entries from multiple concurrent clients
	LogLevel   string       `default:"info" validate:"zerolog" json:"log_level"` // verbosity of logging, sets the zerolog log level
	ConsoleLog bool         `default:"false" json:"console_log"`                // output human readable logs instead of JSON logs
	Leader     string       `required:"false" json:"leader,omitempty"`          // designated initial leader, if any
	Peers      []peers.Peer `json:"peers"`                                      // definition of all hosts on the network

	// Experimental configuration
	// TODO: remove after benchmarks
	Uptime  string `required:"false" validate:"duration" json:"uptime"` // run for a time limit and then shutdown
	Metrics string `requred:"false" json:"metrics"`                     // location to write benchmarks to disk
}
Config uses the multiconfig loader and validators to store configuration values required to run Raft. Configuration can be stored as a JSON, TOML, or YAML file in the current working directory as raft.json, in the user's home directory as .raft.json or in /etc/raft.json (with the extension of the file format of choice). Configuration can also be added from the environment using environment variables prefixed with $RAFT_ and the all caps version of the configuration name.
func (*Config) GetLogLevel ¶
GetLogLevel returns the zerolog level.
func (*Config) GetName ¶
GetName returns the name of the local host defined by the configuration or using the hostname by default.
func (*Config) GetPath ¶
GetPath searches possible configuration paths returning the first path it finds; this path is used when loading the configuration from disk. An error is returned if no configuration file exists.
func (*Config) GetPeer ¶
GetPeer returns the local peer configuration or an error if no peer is found in the configuration. If the name is not set on the configuration, the hostname is used.
func (*Config) GetRemotes ¶
GetRemotes returns all peer configurations for remote hosts on the network, i.e. all peers except the local peer configuration.
func (*Config) GetTimeout ¶
GetTimeout parses the timeout duration and returns it.
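Tick, Timeout, and Uptime are stored as strings described as parseable durations. A minimal sketch of such parsing, assuming the standard time.ParseDuration format, is shown below; the parseSetting helper is hypothetical and is not the package's implementation.

```go
package main

import (
	"fmt"
	"time"
)

// parseSetting is a hypothetical helper that parses a duration string such
// as "500ms" or "1s" and names the offending setting in any error.
func parseSetting(name, value string) (time.Duration, error) {
	d, err := time.ParseDuration(value)
	if err != nil {
		return 0, fmt.Errorf("could not parse %s %q: %w", name, value, err)
	}
	return d, nil
}

func main() {
	timeout, err := parseSetting("timeout", "500ms")
	fmt.Println(timeout, err) // prints 500ms <nil>
}
```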
func (*Config) Load ¶
Load the configuration from default values, then from a configuration file, and finally from the environment. Validate the configuration when loaded.
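The layering order (defaults, then file, then environment) can be sketched as below. The lookup type and the resolve and fromMap helpers are hypothetical, maps stand in for the file and the environment, and the RAFT_ key format is only shown for single-word names; the package itself delegates this work to multiconfig.

```go
package main

import (
	"fmt"
	"strings"
)

// lookup reports the value for a key and whether it was set. The file and
// the environment are both modeled as lookups to keep the sketch standalone.
type lookup func(key string) (string, bool)

// fromMap adapts a map to the lookup type.
func fromMap(m map[string]string) lookup {
	return func(key string) (string, bool) {
		v, ok := m[key]
		return v, ok
	}
}

// resolve is a hypothetical helper in which later sources override earlier
// ones: the default, then the file, then the RAFT_-prefixed environment key.
func resolve(key, def string, file, env lookup) string {
	value := def
	if v, ok := file(key); ok {
		value = v
	}
	if v, ok := env("RAFT_" + strings.ToUpper(key)); ok {
		value = v
	}
	return value
}

func main() {
	file := fromMap(map[string]string{"tick": "2s"})
	env := fromMap(map[string]string{"RAFT_TIMEOUT": "250ms"})
	fmt.Println(resolve("tick", "1s", file, env))       // prints 2s
	fmt.Println(resolve("timeout", "500ms", file, env)) // prints 250ms
}
```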
type Election ¶
type Election struct {
// contains filtered or unexported fields
}
Election objects keep track of the outcome of a single leader election by mapping remote peers to the votes they've provided. Uses simple majority to determine if an election has passed or failed.
func NewElection ¶
NewElection creates an election for the specified peers, defaulting the votes to false until otherwise updated.
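A simple-majority tally of this kind can be sketched as follows. The election type and its vote and passed methods are hypothetical names for illustration; the actual Election type keeps its fields unexported and its methods are not listed here.

```go
package main

import "fmt"

// election is a hypothetical tally mapping each peer name to its vote.
type election struct {
	votes map[string]bool
}

// newElection registers every peer with a default vote of false.
func newElection(peers ...string) *election {
	e := &election{votes: make(map[string]bool, len(peers))}
	for _, p := range peers {
		e.votes[p] = false
	}
	return e
}

// vote records a vote, ignoring peers that are not part of the election.
func (e *election) vote(peer string, granted bool) {
	if _, ok := e.votes[peer]; ok {
		e.votes[peer] = granted
	}
}

// passed reports whether strictly more than half of the peers voted yes.
func (e *election) passed() bool {
	granted := 0
	for _, v := range e.votes {
		if v {
			granted++
		}
	}
	return granted > len(e.votes)/2
}

func main() {
	e := newElection("alpha", "bravo", "charlie")
	e.vote("alpha", true)
	e.vote("bravo", true)
	fmt.Println(e.passed()) // prints true
}
```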
type Event ¶
type Event interface {
	Type() EventType
	Source() interface{}
	Value() interface{}
}
Event represents actions that occur during consensus. Listeners can register callbacks with event handlers for specific event types.
type EventType ¶
type EventType uint16
EventType is an enumeration of the kind of events that can occur.
type FixedInterval ¶
type FixedInterval struct {
// contains filtered or unexported fields
}
FixedInterval dispatches its internal event type on a regular period. It does so by wrapping a time.Timer object, adding the Interval functionality as well as the event dispatcher functionality.
func NewFixedInterval ¶
func NewFixedInterval(actor Actor, delay time.Duration, etype EventType) *FixedInterval
NewFixedInterval creates and initializes a new fixed interval.
func (*FixedInterval) GetDelay ¶
func (t *FixedInterval) GetDelay() time.Duration
GetDelay returns the fixed interval duration.
func (*FixedInterval) Interrupt ¶
func (t *FixedInterval) Interrupt() bool
Interrupt the current interval, stopping and starting it again. Returns true if the interval was running and is successfully reset, false if the ticker was stopped or uninitialized.
func (*FixedInterval) Running ¶
func (t *FixedInterval) Running() bool
Running returns true if the timer exists and false otherwise.
func (*FixedInterval) Start ¶
func (t *FixedInterval) Start() bool
Start the interval to periodically issue events. Returns true if the ticker gets started, false if it's already started or uninitialized.
func (*FixedInterval) Stop ¶
func (t *FixedInterval) Stop() bool
Stop the interval so that no more events are dispatched. Returns true if the call stops the interval, false if already expired or never started.
type Interval ¶
type Interval interface {
	Start() bool             // start the interval to periodically call its function
	Stop() bool              // stop the interval, the function will not be called
	Interrupt() bool         // interrupt the interval, setting it to the next period
	Running() bool           // whether or not the interval is running
	GetDelay() time.Duration // the duration of the current interval period
}
Interval is an interface that specifies the behavior of time-based event dispatchers. A single interval object dispatches a single event type, to which callbacks from any goroutine can be registered. The event is dispatched on schedule; the interval can be either fixed or stochastic. Fixed intervals reschedule themselves for a fixed delay after all callbacks are called. Stochastic intervals select a random delay in a configured range to schedule the next event after all callbacks.
Interval objects can be started and stopped. On start, the interval schedules the next event after the delay returned by GetDelay(). On stop no events will be dispatched by the handler. Intervals can be interrupted which resets the timer to a new delay. Timer state (running or not running) can be determined by the Running() method.
type Log ¶
type Log struct {
// contains filtered or unexported fields
}
Log implements the sequence of commands applied to the Raft state machine. This implementation uses an entirely in-memory log that snapshots to disk occasionally for durability. The log ensures that the sequence of commands is consistent, i.e. that entries are appended in monotonically increasing time order as defined by the Raft leader's term.
Logs generate two types of events: entry committed and entry dropped. Commit events are dispatched in the order of the log, so they are seen sequentially and can be applied to the state machine in order. Dropped events occur when a log's uncommitted entries are truncated in response to leadership changes; these events also occur in order, though they have no impact on the state machine itself.
Note that the log is not thread-safe, and is not intended to be accessed from multiple goroutines. Instead the log should be maintained by a single state machine that updates it sequentially when events occur.
func NewLog ¶
func NewLog(sm StateMachine) *Log
NewLog creates and initializes a new log whose first entry is the NullEntry.
func (*Log) Append ¶
Append one or more entries and perform log invariant checks. If appending an entry creates a log inconsistency (out of order term or index), then an error is returned. A couple of important notes:
- Append does not undo any successful appends even on error
- Append will not compare entries that specify the same index
These notes mean that all entries being appended to this log should be consistent with each other as well as the end of the log, and that the log needs to be truncated in order to "update" or splice two logs together.
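The behavior in the notes above can be sketched with a small in-memory log. The entry and memLog types are hypothetical, and the invariant used here (strictly consecutive indices, non-decreasing terms) is an assumption that may be stricter than the checks the package actually performs.

```go
package main

import "fmt"

// entry is a hypothetical stand-in for pb.LogEntry.
type entry struct {
	Index uint64
	Term  uint64
}

// memLog is a hypothetical in-memory log that starts with a null entry.
type memLog struct {
	entries []entry
}

func newMemLog() *memLog {
	return &memLog{entries: []entry{{Index: 0, Term: 0}}}
}

// add appends entries one at a time. It stops at the first entry that breaks
// an invariant and does not undo the entries appended before the error.
func (l *memLog) add(entries ...entry) error {
	for _, e := range entries {
		last := l.entries[len(l.entries)-1]
		if e.Index != last.Index+1 {
			return fmt.Errorf("index %d does not follow %d", e.Index, last.Index)
		}
		if e.Term < last.Term {
			return fmt.Errorf("term %d precedes %d", e.Term, last.Term)
		}
		l.entries = append(l.entries, e)
	}
	return nil
}

func main() {
	l := newMemLog()
	err := l.add(entry{Index: 1, Term: 1}, entry{Index: 2, Term: 1}, entry{Index: 4, Term: 1})
	// The first two entries stay in the log even though the third failed.
	fmt.Println(err != nil, len(l.entries)) // prints true 3
}
```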
func (*Log) AsUpToDate ¶
AsUpToDate returns true if the remote log, as specified by its last index and last term, is at least as up to date as (or farther ahead than) the local log.
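The comparison described in the Raft paper checks the last term first and falls back to the last index when the terms are equal. A sketch under the assumption that this package follows the same rule is shown below; the asUpToDate function and its explicit local parameters are hypothetical.

```go
package main

import "fmt"

// asUpToDate is a hypothetical standalone version of the comparison: a
// higher last term wins outright, and equal terms are decided by last index.
func asUpToDate(localIndex, localTerm, remoteIndex, remoteTerm uint64) bool {
	if remoteTerm != localTerm {
		return remoteTerm > localTerm
	}
	return remoteIndex >= localIndex
}

func main() {
	fmt.Println(asUpToDate(10, 3, 4, 4))  // prints true: the remote term is newer
	fmt.Println(asUpToDate(10, 3, 9, 3))  // prints false: same term, shorter log
	fmt.Println(asUpToDate(10, 3, 10, 3)) // prints true: the logs are identical
}
```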
func (*Log) CommitIndex ¶
CommitIndex returns the index of the last committed log entry.
func (*Log) CommitTerm ¶
CommitTerm is a helper function to get the term of the entry at the commit index.
func (*Log) Create ¶
Create an entry in the log and append it. This is essentially a helper method for quickly adding a command to the state machine consistent with the local log.
func (*Log) Get ¶
Get the entry at the specified index (whether or not it is committed). Returns an error if no entry exists at the index.
func (*Log) LastApplied ¶
LastApplied returns the index of the last applied log entry.
func (*Log) LastCommit ¶
LastCommit returns the log entry at the commit index.
func (*Log) LastTerm ¶
LastTerm is a helper function to get the term of the entry at the last applied index.
func (*Log) Prev ¶
Prev returns the entry before the specified index (whether or not it is committed). Returns an error if no entry exists before.
func (*Log) Truncate ¶
Truncate the log to the given position, conditioned by term. This method returns an error if the log has been committed after the specified index, if there is an epoch mismatch, or if there is some other log operation error.
This method truncates everything after the given index but keeps the entry at the specified index, i.e. it truncates after.
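The truncate-after semantics can be sketched on a plain slice. The truncateAfter function and the entry type are hypothetical; the sketch assumes each entry is stored at the slice position equal to its index and treats the term check as a comparison against the entry at that index.

```go
package main

import (
	"errors"
	"fmt"
)

// entry is a hypothetical stand-in for pb.LogEntry.
type entry struct {
	Index uint64
	Term  uint64
}

// truncateAfter keeps entries[0..index] and drops everything after it. It
// refuses to drop committed entries or to truncate at a mismatched term.
func truncateAfter(entries []entry, commitIndex, index, term uint64) ([]entry, error) {
	if index < commitIndex {
		return entries, errors.New("cannot truncate committed entries")
	}
	if index >= uint64(len(entries)) {
		return entries, errors.New("index is not in the log")
	}
	if entries[index].Term != term {
		return entries, errors.New("term mismatch at index")
	}
	return entries[:index+1], nil
}

func main() {
	log := []entry{{0, 0}, {1, 1}, {2, 1}, {3, 2}}
	kept, err := truncateAfter(log, 1, 2, 1)
	fmt.Println(len(kept), err) // prints 3 <nil>
}
```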
type Metrics ¶
Metrics tracks the measurable statistics of the system over time from the perspective of the local replica -- e.g. how many accesses over a specific time period.
func (*Metrics) Aggregation ¶
Aggregation is called when an aggregation occurs. No need for synchronization here since the stats object is synchronized.
type RandomInterval ¶
type RandomInterval struct {
	FixedInterval
	// contains filtered or unexported fields
}
RandomInterval dispatches its internal interval on a random period between the minimum and maximum delay values. Every event has a different delay.
func NewRandomInterval ¶
func NewRandomInterval(actor Actor, minDelay, maxDelay time.Duration, etype EventType) *RandomInterval
NewRandomInterval creates and initializes a new random interval.
func (*RandomInterval) GetDelay ¶
func (t *RandomInterval) GetDelay() time.Duration
GetDelay returns a random duration in the range (minDelay, maxDelay) on every request for the delay, adding jitter so that timeouts are unlikely to occur at the same time.
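A jittered delay of this kind can be sketched with math/rand. The randomDelay and sampleDelays helpers are hypothetical, and this sketch samples from the half-open range [minDelay, maxDelay), which may differ from the package's exact bounds.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// randomDelay is a hypothetical helper that picks a delay in the half-open
// range [minDelay, maxDelay). It returns minDelay if the range is empty.
func randomDelay(rng *rand.Rand, minDelay, maxDelay time.Duration) time.Duration {
	if maxDelay <= minDelay {
		return minDelay
	}
	return minDelay + time.Duration(rng.Int63n(int64(maxDelay-minDelay)))
}

// sampleDelays draws n delays from a generator seeded with seed.
func sampleDelays(seed int64, n int, minDelay, maxDelay time.Duration) []time.Duration {
	rng := rand.New(rand.NewSource(seed))
	delays := make([]time.Duration, 0, n)
	for i := 0; i < n; i++ {
		delays = append(delays, randomDelay(rng, minDelay, maxDelay))
	}
	return delays
}

func main() {
	// Each election timeout gets a different delay between 150ms and 300ms.
	for _, d := range sampleDelays(42, 3, 150*time.Millisecond, 300*time.Millisecond) {
		fmt.Println(d)
	}
}
```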
func (*RandomInterval) Interrupt ¶
func (t *RandomInterval) Interrupt() bool
Interrupt the current interval, stopping and starting it again. Returns true if the interval was running and is successfully reset, false if the ticker was stopped or uninitialized.
func (*RandomInterval) Start ¶
func (t *RandomInterval) Start() bool
Start the interval to periodically issue events. Returns true if the ticker gets started, false if it's already started or uninitialized.
type Remote ¶
Remote maintains a connection to a peer on the network.
func (*Remote) AppendEntries ¶
AppendEntries from leader to followers in quorum; this acts as a heartbeat as well as the primary consensus mechanism. The method requires access to the log, since the remote stores the state of the remote log. To ensure consistency, log accesses happen synchronously; the method then starts a goroutine to send the RPC asynchronously and dispatches an event on reply. Send errors are ignored, as the connection will simply be put into offline mode, and retries can be made after the next heartbeat.
Dispatches AppendReplyEvents.
func (*Remote) Connect ¶
Connect to the remote using the specified timeout. Connect is usually not called explicitly; instead, the connection is established when a message is sent.
func (*Remote) RequestVote ¶
RequestVote from other members of the quorum. This method starts a goroutine to send the vote and puts any response onto the event queue. Send errors are ignored, as the connection will simply be put into offline mode, and retries can be made in the next election.
Dispatches VoteReplyEvents.
type Replica ¶
type Replica struct {
	pb.UnimplementedRaftServer
	peers.Peer

	// TODO: remove when stable
	Metrics *Metrics // keep track of access statistics
	// contains filtered or unexported fields
}
Replica represents the local consensus replica and is the primary object in the system. There should only be one replica per process (and many peers). TODO: document more.
func (*Replica) AppendEntries ¶
func (r *Replica) AppendEntries(stream pb.Raft_AppendEntriesServer) (err error)
AppendEntries from leader for either heartbeat or consensus.
func (*Replica) CheckCommits ¶
CheckCommits works backward from the last applied index to the commit index checking to see if a majority of peers matches that index, and if so, committing all entries prior to the match index.
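The backward scan can be sketched as a standalone function. The majorityIndex helper and its parameters are hypothetical: matchIndex holds the highest index known to be replicated on each remote, and the local replica is counted as always holding its own entries. The Raft requirement that a leader only commits entries from its current term is omitted for brevity.

```go
package main

import "fmt"

// majorityIndex is a hypothetical helper that walks backward from lastIndex
// toward commitIndex and returns the highest index stored on a majority of
// the cluster, or commitIndex if no newer index qualifies.
func majorityIndex(commitIndex, lastIndex uint64, matchIndex []uint64) uint64 {
	// The cluster is the remotes plus the local replica.
	quorum := (len(matchIndex)+1)/2 + 1
	for n := lastIndex; n > commitIndex; n-- {
		count := 1 // the local replica
		for _, m := range matchIndex {
			if m >= n {
				count++
			}
		}
		if count >= quorum {
			return n
		}
	}
	return commitIndex
}

func main() {
	// Three replicas: the leader is at index 5, the remotes match 4 and 3.
	fmt.Println(majorityIndex(2, 5, []uint64{4, 3})) // prints 4
}
```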
func (*Replica) CheckRPCTerm ¶
CheckRPCTerm ensures that the replica is in the correct state relative to the term of a remote replica. If the term from the remote is larger than local term, we update our term and set our state to follower.
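The term check can be sketched as follows. The replica type, its string state field, and the checkRPCTerm method are hypothetical simplifications; the real method also returns an error and operates on the package's State enumeration.

```go
package main

import "fmt"

// replica is a hypothetical stand-in holding only the fields this check needs.
type replica struct {
	term  uint64
	state string
}

// checkRPCTerm adopts a larger remote term and steps down to follower. It
// reports whether the local term was updated.
func (r *replica) checkRPCTerm(term uint64) bool {
	if term > r.term {
		r.term = term
		r.state = "follower"
		return true
	}
	return false
}

func main() {
	r := &replica{term: 3, state: "leader"}
	fmt.Println(r.checkRPCTerm(5), r.term, r.state) // prints true 5 follower
}
```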
func (*Replica) Commit ¶
func (r *Replica) Commit(ctx context.Context, in *pb.CommitRequest) (*pb.CommitReply, error)
Commit a client request to append some entry to the log.
func (*Replica) CommitEntry ¶
CommitEntry responds to the client with a successful entry commit.
func (*Replica) RequestVote ¶
RequestVote from peers whose election timeout has elapsed.
type SimpleBenchmark ¶
type SimpleBenchmark struct {
// contains filtered or unexported fields
}
SimpleBenchmark implements Benchmark by having concurrent workers continuously send requests to the server until a fixed number of requests has been made.
func (*SimpleBenchmark) CSV ¶
func (b *SimpleBenchmark) CSV(header bool) (csv string, err error)
CSV returns a results row delimited by commas as:
concurrency,requests,failures,duration,throughput,version,benchmark
func (*SimpleBenchmark) Complete ¶
func (b *SimpleBenchmark) Complete() bool
Complete returns true if requests and duration are both greater than 0.
func (*SimpleBenchmark) JSON ¶
func (b *SimpleBenchmark) JSON(indent int) ([]byte, error)
JSON returns a results row as a json object, formatted with or without the number of spaces specified by indent. Use no indent for JSON lines format.
func (*SimpleBenchmark) Run ¶
func (b *SimpleBenchmark) Run(addr string) (err error)
Run the simple benchmark against the system such that each client puts a unique key and small value to the server as quickly as possible.
func (*SimpleBenchmark) Throughput ¶
func (b *SimpleBenchmark) Throughput() float64
Throughput divides the number of requests (excluding failures) by the total duration of the experiment, i.e. the operations per second.
type State ¶
type State uint8
State is an enumeration of the possible status of a replica.
type StateMachine ¶
type StateMachine interface {
	CommitEntry(entry *pb.LogEntry) error
	DropEntry(entry *pb.LogEntry) error
}
StateMachine implements a handler for applying commands when they are committed or for dropping commands if they are truncated from the log.
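A minimal implementation of the interface might look like the key-value store below. To stay standalone, the sketch declares its own LogEntry and StateMachine types; the Name and Value fields are assumptions inferred from the signature of Log.Create, and kvStore is a hypothetical type.

```go
package main

import (
	"errors"
	"fmt"
)

// LogEntry is a hypothetical stand-in for pb.LogEntry.
type LogEntry struct {
	Index uint64
	Name  string
	Value []byte
}

// StateMachine mirrors the interface documented above, using the local type.
type StateMachine interface {
	CommitEntry(entry *LogEntry) error
	DropEntry(entry *LogEntry) error
}

// kvStore applies committed entries to a map and records dropped indices.
type kvStore struct {
	data    map[string][]byte
	dropped []uint64
}

func newKVStore() *kvStore {
	return &kvStore{data: make(map[string][]byte)}
}

// CommitEntry applies the command by storing the value under its name.
func (s *kvStore) CommitEntry(entry *LogEntry) error {
	if entry == nil {
		return errors.New("cannot commit a nil entry")
	}
	s.data[entry.Name] = entry.Value
	return nil
}

// DropEntry only records the index, since dropped entries were never applied.
func (s *kvStore) DropEntry(entry *LogEntry) error {
	if entry == nil {
		return errors.New("cannot drop a nil entry")
	}
	s.dropped = append(s.dropped, entry.Index)
	return nil
}

// Compile-time check that kvStore satisfies the interface.
var _ StateMachine = (*kvStore)(nil)

func main() {
	sm := newKVStore()
	_ = sm.CommitEntry(&LogEntry{Index: 1, Name: "color", Value: []byte("red")})
	_ = sm.DropEntry(&LogEntry{Index: 2, Name: "color", Value: []byte("blue")})
	fmt.Println(string(sm.data["color"]), len(sm.dropped)) // prints red 1
}
```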
type Ticker ¶
type Ticker struct {
// contains filtered or unexported fields
}
Ticker implements intervals for all timing events based on the tick parameter set by the user. External objects can register for timing events in order to handle ticks, as well as to manage individual tickers.
func NewTicker ¶
NewTicker creates a ticker with intervals for each of the timing events in the system, computed from a base "tick" rate, defined as the delay between heartbeats.