es

package
v0.0.0-...-d7d3ec7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 30, 2024 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetKafkaAggregateTypeTopic

func GetKafkaAggregateTypeTopic(cfg KafkaEventsBusConfig, aggregateType string) kafka.TopicConfig

func GetTopicName

func GetTopicName(eventStorePrefix string, aggregateType string) string

Types

type Aggregate

type Aggregate interface {
	When
	AggregateRoot
	RaiseEvent
}

type AggregateBase

type AggregateBase struct {
	ID      string
	Version uint64
	Changes []any
	Type    AggregateType
	// contains filtered or unexported fields
}

AggregateBase is the base aggregate; it contains all the main necessary fields.

func NewAggregateBase

func NewAggregateBase(when when) *AggregateBase

NewAggregateBase is the AggregateBase constructor. AggregateBase contains all the main fields and methods; the main aggregate must implement the When interface and pass its When method as the argument to the constructor. Example of a recommended aggregate constructor method:

func NewOrderAggregate() *OrderAggregate {
	orderAggregate := &OrderAggregate{
		Order: models.NewOrder(),
	}
	base := es.NewAggregateBase(orderAggregate.When)
	base.SetType(OrderAggregateType)
	orderAggregate.AggregateBase = base
	return orderAggregate
}

func (*AggregateBase) Apply

func (a *AggregateBase) Apply(event any) error

Apply pushes the event to the aggregate's uncommitted events using the When method.

func (*AggregateBase) ClearChanges

func (a *AggregateBase) ClearChanges()

ClearChanges clears the uncommitted events of the AggregateBase.

func (*AggregateBase) GetChanges

func (a *AggregateBase) GetChanges() []any

GetChanges returns the uncommitted events of the AggregateBase.

func (*AggregateBase) GetID

func (a *AggregateBase) GetID() string

GetID returns the ID of the AggregateBase.

func (*AggregateBase) GetType

func (a *AggregateBase) GetType() AggregateType

GetType returns the AggregateType of the AggregateBase.

func (*AggregateBase) GetVersion

func (a *AggregateBase) GetVersion() uint64

GetVersion returns the version of the AggregateBase.

func (*AggregateBase) Load

func (a *AggregateBase) Load(events []any) error

Load adds existing events from the event store to the aggregate using the When interface method.

func (*AggregateBase) RaiseEvent

func (a *AggregateBase) RaiseEvent(event any) error

RaiseEvent pushes the event to the aggregate's applied events using the When method; it is used when loading directly from the event store.

func (*AggregateBase) SetID

func (a *AggregateBase) SetID(id string) *AggregateBase

SetID sets the ID of the AggregateBase.

func (*AggregateBase) SetType

func (a *AggregateBase) SetType(aggregateType AggregateType)

SetType sets the AggregateType of the AggregateBase.

func (*AggregateBase) String

func (a *AggregateBase) String() string

func (*AggregateBase) ToSnapshot

func (a *AggregateBase) ToSnapshot()

ToSnapshot prepares the AggregateBase for saving a snapshot.

type AggregateRoot

type AggregateRoot interface {
	GetID() string
	SetID(id string) *AggregateBase
	GetType() AggregateType
	SetType(aggregateType AggregateType)
	GetChanges() []any
	ClearChanges()
	GetVersion() uint64
	ToSnapshot()
	String() string
	Load
	Apply
	RaiseEvent
}

AggregateRoot contains all methods of AggregateBase

type AggregateType

type AggregateType string

AggregateType is the type of the Aggregate.

type Apply

type Apply interface {
	Apply(event any) error
}

Apply processes an Aggregate Event.

type Event

type Event struct {
	EventID       string
	AggregateID   string
	EventType     EventType
	AggregateType AggregateType
	Version       uint64
	Data          []byte
	Metadata      []byte
	Timestamp     time.Time
}

Event is an internal representation of an event, returned when the Aggregate uses NewEvent to create a new event. The events loaded from the DB are represented by each DB's internal event type, implementing Event.

func NewBaseEvent

func NewBaseEvent(aggregate Aggregate, eventType EventType) Event

NewBaseEvent is the base Event constructor; it creates a new Event with a configured EventID, Aggregate properties and Timestamp.

func NewEvent

func NewEvent(aggregate Aggregate, eventType EventType, data []byte, metadata []byte) Event

func (*Event) GetAggregateID

func (e *Event) GetAggregateID() string

GetAggregateID returns the AggregateID of the Aggregate that the Event belongs to.

func (*Event) GetAggregateType

func (e *Event) GetAggregateType() AggregateType

GetAggregateType returns the AggregateType that the Event can be applied to.

func (*Event) GetData

func (e *Event) GetData() []byte

GetData returns the data attached to the Event, serialized to bytes.

func (*Event) GetEventID

func (e *Event) GetEventID() string

GetEventID returns the EventID of the Event.

func (*Event) GetEventType

func (e *Event) GetEventType() EventType

GetEventType returns the EventType of the event.

func (*Event) GetJsonData

func (e *Event) GetJsonData(data interface{}) error

GetJsonData unmarshals the JSON data attached to the Event.

func (*Event) GetJsonMetadata

func (e *Event) GetJsonMetadata(metaData interface{}) error

GetJsonMetadata unmarshals the app-specific metadata, serialized as JSON, of the Event.

func (*Event) GetMetadata

func (e *Event) GetMetadata() []byte

GetMetadata returns the app-specific metadata, such as request AggregateID, originating user, etc.

func (*Event) GetString

func (e *Event) GetString() string

GetString returns a string representation of the Event.

func (*Event) GetTimeStamp

func (e *Event) GetTimeStamp() time.Time

GetTimeStamp returns the timestamp of the Event.

func (*Event) GetVersion

func (e *Event) GetVersion() uint64

GetVersion returns the version of the Aggregate after the Event has been applied.

func (*Event) SetAggregateType

func (e *Event) SetAggregateType(aggregateType AggregateType)

SetAggregateType sets the AggregateType that the Event can be applied to.

func (*Event) SetData

func (e *Event) SetData(data []byte) *Event

SetData sets the data attached to the Event, serialized to bytes.

func (*Event) SetJsonData

func (e *Event) SetJsonData(data interface{}) error

SetJsonData serializes data to JSON and sets it as the data attached to the Event.

func (*Event) SetMetadata

func (e *Event) SetMetadata(metaData interface{}) error

SetMetadata adds app-specific metadata, serialized as JSON, to the Event.

func (*Event) SetVersion

func (e *Event) SetVersion(aggregateVersion uint64)

SetVersion sets the version of the Aggregate.

func (*Event) String

func (e *Event) String() string

type EventType

type EventType string

EventType is the type of any event, used as its unique identifier.

type KafkaEventsBus

type KafkaEventsBus struct {
	// contains filtered or unexported fields
}

func NewKafkaEventsBus

func NewKafkaEventsBus(producer kafkaClient.Producer, cfg KafkaEventsBusConfig) *KafkaEventsBus

NewKafkaEventsBus is the KafkaEventsBus constructor.

func (*KafkaEventsBus) ProcessEvents

func (e *KafkaEventsBus) ProcessEvents(ctx context.Context, events []Event) error

ProcessEvents serializes the given es.Event values to JSON and publishes them to the Kafka topic.

type KafkaEventsBusConfig

type KafkaEventsBusConfig struct {
	Topic             string `mapstructure:"topic" validate:"required"`
	TopicPrefix       string `mapstructure:"topicPrefix" validate:"required"`
	Partitions        int    `mapstructure:"partitions" validate:"required,gte=0"`
	ReplicationFactor int    `mapstructure:"replicationFactor" validate:"required,gte=0"`
	Headers           []kafka.Header
}

KafkaEventsBusConfig is the Kafka events bus configuration.

type Load

type Load interface {
	Load(events []any) error
}

Load creates the Aggregate state from events.

type RaiseEvent

type RaiseEvent interface {
	RaiseEvent(event any) error
}

RaiseEvent processes an applied Aggregate Event from the event store.

type When

type When interface {
	When(event any) error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
JackTT - Gopher 🇻🇳