queuelib

package module
v0.0.0-...-03d441d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 10, 2019 License: MIT Imports: 6 Imported by: 0

README

Queuelib - Golang, RabbitMQ

This is common queue library. It currently supports RabbitMQ.

Getting queuelib

go get -u github.com/krdganesh/queuelib

Instantiating the queue type

rmq, err := queuelib.Init("rabbitmq")

Connect to Queue

config := &queuelib.Config{
	ConString: "amqp://guest:guest@localhost:5672/",
}

result, err := rmq.Connect(config)

Publishing Message

var pubStruct = queuelib.PublishStruct{
	Exchange:    "testExchange",
	Key:         "",
	Mandatory:   false,
	Immediate:   false,
	ContentType: "text/plain",
	Message:     []byte("Testing Publish()"),
}

result, err := rmq.Publish(pubStruct)

Publish with Delay

//To use the delay feature, an exchange with type 'x-delayed-message' must be there.

var pubDelayStruct = queuelib.PublishStruct{
	Exchange:    "amqp.delay",
	Key:         "testKey",
	Mandatory:   false,
	Immediate:   false,
	ContentType: "text/plain",
	Message:     []byte("Testing Delayed Publish()"),
	Delay:       15000, //15 sec. delay
}

result, err := rmq.Publish(pubDelayStruct)

Subscribing Queue and Acknowledge

var subStruct = queuelib.SubscribeStruct{
	Queue:     "testQueue",
	Consumer:  "",
	AutoAck:   false,
	Exclusive: false,
	NoLocal:   false,
	NoWait:    false,
	PrefetchCount: 10, //Allows batching of messages
}

chForever := make(chan bool)
msgs, err := rmq.Subscribe(subStruct)
go func() {
	for msg := range msgs {
		log.Printf("Received a message: %s", msg.Body)
		result, err := rmq.Acknowledge(msg)
	}
}()
<-chForever

Get a Message and Acknowledge

var getStruct = queuelib.GetStruct{
	Queue:   "testQueue",
	AutoAck: false,
}

msg, ok, err := rmq.Get(getStruct)
log.Printf("Got a message: %s", msg.Body)

result, err := rmq.Acknowledge(msg)

Project Details

Author

Ganesh Karande
Email : [email protected]

Version

1.0.0

License

This project is licensed under the MIT License

Documentation

Index

Constants

View Source
const (
	//RABBITMQ : This is used in Factory
	RABBITMQ = "rabbitmq"
	KAFKA    = "kafka"
)

Variables

View Source
var (
	//ErrCursor : This is used when cursor is pointing to nil
	ErrCursor = errors.New("Invalid cursor")

	//ErrConnection : This is used when error occurs while creating connection
	ErrConnection = errors.New("Failed to connect to RabbitMQ, Kindly check RMQ Server is running / reachable and config data is correct")

	//ErrorInvalidQueue : This is used in Init(), where user passes unsupported queue type
	ErrorInvalidQueue = errors.New("Unsupported queue type")
)

Functions

This section is empty.

Types

type Config

type Config struct {
	ConString      string
	Scheme         string
	Host           string
	Port           int
	Username       string
	Password       string
	Vhost          string
	KafkaTopic     string
	KafkaPartition int
}

Config : This struct is used to define all neccessary parameters required by Supported Queue Client i.e. RabbitMQ (As of now)

type Delivery

type Delivery struct {
	// Properties
	ContentType     string    // MIME content type
	ContentEncoding string    // MIME content encoding
	DeliveryMode    uint8     // queue implementation use - non-persistent (1) or persistent (2)
	Priority        uint8     // queue implementation use - 0 to 9
	CorrelationID   string    // application use - correlation identifier
	ReplyTo         string    // application use - address to reply to (ex: RPC)
	Expiration      string    // implementation use - message expiration spec
	MessageID       string    // application use - message identifier
	Timestamp       time.Time // application use - message timestamp
	Type            string    // application use - message type name
	UserID          string    // application use - creating user - should be authenticated user
	AppID           string    // application use - creating application id

	// Valid only with Channel.Consume
	ConsumerTag string

	// Valid only with Channel.Get
	MessageCount uint32

	DeliveryTag uint64
	Redelivered bool
	Exchange    string // basic.publish exhange
	RoutingKey  string // basic.publish routing key

	Body []byte

	KafkaTopic     string
	KafkaPartition int
	KafkaKey       []byte
}

Delivery captures the fields for a previously delivered message resident in a queue to be delivered by the server to a consumer from Channel.Consume or Channel.Get.

type GetStruct

type GetStruct struct {
	Queue   string
	AutoAck bool
}

GetStruct : This struct is an input parameter for Get()

type Kafka

type Kafka struct {
	Connection *kafka.Conn
}

RabbitMQ : Pointer to this struct is retured in Init() if input QueueType is "rabbitmq"

func (*Kafka) Acknowledge

func (kafka *Kafka) Acknowledge(delivery Delivery) (result bool, err error)

Acknowledge : Function acknowledges a message using existing connection object. Input Parameters

DeliveryTag  : uint64

func (*Kafka) Connect

func (kafka *Kafka) Connect(config *Config) (result bool, err error)

Connect : Function connects to Kafka Server using connection string passed in Config. Input Parameters

config  : struct *Config

func (*Kafka) Get

func (kafka *Kafka) Get(get GetStruct) (msg Delivery, ok bool, err error)

Get : Function gets a message using existing connection object. Input Parameters

get  : struct GetStruct

func (*Kafka) Publish

func (kafka *Kafka) Publish(pub PublishStruct) (result bool, err error)

Publish : Function publishes the message using existing connection object. Input Parameters

pub  : struct PublishStruct

func (*Kafka) Subscribe

func (kafka *Kafka) Subscribe(sub SubscribeStruct) (delivery <-chan Delivery, err error)

Subscribe : Function consumes the messages using existing connection object. Input Parameters

sub  : struct SubscribeStruct

type PublishStruct

type PublishStruct struct {
	Exchange              string
	Key                   string
	Mandatory             bool
	Immediate             bool
	Message               []byte
	ContentType           string
	DeliveryTag           uint64
	Delay                 uint64 //Delay in milliseconds
	KafkaTimeoutInSeconds time.Duration
}

PublishStruct : This struct is an input parameter for Publish()

type Queue

type Queue interface {
	Connect(*Config) (bool, error)
	Publish(PublishStruct) (bool, error)
	Subscribe(SubscribeStruct) (<-chan Delivery, error)
	Get(GetStruct) (Delivery, bool, error)
	Acknowledge(Delivery) (bool, error)
}

Queue : This interface is used as return type of Init()

func Init

func Init(QueueType string) (Queue, error)

Init : Queue factory - Returns a Queue for general purpose

type RabbitMQ

type RabbitMQ struct {
	Connection *amqp.Connection
	Channel    *amqp.Channel
}

RabbitMQ : Pointer to this struct is retured in Init() if input QueueType is "rabbitmq"

func (*RabbitMQ) Acknowledge

func (rabbitmq *RabbitMQ) Acknowledge(delivery Delivery) (result bool, err error)

Acknowledge : Function acknowledges a message using existing connection object. Input Parameters

DeliveryTag  : uint64

func (*RabbitMQ) Connect

func (rabbitmq *RabbitMQ) Connect(config *Config) (result bool, err error)

Connect : Function connects to RabbitMQ Server using connection string passed in Config. Input Parameters

config  : struct *Config

func (*RabbitMQ) Get

func (rabbitmq *RabbitMQ) Get(get GetStruct) (msg Delivery, ok bool, err error)

Get : Function gets a message using existing connection object. Input Parameters

get  : struct GetStruct

func (*RabbitMQ) Publish

func (rabbitmq *RabbitMQ) Publish(pub PublishStruct) (result bool, err error)

Publish : Function publishes the message using existing connection object. Input Parameters

pub  : struct PublishStruct

func (*RabbitMQ) Subscribe

func (rabbitmq *RabbitMQ) Subscribe(sub SubscribeStruct) (delivery <-chan Delivery, err error)

Subscribe : Function consumes the messages using existing connection object. Input Parameters

sub  : struct SubscribeStruct

type SubscribeStruct

type SubscribeStruct struct {
	Queue                 string
	Consumer              string
	AutoAck               bool
	Exclusive             bool
	NoLocal               bool
	NoWait                bool
	PrefetchCount         int //Allows batching of messages
	PrefetchSize          int
	ApplyPrefetchGlobally bool //apply prefetch settings to all channels - across all consumers
	KafkaTopic            string
	KafkaConsumerGroupID  string
	KafkaBrokers          []string
}

SubscribeStruct : This struct is an input parameter for Subscribe()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
JackTT - Gopher 🇻🇳