Embedding Kafka Connect in Kafka Streams + KSQL

Previously I presented the Kafka abstraction funnel and how it provides a simple yet powerful tool for writing applications that use Apache Kafka.  In this post I will show how these abstractions also provide a straightforward means of interfacing with Kafka Connect, so that applications that use Kafka Streams and KSQL can easily integrate with external systems like MySQL, Elasticsearch, and others.1

This is a somewhat lengthy post, so feel free to skip to the summary below.

Twins Separated at Birth?

Normally when using Kafka Connect, one would launch a cluster of Connect workers to run a combination of source connectors, which pull data from an external system into Kafka, and sink connectors, which push data from Kafka to an external system.  Once the data is in Kafka, one could then use Kafka Streams or KSQL to perform stream processing on the data.

Let’s take a look at two of the primary abstractions in Kafka Connect, the SourceTask and the SinkTask. The heart of the SourceTask class is the poll() method, which returns data from the external system.

/**
 * SourceTask is a Task that pulls records from another system for 
 * storage in Kafka.
 */
public abstract class SourceTask implements Task {
    ...

    public abstract void start(Map<String, String> props);

    public abstract List<SourceRecord> poll() 
        throws InterruptedException;

    public abstract void stop();

    ...
}

Likewise, the heart of the SinkTask is the put(Collection<SinkRecord> records) method, which sends data to the external system.

/**
 * SinkTask is a Task that takes records loaded from Kafka and 
 * sends them to another system.
 */
public abstract class SinkTask implements Task {
    ...

    public abstract void start(Map<String, String> props);

    public abstract void put(Collection<SinkRecord> records);

    public void flush(
        Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
    }

    public abstract void stop();

    ...
}

Do those Kafka Connect classes remind us of anything in the Kafka abstraction funnel? Yes, indeed, the Consumer and Producer interfaces. Here is the Consumer:

/**
 * A client that consumes records from a Kafka cluster.
 */
public interface Consumer<K, V> extends Closeable {
    ...

    public void subscribe(
        Pattern pattern, ConsumerRebalanceListener callback);

    public ConsumerRecords<K, V> poll(long timeout);

    public void unsubscribe();

    public void close();

    ...
}

And here is the Producer:

/**
 * A Kafka client that publishes records to the Kafka cluster.
 */
public interface Producer<K, V> extends Closeable {
     ...

    Future<RecordMetadata> send(
        ProducerRecord<K, V> record, Callback callback);

    void flush();

    void close();

    ...
}

There are other methods in those interfaces that I’ve elided, but the above methods are the primary ones used by Kafka Streams.

Connect, Meet Streams

The Connect APIs and the Producer-Consumer APIs are very similar. Perhaps we can create implementations of the Producer-Consumer APIs that merely delegate to the Connect APIs. But how would we plug our new implementations into Kafka Streams? Well, it turns out that Kafka Streams allows you to implement an interface that instructs it where to obtain a Producer and a Consumer:

/**
 * {@code KafkaClientSupplier} can be used to provide custom Kafka clients 
 * to a {@link KafkaStreams} instance.
 */
public interface KafkaClientSupplier {

    AdminClient getAdminClient(final Map<String, Object> config);

    Producer<byte[], byte[]> getProducer(Map<String, Object> config);

    Consumer<byte[], byte[]> getConsumer(Map<String, Object> config);

    ...
}

That’s just what we want. Let’s try implementing a new Consumer that delegates to a Connect SourceTask. We’ll call it ConnectSourceConsumer.

public class ConnectSourceConsumer implements Consumer<byte[], byte[]> {

    private final SourceTask task;
    private final Converter keyConverter;
    private final Converter valueConverter;
    ...
    
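    public ConsumerRecords<byte[], byte[]> poll(long timeout) {
        try {
            // Poll the Connect source task; it may return null
            // when no data is currently available
            List<SourceRecord> records = task.poll();
            return records != null
                ? convertRecords(records)
                : ConsumerRecords.empty();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report no records
            Thread.currentThread().interrupt();
            return ConsumerRecords.empty();
        }
    }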

    // Convert the Connect records into Consumer records
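    private ConsumerRecords<byte[], byte[]> convertRecords(
            List<SourceRecord> records) {

        // Group the converted records by topic-partition
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> result =
            new HashMap<>();
        for (final SourceRecord record : records) {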
            byte[] key = keyConverter.fromConnectData(
                record.topic(), record.keySchema(), record.key());
            byte[] value = valueConverter.fromConnectData(
                record.topic(), record.valueSchema(), record.value());
            int partition = record.kafkaPartition() != null 
                ? record.kafkaPartition() : 0;
            final ConsumerRecord<byte[], byte[]> consumerRecord =
                    new ConsumerRecord<>(
                            record.topic(),
                            partition,
                            ...
                            key,
                            value);
            TopicPartition tp = new TopicPartition(
                record.topic(), partition);
            List<ConsumerRecord<byte[], byte[]>> consumerRecords = 
                result.computeIfAbsent(tp, k -> new ArrayList<>());
            consumerRecords.add(consumerRecord);
        }
        return new ConsumerRecords<>(result);
    }

    ...
}

And here is the new Producer that delegates to a Connect SinkTask, called ConnectSinkProducer.

public class ConnectSinkProducer implements Producer<byte[], byte[]> {

    private final SinkTask task;
    private final Converter keyConverter;
    private final Converter valueConverter;
    private final List<SinkRecord> recordBatch;
    ...

    public Future<RecordMetadata> send(
        ProducerRecord<byte[], byte[]> record, Callback callback) {
        convertRecords(Collections.singletonList(record));
        ...
    }

    // Convert the Producer records into Connect sink records
    private void convertRecords(
        List<ProducerRecord<byte[], byte[]>> records) {

        for (ProducerRecord<byte[], byte[]> record : records) {
            SchemaAndValue keyAndSchema = record.key() != null
                ? keyConverter.toConnectData(record.topic(), record.key())
                : SchemaAndValue.NULL;
            SchemaAndValue valueAndSchema = record.value() != null
                ? valueConverter.toConnectData(
                    record.topic(), record.value())
                : SchemaAndValue.NULL;
            int partition = record.partition() != null 
                ? record.partition() : 0;
            SinkRecord producerRecord = new SinkRecord(
                    record.topic(), partition,
                    keyAndSchema.schema(), keyAndSchema.value(),
                    valueAndSchema.schema(), valueAndSchema.value(),
                    ...);
            recordBatch.add(producerRecord);
        }
    }

    public void flush() {
        deliverRecords();
    }

    private void deliverRecords() {
        // Finally, deliver this batch to the sink
        try {
            task.put(new ArrayList<>(recordBatch));
            recordBatch.clear();
        } catch (RetriableException e) {
            // The batch will be reprocessed on the next loop.
        } catch (Throwable t) {
            throw new ConnectException("Unrecoverable exception:", t);
        }
    }

    ...
}
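The buffering behaviour of ConnectSinkProducer can be easier to see in isolation. Below is a minimal sketch in plain Java, with no Kafka dependencies: the class BufferedSinkSketch, its RetriableFailure exception, and the pending() helper are hypothetical names of my own, and a java.util.function.Consumer stands in for SinkTask.put. It only illustrates the idea that send() buffers, flush() delivers, and a retriable failure leaves the batch in place for the next attempt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for the buffering done by ConnectSinkProducer. */
final class BufferedSinkSketch<T> {

    /** Hypothetical marker for a failure that is worth retrying. */
    static final class RetriableFailure extends RuntimeException {
        RetriableFailure(String message) {
            super(message);
        }
    }

    // Plays the role of SinkTask.put in this sketch
    private final Consumer<List<T>> sink;
    private final List<T> batch = new ArrayList<>();

    BufferedSinkSketch(Consumer<List<T>> sink) {
        this.sink = sink;
    }

    /** Buffers a record; nothing reaches the sink until flush(). */
    void send(T record) {
        batch.add(record);
    }

    /** Delivers a copy of the batch and clears it only on success. */
    void flush() {
        try {
            sink.accept(new ArrayList<>(batch));
            batch.clear();
        } catch (RetriableFailure e) {
            // Keep the batch so the next flush() retries the same records.
        }
    }

    /** Number of records still waiting to be delivered. */
    int pending() {
        return batch.size();
    }

    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        boolean[] failNext = {true};
        BufferedSinkSketch<String> producer = new BufferedSinkSketch<>(records -> {
            if (failNext[0]) {
                failNext[0] = false;
                throw new RetriableFailure("sink temporarily unavailable");
            }
            delivered.addAll(records);
        });
        producer.send("a");
        producer.send("b");
        producer.flush(); // first attempt fails, batch is retained
        producer.flush(); // second attempt succeeds, batch is cleared
        System.out.println(delivered + " pending=" + producer.pending());
    }
}
```

One consequence of this design is that a record may be handed to the sink more than once if a put partially succeeds before failing, so the sink should tolerate duplicates.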

With our new classes in hand, let’s implement KafkaClientSupplier so we can let Kafka Streams know about them.

public class ConnectClientSupplier implements KafkaClientSupplier {
    ...
    
    public Producer<byte[], byte[]> getProducer(
        final Map<String, Object> config) {
        return new ConnectSinkProducer(config);
    }

    public Consumer<byte[], byte[]> getConsumer(
        final Map<String, Object> config) {
        return new ConnectSourceConsumer(config);
    }

    ...
}

Words Without Counts

We now have enough to run a simple function using Kafka Connect embedded in Kafka Streams.2 Let’s give it a whirl.

The following code performs the first half of a WordCount application, where the input is a stream of lines of text, and the output is a stream of words. However, instead of using Kafka for input/output, we use the JDBC Connector to read from a database table and write to another.

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple");
    props.put(StreamsConfig.CLIENT_ID_CONFIG, "simple-example-client");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    ...
    StreamsConfig streamsConfig = new StreamsConfig(props);

    Map<String, Object> config = new HashMap<>();
    ...
    ConnectStreamsConfig connectConfig = new ConnectStreamsConfig(config);

    // The JDBC source task configuration
    Map<String, Object> sourceConfig = new HashMap<>();
    sourceConfig.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, jdbcUrl);
    sourceConfig.put(JdbcSourceTaskConfig.TABLES_CONFIG, inputTopic);
    sourceConfig.put(TaskConfig.TASK_CLASS_CONFIG, JdbcSourceTask.class.getName());
    TaskConfig sourceTaskConfig = new TaskConfig(sourceConfig);

    // The JDBC sink task configuration
    Map<String, Object> sinkConfig = new HashMap<>();
    sinkConfig.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, jdbcUrl);
    sinkConfig.put(TaskConfig.TASK_CLASS_CONFIG, JdbcSinkTask.class.getName());
    TaskConfig sinkTaskConfig = new TaskConfig(sinkConfig);

    StreamsBuilder builder = new StreamsBuilder();

    KStream<SchemaAndValue, SchemaAndValue> input = 
        builder.stream(inputTopic);

    KStream<SchemaAndValue, SchemaAndValue> output = input
        .flatMapValues(value -> {
            Struct lines = (Struct) value.value();
            String[] strs = lines.get("lines").toString()
                .toLowerCase().split("\\W+");
            List<SchemaAndValue> result = new ArrayList<>();
            for (String str : strs) {
                if (str.length() > 0) {
                    Schema schema = SchemaBuilder.struct().name("word")
                        .field("word", Schema.STRING_SCHEMA).build();
                    Struct struct = new Struct(schema).put("word", str);
                    result.add(new SchemaAndValue(schema, struct));
                }
            }
            return result;
        });

    output.to(outputTopic);

    streams = new KafkaStreams(builder.build(), streamsConfig,
            new ConnectClientSupplier("JDBC", connectConfig,
                    Collections.singletonMap(inputTopic, sourceTaskConfig),
                    Collections.singletonMap(outputTopic, sinkTaskConfig)));
    streams.start();

When we run the above example, it executes without involving Kafka at all!

This Is Not A Pipe3

Now that we have the first half of a WordCount application, let’s complete it. Normally we would just add a groupBy call to the above example; however, that won’t work with our JDBC pipeline. The reason can be found in the JavaDoc for groupBy:

Because a new key is selected, an internal repartitioning topic will be created in Kafka. This topic will be named “${applicationId}-XXX-repartition”, where “applicationId” is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, “XXX” is an internally generated name, and “-repartition” is a fixed suffix.

So Kafka Streams will try to create a new topic, and it will do so by using the AdminClient obtained from KafkaClientSupplier. Therefore, we could, if we chose to, create an implementation of AdminClient that creates database tables instead of Kafka topics. However, if we remember to think of Kafka as Unix pipes, all we want to do is restructure our previous WordCount pipeline from this computation:

to the following computation, where the database interactions are no longer represented by pipes, but rather by stdin and stdout:

This allows Kafka Streams to use Kafka Connect without going through Kafka as an intermediary. In addition, there is no need for a cluster of Connect workers as the Kafka Streams layer is directly instantiating and managing the necessary Connect components. However, wherever there is a pipe (|) in the above pipeline, we still want Kafka to hold the intermediate results.

WordCount With Kafka Connect

So let’s continue to use an AdminClient that is backed by Kafka. However, if we want to use Kafka for intermediate results, we need to modify the APIs in ConnectClientSupplier. We will now need this class to return instances of Producer and Consumer that delegate to Kafka Connect for the stream input and output, but to Kafka for intermediate results that are produced within the stream.

public class ConnectClientSupplier implements KafkaClientSupplier {

    private DefaultKafkaClientSupplier defaultSupplier = 
        new DefaultKafkaClientSupplier();
    private String connectorName;
    private ConnectStreamsConfig connectStreamsConfig;
    private Map<String, TaskConfig> sourceTaskConfigs;
    private Map<String, TaskConfig> sinkTaskConfigs;

    public ConnectClientSupplier(
        String connectorName, ConnectStreamsConfig connectStreamsConfig,
        Map<String, TaskConfig> sourceTaskConfigs,
        Map<String, TaskConfig> sinkTaskConfigs) {

        this.connectorName = connectorName;
        this.connectStreamsConfig = connectStreamsConfig;
        this.sourceTaskConfigs = sourceTaskConfigs;
        this.sinkTaskConfigs = sinkTaskConfigs;
    }

    ...

    @Override
    public Producer<byte[], byte[]> getProducer(
        Map<String, Object> config) {

        ProducerConfig producerConfig = new ProducerConfig(
            ProducerConfig.addSerializerToConfig(config, 
                new ByteArraySerializer(), new ByteArraySerializer()));
        Map<String, ConnectSinkProducer> connectProducers =
            sinkTaskConfigs.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey,
                    e -> ConnectSinkProducer.create(
                        connectorName, connectStreamsConfig, 
                        e.getValue(), producerConfig)));

        // Return a Producer that delegates to Connect or Kafka
        return new WrappedProducer(
            connectProducers, defaultSupplier.getProducer(config));
    }

    @Override
    public Consumer<byte[], byte[]> getConsumer(
        Map<String, Object> config) {

        ConsumerConfig consumerConfig = new ConsumerConfig(
            ConsumerConfig.addDeserializerToConfig(config, 
                new ByteArrayDeserializer(), new ByteArrayDeserializer()));
        Map<String, ConnectSourceConsumer> connectConsumers =
            sourceTaskConfigs.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey,
                    e -> ConnectSourceConsumer.create(
                        connectorName, connectStreamsConfig, 
                        e.getValue(), consumerConfig)));

        // Return a Consumer that delegates to Connect and Kafka
        return new WrappedConsumer(
            connectConsumers, defaultSupplier.getConsumer(config));
    }
    
    ...
}

The WrappedProducer simply sends a record to either Kafka or the appropriate Connect SinkTask, depending on the topic or table name.

public class WrappedProducer implements Producer<byte[], byte[]> {
    private final Map<String, ConnectSinkProducer> connectProducers;
    private final Producer<byte[], byte[]> kafkaProducer;

    @Override
    public Future<RecordMetadata> send(
        ProducerRecord<byte[], byte[]> record, Callback callback) {
        String topic = record.topic();
        ConnectSinkProducer connectProducer = connectProducers.get(topic);
        if (connectProducer != null) {
            // Send to Connect
            return connectProducer.send(record, callback);
        } else {
            // Send to Kafka
            return kafkaProducer.send(record, callback);
        }
    }

    ...
}
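The routing rule is small enough to sketch without any Kafka classes. In the following plain-Java illustration, TopicRouterSketch is a hypothetical name of my own, and simple java.util.function.Consumer callbacks stand in for the Connect sink producers and the Kafka producer. A topic that has a registered sink goes to Connect; everything else, such as an internal repartition topic, falls through to Kafka.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical illustration of the topic-based routing in WrappedProducer. */
final class TopicRouterSketch {

    // Topic (or table) name -> stand-in for a Connect sink producer
    private final Map<String, Consumer<String>> connectSinks;
    // Stand-in for the regular Kafka producer
    private final Consumer<String> kafkaFallback;

    TopicRouterSketch(Map<String, Consumer<String>> connectSinks,
                      Consumer<String> kafkaFallback) {
        this.connectSinks = connectSinks;
        this.kafkaFallback = kafkaFallback;
    }

    /** Sends to the Connect sink registered for the topic, else to Kafka. */
    void send(String topic, String value) {
        Consumer<String> sink = connectSinks.get(topic);
        if (sink != null) {
            sink.accept(value);
        } else {
            kafkaFallback.accept(value);
        }
    }

    public static void main(String[] args) {
        List<String> toConnect = new ArrayList<>();
        List<String> toKafka = new ArrayList<>();
        Map<String, Consumer<String>> sinks = new HashMap<>();
        sinks.put("word_counts", toConnect::add);

        TopicRouterSketch router = new TopicRouterSketch(sinks, toKafka::add);
        router.send("word_counts", "kafka=3");          // registered: Connect
        router.send("simple-XXX-repartition", "kafka"); // unregistered: Kafka
        System.out.println("connect=" + toConnect + " kafka=" + toKafka);
    }
}
```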

The WrappedConsumer simply polls Kafka and all the Connect SourceTask instances and then combines their results.

public class WrappedConsumer implements Consumer<byte[], byte[]> {
    private final Map<String, ConnectSourceConsumer> connectConsumers;
    private final Consumer<byte[], byte[]> kafkaConsumer;

    @Override
    public ConsumerRecords<byte[], byte[]> poll(long timeout) {
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records 
            = new HashMap<>();
        // Poll from Kafka
        poll(kafkaConsumer, timeout, records);
        for (ConnectSourceConsumer consumer : connectConsumers.values()) {
            // Poll from Connect
            poll(consumer, timeout, records);
        }
        return new ConsumerRecords<>(records);
    }

    private void poll(
        Consumer<byte[], byte[]> consumer, long timeout,
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records) {
        try {
            ConsumerRecords<byte[], byte[]> rec = consumer.poll(timeout);
            for (TopicPartition tp : rec.partitions()) {
                records.put(tp, rec.records(tp));
            }
        } catch (Exception e) {
            log.error("Could not poll consumer", e);
        }
    }
    
    ...
}
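Combining the poll results is essentially a map merge. The helper above stores each partition's records with put, which is fine as long as every topic-partition is served by exactly one underlying consumer. As a hedged variation, here is a plain-Java sketch that appends instead of overwriting, so nothing is lost if two sources ever report the same key. PollMergeSketch and its merge method are hypothetical names, and string keys stand in for TopicPartition.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of merging several poll results into one map. */
final class PollMergeSketch {

    /** Appends every poll's records to the list kept for its key. */
    static <K, V> Map<K, List<V>> merge(List<Map<K, List<V>>> polls) {
        Map<K, List<V>> merged = new LinkedHashMap<>();
        for (Map<K, List<V>> poll : polls) {
            for (Map.Entry<K, List<V>> entry : poll.entrySet()) {
                merged.computeIfAbsent(entry.getKey(), k -> new ArrayList<>())
                      .addAll(entry.getValue());
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // One result from "Kafka", one from a "Connect source"
        Map<String, List<String>> fromKafka =
            Map.of("simple-XXX-repartition-0", List.of("kafka"));
        Map<String, List<String>> fromConnect =
            Map.of("lines-0", List.of("first line", "second line"));

        Map<String, List<String>> merged =
            merge(List.of(fromKafka, fromConnect));
        System.out.println(merged);
    }
}
```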

Finally we can use groupBy to implement WordCount.

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple");
    props.put(StreamsConfig.CLIENT_ID_CONFIG, "simple-example-client");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    ...
    StreamsConfig streamsConfig = new StreamsConfig(props);

    Map<String, Object> config = new HashMap<>();
    ...
    ConnectStreamsConfig connectConfig = new ConnectStreamsConfig(config);

    // The JDBC source task configuration
    Map<String, Object> sourceConfig = new HashMap<>();
    sourceConfig.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, jdbcUrl);
    sourceConfig.put(JdbcSourceTaskConfig.TABLES_CONFIG, inputTopic);
    sourceConfig.put(TaskConfig.TASK_CLASS_CONFIG, JdbcSourceTask.class.getName());
    TaskConfig sourceTaskConfig = new TaskConfig(sourceConfig);

    // The JDBC sink task configuration
    Map<String, Object> sinkConfig = new HashMap<>();
    sinkConfig.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, jdbcUrl);
    sinkConfig.put(TaskConfig.TASK_CLASS_CONFIG, JdbcSinkTask.class.getName());
    TaskConfig sinkTaskConfig = new TaskConfig(sinkConfig);

    StreamsBuilder builder = new StreamsBuilder();

    KStream<SchemaAndValue, SchemaAndValue> input = builder.stream(inputTopic);

    KStream<SchemaAndValue, String> words = input
        .flatMapValues(value -> {
            Struct lines = (Struct) value.value();
            String[] strs = lines.get("lines").toString()
                .toLowerCase().split("\\W+");
            List<String> result = new ArrayList<>();
            for (String str : strs) {
                if (str.length() > 0) {
                    result.add(str);
                }
            }
            return result;
        });

    KTable<String, Long> wordCounts = words
        .groupBy((key, word) -> word, 
            Serialized.with(Serdes.String(), Serdes.String()))
        .count();

    wordCounts.toStream().map(
        (key, value) -> {
            Schema schema = SchemaBuilder.struct().name("word")
                    .field("word", Schema.STRING_SCHEMA)
                    .field("count", Schema.INT64_SCHEMA).build();
            Struct struct = new Struct(schema)
                .put("word", key).put("count", value);
            return new KeyValue<>(SchemaAndValue.NULL, 
                new SchemaAndValue(schema, struct));
        }).to(outputTopic);

    streams = new KafkaStreams(builder.build(), streamsConfig,
            new ConnectClientSupplier("JDBC", connectConfig,
                    Collections.singletonMap(inputTopic, sourceTaskConfig),
                    Collections.singletonMap(outputTopic, sinkTaskConfig)));
    streams.start();

When we run this WordCount application, it will use the JDBC Connector for input and output, and Kafka for intermediate results, as expected.
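To make the computation itself concrete, independent of Kafka Streams and Connect, here is a minimal plain-Java sketch of what the flatMapValues, groupBy, and count steps compute over a finite list of lines. WordCountSketch and countWords are hypothetical names of my own. The real topology processes an unbounded stream and keeps its counts in a state store, so this is only the batch analogue.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical batch analogue of the streaming WordCount topology. */
final class WordCountSketch {

    /** Lower-cases each line, splits on non-word characters, counts words. */
    static Map<String, Long> countWords(List<String> lines) {
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : line.toLowerCase().split("\\W+")) {
                // split can yield an empty leading token, for example when
                // the line starts with whitespace, so skip empty strings
                if (!word.isEmpty()) {
                    counts.merge(word, 1L, Long::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countWords(
            List.of("Hello, hello world", "  world!"));
        counts.forEach((word, count) ->
            System.out.println(word + " " + count));
    }
}
```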

Connect, Meet KSQL

Since KSQL is built on top of Kafka Streams, with the above classes we get integration between Kafka Connect and KSQL for free, thanks to the Kafka abstraction funnel.

For example, here is a KSQL program to retrieve word counts that are greater than 100.

    Map<String, Object> config = new HashMap<>();
    ...
    ConnectStreamsConfig connectConfig = new ConnectStreamsConfig(config);

    // The JDBC source task configuration
    Map<String, Object> sourceConfig = new HashMap<>();
    sourceConfig.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, jdbcUrl);
    sourceConfig.put(JdbcSourceTaskConfig.TABLES_CONFIG, inputTopic);
    sourceConfig.put(TaskConfig.TASK_CLASS_CONFIG, JdbcSourceTask.class.getName());
    TaskConfig sourceTaskConfig = new TaskConfig(sourceConfig);

    KafkaClientSupplier clientSupplier = 
        new ConnectClientSupplier("JDBC", connectConfig,
                Collections.singletonMap(inputTopic, sourceTaskConfig),
                Collections.emptyMap());

    ksqlContext = KsqlContext.create(
        ksqlConfig, schemaRegistryClient, clientSupplier);

    ksqlContext.sql("CREATE STREAM WORD_COUNTS"
        + " (ID int, WORD varchar, WORD_COUNT bigint)"
        + " WITH (kafka_topic='word_counts', value_format='AVRO', key='ID');");

    ksqlContext.sql("CREATE STREAM TOP_WORD_COUNTS AS "
        + " SELECT * FROM WORD_COUNTS WHERE WORD_COUNT > 100;");

When we run this example, it uses the JDBC Connector to read its input from a relational database. This is because we pass an instance of ConnectClientSupplier to the KsqlContext factory, in order to instruct the Kafka Streams layer underlying KSQL where to obtain the Producer and Consumer.4

Summary

With the above examples I’ve been able to demonstrate both

  1. the power of clean abstractions, especially as utilized in the Kafka abstraction funnel, and
  2. a promising method of integrating Kafka Connect with Kafka Streams and KSQL based on these abstractions.

Ironically, I have also shown that the Kafka abstraction funnel does not need to be tied to Kafka at all. It can be used with any system that provides implementations of the Producer-Consumer APIs, which reside at the bottom of the funnel. In this post I have shown how to plug in Kafka Connect at this level to achieve embedded Kafka Connect functionality within Kafka Streams. However, frameworks other than Kafka Connect could be used as well. In this light, Kafka Streams (as well as KSQL) can be viewed as a general stream processing platform in the same manner as Flink and Spark.

I hope you have enjoyed these excursions into some of the inner workings of Kafka Connect, Kafka Streams, and KSQL.


The Hitchhiker’s Guide to Stream Processing

In my previous post I discussed some of the features of different stream processing platforms, but I did not touch on how one goes about choosing one.  In the book Kafka: The Definitive Guide, the authors provide some valuable advice on how to make a choice.  Below I present three additional tips for choosing a stream processing platform.

  1. Favor reuse over polyglot streaming.
  2. Try the Kafka abstraction funnel.
  3. Check out the Stream Processing Kickstarter project.

Favor Reuse Over Polyglot Streaming

One of the characteristics of many microservice architectures is the use of several different data stores, which is often referred to as polyglot persistence.  The idea is that each microservice should be responsible for its own data, which can lead to each microservice introducing a different data store into the organization.  However, at my last company, Yammer, this led to an operational nightmare as we attempted to manage and develop expertise in a variety of data stores.  In the end, we decided to consolidate on one relational database (PostgreSQL) and one non-relational database (HBase).

The real underlying goal for data use in microservices is to eliminate coupling between business subdomains in an application, which can be better achieved by schema independence, rather than polyglot persistence.  Schema independence can easily be achieved by partitioning data properly within a single data store, using namespaces, schemas, or other techniques, and does not necessitate the use of polyglot persistence.

In a streaming architecture, one of the goals is to share data rather than to keep it siloed.  This is often referred to as turning the database inside-out, as achieved by many event-driven systems. This means that schemas need to be shared as well, which leads to the use of schema registries.  Consequently, it makes even less sense to use more than one streaming platform in an organization, which would lead to polyglot streaming.   Therefore, if your organization is already using a stream processing platform, perhaps in a different group from the one that you are in, try using that.

Try the Kafka Abstraction Funnel

Kafka has emerged as the foundation for stream processing in today’s enterprises.  Kafka provides a low-level Producer-Consumer API, but it also comes bundled with a stream processing framework called Kafka Streams, which has both a Processor API and a Streams DSL (that is built on top of the Processor API).  Additionally, Confluent (the company I work for) has just released KSQL, which is built on top of Kafka Streams.

These four layers (KSQL, the Kafka Streams DSL, the Kafka Streams Processor API, and the Producer-Consumer API) form a hierarchy of abstractions, which I refer to as the Kafka abstraction funnel.   In accordance with the 80-20 rule, the top of the funnel, KSQL, should be able to handle 80% of your use cases, and for the remaining 20%, you can drop down into the Kafka Streams DSL.  Likewise, the Kafka Streams DSL should be able to handle 80% of the use cases that KSQL cannot handle, and for the remaining 20%, you can drop down into the Kafka Streams Processor API.  Finally, at the bottom of the funnel is the Producer-Consumer API, which should be able to handle everything else.
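To put rough numbers on the funnel, here is a small plain-Java sketch. It assumes the 80-20 split applies literally at each of the top three layers, with the bottom layer picking up whatever remains; the rule is only a rule of thumb, so treat the percentages as illustrative. FunnelCoverage and coverage are hypothetical names.

```java
/** Hypothetical arithmetic for the 80-20 rule applied down the funnel. */
final class FunnelCoverage {

    /**
     * Returns the share of all use cases handled at each layer, assuming
     * every layer except the last handles the given fraction of what
     * reaches it, and the last layer handles everything else.
     */
    static double[] coverage(double fraction, int layers) {
        double[] shares = new double[layers];
        double remaining = 1.0;
        for (int i = 0; i < layers - 1; i++) {
            shares[i] = remaining * fraction;
            remaining -= shares[i];
        }
        shares[layers - 1] = remaining;
        return shares;
    }

    public static void main(String[] args) {
        String[] names = {
            "KSQL",
            "Kafka Streams DSL",
            "Kafka Streams Processor API",
            "Producer-Consumer API"
        };
        double[] shares = coverage(0.8, names.length);
        for (int i = 0; i < names.length; i++) {
            System.out.printf("%-28s %5.1f%%%n", names[i], shares[i] * 100);
        }
    }
}
```

Under that assumption the four layers cover roughly 80%, 16%, 3.2%, and 0.8% of use cases respectively, which is why most applications should rarely need to reach the bottom of the funnel.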

The Kafka abstraction funnel is very easy to get started with, provides support for stream-relational processing, and simply builds on technologies already provided with Kafka.  Give it a try if you can.

I’ll have more to say about the power of the Kafka abstraction funnel in my next post.

Check Out the Stream Processing Kickstarter Project

Martin Kleppmann has likened Kafka to Unix pipes, with the stream processor being the Unix program that reads from one pipe and writes to another.  However, if you’re using Kafka but you’ve not settled on the Kafka abstraction funnel, figuring out exactly which stream processor to use can be a bewildering task.

There are other posts that compare the different streaming platforms.  Often these comparisons show how to write a WordCount application, which has become the “Hello World” example of data processing.   However, most of the WordCount examples don’t actually show how to integrate with Kafka.  Therefore I’ve put together the Stream Processing Kickstarter that has the WordCount application written in each of the most popular streaming frameworks.  Furthermore, in each WordCount application, I show how to read textual data from a Kafka topic and then output the word counts to another Kafka topic.  Also, each example in the project can be easily executed against an embedded Kafka cluster by simply running a unit test.

Below I show the code for each WordCount application so you can get a sense of what it is like to use each streaming platform.1

Beam

In Apache Beam, integration with Kafka is achieved by using a Kafka reader transform and a Kafka writer transform.

public class BeamWordCount implements WordCount {

  @Override
  public void countWords(
      String bootstrapServers, String zookeeperConnect,
      String inputTopic, String outputTopic) {

    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline p = Pipeline.create(options);

    KafkaIO.Read<String, String> reader = KafkaIO.<String, String>read()
        .withBootstrapServers(bootstrapServers)
        .withTopic(inputTopic)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .updateConsumerProperties(
            ImmutableMap.of(
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"));

    PCollection<String> input = p.apply(reader.withoutMetadata())
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via(KV::getValue))
        .apply(ParDo.of(new AddTimestampFn()));

    PCollection<String> windowedWords = input.apply(
        Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .withAllowedLateness(Duration.ZERO)
            .accumulatingFiredPanes());

    PCollection<KV<String, Long>> wordCounts =
        windowedWords.apply(new CountWords());

    wordCounts.apply(KafkaIO.<String, Long>write()
        .withBootstrapServers(bootstrapServers)
        .withTopic(outputTopic)
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(LongSerializer.class));

    p.run().waitUntilFinish(Duration.standardSeconds(5));
  }

  @Override
  public void close() {
  }

  static class AddTimestampFn extends DoFn<String, String> {

    @ProcessElement
    public void processElement(ProcessContext c) {
      c.outputWithTimestamp(c.element(), Instant.now());
    }
  }

  static class ExtractWordsFn extends DoFn<String, String> {

    @ProcessElement
    public void processElement(ProcessContext c) {
      // Split the line into words.
      String[] words = c.element().toLowerCase().split("\\W+");

      // Output each word encountered into the output PCollection.
      for (String word : words) {
        if (!word.isEmpty()) {
          c.output(word);
        }
      }
    }
  }

  static class CountWords extends PTransform<PCollection<String>,
      PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(
          ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts =
          words.apply(Count.perElement());

      return wordCounts;
    }
  }
}

Flink

The Flink example uses a Kafka source and a Kafka sink.

public class FlinkWordCount implements WordCount {

  @Override
  public void countWords(
      String bootstrapServers, String zookeeperConnect,
      String inputTopic, String outputTopic) throws Exception {

    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("group.id", "testgroup");
    props.put("auto.offset.reset", "earliest");

    // set up the execution environment
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> input = env
        .addSource(new FlinkKafkaConsumer011<>(
            inputTopic,
            new SimpleStringSchema(),
            props
        ));

    DataStream<Tuple2<String, Long>> counts = input
        .map(line -> line.toLowerCase().split("\\W+"))
        .flatMap((String[] tokens, Collector<Tuple2<String, Long>> out) -> {
          // emit the pairs with non-zero-length words
          Arrays.stream(tokens)
              .filter(t -> t.length() > 0)
              .forEach(t -> out.collect(new Tuple2<>(t, 1L)));
        })
        .keyBy(0)
        .sum(1);

    counts.addSink(
        new FlinkKafkaProducer011<>(
            outputTopic,
            new WordCountSerializer(outputTopic),
            props
        ));

    // execute program
    env.execute("Streaming WordCount Example");
  }

  @Override
  public void close() {
  }

  static class WordCountSerializer implements
      KeyedSerializationSchema<Tuple2<String, Long>>,
      java.io.Serializable {

    private final String outputTopic;

    public WordCountSerializer(String outputTopic) {
      this.outputTopic = outputTopic;
    }

    public byte[] serializeKey(Tuple2<String, Long> element) {
      return new StringSerializer().serialize(null, element.getField(0));
    }

    public byte[] serializeValue(Tuple2<String, Long> element) {
      return new LongSerializer().serialize(null, element.getField(1));
    }

    public String getTargetTopic(Tuple2<String, Long> element) {
      return outputTopic;
    }
  }
}

Kafka Streams

Because Kafka Streams is built directly atop Kafka, it has the simplest integration of all the frameworks here.  The input stream is constructed directly from a Kafka topic, and the resulting counts are written directly to another Kafka topic.

public class KafkaStreamsWordCount implements WordCount {

  private KafkaStreams streams;

  @Override
  public void countWords(
      String bootstrapServers, String zookeeperConnect,
      String inputTopic, String outputTopic) {

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount");
    props.put(StreamsConfig.CLIENT_ID_CONFIG, "wordcount-example-client");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(
        StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
        Serdes.String().getClass().getName()
    );
    props.put(
        StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
        Serdes.String().getClass().getName()
    );
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    props.put(
        StreamsConfig.STATE_DIR_CONFIG,
        Utils.tempDirectory().getAbsolutePath()
    );

    StreamsBuilder builder = new StreamsBuilder();

    KStream<String, String> input = builder.stream(inputTopic);

    KTable<String, Long> wordCounts = input
        .flatMapValues(
            value -> Arrays.asList(value.toLowerCase().split("\\W+")))
        .groupBy((key, word) -> word)
        .count();

    wordCounts.toStream().to(
        outputTopic, Produced.with(Serdes.String(), Serdes.Long()));

    streams = new KafkaStreams(builder.build(), props);
    streams.start();
  }

  @Override
  public void close() {
    streams.close();
  }
}
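
The example above sets CACHE_MAX_BYTES_BUFFERING_CONFIG to 0, which disables record caching so that every update to a count is forwarded to the output topic instead of being coalesced per key. The following plain-Java sketch is not Kafka Streams code, and its class and method names are hypothetical. It only illustrates the sequence of updates I would expect in the output topic in that case.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChangelogSketch {

  // Returns one (word, runningCount) update per input word, in arrival order.
  static List<Map.Entry<String, Long>> countUpdates(List<String> lines) {
    Map<String, Long> counts = new HashMap<>();
    List<Map.Entry<String, Long>> updates = new ArrayList<>();
    for (String line : lines) {
      for (String word : line.toLowerCase().split("\\W+")) {
        // Unlike the example above, empty tokens are skipped here.
        if (word.isEmpty()) {
          continue;
        }
        // merge() stores and returns the incremented count for this word.
        long count = counts.merge(word, 1L, Long::sum);
        updates.add(new AbstractMap.SimpleImmutableEntry<>(word, count));
      }
    }
    return updates;
  }

  public static void main(String[] args) {
    System.out.println(countUpdates(Arrays.asList("a b a", "b")));
    // prints [a=1, b=1, a=2, b=2]
  }
}
```

With caching enabled, a consumer of the output topic might instead see only the latest count per word at each flush, which makes the results harder to assert on in a test.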

Samza

Samza can also create streams directly from Kafka topics by using a Kafka system factory.

public class SamzaWordCount implements WordCount {

  @Override
  public void countWords(
      String bootstrapServers, String zookeeperConnect,
      String inputTopic, String outputTopic) {

    Map<String, String> configs = new HashMap<>();

    configs.put(JobConfig.JOB_NAME(), "word-count");
    configs.put(JobConfig.PROCESSOR_ID(), "1");
    configs.put(
        JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY,
        PassthroughCoordinationUtilsFactory.class.getName()
    );
    configs.put(
        JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
        PassthroughJobCoordinatorFactory.class.getName()
    );
    configs.put(
        TaskConfig.GROUPER_FACTORY(),
        SingleContainerGrouperFactory.class.getName()
    );
    configs.put(
        "systems.kafka.samza.factory",
        "org.apache.samza.system.kafka.KafkaSystemFactory"
    );
    configs.put("systems.kafka.producer.bootstrap.servers", bootstrapServers);
    configs.put("systems.kafka.consumer.zookeeper.connect", zookeeperConnect);
    configs.put("systems.kafka.samza.offset.default", "oldest");
    configs.put("systems.kafka.default.stream.replication.factor", "1");
    configs.put("job.default.system", "kafka");

    LocalApplicationRunner runner =
        new LocalApplicationRunner(new MapConfig(configs));
    runner.run(new WordCountApplication(inputTopic, outputTopic));
    runner.waitForFinish();
  }

  @Override
  public void close() {
  }

  static class WordCountApplication implements StreamApplication {

    private String inputTopic;
    private String outputTopic;

    public WordCountApplication(String inputTopic, String outputTopic) {
      this.inputTopic = inputTopic;
      this.outputTopic = outputTopic;
    }

    @Override
    public void init(StreamGraph graph, Config config) {
      MessageStream<KV<String, String>> words =
          graph.getInputStream(
              inputTopic, KVSerde.of(new StringSerde(), new StringSerde()));
      OutputStream<KV<String, Long>> counts =
          graph.getOutputStream(
              outputTopic, KVSerde.of(new StringSerde(), new LongSerde()));

      words.flatMap(kv -> Arrays.asList(kv.value.toLowerCase().split("\\W+")))
          .window(Windows.keyedTumblingWindow(
              (String kv) -> kv, Duration.ofSeconds(1),
              () -> 0L, (m, prevCount) -> prevCount + 1,
              new StringSerde(), new LongSerde()
          )
              .setEarlyTrigger(Triggers.repeat(Triggers.count(1)))
              .setAccumulationMode(AccumulationMode.ACCUMULATING), "count")
          .map(windowPane -> {
            String word = windowPane.getKey().getKey();
            long count = windowPane.getMessage();
            return KV.of(word, count);
          })
          .sendTo(counts);
    }
  }
}
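
Note that the Samza version counts inside a one-second keyed tumbling window, with an early trigger that fires on every message and an accumulating mode. Assuming the window behaves as those names suggest, the emitted count for a word grows within a window and starts over in the next one. The plain-Java sketch below illustrates that assumption only; it is not Samza code, and the class name, method name, and explicit timestamps are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TumblingWindowSketch {

  // Emits "word@windowStart=count" for every event, where the count
  // accumulates only within the tumbling window the event falls into.
  static List<String> countPerWindow(
      long[] timestampsMs, String[] words, long windowMs) {
    Map<String, Long> counts = new HashMap<>();
    List<String> emitted = new ArrayList<>();
    for (int i = 0; i < words.length; i++) {
      // Align the timestamp to the start of its window.
      long windowStart = (timestampsMs[i] / windowMs) * windowMs;
      String windowKey = windowStart + "/" + words[i];
      long count = counts.merge(windowKey, 1L, Long::sum);
      emitted.add(words[i] + "@" + windowStart + "=" + count);
    }
    return emitted;
  }

  public static void main(String[] args) {
    long[] timestamps = {100L, 200L, 1100L};
    String[] words = {"a", "a", "a"};
    System.out.println(countPerWindow(timestamps, words, 1000L));
    // prints [a@0=1, a@0=2, a@1000=1]
  }
}
```

If that reading is right, the Samza output is a per-window count rather than the global running count that the other examples produce.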

Spark

Spark Structured Streaming can read from and write to Kafka by specifying the kafka format.  Spark only supports writing strings or byte arrays to Kafka, so we use a Spark UDF to convert the count from a Long to a byte[].

public class SparkWordCount implements WordCount, java.io.Serializable {

  private StreamingQuery query;

  @Override
  public void countWords(
      String bootstrapServers, String zookeeperConnect,
      String inputTopic, String outputTopic) {

    SparkSession spark = SparkSession
        .builder()
        .appName("JavaStructuredKafkaWordCount")
        .config("spark.master", "local[*]")
        .getOrCreate();

    spark.udf().register(
        "serializeLong", new SparkLongSerializer(), DataTypes.BinaryType);

    // Create a Dataset representing the stream of input lines from Kafka
    Dataset<String> lines = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", inputTopic)
        .option("startingOffsets", "earliest")
        .load()
        .selectExpr("CAST(value as STRING)")
        .as(Encoders.STRING());

    // Generate running word count
    Dataset<Row> wordCounts = lines.flatMap(
        (FlatMapFunction<String, String>) x ->
            Arrays.asList(x.toLowerCase().split("\\W+"))
                .iterator(),
        Encoders.STRING()
    ).groupBy("value").count();

    // Start running the query that outputs the running counts to Kafka
    query = wordCounts
        .selectExpr("CAST(value AS STRING) key",
            "CAST(serializeLong(count) AS BINARY) value")
        .writeStream()
        .outputMode("update")
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("topic", outputTopic)
        .option("checkpointLocation", Utils.tempDirectory().getAbsolutePath())
        .start();
  }

  @Override
  public void close() {
    query.stop();
  }

  static class SparkLongSerializer implements UDF1<Long, byte[]> {
    @Override
    public byte[] call(Long i) {
      return new LongSerializer().serialize(null, i.longValue());
    }
  }
}
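
The UDF above delegates to Kafka's LongSerializer, which encodes a long as eight big-endian bytes. The sketch below reproduces that layout with only the JDK, to show what a consumer of the output topic has to decode; the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class LongBytesSketch {

  // Encodes a long as 8 bytes, most significant byte first.
  // ByteBuffer is big-endian by default.
  static byte[] toBytes(long value) {
    return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
  }

  // Decodes 8 big-endian bytes back into a long.
  static long fromBytes(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getLong();
  }

  public static void main(String[] args) {
    byte[] encoded = toBytes(3L);
    System.out.println(Arrays.toString(encoded));
    // prints [0, 0, 0, 0, 0, 0, 0, 3]
    System.out.println(fromBytes(encoded));
    // prints 3
  }
}
```

A consumer configured with a LongDeserializer for values performs the equivalent of fromBytes, so the counts written by Spark can be read the same way as those written by the other frameworks.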

Storm

In Storm we use a Kafka spout to read data from Kafka and then a Kafka bolt to write data.

public class StormWordCount implements WordCount {

  private LocalCluster cluster;

  @Override
  public void countWords(
      String bootstrapServers, String zookeeperConnect,
      String inputTopic, String outputTopic) {

    KafkaSpoutConfig<String, String> config =
        KafkaSpoutConfig.builder(bootstrapServers, inputTopic)
            .setRecordTranslator((r) ->
                new Values(r.value()), new Fields("value"))
            .build();
    KafkaSpout<String, String> spout = new KafkaSpout<>(config);

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", spout);
    builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout");
    builder.setBolt("count", new WordCount(), 4)
        .fieldsGrouping("split", new Fields("word"));

    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("acks", "1");
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", LongSerializer.class.getName());

    KafkaBolt<String, Long> bolt = new KafkaBolt<String, Long>()
        .withProducerProperties(props)
        .withTopicSelector(new DefaultTopicSelector(outputTopic))
        .withTupleToKafkaMapper(
            new FieldNameBasedTupleToKafkaMapper<>("word","count"));
    builder.setBolt("forwardToKafka", bolt, 8)
        .fieldsGrouping("count", new Fields("word"));

    Config conf = new Config();
    cluster = new LocalCluster();
    cluster.submitTopology("wordCount", conf, builder.createTopology());
  }

  @Override
  public void close() {
    cluster.shutdown();
  }

  static class SplitSentence extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      String sentence = tuple.getString(0);

      for (String word : sentence.toLowerCase().split("\\W+")) {
        collector.emit(new Values(word, 1L));
      }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word", "count"));
    }
  }

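  // Note: within StormWordCount, this nested bolt class shadows the
  // WordCount interface that the outer class implements.
  static class WordCount extends BaseBasicBolt {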
    Map<String, Long> counts = new HashMap<>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      String word = tuple.getString(0);
      Long count = counts.get(word);
      if (count == null) {
        count = 0L;
      }
      count++;
      counts.put(word, count);
      collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word", "count"));
    }
  }
}
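
The count bolt runs with a parallelism of 4 and keeps its counts in an in-memory HashMap, so the fieldsGrouping on "word" is what keeps the counts correct: tuples with the same word are always routed to the same task. The sketch below shows the general idea using hash partitioning in plain Java. It is a simplification and not Storm's actual routing code; the class and method names are hypothetical.

```java
public class FieldsGroupingSketch {

  // Picks a task index for a word so that equal words always map to the
  // same task. floorMod keeps the index non-negative for negative hashes.
  static int taskFor(String word, int numTasks) {
    return Math.floorMod(word.hashCode(), numTasks);
  }

  public static void main(String[] args) {
    int numTasks = 4;
    for (String word : new String[] {"kafka", "storm", "kafka"}) {
      // Both occurrences of "kafka" print the same task index.
      System.out.println(word + " -> task " + taskFor(word, numTasks));
    }
  }
}
```

With a shuffleGrouping instead, occurrences of one word could land on different tasks, and each task would emit its own partial count for that word.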

Trident

Trident also uses a Kafka spout to read data and then a Kafka state factory to write data.

public class TridentWordCount implements WordCount {

  private LocalCluster cluster;

  @Override
  public void countWords(
      String bootstrapServers, String zookeeperConnect,
      String inputTopic, String outputTopic) {

    KafkaSpoutConfig<String, String> config =
        KafkaSpoutConfig.builder(bootstrapServers, inputTopic).build();
    KafkaTridentSpoutOpaque<String, String> spout =
        new KafkaTridentSpoutOpaque<>(config);

    TridentTopology topology = new TridentTopology();
    Stream wordCounts = topology.newStream("spout1", spout)
        .each(new Fields("value"), new Split(), new Fields("word"))
        .groupBy(new Fields("word"))
        .persistentAggregate(
            new MemoryMapState.Factory(), new Count(), new Fields("count"))
        .newValuesStream();

    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("acks", "1");
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", LongSerializer.class.getName());

    TridentKafkaStateFactory<String, Long> stateFactory =
        new TridentKafkaStateFactory<String, Long>()
            .withProducerProperties(props)
            .withKafkaTopicSelector(new DefaultTopicSelector(outputTopic))
            .withTridentTupleToKafkaMapper(
                new FieldNameBasedTupleToKafkaMapper<String, Long>(
                    "word","count"));
    wordCounts.partitionPersist(
        stateFactory,
        new Fields("word", "count"),
        new TridentKafkaStateUpdater(),
        new Fields()
    );

    Config conf = new Config();
    cluster = new LocalCluster();
    cluster.submitTopology("wordCount", conf, topology.build());
  }

  @Override
  public void close() {
    cluster.shutdown();
  }

  static class Split extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
      String sentence = tuple.getString(0);
      for (String word : sentence.toLowerCase().split("\\W+")) {
        collector.emit(new Values(word));
      }
    }
  }
}