Documentation
¶
Index ¶
- Variables
- func KVFromProto(msg proto.Message) (string, []byte, error)
- func NewConsumerContext(ctx context.Context, r Consumer) context.Context
- func NewProducerContext(ctx context.Context, r Producer) context.Context
- func RegisterConsumer(kind string, e LazyConsumer)
- func RegisterProducer(kind string, e LazyProducer)
- type Config
- func (*Config) Descriptor() ([]byte, []int) deprecated
- func (x *Config) GetAddr() string
- func (x *Config) GetExtra() *structpb.Struct
- func (x *Config) GetGroup() string
- func (x *Config) GetKafka() *Config_Kafka
- func (x *Config) GetPulsar() *Config_Pulsar
- func (x *Config) GetTopic() string
- func (x *Config) GetType() string
- func (*Config) ProtoMessage()
- func (x *Config) ProtoReflect() protoreflect.Message
- func (x *Config) Reset()
- func (x *Config) String() string
- func (m *Config) Validate() error
- func (m *Config) ValidateAll() error
- type ConfigMultiError
- type ConfigValidationError
- type Config_Kafka
- func (*Config_Kafka) Descriptor() ([]byte, []int) deprecated
- func (x *Config_Kafka) GetVersion() *wrapperspb.StringValue
- func (*Config_Kafka) ProtoMessage()
- func (x *Config_Kafka) ProtoReflect() protoreflect.Message
- func (x *Config_Kafka) Reset()
- func (x *Config_Kafka) String() string
- func (m *Config_Kafka) Validate() error
- func (m *Config_Kafka) ValidateAll() error
- type Config_KafkaMultiError
- type Config_KafkaValidationError
- func (e Config_KafkaValidationError) Cause() error
- func (e Config_KafkaValidationError) Error() string
- func (e Config_KafkaValidationError) ErrorName() string
- func (e Config_KafkaValidationError) Field() string
- func (e Config_KafkaValidationError) Key() bool
- func (e Config_KafkaValidationError) Reason() string
- type Config_Pulsar
- func (*Config_Pulsar) Descriptor() ([]byte, []int) deprecated
- func (x *Config_Pulsar) GetConnectionTimeout() *durationpb.Duration
- func (x *Config_Pulsar) GetOperationTimeout() *durationpb.Duration
- func (*Config_Pulsar) ProtoMessage()
- func (x *Config_Pulsar) ProtoReflect() protoreflect.Message
- func (x *Config_Pulsar) Reset()
- func (x *Config_Pulsar) String() string
- func (m *Config_Pulsar) Validate() error
- func (m *Config_Pulsar) ValidateAll() error
- type Config_PulsarMultiError
- type Config_PulsarValidationError
- func (e Config_PulsarValidationError) Cause() error
- func (e Config_PulsarValidationError) Error() string
- func (e Config_PulsarValidationError) ErrorName() string
- func (e Config_PulsarValidationError) Field() string
- func (e Config_PulsarValidationError) Key() bool
- func (e Config_PulsarValidationError) Reason() string
- type Consumer
- type ConsumerFactoryServer
- type ConsumerHandler
- type ConsumerHandlerFunc
- type ConsumerMiddlewareFunc
- type ConsumerMux
- type ConsumerServer
- type ErrFormatFunc
- type Event
- type HandlerFuncOf
- type HandlerOf
- type Header
- type LazyConsumer
- type LazyProducer
- type Message
- type Producer
- type ProducerMiddlewareFunc
- type ProducerMux
- type RecoverOption
Constants ¶
This section is empty.
Variables ¶
var File_event_event_proto protoreflect.FileDescriptor
Functions ¶
func RegisterConsumer ¶
func RegisterConsumer(kind string, e LazyConsumer)
func RegisterProducer ¶
func RegisterProducer(kind string, e LazyProducer)
Types ¶
type Config ¶
type Config struct { Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"` Addr string `protobuf:"bytes,2,opt,name=addr,proto3" json:"addr,omitempty"` Topic string `protobuf:"bytes,3,opt,name=topic,proto3" json:"topic,omitempty"` Group string `protobuf:"bytes,4,opt,name=group,proto3" json:"group,omitempty"` Kafka *Config_Kafka `protobuf:"bytes,10,opt,name=kafka,proto3" json:"kafka,omitempty"` Pulsar *Config_Pulsar `protobuf:"bytes,11,opt,name=pulsar,proto3" json:"pulsar,omitempty"` Extra *structpb.Struct `protobuf:"bytes,100,opt,name=extra,proto3" json:"extra,omitempty"` // contains filtered or unexported fields }
func (*Config) Descriptor
deprecated
func (*Config) GetKafka ¶
func (x *Config) GetKafka() *Config_Kafka
func (*Config) GetPulsar ¶
func (x *Config) GetPulsar() *Config_Pulsar
func (*Config) ProtoMessage ¶
func (*Config) ProtoMessage()
func (*Config) ProtoReflect ¶
func (x *Config) ProtoReflect() protoreflect.Message
func (*Config) Validate ¶
Validate checks the field values on Config with the rules defined in the proto definition for this message. If any rules are violated, the first error encountered is returned, or nil if there are no violations.
func (*Config) ValidateAll ¶
ValidateAll checks the field values on Config with the rules defined in the proto definition for this message. If any rules are violated, the result is a list of violation errors wrapped in ConfigMultiError, or nil if none found.
type ConfigMultiError ¶
type ConfigMultiError []error
ConfigMultiError is an error wrapping multiple validation errors returned by Config.ValidateAll() if the designated constraints aren't met.
func (ConfigMultiError) AllErrors ¶
func (m ConfigMultiError) AllErrors() []error
AllErrors returns a list of validation violation errors.
func (ConfigMultiError) Error ¶
func (m ConfigMultiError) Error() string
Error returns a concatenation of all the error messages it wraps.
type ConfigValidationError ¶
type ConfigValidationError struct {
// contains filtered or unexported fields
}
ConfigValidationError is the validation error returned by Config.Validate if the designated constraints aren't met.
func (ConfigValidationError) Cause ¶
func (e ConfigValidationError) Cause() error
Cause function returns cause value.
func (ConfigValidationError) Error ¶
func (e ConfigValidationError) Error() string
Error satisfies the builtin error interface
func (ConfigValidationError) ErrorName ¶
func (e ConfigValidationError) ErrorName() string
ErrorName returns error name.
func (ConfigValidationError) Field ¶
func (e ConfigValidationError) Field() string
Field function returns field value.
func (ConfigValidationError) Key ¶
func (e ConfigValidationError) Key() bool
Key function returns key value.
func (ConfigValidationError) Reason ¶
func (e ConfigValidationError) Reason() string
Reason function returns reason value.
type Config_Kafka ¶
type Config_Kafka struct { Version *wrapperspb.StringValue `protobuf:"bytes,1,opt,name=version,proto3,oneof" json:"version,omitempty"` // contains filtered or unexported fields }
func (*Config_Kafka) Descriptor
deprecated
func (*Config_Kafka) Descriptor() ([]byte, []int)
Deprecated: Use Config_Kafka.ProtoReflect.Descriptor instead.
func (*Config_Kafka) GetVersion ¶
func (x *Config_Kafka) GetVersion() *wrapperspb.StringValue
func (*Config_Kafka) ProtoMessage ¶
func (*Config_Kafka) ProtoMessage()
func (*Config_Kafka) ProtoReflect ¶
func (x *Config_Kafka) ProtoReflect() protoreflect.Message
func (*Config_Kafka) Reset ¶
func (x *Config_Kafka) Reset()
func (*Config_Kafka) String ¶
func (x *Config_Kafka) String() string
func (*Config_Kafka) Validate ¶
func (m *Config_Kafka) Validate() error
Validate checks the field values on Config_Kafka with the rules defined in the proto definition for this message. If any rules are violated, the first error encountered is returned, or nil if there are no violations.
func (*Config_Kafka) ValidateAll ¶
func (m *Config_Kafka) ValidateAll() error
ValidateAll checks the field values on Config_Kafka with the rules defined in the proto definition for this message. If any rules are violated, the result is a list of violation errors wrapped in Config_KafkaMultiError, or nil if none found.
type Config_KafkaMultiError ¶
type Config_KafkaMultiError []error
Config_KafkaMultiError is an error wrapping multiple validation errors returned by Config_Kafka.ValidateAll() if the designated constraints aren't met.
func (Config_KafkaMultiError) AllErrors ¶
func (m Config_KafkaMultiError) AllErrors() []error
AllErrors returns a list of validation violation errors.
func (Config_KafkaMultiError) Error ¶
func (m Config_KafkaMultiError) Error() string
Error returns a concatenation of all the error messages it wraps.
type Config_KafkaValidationError ¶
type Config_KafkaValidationError struct {
// contains filtered or unexported fields
}
Config_KafkaValidationError is the validation error returned by Config_Kafka.Validate if the designated constraints aren't met.
func (Config_KafkaValidationError) Cause ¶
func (e Config_KafkaValidationError) Cause() error
Cause function returns cause value.
func (Config_KafkaValidationError) Error ¶
func (e Config_KafkaValidationError) Error() string
Error satisfies the builtin error interface
func (Config_KafkaValidationError) ErrorName ¶
func (e Config_KafkaValidationError) ErrorName() string
ErrorName returns error name.
func (Config_KafkaValidationError) Field ¶
func (e Config_KafkaValidationError) Field() string
Field function returns field value.
func (Config_KafkaValidationError) Key ¶
func (e Config_KafkaValidationError) Key() bool
Key function returns key value.
func (Config_KafkaValidationError) Reason ¶
func (e Config_KafkaValidationError) Reason() string
Reason function returns reason value.
type Config_Pulsar ¶
type Config_Pulsar struct { OperationTimeout *durationpb.Duration `protobuf:"bytes,1,opt,name=operation_timeout,json=operationTimeout,proto3,oneof" json:"operation_timeout,omitempty"` ConnectionTimeout *durationpb.Duration `protobuf:"bytes,2,opt,name=connection_timeout,json=connectionTimeout,proto3,oneof" json:"connection_timeout,omitempty"` // contains filtered or unexported fields }
func (*Config_Pulsar) Descriptor
deprecated
func (*Config_Pulsar) Descriptor() ([]byte, []int)
Deprecated: Use Config_Pulsar.ProtoReflect.Descriptor instead.
func (*Config_Pulsar) GetConnectionTimeout ¶
func (x *Config_Pulsar) GetConnectionTimeout() *durationpb.Duration
func (*Config_Pulsar) GetOperationTimeout ¶
func (x *Config_Pulsar) GetOperationTimeout() *durationpb.Duration
func (*Config_Pulsar) ProtoMessage ¶
func (*Config_Pulsar) ProtoMessage()
func (*Config_Pulsar) ProtoReflect ¶
func (x *Config_Pulsar) ProtoReflect() protoreflect.Message
func (*Config_Pulsar) Reset ¶
func (x *Config_Pulsar) Reset()
func (*Config_Pulsar) String ¶
func (x *Config_Pulsar) String() string
func (*Config_Pulsar) Validate ¶
func (m *Config_Pulsar) Validate() error
Validate checks the field values on Config_Pulsar with the rules defined in the proto definition for this message. If any rules are violated, the first error encountered is returned, or nil if there are no violations.
func (*Config_Pulsar) ValidateAll ¶
func (m *Config_Pulsar) ValidateAll() error
ValidateAll checks the field values on Config_Pulsar with the rules defined in the proto definition for this message. If any rules are violated, the result is a list of violation errors wrapped in Config_PulsarMultiError, or nil if none found.
type Config_PulsarMultiError ¶
type Config_PulsarMultiError []error
Config_PulsarMultiError is an error wrapping multiple validation errors returned by Config_Pulsar.ValidateAll() if the designated constraints aren't met.
func (Config_PulsarMultiError) AllErrors ¶
func (m Config_PulsarMultiError) AllErrors() []error
AllErrors returns a list of validation violation errors.
func (Config_PulsarMultiError) Error ¶
func (m Config_PulsarMultiError) Error() string
Error returns a concatenation of all the error messages it wraps.
type Config_PulsarValidationError ¶
type Config_PulsarValidationError struct {
// contains filtered or unexported fields
}
Config_PulsarValidationError is the validation error returned by Config_Pulsar.Validate if the designated constraints aren't met.
func (Config_PulsarValidationError) Cause ¶
func (e Config_PulsarValidationError) Cause() error
Cause function returns cause value.
func (Config_PulsarValidationError) Error ¶
func (e Config_PulsarValidationError) Error() string
Error satisfies the builtin error interface
func (Config_PulsarValidationError) ErrorName ¶
func (e Config_PulsarValidationError) ErrorName() string
ErrorName returns error name.
func (Config_PulsarValidationError) Field ¶
func (e Config_PulsarValidationError) Field() string
Field function returns field value.
func (Config_PulsarValidationError) Key ¶
func (e Config_PulsarValidationError) Key() bool
Key function returns key value.
func (Config_PulsarValidationError) Reason ¶
func (e Config_PulsarValidationError) Reason() string
Reason function returns reason value.
type Consumer ¶
type ConsumerFactoryServer ¶
type ConsumerFactoryServer struct { *ConsumerMux // contains filtered or unexported fields }
ConsumerFactoryServer resolves a LazyConsumer from the factory, then wraps it as a kratos server.
func NewConsumerFactoryServer ¶
func NewConsumerFactoryServer(cfg *Config) *ConsumerFactoryServer
type ConsumerHandler ¶
func FilterKey ¶
func FilterKey(key string, handler ConsumerHandler) ConsumerHandler
func NewTransformer ¶
func NewTransformer[T any](t func(context.Context, Event) (T, error), f HandlerOf[T]) ConsumerHandler
NewTransformer wraps a handler by first transforming the event to T.
func ProtoHandler ¶
func ProtoHandler[T proto.Message](msg T, h HandlerOf[T]) ConsumerHandler
type ConsumerHandlerFunc ¶
type ConsumerHandlerFunc HandlerFuncOf[Event]
type ConsumerMiddlewareFunc ¶
type ConsumerMiddlewareFunc func(ConsumerHandler) ConsumerHandler
func ConsumerChain ¶
func ConsumerChain(m ...ConsumerMiddlewareFunc) ConsumerMiddlewareFunc
func ConsumerRecover ¶
func ConsumerRecover(opt ...RecoverOption) ConsumerMiddlewareFunc
ConsumerRecover prevents the consumer from panicking.
func ConsumerUow ¶
func ConsumerUow(uowMgr uow.Manager) ConsumerMiddlewareFunc
ConsumerUow wraps the handler in a unit of work (transaction).
type ConsumerMux ¶
type ConsumerMux struct {
// contains filtered or unexported fields
}
func (*ConsumerMux) Append ¶
func (mux *ConsumerMux) Append(h ConsumerHandler)
Append appends a handler to the mux.
func (*ConsumerMux) Process ¶
func (mux *ConsumerMux) Process(ctx context.Context, event Event) error
Process calls the handlers one by one until an error occurs.
func (*ConsumerMux) Use ¶
func (mux *ConsumerMux) Use(mws ...ConsumerMiddlewareFunc)
Use appends a ConsumerMiddlewareFunc to the chain. Middlewares are executed in the order that they are applied to the ConsumerMux.
type ConsumerServer ¶
type ConsumerServer struct {
*ConsumerMux
}
func NewConsumerServer ¶
func NewConsumerServer(r Consumer) *ConsumerServer
NewConsumerServer creates a server directly from a Consumer.
type Event ¶
func NewMessage ¶
type HandlerFuncOf ¶
type LazyProducer ¶
type LazyProducer func(c *Config) (*ProducerMux, error)
type Producer ¶
type ProducerMiddlewareFunc ¶
func ChainProducer ¶
func ChainProducer(m ...ProducerMiddlewareFunc) ProducerMiddlewareFunc
type ProducerMux ¶
type ProducerMux struct { Producer // contains filtered or unexported fields }
func NewFactoryProducer ¶
func NewFactoryProducer(cfg *Config) (*ProducerMux, error)
func NewProducer ¶
func NewProducer(next Producer) *ProducerMux
NewProducer creates a *ProducerMux with middleware ability.
func (*ProducerMux) BatchSend ¶
func (s *ProducerMux) BatchSend(ctx context.Context, msg []Event) error
func (*ProducerMux) Close ¶
func (s *ProducerMux) Close() error
func (*ProducerMux) Use ¶
func (s *ProducerMux) Use(m ...ProducerMiddlewareFunc)
type RecoverOption ¶
type RecoverOption func(*recoverOptions)
func WithErrorFormatter ¶
func WithErrorFormatter(f ErrFormatFunc) RecoverOption
func WithLogger ¶
func WithLogger(logger klog.Logger) RecoverOption