Documentation ¶
Overview ¶
Package sarama provides client libraries for the Kafka 0.8 protocol. The Client, Producer and Consumer objects are the core of the high-level API. The Broker and Request/Response objects permit more precise control.
The Request/Response objects and properties are mostly undocumented, as they line up exactly with the protocol fields documented by Kafka at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
Index ¶
- Constants
- Variables
- type Broker
- func (b *Broker) Addr() string
- func (b *Broker) Close() (err error)
- func (b *Broker) CommitOffset(clientID string, request *OffsetCommitRequest) (*OffsetCommitResponse, error)
- func (b *Broker) Connected() (bool, error)
- func (b *Broker) Fetch(clientID string, request *FetchRequest) (*FetchResponse, error)
- func (b *Broker) FetchOffset(clientID string, request *OffsetFetchRequest) (*OffsetFetchResponse, error)
- func (b *Broker) GetAvailableOffsets(clientID string, request *OffsetRequest) (*OffsetResponse, error)
- func (b *Broker) GetConsumerMetadata(clientID string, request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error)
- func (b *Broker) GetMetadata(clientID string, request *MetadataRequest) (*MetadataResponse, error)
- func (b *Broker) ID() int32
- func (b *Broker) Open(conf *BrokerConfig) error
- func (b *Broker) Produce(clientID string, request *ProduceRequest) (*ProduceResponse, error)
- type BrokerConfig
- type ByteEncoder
- type Client
- func (client *Client) Close() error
- func (client *Client) Closed() bool
- func (client *Client) GetOffset(topic string, partitionID int32, where OffsetTime) (int64, error)
- func (client *Client) Leader(topic string, partitionID int32) (*Broker, error)
- func (client *Client) Partitions(topic string) ([]int32, error)
- func (client *Client) RefreshAllMetadata() error
- func (client *Client) RefreshTopicMetadata(topics ...string) error
- func (client *Client) Topics() ([]string, error)
- type ClientConfig
- type CompressionCodec
- type ConfigurationError
- type ConstantPartitioner
- type Consumer
- type ConsumerConfig
- type ConsumerEvent
- type ConsumerMetadataRequest
- type ConsumerMetadataResponse
- type DecodingError
- type DroppedMessagesError
- type Encoder
- type FetchRequest
- type FetchResponse
- type FetchResponseBlock
- type HashPartitioner
- type KError
- type Message
- type MessageBlock
- type MessageSet
- type MessageToSend
- type MetadataRequest
- type MetadataResponse
- type MockBroker
- type OffsetCommitRequest
- type OffsetCommitResponse
- type OffsetFetchRequest
- type OffsetFetchResponse
- type OffsetFetchResponseBlock
- type OffsetMethod
- type OffsetRequest
- type OffsetResponse
- type OffsetResponseBlock
- type OffsetTime
- type PartitionMetadata
- type Partitioner
- type PartitionerConstructor
- type ProduceError
- type ProduceErrors
- type ProduceRequest
- type ProduceResponse
- type ProduceResponseBlock
- type Producer
- type ProducerConfig
- type RandomPartitioner
- type RequiredAcks
- type RoundRobinPartitioner
- type SimpleProducer
- type StdLogger
- type StringEncoder
- type TestState
- type TopicMetadata
Examples ¶
- Broker
- Consumer
- Producer
- SimpleProducer
Constants ¶
const ReceiveTime int64 = -1
ReceiveTime is a special value for the timestamp field of Offset Commit Requests which tells the broker to set the timestamp to the time at which the request was received.
Variables ¶
var AlreadyConnected = errors.New("kafka: broker: already connected")
AlreadyConnected is the error returned when calling Open() on a Broker that is already connected.
var ClosedClient = errors.New("kafka: Tried to use a client that was closed.")
ClosedClient is the error returned when a method is called on a client that has been closed.
var EncodingError = errors.New("kafka: Error while encoding packet.")
EncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example, if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
var IncompleteResponse = errors.New("kafka: Response did not contain all the expected topic/partition blocks.")
IncompleteResponse is the error returned when the server returns a syntactically valid response, but it does not contain the expected information.
var InsufficientData = errors.New("kafka: Insufficient data to decode packet, more bytes expected.")
InsufficientData is returned when decoding and the packet is truncated. This can be expected when requesting messages, since as an optimization the server is allowed to return a partial message at the end of the message set.
var InvalidPartition = errors.New("kafka: Partitioner returned an invalid partition index.")
InvalidPartition is the error returned when a partitioner returns an invalid partition index (meaning one outside of the range [0...numPartitions-1]).
var MaxRequestSize uint32 = 100 * 1024 * 1024
MaxRequestSize is the maximum size (in bytes) of any request that Sarama will attempt to send. Trying to send a request larger than this will result in an EncodingError. The default of 100 MiB is aligned with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt to process.
var MessageTooLarge = errors.New("kafka: Message is larger than MaxFetchSize")
MessageTooLarge is returned when the next message to consume is larger than the configured MaxFetchSize
var NoSuchTopic = errors.New("kafka: Topic not recognized by brokers.")
NoSuchTopic is the error returned when the supplied topic is rejected by the Kafka servers.
var NotConnected = errors.New("kafka: broker: not connected")
NotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.
var OutOfBrokers = errors.New("kafka: Client has run out of available brokers to talk to. Is your cluster reachable?")
OutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored or otherwise failed to respond.
var PanicHandler func(interface{})
PanicHandler is called for recovering from panics spawned internally to the library (and thus not recoverable by the caller's goroutine). Defaults to nil, which means panics are not recovered.
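For example, a process that wants such panics logged rather than silently lost could install a handler like this (a minimal sketch, assuming the standard library log package is imported):

PanicHandler = func(v interface{}) {
	// v is the value recovered from the panicking sarama goroutine.
	log.Printf("sarama internal panic: %v", v)
}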
var ShuttingDown = errors.New("kafka: Message received by producer in process of shutting down.")
ShuttingDown is returned when a producer receives a message during shutdown.
Functions ¶
This section is empty.
Types ¶
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
Example ¶
broker := NewBroker("localhost:9092")
err := broker.Open(nil)
if err != nil {
	return err
}
defer broker.Close()

request := MetadataRequest{Topics: []string{"myTopic"}}
response, err := broker.GetMetadata("myClient", &request)
if err != nil {
	return err
}

fmt.Println("There are", len(response.Topics), "topics active in the cluster.")
return nil
func NewBroker ¶
NewBroker creates and returns a Broker targeting the given host:port address. This does not attempt to actually connect; you have to call Open() for that.
func (*Broker) Addr ¶
Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
func (*Broker) CommitOffset ¶
func (b *Broker) CommitOffset(clientID string, request *OffsetCommitRequest) (*OffsetCommitResponse, error)
func (*Broker) Connected ¶
Connected returns true if the broker is connected and false otherwise. If the broker is not connected but it had tried to connect, the error from that connection attempt is also returned.
func (*Broker) Fetch ¶
func (b *Broker) Fetch(clientID string, request *FetchRequest) (*FetchResponse, error)
func (*Broker) FetchOffset ¶
func (b *Broker) FetchOffset(clientID string, request *OffsetFetchRequest) (*OffsetFetchResponse, error)
func (*Broker) GetAvailableOffsets ¶
func (b *Broker) GetAvailableOffsets(clientID string, request *OffsetRequest) (*OffsetResponse, error)
func (*Broker) GetConsumerMetadata ¶
func (b *Broker) GetConsumerMetadata(clientID string, request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error)
func (*Broker) GetMetadata ¶
func (b *Broker) GetMetadata(clientID string, request *MetadataRequest) (*MetadataResponse, error)
func (*Broker) ID ¶
ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
func (*Broker) Open ¶
func (b *Broker) Open(conf *BrokerConfig) error
Open tries to connect to the Broker. It takes the broker lock synchronously, then spawns a goroutine which connects and releases the lock. This means any subsequent operations on the broker will block waiting for the connection to finish. To get the effect of a fully synchronous Open call, follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or AlreadyConnected. If conf is nil, the result of NewBrokerConfig() is used.
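A minimal sketch of the fully synchronous pattern described above (the address is a placeholder):

broker := NewBroker("localhost:9092")
if err := broker.Open(nil); err != nil {
	return err // only ConfigurationError or AlreadyConnected occur here
}
// Connected blocks until the background connection attempt resolves,
// surfacing any error from the actual dial.
if _, err := broker.Connected(); err != nil {
	return err
}
defer broker.Close()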
func (*Broker) Produce ¶
func (b *Broker) Produce(clientID string, request *ProduceRequest) (*ProduceResponse, error)
type BrokerConfig ¶
type BrokerConfig struct {
	MaxOpenRequests int           // How many outstanding requests the broker is allowed to have before blocking attempts to send.
	DialTimeout     time.Duration // How long to wait for the initial connection to succeed before timing out and returning an error.
	ReadTimeout     time.Duration // How long to wait for a response before timing out and returning an error.
	WriteTimeout    time.Duration // How long to wait for a transmit to succeed before timing out and returning an error.
}
BrokerConfig is used to pass multiple configuration options to Broker.Open.
func NewBrokerConfig ¶
func NewBrokerConfig() *BrokerConfig
NewBrokerConfig returns a new broker configuration with sane defaults.
func (*BrokerConfig) Validate ¶
func (config *BrokerConfig) Validate() error
Validate checks a BrokerConfig instance. This will return a ConfigurationError if the specified values don't make sense.
type ByteEncoder ¶
type ByteEncoder []byte
ByteEncoder implements the Encoder interface for Go byte slices so that you can do things like
producer.SendMessage(nil, sarama.ByteEncoder([]byte{0x00}))
func (ByteEncoder) Encode ¶
func (b ByteEncoder) Encode() ([]byte, error)
func (ByteEncoder) Length ¶
func (b ByteEncoder) Length() int
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is a generic Kafka client. It manages connections to one or more Kafka brokers. You MUST call Close() on a client to avoid leaks; it will not be garbage-collected automatically when it passes out of scope. A single client can be safely shared by multiple concurrent Producers and Consumers.
func NewClient ¶
func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)
NewClient creates a new Client with the given client ID. It connects to one of the given broker addresses and uses that broker to automatically fetch metadata on the rest of the Kafka cluster. If metadata cannot be retrieved from any of the given broker addresses, the client is not created.
func (*Client) Close ¶
Close shuts down all broker connections managed by this client. It is required to call this function before a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers using a client before you close the client.
func (*Client) GetOffset ¶
GetOffset queries the cluster to get the most recent available offset at the given time on the topic/partition combination.
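For example, to ask for the most recent offset on partition 0 of a topic (a sketch; the topic name is a placeholder, client is an existing *Client, and LatestOffsets is described under OffsetTime below):

offset, err := client.GetOffset("my_topic", 0, LatestOffsets)
if err != nil {
	return err
}
fmt.Println("latest offset:", offset)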
func (*Client) Leader ¶
Leader returns the broker object that is the leader of the current topic/partition, as determined by querying the cluster metadata.
func (*Client) Partitions ¶
Partitions returns the sorted list of available partition IDs for the given topic.
func (*Client) RefreshAllMetadata ¶
RefreshAllMetadata queries the cluster to refresh the available metadata for all topics.
func (*Client) RefreshTopicMetadata ¶
RefreshTopicMetadata takes a list of topics and queries the cluster to refresh the available metadata for those topics.
type ClientConfig ¶
type ClientConfig struct {
	MetadataRetries            int           // How many times to retry a metadata request when a partition is in the middle of leader election.
	WaitForElection            time.Duration // How long to wait for leader election to finish between retries.
	DefaultBrokerConf          *BrokerConfig // Default configuration for broker connections created by this client.
	BackgroundRefreshFrequency time.Duration // How frequently the client will refresh the cluster metadata in the background. Defaults to 10 minutes. Set to 0 to disable.
}
ClientConfig is used to pass multiple configuration options to NewClient.
func NewClientConfig ¶
func NewClientConfig() *ClientConfig
NewClientConfig creates a new ClientConfig instance with sensible defaults
func (*ClientConfig) Validate ¶
func (config *ClientConfig) Validate() error
Validate checks a ClientConfig instance. This will return a ConfigurationError if the specified values don't make sense.
type CompressionCodec ¶
type CompressionCodec int8
CompressionCodec represents the various compression codecs recognized by Kafka in messages.
const (
	CompressionNone   CompressionCodec = 0
	CompressionGZIP   CompressionCodec = 1
	CompressionSnappy CompressionCodec = 2
)
type ConfigurationError ¶
type ConfigurationError string
ConfigurationError is the type of error returned from NewClient, NewProducer or NewConsumer when the specified configuration is invalid.
func (ConfigurationError) Error ¶
func (err ConfigurationError) Error() string
type ConstantPartitioner ¶
type ConstantPartitioner struct {
Constant int32
}
ConstantPartitioner implements the Partitioner interface by just returning a constant value.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer processes Kafka messages from a given topic and partition. You MUST call Close() on a consumer to avoid leaks; it will not be garbage-collected automatically when it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
Example ¶
client, err := NewClient("my_client", []string{"localhost:9092"}, nil)
if err != nil {
	panic(err)
} else {
	fmt.Println("> connected")
}
defer client.Close()

consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", NewConsumerConfig())
if err != nil {
	panic(err)
} else {
	fmt.Println("> consumer ready")
}
defer consumer.Close()

msgCount := 0
consumerLoop:
for {
	select {
	case event := <-consumer.Events():
		if event.Err != nil {
			panic(event.Err)
		}
		msgCount++
	case <-time.After(5 * time.Second):
		fmt.Println("> timed out")
		break consumerLoop
	}
}
fmt.Println("Got", msgCount, "messages.")
func NewConsumer ¶
func NewConsumer(client *Client, topic string, partition int32, group string, config *ConsumerConfig) (*Consumer, error)
NewConsumer creates a new consumer attached to the given client. It will read messages from the given topic and partition, as part of the named consumer group.
func (*Consumer) Close ¶
Close stops the consumer from fetching messages. It is required to call this function before a consumer object passes out of scope, as it will otherwise leak memory. You must call this before calling Close on the underlying client.
func (*Consumer) Events ¶
func (c *Consumer) Events() <-chan *ConsumerEvent
Events returns the read channel for any events (messages or errors) that might be returned by the broker.
type ConsumerConfig ¶
type ConsumerConfig struct {
	// The default (maximum) amount of data to fetch from the broker in each request. The default is 32768 bytes.
	DefaultFetchSize int32
	// The minimum amount of data to fetch in a request - the broker will wait until at least this many bytes are available.
	// The default is 1, as 0 causes the consumer to spin when no messages are available.
	MinFetchSize int32
	// The maximum permissible message size - messages larger than this will return MessageTooLarge. The default of 0 is
	// treated as no limit.
	MaxMessageSize int32
	// The maximum amount of time the broker will wait for MinFetchSize bytes to become available before it
	// returns fewer than that anyway. The default is 250ms, since 0 causes the consumer to spin when no events are available.
	// 100-500ms is a reasonable range for most cases. Kafka only supports precision up to milliseconds; nanoseconds will be truncated.
	MaxWaitTime time.Duration
	// The method used to determine at which offset to begin consuming messages.
	OffsetMethod OffsetMethod
	// Interpreted differently according to the value of OffsetMethod.
	OffsetValue int64
	// The number of events to buffer in the Events channel. Having this non-zero permits the
	// consumer to continue fetching messages in the background while client code consumes events,
	// greatly improving throughput. The default is 16.
	EventBufferSize int
}
ConsumerConfig is used to pass multiple configuration options to NewConsumer.
func NewConsumerConfig ¶
func NewConsumerConfig() *ConsumerConfig
NewConsumerConfig creates a ConsumerConfig instance with sane defaults.
func (*ConsumerConfig) Validate ¶
func (config *ConsumerConfig) Validate() error
Validate checks a ConsumerConfig instance. It will return a ConfigurationError if the specified value doesn't make sense.
type ConsumerEvent ¶
ConsumerEvent is what is provided to the user when an event occurs. It is either an error (in which case Err is non-nil) or a message (in which case Err is nil and Offset, Key, and Value are set). Topic and Partition are always set.
type ConsumerMetadataRequest ¶
type ConsumerMetadataRequest struct {
ConsumerGroup string
}
type DecodingError ¶
type DecodingError struct {
Info string
}
DecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response. This can be a bad CRC or length field, or any other invalid value.
func (DecodingError) Error ¶
func (err DecodingError) Error() string
type DroppedMessagesError ¶
DroppedMessagesError is returned from a producer when messages weren't able to be successfully delivered to a broker.
func (DroppedMessagesError) Error ¶
func (err DroppedMessagesError) Error() string
type Encoder ¶
Encoder is a simple interface for any type that can be encoded as an array of bytes in order to be sent as the key or value of a Kafka message. Length() is provided as an optimization, and must return the same as len() on the result of Encode().
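Any type satisfying these two methods can be used as a message key or value. A hypothetical encoder for big-endian uint32 keys (a sketch, assuming encoding/binary is imported; the type name is illustrative):

type Uint32Encoder uint32

// Encode returns the value as 4 big-endian bytes.
func (u Uint32Encoder) Encode() ([]byte, error) {
	buf := make([]byte, 4)
	binary.BigEndian.PutUint32(buf, uint32(u))
	return buf, nil
}

// Length must match len() of the Encode() result, per the interface contract.
func (u Uint32Encoder) Length() int {
	return 4
}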
type FetchRequest ¶
type FetchResponse ¶
type FetchResponse struct {
Blocks map[string]map[int32]*FetchResponseBlock
}
func (*FetchResponse) AddMessage ¶
func (fr *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64)
func (*FetchResponse) GetBlock ¶
func (fr *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock
type FetchResponseBlock ¶
type FetchResponseBlock struct {
	Err                 KError
	HighWaterMarkOffset int64
	MsgSet              MessageSet
}
type HashPartitioner ¶
type HashPartitioner struct {
// contains filtered or unexported fields
}
HashPartitioner implements the Partitioner interface. If the key is nil, or fails to encode, then a random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes is used modulus the number of partitions. This ensures that messages with the same key always end up on the same partition.
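To rely on this behaviour, configure the producer with the hash partitioner and give related messages the same key. A sketch (topic and key are placeholders, client is an existing *Client; reading producer.Errors() is omitted here, see the Producer example):

config := NewProducerConfig()
config.Partitioner = NewHashPartitioner // also the default
producer, err := NewProducer(client, config)
if err != nil {
	return err
}
defer producer.Close()

// Both messages share a key, so they hash to the same partition.
producer.Input() <- &MessageToSend{Topic: "my_topic", Key: StringEncoder("user-42"), Value: StringEncoder("first")}
producer.Input() <- &MessageToSend{Topic: "my_topic", Key: StringEncoder("user-42"), Value: StringEncoder("second")}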
type KError ¶
type KError int16
KError is the type of error that can be returned directly by the Kafka broker. See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
const (
	NoError                         KError = 0
	Unknown                         KError = -1
	OffsetOutOfRange                KError = 1
	InvalidMessage                  KError = 2
	UnknownTopicOrPartition         KError = 3
	InvalidMessageSize              KError = 4
	LeaderNotAvailable              KError = 5
	NotLeaderForPartition           KError = 6
	RequestTimedOut                 KError = 7
	BrokerNotAvailable              KError = 8
	ReplicaNotAvailable             KError = 9
	MessageSizeTooLarge             KError = 10
	StaleControllerEpochCode        KError = 11
	OffsetMetadataTooLarge          KError = 12
	OffsetsLoadInProgress           KError = 14
	ConsumerCoordinatorNotAvailable KError = 15
	NotCoordinatorForConsumer       KError = 16
)
Numeric error codes returned by the Kafka server.
type Message ¶
type Message struct {
	Codec CompressionCodec // codec used to compress the message contents
	Key   []byte           // the message key, may be nil
	Value []byte           // the message contents
	Set   *MessageSet      // the message set a message might wrap
	// contains filtered or unexported fields
}
type MessageBlock ¶
func (*MessageBlock) Messages ¶
func (msb *MessageBlock) Messages() []*MessageBlock
Messages is a convenience helper which returns either all the messages wrapped in this block (when the block contains a compressed message set) or the block itself.
type MessageSet ¶
type MessageSet struct {
	PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock
	Messages               []*MessageBlock
}
type MessageToSend ¶
type MessageToSend struct {
	Topic      string
	Key, Value Encoder
	// contains filtered or unexported fields
}
MessageToSend is the collection of elements passed to the Producer in order to send a message.
func (*MessageToSend) Offset ¶
func (m *MessageToSend) Offset() int64
Offset is the offset of the message stored on the broker. This is only guaranteed to be defined if the message was successfully delivered and RequiredAcks is not NoResponse.
func (*MessageToSend) Partition ¶
func (m *MessageToSend) Partition() int32
Partition is the partition that the message was sent to. This is only guaranteed to be defined if the message was successfully delivered.
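Both accessors are typically read off the Successes channel once AckSuccesses is enabled. A sketch (client is an existing *Client; topic and value are placeholders):

config := NewProducerConfig()
config.AckSuccesses = true
producer, err := NewProducer(client, config)
if err != nil {
	return err
}
defer producer.Close()

producer.Input() <- &MessageToSend{Topic: "my_topic", Value: StringEncoder("hello")}
select {
case msg := <-producer.Successes():
	fmt.Printf("delivered to partition %d at offset %d\n", msg.Partition(), msg.Offset())
case err := <-producer.Errors():
	return err.Err
}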
type MetadataRequest ¶
type MetadataRequest struct {
Topics []string
}
type MetadataResponse ¶
type MetadataResponse struct {
	Brokers []*Broker
	Topics  []*TopicMetadata
}
func (*MetadataResponse) AddBroker ¶
func (m *MetadataResponse) AddBroker(addr string, id int32)
func (*MetadataResponse) AddTopicPartition ¶
func (m *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32)
type MockBroker ¶
type MockBroker struct {
// contains filtered or unexported fields
}
MockBroker is a mock Kafka broker. It consists of a TCP server on a kernel-selected localhost port that accepts a single connection. It reads Kafka requests from that connection and returns each queued response in order (if a response has a length of 0, nothing is sent; if a response is nil, the server sleeps for 250ms instead of reading a request).
When running tests with one of these, it is strongly recommended to specify a timeout to `go test` so that if the broker hangs waiting for a response, the test panics.
It is not necessary to prefix message length or correlation ID to your response bytes, the server does that automatically as a convenience.
func NewMockBroker ¶
func NewMockBroker(t TestState, brokerID int32) *MockBroker
NewMockBroker launches a fake Kafka broker. It takes a TestState (e.g. *testing.T) as provided by the test framework, and the broker ID that the mock should report. If an error occurs it is simply logged to the TestState and the broker exits.
func (*MockBroker) Addr ¶
func (b *MockBroker) Addr() string
func (*MockBroker) BrokerID ¶
func (b *MockBroker) BrokerID() int32
func (*MockBroker) Close ¶
func (b *MockBroker) Close()
func (*MockBroker) Port ¶
func (b *MockBroker) Port() int32
func (*MockBroker) Returns ¶
func (b *MockBroker) Returns(e encoder)
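A sketch of a typical test: stand up a MockBroker, queue a metadata response that describes the mock itself, then point a real Client at it. This assumes MetadataResponse satisfies the unexported encoder interface that Returns expects, as the constructor helpers above suggest; the test and topic names are illustrative:

func TestClientMetadata(t *testing.T) {
	mb := NewMockBroker(t, 1)
	defer mb.Close()

	// Describe a cluster consisting of just the mock broker itself.
	metadata := new(MetadataResponse)
	metadata.AddBroker(mb.Addr(), mb.BrokerID())
	metadata.AddTopicPartition("my_topic", 0, mb.BrokerID())
	mb.Returns(metadata)

	client, err := NewClient("test_client", []string{mb.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer client.Close()
}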
type OffsetCommitRequest ¶
type OffsetCommitRequest struct {
	ConsumerGroup string
	// contains filtered or unexported fields
}
type OffsetCommitResponse ¶
type OffsetFetchRequest ¶
type OffsetFetchRequest struct {
	ConsumerGroup string
	// contains filtered or unexported fields
}
func (*OffsetFetchRequest) AddPartition ¶
func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32)
type OffsetFetchResponse ¶
type OffsetFetchResponse struct {
Blocks map[string]map[int32]*OffsetFetchResponseBlock
}
type OffsetMethod ¶
type OffsetMethod int
OffsetMethod is passed in ConsumerConfig to tell the consumer how to determine the starting offset.
const (
	// OffsetMethodManual causes the consumer to interpret the OffsetValue in the ConsumerConfig as the
	// offset at which to start, allowing the user to manually specify their desired starting offset.
	OffsetMethodManual OffsetMethod = iota
	// OffsetMethodNewest causes the consumer to start at the most recent available offset, as
	// determined by querying the broker.
	OffsetMethodNewest
	// OffsetMethodOldest causes the consumer to start at the oldest available offset, as
	// determined by querying the broker.
	OffsetMethodOldest
)
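For example, to start consuming from the oldest message still retained on the broker (a sketch; client is an existing *Client and all other configuration is left at its defaults):

config := NewConsumerConfig()
config.OffsetMethod = OffsetMethodOldest
consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", config)
if err != nil {
	return err
}
defer consumer.Close()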
type OffsetRequest ¶
type OffsetRequest struct {
// contains filtered or unexported fields
}
func (*OffsetRequest) AddBlock ¶
func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time OffsetTime, maxOffsets int32)
type OffsetResponse ¶
type OffsetResponse struct {
Blocks map[string]map[int32]*OffsetResponseBlock
}
func (*OffsetResponse) AddTopicPartition ¶
func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64)
func (*OffsetResponse) GetBlock ¶
func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponseBlock
type OffsetResponseBlock ¶
type OffsetTime ¶
type OffsetTime int64
OffsetTime is used in Offset Requests to ask for all messages before a certain time. Any positive int64 value will be interpreted as milliseconds, or use the special constants defined here.
const (
	// LatestOffsets asks for the latest offsets.
	LatestOffsets OffsetTime = -1
	// EarliestOffset asks for the earliest available offset. Note that because offsets are pulled in descending order,
	// asking for the earliest offset will always return you a single element.
	EarliestOffset OffsetTime = -2
)
type PartitionMetadata ¶
type Partitioner ¶
Partitioner is anything that, given a Kafka message key and a number of partitions indexed [0...numPartitions-1], decides to which partition to send the message. RandomPartitioner, RoundRobinPartitioner and HashPartitioner are provided as simple default implementations.
func NewHashPartitioner ¶
func NewHashPartitioner() Partitioner
func NewRandomPartitioner ¶
func NewRandomPartitioner() Partitioner
func NewRoundRobinPartitioner ¶
func NewRoundRobinPartitioner() Partitioner
type PartitionerConstructor ¶
type PartitionerConstructor func() Partitioner
PartitionerConstructor is the type for a function capable of constructing new Partitioners.
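Any func returning a Partitioner will do; for instance, pinning every message to partition 0 with a ConstantPartitioner (a sketch):

config := NewProducerConfig()
config.Partitioner = func() Partitioner {
	return &ConstantPartitioner{Constant: 0}
}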
type ProduceError ¶
type ProduceError struct {
	Msg *MessageToSend
	Err error
}
ProduceError is the type of error generated when the producer fails to deliver a message. It contains the original MessageToSend as well as the actual error value. If the AckSuccesses configuration value is set to true then every message sent generates a ProduceError, but successes will have a nil Err field.
type ProduceErrors ¶
type ProduceErrors []*ProduceError
ProduceErrors is a type that wraps a batch of "ProduceError"s and implements the error interface. It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel when closing a producer.
func (ProduceErrors) Error ¶
func (pe ProduceErrors) Error() string
type ProduceRequest ¶
type ProduceRequest struct {
	RequiredAcks RequiredAcks
	Timeout      int32
	// contains filtered or unexported fields
}
func (*ProduceRequest) AddMessage ¶
func (p *ProduceRequest) AddMessage(topic string, partition int32, msg *Message)
func (*ProduceRequest) AddSet ¶
func (p *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet)
type ProduceResponse ¶
type ProduceResponse struct {
Blocks map[string]map[int32]*ProduceResponseBlock
}
func (*ProduceResponse) AddTopicPartition ¶
func (pr *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError)
func (*ProduceResponse) GetBlock ¶
func (pr *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock
type ProduceResponseBlock ¶
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer publishes Kafka messages. It routes messages to the correct broker for the provided topic-partition, refreshing metadata as appropriate, and parses responses for errors. You must read from the Errors() channel or the producer will deadlock. You must call Close() on a producer to avoid leaks: it will not be garbage-collected automatically when it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
Example ¶
client, err := NewClient("client_id", []string{"localhost:9092"}, NewClientConfig())
if err != nil {
	panic(err)
} else {
	fmt.Println("> connected")
}
defer client.Close()

producer, err := NewProducer(client, nil)
if err != nil {
	panic(err)
}
defer producer.Close()

for {
	select {
	case producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
		fmt.Println("> message queued")
	case err := <-producer.Errors():
		panic(err.Err)
	}
}
func NewProducer ¶
func NewProducer(client *Client, config *ProducerConfig) (*Producer, error)
NewProducer creates a new Producer using the given client.
func (*Producer) Close ¶
Close shuts down the producer and flushes any messages it may have buffered. You must call this function before a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close on the underlying client.
func (*Producer) Errors ¶
func (p *Producer) Errors() <-chan *ProduceError
Errors is the error output channel back to the user. You MUST read from this channel or the Producer will deadlock. It is suggested that you send messages and read errors together in a single select statement.
func (*Producer) Input ¶
func (p *Producer) Input() chan<- *MessageToSend
Input is the channel on which the user writes messages that they wish to send.
func (*Producer) Successes ¶
func (p *Producer) Successes() <-chan *MessageToSend
Successes is the success output channel back to the user when AckSuccesses is configured. If AckSuccesses is true, you MUST read from this channel or the Producer will deadlock. It is suggested that you send and read messages together in a single select statement.
type ProducerConfig ¶
type ProducerConfig struct {
	Partitioner       PartitionerConstructor // Generates partitioners for choosing the partition to send messages to (defaults to hash).
	RequiredAcks      RequiredAcks           // The level of acknowledgement reliability needed from the broker (defaults to WaitForLocal).
	Timeout           time.Duration          // The maximum duration the broker will wait for the receipt of the number of RequiredAcks. This is only relevant when RequiredAcks is set to WaitForAll or a number > 1. Only supports millisecond resolution; nanoseconds will be truncated.
	Compression       CompressionCodec       // The type of compression to use on messages (defaults to no compression).
	FlushMsgCount     int                    // The number of messages needed to trigger a flush.
	FlushFrequency    time.Duration          // If this amount of time elapses without a flush, one will be queued.
	FlushByteCount    int                    // If this many bytes of messages are accumulated, a flush will be triggered.
	AckSuccesses      bool                   // If enabled, successfully delivered messages will be returned on the Successes channel.
	MaxMessageBytes   int                    // The maximum permitted size of a message (defaults to 1000000).
	ChannelBufferSize int                    // The size of the buffers of the channels between the different goroutines. Defaults to 0 (unbuffered).
}
ProducerConfig is used to pass multiple configuration options to NewProducer.
func NewProducerConfig ¶
func NewProducerConfig() *ProducerConfig
NewProducerConfig creates a new ProducerConfig instance with sensible defaults.
func (*ProducerConfig) Validate ¶
func (config *ProducerConfig) Validate() error
Validate checks a ProducerConfig instance. It will return a ConfigurationError if the specified value doesn't make sense.
type RandomPartitioner ¶
type RandomPartitioner struct {
// contains filtered or unexported fields
}
RandomPartitioner implements the Partitioner interface by choosing a random partition each time.
type RequiredAcks ¶
type RequiredAcks int16
RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding. Any positive int16 value is valid, or the constants defined here.
const (
	// NoResponse doesn't send any response, the TCP ACK is all you get.
	NoResponse RequiredAcks = 0
	// WaitForLocal waits for only the local commit to succeed before responding.
	WaitForLocal RequiredAcks = 1
	// WaitForAll waits for all replicas to commit before responding.
	WaitForAll RequiredAcks = -1
)
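For the strongest delivery guarantee, combine WaitForAll with a broker-side timeout (a sketch; the ten-second value is illustrative):

config := NewProducerConfig()
config.RequiredAcks = WaitForAll
config.Timeout = 10 * time.Second // truncated to millisecond precision on the wire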
type RoundRobinPartitioner ¶
type RoundRobinPartitioner struct {
// contains filtered or unexported fields
}
RoundRobinPartitioner implements the Partitioner interface by walking through the available partitions one at a time.
type SimpleProducer ¶
type SimpleProducer struct {
// contains filtered or unexported fields
}
SimpleProducer publishes Kafka messages. It routes messages to the correct broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer to avoid leaks; it may not be garbage-collected automatically when it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
Example ¶
client, err := NewClient("client_id", []string{"localhost:9092"}, NewClientConfig())
if err != nil {
	panic(err)
} else {
	fmt.Println("> connected")
}
defer client.Close()

producer, err := NewSimpleProducer(client, "my_topic", nil)
if err != nil {
	panic(err)
}
defer producer.Close()

for {
	err = producer.SendMessage(nil, StringEncoder("testing 123"))
	if err != nil {
		panic(err)
	} else {
		fmt.Println("> message sent")
	}
}
func NewSimpleProducer ¶
func NewSimpleProducer(client *Client, topic string, partitioner PartitionerConstructor) (*SimpleProducer, error)
NewSimpleProducer creates a new SimpleProducer using the given client, topic and partitioner. If the partitioner is nil, messages are partitioned by the hash of the key (or randomly if there is no key).
func (*SimpleProducer) Close ¶
func (sp *SimpleProducer) Close() error
Close shuts down the producer and flushes any messages it may have buffered. You must call this function before a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close on the underlying client.
func (*SimpleProducer) SendMessage ¶
func (sp *SimpleProducer) SendMessage(key, value Encoder) error
SendMessage produces a message with the given key and value. To send strings as either key or value, see the StringEncoder type.
type StdLogger ¶
type StdLogger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}
StdLogger is used to log error messages.
type StringEncoder ¶
type StringEncoder string
StringEncoder implements the Encoder interface for Go strings so that you can do things like
producer.SendMessage(nil, sarama.StringEncoder("hello world"))
func (StringEncoder) Encode ¶
func (s StringEncoder) Encode() ([]byte, error)
func (StringEncoder) Length ¶
func (s StringEncoder) Length() int
type TestState ¶
type TestState interface {
	Error(args ...interface{})
	Fatal(args ...interface{})
	Fatalf(format string, args ...interface{})
}
TestState is a generic interface for a test state, implemented e.g. by testing.T
type TopicMetadata ¶
type TopicMetadata struct {
	Err        KError
	Name       string
	Partitions []*PartitionMetadata
}
Source Files ¶
- broker.go
- client.go
- consumer.go
- consumer_metadata_request.go
- consumer_metadata_response.go
- crc32_field.go
- encoder_decoder.go
- errors.go
- fetch_request.go
- fetch_response.go
- length_field.go
- message.go
- message_set.go
- metadata_request.go
- metadata_response.go
- mockbroker.go
- offset_commit_request.go
- offset_commit_response.go
- offset_fetch_request.go
- offset_fetch_response.go
- offset_request.go
- offset_response.go
- packet_decoder.go
- packet_encoder.go
- partitioner.go
- prep_encoder.go
- produce_request.go
- produce_response.go
- producer.go
- real_decoder.go
- real_encoder.go
- request.go
- response_header.go
- sarama.go
- simple_producer.go
- snappy.go
- utils.go