kafkahandler

package
v0.0.0-...-ec59de8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 29, 2024 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var KafkaConsumer = kafkaConsumerManager{}

Global KafkaConsumer variable represents the Kafka consumer manager instance.

Note: Before using KafkaConsumer, the InitializeKafkaConsumerManager function should be called to properly set up KafkaConsumer with the kafkaConsumerManager.

View Source
var KafkaProducer = kafkaProducerManager{}

Global KafkaProducer variable represents the Kafka producer manager instance.

Note: Before using KafkaProducer, the InitializeKafkaProducerManager function should be called to properly set up KafkaProducer with the kafkaProducerManager.

Functions

func CheckAllKafkaConnections

func CheckAllKafkaConnections(brokers []string) error

CheckAllKafkaConnections checks if all Kafka brokers are reachable. It takes a slice of broker addresses as input and returns an error if any broker is not reachable.

func InitializeKafkaConsumerManager

func InitializeKafkaConsumerManager(
	ctx context.Context,
	workDone chan int,
	workersPerTopic map[string]int,
	wg *sync.WaitGroup, brokers []string,
	processMsg func(msg kafka.Message, workerName string))

InitializeKafkaConsumerManager sets up the kafkaConsumerManager instance. It configures the necessary parameters for managing Kafka consumers based on the provided input.

Parameters:

  • ctx: The context.Context used for managing cancellation and timeouts for the consumer manager.
  • workDone: A channel of type int that signals when all worker goroutines have completed their tasks.
  • workersPerTopic: A map where keys are topic names (strings) and values are the number of workers (ints) assigned to each topic, dictating how many consumer instances will process messages from each topic.
  • wg: A pointer to a sync.WaitGroup that helps synchronize the completion of goroutines, allowing the main program to wait for all worker goroutines to finish before proceeding or exiting.
  • brokers: A slice of strings representing the addresses of the Kafka brokers to which the consumer manager will connect.
  • processMsg: A function that takes a kafka.Message and a workerName (string) as parameters. This function is called to process each message that the consumer receives.

func InitializeKafkaProducerManager

func InitializeKafkaProducerManager(brokers []string)

InitializeKafkaProducerManager sets up the Kafka producer by creating a Kafka writer configured with the provided broker addresses. This initializes the kafkaProducerManager for message production.

func SendConsumerResponse

func SendConsumerResponse(workerName, newId, fileType, status string)

SendConsumerResponse produces a Kafka message to the "media-docker-files-response" topic.

Parameters: - workerName: Name of the worker processing the message. - newId: Unique identifier for the message being processed. - fileType: Type of the file being processed. Allowed values:

  • "video"
  • "videoResolutions"
  • "image"
  • "audio"

- status: Status of the file processing. Allowed values:

  • "completed"
  • "failed"

CAUTION: Providing values outside the allowed range for fileType or status may cause errors during further processing by client backend services.

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
JackTT - Gopher 🇻🇳