Documentation ¶
Overview ¶
Package cluster implements the cluster service for Flow, where multiple instances of Flow connect to each other for work distribution.
Index ¶
- Constants
- type Cluster
- type Component
- type Options
- type Service
- func New(opts Options) (*Service, error)
- func (s *Service) ChangeState(ctx context.Context, targetState peer.State) error
- func (s *Service) Data() any
- func (s *Service) Definition() service.Definition
- func (s *Service) Run(ctx context.Context, host service.Host) error
- func (s *Service) ServiceHandler(host service.Host) (base string, handler http.Handler)
- func (s *Service) Update(newConfig any) error
Constants ¶
const ServiceName = "cluster"
ServiceName defines the name used for the cluster service.
Types ¶
type Cluster ¶
type Cluster interface {
	// Lookup determines the set of replicationFactor owners for a given key.
	// peer.Peer.Self can be used to determine if the local node is the owner,
	// allowing for short-circuiting logic to connect directly to the local node
	// instead of using the network.
	//
	// Callers can use github.com/grafana/ckit/shard.StringKey or
	// shard.NewKeyBuilder to create a key.
	Lookup(key shard.Key, replicationFactor int, op shard.Op) ([]peer.Peer, error)

	// Peers returns the current set of peers for a Node.
	Peers() []peer.Peer
}
Cluster is a read-only view of a cluster.
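The ownership check described in the Lookup comment can be sketched as follows. This is a minimal illustration, not the package's implementation: Peer, Cluster, staticCluster, stringKey, and ownedLocally are hypothetical stand-ins defined only for this example, keys are plain uint64 values, the shard.Op argument is omitted, and the modulo-based placement is an assumption that does not reflect how ckit assigns owners.

```go
package main

import (
	"errors"
	"fmt"
	"hash/fnv"
)

// Peer is a hypothetical stand-in for peer.Peer, reduced to two fields.
type Peer struct {
	Name string
	Self bool // True when the peer is the local node.
}

// Cluster is a simplified stand-in for the Cluster interface: keys are plain
// uint64 values and the shard.Op argument is omitted.
type Cluster interface {
	Lookup(key uint64, replicationFactor int) ([]Peer, error)
	Peers() []Peer
}

// staticCluster is a toy implementation over a fixed peer list.
type staticCluster struct{ peers []Peer }

func (c staticCluster) Peers() []Peer { return c.peers }

// Lookup returns replicationFactor consecutive peers, starting at the index
// key mod len(peers). This placement rule is an assumption of the sketch.
func (c staticCluster) Lookup(key uint64, replicationFactor int) ([]Peer, error) {
	if replicationFactor < 1 || replicationFactor > len(c.peers) {
		return nil, errors.New("not enough peers for replication factor")
	}
	owners := make([]Peer, 0, replicationFactor)
	start := int(key % uint64(len(c.peers)))
	for i := 0; i < replicationFactor; i++ {
		owners = append(owners, c.peers[(start+i)%len(c.peers)])
	}
	return owners, nil
}

// stringKey hashes a string into a key (a stand-in for shard.StringKey).
func stringKey(s string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(s))
	return h.Sum64()
}

// ownedLocally reports whether the local node is among the owners of target,
// which lets a caller skip the network for locally owned work.
func ownedLocally(c Cluster, target string) (bool, error) {
	owners, err := c.Lookup(stringKey(target), 1)
	if err != nil {
		return false, err
	}
	for _, p := range owners {
		if p.Self {
			return true, nil
		}
	}
	return false, nil
}

func main() {
	c := staticCluster{peers: []Peer{{Name: "node-a", Self: true}, {Name: "node-b"}}}
	for _, target := range []string{"job/api", "job/db", "job/cache"} {
		owned, err := ownedLocally(c, target)
		if err != nil {
			fmt.Println("lookup failed:", err)
			continue
		}
		fmt.Printf("%s owned locally: %v\n", target, owned)
	}
}
```

Because every node evaluates the same key against the same peer list, exactly one node sees itself as the owner when replicationFactor is 1.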
type Component ¶
type Component interface {
	component.Component

	// NotifyClusterChange notifies the component that the state of the cluster
	// has changed.
	//
	// Implementations should ignore calls to this method if they are configured
	// to not utilize clustering.
	NotifyClusterChange()
}
Component is a Flow component which subscribes to clustering updates.
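A component that follows the NotifyClusterChange contract might look like the sketch below. The scraper type, its clusteringEnabled field, and the redistribution counter are hypothetical and exist only for illustration; a real component would also implement component.Component.

```go
package main

import (
	"fmt"
	"sync"
)

// scraper is a hypothetical component; it is not part of the package.
type scraper struct {
	mu                sync.Mutex
	clusteringEnabled bool // Hypothetical per-component setting.
	redistributions   int  // How many times work ownership was recomputed.
}

// NotifyClusterChange recomputes work ownership. When the component is
// configured to not use clustering, the call is ignored.
func (s *scraper) NotifyClusterChange() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.clusteringEnabled {
		return
	}
	// A real component would look up the owners of its targets again here.
	s.redistributions++
}

// Redistributions returns how many notifications led to recomputed ownership.
func (s *scraper) Redistributions() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.redistributions
}

func main() {
	standalone := &scraper{clusteringEnabled: false}
	clustered := &scraper{clusteringEnabled: true}
	standalone.NotifyClusterChange()
	clustered.NotifyClusterChange()
	fmt.Println("standalone:", standalone.Redistributions())
	fmt.Println("clustered:", clustered.Redistributions())
}
```

The mutex is there because notifications may arrive from a different goroutine than the one running the component; that is an assumption of the sketch, not a documented guarantee.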
type Options ¶
type Options struct {
	Log     log.Logger            // Where to send logs to.
	Metrics prometheus.Registerer // Where to send metrics to.
	Tracer  trace.TracerProvider  // Where to send traces.

	// EnableClustering toggles clustering as a whole. When EnableClustering is
	// false, the instance of Flow acts as a single-node cluster and it is not
	// possible for other nodes to join the cluster.
	EnableClustering bool

	NodeName            string        // Name to use for this node in the cluster.
	AdvertiseAddress    string        // Address to advertise to other nodes in the cluster.
	RejoinInterval      time.Duration // How frequently to rejoin the cluster to address split brain issues.
	ClusterMaxJoinPeers int           // Number of initial peers to join from the discovered set.
	ClusterName         string        // Name to prevent nodes without this identifier from joining the cluster.

	// Function to discover peers to join. If this function is nil or returns an
	// empty slice, no peers will be joined.
	DiscoverPeers func() ([]string, error)
}
Options are used to configure the cluster service. Options are constant for the lifetime of the cluster service.
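The interplay between DiscoverPeers and ClusterMaxJoinPeers can be sketched as below. The helpers peersToJoin, staticPeers, and failingDiscovery are hypothetical, as are the addresses. Two assumptions are made that the field comments do not confirm: a non-positive limit means "no limit", and the first entries of the discovered set are kept. The service may select peers differently.

```go
package main

import (
	"errors"
	"fmt"
)

// peersToJoin is a hypothetical helper, not part of the package. It calls
// discover and keeps at most maxJoinPeers addresses.
//
// Assumptions: a non-positive maxJoinPeers means "no limit", and the first
// entries are kept.
func peersToJoin(discover func() ([]string, error), maxJoinPeers int) ([]string, error) {
	if discover == nil {
		return nil, nil // No discovery function: no peers are joined.
	}
	addrs, err := discover()
	if err != nil {
		return nil, fmt.Errorf("discovering peers: %w", err)
	}
	if maxJoinPeers > 0 && len(addrs) > maxJoinPeers {
		addrs = addrs[:maxJoinPeers]
	}
	return addrs, nil
}

// staticPeers returns a discovery function over a fixed address list. It has
// the same shape as the DiscoverPeers field.
func staticPeers(addrs ...string) func() ([]string, error) {
	return func() ([]string, error) { return addrs, nil }
}

// failingDiscovery simulates a discovery backend that is unavailable.
func failingDiscovery() ([]string, error) {
	return nil, errors.New("discovery backend unavailable")
}

func main() {
	discover := staticPeers("10.0.0.1:12345", "10.0.0.2:12345", "10.0.0.3:12345")
	addrs, err := peersToJoin(discover, 2)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("joining:", addrs)
}
```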
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service is the cluster service.
func New ¶
func New(opts Options) (*Service, error)
New returns a new, unstarted instance of the cluster service.
func (*Service) ChangeState ¶
func (s *Service) ChangeState(ctx context.Context, targetState peer.State) error
ChangeState changes the state of the service. If clustering is enabled, ChangeState will block until the state change has been propagated to another node; cancel the current context to stop waiting. ChangeState fails if the current state cannot move to the provided targetState.
Note that the state must be StateParticipant to receive writes.
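Because ChangeState can block until the change has propagated, callers may want to bound the wait with a context deadline. The sketch below shows that pattern against a stand-in: stateChanger, fakeService, changeStateWithTimeout, and timedOut are hypothetical names, and the target state is a plain string rather than peer.State.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// stateChanger is a stand-in for the ChangeState method; the target state is
// a plain string here instead of peer.State.
type stateChanger interface {
	ChangeState(ctx context.Context, targetState string) error
}

// fakeService simulates propagation: ChangeState blocks until propagated is
// closed or the context is done.
type fakeService struct{ propagated chan struct{} }

func (f fakeService) ChangeState(ctx context.Context, _ string) error {
	select {
	case <-f.propagated:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// changeStateWithTimeout bounds how long the caller waits for propagation.
func changeStateWithTimeout(sc stateChanger, target string, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return sc.ChangeState(ctx, target)
}

// timedOut reports whether err was caused by the deadline expiring.
func timedOut(err error) bool {
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	// No other node ever acknowledges the change, so the call gives up when
	// the deadline expires.
	stuck := fakeService{propagated: make(chan struct{})}
	err := changeStateWithTimeout(stuck, "participant", 50*time.Millisecond)
	fmt.Println("timed out:", timedOut(err))
}
```

Whether a timed-out call leaves the local state changed is not stated in the documentation above, so callers should not assume either outcome.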
func (*Service) Definition ¶
func (s *Service) Definition() service.Definition
Definition returns the definition of the cluster service.
func (*Service) Run ¶
func (s *Service) Run(ctx context.Context, host service.Host) error
Run starts the cluster service. It will run until the provided context is canceled or there is a fatal error.
func (*Service) ServiceHandler ¶
func (s *Service) ServiceHandler(host service.Host) (base string, handler http.Handler)
ServiceHandler returns the service handler for the clustering service. The resulting handler always returns 404 when clustering is disabled.
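The "always 404 when disabled" behavior can be illustrated with the standard library alone. In this sketch, basePath, serviceHandler, newOKHandler, and statusFor are hypothetical; the actual base path and handler used by the service are not documented here.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// basePath is a hypothetical route prefix chosen for this sketch.
const basePath = "/api/v1/example-cluster/"

// serviceHandler returns the base path and a handler. When clustering is
// disabled, every request receives a 404 response.
func serviceHandler(enabled bool, clusterHandler http.Handler) (string, http.Handler) {
	if !enabled {
		return basePath, http.NotFoundHandler()
	}
	return basePath, clusterHandler
}

// newOKHandler returns a placeholder for the real cluster handler; it
// answers every request with 200.
func newOKHandler() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
}

// statusFor issues an in-process GET against h and returns the status code.
func statusFor(h http.Handler, path string) int {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code
}

func main() {
	_, disabled := serviceHandler(false, newOKHandler())
	_, enabled := serviceHandler(true, newOKHandler())
	fmt.Println("disabled:", statusFor(disabled, basePath+"peers"))
	fmt.Println("enabled:", statusFor(enabled, basePath+"peers"))
}
```

Returning a handler in both cases keeps the route registered, so callers get a consistent 404 instead of behavior that depends on how the host treats a missing handler.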
func (*Service) Update ¶
func (s *Service) Update(newConfig any) error
Update implements service.Service. It returns an error since the cluster service does not support runtime configuration.