base

package
v0.1.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 17, 2022 License: MIT Imports: 7 Imported by: 5

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrSequenceNotFound = errors.New("sequence not found")
)

Functions

func EventStreamSampleGroup

func EventStreamSampleGroup(factory func() EventStream)

Types

type Bracket

type Bracket struct {
	NextSequence int64
	LastSequence int64
}

func All

func All() Bracket

func From

func From(next int64) Bracket

func Range

func Range(next, last int64) Bracket

func (*Bracket) Sanitize added in v0.1.2

func (s *Bracket) Sanitize(lastSequence int64)

type Event

type Event struct {
	Sequence   int64                  `json:"sequence,omitempty" yaml:"sequence,omitempty"`
	Aggregate  []string               `json:"aggregate,omitempty" yaml:"aggregate,omitempty"`
	Type       string                 `json:"type,omitempty" yaml:"type,omitempty"`
	OccurredAt time.Time              `json:"occurred_at,omitempty" yaml:"occurred_at,omitempty"`
	Payload    map[string]interface{} `json:"payload,omitempty" yaml:"payload,omitempty"`
}

type EventBuilder

type EventBuilder = func(e *Event)

type EventHandler

type EventHandler func(e *Event) error

type EventStore added in v0.1.3

type EventStore interface {
	Store(event *Event) (int64, error)
	LastKnownSequence() int64
	Read(sequence int64) (*Event, error)
	ReadAll(ctx context.Context, sel Selector, bracket Bracket, handler EventHandler) error
}

type EventStream

type EventStream interface {
	Emit(event *Event) (int64, error)
	LastSequence() int64
	Get(sequence int64) (*Event, error)
	Stream(ctx context.Context, sel Selector, bracket Bracket, handler EventHandler) error
	Listen(ctx context.Context, sel Selector, handler EventHandler) error
	Subscribe(ctx context.Context, persistentClientID string, sel Selector, handler EventHandler) (Subscription, error)
	Acknowledge(persistentClientID string, sequence int64) error
	// Subscriptions returns all currently known Subscriptions.
	Subscriptions() []Subscription
}

type EventStreamWrapper

type EventStreamWrapper struct {
	// contains filtered or unexported fields
}

func NewWrapper

func NewWrapper(stream EventStream) *EventStreamWrapper

func NewWrapperWithStartTime

func NewWrapperWithStartTime(stream EventStream, startTime time.Time) *EventStreamWrapper

func (*EventStreamWrapper) After

func (s *EventStreamWrapper) After(duration time.Duration) EventBuilder

func (*EventStreamWrapper) Agg

func (s *EventStreamWrapper) Agg(a ...string) EventBuilder

func (*EventStreamWrapper) DefAgg

func (s *EventStreamWrapper) DefAgg() EventBuilder

func (*EventStreamWrapper) Emit

func (s *EventStreamWrapper) Emit(builders ...EventBuilder) (*Event, error)

func (*EventStreamWrapper) IncrBy

func (s *EventStreamWrapper) IncrBy(duration time.Duration) EventBuilder

func (*EventStreamWrapper) Stream

func (s *EventStreamWrapper) Stream() EventStream

func (*EventStreamWrapper) Type

type SelectOption

type SelectOption func(s *Selector)

func SelectAggregate

func SelectAggregate(agg ...string) SelectOption

func SelectType

func SelectType(t string) SelectOption

type Selector

type Selector struct {
	Aggregate []string
	Type      string
}

func ParseSelector

func ParseSelector(s string) (*Selector, error)

func Select

func Select(options ...SelectOption) Selector

func (*Selector) IsComplete

func (s *Selector) IsComplete() bool

func (*Selector) Matches

func (s *Selector) Matches(event *Event) bool

type SequenceStore

type SequenceStore interface {
	Get(persistentClientID string) (int64, error)
	Store(persistentClientID string, sequence int64) error
}

type Subscription

type Subscription interface {
	PersistentID() string
	// ActiveSelector returns the currently active Selector.
	ActiveSelector() Selector
	LastAcknowledgedSequence() (int64, error)
	Acknowledge(sequence int64) error
	// Active returns whether this Subscription is currently active.
	Active() bool
	// InactiveSince returns the time this Subscription last became inactive.
	InactiveSince() time.Time
	// Wait for the Subscription to become inactive (disconnected)
	Wait() error
	// DropOuts returns how often this Subscription has dropped out of the live stream.
	DropOuts() int
	// Shutdown closes this Subscription and removes all associated state. The Subscription can not be resumed after this call.
	Shutdown()
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
JackTT - Gopher 🇻🇳