sync

package
v1.5.0
Published: Feb 15, 2025 License: MIT Imports: 18 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ErrSnapshot = errors.New("snapshot error")

Functions

This section is empty.

Types

type ErrSync added in v1.5.0

type ErrSync struct {
	Publisher enc.Name
	BootTime  uint64
	Err       error
}

func (*ErrSync) Error added in v1.5.0

func (e *ErrSync) Error() string

func (*ErrSync) Unwrap added in v1.5.0

func (e *ErrSync) Unwrap() error

type SimplePs added in v1.5.0

type SimplePs[V any] struct {
	// contains filtered or unexported fields
}

SimplePs is a simple Pub/Sub system.

func NewSimplePs added in v1.5.0

func NewSimplePs[V any]() SimplePs[V]

func (*SimplePs[V]) HasSub added in v1.5.0

func (ps *SimplePs[V]) HasSub(prefix enc.Name) bool

func (*SimplePs[V]) Publish added in v1.5.0

func (ps *SimplePs[V]) Publish(name enc.Name, data V)

func (*SimplePs[V]) Subs added in v1.5.0

func (ps *SimplePs[V]) Subs(prefix enc.Name) iter.Seq[func(V)]

func (*SimplePs[V]) Subscribe added in v1.5.0

func (ps *SimplePs[V]) Subscribe(prefix enc.Name, callback func(V)) error

func (*SimplePs[V]) Unsubscribe added in v1.5.0

func (ps *SimplePs[V]) Unsubscribe(prefix enc.Name)

type SimplePsSub added in v1.5.0

type SimplePsSub[V any] struct {
	// Prefix is the name prefix to subscribe.
	Prefix enc.Name
	// Callback is the callback function.
	Callback func(V)
}

type Snapshot added in v1.5.0

type Snapshot interface {
	// Snapshot returns the Snapshot trait.
	Snapshot() Snapshot
	// contains filtered or unexported methods
}

type SnapshotNodeHistory added in v1.5.0

type SnapshotNodeHistory struct {
	// Client is the object client.
	Client ndn.Client
	// Threshold is the number of updates before a snapshot is taken.
	Threshold uint64
	// contains filtered or unexported fields
}

SnapshotNodeHistory is a snapshot strategy that assumes it is not possible to take a snapshot of the application state. Instead, it creates a snapshot of the entire publication history.

This strategy should be used with highly persistent storage, as it will store all publications since the node bootstrapped, and fetch publications from a node's previous instances (bootstraps). To ensure that publications from previous instances are available, the application must use NDN Repo.

func (*SnapshotNodeHistory) Snapshot added in v1.5.0

func (s *SnapshotNodeHistory) Snapshot() Snapshot

func (*SnapshotNodeHistory) String added in v1.5.0

func (s *SnapshotNodeHistory) String() string

type SnapshotNodeLatest added in v1.5.0

type SnapshotNodeLatest struct {
	// Client is the object client.
	Client ndn.Client

	// SnapMe is the callback to get a snapshot of the application state.
	//
	// The state should encode the entire state of the node, and should replace
	// any previous publications completely. If this snapshot is delivered to a
	// node, previous publications will be ignored by the receiving node.
	//
	// The callback is passed the name of the snapshot that will be created.
	SnapMe func(enc.Name) (enc.Wire, error)
	// Threshold is the number of updates before a snapshot is taken.
	Threshold uint64
	// contains filtered or unexported fields
}

SnapshotNodeLatest is a snapshot strategy that takes a snapshot of the application state whenever a certain number of updates have been made.

Each snapshot is treated as self-contained and replaces any previous publications completely. Only the latest (hence the name) snapshot is fetched by other nodes, and previous publications are ignored.

When a node bootstraps again, this strategy assumes that the previous state is now invalid and fetches the latest snapshot.
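The threshold behaviour described above can be sketched as follows. This is only an illustration under stated assumptions: thresholdSnapshotter, onPublish and the "/alice/snap/<seq>" name are hypothetical stand-ins, names are plain strings rather than enc.Name, and the state is a []byte rather than enc.Wire. The real strategy's internal trigger logic is unexported and may differ.

```go
package main

import "fmt"

// thresholdSnapshotter is a hypothetical stand-in, not the library type.
// It mirrors only the two documented fields, Threshold and SnapMe.
type thresholdSnapshotter struct {
	// Threshold is the number of updates before a snapshot is taken.
	Threshold uint64
	// SnapMe returns the full application state for the given snapshot name.
	SnapMe func(name string) ([]byte, error)
	// lastSnap is the sequence number covered by the previous snapshot.
	lastSnap uint64
}

// onPublish is called after each publication with the new sequence number.
// It returns the snapshot content if one was taken, or nil otherwise.
func (s *thresholdSnapshotter) onPublish(seq uint64) ([]byte, error) {
	if s.Threshold == 0 || seq <= s.lastSnap || seq-s.lastSnap < s.Threshold {
		return nil, nil
	}
	// The snapshot name format is an assumption made for this sketch.
	name := fmt.Sprintf("/alice/snap/%d", seq)
	data, err := s.SnapMe(name)
	if err != nil {
		return nil, err
	}
	s.lastSnap = seq
	return data, nil
}

func main() {
	state := []string{}
	s := &thresholdSnapshotter{
		Threshold: 3,
		SnapMe: func(name string) ([]byte, error) {
			// Encode the entire state so it can replace earlier publications.
			return []byte(fmt.Sprintf("%s=%v", name, state)), nil
		},
	}
	for seq := uint64(1); seq <= 7; seq++ {
		state = append(state, fmt.Sprintf("msg%d", seq))
		snap, err := s.onPublish(seq)
		if err != nil {
			fmt.Println("snapshot failed:", err)
			continue
		}
		if snap != nil {
			fmt.Printf("seq %d: snapshot of %d bytes\n", seq, len(snap))
		}
	}
}
```

Because each snapshot carries the whole state, a receiver in this model only ever needs the most recent one.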

func (*SnapshotNodeLatest) Snapshot added in v1.5.0

func (s *SnapshotNodeLatest) Snapshot() Snapshot

func (*SnapshotNodeLatest) String added in v1.5.0

func (s *SnapshotNodeLatest) String() string

type SnapshotNull added in v1.5.0

type SnapshotNull struct {
}

SnapshotNull is a non-snapshot strategy.

func (*SnapshotNull) Snapshot added in v1.5.0

func (s *SnapshotNull) Snapshot() Snapshot

type SvMap added in v1.4.3

type SvMap[V any] map[string][]SvMapVal[V]

Map representation of the state vector.

func NewSvMap added in v1.4.3

func NewSvMap[V any](size int) SvMap[V]

Create a new state vector map.

func (SvMap[V]) Encode added in v1.4.3

func (m SvMap[V]) Encode(seq func(V) uint64) *spec_svs.StateVector

Encode the state vector map to a StateVector. seq is the function that returns the sequence number for a value.

func (SvMap[V]) Get added in v1.4.3

func (m SvMap[V]) Get(hash string, boot uint64) (value V)

Get seq entry for a bootstrap time.

func (SvMap[V]) IsNewerThan added in v1.4.3

func (m SvMap[V]) IsNewerThan(other SvMap[V], cmp func(V, V) bool) bool

Check if a SvMap is newer than another. cmp(a, b) is the function to compare values (a > b).
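A rough sketch of how such a map and comparison might behave is given below. The types svMap and svVal and their methods are local stand-ins written for illustration. The rule used here (a map is newer if any of its entries exceeds the matching entry in the other map, or has no matching entry at all) is an assumption, not the documented behaviour of the package.

```go
package main

import (
	"fmt"
	"sort"
)

// svVal and svMap are local stand-ins that mirror the documented shapes of
// SvMapVal and SvMap; they are not the library types.
type svVal[V any] struct {
	Boot  uint64
	Value V
}

type svMap[V any] map[string][]svVal[V]

// set stores value for (hash, boot), keeping entries ordered by boot time.
func (m svMap[V]) set(hash string, boot uint64, value V) {
	entries := m[hash]
	i := sort.Search(len(entries), func(i int) bool { return entries[i].Boot >= boot })
	if i < len(entries) && entries[i].Boot == boot {
		entries[i].Value = value
		return
	}
	entries = append(entries, svVal[V]{})
	copy(entries[i+1:], entries[i:])
	entries[i] = svVal[V]{Boot: boot, Value: value}
	m[hash] = entries
}

// get returns the value for (hash, boot), or the zero value if absent.
func (m svMap[V]) get(hash string, boot uint64) (value V) {
	for _, e := range m[hash] {
		if e.Boot == boot {
			return e.Value
		}
	}
	return value
}

// isNewerThan reports whether m holds any entry that other lacks, or any
// entry for which cmp(mine, theirs) is true. This rule is an assumption.
func (m svMap[V]) isNewerThan(other svMap[V], cmp func(V, V) bool) bool {
	for hash, entries := range m {
		for _, e := range entries {
			found := false
			for _, o := range other[hash] {
				if o.Boot == e.Boot {
					found = true
					if cmp(e.Value, o.Value) {
						return true
					}
				}
			}
			if !found {
				return true
			}
		}
	}
	return false
}

func main() {
	gt := func(a, b uint64) bool { return a > b }

	local := svMap[uint64]{}
	local.set("/alice", 100, 5)

	remote := svMap[uint64]{}
	remote.set("/alice", 100, 3)

	fmt.Println(local.isNewerThan(remote, gt))
	fmt.Println(remote.isNewerThan(local, gt))
}
```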

func (SvMap[V]) Iter added in v1.5.0

func (m SvMap[V]) Iter() iter.Seq2[enc.Name, []SvMapVal[V]]

func (SvMap[V]) Set added in v1.4.3

func (m SvMap[V]) Set(hash string, boot uint64, value V)

type SvMapVal added in v1.5.0

type SvMapVal[V any] struct {
	Boot  uint64
	Value V
}

One entry in the state vector map.

func (*SvMapVal[V]) Cmp added in v1.5.0

func (*SvMapVal[V]) Cmp(a, b SvMapVal[V]) int

type SvSync

type SvSync struct {
	// contains filtered or unexported fields
}

func NewSvSync

func NewSvSync(opts SvSyncOpts) *SvSync

NewSvSync creates a new SV Sync instance.

func (*SvSync) GetBootTime added in v1.4.3

func (s *SvSync) GetBootTime() uint64

func (*SvSync) GetSeqNo

func (s *SvSync) GetSeqNo(name enc.Name) uint64

GetSeqNo returns the sequence number for a name.

func (*SvSync) IncrSeqNo

func (s *SvSync) IncrSeqNo(name enc.Name) uint64

IncrSeqNo increments the sequence number for a name. The instance must only increment sequence numbers for names it owns.

func (*SvSync) SetSeqNo

func (s *SvSync) SetSeqNo(name enc.Name, seqNo uint64) error

SetSeqNo sets the sequence number for a name. The instance must only set sequence numbers for names it owns. The sequence number must be greater than the previous value.

func (*SvSync) Start

func (s *SvSync) Start() (err error)

Start the SV Sync instance.

func (*SvSync) Stop

func (s *SvSync) Stop() error

Stop the SV Sync instance.

func (*SvSync) String added in v1.4.3

func (s *SvSync) String() string

Instance log identifier

type SvSyncOpts added in v1.4.3

type SvSyncOpts struct {
	Client      ndn.Client
	GroupPrefix enc.Name
	OnUpdate    func(SvSyncUpdate)

	InitialState      *spec_svs.StateVector
	BootTime          uint64
	PeriodicTimeout   time.Duration
	SuppressionPeriod time.Duration
}

type SvSyncUpdate

type SvSyncUpdate struct {
	Name enc.Name
	Boot uint64
	High uint64
	Low  uint64
}

type SvsALO added in v1.5.0

type SvsALO struct {
	// contains filtered or unexported fields
}

SvsALO is a Sync Transport with At Least Once delivery semantics.

func NewSvsALO added in v1.5.0

func NewSvsALO(opts SvsAloOpts) *SvsALO

NewSvsALO creates a new SvsALO instance.

func (*SvsALO) BootTime added in v1.5.0

func (s *SvsALO) BootTime() uint64

BootTime returns the boot time of the instance.

func (*SvsALO) DataPrefix added in v1.5.0

func (s *SvsALO) DataPrefix() enc.Name

DataPrefix is the data route prefix for this instance.

func (*SvsALO) Publish added in v1.5.0

func (s *SvsALO) Publish(content enc.Wire) (enc.Name, *spec_svs.InstanceState, error)

Publish sends a message to the group.

func (*SvsALO) SetOnError added in v1.5.0

func (s *SvsALO) SetOnError(callback func(error))

SetOnError sets the error callback. The received error can likely be matched as *ErrSync (for example with errors.As).

ErrSync includes the name of the affected publisher, its boot time, and the underlying error. Applications can use this callback to selectively unsubscribe from publishers that are not responding.
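Since the ErrSync type listed earlier implements both Error and Unwrap, the standard errors helpers should be usable inside such a callback. The sketch below uses a local stand-in, errSync, whose Publisher is a plain string (the real field is an enc.Name). The message format and the publisherOf helper are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// errSync is a local stand-in for the package's ErrSync type.
type errSync struct {
	Publisher string
	BootTime  uint64
	Err       error
}

// Error formats the error. The real message format is not documented, so
// this one is an assumption.
func (e *errSync) Error() string {
	return fmt.Sprintf("sync error [%s][%d]: %v", e.Publisher, e.BootTime, e.Err)
}

// Unwrap exposes the underlying cause to errors.Is and errors.As.
func (e *errSync) Unwrap() error { return e.Err }

// errSnapshot mirrors the package-level ErrSnapshot sentinel.
var errSnapshot = errors.New("snapshot error")

// publisherOf extracts the affected publisher from an error chain. It
// reports false if the chain contains no *errSync.
func publisherOf(err error) (string, bool) {
	var se *errSync
	if errors.As(err, &se) {
		return se.Publisher, true
	}
	return "", false
}

func main() {
	// An error as it might arrive in an error callback, possibly wrapped.
	err := fmt.Errorf("fetch failed: %w", &errSync{
		Publisher: "/alice",
		BootTime:  1700000000,
		Err:       errSnapshot,
	})

	if pub, ok := publisherOf(err); ok {
		fmt.Println("affected publisher:", pub)
	}
	fmt.Println("caused by snapshot error:", errors.Is(err, errSnapshot))
}
```

An application could use the extracted publisher name to decide which subscription to drop.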

func (*SvsALO) SetOnPublisher added in v1.5.0

func (s *SvsALO) SetOnPublisher(callback func(enc.Name))

SetOnPublisher sets the publisher callback.

This will be called when an update from a new publisher is received. This includes both updates for publishers that are already subscribed and other non-subscribed publishers. Applications can use this callback to test the liveness of publishers and selectively subscribe to them.

func (*SvsALO) Start added in v1.5.0

func (s *SvsALO) Start() error

Start starts the SvsALO instance.

func (*SvsALO) Stop added in v1.5.0

func (s *SvsALO) Stop() error

Stop stops the SvsALO instance.

func (*SvsALO) String added in v1.5.0

func (s *SvsALO) String() string

String is the log identifier.

func (*SvsALO) SubscribePublisher added in v1.5.0

func (s *SvsALO) SubscribePublisher(prefix enc.Name, callback func(SvsPub)) error

SubscribePublisher subscribes to all publishers matching a name prefix. Only one subscriber per prefix is allowed. If the prefix is already subscribed, the callback is replaced.
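The "one subscriber per prefix, later callback replaces the earlier one" rule can be modelled with a map keyed by prefix. Everything in this sketch is a hypothetical stand-in: pubsub, isPrefix and deliver are invented names, and names are slash-separated strings rather than enc.Name.

```go
package main

import (
	"fmt"
	"strings"
)

// pubsub is a hypothetical stand-in that keeps one callback per prefix.
type pubsub struct {
	subs map[string]func(msg string)
}

func newPubsub() *pubsub {
	return &pubsub{subs: make(map[string]func(string))}
}

// subscribe registers cb for prefix, replacing any existing callback.
func (p *pubsub) subscribe(prefix string, cb func(string)) {
	p.subs[prefix] = cb
}

// unsubscribe removes the callback registered for prefix, if any.
func (p *pubsub) unsubscribe(prefix string) {
	delete(p.subs, prefix)
}

// isPrefix reports whether prefix matches name on component boundaries,
// so "/chat" matches "/chat/alice" but not "/chatter/bob".
func isPrefix(prefix, name string) bool {
	if prefix == "/" {
		return true
	}
	return name == prefix || strings.HasPrefix(name, prefix+"/")
}

// deliver invokes every callback whose prefix matches the publisher name
// and returns how many callbacks were invoked.
func (p *pubsub) deliver(publisher, msg string) int {
	n := 0
	for prefix, cb := range p.subs {
		if isPrefix(prefix, publisher) {
			cb(msg)
			n++
		}
	}
	return n
}

func main() {
	p := newPubsub()
	p.subscribe("/chat", func(m string) { fmt.Println("old handler:", m) })
	p.subscribe("/chat", func(m string) { fmt.Println("new handler:", m) })

	p.deliver("/chat/alice", "hello") // only the replacement callback runs
	p.deliver("/chatter/bob", "ignored")
}
```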

func (*SvsALO) SyncPrefix added in v1.5.0

func (s *SvsALO) SyncPrefix() enc.Name

SyncPrefix is the sync route prefix for this instance.

func (*SvsALO) UnsubscribePublisher added in v1.5.0

func (s *SvsALO) UnsubscribePublisher(prefix enc.Name)

UnsubscribePublisher removes callbacks added with SubscribePublisher. The callback may still receive messages for some time after this call. The application must handle these messages correctly.

type SvsAloOpts added in v1.5.0

type SvsAloOpts struct {
	// Name is the name of this instance producer.
	Name enc.Name
	// Svs is the options for the underlying SVS instance.
	Svs SvSyncOpts
	// Snapshot is the snapshot strategy.
	Snapshot Snapshot
	// InitialState is the initial state of the instance.
	InitialState *spec_svs.InstanceState

	// MaxPipelineSize is the number of objects to fetch
	// concurrently for a single publisher (default 10)
	MaxPipelineSize uint64
}

type SvsPub added in v1.5.0

type SvsPub struct {
	// Publisher that produced the data.
	Publisher enc.Name
	// DataName is the name of the data.
	DataName enc.Name
	// Content of the data publication.
	Content enc.Wire
	// Boot time of the publisher.
	BootTime uint64
	// Sequence number of the publisher.
	SeqNum uint64
	// IsSnapshot is true if this is a snapshot.
	IsSnapshot bool
	// State is the state after this publication is applied.
	State *spec_svs.InstanceState
	// contains filtered or unexported fields
}

SvsPub is the generic received data publication from SVS.

func (*SvsPub) Bytes added in v1.5.0

func (p *SvsPub) Bytes() []byte

Bytes gets the bytes of the data publication.

This will allocate a new byte slice and copy the content. Using Content directly is more efficient whenever possible.
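The cost difference can be seen in a small model. The sketch assumes the content is a sequence of byte segments (the local type wire stands in for enc.Wire). The join and writeTo helpers are hypothetical and only illustrate copying versus consuming the segments in place.

```go
package main

import (
	"fmt"
	"io"
	"os"
)

// wire is a local stand-in for a segmented buffer: a list of byte slices
// that together make up one content blob.
type wire [][]byte

// join allocates one new slice and copies every segment into it, which is
// the kind of work a Bytes-style accessor has to do.
func (w wire) join() []byte {
	n := 0
	for _, seg := range w {
		n += len(seg)
	}
	out := make([]byte, 0, n)
	for _, seg := range w {
		out = append(out, seg...)
	}
	return out
}

// writeTo streams each segment to dst without building a joined copy.
func (w wire) writeTo(dst io.Writer) (int, error) {
	total := 0
	for _, seg := range w {
		n, err := dst.Write(seg)
		total += n
		if err != nil {
			return total, err
		}
	}
	return total, nil
}

func main() {
	content := wire{[]byte("hello, "), []byte("world\n")}

	// Copying path: convenient when a single contiguous slice is required.
	flat := content.join()
	fmt.Printf("joined %d bytes\n", len(flat))

	// Segment path: avoids the extra allocation and copy.
	if _, err := content.writeTo(os.Stdout); err != nil {
		fmt.Println("write failed:", err)
	}
}
```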
