Documentation
¶
Overview ¶
Package pubsubkit provides helper to interact with GCP PubSub
Index ¶
- Variables
- func NewPubSubClient(projectID string, opts ...option.ClientOption) (*pubsub.Client, error)
- func NewPubSubSubscription(client *pubsub.Client, subID string, cfg pubsub.SubscriptionConfig, ...) (*pubsub.Subscription, error)
- func NewPubSubTopic(client *pubsub.Client, topicID string, cfg *pubsub.TopicConfig, opts ...Option) (*pubsub.Topic, error)
- func ReceiveSubscription(ctx context.Context, sub *pubsub.Subscription, handler WorkerHandlerFunc, ...) (err error)
- type Message
- type Option
- type Options
- type WorkerHandlerFunc
Constants ¶
This section is empty.
Variables ¶
var ( // ErrSubscriptionNotFound ... ErrSubscriptionNotFound = errors.New("pubsub subscription doesn't exists") // ErrTopicNotFound ... ErrTopicNotFound = errors.New("pubsub topic doesn't exists") )
var ( // ErrInvalidSubscription returned when try to receive message from subscription `nil`. ErrInvalidSubscription = errors.New("pubsubkit: subscription cannot be nil") // ErrInvalidSubscriptionHandler returned when try to receive message from subscription using nil handler. ErrInvalidSubscriptionHandler = errors.New("pubsubkit: handler cannot be nil") )
Functions ¶
func NewPubSubClient ¶
NewPubSubClient returns new PubSub client in 5s timeout.
func NewPubSubSubscription ¶
func NewPubSubSubscription( client *pubsub.Client, subID string, cfg pubsub.SubscriptionConfig, opts ...Option, ) (*pubsub.Subscription, error)
NewPubSubSubscription returns new PubSub topic subscriber in 5s timeout.
func NewPubSubTopic ¶
func NewPubSubTopic( client *pubsub.Client, topicID string, cfg *pubsub.TopicConfig, opts ...Option, ) (*pubsub.Topic, error)
NewPubSubTopic returns new PubSub topic publisher in 5s timeout.
func ReceiveSubscription ¶
func ReceiveSubscription( ctx context.Context, sub *pubsub.Subscription, handler WorkerHandlerFunc, opts ...Option, ) (err error)
ReceiveSubscription blocks to receive messages from pubsub subscription Call with goroutine if you'd like to do something else in the meantime.
go func() { if err := pubsubkit.ReceiveSubscription(...); err != nil { // handle error } }()
It will `Nack()` message when handler returns error & DLT found `Ack()` when handler is success, or error with DLT not found it also logs the process using `toolkit/log` package.
Types ¶
type Message ¶
type Message interface { // ID identifies this message. // This ID is assigned by the server and is populated for Messages obtained from a subscription. // This field is read-only. ID() string // Data is the actual data in the message. Data() []byte // Attributes represents the key-value pairs the current message // is labelled with. Attributes() map[string]string // The time at which the message was published. // This is populated by the server for Messages obtained from a subscription. // This field is read-only. PublishTime() time.Time // DeliveryAttempt is the number of times a message has been delivered. // This is part of the dead lettering feature that forwards messages that // fail to be processed (from nack/ack deadline timeout) to a dead letter topic. // If dead lettering is enabled, this will be set on all attempts, starting // with value 1. Otherwise, the value will be nil. // This field is read-only. DeliveryAttempt() int // DLTSupported defines wether message's topic has DLT nor not DLTSupported() bool // OrderingKey identifies related messages for which publish order should // be respected. If empty string is used, message will be sent unordered. OrderingKey() string }
Message wraps *pubsub.Message without the Nack() & Ack() handler it is designed as parameter to `WorkerHandlerFunc`.
type Option ¶
type Option func(*Options)
Option sets options for connect pubsub.
func WithAutoCreate ¶
func WithAutoCreate() Option
WithAutoCreate will create pubsub resource when it's not exists yet.
func WithoutCheckExistance ¶
func WithoutCheckExistance() Option
WithoutCheckExistance bypass pubsub resource existence validations.