kafkajobs

package
v0.0.0-...-332a778 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 22, 2024 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func EnsureTopicExists

func EnsureTopicExists(bootstrapServers string, topicName string, numPartitions uint, replicationFactor uint, retentionHours uint)

numPartitions 0 means "try read from KAFKA_NUM_PARTITIONS env var", if it is not defined, use default. replicationFactor = 0 means "try read from KAFKA_REPLICATION_FACTOR env var", if it is not defined, use default. retentionHours = 0 means "try read from KAFKA_RETENTION_HOURS env var", if it is not defined, use default.

Types

type Job

type Job = []byte

type JobQueueProducer

type JobQueueProducer struct {
	// contains filtered or unexported fields
}

func NewJobQueueProducer

func NewJobQueueProducer(bootstrapServers string, topicName string) JobQueueProducer

func (*JobQueueProducer) Close

func (p *JobQueueProducer) Close()

func (*JobQueueProducer) Enqueue

func (p *JobQueueProducer) Enqueue(jobKey string, jobBody []byte)

Blockes until the job delivery is received from the cluster

type JobQueueWorker

type JobQueueWorker struct {
	// contains filtered or unexported fields
}

func NewJobQueueWorker

func NewJobQueueWorker(bootstrapServers string, groupId string, topicName string, maxPermitedJobProcessing time.Duration) JobQueueWorker

func (*JobQueueWorker) Close

func (w *JobQueueWorker) Close()

func (*JobQueueWorker) Run

func (w *JobQueueWorker) Run(c chan<- Job, confirmationChannel <-chan int, stopChannel <-chan int)

Run the worker loop. The jobs are pushed to the channel "c".

func (*JobQueueWorker) TryGetNextJob

func (w *JobQueueWorker) TryGetNextJob(pollingInterval time.Duration) (Job, *kafka.Message, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
JackTT - Gopher 🇻🇳