Documentation
¶
Index ¶
- Constants
- Variables
- func AddEventView[V encoding.BinaryMarshaler, VPtr sm.Unmarshallable[V], ...](ctx context.Context, builder *Builder, shardID uint64) eventstream.EventView[V, E]
- func AddShard[Q any, R any, E sm.Marshallable, EPtr sm.Unmarshallable[E]](ctx context.Context, to *Builder, shardID uint64, ...) sm.Handle[Q, R, E]
- func RPCOption(cluster *Cluster) rpc.Option
- type Builder
- type Cluster
- func (c *Cluster) AddMember(ctx context.Context, req *connect.Request[raftpb.AddMemberRequest]) (*connect.Response[raftpb.AddMemberResponse], error)
- func (c *Cluster) Join(ctx context.Context, controlAddress string) error
- func (c *Cluster) Ping(ctx context.Context, req *connect.Request[ftlv1.PingRequest]) (*connect.Response[ftlv1.PingResponse], error)
- func (c *Cluster) RemoveMember(ctx context.Context, req *connect.Request[raftpb.RemoveMemberRequest]) (*connect.Response[raftpb.RemoveMemberResponse], error)
- func (c *Cluster) RuntimeReplicaID() uint64
- func (c *Cluster) Start(ctx context.Context) error
- func (c *Cluster) Stop(ctx context.Context)
- type DragonBoatLoggingAdaptor
- func (d DragonBoatLoggingAdaptor) Debugf(format string, args ...interface{})
- func (d DragonBoatLoggingAdaptor) Errorf(format string, args ...interface{})
- func (d DragonBoatLoggingAdaptor) Infof(format string, args ...interface{})
- func (d DragonBoatLoggingAdaptor) Panicf(format string, args ...interface{})
- func (d DragonBoatLoggingAdaptor) SetLevel(level logger2.LogLevel)
- func (d DragonBoatLoggingAdaptor) Warningf(format string, args ...interface{})
- type RaftConfig
- type RaftEventView
- type RaftStreamEvent
- type ShardHandle
- type UnitQuery
Constants ¶
const InvalidEventValue = 0x1001
Variables ¶
var ErrInvalidEvent = errors.New("invalid event")
ErrInvalidEvent is returned when attempting to publish an invalid event.
Functions ¶
func AddEventView ¶ added in v0.418.0
func AddEventView[
V encoding.BinaryMarshaler,
VPtr sm.Unmarshallable[V],
E RaftStreamEvent[V, VPtr],
EPtr sm.Unmarshallable[E],
](ctx context.Context, builder *Builder, shardID uint64) eventstream.EventView[V, E]
AddEventView adds an event view for the shard identified by shardID to the Builder.
Types ¶
type Builder ¶ added in v0.418.0
type Builder struct {
// contains filtered or unexported fields
}
Builder for a Raft Cluster.
func NewBuilder ¶ added in v0.418.0
func NewBuilder(cfg *RaftConfig) *Builder
func (*Builder) Ephemeral ¶ added in v0.471.0
func (b *Builder) Ephemeral() *Builder
Ephemeral sets the cluster to run on ephemeral storage that gets deleted on shutdown.
func (*Builder) WithControlClient ¶ added in v0.421.0
func (b *Builder) WithControlClient(client *http.Client) *Builder
WithControlClient sets the http client used to communicate with the control plane.
type Cluster ¶
type Cluster struct {
// contains filtered or unexported fields
}
Cluster of dragonboat nodes.
func (*Cluster) AddMember ¶
func (c *Cluster) AddMember(ctx context.Context, req *connect.Request[raftpb.AddMemberRequest]) (*connect.Response[raftpb.AddMemberResponse], error)
AddMember to the cluster. This needs to be called on an existing running cluster member, before the new member is started.
func (*Cluster) Join ¶
func (c *Cluster) Join(ctx context.Context, controlAddress string) error
Join the cluster as a new member. Blocks until the cluster instance is ready.
func (*Cluster) Ping ¶ added in v0.420.0
func (c *Cluster) Ping(ctx context.Context, req *connect.Request[ftlv1.PingRequest]) (*connect.Response[ftlv1.PingResponse], error)
Ping the cluster.
func (*Cluster) RemoveMember ¶ added in v0.439.0
func (c *Cluster) RemoveMember(ctx context.Context, req *connect.Request[raftpb.RemoveMemberRequest]) (*connect.Response[raftpb.RemoveMemberResponse], error)
func (*Cluster) RuntimeReplicaID ¶ added in v0.447.0
func (c *Cluster) RuntimeReplicaID() uint64
type DragonBoatLoggingAdaptor ¶ added in v0.470.0
type DragonBoatLoggingAdaptor struct {
// contains filtered or unexported fields
}
func (DragonBoatLoggingAdaptor) Debugf ¶ added in v0.470.0
func (d DragonBoatLoggingAdaptor) Debugf(format string, args ...interface{})
func (DragonBoatLoggingAdaptor) Errorf ¶ added in v0.470.0
func (d DragonBoatLoggingAdaptor) Errorf(format string, args ...interface{})
func (DragonBoatLoggingAdaptor) Infof ¶ added in v0.470.0
func (d DragonBoatLoggingAdaptor) Infof(format string, args ...interface{})
func (DragonBoatLoggingAdaptor) Panicf ¶ added in v0.470.0
func (d DragonBoatLoggingAdaptor) Panicf(format string, args ...interface{})
type RaftConfig ¶
type RaftConfig struct {
InitialMembers []string `help:"Initial members" env:"RAFT_INITIAL_MEMBERS" and:"raft"`
InitialReplicaIDs []uint64 `name:"initial-replica-ids" help:"Initial replica IDs" env:"RAFT_INITIAL_REPLICA_IDS" and:"raft"`
DataDir string `help:"Data directory" env:"RAFT_DATA_DIR" and:"raft"`
Address string `help:"Address to advertise to other nodes" env:"RAFT_ADDRESS" and:"raft"`
ListenAddress string `help:"Address to listen for incoming traffic. If empty, Address will be used." env:"RAFT_LISTEN_ADDRESS"`
ControlAddress *url.URL `help:"Address to connect to the control server" env:"RAFT_CONTROL_ADDRESS"`
ShardReadyTimeout time.Duration `help:"Timeout for shard to be ready" default:"5s"`
Retry retry.RetryConfig `help:"Connection retry configuration" prefix:"retry-" embed:""`
ChangesInterval time.Duration `help:"Interval for changes to be checked" default:"10ms"`
ChangesTimeout time.Duration `help:"Timeout for changes to be checked" default:"1s"`
QueryTimeout time.Duration `help:"Timeout for queries" default:"5s"`
Ephemeral bool `help:"The cluster runs on ephemeral storage that gets deleted on shutdown" env:"RAFT_EPHEMERAL"`
// Raft configuration
RTT time.Duration `help:"Estimated average round trip time between nodes" default:"200ms"`
ElectionRTT uint64 `help:"Election RTT as a multiple of RTT" default:"10"`
HeartbeatRTT uint64 `help:"Heartbeat RTT as a multiple of RTT" default:"1"`
SnapshotEntries uint64 `help:"Snapshot entries" default:"100"`
CompactionOverhead uint64 `help:"Compaction overhead" default:"100"`
}
type RaftEventView ¶ added in v0.414.0
type RaftEventView[V encoding.BinaryMarshaler, VPrt sm.Unmarshallable[V], E RaftStreamEvent[V, VPrt]] struct {
// contains filtered or unexported fields
}
func (*RaftEventView[V, VPrt, E]) Changes ¶ added in v0.421.0
func (s *RaftEventView[V, VPrt, E]) Changes(ctx context.Context) (iter.Seq[V], error)
type RaftStreamEvent ¶ added in v0.414.0
type RaftStreamEvent[View encoding.BinaryMarshaler, VPtr sm.Unmarshallable[View]] interface {
encoding.BinaryMarshaler
eventstream.Event[View]
}
type ShardHandle ¶
type ShardHandle[Q any, R any, E sm.Marshallable] struct {
// contains filtered or unexported fields
}
ShardHandle is a handle to a shard in the cluster. It is the interface to update and query the state of a shard.
E is the event type. Q is the query type. R is the query response type.
func (*ShardHandle[Q, R, E]) Publish ¶ added in v0.421.0
func (s *ShardHandle[Q, R, E]) Publish(ctx context.Context, msg E) error
Publish an event to the shard.
func (*ShardHandle[Q, R, E]) Query ¶
func (s *ShardHandle[Q, R, E]) Query(ctx context.Context, query Q) (R, error)
Query the state of the shard.
func (*ShardHandle[Q, R, E]) StateIter ¶ added in v0.421.0
func (s *ShardHandle[Q, R, E]) StateIter(ctx context.Context, query Q) (iter.Seq[R], error)
StateIter returns an iterator that will return the result of the query when the shard state changes.
This can only be called when the cluster is running.
Note that this is not guaranteed to receive an event for every change, but it will always receive the latest state of the shard.