Documentation ¶
Overview ¶
Package session defines an abstraction of a session during a distributed RPC.
During a stream-based distributed RPC in minogrpc, the stream is kept alive for the whole protocol to act as a health check, so that resources can be cleaned up eventually or when something goes wrong. The session manages this state as well as the relays to the other participants to which the node must forward messages. A session has one or more relays open to its parent nodes and zero or more relays to other participants, depending on how the messages are routed.
The package implements a unicast relay and a stream relay. The stream relay is only used when the orchestrator of a protocol connects to the first participant. Unicast is used from then on so that the sender of a message can receive feedback on the status of the message.
Documentation Last Review: 07.10.2020
Index ¶
Constants ¶
const HandshakeKey = "handshake"
HandshakeKey is the key to the handshake store in the headers.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Address ¶
type Address struct {
// contains filtered or unexported fields
}
Address is a representation of the network address of a participant. The overlay implementation needs to distinguish between an orchestrator and its source address: the former initiates a protocol and the latter participates in it.
See session.wrapAddress for the abstraction provided to a caller external to the overlay module.
- implements mino.Address
func NewAddressFromURL ¶
NewAddressFromURL creates a new address given a URL.
func NewOrchestratorAddress ¶
NewOrchestratorAddress creates a new address which will be considered as the initiator of a protocol.
func (Address) ConnectionType ¶
func (a Address) ConnectionType() mino.AddressConnectionType
ConnectionType returns how to connect to the other host.
func (Address) Equal ¶
Equal implements mino.Address. It returns true only if both addresses are identical, which means that an orchestrator address won't match a follower address with the same host.
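The comparison rule can be illustrated with a minimal sketch. The address type and its host and orchestrator fields below are hypothetical stand-ins, since the real Address keeps its fields unexported; only the rule itself (same host and same role) comes from the description above.

```go
package main

import "fmt"

// address is a hypothetical stand-in for session.Address. The field names are
// assumptions made for this example only.
type address struct {
	host         string
	orchestrator bool
}

// equal reports whether both addresses share the same host and the same role.
func (a address) equal(other address) bool {
	return a.host == other.host && a.orchestrator == other.orchestrator
}

func main() {
	follower := address{host: "127.0.0.1:2000"}
	orchestrator := address{host: "127.0.0.1:2000", orchestrator: true}

	fmt.Println(follower.equal(follower))     // same host, same role
	fmt.Println(follower.equal(orchestrator)) // same host, different role
}
```

With this rule, the first comparison succeeds and the second one fails even though both addresses point to the same host.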
func (Address) GetDialAddress ¶
GetDialAddress returns a string formatted to be understood by grpc.Dial() functions.
func (Address) GetHostname ¶
GetHostname parses the address to extract the hostname.
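To show the difference between a dial address and a hostname, here is a rough sketch built on the standard net/url package. The helper hostAndName and the example URL are hypothetical and not part of the package; how Address actually parses its value is not documented here.

```go
package main

import (
	"fmt"
	"net/url"
)

// hostAndName is a hypothetical helper, not part of the session package. It
// returns the "host:port" string that a dialer would expect, and the bare
// hostname without the port.
func hostAndName(raw string) (dial string, hostname string, err error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", "", fmt.Errorf("couldn't parse url: %v", err)
	}

	return u.Host, u.Hostname(), nil
}

func main() {
	dial, name, err := hostAndName("https://example.com:2000")
	if err != nil {
		panic(err)
	}

	fmt.Println(dial) // example.com:2000
	fmt.Println(name) // example.com
}
```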
func (Address) MarshalText ¶
MarshalText implements mino.Address. It returns the text format of the address that can later be deserialized.
type ConnectionManager ¶
type ConnectionManager interface {
	Len() int
	Acquire(mino.Address) (grpc.ClientConnInterface, error)
	Release(mino.Address)
}
ConnectionManager is an interface required by the session to open and release connections to the relays.
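One plausible way to satisfy such an interface is reference counting, sketched below. This is an assumption about a possible implementation, not a description of the one used by minogrpc. To stay self-contained, the sketch uses string addresses and a hypothetical conn type instead of mino.Address and grpc.ClientConnInterface.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical placeholder for a gRPC client connection.
type conn struct {
	target string
}

// manager is a hypothetical connection manager that counts references.
type manager struct {
	sync.Mutex
	conns  map[string]*conn
	counts map[string]int
}

func newManager() *manager {
	return &manager{conns: map[string]*conn{}, counts: map[string]int{}}
}

// Len returns the number of connections currently open.
func (m *manager) Len() int {
	m.Lock()
	defer m.Unlock()

	return len(m.conns)
}

// Acquire returns the connection to addr and opens it on first use.
func (m *manager) Acquire(addr string) (*conn, error) {
	m.Lock()
	defer m.Unlock()

	c, found := m.conns[addr]
	if !found {
		c = &conn{target: addr} // a real manager would dial here
		m.conns[addr] = c
	}
	m.counts[addr]++

	return c, nil
}

// Release drops one reference and forgets the connection with the last one.
func (m *manager) Release(addr string) {
	m.Lock()
	defer m.Unlock()

	if m.counts[addr] == 0 {
		return
	}

	m.counts[addr]--
	if m.counts[addr] == 0 {
		delete(m.conns, addr) // a real manager would close the connection
		delete(m.counts, addr)
	}
}

func main() {
	mgr := newManager()
	mgr.Acquire("127.0.0.1:2000")
	mgr.Acquire("127.0.0.1:2000")
	fmt.Println(mgr.Len()) // 1: both calls share one connection

	mgr.Release("127.0.0.1:2000")
	fmt.Println(mgr.Len()) // 1: one reference is still held

	mgr.Release("127.0.0.1:2000")
	fmt.Println(mgr.Len()) // 0
}
```

Counting references lets several relays to the same peer share one connection, which is only closed once the last relay releases it.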
type NonBlockingQueue ¶
NonBlockingQueue is an implementation of a queue that makes sure pushing a message will never hang. The queue will fill a buffer if the channel is not drained and will drop messages when the limit is reached.
- implements session.Queue
func (*NonBlockingQueue) Channel ¶
func (q *NonBlockingQueue) Channel() <-chan router.Packet
Channel implements session.Queue. It returns a channel that will be populated with incoming messages. The queue uses a buffer when the channel is busy, therefore this channel should be listened to as much as possible to drain the messages. When the size of the buffer reaches a limit, messages will be dropped.
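The non-blocking behaviour can be sketched in a few lines. This is a simplified illustration, not the package's implementation: a buffered channel stands in for the separate buffer, string messages replace router.Packet, and the queue type, newQueue and Push names are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// queue is a hypothetical, simplified non-blocking queue. The capacity of the
// channel plays the role of the buffer limit.
type queue struct {
	ch chan string
}

func newQueue(limit int) *queue {
	return &queue{ch: make(chan string, limit)}
}

// Push never blocks: it either stores the message or reports that the message
// has been dropped because the limit is reached.
func (q *queue) Push(msg string) error {
	select {
	case q.ch <- msg:
		return nil
	default:
		return errors.New("queue is full, message dropped")
	}
}

// Channel returns the receiving end that a consumer should drain.
func (q *queue) Channel() <-chan string {
	return q.ch
}

func main() {
	q := newQueue(2)

	fmt.Println(q.Push("a")) // <nil>
	fmt.Println(q.Push("b")) // <nil>
	fmt.Println(q.Push("c")) // queue is full, message dropped

	fmt.Println(<-q.Channel()) // a

	fmt.Println(q.Push("d")) // <nil>: draining freed one slot
}
```

The select statement with a default case is what guarantees that Push returns immediately, at the cost of losing messages when the consumer is too slow.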
type PacketStream ¶
type PacketStream interface {
	Context() context.Context
	Send(*ptypes.Packet) error
	Recv() (*ptypes.Packet, error)
}
PacketStream is a gRPC stream to send and receive protobuf packets.
type Relay ¶
type Relay interface {
	// GetDistantAddress returns the address of the peer at the other end of
	// the relay.
	GetDistantAddress() mino.Address

	// Stream returns the stream that is holding the relay.
	Stream() PacketStream

	// Send sends a packet through the relay.
	Send(ctx context.Context, p router.Packet) (*ptypes.Ack, error)

	// Close closes the relay and cleans up the resources.
	Close() error
}
Relay is the interface of the relays spawned by the session when trying to contact a child node.
func NewRelay ¶
func NewRelay(
	stream PacketStream,
	gw mino.Address,
	ctx serde.Context,
	conn grpc.ClientConnInterface,
	md metadata.MD,
) Relay
NewRelay returns a new relay that will send messages to the gateway through unicast requests.
func NewStreamRelay ¶
NewStreamRelay creates a new relay that will send the packets through the stream.
type Session ¶
type Session interface {
	mino.Sender
	mino.Receiver

	// GetNumParents returns the number of active parents for the session.
	GetNumParents() int

	// Listen takes a stream that will determine when to close the session.
	Listen(parent Relay, table router.RoutingTable, ready chan struct{})

	// SetPassive sets a new passive parent. A passive parent is part of the
	// parent relays, but its stream is not listened to, and thus it is not
	// removed from the map if it is closed.
	SetPassive(parent Relay, table router.RoutingTable)

	// Close shuts down the session so that future calls to receive will
	// return an error.
	Close()

	// RecvPacket takes a packet and the address of the distant peer that has
	// sent it, then passes it to the correct relay according to the routing
	// table.
	RecvPacket(from mino.Address, p *ptypes.Packet) (*ptypes.Ack, error)
}
Session is an interface for a stream session that allows sending messages to the parent and the relays, while receiving the ones addressed to the local address.
func NewSession ¶
func NewSession(
	md metadata.MD,
	me mino.Address,
	msgFac serde.Factory,
	pktFac router.PacketFactory,
	ctx serde.Context,
	connMgr ConnectionManager,
) Session
NewSession creates a new session for the provided parent relay.