Documentation
¶
Index ¶
- Constants
- Variables
- func EncodeWireFormat(data []byte, schemaID int) []byte
- func GivenCredentials(configuration Configuration) bool
- func SchemaRegistryClientWithConfiguration(configuration SchemaRegistryConfiguration) *srclient.SchemaRegistryClient
- type BasicAuth
- type Configuration
- type ConnectionConfig
- type ConsumeConfig
- type ConsumerConfiguration
- type Deserializer
- type Element
- type Kafka
- type Message
- type Module
- type ProduceConfig
- type ProducerConfiguration
- type ReaderConfig
- type RootModule
- type SASLConfig
- type SchemaRegistryConfiguration
- type Serde
- type SerdeType
- type Serializer
- type TLSConfig
- type WriterConfig
- type Xk6KafkaError
- func CreateSchema(client *srclient.SchemaRegistryClient, subject string, schema string, ...) (*srclient.Schema, *Xk6KafkaError)
- func DecodeWireFormat(message []byte) (int, []byte, *Xk6KafkaError)
- func DeserializeAvro(configuration Configuration, topic string, data []byte, element Element, ...) (interface{}, *Xk6KafkaError)
- func DeserializeByteArray(configuration Configuration, topic string, data []byte, element Element, ...) (interface{}, *Xk6KafkaError)
- func DeserializeJSON(configuration Configuration, topic string, data []byte, element Element, ...) (interface{}, *Xk6KafkaError)
- func DeserializeString(configuration Configuration, topic string, data []byte, element Element, ...) (interface{}, *Xk6KafkaError)
- func GetDialer(saslConfig SASLConfig, tlsConfig TLSConfig) (*kafkago.Dialer, *Xk6KafkaError)
- func GetSASLMechanism(saslConfig SASLConfig) (sasl.Mechanism, *Xk6KafkaError)
- func GetSchema(client *srclient.SchemaRegistryClient, subject string, schema string, ...) (*srclient.Schema, *Xk6KafkaError)
- func GetSubjectName(schema string, topic string, element Element, subjectNameStrategy string) (string, *Xk6KafkaError)
- func GetTLSConfig(tlsConfig TLSConfig) (*tls.Config, *Xk6KafkaError)
- func NewXk6KafkaError(code errCode, msg string, originalErr error) *Xk6KafkaError
- func SerializeAvro(configuration Configuration, topic string, data interface{}, element Element, ...) ([]byte, *Xk6KafkaError)
- func SerializeByteArray(configuration Configuration, topic string, data interface{}, element Element, ...) ([]byte, *Xk6KafkaError)
- func SerializeJSON(configuration Configuration, topic string, data interface{}, element Element, ...) ([]byte, *Xk6KafkaError)
- func SerializeString(configuration Configuration, topic string, data interface{}, element Element, ...) ([]byte, *Xk6KafkaError)
- func ValidateConfiguration(configuration Configuration) *Xk6KafkaError
Constants ¶
const ( AvroSerializer string = "io.confluent.kafka.serializers.KafkaAvroSerializer" AvroDeserializer string = "io.confluent.kafka.serializers.KafkaAvroDeserializer" )
const ( ByteArray srclient.SchemaType = "BYTEARRAY" ByteArraySerializer string = "org.apache.kafka.common.serialization.ByteArraySerializer" ByteArrayDeserializer string = "org.apache.kafka.common.serialization.ByteArrayDeserializer" )
const ( JSONSchemaSerializer string = "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer" JSONSchemaDeserializer string = "io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer" )
const ( Key Element = "key" Value Element = "value" MagicPrefixSize int = 5 ConcurrentRequests int = 16 )
const ( TopicNameStrategy string = "TopicNameStrategy" RecordNameStrategy string = "RecordNameStrategy" TopicRecordNameStrategy string = "TopicRecordNameStrategy" )
const ( // TODO: move these to their own package. ProtobufSerializer string = "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer" ProtobufDeserializer string = "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer" )
const ( String srclient.SchemaType = "STRING" StringSerializer string = "org.apache.kafka.common.serialization.StringSerializer" StringDeserializer string = "org.apache.kafka.common.serialization.StringDeserializer" )
const (
Timeout = time.Second * 10
)
Variables ¶
var ( GroupBalancers map[string]kafkago.GroupBalancer IsolationLevels map[string]kafkago.IsolationLevel DefaultDeserializer = StringDeserializer MaxWait = time.Millisecond * 200 RebalanceTimeout = time.Second * 5 )
var ( // ErrForbiddenInInitContext is used when a Kafka producer was used in the init context. ErrForbiddenInInitContext = NewXk6KafkaError( kafkaForbiddenInInitContext, "Producing Kafka messages in the init context is not supported", nil) // ErrInvalidDataType is used when a data type is not supported. ErrInvalidDataType = NewXk6KafkaError( invalidDataType, "Invalid data type provided for serializer/deserializer", nil) // ErrNotEnoughArguments is used when a function is called with too few arguments. ErrNotEnoughArguments = errors.New("not enough arguments") ErrInvalidPEMData = errors.New("tls: failed to find any PEM data in certificate input") )
var ( // CompressionCodecs is a map of compression codec names to their respective codecs. CompressionCodecs map[string]compress.Compression // Balancers is a map of balancer names to their respective balancers. Balancers map[string]kafkago.Balancer // DefaultSerializer is string serializer. DefaultSerializer = StringSerializer )
var TLSVersions map[string]uint16
TLSVersions is a map of TLS versions to their numeric values.
Functions ¶
func EncodeWireFormat ¶ added in v0.10.0
EncodeWireFormat adds the proprietary 5-byte prefix to the Avro, ProtoBuf or JSONSchema payload. https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format
func GivenCredentials ¶ added in v0.10.0
func GivenCredentials(configuration Configuration) bool
GivenCredentials returns true if the given configuration has credentials.
func SchemaRegistryClientWithConfiguration ¶ added in v0.10.0
func SchemaRegistryClientWithConfiguration(configuration SchemaRegistryConfiguration) *srclient.SchemaRegistryClient
SchemaRegistryClientWithConfiguration creates a SchemaRegistryClient instance with the given configuration. It will also configure auth and TLS credentials if they exist.
Types ¶
type Configuration ¶ added in v0.4.0
type Configuration struct { Consumer ConsumerConfiguration `json:"consumer"` Producer ProducerConfiguration `json:"producer"` SchemaRegistry SchemaRegistryConfiguration `json:"schemaRegistry"` }
type ConnectionConfig ¶ added in v0.12.0
type ConnectionConfig struct { Address string `json:"address"` SASL SASLConfig `json:"sasl"` TLS TLSConfig `json:"tls"` }
type ConsumeConfig ¶ added in v0.12.0
type ConsumeConfig struct { Limit int64 `json:"limit"` Config Configuration `json:"config"` KeySchema string `json:"keySchema"` ValueSchema string `json:"valueSchema"` }
type ConsumerConfiguration ¶ added in v0.4.0
type Deserializer ¶ added in v0.8.0
type Deserializer func(configuration Configuration, topic string, data []byte, element Element, schema string, version int) (interface{}, *Xk6KafkaError)
type Message ¶ added in v0.12.0
type Message struct { Topic string `json:"topic"` // Setting Partition has no effect when writing messages. Partition int `json:"partition"` Offset int64 `json:"offset"` HighWaterMark int64 `json:"highWaterMark"` Key interface{} `json:"key"` Value interface{} `json:"value"` Headers map[string]interface{} `json:"headers"` // If not set at the creation, Time will be automatically set when // writing the message. Time time.Time `json:"time"` }
type ProduceConfig ¶ added in v0.12.0
type ProduceConfig struct { Messages []Message `json:"messages"` Config Configuration `json:"config"` KeySchema string `json:"keySchema"` ValueSchema string `json:"valueSchema"` }
type ProducerConfiguration ¶ added in v0.4.0
type ReaderConfig ¶ added in v0.12.0
type ReaderConfig struct { WatchPartitionChanges bool `json:"watchPartitionChanges"` ConnectLogger bool `json:"connectLogger"` Partition int `json:"partition"` QueueCapacity int `json:"queueCapacity"` MinBytes int `json:"minBytes"` MaxBytes int `json:"maxBytes"` MaxAttempts int `json:"maxAttempts"` GroupID string `json:"groupId"` Topic string `json:"topic"` IsolationLevel string `json:"isolationLevel"` StartOffset int64 `json:"startOffset"` Offset int64 `json:"offset"` Brokers []string `json:"brokers"` GroupTopics []string `json:"groupTopics"` GroupBalancers []string `json:"groupBalancers"` MaxWait time.Duration `json:"maxWait"` ReadLagInterval time.Duration `json:"readLagInterval"` HeartbeatInterval time.Duration `json:"heartbeatInterval"` CommitInterval time.Duration `json:"commitInterval"` PartitionWatchInterval time.Duration `json:"partitionWatchInterval"` SessionTimeout time.Duration `json:"sessionTimeout"` RebalanceTimeout time.Duration `json:"rebalanceTimeout"` JoinGroupBackoff time.Duration `json:"joinGroupBackoff"` RetentionTime time.Duration `json:"retentionTime"` ReadBackoffMin time.Duration `json:"readBackoffMin"` ReadBackoffMax time.Duration `json:"readBackoffMax"` SASL SASLConfig `json:"sasl"` TLS TLSConfig `json:"tls"` }
type RootModule ¶ added in v0.9.0
type RootModule struct{}
func (*RootModule) NewModuleInstance ¶ added in v0.9.0
func (*RootModule) NewModuleInstance(virtualUser modules.VU) modules.Instance
NewModuleInstance creates a new instance of the Kafka module.
type SASLConfig ¶ added in v0.11.0
type SchemaRegistryConfiguration ¶ added in v0.4.0
type Serde ¶ added in v0.10.0
type Serde[T Serializer | Deserializer] struct { Registry map[string]*SerdeType[T] }
func NewDeserializersRegistry ¶ added in v0.10.0
func NewDeserializersRegistry() *Serde[Deserializer]
NewDeserializersRegistry creates a new instance of the Deserializer registry.
func NewSerializersRegistry ¶ added in v0.10.0
func NewSerializersRegistry() *Serde[Serializer]
NewSerializersRegistry creates a new instance of the Serializer registry.
type SerdeType ¶ added in v0.10.0
type SerdeType[T Serializer | Deserializer] struct { Function T Class string SchemaType srclient.SchemaType WireFormatted bool }
func NewSerdes ¶ added in v0.10.0
func NewSerdes[T Serializer | Deserializer]( function T, class string, schemaType srclient.SchemaType, wireFormatted bool, ) *SerdeType[T]
NewSerdes constructs a new SerdeType.
func (*SerdeType[Deserializer]) GetDeserializer ¶ added in v0.10.0
func (s *SerdeType[Deserializer]) GetDeserializer() Deserializer
GetDeserializer returns the deserializer if the given type is Deserializer.
func (*SerdeType[T]) GetSchemaType ¶ added in v0.10.0
func (s *SerdeType[T]) GetSchemaType() srclient.SchemaType
GetSchemaType returns the schema type.
func (*SerdeType[Serializer]) GetSerializer ¶ added in v0.10.0
func (s *SerdeType[Serializer]) GetSerializer() Serializer
GetSerializer returns the serializer if the given type is Serializer.
func (*SerdeType[T]) IsWireFormatted ¶ added in v0.10.0
IsWireFormatted returns true if the schema is wire formatted.
type Serializer ¶ added in v0.8.0
type Serializer func( configuration Configuration, topic string, data interface{}, element Element, schema string, version int) ([]byte, *Xk6KafkaError)
type WriterConfig ¶ added in v0.12.0
type WriterConfig struct { AutoCreateTopic bool `json:"autoCreateTopic"` ConnectLogger bool `json:"connectLogger"` MaxAttempts int `json:"maxAttempts"` BatchSize int `json:"batchSize"` BatchBytes int `json:"batchBytes"` RequiredAcks int `json:"requiredAcks"` Topic string `json:"topic"` Balancer string `json:"balancer"` Compression string `json:"compression"` Brokers []string `json:"brokers"` BatchTimeout time.Duration `json:"batchTimeout"` ReadTimeout time.Duration `json:"readTimeout"` WriteTimeout time.Duration `json:"writeTimeout"` SASL SASLConfig `json:"sasl"` TLS TLSConfig `json:"tls"` }
type Xk6KafkaError ¶ added in v0.10.0
func CreateSchema ¶ added in v0.10.0
func CreateSchema( client *srclient.SchemaRegistryClient, subject string, schema string, schemaType srclient.SchemaType, ) (*srclient.Schema, *Xk6KafkaError)
CreateSchema creates a new schema in the schema registry.
func DecodeWireFormat ¶ added in v0.10.0
func DecodeWireFormat(message []byte) (int, []byte, *Xk6KafkaError)
DecodeWireFormat removes the proprietary 5-byte prefix from the Avro, ProtoBuf or JSONSchema payload. https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format
func DeserializeAvro ¶ added in v0.8.0
func DeserializeAvro( configuration Configuration, topic string, data []byte, element Element, schema string, version int, ) (interface{}, *Xk6KafkaError)
DeserializeAvro deserializes the given data from wire-formatted Avro binary format and returns it as a byte array. It uses the given version to retrieve the schema from Schema Registry, otherwise it uses the given schema to manually create the codec and decode the data. The configuration is used to configure the Schema Registry client. The element is used to define the subject. The data should be a byte array. nolint: funlen
func DeserializeByteArray ¶ added in v0.8.0
func DeserializeByteArray( configuration Configuration, topic string, data []byte, element Element, schema string, version int, ) (interface{}, *Xk6KafkaError)
DeserializeByteArray deserializes the given data from a byte array and returns it. It just returns the data as is. The configuration, topic, element, schema and version are just used to conform with the interface.
func DeserializeJSON ¶ added in v0.13.0
func DeserializeJSON( configuration Configuration, topic string, data []byte, element Element, schema string, version int, ) (interface{}, *Xk6KafkaError)
DeserializeJSON deserializes the data from JSON and returns the decoded data. It uses the given version to retrieve the schema from Schema Registry, otherwise it uses the given schema to manually create the codec and decode the data. The configuration is used to configure the Schema Registry client. The element is used to define the subject. The data should be a byte array. nolint: funlen
func DeserializeString ¶ added in v0.8.0
func DeserializeString( configuration Configuration, topic string, data []byte, element Element, schema string, version int, ) (interface{}, *Xk6KafkaError)
DeserializeString deserializes a string from bytes.
func GetDialer ¶ added in v0.11.0
func GetDialer(saslConfig SASLConfig, tlsConfig TLSConfig) (*kafkago.Dialer, *Xk6KafkaError)
GetDialer creates a kafka dialer from the given SASL and TLS configuration, or an unauthenticated dialer if no credentials are given.
func GetSASLMechanism ¶ added in v0.11.0
func GetSASLMechanism(saslConfig SASLConfig) (sasl.Mechanism, *Xk6KafkaError)
GetSASLMechanism returns a kafka SASL config from the given credentials.
func GetSchema ¶ added in v0.10.0
func GetSchema( client *srclient.SchemaRegistryClient, subject string, schema string, schemaType srclient.SchemaType, version int, ) (*srclient.Schema, *Xk6KafkaError)
GetSchema returns the schema for the given subject and schema ID and version.
func GetSubjectName ¶ added in v0.11.0
func GetSubjectName(schema string, topic string, element Element, subjectNameStrategy string) (string, *Xk6KafkaError)
GetSubjectName returns the subject name for the given schema and topic, according to the given subject name strategy.
func GetTLSConfig ¶ added in v0.10.0
func GetTLSConfig(tlsConfig TLSConfig) (*tls.Config, *Xk6KafkaError)
GetTLSConfig creates a TLS config from the given TLS config struct and checks for errors. nolint: funlen
func NewXk6KafkaError ¶ added in v0.10.0
func NewXk6KafkaError(code errCode, msg string, originalErr error) *Xk6KafkaError
NewXk6KafkaError is the constructor for Xk6KafkaError.
func SerializeAvro ¶ added in v0.8.0
func SerializeAvro( configuration Configuration, topic string, data interface{}, element Element, schema string, version int, ) ([]byte, *Xk6KafkaError)
SerializeAvro serializes the given data to wire-formatted Avro binary format and returns it as a byte array. It uses the given version to retrieve the schema from Schema Registry, otherwise it uses the given schema to manually create the codec and encode the data. The configuration is used to configure the Schema Registry client. The element is used to define the subject. The data should be a string. nolint: funlen
func SerializeByteArray ¶ added in v0.8.0
func SerializeByteArray( configuration Configuration, topic string, data interface{}, element Element, schema string, version int, ) ([]byte, *Xk6KafkaError)
SerializeByteArray serializes the given data into a byte array and returns it. If the data is not a byte array, an error is returned. The configuration, topic, element, schema and version are just used to conform with the interface.
func SerializeJSON ¶ added in v0.13.0
func SerializeJSON( configuration Configuration, topic string, data interface{}, element Element, schema string, version int, ) ([]byte, *Xk6KafkaError)
SerializeJSON serializes the data to JSON and adds the wire format to the data and returns the serialized data. It uses the given version to retrieve the schema from Schema Registry, otherwise it uses the given schema to manually create the codec and encode the data. The configuration is used to configure the Schema Registry client. The element is used to define the subject. The data should be a string. nolint: funlen
func SerializeString ¶ added in v0.8.0
func SerializeString( configuration Configuration, topic string, data interface{}, element Element, schema string, version int, ) ([]byte, *Xk6KafkaError)
SerializeString serializes a string to bytes.
func ValidateConfiguration ¶ added in v0.10.0
func ValidateConfiguration(configuration Configuration) *Xk6KafkaError
ValidateConfiguration validates the given configuration.
func (Xk6KafkaError) Error ¶ added in v0.10.0
func (e Xk6KafkaError) Error() string
Error implements the `error` interface, so Xk6KafkaError values are normal Go errors.
func (Xk6KafkaError) Unwrap ¶ added in v0.10.0
func (e Xk6KafkaError) Unwrap() error
Unwrap implements the `xerrors.Wrapper` interface, so Xk6KafkaError values are somewhat future-proof with respect to Go 2 error handling.