Documentation ¶
Index ¶
- func GetKafkaAggregateTypeTopic(cfg KafkaEventsBusConfig, aggregateType string) kafka.TopicConfig
- func GetTopicName(eventStorePrefix string, aggregateType string) string
- type Aggregate
- type AggregateBase
- func (a *AggregateBase) Apply(event any) error
- func (a *AggregateBase) ClearChanges()
- func (a *AggregateBase) GetChanges() []any
- func (a *AggregateBase) GetID() string
- func (a *AggregateBase) GetType() AggregateType
- func (a *AggregateBase) GetVersion() uint64
- func (a *AggregateBase) Load(events []any) error
- func (a *AggregateBase) RaiseEvent(event any) error
- func (a *AggregateBase) SetID(id string) *AggregateBase
- func (a *AggregateBase) SetType(aggregateType AggregateType)
- func (a *AggregateBase) String() string
- func (a *AggregateBase) ToSnapshot()
- type AggregateRoot
- type AggregateType
- type Apply
- type Event
- func (e *Event) GetAggregateID() string
- func (e *Event) GetAggregateType() AggregateType
- func (e *Event) GetData() []byte
- func (e *Event) GetEventID() string
- func (e *Event) GetEventType() EventType
- func (e *Event) GetJsonData(data interface{}) error
- func (e *Event) GetJsonMetadata(metaData interface{}) error
- func (e *Event) GetMetadata() []byte
- func (e *Event) GetString() string
- func (e *Event) GetTimeStamp() time.Time
- func (e *Event) GetVersion() uint64
- func (e *Event) SetAggregateType(aggregateType AggregateType)
- func (e *Event) SetData(data []byte) *Event
- func (e *Event) SetJsonData(data interface{}) error
- func (e *Event) SetMetadata(metaData interface{}) error
- func (e *Event) SetVersion(aggregateVersion uint64)
- func (e *Event) String() string
- type EventType
- type KafkaEventsBus
- type KafkaEventsBusConfig
- type Load
- type RaiseEvent
- type When
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GetKafkaAggregateTypeTopic ¶
func GetKafkaAggregateTypeTopic(cfg KafkaEventsBusConfig, aggregateType string) kafka.TopicConfig
func GetTopicName ¶
func GetTopicName(eventStorePrefix string, aggregateType string) string
Types ¶
type Aggregate ¶
type Aggregate interface {
	When
	AggregateRoot
	RaiseEvent
}
type AggregateBase ¶
type AggregateBase struct {
	ID      string
	Version uint64
	Changes []any
	Type    AggregateType
	// contains filtered or unexported fields
}
AggregateBase is the base aggregate; it contains all the main required fields.
func NewAggregateBase ¶
func NewAggregateBase(when when) *AggregateBase
NewAggregateBase is the AggregateBase constructor. The base contains all the main fields and methods. The main aggregate must implement the When interface and pass its When method as the argument to the constructor. Example of a recommended aggregate constructor method:
func NewOrderAggregate() *OrderAggregate {
	orderAggregate := &OrderAggregate{
		Order: models.NewOrder(),
	}
	base := es.NewAggregateBase(orderAggregate.When)
	base.SetType(OrderAggregateType)
	orderAggregate.AggregateBase = base
	return orderAggregate
}
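The constructor pattern above can be tried out in isolation with the following minimal sketch. It does not import this package: AggregateBase, NewAggregateBase, SetType, and Apply are simplified local stand-ins, and OrderCreated, the Item field, and the "order" type value are hypothetical names introduced only for illustration. It also assumes that AggregateType is a string and that each applied event advances Version by one, neither of which is confirmed by this page.

```go
package main

import (
	"errors"
	"fmt"
)

// AggregateType is assumed here to be a string type.
type AggregateType string

// AggregateBase is a simplified local stand-in for es.AggregateBase.
type AggregateBase struct {
	ID      string
	Version uint64
	Changes []any
	Type    AggregateType
	when    func(event any) error // handler supplied by the concrete aggregate
}

// NewAggregateBase stores the aggregate's When method for later dispatch.
func NewAggregateBase(when func(event any) error) *AggregateBase {
	return &AggregateBase{when: when}
}

func (a *AggregateBase) SetType(t AggregateType) { a.Type = t }

// Apply routes the event through when and, on success, records it as uncommitted.
func (a *AggregateBase) Apply(event any) error {
	if err := a.when(event); err != nil {
		return err
	}
	a.Version++ // assumption: one version step per event
	a.Changes = append(a.Changes, event)
	return nil
}

// OrderAggregateType and OrderCreated are hypothetical.
const OrderAggregateType AggregateType = "order"

type OrderCreated struct{ Item string }

type OrderAggregate struct {
	*AggregateBase
	Item string
}

// When mutates aggregate state for each event type it understands.
func (o *OrderAggregate) When(event any) error {
	switch e := event.(type) {
	case OrderCreated:
		o.Item = e.Item
		return nil
	default:
		return errors.New("unknown event type")
	}
}

func NewOrderAggregate() *OrderAggregate {
	o := &OrderAggregate{}
	base := NewAggregateBase(o.When)
	base.SetType(OrderAggregateType)
	o.AggregateBase = base
	return o
}

func main() {
	o := NewOrderAggregate()
	if err := o.Apply(OrderCreated{Item: "book"}); err != nil {
		fmt.Println("apply failed:", err)
		return
	}
	fmt.Println(o.Item, o.Version, len(o.Changes)) // prints "book 1 1"
}
```

Passing the method value o.When binds the handler to the concrete aggregate, so the base can dispatch events without knowing the aggregate's type.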
func (*AggregateBase) Apply ¶
func (a *AggregateBase) Apply(event any) error
Apply pushes the event to the aggregate's uncommitted events using the When method.
func (*AggregateBase) ClearChanges ¶
func (a *AggregateBase) ClearChanges()
ClearChanges clears the AggregateBase's uncommitted events.
func (*AggregateBase) GetChanges ¶
func (a *AggregateBase) GetChanges() []any
GetChanges returns the AggregateBase's uncommitted events.
func (*AggregateBase) GetType ¶
func (a *AggregateBase) GetType() AggregateType
GetType returns the AggregateBase's AggregateType.
func (*AggregateBase) GetVersion ¶
func (a *AggregateBase) GetVersion() uint64
GetVersion returns the AggregateBase's version.
func (*AggregateBase) Load ¶
func (a *AggregateBase) Load(events []any) error
Load adds existing events from the event store to the aggregate using the When interface method.
func (*AggregateBase) RaiseEvent ¶
func (a *AggregateBase) RaiseEvent(event any) error
RaiseEvent pushes the event to the aggregate's applied events using the When method. It is used when loading directly from the event store.
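The difference between Apply, Load, and RaiseEvent can be illustrated with a small self-contained sketch. The types below are local stand-ins, not this package's implementation; Account and Deposited are hypothetical. The sketch assumes that every handled event advances Version by one and that only Apply appends to Changes, while Load and RaiseEvent replay stored events without marking them as uncommitted.

```go
package main

import "fmt"

// base is a simplified stand-in for AggregateBase.
type base struct {
	Version uint64
	Changes []any
	when    func(event any) error
}

// Apply handles a new event and records it as an uncommitted change.
func (b *base) Apply(event any) error {
	if err := b.when(event); err != nil {
		return err
	}
	b.Version++
	b.Changes = append(b.Changes, event)
	return nil
}

// RaiseEvent handles an already stored event; nothing is added to Changes.
func (b *base) RaiseEvent(event any) error {
	if err := b.when(event); err != nil {
		return err
	}
	b.Version++
	return nil
}

// Load replays a stored history in order, stopping at the first error.
func (b *base) Load(events []any) error {
	for _, e := range events {
		if err := b.RaiseEvent(e); err != nil {
			return err
		}
	}
	return nil
}

// Deposited and Account are hypothetical examples.
type Deposited struct{ Amount int }

type Account struct {
	base
	Balance int
}

func NewAccount() *Account {
	a := &Account{}
	a.when = func(event any) error {
		d, ok := event.(Deposited)
		if !ok {
			return fmt.Errorf("unsupported event %T", event)
		}
		a.Balance += d.Amount
		return nil
	}
	return a
}

func main() {
	a := NewAccount()
	// Rebuild state from history, then record one new change.
	if err := a.Load([]any{Deposited{Amount: 10}, Deposited{Amount: 5}}); err != nil {
		fmt.Println("load failed:", err)
		return
	}
	if err := a.Apply(Deposited{Amount: 7}); err != nil {
		fmt.Println("apply failed:", err)
		return
	}
	fmt.Println(a.Balance, a.Version, len(a.Changes)) // prints "22 3 1"
}
```

Under these assumptions, only the event passed to Apply would need to be saved to the event store; the replayed history is already persisted.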
func (*AggregateBase) SetID ¶
func (a *AggregateBase) SetID(id string) *AggregateBase
SetID sets the AggregateBase's ID.
func (*AggregateBase) SetType ¶
func (a *AggregateBase) SetType(aggregateType AggregateType)
SetType sets the AggregateBase's AggregateType.
func (*AggregateBase) String ¶
func (a *AggregateBase) String() string
func (*AggregateBase) ToSnapshot ¶
func (a *AggregateBase) ToSnapshot()
ToSnapshot prepares the AggregateBase for saving a snapshot.
type AggregateRoot ¶
type AggregateRoot interface {
	GetID() string
	SetID(id string) *AggregateBase
	GetType() AggregateType
	SetType(aggregateType AggregateType)
	GetChanges() []any
	ClearChanges()
	GetVersion() uint64
	ToSnapshot()
	String() string
	Load
	Apply
	RaiseEvent
}
AggregateRoot contains all the methods of AggregateBase.
type Event ¶
type Event struct {
	EventID       string
	AggregateID   string
	EventType     EventType
	AggregateType AggregateType
	Version       uint64
	Data          []byte
	Metadata      []byte
	Timestamp     time.Time
}
Event is an internal representation of an event, returned when the Aggregate uses NewEvent to create a new event. Events loaded from the DB are represented by each DB's internal event type, which implements Event.
func NewBaseEvent ¶
NewBaseEvent is the base Event constructor; it configures the EventID, the Aggregate properties, and the Timestamp.
func (*Event) GetAggregateID ¶
func (e *Event) GetAggregateID() string
GetAggregateID returns the AggregateID of the Aggregate that the Event belongs to.
func (*Event) GetAggregateType ¶
func (e *Event) GetAggregateType() AggregateType
GetAggregateType returns the AggregateType that the Event can be applied to.
func (*Event) GetEventType ¶
func (e *Event) GetEventType() EventType
GetEventType returns the EventType of the event.
func (*Event) GetJsonData ¶
func (e *Event) GetJsonData(data interface{}) error
GetJsonData unmarshals the JSON data attached to the Event into data.
func (*Event) GetJsonMetadata ¶
func (e *Event) GetJsonMetadata(metaData interface{}) error
GetJsonMetadata unmarshals the Event's app-specific metadata, serialized as JSON, into metaData.
func (*Event) GetMetadata ¶
func (e *Event) GetMetadata() []byte
GetMetadata returns app-specific metadata such as the request AggregateID, the originating user, etc.
func (*Event) GetTimeStamp ¶
func (e *Event) GetTimeStamp() time.Time
GetTimeStamp returns the timestamp of the Event.
func (*Event) GetVersion ¶
func (e *Event) GetVersion() uint64
GetVersion returns the version of the Aggregate after the Event has been applied.
func (*Event) SetAggregateType ¶
func (e *Event) SetAggregateType(aggregateType AggregateType)
SetAggregateType sets the AggregateType that the Event can be applied to.
func (*Event) SetJsonData ¶
func (e *Event) SetJsonData(data interface{}) error
SetJsonData serializes data to JSON and sets it as the data attached to the Event.
func (*Event) SetMetadata ¶
func (e *Event) SetMetadata(metaData interface{}) error
SetMetadata serializes app-specific metadata to JSON and sets it on the Event.
func (*Event) SetVersion ¶
func (e *Event) SetVersion(aggregateVersion uint64)
SetVersion sets the version of the Aggregate.
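The JSON helpers on Event (SetJsonData, GetJsonData, SetMetadata, GetJsonMetadata) suggest a simple round trip through the Data and Metadata byte slices. The following is a minimal sketch of that round trip using only encoding/json. The Event type here is a reduced local stand-in, the method bodies are assumptions about how such helpers are typically written, and OrderPaid and Meta are hypothetical payload types.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event is a reduced stand-in holding only the serialized payloads.
type Event struct {
	Data     []byte
	Metadata []byte
}

// SetJsonData marshals data to JSON and stores the bytes in Data.
func (e *Event) SetJsonData(data interface{}) error {
	b, err := json.Marshal(data)
	if err != nil {
		return err
	}
	e.Data = b
	return nil
}

// GetJsonData unmarshals Data into the value pointed to by data.
func (e *Event) GetJsonData(data interface{}) error {
	return json.Unmarshal(e.Data, data)
}

// SetMetadata marshals metaData to JSON and stores the bytes in Metadata.
func (e *Event) SetMetadata(metaData interface{}) error {
	b, err := json.Marshal(metaData)
	if err != nil {
		return err
	}
	e.Metadata = b
	return nil
}

// GetJsonMetadata unmarshals Metadata into the value pointed to by metaData.
func (e *Event) GetJsonMetadata(metaData interface{}) error {
	return json.Unmarshal(e.Metadata, metaData)
}

// OrderPaid and Meta are hypothetical payload types.
type OrderPaid struct {
	OrderID string `json:"orderId"`
	Amount  int    `json:"amount"`
}

type Meta struct {
	UserID string `json:"userId"`
}

func main() {
	ev := &Event{}
	if err := ev.SetJsonData(OrderPaid{OrderID: "o-1", Amount: 42}); err != nil {
		fmt.Println("set data failed:", err)
		return
	}
	if err := ev.SetMetadata(Meta{UserID: "u-7"}); err != nil {
		fmt.Println("set metadata failed:", err)
		return
	}

	var paid OrderPaid
	var meta Meta
	if err := ev.GetJsonData(&paid); err != nil {
		fmt.Println("get data failed:", err)
		return
	}
	if err := ev.GetJsonMetadata(&meta); err != nil {
		fmt.Println("get metadata failed:", err)
		return
	}
	fmt.Println(paid.OrderID, paid.Amount, meta.UserID) // prints "o-1 42 u-7"
}
```

Note that the getters need a pointer argument, since json.Unmarshal writes into the value it is given.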
type EventType ¶
type EventType string
EventType is the type of any event, used as its unique identifier.
type KafkaEventsBus ¶
type KafkaEventsBus struct {
// contains filtered or unexported fields
}
func NewKafkaEventsBus ¶
func NewKafkaEventsBus(producer kafkaClient.Producer, cfg KafkaEventsBusConfig) *KafkaEventsBus
NewKafkaEventsBus is the KafkaEventsBus constructor.
func (*KafkaEventsBus) ProcessEvents ¶
func (e *KafkaEventsBus) ProcessEvents(ctx context.Context, events []Event) error
ProcessEvents serializes es.Event values to JSON and publishes them to the Kafka topic.
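The serialize-then-publish flow can be sketched without a Kafka dependency. Everything below is illustrative: the publisher interface and memoryPublisher are hypothetical stand-ins for kafkaClient.Producer, whose real method set is not shown on this page, and Event is reduced to three fields. The sketch also assumes one message per event, keyed by AggregateID; publishing the whole slice as a single message is an equally plausible design.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Event is a reduced stand-in for es.Event.
type Event struct {
	EventID     string
	AggregateID string
	Data        []byte
}

// publisher is a hypothetical producer abstraction (not the real kafkaClient.Producer).
type publisher interface {
	Publish(ctx context.Context, topic string, key, value []byte) error
}

// memoryPublisher collects messages per topic instead of sending them to Kafka.
type memoryPublisher struct {
	messages map[string][][]byte
}

func (m *memoryPublisher) Publish(_ context.Context, topic string, _, value []byte) error {
	if m.messages == nil {
		m.messages = make(map[string][][]byte)
	}
	m.messages[topic] = append(m.messages[topic], value)
	return nil
}

// processEvents marshals each event to JSON and publishes it to topic.
func processEvents(ctx context.Context, p publisher, topic string, events []Event) error {
	for _, e := range events {
		if err := ctx.Err(); err != nil {
			return err // stop early if the caller cancelled
		}
		payload, err := json.Marshal(e)
		if err != nil {
			return fmt.Errorf("marshal event %s: %w", e.EventID, err)
		}
		if err := p.Publish(ctx, topic, []byte(e.AggregateID), payload); err != nil {
			return fmt.Errorf("publish event %s: %w", e.EventID, err)
		}
	}
	return nil
}

// publishAll is a small helper that runs processEvents against an in-memory publisher.
func publishAll(topic string, events []Event) (*memoryPublisher, error) {
	p := &memoryPublisher{}
	return p, processEvents(context.Background(), p, topic, events)
}

func main() {
	p, err := publishAll("orders", []Event{
		{EventID: "e1", AggregateID: "a1", Data: []byte("x")},
		{EventID: "e2", AggregateID: "a1", Data: []byte("y")},
	})
	if err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	fmt.Println(len(p.messages["orders"]), "messages published")
}
```

Because encoding/json encodes []byte fields as base64 strings, consumers of such messages would need to decode Data before unmarshalling the inner payload.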
type KafkaEventsBusConfig ¶
type KafkaEventsBusConfig struct {
	Topic             string `mapstructure:"topic" validate:"required"`
	TopicPrefix       string `mapstructure:"topicPrefix" validate:"required"`
	Partitions        int    `mapstructure:"partitions" validate:"required,gte=0"`
	ReplicationFactor int    `mapstructure:"replicationFactor" validate:"required,gte=0"`
	Headers           []kafka.Header
}
KafkaEventsBusConfig is the Kafka event bus configuration.
type RaiseEvent ¶
RaiseEvent processes an applied Aggregate Event from the event store.