kafka

package module
v0.13.1
Published: Aug 4, 2022 License: Apache-2.0 Imports: 28 Imported by: 0

README

xk6-kafka

The xk6-kafka project is a k6 extension that enables k6 users to load test Apache Kafka using a producer and possibly a consumer for debugging.

The real purpose of this extension is to test the system you meticulously designed to use Apache Kafka. So, you can test your consumers, hence your system, by auto-generating messages and sending them to your system via Apache Kafka.

You can send many messages with each connection to Kafka. These messages are arrays of objects containing a key and a value, whose serialization format is selected via configuration objects. Supported formats include strings, JSON, binary, Avro, and JSON Schema. Avro and JSON Schema schemas can either be fetched from Schema Registry or hard-coded directly in the script. SASL PLAIN/SCRAM authentication and message compression are also supported.
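
For illustration, here is a hedged sketch of the shape such a produce call and its configuration object can take. The field names follow the ProduceConfig, Configuration, and ProducerConfiguration types documented below, and the serializer class names come from the package constants; the exact JavaScript shape is an assumption, so consult index.d.ts for the authoritative definitions.

// A sketch only: "writer" is a Writer instance (see "The xk6-kafka API" below)
writer.produce({
    messages: [
        // Each message is an object with a key and a value
        { key: "my-key", value: "my-value" },
    ],
    config: {
        producer: {
            // Serializer class names as listed in the Constants section below
            keySerializer: "org.apache.kafka.common.serialization.StringSerializer",
            valueSerializer: "org.apache.kafka.common.serialization.StringSerializer",
        },
    },
});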

For debugging and testing purposes, a consumer is available to make sure you send the correct data to Kafka.

If you want to learn more about the extension, see the article explaining how to load test your Kafka producers and consumers using k6 on the k6 blog.

Supported Features

Download Binaries

The Official Docker Image

The official Docker image is available on Docker Hub. Before running your script, make the script available to the container by mounting a volume (a directory) or passing it via stdin.

docker run --rm -i mostafamoradian/xk6-kafka:latest run - <scripts/test_json.js
The Official Binaries

The binaries are generated by the build process and published on the releases page. Currently, binaries for GNU/Linux, macOS, and Windows on amd64 (x86_64) machines are available.

Note: If you want to see an official build for your machine, please build and test xk6-kafka from source and then create an issue with the details. I'll add the specific binary to the build pipeline and publish it with the next release.

Build from Source

You can build the k6 binary on various platforms, each with its own requirements. The following shows how to build the k6 binary with this extension on GNU/Linux distributions.

Prerequisites

You must have the latest Go version installed to build the k6 binary, and your Go version should match the one required by k6 and xk6. I recommend gvm because it eases version management.

  • gvm for easier installation and management of Go versions on your machine
  • Git for cloning the project
  • xk6 for building k6 binary with extensions
Install and build the latest tagged version

Feel free to skip the first two steps if you already have Go installed.

  1. Install gvm by following its installation guide.

  2. Install the latest version of Go using gvm. You need Go 1.4 installed for bootstrapping into higher Go versions, as explained here.

  3. Install xk6:

    go install go.k6.io/xk6/cmd/xk6@latest
    
  4. Build the binary:

    xk6 build --with github.com/mostafa/xk6-kafka@latest
    

Note: You can always use the latest version of k6 to build the extension, but the earliest version of k6 that supports extensions via xk6 is v0.32.0. xk6 is constantly evolving, so some APIs may not be backward compatible.

Build for development

If you want to add a feature or make a fix, clone the project and build it using the following commands. xk6 will force the build to use the local clone instead of fetching the latest version from the repository, which lets you update the code and test it locally.

git clone git@github.com:mostafa/xk6-kafka.git && cd xk6-kafka
xk6 build --with github.com/mostafa/xk6-kafka@latest=.

Example scripts

There are many examples in the scripts directory that show how to use various features of the extension.

How to Test Your Kafka Setup

You can start testing your setup immediately, but developing the script takes some time, so it is better to write and test your script against a development environment first and then run it against your own environment.

Development environment

I recommend the fast-data-dev Docker image by Lenses.io, a Kafka setup for development that includes Kafka, Zookeeper, Schema Registry, Kafka-Connect, Landoop Tools, and 20+ connectors. It is relatively easy to set up if you have Docker installed. Monitor the Docker logs and wait for a working setup before attempting to test, because the initial setup, leader election, and test data ingestion take time.

  1. Run the Kafka environment and expose the ports:

    sudo docker run \
        --detach --rm \
        --name lensesio \
        -p 2181:2181 \
        -p 3030:3030 \
        -p 8081-8083:8081-8083 \
        -p 9581-9585:9581-9585 \
        -p 9092:9092 \
        -e ADV_HOST=127.0.0.1 \
        -e RUN_TESTS=0 \
        lensesio/fast-data-dev:latest
    
  2. After running the command, visit localhost:3030 to get into the fast-data-dev environment.

  3. Run the following command to see the container logs:

    sudo docker logs -f -t lensesio
    

Note: If you have errors running the Kafka development environment, refer to the fast-data-dev documentation.

The xk6-kafka API

All the exported functions are available by importing the module object from k6/x/kafka. The exported objects, constants, and other data structures are described in the index.d.ts file, which always reflects the latest changes on the main branch. The generated documentation is available at docs/README.md.

⚠️ Warning: The JavaScript API is subject to change in future versions until a new major version is released.

k6 Test Scripts

The example scripts are available as test_<format/feature>.js, with more code and commented sections, in the scripts directory. Since this project extends the functionality of k6, its test scripts follow the four stages of the k6 test life cycle.

  1. To use the extension, you need to import it in your script, like any other JS module:

    // Either import the module object
    import * as kafka from "k6/x/kafka";
    
    // Or individual classes and constants
    import { Writer, Reader, Connection, SOME_CONSTANT } from "k6/x/kafka";
    
  2. You need to instantiate the classes in the init context. All the k6 options are also configured here:

    // Creates a new Writer object to produce messages to Kafka
    const writer = new Writer({
        // WriterConfig object
        brokers: ["localhost:9092"],
        topic: "my-topic",
    });
    
    const reader = new Reader({
        // ReaderConfig object
        brokers: ["localhost:9092"],
        topic: "my-topic",
    });
    
    const connection = new Connection({
        // ConnectionConfig object
        address: "localhost:9092",
    });
    
    if (__VU == 0) {
        // Create a topic on initialization (before producing messages)
        connection.createTopic({
            // TopicConfig object
            topic: "my-topic",
        });
    }
    
  3. In the VU code, you can produce messages to Kafka or consume messages from it:

    export default function() {
        // Fetch the list of all topics
        const topics = connection.listTopics();
        console.log(topics); // list of topics
    
        // Produce messages to Kafka
        writer.produce({
            // ProduceConfig object
            messages: [
                // Message object(s)
                {
                    key: "my-key",
                    value: "my-value",
                },
            ],
        });
    
        // Consume messages from Kafka
        let messages = reader.consume({
            // ConsumeConfig object
            limit: 10
        });
    
        // Log the consumed messages
        console.log(messages);
    
        // You can use k6 checks to verify the contents, length and
        // other properties of the message(s); see the sketch after this list.
    }
    
  4. In the teardown function, close all the connections and possibly delete the topic:

    export function teardown(data) {
        // Delete the topic
        connection.deleteTopic("my-topic");
    
        // Close all connections
        writer.close();
        reader.close();
        connection.close();
    }
    
  5. You can now run k6 with the extension using the following command:

    ./k6 run --vus 50 --duration 60s scripts/test_json.js
    
  6. And here's the test result output:

    
              /\      |‾‾| /‾‾/   /‾‾/
         /\  /  \     |  |/  /   /  /
        /  \/    \    |     (   /   ‾‾\
       /          \   |  |\  \ |  (‾)  |
      / __________ \  |__| \__\ \_____/ .io
    
    execution: local
        script: scripts/test_json.js
        output: -
    
    scenarios: (100.00%) 1 scenario, 50 max VUs, 1m30s max duration (incl. graceful stop):
            * default: 50 looping VUs for 1m0s (gracefulStop: 30s)
    
    
    running (1m00.2s), 00/50 VUs, 13778 complete and 0 interrupted iterations
    default ✓ [======================================] 50 VUs  1m0s
    
        ✓ 10 messages are received
        ✓ Topic equals to xk6_kafka_json_topic
        ✓ Key is correct
        ✓ Value is correct
        ✓ Header equals {'mykey': 'myvalue'}
        ✓ Time is past
        ✓ Partition is zero
        ✓ Offset is gte zero
        ✓ High watermark is gte zero
    
        █ teardown
    
        checks.........................: 100.00% ✓ 124002       ✗ 0
        data_received..................: 0 B     0 B/s
        data_sent......................: 0 B     0 B/s
        iteration_duration.............: avg=217.98ms min=26.64ms med=216.88ms max=357.64ms p(90)=244.95ms p(95)=254.86ms
        iterations.....................: 13778   229.051752/s
        kafka.reader.dial.count........: 50      0.831223/s
        kafka.reader.dial.seconds......: avg=4.76µs   min=0s      med=0s       max=2.22ms   p(90)=0s       p(95)=0s
     ✓ kafka.reader.error.count.......: 0       0/s
        kafka.reader.fetch_bytes.max...: 1000000 min=1000000    max=1000000
        kafka.reader.fetch_bytes.min...: 1       min=1          max=1
        kafka.reader.fetch_wait.max....: 200ms   min=200ms      max=200ms
        kafka.reader.fetch.bytes.......: 0 B     0 B/s
        kafka.reader.fetch.size........: 0       0/s
        kafka.reader.fetches.count.....: 50      0.831223/s
        kafka.reader.lag...............: 7457    min=5736       max=14370
        kafka.reader.message.bytes.....: 27 MB   450 kB/s
        kafka.reader.message.count.....: 137830  2291.348744/s
        kafka.reader.offset............: 2740    min=11         max=2810
        kafka.reader.queue.capacity....: 1       min=1          max=1
        kafka.reader.queue.length......: 1       min=0          max=1
        kafka.reader.read.seconds......: avg=0s       min=0s      med=0s       max=0s       p(90)=0s       p(95)=0s
        kafka.reader.rebalance.count...: 0       0/s
        kafka.reader.timeouts.count....: 0       0/s
        kafka.reader.wait.seconds......: avg=7.44µs   min=0s      med=0s       max=3.17ms   p(90)=0s       p(95)=0s
        kafka.writer.acks.required.....: -1      min=-1         max=0
        kafka.writer.async.............: 0.00%   ✓ 0            ✗ 1377800
        kafka.writer.attempts.max......: 0       min=0          max=0
        kafka.writer.batch.bytes.......: 302 MB  5.0 MB/s
        kafka.writer.batch.max.........: 1       min=1          max=1
        kafka.writer.batch.size........: 1377800 22905.17521/s
        kafka.writer.batch.timeout.....: 0s      min=0s         max=0s
     ✓ kafka.writer.error.count.......: 0       0/s
        kafka.writer.message.bytes.....: 603 MB  10 MB/s
        kafka.writer.message.count.....: 2755600 45810.350421/s
        kafka.writer.read.timeout......: 0s      min=0s         max=0s
        kafka.writer.retries.count.....: 0       0/s
        kafka.writer.wait.seconds......: avg=0s       min=0s      med=0s       max=0s       p(90)=0s       p(95)=0s
        kafka.writer.write.count.......: 2755600 45810.350421/s
        kafka.writer.write.seconds.....: avg=1.02ms   min=79.29µs med=893.09µs max=24.26ms  p(90)=1.22ms   p(95)=1.74ms
        kafka.writer.write.timeout.....: 0s      min=0s         max=0s
        vus............................: 50      min=50         max=50
        vus_max........................: 50      min=50         max=50
    
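As noted in step 3, you can use k6 checks to verify the consumed messages. Here is a minimal sketch, assuming the default string (de)serializers and the message fields from the Message type documented below:

import { check } from "k6";

// Inside the default function, after consuming:
const messages = reader.consume({ limit: 10 });

check(messages, {
    "10 messages are received": (messages) => messages.length == 10,
    "Key is correct": (messages) => messages[0].key == "my-key",
    "Value is correct": (messages) => messages[0].value == "my-value",
});
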
Troubleshooting

To avoid getting the following error while running the test:

Failed to write message: [5] Leader Not Available: the cluster is in the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes

You can use the createTopic method to create the topic in Kafka before producing messages. The scripts/test_topics.js script shows how to list topics on all Kafka partitions and how to create or delete a topic. You can also create the topic using the kafka-topics command:

$ docker exec -it lensesio bash
(inside container)$ kafka-topics --create --topic xk6_kafka_avro_topic --bootstrap-server localhost:9092
(inside container)$ kafka-topics --create --topic xk6_kafka_json_topic --bootstrap-server localhost:9092
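
A minimal sketch of handling this from the script itself, based on the createTopic call from step 2 and the autoCreateTopic field of the WriterConfig type documented below (automatic topic creation also depends on your broker configuration, so treat that part as an assumption):

// In the init context (see step 2 above): create the topic explicitly
if (__VU == 0) {
    connection.createTopic({
        topic: "xk6_kafka_json_topic",
    });
}

// Or let the writer create the topic on the first produce
const writer = new Writer({
    brokers: ["localhost:9092"],
    topic: "xk6_kafka_json_topic",
    autoCreateTopic: true,
});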

Note: If you want to test SASL authentication, look at this commit message, where I describe how to run a test environment.
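
For reference, here is a hedged sketch of what SASL and TLS settings on a Writer can look like, using the sasl and tls fields of the WriterConfig, SASLConfig, and TLSConfig types documented below. The accepted algorithm strings are an assumption here; check index.d.ts and the SASL example scripts for the authoritative values.

const writer = new Writer({
    brokers: ["localhost:9092"],
    topic: "my-topic",
    sasl: {
        username: "test",
        password: "test",
        // The accepted algorithm names are an assumption; see index.d.ts
        algorithm: "scram-sha512",
    },
    tls: {
        enableTls: true,
        insecureSkipTlsVerify: true,
    },
});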

Contributions, Issues and Feedback

I'd be thrilled to receive contributions and feedback on this project. You're always welcome to create an issue if you find one (or many), and I'll do my best to address it. Also, feel free to contribute by opening a PR with your changes, and I'll do my best to review and merge it as soon as I can.

Backward Compatibility Notice

If you want to keep up to date with the latest changes, please follow the project board. Also, since v0.9.0, the main branch is the development branch; it usually has the latest changes and might be unstable. If you want to use the latest features, you may need to build your own binary by following the build-from-source instructions. In contrast, the tagged releases and the Docker images are more stable.

I make no guarantee to keep the API stable until I release a major version, as this project is in active development. The best way to keep up with the changes is to follow the xk6-kafka API and look at the scripts directory.

The Release Process

The main branch is the development branch, and pull requests are squashed and merged into it. When a commit is tagged with a version, for example, v0.10.0, the build pipeline builds the main branch at that commit. The build process creates the binaries and the Docker image. If you want to test the latest unreleased features, you can clone the main branch and instruct xk6 to use the locally cloned repository instead of @latest, which refers to the latest tagged version, as explained in the build-for-development section.

CycloneDX SBOM

Since v0.9.0, CycloneDX SBOMs are generated for go.mod, and they can be accessed from the latest GitHub Actions build for a tagged release or from the release assets. GitHub Actions artifacts are only kept for 90 days, so I'll also add the SBOM to the release assets.

Disclaimer

This project started as a proof of concept, but it seems to be used by some companies nowadays. However, it isn't supported by the k6 team, but rather by me personally, and the APIs may change in the future. USE AT YOUR OWN RISK!

This project was AGPL3-licensed up until 7 October 2021, and then we relicensed it under the Apache License 2.0.

Documentation

Index

Constants

const (
	AvroSerializer   string = "io.confluent.kafka.serializers.KafkaAvroSerializer"
	AvroDeserializer string = "io.confluent.kafka.serializers.KafkaAvroDeserializer"
)
const (
	ByteArray srclient.SchemaType = "BYTEARRAY"

	ByteArraySerializer   string = "org.apache.kafka.common.serialization.ByteArraySerializer"
	ByteArrayDeserializer string = "org.apache.kafka.common.serialization.ByteArrayDeserializer"
)
const (
	JSONSchemaSerializer   string = "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer"
	JSONSchemaDeserializer string = "io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer"
)
const (
	Key                Element = "key"
	Value              Element = "value"
	MagicPrefixSize    int     = 5
	ConcurrentRequests int     = 16
)
const (
	TopicNameStrategy       string = "TopicNameStrategy"
	RecordNameStrategy      string = "RecordNameStrategy"
	TopicRecordNameStrategy string = "TopicRecordNameStrategy"
)
const (
	// TODO: move these to their own package.
	ProtobufSerializer   string = "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer"
	ProtobufDeserializer string = "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer"
)
const (
	String srclient.SchemaType = "STRING"

	StringSerializer   string = "org.apache.kafka.common.serialization.StringSerializer"
	StringDeserializer string = "org.apache.kafka.common.serialization.StringDeserializer"
)
const (
	Timeout = time.Second * 10
)

Variables

var (
	GroupBalancers map[string]kafkago.GroupBalancer

	IsolationLevels map[string]kafkago.IsolationLevel

	DefaultDeserializer = StringDeserializer

	MaxWait          = time.Millisecond * 200
	RebalanceTimeout = time.Second * 5
)
var (
	// ErrForbiddenInInitContext is used when a Kafka producer was used in the init context.
	ErrForbiddenInInitContext = NewXk6KafkaError(
		kafkaForbiddenInInitContext,
		"Producing Kafka messages in the init context is not supported",
		nil)

	// ErrInvalidDataType is used when a data type is not supported.
	ErrInvalidDataType = NewXk6KafkaError(
		invalidDataType,
		"Invalid data type provided for serializer/deserializer",
		nil)

	// ErrNotEnoughArguments is used when a function is called with too few arguments.
	ErrNotEnoughArguments = errors.New("not enough arguments")

	ErrInvalidPEMData = errors.New("tls: failed to find any PEM data in certificate input")
)
var (

	// CompressionCodecs is a map of compression codec names to their respective codecs.
	CompressionCodecs map[string]compress.Compression

	// Balancers is a map of balancer names to their respective balancers.
	Balancers map[string]kafkago.Balancer

	// DefaultSerializer is string serializer.
	DefaultSerializer = StringSerializer
)
var TLSVersions map[string]uint16

TLSVersions is a map of TLS versions to their numeric values.

Functions

func EncodeWireFormat added in v0.10.0

func EncodeWireFormat(data []byte, schemaID int) []byte

EncodeWireFormat adds the proprietary 5-byte prefix to the Avro, ProtoBuf or JSONSchema payload. https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format

func GivenCredentials added in v0.10.0

func GivenCredentials(configuration Configuration) bool

GivenCredentials returns true if the given configuration has credentials.

func SchemaRegistryClientWithConfiguration added in v0.10.0

func SchemaRegistryClientWithConfiguration(configuration SchemaRegistryConfiguration) *srclient.SchemaRegistryClient

SchemaRegistryClientWithConfiguration creates a SchemaRegistryClient instance with the given configuration. It will also configure auth and TLS credentials if they exist.

Types

type BasicAuth added in v0.4.0

type BasicAuth struct {
	Username string `json:"username"`
	Password string `json:"password"`
}

type Configuration added in v0.4.0

type Configuration struct {
	Consumer       ConsumerConfiguration       `json:"consumer"`
	Producer       ProducerConfiguration       `json:"producer"`
	SchemaRegistry SchemaRegistryConfiguration `json:"schemaRegistry"`
}

type ConnectionConfig added in v0.12.0

type ConnectionConfig struct {
	Address string     `json:"address"`
	SASL    SASLConfig `json:"sasl"`
	TLS     TLSConfig  `json:"tls"`
}

type ConsumeConfig added in v0.12.0

type ConsumeConfig struct {
	Limit       int64         `json:"limit"`
	Config      Configuration `json:"config"`
	KeySchema   string        `json:"keySchema"`
	ValueSchema string        `json:"valueSchema"`
}

type ConsumerConfiguration added in v0.4.0

type ConsumerConfiguration struct {
	KeyDeserializer     string `json:"keyDeserializer"`
	ValueDeserializer   string `json:"valueDeserializer"`
	SubjectNameStrategy string `json:"subjectNameStrategy"`
	UseMagicPrefix      bool   `json:"useMagicPrefix"`
}

type Deserializer added in v0.8.0

type Deserializer func(configuration Configuration, topic string, data []byte,
	element Element, schema string, version int) (interface{}, *Xk6KafkaError)

type Element added in v0.10.0

type Element string

type Kafka added in v0.1.1

type Kafka struct {
	// contains filtered or unexported fields
}

type Message added in v0.12.0

type Message struct {
	Topic string `json:"topic"`

	// Setting Partition has no effect when writing messages.
	Partition     int                    `json:"partition"`
	Offset        int64                  `json:"offset"`
	HighWaterMark int64                  `json:"highWaterMark"`
	Key           interface{}            `json:"key"`
	Value         interface{}            `json:"value"`
	Headers       map[string]interface{} `json:"headers"`

	// If not set at the creation, Time will be automatically set when
	// writing the message.
	Time time.Time `json:"time"`
}

type Module added in v0.13.0

type Module struct {
	*Kafka
}

func (*Module) Exports added in v0.13.0

func (m *Module) Exports() modules.Exports

Exports returns the exports of the Kafka module, which are the functions that can be called from the JS code.

type ProduceConfig added in v0.12.0

type ProduceConfig struct {
	Messages    []Message     `json:"messages"`
	Config      Configuration `json:"config"`
	KeySchema   string        `json:"keySchema"`
	ValueSchema string        `json:"valueSchema"`
}

type ProducerConfiguration added in v0.4.0

type ProducerConfiguration struct {
	KeySerializer       string `json:"keySerializer"`
	ValueSerializer     string `json:"valueSerializer"`
	SubjectNameStrategy string `json:"subjectNameStrategy"`
}

type ReaderConfig added in v0.12.0

type ReaderConfig struct {
	WatchPartitionChanges  bool          `json:"watchPartitionChanges"`
	ConnectLogger          bool          `json:"connectLogger"`
	Partition              int           `json:"partition"`
	QueueCapacity          int           `json:"queueCapacity"`
	MinBytes               int           `json:"minBytes"`
	MaxBytes               int           `json:"maxBytes"`
	MaxAttempts            int           `json:"maxAttempts"`
	GroupID                string        `json:"groupId"`
	Topic                  string        `json:"topic"`
	IsolationLevel         string        `json:"isolationLevel"`
	StartOffset            int64         `json:"startOffset"`
	Offset                 int64         `json:"offset"`
	Brokers                []string      `json:"brokers"`
	GroupTopics            []string      `json:"groupTopics"`
	GroupBalancers         []string      `json:"groupBalancers"`
	MaxWait                time.Duration `json:"maxWait"`
	ReadLagInterval        time.Duration `json:"readLagInterval"`
	HeartbeatInterval      time.Duration `json:"heartbeatInterval"`
	CommitInterval         time.Duration `json:"commitInterval"`
	PartitionWatchInterval time.Duration `json:"partitionWatchInterval"`
	SessionTimeout         time.Duration `json:"sessionTimeout"`
	RebalanceTimeout       time.Duration `json:"rebalanceTimeout"`
	JoinGroupBackoff       time.Duration `json:"joinGroupBackoff"`
	RetentionTime          time.Duration `json:"retentionTime"`
	ReadBackoffMin         time.Duration `json:"readBackoffMin"`
	ReadBackoffMax         time.Duration `json:"readBackoffMax"`
	SASL                   SASLConfig    `json:"sasl"`
	TLS                    TLSConfig     `json:"tls"`
}

type RootModule added in v0.9.0

type RootModule struct{}

func New

func New() *RootModule

New creates a new instance of the root module.

func (*RootModule) NewModuleInstance added in v0.9.0

func (*RootModule) NewModuleInstance(virtualUser modules.VU) modules.Instance

NewModuleInstance creates a new instance of the Kafka module.

type SASLConfig added in v0.11.0

type SASLConfig struct {
	Username  string `json:"username"`
	Password  string `json:"password"`
	Algorithm string `json:"algorithm"`
}

type SchemaRegistryConfiguration added in v0.4.0

type SchemaRegistryConfiguration struct {
	URL       string    `json:"url"`
	BasicAuth BasicAuth `json:"basicAuth"`
	UseLatest bool      `json:"useLatest"`
	TLS       TLSConfig `json:"tls"`
}

type Serde added in v0.10.0

type Serde[T Serializer | Deserializer] struct {
	Registry map[string]*SerdeType[T]
}

func NewDeserializersRegistry added in v0.10.0

func NewDeserializersRegistry() *Serde[Deserializer]

NewDeserializersRegistry creates a new instance of the Deserializer registry.

func NewSerializersRegistry added in v0.10.0

func NewSerializersRegistry() *Serde[Serializer]

NewSerializersRegistry creates a new instance of the Serializer registry.

type SerdeType added in v0.10.0

type SerdeType[T Serializer | Deserializer] struct {
	Function      T
	Class         string
	SchemaType    srclient.SchemaType
	WireFormatted bool
}

func NewSerdes added in v0.10.0

func NewSerdes[T Serializer | Deserializer](
	function T, class string, schemaType srclient.SchemaType, wireFormatted bool,
) *SerdeType[T]

NewSerdes constructs a new SerdeType.

func (*SerdeType[Deserializer]) GetDeserializer added in v0.10.0

func (s *SerdeType[Deserializer]) GetDeserializer() Deserializer

GetDeserializer returns the deserializer if the given type is Deserializer.

func (*SerdeType[T]) GetSchemaType added in v0.10.0

func (s *SerdeType[T]) GetSchemaType() srclient.SchemaType

GetSchemaType returns the schema type.

func (*SerdeType[Serializer]) GetSerializer added in v0.10.0

func (s *SerdeType[Serializer]) GetSerializer() Serializer

GetSerializer returns the serializer if the given type is Serializer.

func (*SerdeType[T]) IsWireFormatted added in v0.10.0

func (s *SerdeType[T]) IsWireFormatted() bool

IsWireFormatted returns true if the schema is wire formatted.

type Serializer added in v0.8.0

type Serializer func(
	configuration Configuration, topic string, data interface{},
	element Element, schema string, version int) ([]byte, *Xk6KafkaError)

type TLSConfig added in v0.10.0

type TLSConfig struct {
	EnableTLS             bool   `json:"enableTls"`
	InsecureSkipTLSVerify bool   `json:"insecureSkipTlsVerify"`
	MinVersion            string `json:"minVersion"`
	ClientCertPem         string `json:"clientCertPem"`
	ClientKeyPem          string `json:"clientKeyPem"`
	ServerCaPem           string `json:"serverCaPem"`
}

type WriterConfig added in v0.12.0

type WriterConfig struct {
	AutoCreateTopic bool          `json:"autoCreateTopic"`
	ConnectLogger   bool          `json:"connectLogger"`
	MaxAttempts     int           `json:"maxAttempts"`
	BatchSize       int           `json:"batchSize"`
	BatchBytes      int           `json:"batchBytes"`
	RequiredAcks    int           `json:"requiredAcks"`
	Topic           string        `json:"topic"`
	Balancer        string        `json:"balancer"`
	Compression     string        `json:"compression"`
	Brokers         []string      `json:"brokers"`
	BatchTimeout    time.Duration `json:"batchTimeout"`
	ReadTimeout     time.Duration `json:"readTimeout"`
	WriteTimeout    time.Duration `json:"writeTimeout"`
	SASL            SASLConfig    `json:"sasl"`
	TLS             TLSConfig     `json:"tls"`
}

type Xk6KafkaError added in v0.10.0

type Xk6KafkaError struct {
	Code          errCode
	Message       string
	OriginalError error
}

func CreateSchema added in v0.10.0

func CreateSchema(
	client *srclient.SchemaRegistryClient, subject string, schema string, schemaType srclient.SchemaType,
) (*srclient.Schema, *Xk6KafkaError)

CreateSchema creates a new schema in the schema registry.

func DecodeWireFormat added in v0.10.0

func DecodeWireFormat(message []byte) (int, []byte, *Xk6KafkaError)

DecodeWireFormat removes the proprietary 5-byte prefix from the Avro, ProtoBuf or JSONSchema payload. https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format

func DeserializeAvro added in v0.8.0

func DeserializeAvro(
	configuration Configuration, topic string, data []byte,
	element Element, schema string, version int,
) (interface{}, *Xk6KafkaError)

DeserializeAvro deserializes the given data from wire-formatted Avro binary format and returns it as a byte array. It uses the given version to retrieve the schema from Schema Registry, otherwise it uses the given schema to manually create the codec and decode the data. The configuration is used to configure the Schema Registry client. The element is used to define the subject. The data should be a byte array. nolint: funlen

func DeserializeByteArray added in v0.8.0

func DeserializeByteArray(
	configuration Configuration, topic string, data []byte,
	element Element, schema string, version int,
) (interface{}, *Xk6KafkaError)

DeserializeByteArray deserializes the given data from a byte array and returns it. It just returns the data as is. The configuration, topic, element, schema and version are just used to conform with the interface.

func DeserializeJSON added in v0.13.0

func DeserializeJSON(
	configuration Configuration, topic string, data []byte,
	element Element, schema string, version int,
) (interface{}, *Xk6KafkaError)

DeserializeJSON deserializes the data from JSON and returns the decoded data. It uses the given version to retrieve the schema from Schema Registry, otherwise it uses the given schema to manually create the codec and decode the data. The configuration is used to configure the Schema Registry client. The element is used to define the subject. The data should be a byte array. nolint: funlen

func DeserializeString added in v0.8.0

func DeserializeString(
	configuration Configuration, topic string, data []byte,
	element Element, schema string, version int,
) (interface{}, *Xk6KafkaError)

DeserializeString deserializes a string from bytes.

func GetDialer added in v0.11.0

func GetDialer(saslConfig SASLConfig, tlsConfig TLSConfig) (*kafkago.Dialer, *Xk6KafkaError)

GetDialer creates a kafka dialer from the given auth string or an unauthenticated dialer if the auth string is empty.

func GetSASLMechanism added in v0.11.0

func GetSASLMechanism(saslConfig SASLConfig) (sasl.Mechanism, *Xk6KafkaError)

GetSASLMechanism returns a kafka SASL config from the given credentials.

func GetSchema added in v0.10.0

func GetSchema(
	client *srclient.SchemaRegistryClient, subject string, schema string, schemaType srclient.SchemaType, version int,
) (*srclient.Schema, *Xk6KafkaError)

GetSchema returns the schema for the given subject and schema ID and version.

func GetSubjectName added in v0.11.0

func GetSubjectName(schema string, topic string, element Element, subjectNameStrategy string) (string, *Xk6KafkaError)

GetSubjectName returns the subject name for the given schema and topic based on the given subject name strategy.

func GetTLSConfig added in v0.10.0

func GetTLSConfig(tlsConfig TLSConfig) (*tls.Config, *Xk6KafkaError)

GetTLSConfig creates a TLS config from the given TLS config struct and checks for errors. nolint: funlen

func NewXk6KafkaError added in v0.10.0

func NewXk6KafkaError(code errCode, msg string, originalErr error) *Xk6KafkaError

NewXk6KafkaError is the constructor for Xk6KafkaError.

func SerializeAvro added in v0.8.0

func SerializeAvro(
	configuration Configuration, topic string, data interface{},
	element Element, schema string, version int,
) ([]byte, *Xk6KafkaError)

SerializeAvro serializes the given data to wire-formatted Avro binary format and returns it as a byte array. It uses the given version to retrieve the schema from Schema Registry, otherwise it uses the given schema to manually create the codec and encode the data. The configuration is used to configure the Schema Registry client. The element is used to define the subject. The data should be a string. nolint: funlen

func SerializeByteArray added in v0.8.0

func SerializeByteArray(
	configuration Configuration, topic string, data interface{},
	element Element, schema string, version int,
) ([]byte, *Xk6KafkaError)

SerializeByteArray serializes the given data into a byte array and returns it. If the data is not a byte array, an error is returned. The configuration, topic, element, schema and version are just used to conform with the interface.

func SerializeJSON added in v0.13.0

func SerializeJSON(
	configuration Configuration, topic string, data interface{},
	element Element, schema string, version int,
) ([]byte, *Xk6KafkaError)

SerializeJSON serializes the data to JSON and adds the wire format to the data and returns the serialized data. It uses the given version to retrieve the schema from Schema Registry, otherwise it uses the given schema to manually create the codec and encode the data. The configuration is used to configure the Schema Registry client. The element is used to define the subject. The data should be a string. nolint: funlen

func SerializeString added in v0.8.0

func SerializeString(
	configuration Configuration, topic string, data interface{},
	element Element, schema string, version int,
) ([]byte, *Xk6KafkaError)

SerializeString serializes a string to bytes.

func ValidateConfiguration added in v0.10.0

func ValidateConfiguration(configuration Configuration) *Xk6KafkaError

ValidateConfiguration validates the given configuration.

func (Xk6KafkaError) Error added in v0.10.0

func (e Xk6KafkaError) Error() string

Error implements the `error` interface, so Xk6KafkaError values are normal Go errors.

func (Xk6KafkaError) Unwrap added in v0.10.0

func (e Xk6KafkaError) Unwrap() error

Unwrap implements the `xerrors.Wrapper` interface, so Xk6KafkaError values are somewhat future-proof for Go 2 errors.
