Documentation
¶
Index ¶
- Constants
- type ChannelDeMultiplexer
- type Comm
- type CommConfig
- type ConnConfig
- type ReceivedMessageImpl
- func (rmi *ReceivedMessageImpl) Ack(err error)
- func (rmi *ReceivedMessageImpl) GetConnectionInfo() *protoext.ConnectionInfo
- func (rmi *ReceivedMessageImpl) GetEnvelope() *pbgossip.Envelope
- func (rmi *ReceivedMessageImpl) GetSignedGossipMessage() *protoext.SignedGossipMessage
- func (rmi *ReceivedMessageImpl) Respond(msg *pbgossip.GossipMessage)
- type SendResult
- type SendResults
Constants ¶
View Source
const ( DefaultDialTimeout = 3 * time.Second DefaultConnTimeout = 2 * time.Second DefaultRecvBuffSize = 20 DefaultSendBuffSize = 20 )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ChannelDeMultiplexer ¶
type ChannelDeMultiplexer struct {
// contains filtered or unexported fields
}
ChannelDeMultiplexer 是一个通道多路复用器,可以注册通道(AddChannel),同时可以根据注册时定义的谓词(common.MessageAcceptor)将消息发送到对应通道(DeMultiplex)。
func NewChannelDeMultiplexer ¶
func NewChannelDeMultiplexer() *ChannelDeMultiplexer
func (*ChannelDeMultiplexer) AddChannel ¶
func (cdm *ChannelDeMultiplexer) AddChannel(pred common.MessageAcceptor) <-chan interface{}
AddChannel 方法用于向多路复用器注册一个通道。如果多路复用器已经停止(isStopped() 返回 true),则返回一个被关闭的通道,以防止外部接收者一直等待。否则,创建一个带有缓冲区的通道(bidirectionalCh),将其注册到 channels 切片中,并返回该通道。
func (*ChannelDeMultiplexer) DeMultiplex ¶
func (cdm *ChannelDeMultiplexer) DeMultiplex(msg interface{})
DeMultiplex 方法用于接收一个消息,并根据注册的谓词将消息发送到相应的通道。如果多路复用器已经停止(isStopped() 返回 true),则直接返回。否则,遍历 channels 切片,对每个通道,如果谓词判断该消息应该发送到该通道,则将消息发送到通道的 ch 通道中。
func (*ChannelDeMultiplexer) Stop ¶
func (cdm *ChannelDeMultiplexer) Stop()
Stop 方法用于停止多路复用器的操作。首先,检查 stopCh 通道是否已经关闭,如果已经关闭,则直接返回。否则,关闭 stopCh 通道,等待所有的 DeMultiplex 调用完成,然后锁定互斥锁,关闭所有已注册通道的 ch 通道,并清空 channels 切片。
type Comm ¶
type Comm interface { GetPKIid() common.PKIid Send(msg *protoext.SignedGossipMessage, peers ...*discovery.NetworkMember) SendWithAck(msg *protoext.SignedGossipMessage, timeout time.Duration, minAck int, peers ...*discovery.NetworkMember) SendResults // Probe 探测远程节点,如果有响应则返回 nil,如果没有响应则返回错误信息。 Probe(peer *discovery.NetworkMember) error Handshake(peer *discovery.NetworkMember) (api.PeerIdentity, error) Accept(common.MessageAcceptor) <-chan protoext.ReceivedMessage PresumedDead() <-chan common.PKIid IdentitySwitch() <-chan common.PKIid CloseConn(peer *discovery.NetworkMember) SetLogger(logger *hlogging.HyperchainLogger) Stop() }
func NewCommInstance ¶
func NewCommInstance(s *grpc.Server, certs *common.TLSCertificates, idStore identity.Mapper, peerIdentity api.PeerIdentity, secureDialOpts api.PeerSecureDialOpts, sa api.SecurityAdvisor, commMetrics *metrics.CommMetrics, config CommConfig, dialOpts ...grpc.DialOption) (Comm, error)
type CommConfig ¶
type ConnConfig ¶
type ReceivedMessageImpl ¶
type ReceivedMessageImpl struct {
// contains filtered or unexported fields
}
func (*ReceivedMessageImpl) Ack ¶
func (rmi *ReceivedMessageImpl) Ack(err error)
func (*ReceivedMessageImpl) GetConnectionInfo ¶
func (rmi *ReceivedMessageImpl) GetConnectionInfo() *protoext.ConnectionInfo
func (*ReceivedMessageImpl) GetEnvelope ¶
func (rmi *ReceivedMessageImpl) GetEnvelope() *pbgossip.Envelope
func (*ReceivedMessageImpl) GetSignedGossipMessage ¶
func (rmi *ReceivedMessageImpl) GetSignedGossipMessage() *protoext.SignedGossipMessage
func (*ReceivedMessageImpl) Respond ¶
func (rmi *ReceivedMessageImpl) Respond(msg *pbgossip.GossipMessage)
type SendResult ¶
type SendResult struct { discovery.NetworkMember // contains filtered or unexported fields }
type SendResults ¶
type SendResults []SendResult
func (SendResults) AckCount ¶
func (srs SendResults) AckCount() int
func (SendResults) NackCount ¶
func (srs SendResults) NackCount() int
func (SendResults) String ¶
func (srs SendResults) String() string
Click to show internal directories.
Click to hide internal directories.