Documentation
¶
Index ¶
- Constants
- Variables
- func Consume(ctx context.Context, topic string, since time.Time, ...)
- func InitializeKafkaBroker(ctx context.Context) error
- func InitializeStubBroker(bufferSize int) error
- func InitializeTopicRequests(ctx context.Context)
- func Send(ctx context.Context, messages ...*GenericMessage) error
- type AvailabilityStatusMessage
- type Broker
- type GenericHeader
- type GenericMessage
- type NativeMessage
- type NotificationContext
- type NotificationError
- type NotificationEvent
- type NotificationMessage
- type SourceResult
- type StatusType
Constants ¶
View Source
const ( NotificationSuccessEventType = "launch-success" NotificationFailureEventType = "launch-failed" )
Variables ¶
View Source
var ( ErrDifferentTopic = errors.New("messages in batch have different topics") ErrUnknownSASLMechanism = errors.New("unknown SASL mechanism") )
View Source
var ( AvailabilityStatusRequestTopic string SourcesStatusTopic string NotificationTopic string )
topics after clowder mapping
Functions ¶
func InitializeKafkaBroker ¶
func InitializeStubBroker ¶
func InitializeTopicRequests ¶
InitializeTopicRequests performs clowder mapping of topics.
Types ¶
type AvailabilityStatusMessage ¶
type AvailabilityStatusMessage struct {
SourceID string `json:"source_id"`
}
func NewAvailabilityStatusMessage ¶
func NewAvailabilityStatusMessage(msg *GenericMessage) (*AvailabilityStatusMessage, error)
func (AvailabilityStatusMessage) GenericMessage ¶
func (m AvailabilityStatusMessage) GenericMessage(ctx context.Context) (GenericMessage, error)
type Broker ¶
type Broker interface { // Send one or more messages to the kafka Send(ctx context.Context, messages ...*GenericMessage) error // Consume messages of a single topic in a loop. Blocking call, use context cancellation to stop. Consume(ctx context.Context, topic string, since time.Time, handler func(ctx context.Context, message *GenericMessage)) }
func NewStubBroker ¶
type GenericHeader ¶
func GenericHeaders ¶
func GenericHeaders(args ...string) []GenericHeader
GenericHeaders returns slice of headers
type GenericMessage ¶
type GenericMessage struct { // Topic of the message. Some producers already have associated topic, in that case Topic from the message will be ignored. Topic string // Key is used for topic partitioning. Can be nil. Key []byte // Value is the payload. Typically, a JSON marshaled data. Value []byte // List of key-value pairs for each message. Headers []GenericHeader }
GenericMessage is a platform independent message.
func NewMessageFromKafka ¶
func NewMessageFromKafka(km *kafka.Message) *GenericMessage
NewMessageFromKafka converts generic message to native message
func (GenericMessage) Header ¶
func (m GenericMessage) Header(name string) string
func (GenericMessage) KafkaMessage ¶
func (m GenericMessage) KafkaMessage() kafka.Message
KafkaMessage converts from generic to native message.
type NativeMessage ¶
type NativeMessage interface { // GenericMessage returns a generic message that is platform independent. GenericMessage(ctx context.Context) (GenericMessage, error) }
NativeMessage represents a native (kafka) message. It can be converted to GenericMessage.
type NotificationContext ¶
type NotificationError ¶
type NotificationError struct {
Error string `json:"error"`
}
type NotificationEvent ¶
type NotificationEvent struct {
Payload json.RawMessage `json:"payload"`
}
type NotificationMessage ¶
type NotificationMessage struct { Version string `json:"version"` Bundle string `json:"bundle"` Application string `json:"application"` EventType string `json:"event_type"` Timestamp string `json:"timestamp"` AccountID string `json:"account_id"` OrgId string `json:"org_id"` Context interface{} `json:"context"` Events []NotificationEvent `json:"events"` Recipients []notificationRecipients `json:"recipients"` ID string `json:"id"` }
func (NotificationMessage) GenericMessage ¶
func (m NotificationMessage) GenericMessage(ctx context.Context) (GenericMessage, error)
type SourceResult ¶
type SourceResult struct { MessageContext context.Context `json:"-"` // Carries logger and identity ResourceID string `json:"resource_id"` ResourceType string `json:"resource_type"` Status StatusType `json:"status"` UserError string `json:"error"` Err error `json:"-"` // Sources do not support error field MissingPermissions []string `json:"-"` // Sources do not support reason field }
func (SourceResult) GenericMessage ¶
func (sr SourceResult) GenericMessage(ctx context.Context) (GenericMessage, error)
type StatusType ¶
type StatusType string
const ( StatusAvailable StatusType = "available" )
func (StatusType) String ¶
func (st StatusType) String() string
Click to show internal directories.
Click to hide internal directories.