pgqueue

package module
v0.0.0-...-4c5ad21 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 13, 2025 License: Apache-2.0 Imports: 9 Imported by: 0

README

PostgreSQL-backed Queue library

GoDoc Build

This library allows to use a single PostgreSQL instance as a queue server.

Check the documentation for reference.

Check the examples to learn how to use it.

Documentation

Overview

Package pgqueue is a library allows to use a single PostgreSQL instance as a queue server.

pool, err := pgxpool.New(ctx, dsn)
if err != nil {
	log.Fatalln("cannot open database connection pool:", err)
}
client, err := pgqueue.Open(ctx, pool)
if err != nil {
	log.Fatalln("cannot create queue handler:", err)
}
defer client.Close()
if err := client.CreateTable(ctx); err != nil {
	log.Fatalln("cannot create queue table:", err)
}
queue := client.Queue("example-queue-reservation")
defer queue.Close()
content := []byte("content")
if err := queue.Push(ctx, content); err != nil {
	log.Fatalln("cannot push message to queue:", err)
}
r, err := queue.Reserve(ctx, 1*time.Minute)
if err != nil {
	log.Fatalln("cannot reserve message from the queue:", err)
}
fmt.Printf("content: %s\n", r.Content)
if err := r.Done(ctx); err != nil {
	log.Fatalln("cannot mark message as done:", err)
}
Example (Basic)
package main

import (
	"context"
	"fmt"
	"log"
	"os"

	"cirello.io/pgqueue"
	"github.com/jackc/pgx/v5/pgxpool"
)

var dsn = os.Getenv("PGQUEUE_TEST_DSN")

func main() {
	ctx := context.Background()
	pool, err := pgxpool.New(ctx, dsn)
	if err != nil {
		log.Fatalln("cannot open database connection pool:", err)
	}
	client := pgqueue.Open(pool)
	defer client.Close()
	if err := client.CreateTable(ctx); err != nil {
		log.Fatalln("cannot create queue table:", err)
	}
	queueName := "example-queue-name"
	if err := client.Push(ctx, queueName, []byte("content")); err != nil {
		log.Fatalln("cannot push message to queue:", err)
	}
	poppedContent, err := client.Pop(ctx, queueName, 1)
	if err != nil {
		log.Fatalln("cannot pop message from the queue:", err)
	}
	fmt.Printf("content: %s\n", poppedContent[0])
}
Output:

content: content
Example (EmptyQueue)
package main

import (
	"context"
	"fmt"
	"log"
	"os"

	"cirello.io/pgqueue"
	"github.com/jackc/pgx/v5/pgxpool"
)

var dsn = os.Getenv("PGQUEUE_TEST_DSN")

func main() {
	ctx := context.Background()
	pool, err := pgxpool.New(ctx, dsn)
	if err != nil {
		log.Fatalln("cannot open database connection pool:", err)
	}
	client := pgqueue.Open(pool)
	defer client.Close()
	if err := client.CreateTable(ctx); err != nil {
		log.Fatalln("cannot create queue table:", err)
	}
	_, err = client.Pop(ctx, "empty-name", 1)
	fmt.Println("err:", err)
}
Output:

err: empty queue
Example (Reservation)
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	"cirello.io/pgqueue"
	"github.com/jackc/pgx/v5/pgxpool"
)

var dsn = os.Getenv("PGQUEUE_TEST_DSN")

func main() {
	ctx := context.Background()
	pool, err := pgxpool.New(ctx, dsn)
	if err != nil {
		log.Fatalln("cannot open database connection pool:", err)
	}
	client := pgqueue.Open(pool)
	defer client.Close()
	if err := client.CreateTable(ctx); err != nil {
		log.Fatalln("cannot create queue table:", err)
	}
	queueName := "example-queue-reservation"
	if err := client.Push(ctx, queueName, []byte("content")); err != nil {
		log.Fatalln("cannot push message to queue:", err)
	}
	msgs, err := client.Reserve(ctx, queueName, 1*time.Minute, 1)
	if err != nil {
		log.Fatalln("cannot reserve message from the queue:", err)
	}
	fmt.Printf("content: %s\n", msgs[0].Content())
	if err := client.Delete(ctx, msgs[0].ID()); err != nil {
		log.Fatalln("cannot mark message as done:", err)
	}
}
Output:

content: content
Example (ReservedReleased)
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	"cirello.io/pgqueue"
	"github.com/jackc/pgx/v5/pgxpool"
)

var dsn = os.Getenv("PGQUEUE_TEST_DSN")

func main() {
	ctx := context.Background()
	pool, err := pgxpool.New(ctx, dsn)
	if err != nil {
		log.Fatalln("cannot open database connection pool:", err)
	}
	client := pgqueue.Open(pool)
	defer client.Close()
	if err := client.CreateTable(ctx); err != nil {
		log.Fatalln("cannot create queue table:", err)
	}
	queueName := "example-queue-release"
	if err := client.Push(ctx, queueName, []byte("content")); err != nil {
		log.Fatalln("cannot push message to queue:", err)
	}
	msgs, err := client.Reserve(ctx, queueName, 1*time.Minute, 1)
	if err != nil {
		log.Fatalln("cannot pop message from the queue:", err)
	}
	fmt.Printf("content: %s\n", msgs[0].Content())
	if err := client.Release(ctx, msgs[0].ID()); err != nil {
		log.Fatalln("cannot release the message back to the queue:", err)
	}
}
Output:

content: content
Example (ReservedReleasedDeleted)
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	"cirello.io/pgqueue"
	"github.com/jackc/pgx/v5/pgxpool"
)

var dsn = os.Getenv("PGQUEUE_TEST_DSN")

func main() {
	ctx := context.Background()
	pool, err := pgxpool.New(ctx, dsn)
	if err != nil {
		log.Fatalln("cannot open database connection pool:", err)
	}
	client := pgqueue.Open(pool)
	defer client.Close()
	if err := client.CreateTable(ctx); err != nil {
		log.Fatalln("cannot create queue table:", err)
	}
	queueName := "example-queue-release"
	if err := client.Push(ctx, queueName, []byte("content")); err != nil {
		log.Fatalln("cannot push message to queue:", err)
	}
	msgs, err := client.Reserve(ctx, queueName, 1*time.Minute, 1)
	if err != nil {
		log.Fatalln("cannot pop message from the queue:", err)
	}
	fmt.Printf("content: %s\n", msgs[0].Content())
	if err := client.Delete(ctx, msgs[0].ID()); err != nil {
		log.Fatalln("cannot remove the message from the queue:", err)
	}
}
Output:

content: content
Example (ReservedTouch)
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	"cirello.io/pgqueue"
	"github.com/jackc/pgx/v5/pgxpool"
)

var dsn = os.Getenv("PGQUEUE_TEST_DSN")

func main() {
	ctx := context.Background()
	pool, err := pgxpool.New(ctx, dsn)
	if err != nil {
		log.Fatalln("cannot open database connection pool:", err)
	}
	client := pgqueue.Open(pool)
	defer client.Close()
	if err := client.CreateTable(ctx); err != nil {
		log.Fatalln("cannot create queue table:", err)
	}
	queueName := "example-queue-touch"
	if err := client.Push(ctx, queueName, []byte("content")); err != nil {
		log.Fatalln("cannot push message to queue:", err)
	}
	msgs, err := client.Reserve(ctx, queueName, 10*time.Second, 1)
	if err != nil {
		log.Fatalln("cannot pop message from the queue:", err)
	}
	fmt.Printf("content: %s\n", msgs[0].Content())
	time.Sleep(5 * time.Second)
	if err := client.Extend(ctx, 1*time.Minute, msgs[0].ID()); err != nil {
		log.Fatalln("cannot extend message lease:", err)
	}
}
Output:

content: content
Example (Vacuum)
package main

import (
	"context"
	"fmt"
	"log"
	"os"

	"cirello.io/pgqueue"
	"github.com/jackc/pgx/v5/pgxpool"
)

var dsn = os.Getenv("PGQUEUE_TEST_DSN")

func main() {
	ctx := context.Background()
	pool, err := pgxpool.New(ctx, dsn)
	if err != nil {
		log.Fatalln("cannot open database connection pool:", err)
	}
	client := pgqueue.Open(pool, pgqueue.WithMaxDeliveries(1), pgqueue.DisableAutoVacuum())
	defer client.Close()
	if err := client.CreateTable(ctx); err != nil {
		log.Fatalln("cannot create queue table:", err)
	}
	queueName := "example-queue-vacuum"
	for i := 0; i < 10; i++ {
		if err := client.Push(ctx, queueName, []byte("content")); err != nil {
			log.Fatalln("cannot push message to queue:", err)
		}
		if _, err := client.Pop(ctx, queueName, 1); err != nil {
			log.Fatalln("cannot pop message from the queue:", err)
		}
	}
	stats := client.Vacuum(ctx)
	if err := stats.Err(); err != nil {
		log.Fatalln("cannot clean up:", err)
	}
	fmt.Println("vacuum succeeded")
}
Output:

vacuum succeeded

Index

Examples

Constants

View Source
const DefaultMaxDeliveriesCount = 5

DefaultMaxDeliveriesCount is how many delivery attempt each message gets before getting skipped on Pop and Reserve calls.

Variables

View Source
var ErrAlreadyClosed = errors.New("queue is already closed")

ErrAlreadyClosed indicates the queue is closed and all its watchers are going to report the queue is no longer available.

View Source
var ErrDeadLetterQueueDisabled = errors.New("deadletter queue disabled")

ErrDeadLetterQueueDisabled indicates that is not possible to dump messages from the target deadletter queue because its support has been disabled.

View Source
var ErrEmptyQueue = fmt.Errorf("empty queue")

ErrEmptyQueue indicates there isn't any message available at the head of the queue.

View Source
var ErrInvalidDeadline = errors.New("invalid duration")

ErrInvalidDeadline indicates the target deadline may be in the past or zero.

View Source
var ErrInvalidDuration = errors.New("invalid duration")

ErrInvalidDuration indicates the duration used is too small. It must larger than a millisecond and be multiple of a millisecond.

View Source
var ErrReleaseIncomplete = errors.New("not all messages were released")

ErrReleaseIncomplete indicates that not all messages were released.

View Source
var ErrZeroSizedBulkOperation = errors.New("zero sized bulk operation")

ErrZeroSizedBulkOperation that the bulk operation size is zero.

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client uses a postgreSQL database to run a queue system.

func Open

func Open(conn PgxConn, opts ...ClientOption) *Client

Open uses the given database connection and start operating the queue system.

func (*Client) ApproximateCount

func (c *Client) ApproximateCount(ctx context.Context, queueName string) (int, error)

ApproximateCount reports how many messages are available in the queue, for popping. It will skip messages that are currently being processed or stale.

func (*Client) Close

func (c *Client) Close() error

Close stops the queue system.

func (*Client) CreateTable

func (c *Client) CreateTable(ctx context.Context) error

CreateTable prepares the underlying table for the queue system.

func (*Client) Delete

func (c *Client) Delete(ctx context.Context, ids ...uint64) error

Delete removes the messages from the queue.

func (*Client) DumpDeadLetterQueue

func (c *Client) DumpDeadLetterQueue(ctx context.Context, queue string, n int) ([]*DeadMessage, error)

DumpDeadLetterQueue writes the messages into the writer and remove them from the database.

func (*Client) Extend

func (c *Client) Extend(ctx context.Context, extension time.Duration, ids ...uint64) error

Extend extends the messages lease by the given duration. The duration must be multiples of milliseconds.

func (*Client) Pop

func (c *Client) Pop(ctx context.Context, queueName string, n int) ([][]byte, error)

Pop retrieves a batch pending message from the queue, if any available. If the queue is empty, it returns ErrEmptyQueue.

func (*Client) Purge

func (c *Client) Purge(ctx context.Context, queueName string) error

Purge a queue, removing all messages from it.

func (*Client) Push

func (c *Client) Push(ctx context.Context, queueName string, contents ...[]byte) error

Push enqueues the given content batch to the target queue.

func (*Client) Release

func (c *Client) Release(ctx context.Context, ids ...uint64) error

Release puts the messages back to the queue.

func (*Client) Reserve

func (c *Client) Reserve(ctx context.Context, queueName string, lease time.Duration, n int) ([]*Message, error)

Reserve retrieves a batch of pending messages from the queue, if any available. It marks them as InProgress until the defined lease duration. If the message is not marked as Done by the lease time, it is returned to the queue. Lease duration must be multiple of milliseconds.

func (*Client) Vacuum

func (c *Client) Vacuum(ctx context.Context) VacuumStats

Vacuum cleans up the queue from done or dead messages.

func (*Client) VacuumStats

func (c *Client) VacuumStats() VacuumStats

VacuumStats returns the latest statistics of the auto-vacuum operation.

type ClientOption

type ClientOption func(*Client)

ClientOption reconfigures the behavior of the pgqueue Client.

func CustomAutoVacuumFrequency

func CustomAutoVacuumFrequency(d time.Duration) ClientOption

CustomAutoVacuumFrequency changes the frequency of the automatic vacuum.

func DisableAutoVacuum

func DisableAutoVacuum() ClientOption

DisableAutoVacuum forces the use of manual queue clean up.

func EnableDeadLetterQueue

func EnableDeadLetterQueue() ClientOption

EnableDeadLetterQueue keeps errored messages for later inspection.

func WithCustomTable

func WithCustomTable(tableName string) ClientOption

WithCustomTable changes the name of the postgresql table used for the queue.

func WithMaxDeliveries

func WithMaxDeliveries(maxDeliveries int) ClientOption

WithMaxDeliveries indicates how many delivery attempts each message gets. If zero, the client retries the message forever.

type DeadMessage

type DeadMessage struct {
	// contains filtered or unexported fields
}

DeadMessage represents one dead message from the queue.

func (*DeadMessage) Content

func (m *DeadMessage) Content() []byte

Content returns the content of the dead message.

func (*DeadMessage) ID

func (m *DeadMessage) ID() uint64

ID returns the unique identifier of the dead message.

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message represents one message from the queue.

func (*Message) Content

func (m *Message) Content() []byte

Content returns the content of the message.

func (*Message) ID

func (m *Message) ID() uint64

ID returns the unique identifier of the message.

func (*Message) LeaseDeadline

func (m *Message) LeaseDeadline() time.Time

LeaseDeadline returns the time when the lease is going to expire.

type PgxConn

type PgxConn interface {
	Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
	Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error)
	QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
	CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error)
}

PgxConn is an acquired *pgx.PgxConn from a Pool.

type State

type State string

State indicates the possible states of a message.

const (
	New        State = "new"
	InProgress State = "in-progress"
	Done       State = "done"
	Dead       State = "dead"
)

Acceptable states for messages.

type VacuumStats

type VacuumStats struct {
	// LastRun indicates the time of the lastest vacuum cycle.
	LastRun time.Time
	// PageSize indicates how large the vacuum operation was in order to
	// keep it short and non-disruptive.
	PageSize int64

	// DoneCount indicates how many messages were removed from the queue.
	DoneCount int64
	// ErrDone indicates why the cleaning up of complete messages failed. If
	// nil, it succeeded.
	ErrDone error

	// RestoreStaleCount indicates how many stale messages were restored
	// into the queue.
	RestoreStaleCount int64
	// ErrRestoreStale indicates why the restoration of stale messages
	// failed. If nil, it succeeded.
	ErrRestoreStale error

	// DeadLetterQueueCount indicates how many messages were diverted into
	// deadletter queues.
	DeadLetterQueueCount int64
	// ErrDeadLetterQueue indicates why the move of messages to deadletter
	// queue failed. If nil, it succeeded.
	ErrDeadLetterQueue error

	// BadMessagesDeleteCount indicates how many messages were deleted
	// because they have errored.
	BadMessagesDeleteCount int64
	// ErrBadMessagesDelete indicates why delete errored messages failed. If
	// nil, it succeeded.
	ErrBadMessagesDelete error

	// ErrTableVacuum indicates why the low-level vacuum operation on the
	// table failed.
	ErrTableVacuum error
	// contains filtered or unexported fields
}

VacuumStats reports the consequences of the clean up.

func (VacuumStats) Err

func (vs VacuumStats) Err() error

func (VacuumStats) String

func (vs VacuumStats) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
JackTT - Gopher 🇻🇳