amqpfx

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 23, 2023 License: MIT Imports: 5 Imported by: 0

README

AMQP Fx

Fx Module for amqp client. This project provide a thin wrapper around https://github.com/Azure/go-amqp.

Installation

Use Go modules to install amqpfx.

go get -u github.com/lagolibs/amqpfx

Example usage with multiple queue and viper:

Usage

amq.uris:
  clienta: amqp://localhost:61616
  clientb: amqp://localhost:61617
package main

import (
	"github.com/lagolibs/amqpfx"
	"github.com/samber/lo"
	"github.com/spf13/viper"
	"go.mongodb.org/mongo-driver/mongo"
	"go.uber.org/fx"
	"os"
)

func init() {
	viper.AddConfigPath(lo.Must(os.Getwd()))
	viper.SetConfigType("yaml")
	viper.SetConfigName("config")
	viper.AutomaticEnv()

	if err := viper.SafeWriteConfig(); err != nil {
		lo.Must0(viper.ReadInConfig())
	}
}

func main() {
	app := fx.New(
		amqpfx.NewModule("amq", amqpfx.WithURIs(viper.GetStringMapString("amq.uris"))),
		fx.Invoke(fx.Annotate(func(client *amqpfx.Client, client2 *amqpfx.Client) {}, fx.ParamTags(`name:"amq_clienta"`, `name:"amq_clienb"`))),
	)

	app.Run()
}

See tests for more usage.

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewModule

func NewModule(namespace string, opts ...ModuleOption) fx.Option

NewModule construct a new fx Module for mongodb, using configuration options Each mongo client will be named as <namespace>_<name> Also register a <namespace> group

Example
package main

import (
	"github.com/lagolibs/amqpfx"
	"go.uber.org/fx"
)

func main() {
	configs := make(map[string]string, 2)
	configs["clienta"] = "amqp://localhost:61616"
	configs["clientb"] = "amqp://127.0.0.1:61616"

	fx.New(
		amqpfx.NewModule("amq", amqpfx.WithURIs(configs)),
		fx.Invoke(
			fx.Annotate(func(client *amqpfx.Client) {},
				fx.ParamTags(`name:"amq_clienta"`),
			),
		),
	).Run()
}
Output:

Example (SingleClient)
package main

import (
	"github.com/lagolibs/amqpfx"
	"go.uber.org/fx"
)

func main() {
	configs := make(map[string]string, 2)
	configs["clienta"] = "amqp://localhost:61616"

	fx.New(
		amqpfx.NewModule("amq", amqpfx.WithURIs(configs)),
		fx.Invoke(
			fx.Annotate(func(client *amqpfx.Client) {},
				fx.ParamTags(`name:"amq_clienta"`),
			),
		),
	).Run()
}
Output:

func NewSimpleModule

func NewSimpleModule(namespace string, uri string) fx.Option

NewSimpleModule construct a module contain single client. Does not register group namespace. The name of the mongo client is the same as the name space.

Example
package main

import (
	"github.com/lagolibs/amqpfx"
	"go.uber.org/fx"
)

func main() {
	fx.New(
		amqpfx.NewSimpleModule("amq", "amqp://localhost:61616"),
		fx.Invoke(
			fx.Annotate(func(client *amqpfx.Client) {},
				fx.ParamTags(`name:"amq"`),
			),
		),
	).Run()
}
Output:

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient added in v0.2.0

func NewClient(config *ClientConfig) (*Client, error)

func (*Client) Close

func (s *Client) Close() error

func (*Client) NewConn

func (s *Client) NewConn() (*Conn, error)

type ClientConfig added in v0.2.0

type ClientConfig struct {
	// contains filtered or unexported fields
}

func NewClientConfig added in v0.2.0

func NewClientConfig() ClientConfig

type Conn

type Conn struct {
	*amqp.Conn
	ConnectTimeout time.Duration
}

func (*Conn) NewSession

func (c *Conn) NewSession() (*Session, error)

type ModuleOption

type ModuleOption func(conf *moduleConfig)

ModuleOption applies an option to moduleConfig

func WithURIs

func WithURIs(uris map[string]string) ModuleOption

WithURIs create ModuleOption that parse a map of uris into moduleConfig. This help integrate with configuration library such as vipers

type ReceiveOption

type ReceiveOption = func(conf *amqp.ReceiveOptions)

type Receiver

type Receiver struct {
	*amqp.Receiver
}

func (*Receiver) Receive

func (r *Receiver) Receive(ctx context.Context, opts ...ReceiveOption) (*amqp.Message, error)

type ReceiverOption

type ReceiverOption = func(conf *amqp.ReceiverOptions)

type SendOption

type SendOption = func(conf *amqp.SendOptions)

type Sender

type Sender struct {
	*amqp.Sender
}

func (*Sender) Send

func (s *Sender) Send(ctx context.Context, msg *amqp.Message, opts ...SendOption) error

type SenderOption

type SenderOption = func(conf *amqp.SenderOptions)

type Session

type Session struct {
	*amqp.Session
	ConnectTimeout time.Duration
}

func (*Session) NewReceiver

func (s *Session) NewReceiver(source string, opts ...ReceiverOption) (*Receiver, error)

func (*Session) NewSender

func (s *Session) NewSender(source string, opts ...SenderOption) (*Sender, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
JackTT - Gopher 🇻🇳