First Steps
This page shows simple examples of the three building blocks of Goka to get you up and running in no time.
Check out the Guide
for an in-depth introduction to all components or dive into more complex examples in the repository.
Adding data to Kafka
Let’s start by pushing some data to a Kafka topic.
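The original snippet is not reproduced here, so the following is a minimal sketch of such an emitter. The broker address and the number of emitted messages are assumptions; the topic name int64-numbers matches the stream consumed by the processor below.

```go
package main

import (
	"fmt"
	"log"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
)

func main() {
	brokers := []string{"localhost:9092"} // assumed local cluster

	// create an emitter that writes int64 values to topic "int64-numbers"
	emitter, err := goka.NewEmitter(brokers, "int64-numbers", new(codec.Int64))
	if err != nil {
		log.Fatalf("error creating emitter: %v", err)
	}
	defer emitter.Finish()

	// emit the values 0, 1, 2, … alternating between keys "key-0" and "key-1"
	for i := int64(0); i < 100; i++ {
		if err := emitter.EmitSync(fmt.Sprintf("key-%d", i%2), i); err != nil {
			log.Fatalf("error emitting message: %v", err)
		}
	}
}
```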
So what’s happening here? The emitter is a simple wrapper around a Sarama AsyncProducer. All it needs to know is the brokers, the target topic, and a Codec, which is used to convert the provided value (here an int64) to []byte.
Kafka also accepts []byte for keys, but for convenience Goka assumes all keys are of type string.
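To illustrate what a codec does, here is a sketch of an int64 codec built with the standard library alone. This is an assumption about how such a codec might work, not Goka’s actual codec.Int64 implementation; only the Encode/Decode shape is taken from the text above.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// Int64Codec converts int64 values to []byte and back,
// following the Encode/Decode shape of a Goka codec.
type Int64Codec struct{}

// Encode serializes an int64 into an 8-byte big-endian slice.
func (c *Int64Codec) Encode(value interface{}) ([]byte, error) {
	v, ok := value.(int64)
	if !ok {
		return nil, fmt.Errorf("expected int64, got %T", value)
	}
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, uint64(v))
	return b, nil
}

// Decode parses an 8-byte big-endian slice back into an int64.
func (c *Int64Codec) Decode(data []byte) (interface{}, error) {
	if len(data) != 8 {
		return nil, fmt.Errorf("expected 8 bytes, got %d", len(data))
	}
	return int64(binary.BigEndian.Uint64(data)), nil
}

func main() {
	c := new(Int64Codec)
	b, _ := c.Encode(int64(42))
	v, _ := c.Decode(b)
	fmt.Println(v) // round-trips back to 42
}
```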
Note that the emitter does not create the topic. If you need to do that manually, see Creating kafka topics below.
The topic now ends up being filled with the key-value pairs (key-0, 0), (key-1, 1), (key-0, 2), (key-1, 3), (key-0, 4), … and so on.
Processing data
Writing a processor is dead simple. Here is a very simple processor that converts int64 values to string and accumulates the incoming numbers by key:
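The original snippet is missing here, so this is a sketch reconstructing what it likely contained, based on the explanation below: the group name converter (yielding the table converter-table), the topics int64-numbers and string-numbers, and the accumulating callback. The broker address is an assumption.

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
)

func main() {
	brokers := []string{"localhost:9092"} // assumed local cluster

	// callback: convert the int64 to a string, emit it,
	// and accumulate the value in the group table.
	cb := func(ctx goka.Context, msg interface{}) {
		number := msg.(int64)
		ctx.Emit("string-numbers", ctx.Key(), fmt.Sprintf("%d", number))

		var sum int64
		if v := ctx.Value(); v != nil {
			sum = v.(int64)
		}
		ctx.SetValue(sum + number)
	}

	g := goka.DefineGroup("converter",
		goka.Input("int64-numbers", new(codec.Int64), cb),
		goka.Output("string-numbers", new(codec.String)),
		goka.Persist(new(codec.Int64)),
	)

	p, err := goka.NewProcessor(brokers, g)
	if err != nil {
		log.Fatalf("error creating processor: %v", err)
	}
	if err := p.Run(context.Background()); err != nil {
		log.Fatalf("error running processor: %v", err)
	}
}
```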
What do you need to set it up?
- a list of brokers
- a group graph
A group graph configures the group of the processor, as well as its input and output topics. This is only the most basic example though; check out Creating a Group Graph
for more details. It’s called a group graph because a set of Goka components together forms a topology or graph.
The group of the processor is used by Kafka’s rebalance protocol. By default, starting multiple instances with the same group splits the partitions among them, which provides a way to scale horizontally. That’s what you need to know for now.
In the example above, we do the following:
- We use goka.Input to consume int64-numbers (the topic filled with numbers by the emitter above). The provided callback is called for every key-value pair, in parallel: one goroutine per partition. The codec takes care of safely converting each []byte message to int64 for us.
- We Sprintf each value into a string and emit it under the same key into string-numbers. The output is configured using goka.Output(), because we need to specify its name and codec; that way we don’t have to fiddle with []byte in the callback.
- We accumulate the incoming values under the same key. Every Goka processor can define one group-table, which makes it a stateful processor. A table can only be modified by the processor defining it, and modifications are limited to the currently processed key. The value for the current key is read with ctx.Value() (or nil if not set) and written with ctx.SetValue(). Each group-table is stored locally using leveldb and emitted to a Kafka topic, named converter-table in our example. This table can be used as a Lookup by other processors or read by clients using a View (see the next step).
There is much more you can do with processors; just check out the Guide.
Reading a table
OK. Now we have data and have processed it (very naively, sure); now we want to provide an API for it. So let’s do it.
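The original snippet is missing here, so this is a hedged sketch of what such a view might look like: it reads the converter-table produced by the processor above and serves values over HTTP. The broker address, port, and query-parameter handling are assumptions for illustration.

```go
package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"time"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
)

func main() {
	brokers := []string{"localhost:9092"} // assumed local cluster

	// a view on the group table of the "converter" processor
	view, err := goka.NewView(brokers, goka.GroupTable("converter"), new(codec.Int64))
	if err != nil {
		log.Fatalf("error creating view: %v", err)
	}

	go func() {
		if err := view.Run(context.Background()); err != nil {
			log.Fatalf("error running view: %v", err)
		}
	}()

	// wait until the view has fully recovered the table from Kafka
	for !view.Recovered() {
		time.Sleep(100 * time.Millisecond)
	}

	// serve table values by key, e.g. GET /?key=key-0
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		value, err := view.Get(r.URL.Query().Get("key"))
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		fmt.Fprintf(w, "%v\n", value)
	})
	log.Fatal(http.ListenAndServe(":9095", nil))
}
```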
Using a view is even simpler: all you need to do is define its table (which is usually the state of another processor, so we can reuse its name) and a codec.
Waiting for view.Recovered()
is necessary because the view essentially downloads the whole Kafka topic on its first run, and it must not be used earlier to avoid serving outdated data.
Key-value semantics may seem raw and overly simple, but in the end it is very powerful, and cases where other semantics are necessary are rare. We do (or intend to) provide examples using other data structures as well.
Starting a local cluster
There are many ways to get a Kafka cluster up and running; maybe you already have access to a hosted one. If you don’t, spin up a cluster locally using docker-compose:
## going to the examples folder of goka
cd goka/examples
## starting a cluster
make start
Creating kafka topics
Goka provides a convenient component to set up topics manually if necessary:
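The original snippet is missing here, so this is a sketch of how the TopicManager mentioned below can be used. The broker address and the partition count are assumptions.

```go
package main

import (
	"log"

	"github.com/lovoo/goka"
)

func main() {
	brokers := []string{"localhost:9092"} // assumed local cluster

	tmc := goka.NewTopicManagerConfig()
	tm, err := goka.NewTopicManager(brokers, goka.DefaultConfig(), tmc)
	if err != nil {
		log.Fatalf("error creating topic manager: %v", err)
	}
	defer tm.Close()

	// create the stream topic with 8 partitions if it does not exist yet
	if err := tm.EnsureStreamExists("int64-numbers", 8); err != nil {
		log.Fatalf("error creating topic: %v", err)
	}
}
```

EnsureTableExists can be used the same way for log-compacted table topics.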
Setting up topics is required for
- Emitter topics
- Processor outputs
- Processor inputs
The reason Goka processors (and emitters) do not create those topics automatically is to separate the concerns of processing data and maintaining infrastructure. The only exceptions are:
- processor tables, i.e. log-compacted topics that processors use to store their state
- processor loopbacks
The TopicManager can also set up tables (i.e. topics configured for log compaction), which is usually done by the processor automatically. Sometimes it’s necessary to do this manually:
- if you start a view without a running processor
- if company policy does not allow creating topics at run time