Emitters

Emitters deliver key-value messages into a particular topic in Kafka. As an example, an emitter could be a database handler emitting the state changes into Kafka for other interested applications to consume.

Emitter

Setting up an emitter requires

  • a list of brokers
  • a target topic
  • a target codec
1
2
3
4
5
6
7
 
emitter, err := goka.NewEmitter([]string{"localhost:9092"}, "topic-name", new(codecs.Int64))
// handle error

// wait for all outstanding messages to be sent
err := emitter.Finish()
// handle error

Emitters write key-value pairs into a specific kafka topic. Encoding the message is done by a codec.

The emitter determines the partition based on the key using a hasher passed to the underlying sarama partitioner partitioner.

Messages can be sent

  • asynchronously (using the underlying producer’s batching) using Emit or

    Asynchronous calls return a EmitSync that can be used to subscribe on success or failure of the operation.

1
2
3
4
5
6
7
8
9
 
prom, err := emitter.Emit("key", int64(123))
// handle error

prom.Then(func(err error){
  if err != nil{
    // handle asynchronous error
  }
})
  • synchronously using EmitSync. The call returns when the message was sent or failed.
1
2
3
 
err := emitter.EmitSync("key", int64(124))
// handle error
Last modified January 22, 2022: emitter docs (e3f1214)