Oni Kafka Framework

Oni is Kafka Framework written in Go (Golang). that makes you easy to consume and produce kafka messages using robust
API wrapper for kafka-go thanks to segmentio.
the usage most likely same with Gin / Echo web framework.
Oni Mask art by @inksyndromeartwork
- Required go installed on your machine
go version
- Get oni and kafka-go
go get -u github.com/coffeehaze/oni
go get -u github.com/segmentio/kafka-go
- Import oni
import "github.com/coffeehaze/oni"
Quick Start
- Create package
and create foo.go
file and place this code to it
package model
type Foo struct {
FooContent string `json:"foo_content"`
- Create package
and create main.go
file and place this code to it
package main
import (
func main() {
ctx := context.Background()
defer ctx.Done()
// initialize consumer
stream := oni.NewStream(kafka.ReaderConfig{
Brokers: []string{
"localhost:8097", // kafka brokers 1
"localhost:8098", // kafka brokers 2
"localhost:8099", // kafka brokers 3 you can only define one inside array
Topic: "foos", // topic you want to listen at
GroupID: "consumer-group-foos",
foosConsumer := oni.NewConsumer(stream)
"create.foo", // event key you want to map to specific handler function
func(ctx oni.Context) error {
var foo model.Foo
err := ctx.ShouldBindJSON(&foo) // bind message value to struct
if err != nil {
return err
fmt.Println(fmt.Sprintf("key=%s value=%s foo=%s", ctx.KeyString(), ctx.ValueString(), foo))
return nil
// initialize oni runner
// to help start and graceful shutdown all producer and consumer you defined
oniRunner := oni.Runner{
Context: ctx,
Timeout: 15 * time.Second,
Syscall: oni.SyscallOpt(
Consumers: oni.ConsumerOpt(foosConsumer),
- Create package
and create main.go
file and place this code to it
package main
import (
func main() {
foos := &kafka.Writer{
Addr: kafka.TCP("localhost:8097"), // kafka broker
Topic: "foos", // target topic you want to send
// create json object using json.Marshal
fooObj := model.Foo{
FooContent: fmt.Sprintf("This is new foo %d", time.Now().Unix()),
fooByte, _ := json.Marshal(fooObj)
err := foos.WriteMessages(context.Background(), kafka.Message{
Key: []byte("create.foo"), // target event you want to send at
Value: fooByte, // fooObj marshal result as the value
- Start run
and run producer/main.go
API Examples
oni.NewStream(cfg kafka.ReaderConfig)
// example for oni.NewStream(cfg kafka.ReaderConfig)
stream := oni.NewStream(kafka.ReaderConfig{
Brokers: []string{"localhost:8097"},
Topic: "foos",
GroupID: "consumer-group-foos",
oni.NewConsumer(stream *oni.Stream) IConsumer
// example for oni.NewConsumer(stream *oni.Stream)
// default consume mode is implicit
consumer := oni.NewConsumer(stream)
IConsumer.ErrorHandler(callbackFunc ErrorCallbackFunc)
// set global error handler which will be invoked when oni.HandlerFunc returns error
// this function should be called before handler creation and only called once
consumer.ErrorHandler(func (err error) {
if err != nil {
// do error handling logic such as logging
// or handle special error cases
// set consume mode to implicit which means every message
// received by *oni.Stream will be automatically ack or committed
// this function should be called before handler creation
// set consume mode to explicit which means every message
// received by *oni.Stream will be ack or committed manually using Context.Ack()
// this function should be called before handler creation
IConsumer.Group(keyGroup string) *Consumer
// create new group of consumer key event prefix, for example `event.notification.blast`
// could be had last suffix like `email.channel` and `sms.channel` so your next handler
// should be had key event `email.channel` and `sms.channel` and actual handler event key
// for each handler should look like this
// `event.notification.blast.email.channel`
// `event.notification.blast.sms.channel`
// and previous *oni.Stream behavior should inherit to new group
notificationBlastEvent := consumer.Group("event.notification.blast")
notificationBlastEvent.Handler("email.channel", func (ctx oni.Context) error {})
notificationBlastEvent.Handler("sms.channel", func (ctx oni.Context) error {})
IConsumer.Handler(key string, handlerFunc ...HandlerFunc)
// create handler function for specific key event, for this example is `event.send.email`
// message that produced to `event.send.email` key will be received by handler function
// defined in this handler creation and message value and details will be propagated through
// oni.Context
consumer.Handler("event.send.email", func (ctx oni.Context) error {
// put business logic here
return nil
IConsumer.Producer(name string, producerFunc ProducerFunc)
// create producer that can be accessed by its name through oni.Context functions that
// allows business logic to access producer by the name and use it for sending message
// to targeted topic, can be defined multiple times and only created when producer get
// called by oni.Context function and closed after sent the message
consumer.Producer("notification_producer", func() *kafka.Writer {
// you can define using *kafka.Writer struct or you can use templates from oni
// by returning this oni.BasicWriter(addr net.Addr, topic string) function.
return &kafka.Writer{
Addr: kafka.TCP("localhost:8097"),
Topic: "notification",
Context.ShouldBindJSON(v interface{}) error
func (ctx oni.Context) error {
var foo model.Foo
// binding received message value into model.Foo struct
err := ctx.ShouldBindJSON(&foo)
if err != nil {
return err
return nil
Context.ShouldRetryWith(producerFuncName string) error
func (ctx oni.Context) error {
// send back message to `retries` topic to be re-processed in side `main` topic
err := ctx.ShouldRetryWith("producer_name")
if err != nil {
return err
return nil
Context.ShouldErrorWith(producerFuncName string) error
func (ctx oni.Context) error {
// send back message to `failures` topic to be marked as invalid request format
// or any system failures possible
err := ctx.ShouldErrorWith("producer_name")
if err != nil {
return err
return nil
Context.ShouldReturnWith(producerFuncName string) error
func (ctx oni.Context) error {
// send back message from `retries` topic to `main` topic to be re-processed
// this function should be called only inside `retries` topic oni.HandlerFunc
err := ctx.ShouldReturnWith("producer_name")
if err != nil {
return err
return nil
Context.Ack() error
func (ctx oni.Context) error {
// ack-knowledge or commit message, this function only valid when using explicit
// consume mode because explicit mode doesn't automatically commit messages
err := ctx.Ack()
if err != nil {
return err
return nil
Context.ValueBytes() []byte
func (ctx oni.Context) error {
// returns message value as []byte
return nil
Context.ValueString() string
func (ctx oni.Context) error {
// returns message value as string
return nil
Context.KeyBytes() []byte
func (ctx oni.Context) error {
// returns message key as []byte
return nil
Context.KeyString() string
func (ctx oni.Context) error {
// returns message key as string
return nil
Context.Message() kafka.Message
func (ctx oni.Context) error {
// returns all message details
return nil
Context.ReaderStats() kafka.ReaderStats
func (ctx oni.Context) error {
// returns all reader stats
return nil
Context.ReaderConfig() kafka.ReaderConfig
func (ctx oni.Context) error {
// returns all reader configurations
return nil
Context.GetProducer(producerFuncName string) *kafka.Writer
func (ctx oni.Context) error {
// return find producer using its name, registered by this function
// IConsumer.Producer(name string, producerFunc ProducerFunc)
// to be used for sending message to topic you want
return nil
Context.OuterContext() context.Context
func (ctx oni.Context) error {
// return outer context that signed in the first creation of consumer
// can be used for getting key-value data inside it
return nil
Context.FindKey(key string) interface{}
func (ctx oni.Context) error {
// find key inside outer context and return it as interface{}
return nil
Context.CreateKeyVal(key string, val interface{})
func (ctx oni.Context) error {
// create key with its value into outer context
ctx.CreateKeyVal("key_name", "this is value put whatever you want inside here")
return nil