Documentation
¶
Index ¶
- Variables
- func CheckAllKafkaConnections(brokers []string) error
- func InitializeKafkaConsumerManager(ctx context.Context, workDone chan int, workersPerTopic map[string]int, ...)
- func InitializeKafkaProducerManager(brokers []string)
- func SendConsumerResponse(workerName, newId, fileType, status string)
Constants ¶
This section is empty.
Variables ¶
var KafkaConsumer = kafkaConsumerManager{}
Global KafkaConsumer variable represents the Kafka consumer manager instance.
Note: Before using KafkaConsumer, the InitializeKafkaConsumerManager function should be called to properly set up KafkaConsumer with the kafkaConsumerManager.
var KafkaProducer = kafkaProducerManager{}
Global KafkaProducer variable represents the Kafka producer manager instance.
Note: Before using KafkaProducer, the InitializeKafkaProducerManager function should be called to properly set up KafkaProducer with the kafkaProducerManager.
Functions ¶
func CheckAllKafkaConnections ¶
CheckAllKafkaConnections checks if all Kafka brokers are reachable. It takes a slice of broker addresses as input and returns an error if any broker is not reachable.
func InitializeKafkaConsumerManager ¶
func InitializeKafkaConsumerManager( ctx context.Context, workDone chan int, workersPerTopic map[string]int, wg *sync.WaitGroup, brokers []string, processMsg func(msg kafka.Message, workerName string))
InitializeKafkaConsumerManager sets up the kafkaConsumerManager instance. It configures the necessary parameters for managing Kafka consumers based on the provided input.
Parameters:
- ctx: The context.Context used for managing cancellation and timeouts for the consumer manager.
- workDone: A channel of type int that signals when all worker goroutines have completed their tasks.
- workersPerTopic: A map where keys are topic names (strings) and values are the number of workers (ints) assigned to each topic, dictating how many consumer instances will process messages from each topic.
- wg: A pointer to a sync.WaitGroup that helps synchronize the completion of goroutines, allowing the main program to wait for all worker goroutines to finish before proceeding or exiting.
- brokers: A slice of strings representing the addresses of the Kafka brokers to which the consumer manager will connect.
- processMsg: A function that takes a kafka.Message and a workerName (string) as parameters. This function is called to process each message that the consumer receives.
func InitializeKafkaProducerManager ¶
func InitializeKafkaProducerManager(brokers []string)
InitializeKafkaProducerManager sets up the Kafka producer by creating a Kafka writer configured with the provided broker addresses. This initializes the kafkaProducerManager for message production.
func SendConsumerResponse ¶
func SendConsumerResponse(workerName, newId, fileType, status string)
SendConsumerResponse produces a Kafka message to the "media-docker-files-response" topic.
Parameters: - workerName: Name of the worker processing the message. - newId: Unique identifier for the message being processed. - fileType: Type of the file being processed. Allowed values:
- "video"
- "videoResolutions"
- "image"
- "audio"
- status: Status of the file processing. Allowed values:
- "completed"
- "failed"
CAUTION: Providing values outside the allowed range for fileType or status may cause errors during further processing by client backend services.
Types ¶
This section is empty.