Documentation
¶
Index ¶
- type RWMode
- type Reader
- func (r *Reader) Close() error
- func (r *Reader) Lag(i int) int64
- func (r *Reader) Offset(i int) int64
- func (r *Reader) ReadLag(ctx context.Context, i int) (lag int64, err error)
- func (r *Reader) ReadMessage(ctx context.Context) ([]kafka.Message, error)
- func (r *Reader) SetOffset(i int, offset int64) error
- func (r *Reader) SetOffsetAt(ctx context.Context, i int, t time.Time) error
- func (r *Reader) Stats(i int) kafka.ReaderStats
- type WriteErrors
- type Writer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader 代表一个支持多Kafka集群的reader. 它会从多个kafka集群同时读取消息.
func NewReader ¶
func NewReader(configs []kafka.ReaderConfig) *Reader
NewReader 返回一个支持多Kafka集群的reader.
func (*Reader) Lag ¶
Lag returns the lag of the last message returned by ReadMessage, or -1 if r is backed by a consumer group.
func (*Reader) Offset ¶
Offset returns the current absolute offset of the reader, or -1 if r is backed by a consumer group.
func (*Reader) ReadLag ¶
ReadLag returns the current lag of the reader by fetching the last offset of the topic and partition and computing the difference between that value and the offset of the last message returned by ReadMessage.
This method is intended to be used in cases where a program may be unable to call ReadMessage to update the value returned by Lag, but still needs to get an up to date estimation of how far behind the reader is. For example when the consumer is not ready to process the next message.
The function returns a lag of zero when the reader's current offset is negative.
func (*Reader) ReadMessage ¶
ReadMessage reads from all kafka clusters and return the next messages from the r. The method call blocks until at least a message becomes available, or all readers return errors. The program may also specify a context to asynchronously cancel the blocking operation.
The method returns io.EOF to indicate that the reader has been closed.
If consumer groups are used, ReadMessage will automatically commit the offset when called. Note that this could result in an offset being committed before the message is fully processed.
If more fine grained control of when offsets are committed is required, it is recommended to use FetchMessage with CommitMessages instead.
func (*Reader) SetOffset ¶
SetOffset changes the offset from which the next batch of messages will be read. The method fails with io.ErrClosedPipe if the reader has already been closed.
From version 0.2.0, FirstOffset and LastOffset can be used to indicate the first or last available offset in the partition. Please note while -1 and -2 were accepted to indicate the first or last offset in previous versions, the meanings of the numbers were swapped in 0.2.0 to match the meanings in other libraries and the Kafka protocol specification.
func (*Reader) SetOffsetAt ¶
SetOffsetAt changes the offset from which the next batch of messages will be read given the timestamp t.
The method fails if the unable to connect partition leader, or unable to read the offset given the ts, or if the reader has been closed.
func (*Reader) Stats ¶
Stats returns a snapshot of the reader stats since the last time the method was called, or since the reader was created if it is called for the first time.
A typical use of this method is to spawn a goroutine that will periodically call Stats on a kafka reader and report the metrics to a stats collection system.
type WriteErrors ¶
type WriteErrors []error
func (WriteErrors) Count ¶
func (err WriteErrors) Count() int
Count counts the number of non-nil errors in err.
func (WriteErrors) Error ¶
func (err WriteErrors) Error() string
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
Writer 支持多写Kafka集群. 可以选择多写模式还是主备模式. - 多写模式下: 轮询选择一个kafka集群进行写入 - 主备模式: 优先写入主, 主失败的情况下写入从
func (*Writer) Close ¶
Close flushes pending writes, and waits for all writes to complete before returning. Calling Close also prevents new writes from being submitted to the writer, further calls to WriteMessages and the like will fail with io.ErrClosedPipe.
func (*Writer) Stats ¶
Stats returns a snapshot of the selected writer stats since the last time the method was called, or since the writer was created if it is called for the first time.
A typical use of this method is to spawn a goroutine that will periodically call Stats on a kafka writer and report the metrics to a stats collection system.
func (*Writer) WriteMessages ¶
/ WriteMessages writes a batch of messages to the kafka topic configured on this writers. If write fails, it will try write another kafka cluster again.
Unless the writer was configured to write messages asynchronously, the method blocks until all messages have been written, or until the maximum number of attempts was reached.
When sending synchronously and the writer's batch size is configured to be greater than 1, this method blocks until either a full batch can be assembled or the batch timeout is reached. The batch size and timeouts are evaluated per partition, so the choice of Balancer can also influence the flushing behavior. For example, the Hash balancer will require on average N * batch size messages to trigger a flush where N is the number of partitions. The best way to achieve good batching behavior is to share one Writer amongst multiple go routines.
When the method returns an error, it may be of type kafka.WriteError to allow the caller to determine the status of each message.
The context passed as first argument may also be used to asynchronously cancel the operation. Note that in this case there are no guarantees made on whether messages were written to kafka. The program should assume that the whole batch failed and re-write the messages later (which could then cause duplicates).