Documentation
¶
Overview ¶
Amazon Kinesis producer: a KPL-like batch producer for Amazon Kinesis, built on top of the official Go AWS SDK and using the same aggregation format that the KPL uses.
Note: this project started as a fork of `tj/go-kinesis`. If you are not interested in the KPL aggregation logic, you probably want to check it out.
Index ¶
- type AggregatedRecordRequest
- type Aggregator
- type Config
- type DataRecord
- type DrainError
- type ErrIllegalPartitionKey
- type ErrRecordSizeExceeded
- type ErrStoppedProducer
- type FailureRecord
- type GetShardsFunc
- type LogValue
- type Logger
- type NopLogger
- type Producer
- type Putter
- type ShardBucketError
- type ShardLister
- type ShardMap
- func (m *ShardMap) Drain() ([]*AggregatedRecordRequest, []error)
- func (m *ShardMap) Put(userRecord UserRecord) (*AggregatedRecordRequest, error)
- func (m *ShardMap) Shards() []*k.Shard
- func (m *ShardMap) Size() int
- func (m *ShardMap) UpdateShards(shards []*k.Shard, pendingRecords []*AggregatedRecordRequest) ([]*AggregatedRecordRequest, error)
- type ShardRefreshError
- type ShardSlice
- type StdLogger
- type UserRecord
- type Work
- type WorkerPool
- func (wp *WorkerPool) Add(record *AggregatedRecordRequest)
- func (wp *WorkerPool) Close()
- func (wp *WorkerPool) Errors() chan error
- func (wp *WorkerPool) Flush()
- func (wp *WorkerPool) Pause() []*AggregatedRecordRequest
- func (wp *WorkerPool) Resume(records []*AggregatedRecordRequest)
- func (wp *WorkerPool) Start()
- func (wp *WorkerPool) Wait()
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AggregatedRecordRequest ¶
type AggregatedRecordRequest struct { Entry *k.PutRecordsRequestEntry UserRecords []UserRecord }
Contains the AWS Kinesis PutRecordsRequestEntry and UserRecords that are aggregated into the request. UserRecords are provided for more control over failure notifications
func NewAggregatedRecordRequest ¶
func NewAggregatedRecordRequest(data []byte, partitionKey, explicitHashKey *string, userRecords []UserRecord) *AggregatedRecordRequest
type Aggregator ¶
type Aggregator struct { // Aggregator holds onto its own RWMutex, but the caller of Aggregator methods is expected // to call Lock/Unlock sync.RWMutex // contains filtered or unexported fields }
func NewAggregator ¶
func NewAggregator(explicitHashKey *string) *Aggregator
NewAggregator initializes a new Aggregator with the given explicitHashKey.
func (*Aggregator) Count ¶
func (a *Aggregator) Count() int
Count returns how many records are stored in the aggregator.
func (*Aggregator) Drain ¶
func (a *Aggregator) Drain() (*AggregatedRecordRequest, error)
Drain creates an aggregated `kinesis.PutRecordsRequestEntry` that is compatible with the KCL's deaggregation logic.
If you are interested in knowing more about it, see: aggregation-format.md
func (*Aggregator) Put ¶
func (a *Aggregator) Put(userRecord UserRecord)
Put record using `data` and `partitionKey`. This method is thread-safe.
func (*Aggregator) Size ¶
func (a *Aggregator) Size() int
Size returns how many bytes are stored in the aggregator, including partition keys.
func (*Aggregator) WillOverflow ¶
func (a *Aggregator) WillOverflow(userRecord UserRecord) bool
WillOverflow checks if the aggregator will exceed max record size by attempting to Put the user record. If true, the aggregator should be drained before attempting a Put.
type Config ¶
type Config struct { // StreamName is the Kinesis stream. StreamName string // FlushInterval is a regular interval for flushing the buffer. Defaults to 5s. FlushInterval time.Duration // ShardRefreshInterval is a regular interval for refreshing the ShardMap. // Config.GetShards will be called at this interval. A value of 0 means no refresh // occurs. Default is 0 ShardRefreshInterval time.Duration // GetShards is called on NewProducer to initialize the ShardMap. // If ShardRefreshInterval is non-zero, GetShards will be called at that interval. // The default function returns a nil list of shards, which results in all records being // aggregated to a single record. GetShards GetShardsFunc // BatchCount determines the maximum number of items to pack in a batch. // Must not exceed length. Defaults to 500. BatchCount int // BatchSize determines the maximum number of bytes to send with a PutRecords request. // Must not exceed 5MiB; Defaults to 5MiB. BatchSize int // AggregateBatchCount determines the maximum number of items to pack into an aggregated record. AggregateBatchCount int // AggregateBatchSize determines the maximum number of bytes to pack into an aggregated record. User records larger // than this will bypass aggregation. AggregateBatchSize int // BacklogCount determines the channel capacity before Put() will begin blocking. Defaults to `BatchCount`. BacklogCount int // Number of requests to send concurrently. Defaults to 24. // If you are using the ListShards API in your GetShards function, those connections // will not be counted in MaxConnections. MaxConnections int // Logger is the logger used. Defaults to producer.Logger. Logger Logger // Verbose enables verbose logging. Defaults to false. Verbose bool // Client is the Putter interface implementation. Client Putter }
Config is the Producer configuration.
type DataRecord ¶
type DataRecord struct {
// contains filtered or unexported fields
}
func NewDataRecord ¶
func NewDataRecord(data []byte, partitionKey string) *DataRecord
func (*DataRecord) Data ¶
func (r *DataRecord) Data() []byte
func (*DataRecord) ExplicitHashKey ¶
func (r *DataRecord) ExplicitHashKey() *big.Int
func (*DataRecord) PartitionKey ¶
func (r *DataRecord) PartitionKey() string
func (*DataRecord) Size ¶
func (r *DataRecord) Size() int
type DrainError ¶
type DrainError struct { Err error // UserRecords in the buffer when drain attempt was made UserRecords []UserRecord }
func (*DrainError) Error ¶
func (e *DrainError) Error() string
type ErrIllegalPartitionKey ¶
type ErrIllegalPartitionKey struct {
UserRecord
}
func (*ErrIllegalPartitionKey) Error ¶
func (e *ErrIllegalPartitionKey) Error() string
type ErrRecordSizeExceeded ¶
type ErrRecordSizeExceeded struct {
UserRecord
}
func (*ErrRecordSizeExceeded) Error ¶
func (e *ErrRecordSizeExceeded) Error() string
type ErrStoppedProducer ¶
type ErrStoppedProducer struct {
UserRecord
}
func (*ErrStoppedProducer) Error ¶
func (e *ErrStoppedProducer) Error() string
type FailureRecord ¶
type FailureRecord struct { Err error // The PartitionKey that was used in the kinesis.PutRecordsRequestEntry PartitionKey string // The ExplicitHashKey that was used in the kinesis.PutRecordsRequestEntry. Will be the // empty string if nil ExplicitHashKey string // UserRecords that were contained in the failed aggregated record request UserRecords []UserRecord }
Failure record type for failures from Kinesis PutRecords request
func (*FailureRecord) Error ¶
func (e *FailureRecord) Error() string
type GetShardsFunc ¶
GetShardsFunc is called to populate the shard map on initialization and at each shard refresh interval. GetShardsFunc will be called with the current shard list. During initialization, this will be nil. GetShardsFunc should return a shard list, a bool indicating if the shards should be updated, and an error. If the bool is false or an error is returned, shards will not be updated.
func GetKinesisShardsFunc ¶
func GetKinesisShardsFunc(client ShardLister, streamName string) GetShardsFunc
GetKinesisShardsFunc gets the active list of shards from Kinesis.ListShards API
func StaticGetShardsFunc ¶
func StaticGetShardsFunc(count int) GetShardsFunc
StaticGetShardsFunc returns a GetShardsFunc that, when called, will generate a static list of shards with length count whose HashKeyRanges are evenly distributed.
type LogValue ¶
type LogValue struct { Name string Value interface{} }
LogValue represents a key:value pair used by the Logger interface
type Logger ¶
type Logger interface { Info(msg string, values ...LogValue) Error(msg string, err error, values ...LogValue) }
Logger represents a simple interface used by kinesis-producer to handle logging
type Producer ¶
func (*Producer) NotifyFailures ¶
NotifyFailures registers and returns a listener to handle undeliverable messages. The incoming struct has a copy of the Data and the PartitionKey along with some error information about why the publishing failed.
func (*Producer) Put ¶
Put `data` using `partitionKey` asynchronously. This method is thread-safe.
Under the covers, the Producer will automatically re-attempt puts in case of transient errors. When an unrecoverable error is detected (e.g. trying to put to a stream that doesn't exist), the message will be returned by the Producer. Add a listener with `Producer.NotifyFailures` to handle undeliverable messages.
func (*Producer) PutUserRecord ¶
func (p *Producer) PutUserRecord(userRecord UserRecord) error
type Putter ¶
type Putter interface {
PutRecords(*k.PutRecordsInput) (*k.PutRecordsOutput, error)
}
Putter is the interface that wraps the KinesisAPI.PutRecords method.
type ShardBucketError ¶
type ShardBucketError struct {
UserRecord
}
func (*ShardBucketError) Error ¶
func (s *ShardBucketError) Error() string
type ShardLister ¶
type ShardLister interface {
ListShards(input *k.ListShardsInput) (*k.ListShardsOutput, error)
}
ShardLister is the interface that wraps the KinesisAPI.ListShards method.
type ShardMap ¶
Example ¶
logger := &StdLogger{log.New(os.Stdout, "", log.LstdFlags)} client := kinesis.New(session.New(aws.NewConfig())) pr := New(&Config{ StreamName: "test", BacklogCount: 2000, Client: client, GetShards: GetKinesisShardsFunc(client, "test"), ShardRefreshInterval: 5 * time.Second, Logger: logger, }) pr.Start() failures := pr.NotifyFailures() // Handle failures go func() { for r := range failures { logger.Error("detected put failure", r) } }() go func() { for i := 0; i < 1000; i++ { pk := uuid.New().String() for j := 0; j < 5; j++ { err := pr.Put([]byte("foo"), pk) if err != nil { logger.Error("error producing", err) } } } }() time.Sleep(3 * time.Second) pr.Stop()
Output:
func NewShardMap ¶
NewShardMap initializes an aggregator for each shard. UserRecords that map to the same shard based on MD5 hash of their partition key (Same method used by Kinesis) will be aggregated together. Aggregators will use an ExplicitHashKey from their assigned shards when creating kinesis.PutRecordsRequestEntry. A ShardMap with an empty shards slice will return to unsharded behavior with a single aggregator. The aggregator will instead use the PartitionKey of the first UserRecord and no ExplicitHashKey.
func (*ShardMap) Drain ¶
func (m *ShardMap) Drain() ([]*AggregatedRecordRequest, []error)
Drain drains all the aggregators and returns a list of the results
func (*ShardMap) Put ¶
func (m *ShardMap) Put(userRecord UserRecord) (*AggregatedRecordRequest, error)
Put puts a UserRecord into the aggregator that maps to its partition key.
func (*ShardMap) Size ¶
Size returns how many bytes are stored in all the aggregators, including partition keys.
func (*ShardMap) UpdateShards ¶
func (m *ShardMap) UpdateShards(shards []*k.Shard, pendingRecords []*AggregatedRecordRequest) ([]*AggregatedRecordRequest, error)
Update the list of shards and redistribute buffered user records. Returns any records that were drained due to redistribution. Shards are not updated if an error occurs during redistribution. TODO: Can we optimize this? TODO: How to handle shard splitting? If a shard splits but we don't remap before sending
records to the new shards, once we do update our mapping, user records may end up in a new shard and we would lose the shard ordering. Consumer can probably figure it out since we retain original partition keys (but not explicit hash keys). Shard merging should not be an issue since records from both shards should fall into the merged hash key range.
type ShardRefreshError ¶
type ShardRefreshError struct {
Err error
}
func (*ShardRefreshError) Error ¶
func (s *ShardRefreshError) Error() string
type ShardSlice ¶
func (ShardSlice) Len ¶
func (p ShardSlice) Len() int
func (ShardSlice) Less ¶
func (p ShardSlice) Less(i, j int) bool
func (ShardSlice) Swap ¶
func (p ShardSlice) Swap(i, j int)
type StdLogger ¶
StdLogger implements the Logger interface using standard library loggers
type UserRecord ¶
type UserRecord interface { // PartitionKey returns the partition key of the record PartitionKey() string // ExplicitHashKey returns an optional explicit hash key that will be used for shard // mapping. Should return nil if there is none. ExplicitHashKey() *big.Int // The raw data payload of the record that should be added to the record Data() []byte // Size is the size of the record's data. Do not include the size of the partition key // in this result. The partition key's size is calculated separately by the aggregator. Size() int }
UserRecord represents an individual record that is meant for aggregation
Example ¶
logger := &StdLogger{log.New(os.Stdout, "", log.LstdFlags)} client := kinesis.New(session.New(aws.NewConfig())) pr := New(&Config{ StreamName: "test", BacklogCount: 2000, Client: client, GetShards: GetKinesisShardsFunc(client, "test"), ShardRefreshInterval: 5 * time.Second, Logger: logger, }) pr.Start() failures := pr.NotifyFailures() // Handle failures go func() { for r := range failures { logger.Error("detected put failure", r) } }() go func() { for i := 0; i < 5000; i++ { record, err := newMyExampleUserRecord("foo", "bar") if err != nil { logger.Error("error creating user record", err) } err = pr.PutUserRecord(record) if err != nil { logger.Error("error producing", err) } } }() time.Sleep(3 * time.Second) pr.Stop()
Output:
type WorkerPool ¶
type WorkerPool struct { *Config // contains filtered or unexported fields }
func NewWorkerPool ¶
func NewWorkerPool(config *Config) *WorkerPool
func (*WorkerPool) Add ¶
func (wp *WorkerPool) Add(record *AggregatedRecordRequest)
func (*WorkerPool) Close ¶
func (wp *WorkerPool) Close()
func (*WorkerPool) Errors ¶
func (wp *WorkerPool) Errors() chan error
func (*WorkerPool) Flush ¶
func (wp *WorkerPool) Flush()
func (*WorkerPool) Pause ¶
func (wp *WorkerPool) Pause() []*AggregatedRecordRequest
func (*WorkerPool) Resume ¶
func (wp *WorkerPool) Resume(records []*AggregatedRecordRequest)
func (*WorkerPool) Start ¶
func (wp *WorkerPool) Start()
func (*WorkerPool) Wait ¶
func (wp *WorkerPool) Wait()
Source Files
¶
Directories
¶
Path | Synopsis |
---|---|
deaggregation package from https://github.com/kimutansk/go-kinesis-deaggregation/blob/9d28647d1ff4d296bdd7c12c0cad272c9303d2fc/deaggregator.go
|
deaggregation package from https://github.com/kimutansk/go-kinesis-deaggregation/blob/9d28647d1ff4d296bdd7c12c0cad272c9303d2fc/deaggregator.go |
loggers
|
|
Package pb is a generated protocol buffer package.
|
Package pb is a generated protocol buffer package. |