Documentation
¶
Index ¶
- Variables
- func GetTopicName(networkID uint32, chainID string, eventType EventType) string
- func NewMessage(id string, chainID string, body []byte, timestamp int64, nanosecond int64) services.Consumable
- func NewMessageWithKafka(id string, chainID string, body []byte, timestamp int64, nanosecond int64, ...) services.Consumable
- func NewProducerCChain() utils.ListenCloserFactory
- type BufferContainer
- type EventType
- type Message
- type Processor
- type ProcessorDB
- type ProcessorFactory
- type ProcessorFactoryChainDB
- type ProcessorFactoryInstDB
- type ProcessorManager
- type Producer
- type ProducerCChain
- type TracerParam
- type WorkPacket
- type WorkPacketCChain
Constants ¶
This section is empty.
Variables ¶
var ( // ErrUnknownProcessorType is returned when encountering a client type with no // known implementation ErrUnknownProcessorType = errors.New("unknown processor type") // ErrNoMessage is no message ErrNoMessage = errors.New("no message") )
Functions ¶
func NewMessage ¶
func NewMessageWithKafka ¶
func NewMessageWithKafka(id string, chainID string, body []byte, timestamp int64, nanosecond int64, kafkaMessage *kafkaMessage.Message, ) services.Consumable
func NewProducerCChain ¶
func NewProducerCChain() utils.ListenCloserFactory
Types ¶
type BufferContainer ¶
type BufferContainer struct {
// contains filtered or unexported fields
}
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
Message is a message on the event stream
func (*Message) KafkaMessage ¶
func (m *Message) KafkaMessage() *kafkaMessage.Message
func (*Message) Nanosecond ¶
type Processor ¶
type Processor interface { ProcessNextMessage() error Close() error Failure() Success() ID() string }
Processor handles writing and reading to/from the event stream
type ProcessorDB ¶
type ProcessorFactory ¶
type ProcessorFactory func(*services.Control, cfg.Config, string, string, int, int) (Processor, error)
ProcessorFactory takes in configuration and returns a stream Processor
type ProcessorFactoryChainDB ¶
type ProcessorFactoryChainDB func(*services.Control, cfg.Config, string, string) (ProcessorDB, error)
func NewConsumerDBFactory ¶
func NewConsumerDBFactory(factory serviceConsumerFactory, eventType EventType) ProcessorFactoryChainDB
NewConsumerFactory returns a processorFactory for the given service consumer
type ProcessorFactoryInstDB ¶
func NewConsumerCChainDB ¶
func NewConsumerCChainDB() ProcessorFactoryInstDB
type ProcessorManager ¶
type ProcessorManager struct {
// contains filtered or unexported fields
}
ProcessorManager supervises the Processor lifecycle; it will use the given configuration and ProcessorFactory to keep a Processor active
func NewProcessorManager ¶
func NewProcessorManager(sc *services.Control, conf cfg.Config, factory ProcessorFactory, idx int, maxIdx int) *ProcessorManager
NewProcessorManager creates a new *ProcessorManager ready for listening
func (*ProcessorManager) Close ¶
func (c *ProcessorManager) Close() error
Close tells the workers to shutdown and waits for them to all stop
func (*ProcessorManager) Listen ¶
func (c *ProcessorManager) Listen() error
Listen sets a client to listen for and handle incoming messages
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
producer reads from the socket and writes to the event stream
func NewProducer ¶
func NewProducer(sc *services.Control, conf cfg.Config, _ string, chainID string, eventType EventType) (*Producer, error)
NewProducer creates a producer using the given config
func (*Producer) ProcessNextMessage ¶
ProcessNextMessage takes in a Message from the IPC socket and writes it to the db
type ProducerCChain ¶
type ProducerCChain struct {
// contains filtered or unexported fields
}
func (*ProducerCChain) Failure ¶
func (p *ProducerCChain) Failure()
func (*ProducerCChain) ID ¶
func (p *ProducerCChain) ID() string
func (*ProducerCChain) Listen ¶
func (p *ProducerCChain) Listen() error
func (*ProducerCChain) ProcessNextMessage ¶
func (p *ProducerCChain) ProcessNextMessage() error
func (*ProducerCChain) Success ¶
func (p *ProducerCChain) Success()
type TracerParam ¶
type TracerParam struct {
Tracer string `json:"tracer"`
}
type WorkPacket ¶
type WorkPacket struct {
// contains filtered or unexported fields
}
type WorkPacketCChain ¶
type WorkPacketCChain struct {
// contains filtered or unexported fields
}