Documentation ¶
Index ¶
- Variables
- func NewZKEventCancelContext(ctx context.Context, notify <-chan zk.Event) context.Context
- type Checkpoint
- func (*Checkpoint) Descriptor() ([]byte, []int) deprecated
- func (x *Checkpoint) GetLastEntryOffset() int64
- func (x *Checkpoint) GetNextEntryOffset() int64
- func (*Checkpoint) ProtoMessage()
- func (x *Checkpoint) ProtoReflect() protoreflect.Message
- func (x *Checkpoint) Reset()
- func (x *Checkpoint) String() string
- type InvalidVersionError
- type NoMoreMessageError
- type Receiver
- type Writer
Constants ¶
This section is empty.
Variables ¶
var File_journal_proto protoreflect.FileDescriptor
Functions ¶
func NewZKEventCancelContext ¶
Returns a Context that is cancelled asynchronously when the given channel emits a zk.Event. Such a context can be used to cancel a blocking FetchEntry call when a secondary node becomes the primary.
Types ¶
type Checkpoint ¶
type Checkpoint struct {
LastEntryOffset int64 `protobuf:"varint,1,opt,name=lastEntryOffset,proto3" json:"lastEntryOffset,omitempty"`
NextEntryOffset int64 `protobuf:"varint,2,opt,name=nextEntryOffset,proto3" json:"nextEntryOffset,omitempty"`
// contains filtered or unexported fields
}
func (*Checkpoint) Descriptor
deprecated
func (*Checkpoint) Descriptor() ([]byte, []int)
Deprecated: Use Checkpoint.ProtoReflect.Descriptor instead.
func (*Checkpoint) GetLastEntryOffset ¶
func (x *Checkpoint) GetLastEntryOffset() int64
func (*Checkpoint) GetNextEntryOffset ¶
func (x *Checkpoint) GetNextEntryOffset() int64
func (*Checkpoint) ProtoMessage ¶
func (*Checkpoint) ProtoMessage()
func (*Checkpoint) ProtoReflect ¶
func (x *Checkpoint) ProtoReflect() protoreflect.Message
func (*Checkpoint) Reset ¶
func (x *Checkpoint) Reset()
func (*Checkpoint) String ¶
func (x *Checkpoint) String() string
type InvalidVersionError ¶
type InvalidVersionError struct {
// contains filtered or unexported fields
}
func NewInvalidVersionError ¶
func NewInvalidVersionError(version int64) *InvalidVersionError
func (*InvalidVersionError) Error ¶
func (i *InvalidVersionError) Error() string
type NoMoreMessageError ¶
type NoMoreMessageError struct { }
func (*NoMoreMessageError) Error ¶
func (n *NoMoreMessageError) Error() string
type Receiver ¶
type Receiver struct {
// contains filtered or unexported fields
}
Receiver wraps around a kafka.Reader, offering methods for a secondary node to receive and replicate journal entries.
func NewReceiver ¶
Initialize a Receiver.
This function tries to create a single-partition topic with the given name. If the topic already exists, it does nothing; any other error that occurs during creation is returned.
@param
server: address of a Kafka server.
topic: Kafka topic name used to store messages.
@return
error: non-nil if the function failed to ensure that the topic exists.
func (*Receiver) FetchEntry ¶
Blocks until an entry is fetched from Kafka successfully or an error occurs. This method should be used by a secondary node that continuously pulls entries and applies them.
When a message is fetched, this method checks whether it is a Checkpoint. If so, it unmarshals the message and returns the resulting *Checkpoint. Otherwise, it returns the raw bytes of the message, which the application must parse itself.
@param
ctx: Context used to cancel operation asynchronously.
@return
[]byte: raw bytes of the journal entry, on success.
*Checkpoint: the unmarshalled Checkpoint object, on success, when the fetched entry is a Checkpoint entry.
error: non-nil if fetching the message or unmarshalling the Checkpoint failed.
func (*Receiver) SetOffset ¶
Sets the offset of the wrapped kafka.Reader. A secondary node can call this method to set the current offset to a checkpoint, skipping journal entries that have already been replicated and persisted.
func (*Receiver) TryFetchEntry ¶
Fetches a journal entry from Kafka. If there are more messages to consume, it blocks until a message is fetched or the fetch fails. If there are currently no more messages in Kafka, as determined from the lag of the internal kafka.Reader, it returns immediately with a *NoMoreMessageError. This error can be taken as an indication that a secondary node has completed all the preparation needed to become a primary node.
@param
ctx: Context used to cancel operation asynchronously.
@return
[]byte: raw bytes of the journal entry, on success.
*Checkpoint: the unmarshalled Checkpoint object, on success, when the fetched entry is a Checkpoint entry.
error: non-nil if fetching the message or unmarshalling the Checkpoint failed.
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
Writer abstracts Kafka operations. The primary node can use this type to send the journal entries it produces to Kafka, so that the corresponding secondary nodes can replicate them, achieving master-backup fault tolerance.
Writer also provides a version-based Writer.Checkpoint API. It sends a special checkpoint journal entry to Kafka, which represents a complete checkpoint operation performed by the primary node. After receiving such an entry, a secondary node should perform a checkpoint as well.
Internally, Writer uses two different event keys: one for general journal entries, which are opaque to Writer, and one for the special Checkpoint entries. Because Kafka only guarantees ordering for writes to the same partition of the same topic, Writer applies a FixedBalancer to its internal kafka.Writer, so that both keys are always routed to the same partition.
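The idea behind a fixed balancer can be illustrated as follows. This is not the package's FixedBalancer, whose source is not shown here. `message` and `fixedBalancer` are hypothetical stand-ins, and the `Balance` method shape is an assumption about what a partition-selection hook might look like.

```go
package main

import "fmt"

// message is a hypothetical stand-in for a Kafka message.
type message struct {
	Key   []byte
	Value []byte
}

// fixedBalancer ignores the message key and always picks the same partition.
type fixedBalancer struct{ partition int }

// Balance returns the configured partition when it is among the available
// ones, and otherwise falls back to the first available partition.
func (b fixedBalancer) Balance(_ message, partitions ...int) int {
	for _, p := range partitions {
		if p == b.partition {
			return p
		}
	}
	if len(partitions) > 0 {
		return partitions[0]
	}
	return b.partition
}

func main() {
	b := fixedBalancer{partition: 0}
	entry := message{Key: []byte("entry"), Value: []byte("put a=1")}
	cp := message{Key: []byte("checkpoint")}

	// Both keys land on the same partition, so their relative order is kept.
	fmt.Println(b.Balance(entry, 0, 1, 2), b.Balance(cp, 0, 1, 2))
}
```

A key-hashing balancer could send the two keys to different partitions, in which case the relative order of checkpoints and ordinary entries would be lost.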
func NewWriter ¶
Initialize a Writer.
This function tries to create a single-partition topic with the given name. If the topic already exists, it does nothing; any other error that occurs during creation is returned.
@param
server: address of a Kafka server.
topic: Kafka topic name used to store messages.
@return
error: non-nil if the function failed to ensure that the topic exists.
func (*Writer) Checkpoint ¶
Commits a checkpoint entry to Kafka. This method should be invoked by a primary node that has already performed a checkpoint operation. It returns the offset of the first entry after the new checkpoint entry; the primary node can use this offset to recover from the journal after a crash.
func (*Writer) CommitEntry ¶
Commits a general journal entry to Kafka. Writer writes messages synchronously, so this method blocks until the new journal entry is written successfully or the maximum number of attempts is exceeded.
@param
ctx: Context used to cancel operation asynchronously.
entry: data of journal entry.
@return
error: non-nil if the message could not be written.
func (*Writer) ExitCheckpoint ¶
func (w *Writer) ExitCheckpoint()
Unlocks the RWMutex so that further CommitEntry calls can proceed.
func (*Writer) PrepareCheckpoint ¶
Many goroutines may be using a Writer in a primary node. When the node wants to perform a Checkpoint operation, it must ensure that all pending CommitEntry calls have finished and that no new ones start. Calling PrepareCheckpoint achieves this: when it returns, all pending CommitEntry calls are done and no new ones can proceed.
Internally, a writer-preferred RWMutex is used so that a Checkpoint operation blocks all incoming CommitEntry calls, while CommitEntry calls do not interfere with each other. PrepareCheckpoint takes the write lock (Lock) on this RWMutex, and CommitEntry takes the read lock (RLock).