Documentation
¶
Index ¶
- Variables
- func GetLoggerFromContext(ctx context.Context, key ContextKey) *slog.Logger
- func Init(ctx context.Context) context.Context
- func MetricsServerProcess(ctx context.Context, port int, gracefulShutdownTimeSec int) error
- func WithLoggerName(ctx context.Context, val ContextKey) context.Context
- func WithValue(ctx context.Context, key ContextKey, val any) context.Context
- type BambooHeartbeatPublisher
- type BambooLogHandler
- type BambooRequestConsumer
- func NewGoroutineBambooRequestConsumer(queue <-chan []byte) BambooRequestConsumer
- func NewKafkaBambooRequestConsumer(consumerOptions kafka.ReaderConfig, requestWaitTimeout time.Duration) BambooRequestConsumer
- func NewRedisBambooRequestConsumer(consumerOptions *redis.UniversalOptions, consumerChannel string, ...) BambooRequestConsumer
- type BambooRequestProducer
- func NewGoroutineBambooRequestProducer(ctx context.Context, workerName string, queue chan<- []byte) BambooRequestProducer
- func NewKafkaBambooRequestProducer(ctx context.Context, workerName string, kafkaWriter *kafka.Writer) BambooRequestProducer
- func NewRedisBambooRequestProducer(ctx context.Context, workerName string, producerOptions redis.UniversalOptions, ...) BambooRequestProducer
- type BambooResultPublisher
- type BambooResultSubscriber
- type BambooWorker
- type BambooWorkerClient
- type ByteArreayResult
- type CloseSubscribeConnectionFunc
- type ContextKey
- type CreateBambooRequestConsumerFunc
- type GoroutineBambooPubSubMap
- type LogConfigFunc
- type MetricsEventHandler
- type SubscribeFunc
- type WorkerFunc
- type WorkerJob
Constants ¶
This section is empty.
Variables ¶
View Source
var ( RequestIDKey = "bamboo_request_id" LoggerNameKey = "bamboo_logger_name" )
View Source
var (
BambooLoggers map[ContextKey]*slog.Logger
)
View Source
var ErrAborted = errors.New("Aborted")
View Source
var ErrContextCanceled = errors.New("ContextCanceled")
View Source
var ErrTimedout = errors.New("Timedout")
Functions ¶
func GetLoggerFromContext ¶ added in v0.0.2
func GetLoggerFromContext(ctx context.Context, key ContextKey) *slog.Logger
GetLoggerFromContext gets the logger from the context.
func MetricsServerProcess ¶ added in v0.0.2
func WithLoggerName ¶ added in v0.0.2
func WithLoggerName(ctx context.Context, val ContextKey) context.Context
Types ¶
type BambooHeartbeatPublisher ¶ added in v0.0.2
type BambooHeartbeatPublisher interface { Ping(ctx context.Context) error Run(ctx context.Context, resultChannel string, heartbeatIntervalMSec int, done <-chan interface{}, aborted <-chan interface{}) error }
func NewGoroutineBambooHeartbeatPublisher ¶ added in v0.0.2
func NewGoroutineBambooHeartbeatPublisher(pubsubMap GoroutineBambooPubSubMap) BambooHeartbeatPublisher
func NewRedisBambooHeartbeatPublisher ¶
func NewRedisBambooHeartbeatPublisher(publisherOptions *redis.UniversalOptions) BambooHeartbeatPublisher
type BambooLogHandler ¶ added in v0.0.2
type BambooRequestConsumer ¶ added in v0.0.2
type BambooRequestConsumer interface { Ping(ctx context.Context) error Consume(ctx context.Context) (*pb.WorkerParameter, error) Close(ctx context.Context) error }
func NewGoroutineBambooRequestConsumer ¶ added in v0.0.2
func NewGoroutineBambooRequestConsumer(queue <-chan []byte) BambooRequestConsumer
func NewKafkaBambooRequestConsumer ¶ added in v0.0.2
func NewKafkaBambooRequestConsumer(consumerOptions kafka.ReaderConfig, requestWaitTimeout time.Duration) BambooRequestConsumer
func NewRedisBambooRequestConsumer ¶ added in v0.0.2
func NewRedisBambooRequestConsumer(consumerOptions *redis.UniversalOptions, consumerChannel string, requestWaitTimeout time.Duration) BambooRequestConsumer
type BambooRequestProducer ¶
type BambooRequestProducer interface { Produce(ctx context.Context, resultChannel string, heartbeatIntervalMSec int, jobTimeoutMSec int, headers map[string]string, data []byte) error Close(ctx context.Context) error }
func NewGoroutineBambooRequestProducer ¶ added in v0.0.2
func NewGoroutineBambooRequestProducer(ctx context.Context, workerName string, queue chan<- []byte) BambooRequestProducer
func NewKafkaBambooRequestProducer ¶
func NewKafkaBambooRequestProducer(ctx context.Context, workerName string, kafkaWriter *kafka.Writer) BambooRequestProducer
func NewRedisBambooRequestProducer ¶
func NewRedisBambooRequestProducer(ctx context.Context, workerName string, producerOptions redis.UniversalOptions, producerChannel string) BambooRequestProducer
type BambooResultPublisher ¶ added in v0.0.2
type BambooResultPublisher interface { Ping(ctx context.Context) error Publish(ctx context.Context, resultChannel string, result []byte) error }
func NewGoroutineBambooResultPublisher ¶ added in v0.0.2
func NewGoroutineBambooResultPublisher(pubsubMap GoroutineBambooPubSubMap) BambooResultPublisher
func NewRedisBambooResultPublisher ¶ added in v0.0.2
func NewRedisBambooResultPublisher(publisherOptions *redis.UniversalOptions) BambooResultPublisher
type BambooResultSubscriber ¶
type BambooResultSubscriber interface { Ping(ctx context.Context) error OpenSubscribeConnection(ctx context.Context, resultChannel string) (SubscribeFunc, CloseSubscribeConnectionFunc, error) }
func NewGoroutineBambooResultSubscriber ¶ added in v0.0.2
func NewGoroutineBambooResultSubscriber(ctx context.Context, workerName string, pubsubMap GoroutineBambooPubSubMap) BambooResultSubscriber
func NewRedisBambooResultSubscriber ¶ added in v0.0.2
func NewRedisBambooResultSubscriber(ctx context.Context, workerName string, subscriberConfig *redis.UniversalOptions) BambooResultSubscriber
type BambooWorker ¶
func NewBambooWorker ¶ added in v0.0.2
func NewBambooWorker(createRequestConsumerFunc CreateBambooRequestConsumerFunc, resultPublisher BambooResultPublisher, heartbeatPublisher BambooHeartbeatPublisher, workerFunc WorkerFunc, numWorkers int, logConfigFunc LogConfigFunc, metricsEventHandler MetricsEventHandler) (BambooWorker, error)
type BambooWorkerClient ¶ added in v0.0.2
type BambooWorkerClient interface { Ping(ctx context.Context) error Close(ctx context.Context) Call(ctx context.Context, heartbeatIntervalMSec int, jobTimeoutMSec int, headers map[string]string, param []byte) ([]byte, error) }
func NewBambooWorkerClient ¶ added in v0.0.2
func NewBambooWorkerClient(requestProducer BambooRequestProducer, resultSubscriber BambooResultSubscriber) BambooWorkerClient
type ByteArreayResult ¶
type CloseSubscribeConnectionFunc ¶ added in v0.0.2
type ContextKey ¶ added in v0.0.2
type ContextKey string
const ( BambooWorkerLoggerContextKey ContextKey = "BambooWorker" BambooWorkerClientLoggerContextKey ContextKey = "BambooWorkerClient" BambooWorkerJobLoggerContextKey ContextKey = "BambooWorkerJob" BambooHeartbeatPublisherLoggerContextKey ContextKey = "BambooHeartbeatPublisher" BambooRequestConsumerLoggerContextKey ContextKey = "BambooRequestConsumer" BambooRequestProducerLoggerContextKey ContextKey = "BambooRequestProducer" BambooResultPublisherLoggerContextKey ContextKey = "BambooResultPublisher" BambooResultSubscriberLoggerContextKey ContextKey = "BambooResultSubscriber" )
const ( RequestIDContextKey ContextKey = "RequestIDContextKey" LoggerNameContextKey ContextKey = "LoggerNameContextKey" )
type CreateBambooRequestConsumerFunc ¶ added in v0.0.2
type CreateBambooRequestConsumerFunc func(ctx context.Context) BambooRequestConsumer
type GoroutineBambooPubSubMap ¶ added in v0.0.2
type GoroutineBambooPubSubMap interface { CreateChannel(channelName string) chan []byte GetChannel(channelName string) (chan []byte, error) ClosePublishChannel(channelName string) error CloseSubscribeChannel(channelName string) error }
func NewGoroutineBambooPubSubMap ¶ added in v0.0.2
func NewGoroutineBambooPubSubMap() GoroutineBambooPubSubMap
type LogConfigFunc ¶
type MetricsEventHandler ¶ added in v0.0.2
type MetricsEventHandler interface { OnReceiveRequest() OnSuccessJob() OnInternalErrorJob() OnInvalidArgumentJob() OnIncrNumRunningWorkers() OnDecrNumRunningWorkers() }
func NewEmptyEventHandler ¶ added in v0.0.2
func NewEmptyEventHandler() MetricsEventHandler
func NewPrometheusEventHandler ¶ added in v0.0.2
func NewPrometheusEventHandler() MetricsEventHandler
type SubscribeFunc ¶ added in v0.0.2
type SubscribeFunc func(ctx context.Context) (*pb.WorkerResponse, error)
type WorkerFunc ¶
type WorkerJob ¶ added in v0.0.2
func NewWorkerJob ¶ added in v0.0.2
func NewWorkerJob(ctx context.Context, carrier propagation.MapCarrier, workerFunc WorkerFunc, headers map[string]string, parameter []byte, resultPublisher BambooResultPublisher, resultChannel string, done chan<- interface{}, aborted <-chan interface{}, logConfigFunc LogConfigFunc, metricsEventHandler MetricsEventHandler) WorkerJob
Source Files
¶
- core.go
- goroutine_heartbeat_publisher.go
- goroutine_pubsub_map.go
- goroutine_request_consumer.go
- goroutine_request_producer.go
- goroutine_result_publisher.go
- goroutine_result_subscriber.go
- kafka_request_consumer.go
- kafka_request_producer.go
- log_handler.go
- metrics_server.go
- prometheus_event_handler.go
- redis_heartbeat_publisher.go
- redis_request_consumer.go
- redis_request_producer.go
- redis_result_publisher.go
- redis_result_subscriber.go
- worker.go
- worker_client.go
- worker_job.go
Directories
¶
| Path | Synopsis |
|---|---|
| example | |
| calc-app (Module) | |
| goroutine-app (Module) | |
| helloworld-app (Module) | |
| worker-redis-redis (Module) | |
| workflow-app (Module) | |
Click to show internal directories.
Click to hide internal directories.