Documentation
¶
Overview ¶
Package pgqueue is a library that allows a single PostgreSQL instance to be used as a queue server.
pool, err := pgxpool.New(ctx, dsn) if err != nil { log.Fatalln("cannot open database connection pool:", err) } client := pgqueue.Open(pool) defer client.Close() if err := client.CreateTable(ctx); err != nil { log.Fatalln("cannot create queue table:", err) } queueName := "example-queue-reservation" content := []byte("content") if err := client.Push(ctx, queueName, content); err != nil { log.Fatalln("cannot push message to queue:", err) } msgs, err := client.Reserve(ctx, queueName, 1*time.Minute, 1) if err != nil { log.Fatalln("cannot reserve message from the queue:", err) } fmt.Printf("content: %s\n", msgs[0].Content()) if err := client.Delete(ctx, msgs[0].ID()); err != nil { log.Fatalln("cannot mark message as done:", err) }
Example (Basic) ¶
package main import ( "context" "fmt" "log" "os" "cirello.io/pgqueue" "github.com/jackc/pgx/v5/pgxpool" ) var dsn = os.Getenv("PGQUEUE_TEST_DSN") func main() { ctx := context.Background() pool, err := pgxpool.New(ctx, dsn) if err != nil { log.Fatalln("cannot open database connection pool:", err) } client := pgqueue.Open(pool) defer client.Close() if err := client.CreateTable(ctx); err != nil { log.Fatalln("cannot create queue table:", err) } queueName := "example-queue-name" if err := client.Push(ctx, queueName, []byte("content")); err != nil { log.Fatalln("cannot push message to queue:", err) } poppedContent, err := client.Pop(ctx, queueName, 1) if err != nil { log.Fatalln("cannot pop message from the queue:", err) } fmt.Printf("content: %s\n", poppedContent[0]) }
Output: content: content
Example (EmptyQueue) ¶
package main import ( "context" "fmt" "log" "os" "cirello.io/pgqueue" "github.com/jackc/pgx/v5/pgxpool" ) var dsn = os.Getenv("PGQUEUE_TEST_DSN") func main() { ctx := context.Background() pool, err := pgxpool.New(ctx, dsn) if err != nil { log.Fatalln("cannot open database connection pool:", err) } client := pgqueue.Open(pool) defer client.Close() if err := client.CreateTable(ctx); err != nil { log.Fatalln("cannot create queue table:", err) } _, err = client.Pop(ctx, "empty-name", 1) fmt.Println("err:", err) }
Output: err: empty queue
Example (Reservation) ¶
package main import ( "context" "fmt" "log" "os" "time" "cirello.io/pgqueue" "github.com/jackc/pgx/v5/pgxpool" ) var dsn = os.Getenv("PGQUEUE_TEST_DSN") func main() { ctx := context.Background() pool, err := pgxpool.New(ctx, dsn) if err != nil { log.Fatalln("cannot open database connection pool:", err) } client := pgqueue.Open(pool) defer client.Close() if err := client.CreateTable(ctx); err != nil { log.Fatalln("cannot create queue table:", err) } queueName := "example-queue-reservation" if err := client.Push(ctx, queueName, []byte("content")); err != nil { log.Fatalln("cannot push message to queue:", err) } msgs, err := client.Reserve(ctx, queueName, 1*time.Minute, 1) if err != nil { log.Fatalln("cannot reserve message from the queue:", err) } fmt.Printf("content: %s\n", msgs[0].Content()) if err := client.Delete(ctx, msgs[0].ID()); err != nil { log.Fatalln("cannot mark message as done:", err) } }
Output: content: content
Example (ReservedReleased) ¶
package main import ( "context" "fmt" "log" "os" "time" "cirello.io/pgqueue" "github.com/jackc/pgx/v5/pgxpool" ) var dsn = os.Getenv("PGQUEUE_TEST_DSN") func main() { ctx := context.Background() pool, err := pgxpool.New(ctx, dsn) if err != nil { log.Fatalln("cannot open database connection pool:", err) } client := pgqueue.Open(pool) defer client.Close() if err := client.CreateTable(ctx); err != nil { log.Fatalln("cannot create queue table:", err) } queueName := "example-queue-release" if err := client.Push(ctx, queueName, []byte("content")); err != nil { log.Fatalln("cannot push message to queue:", err) } msgs, err := client.Reserve(ctx, queueName, 1*time.Minute, 1) if err != nil { log.Fatalln("cannot pop message from the queue:", err) } fmt.Printf("content: %s\n", msgs[0].Content()) if err := client.Release(ctx, msgs[0].ID()); err != nil { log.Fatalln("cannot release the message back to the queue:", err) } }
Output: content: content
Example (ReservedReleasedDeleted) ¶
package main import ( "context" "fmt" "log" "os" "time" "cirello.io/pgqueue" "github.com/jackc/pgx/v5/pgxpool" ) var dsn = os.Getenv("PGQUEUE_TEST_DSN") func main() { ctx := context.Background() pool, err := pgxpool.New(ctx, dsn) if err != nil { log.Fatalln("cannot open database connection pool:", err) } client := pgqueue.Open(pool) defer client.Close() if err := client.CreateTable(ctx); err != nil { log.Fatalln("cannot create queue table:", err) } queueName := "example-queue-release" if err := client.Push(ctx, queueName, []byte("content")); err != nil { log.Fatalln("cannot push message to queue:", err) } msgs, err := client.Reserve(ctx, queueName, 1*time.Minute, 1) if err != nil { log.Fatalln("cannot pop message from the queue:", err) } fmt.Printf("content: %s\n", msgs[0].Content()) if err := client.Delete(ctx, msgs[0].ID()); err != nil { log.Fatalln("cannot remove the message from the queue:", err) } }
Output: content: content
Example (ReservedTouch) ¶
package main import ( "context" "fmt" "log" "os" "time" "cirello.io/pgqueue" "github.com/jackc/pgx/v5/pgxpool" ) var dsn = os.Getenv("PGQUEUE_TEST_DSN") func main() { ctx := context.Background() pool, err := pgxpool.New(ctx, dsn) if err != nil { log.Fatalln("cannot open database connection pool:", err) } client := pgqueue.Open(pool) defer client.Close() if err := client.CreateTable(ctx); err != nil { log.Fatalln("cannot create queue table:", err) } queueName := "example-queue-touch" if err := client.Push(ctx, queueName, []byte("content")); err != nil { log.Fatalln("cannot push message to queue:", err) } msgs, err := client.Reserve(ctx, queueName, 10*time.Second, 1) if err != nil { log.Fatalln("cannot pop message from the queue:", err) } fmt.Printf("content: %s\n", msgs[0].Content()) time.Sleep(5 * time.Second) if err := client.Extend(ctx, 1*time.Minute, msgs[0].ID()); err != nil { log.Fatalln("cannot extend message lease:", err) } }
Output: content: content
Example (Vacuum) ¶
package main import ( "context" "fmt" "log" "os" "cirello.io/pgqueue" "github.com/jackc/pgx/v5/pgxpool" ) var dsn = os.Getenv("PGQUEUE_TEST_DSN") func main() { ctx := context.Background() pool, err := pgxpool.New(ctx, dsn) if err != nil { log.Fatalln("cannot open database connection pool:", err) } client := pgqueue.Open(pool, pgqueue.WithMaxDeliveries(1), pgqueue.DisableAutoVacuum()) defer client.Close() if err := client.CreateTable(ctx); err != nil { log.Fatalln("cannot create queue table:", err) } queueName := "example-queue-vacuum" for i := 0; i < 10; i++ { if err := client.Push(ctx, queueName, []byte("content")); err != nil { log.Fatalln("cannot push message to queue:", err) } if _, err := client.Pop(ctx, queueName, 1); err != nil { log.Fatalln("cannot pop message from the queue:", err) } } stats := client.Vacuum(ctx) if err := stats.Err(); err != nil { log.Fatalln("cannot clean up:", err) } fmt.Println("vacuum succeeded") }
Output: vacuum succeeded
Index ¶
- Constants
- Variables
- type Client
- func (c *Client) ApproximateCount(ctx context.Context, queueName string) (int, error)
- func (c *Client) Close() error
- func (c *Client) CreateTable(ctx context.Context) error
- func (c *Client) Delete(ctx context.Context, ids ...uint64) error
- func (c *Client) DumpDeadLetterQueue(ctx context.Context, queue string, n int) ([]*DeadMessage, error)
- func (c *Client) Extend(ctx context.Context, extension time.Duration, ids ...uint64) error
- func (c *Client) Pop(ctx context.Context, queueName string, n int) ([][]byte, error)
- func (c *Client) Purge(ctx context.Context, queueName string) error
- func (c *Client) Push(ctx context.Context, queueName string, contents ...[]byte) error
- func (c *Client) Release(ctx context.Context, ids ...uint64) error
- func (c *Client) Reserve(ctx context.Context, queueName string, lease time.Duration, n int) ([]*Message, error)
- func (c *Client) Vacuum(ctx context.Context) VacuumStats
- func (c *Client) VacuumStats() VacuumStats
- type ClientOption
- type DeadMessage
- type Message
- type PgxConn
- type State
- type VacuumStats
Examples ¶
Constants ¶
const DefaultMaxDeliveriesCount = 5
DefaultMaxDeliveriesCount is how many delivery attempts each message gets before being skipped on Pop and Reserve calls.
Variables ¶
ErrAlreadyClosed indicates the queue is closed and all its watchers are going to report the queue is no longer available.
ErrDeadLetterQueueDisabled indicates that it is not possible to dump messages from the target dead letter queue because its support has been disabled.
ErrEmptyQueue indicates there isn't any message available at the head of the queue.
ErrInvalidDeadline indicates the target deadline may be in the past or zero.
ErrInvalidDuration indicates the duration used is too small. It must be larger than a millisecond and be a multiple of a millisecond.
ErrReleaseIncomplete indicates that not all messages were released.
ErrZeroSizedBulkOperation indicates that the bulk operation size is zero.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client uses a PostgreSQL database to run a queue system.
func Open ¶
func Open(conn PgxConn, opts ...ClientOption) *Client
Open uses the given database connection and starts operating the queue system.
func (*Client) ApproximateCount ¶
ApproximateCount reports how many messages are available in the queue, for popping. It will skip messages that are currently being processed or stale.
func (*Client) CreateTable ¶
CreateTable prepares the underlying table for the queue system.
func (*Client) Delete ¶
Delete removes the messages from the queue.
func (*Client) DumpDeadLetterQueue ¶
func (c *Client) DumpDeadLetterQueue(ctx context.Context, queue string, n int) ([]*DeadMessage, error)
DumpDeadLetterQueue returns the messages from the target dead letter queue and removes them from the database.
func (*Client) Extend ¶
Extend extends the messages' lease by the given duration. The duration must be a multiple of a millisecond.
func (*Client) Pop ¶
Pop retrieves a batch of pending messages from the queue, if any are available. If the queue is empty, it returns ErrEmptyQueue.
func (*Client) Purge ¶
Purge a queue, removing all messages from it.
func (*Client) Push ¶
Push enqueues the given content batch to the target queue.
func (*Client) Release ¶
Release puts the messages back to the queue.
func (*Client) Reserve ¶
func (c *Client) Reserve(ctx context.Context, queueName string, lease time.Duration, n int) ([]*Message, error)
Reserve retrieves a batch of pending messages from the queue, if any are available. It marks them as InProgress until the defined lease duration. If a message is not marked as Done by the lease time, it is returned to the queue. The lease duration must be a multiple of a millisecond.
func (*Client) Vacuum ¶
func (c *Client) Vacuum(ctx context.Context) VacuumStats
Vacuum cleans up the queue from done or dead messages.
func (*Client) VacuumStats ¶
func (c *Client) VacuumStats() VacuumStats
VacuumStats returns the latest statistics of the auto-vacuum operation.
type ClientOption ¶
type ClientOption func(*Client)
ClientOption reconfigures the behavior of the pgqueue Client.
func CustomAutoVacuumFrequency ¶
func CustomAutoVacuumFrequency(d time.Duration) ClientOption
CustomAutoVacuumFrequency changes the frequency of the automatic vacuum.
func DisableAutoVacuum ¶
func DisableAutoVacuum() ClientOption
DisableAutoVacuum forces the use of manual queue clean up.
func EnableDeadLetterQueue ¶
func EnableDeadLetterQueue() ClientOption
EnableDeadLetterQueue keeps errored messages for later inspection.
func WithCustomTable ¶
func WithCustomTable(tableName string) ClientOption
WithCustomTable changes the name of the postgresql table used for the queue.
func WithMaxDeliveries ¶
func WithMaxDeliveries(maxDeliveries int) ClientOption
WithMaxDeliveries indicates how many delivery attempts each message gets. If zero, the client retries the message forever.
type DeadMessage ¶
type DeadMessage struct {
// contains filtered or unexported fields
}
DeadMessage represents one dead message from the queue.
func (*DeadMessage) Content ¶
func (m *DeadMessage) Content() []byte
Content returns the content of the dead message.
func (*DeadMessage) ID ¶
func (m *DeadMessage) ID() uint64
ID returns the unique identifier of the dead message.
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
Message represents one message from the queue.
func (*Message) Content ¶
Content returns the content of the message.
type PgxConn ¶
type PgxConn interface { Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error) Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error) QueryRow(ctx context.Context, sql string, args ...any) pgx.Row CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error) }
PgxConn is the subset of pgx connection methods used by the Client. It is satisfied by an acquired *pgx.Conn as well as by a *pgxpool.Pool.
type VacuumStats ¶
type VacuumStats struct { // LastRun indicates the time of the latest vacuum cycle. LastRun time.Time // PageSize indicates how large the vacuum operation was in order to // keep it short and non-disruptive. PageSize int64 // DoneCount indicates how many messages were removed from the queue. DoneCount int64 // ErrDone indicates why the cleaning up of complete messages failed. If // nil, it succeeded. ErrDone error // RestoreStaleCount indicates how many stale messages were restored // into the queue. RestoreStaleCount int64 // ErrRestoreStale indicates why the restoration of stale messages // failed. If nil, it succeeded. ErrRestoreStale error // DeadLetterQueueCount indicates how many messages were diverted into // deadletter queues. DeadLetterQueueCount int64 // ErrDeadLetterQueue indicates why the move of messages to deadletter // queue failed. If nil, it succeeded. ErrDeadLetterQueue error // BadMessagesDeleteCount indicates how many messages were deleted // because they have errored. BadMessagesDeleteCount int64 // ErrBadMessagesDelete indicates why delete errored messages failed. If // nil, it succeeded. ErrBadMessagesDelete error // ErrTableVacuum indicates why the low-level vacuum operation on the // table failed. ErrTableVacuum error // contains filtered or unexported fields }
VacuumStats reports the consequences of the clean up.
func (VacuumStats) Err ¶
func (vs VacuumStats) Err() error
func (VacuumStats) String ¶
func (vs VacuumStats) String() string