messenger

package module
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 7, 2021 License: MIT Imports: 8 Imported by: 0

README

messenger

📬 A minimalistic library for abstracting asynchronus messaging

Installation

Adding messenger to your Go module is as easy as calling this command in your project

go get github.com/cozy-hosting/messenger

Usage

Being a minimalistic library, messenger only provides the basics. The rest is up to your specific need.

Create a messenger
msgr, err := messenger.NewRabbitMessenger("amqp://guest:guest@localhost:5672/")
defer service.Close(func(err error) {
    log.Fatal(err)
})
if err != nil {
    log.Fatal(err)
}
Define an exchange
defaultExchange := messenger.NewExchange()
Define a queue
exampleQueue := messenger.NewQueue().Named("example")
Publish a message
helloMessage := messenger.NewMessage("Hello World")

if err := msgr.Publish(defaultExchange, exampleQueue, helloMessage); err != nil {
    log.Fatal(err)
}
Consume messages
helloConsumption := messenger.NewConsumption(func(ctx m.Context) {
    message, err := ctx.GetDelivery().GetMessage()
    if err != err {
        log.Fatal(err)
    }   
    
    log.Println(message)
}).AutoAcknowledge()

free, err := msgr.Consume(defaultExchange, exampleQueue, helloConsumption)
defer free()
if err != nil {
    log.Fatal(err)
}

Future plans

  • Unit tests for the existing components
  • Support for more message queue implementations

Copyright © 2021 - The cozy team & contributors

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Channel added in v0.2.0

type Channel interface {
	DeclareExchange(exchange Exchange) error

	DeclareQueue(queue Queue, args map[string]interface{}) error
	DeleteQueue(queue Queue) error

	BindQueueToExchange(exchange Exchange, queue Queue) error

	Publish(exchange Exchange, queue Queue, message Message) error
	Consume(exchange Exchange, queue Queue, consumer Consumer) error

	Close() error
}

type Command added in v0.2.0

type Command interface {
	Handle() error
}

type Connection added in v0.2.0

type Connection interface {
	GetChannel() (Channel, error)

	Close() error
}

type Consumer added in v0.2.0

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer added in v0.2.0

func NewConsumer(handler handler) Consumer

func (Consumer) Named added in v0.2.0

func (c Consumer) Named(name string) Consumer

func (Consumer) ShouldAutoAcknowledge added in v0.2.0

func (c Consumer) ShouldAutoAcknowledge() Consumer

type Context

type Context interface {
	GetDelivery() Delivery

	Publish(message Message) error
}

type Delivery added in v0.2.0

type Delivery interface {
	GetMessage() (Message, error)

	Acknowledge() error
	NegativeAcknowledge(requeue bool) error
}

type Exchange

type Exchange struct {
	// contains filtered or unexported fields
}

func NewExchange

func NewExchange() Exchange

func (Exchange) Named

func (e Exchange) Named(name string) Exchange

func (Exchange) ShouldAutoRemove

func (e Exchange) ShouldAutoRemove() Exchange

func (Exchange) WithStrategy

func (e Exchange) WithStrategy(strategy string) Exchange

type Message

type Message struct {
	Series   uuid.UUID `json:"series"`
	Revision int       `json:"revision"`

	From *address `json:"from"`
	To   *address `json:"to"`

	TimeStamp time.Time `json:"time_stamp"`

	BodyType string      `json:"body_type"`
	Body     interface{} `json:"body"`
}

func NewMessage

func NewMessage(body interface{}) Message

func (Message) ReceivedBy added in v0.2.0

func (m Message) ReceivedBy(to *address) Message

func (Message) ReplyTo added in v0.2.0

func (m Message) ReplyTo(body interface{}) Message

func (Message) SendFrom added in v0.2.0

func (m Message) SendFrom(from *address) Message

func (Message) String

func (message Message) String() string

type MessageDeserializer added in v0.2.0

type MessageDeserializer interface {
	Deserialize() (Message, error)
}

type MessageSerializer added in v0.2.0

type MessageSerializer interface {
	Serialize() (string, error)
}

type Messenger

type Messenger interface {
	Publish(exchange Exchange, queue Queue, message Message) error
	Consume(exchange Exchange, queue Queue, consumer Consumer) (func() error, error)

	Close(func(err error))
}

func NewRabbitMessenger

func NewRabbitMessenger(url string) (Messenger, error)

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue() Queue

func (Queue) Named

func (q Queue) Named(name string) Queue

func (Queue) ShouldAutoRemove

func (q Queue) ShouldAutoRemove() Queue

func (Queue) WithTimeToLive

func (q Queue) WithTimeToLive(duration time.Duration) Queue

func (Queue) WithTopic

func (q Queue) WithTopic(topic string) Queue

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
JackTT - Gopher 🇻🇳