Documentation
¶
Index ¶
- Constants
- Variables
- func SetContextEventMessageKey(ctx context.Context, value interface{}, key ...EventMessageKey) context.Context
- type Adapter
- func (a *Adapter) Attach()
- func (a *Adapter) Detach()
- func (a *Adapter) EventContext(ctx context.Context) (context.Context, error)
- func (a *Adapter) GetConsumer(key string) *Consumer
- func (a *Adapter) GetProducer(key string) *Producer
- func (a *Adapter) Initialize()
- func (a *Adapter) Register() (string, interface{})
- type Configuration
- func (*Configuration) Descriptor() ([]byte, []int)deprecated
- func (x *Configuration) GetApiVersionFallbackMs() int64
- func (x *Configuration) GetApiVersionRequest() bool
- func (x *Configuration) GetBootstrapServers() []string
- func (x *Configuration) GetBrokerVersionFallback() string
- func (x *Configuration) GetConsumers() map[string]*Configuration_Consumer
- func (x *Configuration) GetDebug() string
- func (x *Configuration) GetEnableSslCertificateVerification() bool
- func (x *Configuration) GetMaxInFlight() int64
- func (x *Configuration) GetMaxMessageBytes() int64
- func (x *Configuration) GetMaxMessageCopyBytes() int64
- func (x *Configuration) GetMessageKeyFrom() IdentityFieldToMessageKey
- func (x *Configuration) GetProducers() map[string]*Configuration_Producer
- func (x *Configuration) GetReceiveMessageMaxBytes() int64
- func (x *Configuration) GetReconnectBackoffMaxMs() int64
- func (x *Configuration) GetReconnectBackoffMs() int64
- func (x *Configuration) GetSecurityProtocol() string
- func (x *Configuration) GetSslCaLocation() string
- func (x *Configuration) GetSslCertificateLocation() string
- func (x *Configuration) GetSslCrlLocation() string
- func (x *Configuration) GetSslKeyLocation() string
- func (x *Configuration) GetStatisticsIntervalMs() int64
- func (*Configuration) ProtoMessage()
- func (x *Configuration) ProtoReflect() protoreflect.Message
- func (x *Configuration) Reset()
- func (x *Configuration) String() string
- type Configuration_Consumer
- func (*Configuration_Consumer) Descriptor() ([]byte, []int)deprecated
- func (x *Configuration_Consumer) GetFetchErrorBackoffMs() int64
- func (x *Configuration_Consumer) GetFetchMessageMaxBytes() int64
- func (x *Configuration_Consumer) GetMaxPollIntervalMs() int64
- func (x *Configuration_Consumer) GetQueuedMaxMessagesKbytes() int64
- func (x *Configuration_Consumer) GetQueuedMinMessages() int64
- func (x *Configuration_Consumer) GetReadTimeoutMs() int64
- func (x *Configuration_Consumer) GetSessionTimeoutMs() int64
- func (x *Configuration_Consumer) GetTopics() []string
- func (*Configuration_Consumer) ProtoMessage()
- func (x *Configuration_Consumer) ProtoReflect() protoreflect.Message
- func (x *Configuration_Consumer) Reset()
- func (x *Configuration_Consumer) String() string
- type Configuration_Producer
- func (*Configuration_Producer) Descriptor() ([]byte, []int)deprecated
- func (x *Configuration_Producer) GetBatchNumMessages() int64
- func (x *Configuration_Producer) GetIsIdempotent() bool
- func (x *Configuration_Producer) GetIsTransactional() bool
- func (x *Configuration_Producer) GetMessageSendMaxRetries() int64
- func (x *Configuration_Producer) GetMessageTimeoutMs() int64
- func (x *Configuration_Producer) GetQueueBufferingMaxKbytes() int64
- func (x *Configuration_Producer) GetQueueBufferingMaxMessages() int64
- func (x *Configuration_Producer) GetQueueBufferingMaxMs() int64
- func (x *Configuration_Producer) GetRetryBackoffMs() int64
- func (x *Configuration_Producer) GetTransactionTimeoutMs() int64
- func (x *Configuration_Producer) GetTransactionalId() string
- func (*Configuration_Producer) ProtoMessage()
- func (x *Configuration_Producer) ProtoReflect() protoreflect.Message
- func (x *Configuration_Producer) Reset()
- func (x *Configuration_Producer) String() string
- type Consumer
- type Event
- func (*Event) Descriptor() ([]byte, []int)deprecated
- func (x *Event) GetClaim() *auth.ClaimEnvelope
- func (x *Event) GetData() []byte
- func (x *Event) GetMessageKey() string
- func (x *Event) GetName() string
- func (x *Event) GetTopic() string
- func (x *Event) Log(ev *zerolog.Event) *zerolog.Event
- func (x *Event) MarshalProto(msg protoreflect.ProtoMessage)
- func (x *Event) MessageKeyFromContext(ctx context.Context, key ...EventMessageKey) *Event
- func (*Event) ProtoMessage()
- func (x *Event) ProtoReflect() protoreflect.Message
- func (x *Event) Reset()
- func (x *Event) String() string
- func (x *Event) UnmarshalProto(msg protoreflect.ProtoMessage)
- type EventMessageKey
- type IdentityFieldToMessageKey
- func (IdentityFieldToMessageKey) Descriptor() protoreflect.EnumDescriptor
- func (x IdentityFieldToMessageKey) Enum() *IdentityFieldToMessageKey
- func (IdentityFieldToMessageKey) EnumDescriptor() ([]byte, []int)deprecated
- func (x IdentityFieldToMessageKey) Number() protoreflect.EnumNumber
- func (x IdentityFieldToMessageKey) String() string
- func (IdentityFieldToMessageKey) Type() protoreflect.EnumType
- type Producer
- type WrappedEvent
Constants ¶
View Source
const Key = "kafkaDriver"
Variables ¶
View Source
var ( IdentityFieldToMessageKey_name = map[int32]string{ 0: "TENANT_ID", 1: "RANDOM", } IdentityFieldToMessageKey_value = map[string]int32{ "TENANT_ID": 0, "RANDOM": 1, } )
Enum value maps for IdentityFieldToMessageKey.
View Source
var (
DefaultMessageKey = StandardMessageKey
)
View Source
var File_adapters_kafka_driver_kafka_driver_proto protoreflect.FileDescriptor
Functions ¶
func SetContextEventMessageKey ¶
func SetContextEventMessageKey(ctx context.Context, value interface{}, key ...EventMessageKey) context.Context
Types ¶
type Adapter ¶
type Adapter struct {
// contains filtered or unexported fields
}
func (*Adapter) EventContext ¶
func (*Adapter) GetConsumer ¶
func (*Adapter) GetProducer ¶
func (*Adapter) Initialize ¶
func (a *Adapter) Initialize()
type Configuration ¶
type Configuration struct { BootstrapServers []string `protobuf:"bytes,1,rep,name=bootstrap_servers,json=bootstrapServers,proto3" json:"bootstrap_servers,omitempty"` MaxMessageBytes int64 `protobuf:"varint,2,opt,name=max_message_bytes,json=maxMessageBytes,proto3" json:"max_message_bytes,omitempty"` MaxMessageCopyBytes int64 `protobuf:"varint,3,opt,name=max_message_copy_bytes,json=maxMessageCopyBytes,proto3" json:"max_message_copy_bytes,omitempty"` ReceiveMessageMaxBytes int64 `` /* 132-byte string literal not displayed */ MaxInFlight int64 `protobuf:"varint,5,opt,name=max_in_flight,json=maxInFlight,proto3" json:"max_in_flight,omitempty"` Debug string `protobuf:"bytes,6,opt,name=debug,proto3" json:"debug,omitempty"` ReconnectBackoffMs int64 `protobuf:"varint,7,opt,name=reconnect_backoff_ms,json=reconnectBackoffMs,proto3" json:"reconnect_backoff_ms,omitempty"` ReconnectBackoffMaxMs int64 `` /* 129-byte string literal not displayed */ StatisticsIntervalMs int64 `protobuf:"varint,9,opt,name=statistics_interval_ms,json=statisticsIntervalMs,proto3" json:"statistics_interval_ms,omitempty"` ApiVersionRequest bool `protobuf:"varint,10,opt,name=api_version_request,json=apiVersionRequest,proto3" json:"api_version_request,omitempty"` ApiVersionFallbackMs int64 `` /* 127-byte string literal not displayed */ BrokerVersionFallback string `` /* 127-byte string literal not displayed */ SecurityProtocol string `protobuf:"bytes,13,opt,name=security_protocol,json=securityProtocol,proto3" json:"security_protocol,omitempty"` SslKeyLocation string `protobuf:"bytes,14,opt,name=ssl_key_location,json=sslKeyLocation,proto3" json:"ssl_key_location,omitempty"` SslCertificateLocation string `` /* 130-byte string literal not displayed */ SslCaLocation string `protobuf:"bytes,16,opt,name=ssl_ca_location,json=sslCaLocation,proto3" json:"ssl_ca_location,omitempty"` SslCrlLocation string `protobuf:"bytes,17,opt,name=ssl_crl_location,json=sslCrlLocation,proto3" json:"ssl_crl_location,omitempty"` EnableSslCertificateVerification bool `` /* 163-byte string literal not displayed */ Consumers map[string]*Configuration_Consumer `` /* 160-byte string literal not displayed */ Producers map[string]*Configuration_Producer `` /* 160-byte string literal not displayed */ MessageKeyFrom IdentityFieldToMessageKey `` /* 165-byte string literal not displayed */ // contains filtered or unexported fields }
func (*Configuration) Descriptor
deprecated
func (*Configuration) Descriptor() ([]byte, []int)
Deprecated: Use Configuration.ProtoReflect.Descriptor instead.
func (*Configuration) GetApiVersionFallbackMs ¶
func (x *Configuration) GetApiVersionFallbackMs() int64
func (*Configuration) GetApiVersionRequest ¶
func (x *Configuration) GetApiVersionRequest() bool
func (*Configuration) GetBootstrapServers ¶
func (x *Configuration) GetBootstrapServers() []string
func (*Configuration) GetBrokerVersionFallback ¶
func (x *Configuration) GetBrokerVersionFallback() string
func (*Configuration) GetConsumers ¶
func (x *Configuration) GetConsumers() map[string]*Configuration_Consumer
func (*Configuration) GetDebug ¶
func (x *Configuration) GetDebug() string
func (*Configuration) GetEnableSslCertificateVerification ¶
func (x *Configuration) GetEnableSslCertificateVerification() bool
func (*Configuration) GetMaxInFlight ¶
func (x *Configuration) GetMaxInFlight() int64
func (*Configuration) GetMaxMessageBytes ¶
func (x *Configuration) GetMaxMessageBytes() int64
func (*Configuration) GetMaxMessageCopyBytes ¶
func (x *Configuration) GetMaxMessageCopyBytes() int64
func (*Configuration) GetMessageKeyFrom ¶
func (x *Configuration) GetMessageKeyFrom() IdentityFieldToMessageKey
func (*Configuration) GetProducers ¶
func (x *Configuration) GetProducers() map[string]*Configuration_Producer
func (*Configuration) GetReceiveMessageMaxBytes ¶
func (x *Configuration) GetReceiveMessageMaxBytes() int64
func (*Configuration) GetReconnectBackoffMaxMs ¶
func (x *Configuration) GetReconnectBackoffMaxMs() int64
func (*Configuration) GetReconnectBackoffMs ¶
func (x *Configuration) GetReconnectBackoffMs() int64
func (*Configuration) GetSecurityProtocol ¶
func (x *Configuration) GetSecurityProtocol() string
func (*Configuration) GetSslCaLocation ¶
func (x *Configuration) GetSslCaLocation() string
func (*Configuration) GetSslCertificateLocation ¶
func (x *Configuration) GetSslCertificateLocation() string
func (*Configuration) GetSslCrlLocation ¶
func (x *Configuration) GetSslCrlLocation() string
func (*Configuration) GetSslKeyLocation ¶
func (x *Configuration) GetSslKeyLocation() string
func (*Configuration) GetStatisticsIntervalMs ¶
func (x *Configuration) GetStatisticsIntervalMs() int64
func (*Configuration) ProtoMessage ¶
func (*Configuration) ProtoMessage()
func (*Configuration) ProtoReflect ¶
func (x *Configuration) ProtoReflect() protoreflect.Message
func (*Configuration) Reset ¶
func (x *Configuration) Reset()
func (*Configuration) String ¶
func (x *Configuration) String() string
type Configuration_Consumer ¶
type Configuration_Consumer struct { SessionTimeoutMs int64 `protobuf:"varint,1,opt,name=session_timeout_ms,json=sessionTimeoutMs,proto3" json:"session_timeout_ms,omitempty"` MaxPollIntervalMs int64 `protobuf:"varint,2,opt,name=max_poll_interval_ms,json=maxPollIntervalMs,proto3" json:"max_poll_interval_ms,omitempty"` QueuedMinMessages int64 `protobuf:"varint,3,opt,name=queued_min_messages,json=queuedMinMessages,proto3" json:"queued_min_messages,omitempty"` QueuedMaxMessagesKbytes int64 `` /* 135-byte string literal not displayed */ FetchMessageMaxBytes int64 `` /* 126-byte string literal not displayed */ FetchErrorBackoffMs int64 `protobuf:"varint,6,opt,name=fetch_error_backoff_ms,json=fetchErrorBackoffMs,proto3" json:"fetch_error_backoff_ms,omitempty"` ReadTimeoutMs int64 `protobuf:"varint,7,opt,name=read_timeout_ms,json=readTimeoutMs,proto3" json:"read_timeout_ms,omitempty"` Topics []string `protobuf:"bytes,8,rep,name=topics,proto3" json:"topics,omitempty"` // contains filtered or unexported fields }
func (*Configuration_Consumer) Descriptor
deprecated
func (*Configuration_Consumer) Descriptor() ([]byte, []int)
Deprecated: Use Configuration_Consumer.ProtoReflect.Descriptor instead.
func (*Configuration_Consumer) GetFetchErrorBackoffMs ¶
func (x *Configuration_Consumer) GetFetchErrorBackoffMs() int64
func (*Configuration_Consumer) GetFetchMessageMaxBytes ¶
func (x *Configuration_Consumer) GetFetchMessageMaxBytes() int64
func (*Configuration_Consumer) GetMaxPollIntervalMs ¶
func (x *Configuration_Consumer) GetMaxPollIntervalMs() int64
func (*Configuration_Consumer) GetQueuedMaxMessagesKbytes ¶
func (x *Configuration_Consumer) GetQueuedMaxMessagesKbytes() int64
func (*Configuration_Consumer) GetQueuedMinMessages ¶
func (x *Configuration_Consumer) GetQueuedMinMessages() int64
func (*Configuration_Consumer) GetReadTimeoutMs ¶
func (x *Configuration_Consumer) GetReadTimeoutMs() int64
func (*Configuration_Consumer) GetSessionTimeoutMs ¶
func (x *Configuration_Consumer) GetSessionTimeoutMs() int64
func (*Configuration_Consumer) GetTopics ¶
func (x *Configuration_Consumer) GetTopics() []string
func (*Configuration_Consumer) ProtoMessage ¶
func (*Configuration_Consumer) ProtoMessage()
func (*Configuration_Consumer) ProtoReflect ¶
func (x *Configuration_Consumer) ProtoReflect() protoreflect.Message
func (*Configuration_Consumer) Reset ¶
func (x *Configuration_Consumer) Reset()
func (*Configuration_Consumer) String ¶
func (x *Configuration_Consumer) String() string
type Configuration_Producer ¶
type Configuration_Producer struct { QueueBufferingMaxMs int64 `protobuf:"varint,1,opt,name=queue_buffering_max_ms,json=queueBufferingMaxMs,proto3" json:"queue_buffering_max_ms,omitempty"` RetryBackoffMs int64 `protobuf:"varint,2,opt,name=retry_backoff_ms,json=retryBackoffMs,proto3" json:"retry_backoff_ms,omitempty"` BatchNumMessages int64 `protobuf:"varint,3,opt,name=batch_num_messages,json=batchNumMessages,proto3" json:"batch_num_messages,omitempty"` MessageTimeoutMs int64 `protobuf:"varint,4,opt,name=message_timeout_ms,json=messageTimeoutMs,proto3" json:"message_timeout_ms,omitempty"` TransactionTimeoutMs int64 `protobuf:"varint,5,opt,name=transaction_timeout_ms,json=transactionTimeoutMs,proto3" json:"transaction_timeout_ms,omitempty"` QueueBufferingMaxMessages int64 `` /* 141-byte string literal not displayed */ QueueBufferingMaxKbytes int64 `` /* 135-byte string literal not displayed */ MessageSendMaxRetries int64 `` /* 129-byte string literal not displayed */ IsTransactional bool `protobuf:"varint,9,opt,name=is_transactional,json=isTransactional,proto3" json:"is_transactional,omitempty"` IsIdempotent bool `protobuf:"varint,10,opt,name=is_idempotent,json=isIdempotent,proto3" json:"is_idempotent,omitempty"` TransactionalId string `protobuf:"bytes,11,opt,name=transactional_id,json=transactionalId,proto3" json:"transactional_id,omitempty"` // contains filtered or unexported fields }
func (*Configuration_Producer) Descriptor
deprecated
func (*Configuration_Producer) Descriptor() ([]byte, []int)
Deprecated: Use Configuration_Producer.ProtoReflect.Descriptor instead.
func (*Configuration_Producer) GetBatchNumMessages ¶
func (x *Configuration_Producer) GetBatchNumMessages() int64
func (*Configuration_Producer) GetIsIdempotent ¶
func (x *Configuration_Producer) GetIsIdempotent() bool
func (*Configuration_Producer) GetIsTransactional ¶
func (x *Configuration_Producer) GetIsTransactional() bool
func (*Configuration_Producer) GetMessageSendMaxRetries ¶
func (x *Configuration_Producer) GetMessageSendMaxRetries() int64
func (*Configuration_Producer) GetMessageTimeoutMs ¶
func (x *Configuration_Producer) GetMessageTimeoutMs() int64
func (*Configuration_Producer) GetQueueBufferingMaxKbytes ¶
func (x *Configuration_Producer) GetQueueBufferingMaxKbytes() int64
func (*Configuration_Producer) GetQueueBufferingMaxMessages ¶
func (x *Configuration_Producer) GetQueueBufferingMaxMessages() int64
func (*Configuration_Producer) GetQueueBufferingMaxMs ¶
func (x *Configuration_Producer) GetQueueBufferingMaxMs() int64
func (*Configuration_Producer) GetRetryBackoffMs ¶
func (x *Configuration_Producer) GetRetryBackoffMs() int64
func (*Configuration_Producer) GetTransactionTimeoutMs ¶
func (x *Configuration_Producer) GetTransactionTimeoutMs() int64
func (*Configuration_Producer) GetTransactionalId ¶
func (x *Configuration_Producer) GetTransactionalId() string
func (*Configuration_Producer) ProtoMessage ¶
func (*Configuration_Producer) ProtoMessage()
func (*Configuration_Producer) ProtoReflect ¶
func (x *Configuration_Producer) ProtoReflect() protoreflect.Message
func (*Configuration_Producer) Reset ¶
func (x *Configuration_Producer) Reset()
func (*Configuration_Producer) String ¶
func (x *Configuration_Producer) String() string
type Consumer ¶
type Consumer struct { *kafka.Consumer MessagesCh chan *WrappedEvent // contains filtered or unexported fields }
func (*Consumer) MustCommitEvent ¶
func (c *Consumer) MustCommitEvent(event *WrappedEvent)
type Event ¶
type Event struct { Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` MessageKey string `protobuf:"bytes,3,opt,name=message_key,json=messageKey,proto3" json:"message_key,omitempty"` Topic string `protobuf:"bytes,4,opt,name=topic,proto3" json:"topic,omitempty"` Claim *auth.ClaimEnvelope `protobuf:"bytes,5,opt,name=claim,proto3" json:"claim,omitempty"` // contains filtered or unexported fields }
func (*Event) Descriptor
deprecated
func (*Event) GetClaim ¶
func (x *Event) GetClaim() *auth.ClaimEnvelope
func (*Event) GetMessageKey ¶
func (*Event) MarshalProto ¶
func (x *Event) MarshalProto(msg protoreflect.ProtoMessage)
func (*Event) MessageKeyFromContext ¶
func (x *Event) MessageKeyFromContext(ctx context.Context, key ...EventMessageKey) *Event
func (*Event) ProtoMessage ¶
func (*Event) ProtoMessage()
func (*Event) ProtoReflect ¶
func (x *Event) ProtoReflect() protoreflect.Message
func (*Event) UnmarshalProto ¶
func (x *Event) UnmarshalProto(msg protoreflect.ProtoMessage)
type IdentityFieldToMessageKey ¶
type IdentityFieldToMessageKey int32
const ( IdentityFieldToMessageKey_TENANT_ID IdentityFieldToMessageKey = 0 IdentityFieldToMessageKey_RANDOM IdentityFieldToMessageKey = 1 )
func (IdentityFieldToMessageKey) Descriptor ¶
func (IdentityFieldToMessageKey) Descriptor() protoreflect.EnumDescriptor
func (IdentityFieldToMessageKey) Enum ¶
func (x IdentityFieldToMessageKey) Enum() *IdentityFieldToMessageKey
func (IdentityFieldToMessageKey) EnumDescriptor
deprecated
func (IdentityFieldToMessageKey) EnumDescriptor() ([]byte, []int)
Deprecated: Use IdentityFieldToMessageKey.Descriptor instead.
func (IdentityFieldToMessageKey) Number ¶
func (x IdentityFieldToMessageKey) Number() protoreflect.EnumNumber
func (IdentityFieldToMessageKey) String ¶
func (x IdentityFieldToMessageKey) String() string
func (IdentityFieldToMessageKey) Type ¶
func (IdentityFieldToMessageKey) Type() protoreflect.EnumType
type WrappedEvent ¶
Click to show internal directories.
Click to hide internal directories.