The Hitchhiker’s Guide to Stream Processing

In my previous post I discussed some of the features of different stream processing platforms, but I did not touch on how one goes about choosing one.  In the book Kafka: The Definitive Guide, the authors provide some valuable advice on how to make a choice.  Below I present three additional tips for choosing a stream processing platform.

  1. Favor reuse over polyglot streaming.
  2. Try the Kafka abstraction funnel.
  3. Check out the Stream Processing Kickstarter project.

Favor Reuse Over Polyglot Streaming

One of the characteristics of many microservice architectures is the use of several different data stores, which is often referred to as polyglot persistence.  The idea is that each microservice should be responsible for its own data, which often results in each microservice introducing a different data store into the organization.  However, at my last company, Yammer, this led to an operational nightmare as we attempted to manage and develop expertise in a variety of data stores.  In the end, we decided to consolidate on one relational database (PostgreSQL) and one non-relational database (HBase).

The real underlying goal for data use in microservices is to eliminate coupling between business subdomains in an application, which can be better achieved by schema independence, rather than polyglot persistence.  Schema independence can easily be achieved by partitioning data properly within a single data store, using namespaces, schemas, or other techniques, and does not necessitate the use of polyglot persistence.

In a streaming architecture, one of the goals is to share data rather than to keep it siloed.  This is often referred to as turning the database inside-out, as achieved by many event-driven systems. This means that schemas need to be shared as well, which leads to the use of schema registries.  Consequently, it makes even less sense to use more than one streaming platform in an organization, which would lead to polyglot streaming.   Therefore, if your organization is already using a stream processing platform, perhaps in a different group from the one that you are in, try using that.

Try the Kafka Abstraction Funnel

Kafka has emerged as the foundation for stream processing in today’s enterprises.  Kafka provides a low-level Producer-Consumer API, but it also comes bundled with a stream processing framework called Kafka Streams, which has both a Processor API and a Streams DSL (that is built on top of the Processor API).  Additionally, Confluent (the company I work for) has just released KSQL, which is built on top of Kafka Streams.

These four layers (KSQL, the Kafka Streams DSL, the Kafka Streams Processor API, and the Producer-Consumer API) form a hierarchy of abstractions, which I refer to as the Kafka abstraction funnel.   In accordance with the 80-20 rule, the top of the funnel, KSQL, should be able to handle 80% of your use cases, and for the remaining 20%, you can drop down into the Kafka Streams DSL.  Likewise, the Kafka Streams DSL should be able to handle 80% of the use cases that KSQL cannot handle, and for the remaining 20%, you can drop down into the Kafka Streams Processor API.  Finally, at the bottom of the funnel is the Producer-Consumer API, which should be able to handle everything else.
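The 80-20 arithmetic compounds as you descend the funnel.  The sketch below is plain Java with hypothetical class and method names, and the 80% ratio is only the rule of thumb from the paragraph above, not a measurement.  It computes the cumulative share of use cases covered after each layer:

```java
// Illustrative arithmetic only: the 80% ratio is the post's rule of thumb.
class FunnelCoverage {

  // Fraction of use cases covered after descending through `layers` layers,
  // when each layer handles `ratio` of what the layers above could not.
  static double coveredAfter(int layers, double ratio) {
    double remaining = 1.0;
    for (int i = 0; i < layers; i++) {
      remaining *= (1.0 - ratio);
    }
    return 1.0 - remaining;
  }

  public static void main(String[] args) {
    String[] names = {
        "KSQL", "Kafka Streams DSL", "Kafka Streams Processor API"};
    for (int i = 0; i < names.length; i++) {
      System.out.printf(
          "%s: %.1f%%%n", names[i], 100 * coveredAfter(i + 1, 0.8));
    }
  }
}
```

Under that assumption, the first three layers cover roughly 80%, 96%, and 99.2% of use cases, which leaves less than 1% for the Producer-Consumer API.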

The Kafka abstraction funnel is very easy to get started with, provides support for stream-relational processing, and simply builds on technologies already provided with Kafka.  Give it a try if you can.

I’ll have more to say about the power of the Kafka abstraction funnel in my next post.

Check Out the Stream Processing Kickstarter Project

Martin Kleppmann has likened Kafka to Unix pipes, with the stream processor being the Unix program that reads from one pipe and writes to another.  However, if you’re using Kafka but you’ve not settled on the Kafka abstraction funnel, figuring out exactly which stream processor to use can be a bewildering task.

There are other posts that compare the different streaming platforms.  Often these comparisons show how to write a WordCount application, which has become the “Hello World” example of data processing.   However, most of the WordCount examples don’t actually show how to integrate with Kafka.  Therefore I’ve put together the Stream Processing Kickstarter that has the WordCount application written in each of the most popular streaming frameworks.  Furthermore, in each WordCount application, I show how to read textual data from a Kafka topic and then output the word counts to another Kafka topic.  Also, each example in the project can be easily executed against an embedded Kafka cluster by simply running a unit test.
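For orientation, here is the core WordCount logic stripped of any framework or Kafka dependency.  This is a minimal sketch in plain Java; the class name PlainWordCount is hypothetical and is not part of the Stream Processing Kickstarter.  It lower-cases each line, splits on runs of non-word characters, and counts the resulting words:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, framework-free version of the WordCount logic.
class PlainWordCount {

  // Lower-case each line, split on runs of non-word characters,
  // drop empty tokens, and count occurrences of each word.
  static Map<String, Long> countWords(List<String> lines) {
    Map<String, Long> counts = new TreeMap<>();
    for (String line : lines) {
      for (String word : line.toLowerCase().split("\\W+")) {
        if (!word.isEmpty()) {
          counts.merge(word, 1L, Long::sum);
        }
      }
    }
    return counts;
  }

  public static void main(String[] args) {
    List<String> lines = Arrays.asList("Hello Kafka", "hello, streams!");
    System.out.println(countWords(lines)); // {hello=2, kafka=1, streams=1}
  }
}
```

The streaming versions differ mainly in that the input is unbounded, so they emit updated counts continuously instead of returning one final map.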

Below I show the code for each WordCount application so you can get a sense of what it is like to use each streaming platform.1

Beam

In Apache Beam, integration with Kafka is achieved by using a Kafka reader transform and a Kafka writer transform.

public class BeamWordCount implements WordCount {

  @Override
  public void countWords(
      String bootstrapServers, String zookeeperConnect,
      String inputTopic, String outputTopic) {

    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline p = Pipeline.create(options);

    KafkaIO.Read<String, String> reader = KafkaIO.<String, String>read()
        .withBootstrapServers(bootstrapServers)
        .withTopic(inputTopic)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .updateConsumerProperties(
            ImmutableMap.of(
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"));

    PCollection<String> input = p.apply(reader.withoutMetadata())
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via(KV::getValue))
        .apply(ParDo.of(new AddTimestampFn()));

    PCollection<String> windowedWords = input.apply(
        Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .withAllowedLateness(Duration.ZERO)
            .accumulatingFiredPanes());

    PCollection<KV<String, Long>> wordCounts =
        windowedWords.apply(new CountWords());

    wordCounts.apply(KafkaIO.<String, Long>write()
        .withBootstrapServers(bootstrapServers)
        .withTopic(outputTopic)
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(LongSerializer.class));

    p.run().waitUntilFinish(Duration.standardSeconds(5));
  }

  @Override
  public void close() {
  }

  static class AddTimestampFn extends DoFn<String, String> {

    @ProcessElement
    public void processElement(ProcessContext c) {
      c.outputWithTimestamp(c.element(), Instant.now());
    }
  }

  static class ExtractWordsFn extends DoFn<String, String> {

    @ProcessElement
    public void processElement(ProcessContext c) {
      // Split the line into words.
      String[] words = c.element().toLowerCase().split("\\W+");

      // Output each word encountered into the output PCollection.
      for (String word : words) {
        if (!word.isEmpty()) {
          c.output(word);
        }
      }
    }
  }

  static class CountWords extends PTransform<PCollection<String>,
      PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(
          ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts =
          words.apply(Count.perElement());

      return wordCounts;
    }
  }
}

Flink

The Flink example uses a Kafka source and a Kafka sink.

public class FlinkWordCount implements WordCount {

  @Override
  public void countWords(
      String bootstrapServers, String zookeeperConnect,
      String inputTopic, String outputTopic) throws Exception {

    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("group.id", "testgroup");
    props.put("auto.offset.reset", "earliest");

    // set up the execution environment
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> input = env
        .addSource(new FlinkKafkaConsumer011<>(
            inputTopic,
            new SimpleStringSchema(),
            props
        ));

    DataStream<Tuple2<String, Long>> counts = input
        .map(line -> line.toLowerCase().split("\\W+"))
        .flatMap((String[] tokens, Collector<Tuple2<String, Long>> out) -> {
          // emit the pairs with non-zero-length words
          Arrays.stream(tokens)
              .filter(t -> t.length() > 0)
              .forEach(t -> out.collect(new Tuple2<>(t, 1L)));
        })
        // explicit type hint: the lambda's generic output type may be erased
        .returns(new TypeHint<Tuple2<String, Long>>() {})
        .keyBy(0)
        .sum(1);

    counts.addSink(
        new FlinkKafkaProducer011<>(
            outputTopic,
            new WordCountSerializer(outputTopic),
            props
        ));

    // execute program
    env.execute("Streaming WordCount Example");
  }

  @Override
  public void close() {
  }

  static class WordCountSerializer implements
      KeyedSerializationSchema<Tuple2<String, Long>>,
      java.io.Serializable {

    private final String outputTopic;

    public WordCountSerializer(String outputTopic) {
      this.outputTopic = outputTopic;
    }

    public byte[] serializeKey(Tuple2<String, Long> element) {
      return new StringSerializer().serialize(null, element.getField(0));
    }

    public byte[] serializeValue(Tuple2<String, Long> element) {
      return new LongSerializer().serialize(null, element.getField(1));
    }

    public String getTargetTopic(Tuple2<String, Long> element) {
      return outputTopic;
    }
  }
}

Kafka Streams

Kafka Streams, being built directly atop Kafka, has the simplest integration with Kafka.  An input stream is constructed directly from a Kafka topic and then later the data in the stream is sent directly to another Kafka topic.

public class KafkaStreamsWordCount implements WordCount {

  private KafkaStreams streams;

  @Override
  public void countWords(
      String bootstrapServers, String zookeeperConnect,
      String inputTopic, String outputTopic) {

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount");
    props.put(StreamsConfig.CLIENT_ID_CONFIG, "wordcount-example-client");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(
        StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
        Serdes.String().getClass().getName()
    );
    props.put(
        StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
        Serdes.String().getClass().getName()
    );
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    props.put(
        StreamsConfig.STATE_DIR_CONFIG,
        Utils.tempDirectory().getAbsolutePath()
    );

    StreamsBuilder builder = new StreamsBuilder();

    KStream<String, String> input = builder.stream(inputTopic);

    KTable<String, Long> wordCounts = input
        .flatMapValues(
            value -> Arrays.asList(value.toLowerCase().split("\\W+")))
        .groupBy((key, word) -> word)
        .count();

    wordCounts.toStream().to(
        outputTopic, Produced.with(Serdes.String(), Serdes.Long()));

    streams = new KafkaStreams(builder.build(), props);
    streams.start();
  }

  @Override
  public void close() {
    streams.close();
  }
}

Samza

Samza can also create streams directly from Kafka topics by using a Kafka system factory.

public class SamzaWordCount implements WordCount {

  @Override
  public void countWords(
      String bootstrapServers, String zookeeperConnect,
      String inputTopic, String outputTopic) {

    Map<String, String> configs = new HashMap<>();

    configs.put(JobConfig.JOB_NAME(), "word-count");
    configs.put(JobConfig.PROCESSOR_ID(), "1");
    configs.put(
        JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY,
        PassthroughCoordinationUtilsFactory.class.getName()
    );
    configs.put(
        JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
        PassthroughJobCoordinatorFactory.class.getName()
    );
    configs.put(
        TaskConfig.GROUPER_FACTORY(),
        SingleContainerGrouperFactory.class.getName()
    );
    configs.put(
        "systems.kafka.samza.factory",
        "org.apache.samza.system.kafka.KafkaSystemFactory"
    );
    configs.put("systems.kafka.producer.bootstrap.servers", bootstrapServers);
    configs.put("systems.kafka.consumer.zookeeper.connect", zookeeperConnect);
    configs.put("systems.kafka.samza.offset.default", "oldest");
    configs.put("systems.kafka.default.stream.replication.factor", "1");
    configs.put("job.default.system", "kafka");

    LocalApplicationRunner runner =
        new LocalApplicationRunner(new MapConfig(configs));
    runner.run(new WordCountApplication(inputTopic, outputTopic));
    runner.waitForFinish();
  }

  @Override
  public void close() {
  }

  static class WordCountApplication implements StreamApplication {

    private String inputTopic;
    private String outputTopic;

    public WordCountApplication(String inputTopic, String outputTopic) {
      this.inputTopic = inputTopic;
      this.outputTopic = outputTopic;
    }

    @Override
    public void init(StreamGraph graph, Config config) {
      MessageStream<KV<String, String>> words =
          graph.getInputStream(
              inputTopic, KVSerde.of(new StringSerde(), new StringSerde()));
      OutputStream<KV<String, Long>> counts =
          graph.getOutputStream(
              outputTopic, KVSerde.of(new StringSerde(), new LongSerde()));

      words.flatMap(kv -> Arrays.asList(kv.value.toLowerCase().split("\\W+")))
          .window(Windows.keyedTumblingWindow(
              (String kv) -> kv, Duration.ofSeconds(1),
              () -> 0L, (m, prevCount) -> prevCount + 1,
              new StringSerde(), new LongSerde()
          )
              .setEarlyTrigger(Triggers.repeat(Triggers.count(1)))
              .setAccumulationMode(AccumulationMode.ACCUMULATING), "count")
          .map(windowPane -> {
            String word = windowPane.getKey().getKey();
            long count = windowPane.getMessage();
            return KV.of(word, count);
          })
          .sendTo(counts);
    }
  }
}

Spark

Spark structured streaming can create streams for Kafka by specifying a Kafka format.  Spark only supports writing strings or byte arrays to Kafka, so we use a Spark UDF to convert the count from a Long to a byte[].

public class SparkWordCount implements WordCount, java.io.Serializable {

  private StreamingQuery query;

  @Override
  public void countWords(
      String bootstrapServers, String zookeeperConnect,
      String inputTopic, String outputTopic) {

    SparkSession spark = SparkSession
        .builder()
        .appName("JavaStructuredKafkaWordCount")
        .config("spark.master", "local[*]")
        .getOrCreate();

    spark.udf().register(
        "serializeLong", new SparkLongSerializer(), DataTypes.BinaryType);

    // Create DataSet representing the stream of input lines from kafka
    Dataset<String> lines = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", inputTopic)
        .option("startingOffsets", "earliest")
        .load()
        .selectExpr("CAST(value as STRING)")
        .as(Encoders.STRING());

    // Generate running word count
    Dataset<Row> wordCounts = lines.flatMap(
        (FlatMapFunction<String, String>) x ->
            Arrays.asList(x.toLowerCase().split("\\W+"))
                .iterator(),
        Encoders.STRING()
    ).groupBy("value").count();

    // Start running the query that outputs the running counts to Kafka
    query = wordCounts
        .selectExpr("CAST(value AS STRING) key",
            "CAST(serializeLong(count) AS BINARY) value")
        .writeStream()
        .outputMode("update")
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("topic", outputTopic)
        .option("checkpointLocation", Utils.tempDirectory().getAbsolutePath())
        .start();
  }

  @Override
  public void close() {
    query.stop();
  }

  static class SparkLongSerializer implements UDF1<Long, byte[]> {
    @Override
    public byte[] call(Long i) {
      return new LongSerializer().serialize(null, i.longValue());
    }
  }
}
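
Any consumer of the output topic has to decode that binary value again.  As far as I know, Kafka's LongSerializer writes a long as 8 bytes in big-endian order.  The sketch below reproduces that layout in plain Java with ByteBuffer; the LongBytes helper is hypothetical and merely stands in for LongSerializer and LongDeserializer so that the round trip can be seen without Kafka on the classpath:

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for Kafka's Long (de)serializer.
class LongBytes {

  // Encode a long as 8 bytes; ByteBuffer is big-endian by default.
  static byte[] toBytes(long value) {
    return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
  }

  // Decode 8 big-endian bytes back into a long.
  static long fromBytes(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getLong();
  }

  public static void main(String[] args) {
    byte[] encoded = toBytes(3L);
    System.out.println(encoded.length);     // 8
    System.out.println(fromBytes(encoded)); // 3
  }
}
```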

Storm

In Storm we use a Kafka spout to read data from Kafka and then a Kafka bolt to write data.

public class StormWordCount implements WordCount {

  private LocalCluster cluster;

  @Override
  public void countWords(
      String bootstrapServers, String zookeeperConnect,
      String inputTopic, String outputTopic) {

    KafkaSpoutConfig<String, String> config =
        KafkaSpoutConfig.builder(bootstrapServers, inputTopic)
            .setRecordTranslator((r) ->
                new Values(r.value()), new Fields("value"))
            .build();
    KafkaSpout<String, String> spout = new KafkaSpout<>(config);

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", spout);
    builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout");
    builder.setBolt("count", new WordCount(), 4)
        .fieldsGrouping("split", new Fields("word"));

    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("acks", "1");
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", LongSerializer.class.getName());

    KafkaBolt<String, Long> bolt = new KafkaBolt<String, Long>()
        .withProducerProperties(props)
        .withTopicSelector(new DefaultTopicSelector(outputTopic))
        .withTupleToKafkaMapper(
            new FieldNameBasedTupleToKafkaMapper<>("word","count"));
    builder.setBolt("forwardToKafka", bolt, 8)
        .fieldsGrouping("count", new Fields("word"));

    Config conf = new Config();
    cluster = new LocalCluster();
    cluster.submitTopology("wordCount", conf, builder.createTopology());
  }

  @Override
  public void close() {
    cluster.shutdown();
  }

  static class SplitSentence extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      String sentence = tuple.getString(0);

      for (String word : sentence.toLowerCase().split("\\W+")) {
        collector.emit(new Values(word, 1L));
      }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word", "count"));
    }
  }

  static class WordCount extends BaseBasicBolt {
    Map<String, Long> counts = new HashMap<>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      String word = tuple.getString(0);
      Long count = counts.get(word);
      if (count == null) {
        count = 0L;
      }
      count++;
      counts.put(word, count);
      collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word", "count"));
    }
  }
}

Trident

Trident also uses a Kafka spout to read data and then a Kafka state factory to write data.

public class TridentWordCount implements WordCount {

  private LocalCluster cluster;

  @Override
  public void countWords(
      String bootstrapServers, String zookeeperConnect,
      String inputTopic, String outputTopic) {

    KafkaSpoutConfig<String, String> config =
        KafkaSpoutConfig.builder(bootstrapServers, inputTopic).build();
    KafkaTridentSpoutOpaque<String, String> spout =
        new KafkaTridentSpoutOpaque<>(config);

    TridentTopology topology = new TridentTopology();
    Stream wordCounts = topology.newStream("spout1", spout)
        .each(new Fields("value"), new Split(), new Fields("word"))
        .groupBy(new Fields("word"))
        .persistentAggregate(
            new MemoryMapState.Factory(), new Count(), new Fields("count"))
        .newValuesStream();

    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("acks", "1");
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", LongSerializer.class.getName());

    TridentKafkaStateFactory<String, Long> stateFactory =
        new TridentKafkaStateFactory<String, Long>()
            .withProducerProperties(props)
            .withKafkaTopicSelector(new DefaultTopicSelector(outputTopic))
            .withTridentTupleToKafkaMapper(
                new FieldNameBasedTupleToKafkaMapper<String, Long>(
                    "word","count"));
    wordCounts.partitionPersist(
        stateFactory,
        new Fields("word", "count"),
        new TridentKafkaStateUpdater(),
        new Fields()
    );

    Config conf = new Config();
    cluster = new LocalCluster();
    cluster.submitTopology("wordCount", conf, topology.build());
  }

  @Override
  public void close() {
    cluster.shutdown();
  }

  static class Split extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
      String sentence = tuple.getString(0);
      for (String word : sentence.toLowerCase().split("\\W+")) {
        collector.emit(new Values(word));
      }
    }
  }
}

Stream-Relational Processing Platforms

In past posts I discussed the differences between Spark and Flink in terms of stream processing, with Spark treating streaming data as micro-batches, and Flink treating streaming data as a first-class citizen.  Apache Beam is another streaming framework that has an interesting goal of generalizing other stream processing platforms.

Recently I joined Confluent, so I was interested in learning how Beam differs from Kafka Streams.  Kafka Streams is a stream processing framework on top of Kafka that has both a functional DSL and a declarative SQL layer called KSQL.  Confluent has an informative blog that compares Kafka Streams with Beam in great technical detail.  From what I’ve gathered, one way to state the differences between the two systems is as follows:

  • Kafka Streams is a stream-relational processing platform.
  • Apache Beam is a stream-only processing platform.

A stream-relational processing platform has the following capabilities which are typically missing in a stream-only processing platform:

  • Relations (or tables) are first-class citizens, i.e. each has an independent identity.
  • Relations can be transformed into other relations.
  • Relations can be queried in an ad-hoc manner.

For example, in Beam one can aggregate state using windowing, but this state cannot be queried in an ad-hoc manner.  It can only be emitted back into the stream through the use of triggers.  This is why Beam can be viewed as a stream-only system.

The capability to transform one relation into another is what gives relational algebra (and SQL) its power.  Operations on relations such as selection, projection, and Cartesian product result in more relations, which can then be used for further operations.  This is referred to as the closure property of relational algebra.
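
The closure property can be made concrete with a toy example.  The sketch below is plain Java and not tied to any streaming framework; the Relation class and its row helper are hypothetical.  The point is only that selection and projection each return another Relation, so they can be chained:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical toy relation: a list of rows, each a column-to-value map.
class Relation {
  final List<Map<String, Object>> rows;

  Relation(List<Map<String, Object>> rows) {
    this.rows = rows;
  }

  // Selection: keep the rows that satisfy the predicate.
  Relation select(Predicate<Map<String, Object>> predicate) {
    List<Map<String, Object>> out = new ArrayList<>();
    for (Map<String, Object> row : rows) {
      if (predicate.test(row)) {
        out.add(row);
      }
    }
    return new Relation(out);
  }

  // Projection: keep the named columns and drop duplicate rows.
  Relation project(String... columns) {
    List<Map<String, Object>> out = new ArrayList<>();
    for (Map<String, Object> row : rows) {
      Map<String, Object> projected = new LinkedHashMap<>();
      for (String column : columns) {
        projected.put(column, row.get(column));
      }
      if (!out.contains(projected)) {
        out.add(projected);
      }
    }
    return new Relation(out);
  }

  // Convenience constructor for a (word, count) row.
  static Map<String, Object> row(String word, long count) {
    Map<String, Object> r = new LinkedHashMap<>();
    r.put("word", word);
    r.put("count", count);
    return r;
  }

  public static void main(String[] args) {
    Relation wordCounts = new Relation(Arrays.asList(
        row("kafka", 3L), row("beam", 1L), row("flink", 3L)));

    // Each operator returns a relation, so operators can be chained.
    Relation frequent = wordCounts
        .select(r -> (Long) r.get("count") >= 3L)
        .project("word");

    System.out.println(frequent.rows); // [{word=kafka}, {word=flink}]
  }
}
```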

Ad-hoc querying allows users to inject themselves as a “sink” at various points in the stream.  At these junctures, they can construct interactive queries to extract the information that they are really interested in.  The user’s temporal presence becomes intertwined with the stream.  This essentially gives the stream a more dynamic nature, something that can help the larger organization be more dynamic as well.

Jay Kreps has written a lot about stream-table (or stream-relation) duality, where streams and tables (or relations) can be considered as two ways of looking at the same data.  This duality has also been explored in research projects like CQL, which was one of the first systems to provide support for both streams and relations1.   In the CQL paper, the authors discuss how both stream-relational systems and stream-only systems are equally expressive.  However, the authors also point out the ease-of-use of stream-relational systems over stream-only systems:

…the dual approach results in more intuitive queries than the stream-only approach.2

Modern day stream processing platforms attempt to unify the world of batch and stream processing.  Batches are essentially treated as bounded streams.  However, stream-relational processing platforms go beyond stream-only processing platforms by not only unifying batch and stream processing, but also unifying the world of streams and relations.  This means that streams can be converted to relations and vice versa within the data flow of the framework itself (and not just at the endpoints of the flow).
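
To make the conversion in both directions concrete, here is a minimal sketch in plain Java.  It does not use the Kafka Streams API; the class and method names are hypothetical.  A changelog stream of keyed updates is folded into a table by keeping the latest value per key, and a table is turned back into a stream by emitting one update per row:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of stream-table duality.
class StreamTableDuality {

  // Stream -> table: replay the changelog, keeping the latest value per key.
  static Map<String, Long> toTable(List<Map.Entry<String, Long>> changelog) {
    Map<String, Long> table = new LinkedHashMap<>();
    for (Map.Entry<String, Long> update : changelog) {
      table.put(update.getKey(), update.getValue());
    }
    return table;
  }

  // Table -> stream: emit one update record per row of the table.
  static List<Map.Entry<String, Long>> toStream(Map<String, Long> table) {
    List<Map.Entry<String, Long>> changelog = new ArrayList<>();
    for (Map.Entry<String, Long> row : table.entrySet()) {
      changelog.add(entry(row.getKey(), row.getValue()));
    }
    return changelog;
  }

  static Map.Entry<String, Long> entry(String key, long value) {
    return new AbstractMap.SimpleImmutableEntry<>(key, value);
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Long>> changelog = Arrays.asList(
        entry("hello", 1L), entry("kafka", 1L), entry("hello", 2L));

    Map<String, Long> table = toTable(changelog);
    System.out.println(table);           // {hello=2, kafka=1}
    System.out.println(toStream(table)); // [hello=2, kafka=1]
  }
}
```

Replaying the stream produced from the table yields the same table again, which is the sense in which the two are views of the same data.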

Beam is meant to generalize stream-only systems.  Streaming engines can plug into Beam by implementing a Beam Runner.  In order to understand both Kafka Streams and Beam, I spent a couple of weekends writing a Beam Runner for Kafka Streams3.  It provides a proof-of-concept of how the Beam Dataflow model can be implemented on top of Kafka Streams.  However, it doesn’t make use of the ability of Kafka Streams to represent relations, nor does it use the native windowing functionality of Kafka Streams, so it is not as useful to someone who might want the full power of stream-relational functionality4.

I would like to dwell a bit more on stream-table duality as I feel it is a fundamental concept in computer science, not just in databases.  As Jay Kreps has mentioned, stream-table duality has been explored extensively by CQL (2003).  However, a similar duality was known by the functional programming community even earlier.  Here is an interesting reference from over 30 years ago:

“Now we have seen that streams provide an alternative way to model objects with local state.  We can model a changing quantity, such as the local state of some object, using a stream that represents the time history of successive states.  In essence, we represent time explicitly, using streams, so that we decouple time in our simulated world from the sequence of events that take place during evaluation.”5

The above quote is from Structure and Interpretation of Computer Programs, a classic text in computer science.  From a programming perspective, modeling objects with streams leads to a functional programming view of time, whereas modeling objects with state leads to an imperative programming view of time.  So perhaps stream-table duality can also be called stream-state duality.  Streams and state are alternative ways of modeling the temporality of objects in the external world.  This is one of the key ideas behind event sourcing.
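
The two views of time can be put side by side in a few lines.  This sketch is plain Java with hypothetical names and a made-up account example.  The imperative version keeps a single balance that is overwritten in place, while the stream version produces the time history of successive balances, one per event:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical example: an account balance modeled as state and as a stream.
class AccountHistory {

  // Imperative view: one mutable balance, updated in place.
  static long finalBalance(List<Long> transactions) {
    long balance = 0L;
    for (long amount : transactions) {
      balance += amount;
    }
    return balance;
  }

  // Stream view: the history of successive balances, one per transaction.
  static List<Long> balanceHistory(List<Long> transactions) {
    List<Long> history = new ArrayList<>();
    long balance = 0L;
    for (long amount : transactions) {
      balance += amount;
      history.add(balance);
    }
    return history;
  }

  public static void main(String[] args) {
    List<Long> transactions = Arrays.asList(100L, -30L, 50L);
    System.out.println(finalBalance(transactions));   // 120
    System.out.println(balanceHistory(transactions)); // [100, 70, 120]
  }
}
```

The current state is simply the last element of the history, and the full history can always be rebuilt by replaying the events.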

If one wants to get even more abstract, analogous concepts exist in philosophy, specifically within the metaphysics of time.  For example,

“…eternalism and presentism are conflicting views about the nature of time.  The eternalist claims that all times exist and thus that the objects present at all past, present, and future times exist.  In contrast, the presentist argues that only the present exists, and thus that only those objects present now exist.  As a result the eternalist, but not the presentist, can sincerely quantify over all times and objects existing at those times.”6

So a Beam proponent would be close to an eternalist, while a MySQL advocate would be close to a presentist.  With Kafka Streams, you get to be both!

Now that I’ve been able to encompass all of temporal reality with this blog, let me presently return to the main point.  Kafka Streams, as a stream-relational processing platform, provides first-class support for relations, relation transformations, and ad-hoc querying, which are all missing in a generalized stream-only framework like Beam7.  And as the world of relational databases has shown (for almost 50 years since Codd’s original paper), the use of relations, along with being able to transform and query them in an interactive fashion, can provide a critically important tool to enterprises that wish to be more responsive amidst changing business needs and circumstances.

 

 

 

 

 


Graph Analytics on HBase with HGraphDB and Apache Flink Gelly

Previously, I’ve shown how both Apache Giraph and Apache Spark GraphFrames can be used to analyze graphs stored in HGraphDB.  In this blog I will show how yet another graph analytics framework, Apache Flink Gelly, can be used with HGraphDB.

First, some observations on how Giraph, GraphFrames, and Gelly differ.  Giraph runs on Hadoop MapReduce, while GraphFrames and Gelly run on Spark and Flink, respectively. MapReduce has been pivotal in launching the era of big data.  Two of its characteristics are the following:

  1. MapReduce has only 3 steps (the map, shuffle, and reduce steps)
  2. MapReduce processes data in batches

The fact that MapReduce utilizes only 3 steps has led to the development of workflow engines like Apache Oozie that can combine MapReduce jobs into more complex flows, represented by directed acyclic graphs.  Also, the fact that MapReduce performs only batch processing has led to the development of stream processing frameworks like Apache Storm, which is often combined with Hadoop MapReduce in what is referred to as a lambda architecture.

Later, dataflow engines such as Apache Spark and Apache Flink were developed to handle data processing as a single job, rather than several independent MapReduce jobs that need to be chained together.  However, while Spark is fundamentally a batch-oriented framework, Flink is fundamentally a stream-oriented framework.   Both try to unify batch and stream processing in different ways.  Spark provides stream processing by breaking data into micro-batches.  Flink posits that batch is a special case of streaming, and that stream-processing engines can handle batches better than batch-processing engines can handle streams.
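
As a rough illustration of micro-batching, the sketch below groups records into fixed intervals by arrival time.  It is plain Java with hypothetical names, and it is a deliberately simplified model rather than how Spark actually schedules work; it assumes non-negative arrival timestamps in milliseconds:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of micro-batching by arrival time.
class MicroBatcher {

  // Assign each record to the batch whose interval contains its arrival
  // time. The map key is the start of the batch interval in milliseconds.
  static Map<Long, List<String>> batch(
      long[] arrivalMillis, String[] records, long intervalMillis) {
    Map<Long, List<String>> batches = new TreeMap<>();
    for (int i = 0; i < records.length; i++) {
      long batchStart = (arrivalMillis[i] / intervalMillis) * intervalMillis;
      batches.computeIfAbsent(batchStart, k -> new ArrayList<>())
          .add(records[i]);
    }
    return batches;
  }

  public static void main(String[] args) {
    long[] arrivals = {0L, 400L, 1000L, 1500L, 2100L};
    String[] records = {"a", "b", "c", "d", "e"};
    // Prints {0=[a, b], 1000=[c, d], 2000=[e]}
    System.out.println(batch(arrivals, records, 1000L));
  }
}
```

In this model a record-at-a-time engine corresponds to processing each record as soon as it arrives, without waiting for its interval to close.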

Needless to say, users who have the requirement to process big data, including large graphs, have a plethora of unique and interesting options at their disposal today.

To use Apache Flink Gelly with HGraphDB, graph data first needs to be wrapped in Flink DataSets.  HGraphDB provides two classes, HBaseVertexInputFormat and HBaseEdgeInputFormat, that can be used to import the vertices and edges of a graph into DataSets.

As a demonstration, we can run one of the Gelly neighborhood examples on HGraphDB as follows.  First we create the graph in the example:

Vertex v1 = graph.addVertex(T.id, 1L);
Vertex v2 = graph.addVertex(T.id, 2L);
Vertex v3 = graph.addVertex(T.id, 3L);
Vertex v4 = graph.addVertex(T.id, 4L);
Vertex v5 = graph.addVertex(T.id, 5L);
v1.addEdge("e", v2, "weight", 0.1);
v1.addEdge("e", v3, "weight", 0.5);
v1.addEdge("e", v4, "weight", 0.4);
v2.addEdge("e", v4, "weight", 0.7);
v2.addEdge("e", v5, "weight", 0.3);
v3.addEdge("e", v4, "weight", 0.2);
v4.addEdge("e", v5, "weight", 0.9);

A vertex in Gelly consists of an ID and a value, whereas an edge in Gelly consists of the source vertex ID, the target vertex ID, and an optional value.  When using HBaseVertexInputFormat and HBaseEdgeInputFormat, the name of a property can be specified for the property value in the HGraphDB vertex or edge to be associated with the Gelly vertex or edge.  If no property name is specified, then the value will default to the ID of the vertex or edge.  Below we import the vertices using an instance of HBaseVertexInputFormat with no property name specified, and we import the edges using an instance of HBaseEdgeInputFormat with the property name specified as “weight”.

HBaseGraphConfiguration conf = graph.configuration();
ExecutionEnvironment env = 
    ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<Long, Long>> vertices = env.createInput(
    new HBaseVertexInputFormat<>(conf),
    TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})
);
DataSet<Tuple3<Long, Long, Double>> edges = env.createInput(
    new HBaseEdgeInputFormat<>(conf, "weight"),
    TypeInformation.of(new TypeHint<Tuple3<Long, Long, Double>>() {})
);

Once we have the two DataSets, we can create a Gelly graph as follows:

Graph<Long, Long, Double> gelly = 
    Graph.fromTupleDataSet(vertices, edges, env);

Finally, running the neighborhood processing example is exactly the same as in the documentation:

DataSet<Tuple2<Long, Double>> minWeights = 
    gelly.reduceOnEdges(new SelectMinWeight(), EdgeDirection.OUT);

// user-defined function to select the minimum weight
static final class SelectMinWeight implements ReduceEdgesFunction<Double> {
    @Override
    public Double reduceEdges(
        Double firstEdgeValue, Double secondEdgeValue) {
        return Math.min(firstEdgeValue, secondEdgeValue);
    }
}
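To make the expected result concrete, here is a minimal plain-Java sketch (no Flink required) that computes the same per-vertex minimum over outgoing edge weights for the example graph above.  The class MinOutWeight and its helper method are hypothetical names used only for illustration; the sketch shows what the reduction should produce, not how Gelly executes the job.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration class; not part of Flink Gelly or HGraphDB.
final class MinOutWeight {

    // Edges of the example graph as {source, target, weight}.
    static final double[][] EDGES = {
        {1, 2, 0.1}, {1, 3, 0.5}, {1, 4, 0.4},
        {2, 4, 0.7}, {2, 5, 0.3},
        {3, 4, 0.2},
        {4, 5, 0.9}
    };

    // For each source vertex, keep the minimum weight among its outgoing edges.
    static Map<Long, Double> minOutWeights(double[][] edges) {
        Map<Long, Double> result = new TreeMap<>();
        for (double[] e : edges) {
            result.merge((long) e[0], e[2], Math::min);
        }
        return result;
    }

    public static void main(String[] args) {
        // Vertex 5 has no outgoing edges, so this sketch produces no entry for it.
        System.out.println(minOutWeights(EDGES)); // {1=0.1, 2=0.3, 3=0.2, 4=0.9}
    }
}
```

The merge call plays the role of SelectMinWeight: it combines two edge values for the same vertex into one by taking the smaller.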

HGraphDB brings together several big data technologies in the Apache ecosystem in order to process large graphs. Graph data can be stored in Apache HBase, OLTP graph operations can be performed using Apache TinkerPop, and complex graph analytics can be performed using Apache Giraph, Apache Spark GraphFrames, or Apache Flink Gelly.

HBase Application Archetypes Redux

At Yammer, we’ve transitioned away from polyglot persistence to persistence consolidation. In a microservice architecture, the principle that each microservice should be responsible for its own data had led to a proliferation of different types of data stores at Yammer. This in turn led to multiple efforts to make sure that each data store could be easily used, monitored, operationalized, and maintained. In the end, we decided it would be more efficient, both architecturally and organizationally, to reduce the number of data store types in use at Yammer to as few as possible.

Today HBase is the primary data store for non-relational data at Yammer (we use PostgreSQL for relational data).  Microservices are still responsible for their own data, but the data is segregated by cluster boundaries or mechanisms within the data store itself (such as HBase namespaces or PostgreSQL schemas).

HBase was chosen for a number of reasons, including its performance, scalability, reliability, its support for strong consistency, and its ability to support a wide variety of data models.  At Yammer we have a number of services that rely on HBase for persistence in production:

  • Feedie, a feeds service
  • RoyalMail, an inbox service
  • Ocular, for tracking messages that a user has viewed
  • Streamie, for storing activity streams
  • Prankie, a ranking service with time-based decay
  • Authlog, for authorization audit trails
  • Spammie, for spam monitoring and blocking
  • Graphene, a generic graph modeling service

HBase is able to satisfy the persistence needs of several very different domains. Of course, there are some use cases for which HBase is not recommended, for example, when using raw HDFS would be more efficient, or when ad-hoc querying via SQL is preferred (although projects like Apache Phoenix can provide SQL on top of HBase).

Previously, Lars George and Jonathan Hsieh from Cloudera attempted to survey the most commonly occurring use cases for HBase, which they referred to as application archetypes.  In their presentation, they categorized archetypes as either “good”, “bad”, or “maybe” when used with HBase. Below I present an augmented listing of their “good” archetypes, along with pointers to projects that implement them.

ENTITY

The Entity archetype is the most natural of the archetypes.  HBase, being a wide column store, can represent the entity properties with individual columns.  Projects like Apache Gora and HEntityDB support this archetype.

            | Column Family: default
Row Key     | Column: <property 1 name> | Column: <property 2 name>
<entity ID> | <property 1 value>        | <property 2 value>

Entities can also be stored in the same manner as with a key-value store.  In this case the entity would be serialized as a binary or JSON value in a single column.

            | Column Family: default
Row Key     | Column: body
<entity ID> | <entity blob>

SORTED COLLECTION

The Sorted Collection archetype is a generalization of the original Messaging archetype that was presented.  In this archetype the entities are stored as binary or JSON values, with the column qualifier being the value of the sort key to use.  For example, in a messaging feed, the column qualifier would be a timestamp or a monotonically increasing counter of some sort.  The column qualifier can also be “inverted” (such as by subtracting a numeric ID from the maximum possible value) so that entities are stored in descending order.

                | Column Family: default
Row Key         | Column: <sort key 1 value> | Column: <sort key 2 value>
<collection ID> | <entity 1 blob>            | <entity 2 blob>
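As a rough illustration of the "inverted" column qualifier, here is a minimal sketch in plain Java.  It assumes the sort key is a non-negative long (such as a timestamp or counter) and that qualifiers are compared as unsigned bytes, as HBase does.  The class InvertedQualifier and its encoding are my own assumptions for illustration, not code from Feedie or RoyalMail.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper; the encoding is an assumption for illustration only.
final class InvertedQualifier {

    // Encode a non-negative ID as 8 big-endian bytes of (Long.MAX_VALUE - id),
    // so that larger IDs produce lexicographically smaller byte arrays.
    static byte[] encode(long id) {
        if (id < 0) {
            throw new IllegalArgumentException("id must be non-negative");
        }
        return ByteBuffer.allocate(Long.BYTES).putLong(Long.MAX_VALUE - id).array();
    }

    // Recover the original ID from an encoded qualifier.
    static long decode(byte[] qualifier) {
        return Long.MAX_VALUE - ByteBuffer.wrap(qualifier).getLong();
    }

    public static void main(String[] args) {
        byte[] older = encode(1_000L);
        byte[] newer = encode(2_000L);
        // Unsigned byte-wise comparison mirrors the store's ordering: the newer entry sorts first.
        System.out.println(Arrays.compareUnsigned(newer, older) < 0); // true
        System.out.println(decode(newer)); // 2000
    }
}
```

With this encoding, a scan over the row returns the most recent entities first without any client-side sorting.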

Alternatively, each entity can be stored as a set of properties.  This is similar to how Cassandra implements CQL.  HEntityDB supports storing entity collections in this manner.

                | Column Family: default
Row Key         | Column: <sort key 1 value + property 1 name> | Column: <sort key 1 value + property 2 name> | Column: <sort key 2 value + property 1 name> | Column: <sort key 2 value + property 2 name>
<collection ID> | <property 1 of entity 1>                     | <property 2 of entity 1>                     | <property 1 of entity 2>                     | <property 2 of entity 2>

In order to access entities by some other value than the sort key, additional column families representing indices can be used.

                | Column Family: sorted                                   | Column Family: index
Row Key         | Column: <sort key 1 value> | Column: <sort key 2 value> | Column: <index 1 value> | Column: <index 2 value>
<collection ID> | <entity 1 blob>            | <entity 2 blob>            | <entity 1 blob>         | <entity 2 blob>

To prevent the collection from growing unbounded, a coprocessor can be used to trim the sorted collection during compactions.  If index column families are used, the coprocessor would also remove corresponding entries from the index column families when trimming the sorted collection.  At Yammer, both the Feedie and RoyalMail services use this technique.  Both services also use server-side filters for efficient pagination of the sorted collection during queries.

DOCUMENT

Using a technique called key-flattening, a document can be shredded by storing each value in the document according to the path from the root to the name of the element containing the value.  HDocDB uses this approach.

              | Column Family: default
Row Key       | Column: <property 1 path> | Column: <property 2 path>
<document ID> | <property 1 value>        | <property 2 value>
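To give a feel for key-flattening, here is a minimal sketch in plain Java that shreds a nested document into one entry per path.  The class KeyFlattener, the dotted path syntax, and the decision to treat only nested maps as containers (arrays are not handled) are my own assumptions for illustration; HDocDB's actual path encoding may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of key-flattening; the dotted path syntax is an assumption.
final class KeyFlattener {

    // Flatten a nested document into path -> leaf value entries.
    static Map<String, Object> flatten(Map<String, Object> document) {
        Map<String, Object> columns = new LinkedHashMap<>();
        flattenInto("", document, columns);
        return columns;
    }

    private static void flattenInto(String prefix, Map<?, ?> node, Map<String, Object> out) {
        for (Map.Entry<?, ?> entry : node.entrySet()) {
            String path = prefix.isEmpty()
                ? String.valueOf(entry.getKey())
                : prefix + "." + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                flattenInto(path, (Map<?, ?>) value, out); // descend into nested objects
            } else {
                out.put(path, value); // leaf: one column per path
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> address = new LinkedHashMap<>();
        address.put("city", "Seattle");
        address.put("zip", "98101");
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("name", "Alice");
        doc.put("address", address);
        System.out.println(flatten(doc)); // {name=Alice, address.city=Seattle, address.zip=98101}
    }
}
```

Each resulting path would become a column qualifier in the document's row, with the leaf value as the cell value.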

The document can also be stored as a binary value, in which case support for Medium Objects (MOBs) can be used if the documents are large.  This approach is described in the book Architecting HBase Applications.

              | Column Family: default
Row Key       | Column: body
<document ID> | <reference to MOB>

GRAPH

There are many ways to store a graph in HBase.  One method is to use an adjacency list, where each vertex stores its neighbors in the same row.  This is the approach taken in JanusGraph.

            | Column Family: default
Row Key     | Column: <edge 1 key> | Column: <edge 2 key> | Column: <property 1 name> | Column: <property 2 name>
<vertex ID> | <edge 1 properties>  | <edge 2 properties>  | <property 1 value>        | <property 2 value>

In the table above, the edge key is actually comprised of a number of parts, including the label, direction, edge ID, and adjacent vertex ID.

Alternatively, a separate table to represent edges can be used, in which case the incident vertices are stored in the same row as an edge.   This may scale better if the adjacency list is large, such as in a social network.  This is the approach taken in both Zen and HGraphDB.

            | Column Family: default
Row Key     | Column: <property 1 name> | Column: <property 2 name>
<vertex ID> | <property 1 value>        | <property 2 value>

          | Column Family: default
Row Key   | Column: fromVertex | Column: toVertex | Column: <property 1 name> | Column: <property 2 name>
<edge ID> | <vertex ID>        | <vertex ID>      | <property 1 value>        | <property 2 value>

When storing edges in a separate table, additional index tables must be used to provide efficient access to the incident edges of a vertex.  For example, the full list of tables in HGraphDB can be viewed here.

QUEUE

A queue can be modeled by using a row key comprised of the consumer ID and a counter.  Both Cask and Box implement queues in this manner.

                        | Column Family: default
Row Key                 | Column: metadata   | Column: body
<consumer ID + counter> | <message metadata> | <message body>
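As a rough sketch of why this row key works, the plain-Java example below builds keys from a consumer ID and a counter and stores them in a byte-ordered map standing in for an HBase table.  The layout (UTF-8 consumer ID, a zero-byte separator, then an 8-byte big-endian counter) and the class QueueRowKey are assumptions of mine for illustration, not the layout used by Cask or Box.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Comparator;
import java.util.TreeMap;

// Hypothetical row-key layout: <consumer ID bytes> 0x00 <8-byte big-endian counter>.
final class QueueRowKey {

    static byte[] of(String consumerId, long counter) {
        if (counter < 0) {
            throw new IllegalArgumentException("counter must be non-negative");
        }
        byte[] id = consumerId.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(id.length + 1 + Long.BYTES)
            .put(id)
            .put((byte) 0)      // separator; assumes consumer IDs contain no zero byte
            .putLong(counter)
            .array();
    }

    public static void main(String[] args) {
        // A TreeMap ordered by unsigned bytes stands in for the sorted table.
        Comparator<byte[]> byteOrder = Arrays::compareUnsigned;
        TreeMap<byte[], String> table = new TreeMap<>(byteOrder);
        table.put(of("consumer-b", 1L), "b1");
        table.put(of("consumer-a", 2L), "a2");
        table.put(of("consumer-a", 1L), "a1");
        // Rows come back grouped by consumer, in counter (FIFO) order.
        System.out.println(table.values()); // [a1, a2, b1]
    }
}
```

Because each consumer's messages are contiguous and ordered, dequeuing becomes a short scan starting from that consumer's prefix.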

Cask also uses coprocessors for efficient scan filtering and queue trimming, and Apache Tephra for transactional queue processing.

METRICS

The Metrics archetype is a variant of the Entity archetype in which the column values are counters or some other aggregate.

            | Column Family: default
Row Key     | Column: <property 1 name> | Column: <property 2 name>
<entity ID> | <property 1 counter>      | <property 2 counter>

HGraphDB is actually a combination of the Graph and Metrics archetypes, as arbitrary counters can be stored on either vertices or edges.

Update: For other projects that use HBase, see this list.

Graph Analytics on HBase with HGraphDB and Spark GraphFrames

In a previous post, I showed how to analyze graphs stored in HGraphDB using Apache Giraph.  Giraph depends on Hadoop, and some developers may be using Spark instead.  In this blog I will show how to analyze HGraphDB graphs using Apache Spark GraphFrames.

In order to prepare data stored in HGraphDB for GraphFrames, we need to import vertex and edge data from HGraphDB into Spark DataFrames.  Hortonworks provides a Spark-on-HBase Connector to do just that.  The Spark-on-HBase Connector allows for custom serde (serializer/deserializer) types to be created by implementing the SHCDataType trait.  The serde for HGraphDB is available here.  (When testing the serde, I ran into some issues with the Spark-on-HBase Connector for which I have submitted pull requests.  Hopefully those will be merged soon.  In the meantime, you can use my fork of the Spark-on-HBase Connector.  Update:  Thanks to Hortonworks, these have been merged.)

To demonstrate how to use HGraphDB with GraphFrames, we first use HGraphDB to create the same graph example that is used in the GraphFrames User Guide.

Vertex a = graph.addVertex(T.id, "a", "name", "Alice", "age", 34);
Vertex b = graph.addVertex(T.id, "b", "name", "Bob", "age", 36);
Vertex c = graph.addVertex(T.id, "c", "name", "Charlie", "age", 30);
Vertex d = graph.addVertex(T.id, "d", "name", "David", "age", 29);
Vertex e = graph.addVertex(T.id, "e", "name", "Esther", "age", 32);
Vertex f = graph.addVertex(T.id, "f", "name", "Fanny", "age", 36);
Vertex g = graph.addVertex(T.id, "g", "name", "Gabby", "age", 60);
a.addEdge("friend", b);
b.addEdge("follow", c);
c.addEdge("follow", b);
f.addEdge("follow", c);
e.addEdge("follow", f);
e.addEdge("friend", d);
d.addEdge("friend", a);
a.addEdge("friend", e);

Now that the graph is stored in HGraphDB, we need to specify a schema to be used by the Spark-on-HBase Connector for retrieving vertex and edge data.

def vertexCatalog = s"""{
    |"table":{"namespace":"testGraph", "name":"vertices",
    |  "tableCoder":"org.apache.spark.sql.execution.datasources.hbase.types.HGraphDB", "version":"2.0"},
    |"rowkey":"key",
    |"columns":{
      |"id":{"cf":"rowkey", "col":"key", "type":"string"},
      |"name":{"cf":"f", "col":"name", "type":"string"},
      |"age":{"cf":"f", "col":"age", "type":"int"}
    |}
  |}""".stripMargin

def edgeCatalog = s"""{
    |"table":{"namespace":"testGraph", "name":"edges",
    |  "tableCoder":"org.apache.spark.sql.execution.datasources.hbase.types.HGraphDB", "version":"2.0"},
    |"rowkey":"key",
    |"columns":{
      |"id":{"cf":"rowkey", "col":"key", "type":"string"},
      |"relationship":{"cf":"f", "col":"~l", "type":"string"},
      |"src":{"cf":"f", "col":"~f", "type":"string"},
      |"dst":{"cf":"f", "col":"~t", "type":"string"}
    |}
  |}""".stripMargin

Some things to note about this schema:

  • The HGraphDB serde is specified as the tableCoder above.
  • All HGraphDB columns are stored in a column family named f.
  • Vertex and edge labels are stored in a column with qualifier ~l.
  • The source and destination columns have qualifiers ~f and ~t, respectively.
  • All vertex and edge properties are stored in columns with the qualifiers simply being the name of the property.

Now that we have a schema, we can create Spark DataFrames for both the vertices and edges, and then pass these to the GraphFrame factory.

def withCatalog(cat: String): DataFrame = {
  sqlContext
  .read
  .options(Map(HBaseTableCatalog.tableCatalog->cat))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .load()
}
val verticesDataFrame = withCatalog(vertexCatalog)
val edgesDataFrame = withCatalog(edgeCatalog)
val g = GraphFrame(verticesDataFrame, edgesDataFrame)

With the GraphFrame in hand, we now have full access to the Spark GraphFrame APIs. For instance, here are some arbitrary graph operations from the GraphFrames Quick Start.

// Query: Get in-degree of each vertex.
g.inDegrees.show()

// Query: Count the number of "follow" connections in the graph.
g.edges.filter("relationship = 'follow'").count()

// Run PageRank algorithm, and show results.
val results = g.pageRank.resetProbability(0.01).maxIter(20).run()
results.vertices.select("id", "pagerank").show()
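As a sanity check on the first two queries, here is a minimal plain-Java sketch (no Spark required) that computes the same in-degrees and "follow" count directly from the example edges created earlier.  The class ExampleGraphQueries and its methods are hypothetical names for illustration only; PageRank is left out because its exact values depend on the GraphFrames implementation.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration class; not part of GraphFrames or HGraphDB.
final class ExampleGraphQueries {

    // Edges of the example graph as {src, relationship, dst}.
    static final String[][] EDGES = {
        {"a", "friend", "b"}, {"b", "follow", "c"}, {"c", "follow", "b"},
        {"f", "follow", "c"}, {"e", "follow", "f"}, {"e", "friend", "d"},
        {"d", "friend", "a"}, {"a", "friend", "e"}
    };

    // Count incoming edges per destination vertex.
    static Map<String, Integer> inDegrees(String[][] edges) {
        Map<String, Integer> degrees = new TreeMap<>();
        for (String[] e : edges) {
            degrees.merge(e[2], 1, Integer::sum);
        }
        return degrees;
    }

    // Count edges with the given relationship label.
    static long countByRelationship(String[][] edges, String relationship) {
        return Arrays.stream(edges).filter(e -> e[1].equals(relationship)).count();
    }

    public static void main(String[] args) {
        // Vertex g has no incoming edges, so it has no entry.
        System.out.println(inDegrees(EDGES)); // {a=1, b=2, c=2, d=1, e=1, f=1}
        System.out.println(countByRelationship(EDGES, "follow")); // 4
    }
}
```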

You can see further graph operations against our example graph (taken from the GraphFrames User Guide) in this test.

As you can see, HGraphDB makes graphs stored in HBase easily accessible by Apache TinkerPop, Apache Giraph, and now Apache Spark GraphFrames.

Don’t Settle For Eventual Consistency

This week Google released Cloud Spanner [1], a publicly available version of their Spanner database. This completes the public release of their 3 main databases, Bigtable (released as Cloud Bigtable), Megastore (released as Cloud Datastore), and Spanner. Spanner is the culmination of Google’s research in data stores, which provides a globally distributed, relational database that is both strongly consistent and highly available.

But doesn’t the CAP theorem state that we have to choose consistency over availability, or availability over consistency? Over the years, Google has been arguing that you can have both strong consistency and high availability, and that you don’t have to settle for eventual consistency. In fact, all 3 of Google’s data stores are strongly consistent systems.

Some Background

In 2000, Brewer came up with the CAP conjecture [2], which was later proved as a theorem by Gilbert and Lynch [3]. It states that you can choose only 2 of the 3 properties:

  • C: consistency (or linearizability)
  • A: 100% availability (in the context of network partitions)
  • P: tolerance of network partitions

Later Coda Hale made the point that you can’t sacrifice partition tolerance, so really the choice is between CP and AP (and not CA) [4].

What is the tradeoff?

According to the CAP theorem, when you choose a data store, you must choose either an AP system (that is eventually consistent) or a CP system (that is strongly consistent). But Google would argue the following points:

  1. In AP systems, client code becomes more complex and error-prone in order to deal with inconsistencies.
  2. AP systems are not 100% available in practice.
  3. CP systems can be made to be highly available in practice.
  4. From the above 3 points, when you choose availability over consistency, you are not gaining 100% availability but you are losing consistency and you are gaining complexity.

Let’s drill down into these points.

Client complexity

Here is what Google has to say about using AP systems:

“We also have a lot of experience with eventual consistency systems at Google. In all such systems, we find developers spend a significant fraction of their time building extremely complex and error-prone mechanisms to cope with eventual consistency and handle data that may be out of date. We think this is an unacceptable burden to place on developers and that consistency problems should be solved at the database level.” [5]

This has led Google to focus on data stores that are CP.

AP systems in practice

Many engineers are confused about the definition of “availability” in the CAP theorem. Most engineers think of availability in terms of a service level agreement (SLA) or a service level objective (SLO), which is typically measured in “9s”. However, as Kleppmann has pointed out, the “availability” in the CAP theorem is not a measurement or a metric, but a liveness property of an algorithm [6]. I am going to distinguish between the two types of availability by referring to them as “effective availability” and “algorithmic availability”.

  • Effective availability: the empirically measured percentage of successful requests over some period, often measured in “9s”.
  • Algorithmic availability: a liveness property of an algorithm where every request to a non-failing node must eventually return a valid response.

The CAP theorem is only concerned with algorithmic availability.  An algorithmic availability of 100% does not guarantee an effective availability of 100%. The algorithmic availability from the CAP theorem only applies if both the implementation and the execution of the algorithm are without error. In practice, most outages to an AP system are not due to network issues, which the algorithm can handle, but rather to implementation defects, user errors, misconfiguration, resource limits, and misbehaving clients. Google found that in Spanner only 7.6% of its errors were network-related, whereas 52.5% of errors were user-related (such as overload and misconfiguration) and 13.3% of errors were due to bugs. Google actually refers to these errors as “incidents” since they were able to prevent most of them from affecting availability [7].

At Yammer we have experience with AP systems, and we’ve seen loss of availability for both Cassandra and Riak for various reasons.  Our AP systems have not been more reliable than our CP systems, yet they have been more difficult to work with and reason about in the presence of inconsistencies.  Other companies have also seen outages with AP systems in production [8]. So in practice, AP systems are just as susceptible as CP systems to outages due to issues such as human error and buggy code, both on the client side and the server side.

CP systems in practice

With Spanner, Google is able to attain an availability of 5 “9s”, which is 5.26 minutes of downtime per year [7]. Likewise, Facebook uses HBase, another CP system based on Bigtable, and claims to be able to attain an availability of between 4 and 5 “9s” [9]. In practice, mature CP systems can be made to be highly available. In fact, due to its strong consistency and high availability, Google refers to Spanner as “effectively” CA, which means they are focusing on effective availability (a practical measure) and not algorithmic availability (a theoretical property).
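For readers who want to check the arithmetic behind the “9s”, here is a minimal Java sketch that converts a number of “9s” into allowed downtime per year.  The class Nines is a hypothetical name, and the choice of an average year of 365.25 days is my own assumption; using 365 days gives a result that rounds to the same 5.26 minutes.

```java
// Hypothetical helper for the downtime arithmetic; not taken from any cited source.
final class Nines {

    // Allowed downtime in minutes per year for a given number of "9s" of availability.
    static double downtimeMinutesPerYear(int nines) {
        double unavailability = Math.pow(10, -nines); // e.g. 5 "9s" -> 0.00001
        double minutesPerYear = 365.25 * 24 * 60;     // average year, an assumption
        return unavailability * minutesPerYear;
    }

    public static void main(String[] args) {
        for (int n = 3; n <= 5; n++) {
            System.out.printf("%d nines -> %.2f minutes/year%n", n, downtimeMinutesPerYear(n));
        }
        // The last line printed is: 5 nines -> 5.26 minutes/year
    }
}
```

By the same arithmetic, 4 “9s” allows roughly ten times as much downtime, a little under an hour per year.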

A bad tradeoff?

With an AP system, you are giving up consistency, and not really gaining anything in terms of effective availability, the type of availability you really care about.  Some might think you can regain strong consistency in an AP system by using strict quorums (where the number of nodes written + number of nodes read > number of replicas).  Cassandra calls this “tunable consistency”.  However, Kleppmann has shown that even with strict quorums, inconsistencies can result [10].  So when choosing (algorithmic) availability over consistency, you are giving up consistency for not much in return, as well as gaining complexity in your clients when they have to deal with inconsistencies.
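To make the strict quorum condition concrete, here is a minimal Java sketch that checks the inequality and, by brute force, confirms that it is exactly the condition under which every read set shares a node with every write set.  The class Quorums and its methods are hypothetical names for illustration.  Note that the sketch only demonstrates the overlap property; as mentioned above, overlap alone does not rule out inconsistencies.

```java
// Hypothetical illustration of the strict quorum condition w + r > n.
final class Quorums {

    // A quorum configuration is "strict" when writes + reads exceeds the replica count.
    static boolean isStrict(int replicas, int writes, int reads) {
        return writes + reads > replicas;
    }

    // Brute-force check: does every write set of size w share at least one node
    // with every read set of size r over n replicas? Sets are encoded as bitmasks.
    static boolean alwaysOverlap(int n, int w, int r) {
        for (int writeSet = 0; writeSet < (1 << n); writeSet++) {
            if (Integer.bitCount(writeSet) != w) continue;
            for (int readSet = 0; readSet < (1 << n); readSet++) {
                if (Integer.bitCount(readSet) != r) continue;
                if ((writeSet & readSet) == 0) return false; // disjoint pair found
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isStrict(3, 2, 2) + " " + alwaysOverlap(3, 2, 2)); // true true
        System.out.println(isStrict(3, 1, 1) + " " + alwaysOverlap(3, 1, 1)); // false false
    }
}
```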

Summary

There’s nothing wrong with using an AP system in general. An AP system might exhibit the lower latencies that you require (such as with a cache), or perhaps your data is immutable so you don’t care as much about strong consistency, or perhaps 99.9% consistency is “good enough” [11]. These are all valid reasons for accepting eventual consistency.  However, in practice AP systems are not necessarily more highly available than CP systems, so don’t settle for eventual consistency in order to gain availability. The availability you think you will be getting (effective) is not the availability you will actually get (algorithmic), which will not be as useful as you might think.

  1. D. Srivastava. Introducing Cloud Spanner: a global database service for mission-critical applications, 2017 
  2. E. Brewer. Towards robust distributed systems. Proceedings of the 19th Annual ACM Symposium on Principles of Distributed Computing, Portland, OR, 2000 
  3. S. Gilbert, N. Lynch. Brewer’s conjecture and the feasibility of consistent, available, partition-tolerant web services. ACM SIGACT News 33(2), 2002 
  4. C. Hale. You Can’t Sacrifice Partition Tolerance, 2010 
  5.  J. Corbett, J. Dean, M. Epstein, A. Fikes, C. Frost, JJ Furman, S. Ghemawat, A. Gubarev, C. Heiser, P. Hochschild, W. Hsieh, S. Kanthak, E. Kogan, H. Li, A. Lloyd, S. Melnik, D. Mwaura, D. Nagle, S. Quinlan, R. Rao, L. Rolig, Y. Saito, M. Szymaniak, C. Taylor, R. Wang, and D. Woodford. Spanner: Google’s Globally-Distributed Database. Proceedings of OSDI ‘12: Tenth Symposium on Operating System Design and Implementation, Hollywood, CA, October, 2012 
  6. M. Kleppmann. A Critique of the CAP Theorem, 2015 
  7. E. Brewer. Spanner, TrueTime, and the CAP Theorem, 2017 
  8. D. Nadolny. PagerDuty: One Year of Cassandra Failures, 2015 
  9. Z. Fong, R. Shroff. HydraBase – The evolution of HBase@Facebook, 2014 
  10. M. Kleppmann. Designing Data-Intensive Applications, Chapter 9, p 334, 2017 
  11. P. Bailis, A. Ghodsi. Eventual consistency today: limitations, extensions, and beyond. Commun. ACM 56(5), 55–63, 2013 

Graph Analytics on HBase with HGraphDB and Giraph

HGraphDB is a client framework for HBase that provides a TinkerPop Graph API.  HGraphDB also provides integration with Apache Giraph, a graph compute engine for analyzing graphs that Facebook has shown to be massively scalable.  In this blog we will show how to convert a sample Giraph computation that works with text files to instead work with HGraphDB.

In the Giraph quick start, the SimpleShortestPathsComputation is used to show how to run a Giraph computation against a graph contained in a file as a JSON representation.  Here are the contents of the JSON file:

[0,0,[[1,1],[3,3]]]
[1,0,[[0,1],[2,2],[3,1]]]
[2,0,[[1,2],[4,4]]]
[3,0,[[0,3],[1,1],[4,4]]]
[4,0,[[3,4],[2,4]]]

Each line above has the format [fromVertexId, vertexValue, [[toVertexId, edgeValue],...]], where the edgeValue is the weight or cost of the edge that will be used for the path computation.

To run the example in the Giraph quick start, the following command line is used:

hadoop jar giraph-examples-1.3.0-SNAPSHOT-for-hadoop-2.5.1-jar-with-dependencies.jar \
    org.apache.giraph.GiraphRunner \
    org.apache.giraph.examples.SimpleShortestPathsComputation \
    -vif org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat \
    -vip /user/ryokota/input/tiny_graph.txt \
    -vof org.apache.giraph.io.formats.IdWithValueTextOutputFormat \
    -op /user/ryokota/output/shortestpaths \
    -w 1 -ca giraph.SplitMasterWorker=false

The results of the job will appear in a file under the output path (/user/ryokota/output/shortestpaths), with the following contents:

0 1.0
1 0.0
2 2.0
3 1.0
4 5.0
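To see where these numbers come from, here is a minimal single-process Java sketch that simulates the superstep-style message passing on the tiny graph above.  It assumes the source is vertex 1, which is consistent with the output (vertex 1 has distance 0.0).  The class TinyShortestPaths is a hypothetical illustration and does not use any Giraph APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical single-process simulation; not Giraph code.
final class TinyShortestPaths {

    // ADJACENCY[v] lists {target, weight} pairs, copied from the JSON file above.
    static final double[][][] ADJACENCY = {
        {{1, 1}, {3, 3}},
        {{0, 1}, {2, 2}, {3, 1}},
        {{1, 2}, {4, 4}},
        {{0, 3}, {1, 1}, {4, 4}},
        {{3, 4}, {2, 4}}
    };

    static double[] shortestPaths(double[][][] adjacency, int source) {
        int n = adjacency.length;
        double[] dist = new double[n];
        Arrays.fill(dist, Double.MAX_VALUE);
        List<List<Double>> inbox = emptyBoxes(n);
        boolean firstSuperstep = true;
        while (true) {
            List<List<Double>> outbox = emptyBoxes(n);
            boolean sent = false;
            for (int v = 0; v < n; v++) {
                // After the first superstep, only vertices with messages are active.
                if (!firstSuperstep && inbox.get(v).isEmpty()) continue;
                double minDist = (v == source) ? 0d : Double.MAX_VALUE;
                for (double message : inbox.get(v)) {
                    minDist = Math.min(minDist, message);
                }
                if (minDist < dist[v]) {
                    dist[v] = minDist;
                    // Propagate the improved distance along every outgoing edge.
                    for (double[] edge : adjacency[v]) {
                        outbox.get((int) edge[0]).add(minDist + edge[1]);
                        sent = true;
                    }
                }
            }
            if (!sent) return dist; // no messages in flight: every vertex has halted
            inbox = outbox;
            firstSuperstep = false;
        }
    }

    private static List<List<Double>> emptyBoxes(int n) {
        List<List<Double>> boxes = new ArrayList<>();
        for (int i = 0; i < n; i++) boxes.add(new ArrayList<>());
        return boxes;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(shortestPaths(ADJACENCY, 1))); // [1.0, 0.0, 2.0, 1.0, 5.0]
    }
}
```

Index i of the printed array is the distance for vertex i, matching the five output lines above.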

Now let’s leave that example and consider the exact same graph stored in HGraphDB.  The graph above can be created in HGraphDB using the following statements.

        Vertex v0 = graph.addVertex(T.id, 0);
        Vertex v1 = graph.addVertex(T.id, 1);
        Vertex v2 = graph.addVertex(T.id, 2);
        Vertex v3 = graph.addVertex(T.id, 3);
        Vertex v4 = graph.addVertex(T.id, 4);
        v0.addEdge("e", v1, "weight", 1);
        v0.addEdge("e", v3, "weight", 3);
        v1.addEdge("e", v0, "weight", 1);
        v1.addEdge("e", v2, "weight", 2);
        v1.addEdge("e", v3, "weight", 1);
        v2.addEdge("e", v1, "weight", 2);
        v2.addEdge("e", v4, "weight", 4);
        v3.addEdge("e", v0, "weight", 3);
        v3.addEdge("e", v1, "weight", 1);
        v3.addEdge("e", v4, "weight", 4);
        v4.addEdge("e", v3, "weight", 4);
        v4.addEdge("e", v2, "weight", 4);

There is also a class called HBaseBulkLoader that can be used for more efficient creation of larger graphs.

Instead of using the JSON input format above, HGraphDB provides two input formats, HBaseVertexInputFormat and HBaseEdgeInputFormat, which will read from the vertices table and edges table in HBase, respectively.  To use these formats, the Giraph computation needs to be changed slightly.  Here is the original SimpleShortestPathsComputation:

public class SimpleShortestPathsComputation extends BasicComputation<LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
  ...
  @Override
  public void compute(
      Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
      Iterable<DoubleWritable> messages) throws IOException {
    if (getSuperstep() == 0) {
      vertex.setValue(new DoubleWritable(Double.MAX_VALUE));
    }
    double minDist = isSource(vertex) ? 0d : Double.MAX_VALUE;
    for (DoubleWritable message : messages) {
      minDist = Math.min(minDist, message.get());
    }
    if (minDist < vertex.getValue().get()) {
      vertex.setValue(new DoubleWritable(minDist));
      for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
        double distance = minDist + edge.getValue().get();
        sendMessage(edge.getTargetVertexId(), new DoubleWritable(distance));
      }
    }
    vertex.voteToHalt();
  }
}

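And here is the version for HGraphDB.  The main changes are the HBaseComputation base class, the ObjectWritable, VertexValueWritable, and EdgeValueWritable wrapper types, and the way the edge weight is read from the “weight” property.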

public class SimpleShortestPathsComputation extends
        HBaseComputation<Long, DoubleWritable, FloatWritable, DoubleWritable> {
  ...
  @Override
  public void compute(
      Vertex<ObjectWritable<Long>, VertexValueWritable<DoubleWritable>, EdgeValueWritable<FloatWritable>> vertex,
      Iterable<DoubleWritable> messages) throws IOException {
    VertexValueWritable<DoubleWritable> vertexValue = vertex.getValue();
    if (getSuperstep() == 0) {
      vertexValue.setValue(new DoubleWritable(Double.MAX_VALUE));
    }
    double minDist = isSource(vertex) ? 0d : Double.MAX_VALUE;
    for (DoubleWritable message : messages) {
      minDist = Math.min(minDist, message.get());
    }
    if (minDist < vertexValue.getValue().get()) {
      vertexValue.setValue(new DoubleWritable(minDist));
      for (Edge<ObjectWritable<Long>, EdgeValueWritable<FloatWritable>> edge : vertex.getEdges()) {
        double distance = minDist + ((Number) edge.getValue().getEdge().property("weight").value()).doubleValue();
        sendMessage(edge.getTargetVertexId(), new DoubleWritable(distance));
      }
    }
    vertex.voteToHalt();
  }
}

The major difference is that when using HBaseVertexInputFormat, the “value” of a Giraph vertex is an instance of type VertexValueWritable, which is comprised of an HBaseVertex and a Writable value.   Likewise when using HBaseEdgeInputFormat, the “value” of a Giraph edge is an instance of type EdgeValueWritable, which is comprised of an HBaseEdge and a Writable value.  The instances of HBaseVertex and HBaseEdge should be considered read-only and only be used to obtain IDs and property values.

Running the above Giraph computation against HBase is similar to running the original example.  Note that we also have to customize IdWithValueTextOutputFormat to work properly with VertexValueWritable.

./hadoop jar hgraphdb-0.4.4-SNAPSHOT-test-jar-with-dependencies.jar \
    org.apache.giraph.GiraphRunner \
    io.hgraphdb.giraph.examples.SimpleShortestPathsComputation \
    -vif io.hgraphdb.giraph.HBaseVertexInputFormat \
    -eif io.hgraphdb.giraph.HBaseEdgeInputFormat \
    -vof io.hgraphdb.giraph.examples.IdWithValueTextOutputFormat \
    -op /user/ryokota/output/shortestpaths \
    -w 1 -ca giraph.SplitMasterWorker=false \
    -ca hbase.zookeeper.quorum=127.0.0.1 \
    -ca zookeeper.znode.parent=/hbase-unsecure \
    -ca gremlin.hbase.namespace=testgraph \
    -ca hbase.mapreduce.edgetable=testgraph:edges \
    -ca hbase.mapreduce.vertextable=testgraph:vertices

As an alternative to using a text-based output format such as IdWithValueTextOutputFormat, HGraphDB provides two abstract output formats, HBaseVertexOutputFormat and HBaseEdgeOutputFormat, that can be used to modify the graph after a Giraph computation.  For example, the shortest path result for each vertex could be set as a property on the vertex by extending HBaseVertexOutputFormat and implementing the method

public abstract void writeVertex(HBaseBulkLoader writer, HBaseVertex vertex, Writable value);

As you can see, HGraphDB extends the functionality in Apache Giraph by making it quite easy to both read and write graphs stored in HBase when performing sophisticated graph analytics.

HGraphDB: HBase as a TinkerPop Graph Database

The use of graph databases is common among social networking companies. A social network can easily be represented as a graph model, so a graph database is a natural fit. For instance, Facebook has a graph database called Tao, Twitter has FlockDB, and Pinterest has Zen. At Yammer, an enterprise social network, we rely on HBase for much of our messaging infrastructure, so I decided to see if HBase could also be used for some graph modelling and analysis.

Below I put together a wish list of what I wanted to see in a graph database.

  • It should be implemented directly on top of HBase.
  • It should support the TinkerPop 3 API.
  • It should allow the user to supply IDs for both vertices and edges.
  • It should allow user-supplied IDs to be either strings or numbers.
  • It should allow property values to be of arbitrary type, including maps, arrays, and serializable objects.
  • It should support indexing vertices by label and property.
  • It should support indexing edges by label and property, specific to a given vertex.
  • It should support range queries and pagination with both vertex indices and edge indices.

I did not find a graph database that met all of the above criteria. For instance, Titan is a graph database that supports the TinkerPop API, but it is not implemented directly on HBase. Rather, it is implemented on top of an abstraction layer that can be integrated with HBase, Cassandra, or Berkeley DB as its underlying store. Also, Titan does not support user-supplied IDs. S2Graph is a graph database that is implemented directly on HBase, and it supports both user-supplied IDs and indices on edges, but it does not yet support the TinkerPop API nor does it support indices on vertices.

This led me to create HGraphDB, a TinkerPop 3 layer for HBase. It provides support for all of the above bullet points. Feel free to try it out if you are interested in using HBase as a graph database.

Tips On Writing Custom HBase Filters

Two of the most useful and powerful features of HBase are its support for server-side filters and coprocessors.  For example, custom filters can be used for efficient pagination, while custom coprocessors can provide endpoints for efficient aggregation of data in HBase.  In addition, more sophisticated filters and coprocessors can be used to turn HBase into an entirely different data store, such as a JSON document store (HDocDB), a relational database (Phoenix), or others.

While working with custom filters, I ran into a couple of issues that I didn’t find documented elsewhere (perhaps I missed them), so I thought I’d jot them down here to benefit others.

First, when writing a custom filter, the cells passed to the filterKeyValue method are a superset of the cells that will be returned to the client.  The main reason for this is that even though a column family may be specified to retain only one version of a cell, multiple versions of the cell may still exist in the store because a compaction has not yet taken place, and the pruning of versions in the query result doesn’t happen until after filterKeyValue is called.  This actually took me by surprise, as I didn’t find it documented anywhere, and my initial mental model assumed that the pruning of versions would happen before this method was called.  (Update:  This has since been filed as HBASE-17125.)
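
One way to picture this is a filter that has to ignore the older versions itself.  The sketch below is a simplified model rather than the HBase API: VersionedCell and NewestVersionOnly are hypothetical names, and it assumes cells arrive grouped by column with the newest timestamp first, which is how HBase orders versions within a row.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an HBase Cell: only a column qualifier and a timestamp.
final class VersionedCell {
    final String qualifier;
    final long timestamp;

    VersionedCell(String qualifier, long timestamp) {
        this.qualifier = qualifier;
        this.timestamp = timestamp;
    }
}

final class NewestVersionOnly {
    // Keeps the first (newest) cell seen for each column and skips the rest.
    // Assumes cells are grouped by qualifier, newest timestamp first.
    static List<VersionedCell> keepNewest(List<VersionedCell> cells) {
        List<VersionedCell> kept = new ArrayList<>();
        String lastQualifier = null;
        for (VersionedCell cell : cells) {
            if (!cell.qualifier.equals(lastQualifier)) {
                kept.add(cell);
                lastQualifier = cell.qualifier;
            }
            // Otherwise this is an older version that has not been compacted away yet.
        }
        return kept;
    }
}
```

A filter that counts cells for pagination, for instance, would overcount unless it tracked the current column in a similar way.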

The second tip is in regard to the filterRowCells method.  This method gives you the list of cells that have passed previous filter methods, and allows you to modify it before it is passed to the next phase of the filter pipeline.   For example, here is how the DependentColumnFilter in HBase uses this method to filter out cells that don’t have a matching timestamp.

  @Override
  public void filterRowCells(List<Cell> kvs) {
    Iterator<? extends Cell> it = kvs.iterator();
    Cell kv;
    while(it.hasNext()) {
      kv = it.next();
      if(!stampSet.contains(kv.getTimestamp())) {
        it.remove();
      }
    }
  }

However, when implementing filterRowCells, the Iterator.remove method should not be used.  The underlying list of cells is passed as an ArrayList, and Iterator.remove is an O(n) operation for an ArrayList because each removal shifts all of the following elements down by one.  As more and more elements are removed from within filterRowCells, the total time approaches O(n²).  Instead, the Guava method Iterables.removeIf should be preferred (or Collection.removeIf, if you are using Java 8).

  @Override
  public void filterRowCells(List<Cell> kvs) {
    Iterables.removeIf(kvs, new Predicate<Cell>() {
      @Override
      public boolean apply(Cell kv) {
        return !stampSet.contains(kv.getTimestamp());
      }
    });
  }

The Iterables.removeIf method will check to see if the Iterable passed to it is an instance of RandomAccess (which is true for ArrayList), and if so, will remove all elements that satisfy the specified Predicate in total O(n) time (by using List.set to compact the retained elements in place).
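
The compaction idea can be sketched in plain Java as follows.  This is only an illustration of the technique, not Guava's actual source; CompactingRemove and removeIfInPlace are hypothetical names.

```java
import java.util.List;
import java.util.function.Predicate;

final class CompactingRemove {
    // Removes every element matching 'shouldRemove' in a single pass.
    // Intended for random-access lists such as ArrayList.
    static <T> void removeIfInPlace(List<T> list, Predicate<? super T> shouldRemove) {
        int write = 0;
        for (int read = 0; read < list.size(); read++) {
            T element = list.get(read);
            if (!shouldRemove.test(element)) {
                // Slide the retained element down over the gap left by removed ones.
                if (read > write) {
                    list.set(write, element);
                }
                write++;
            }
        }
        // Drop the unused tail in one operation; nothing after it needs shifting.
        list.subList(write, list.size()).clear();
    }
}
```

Each element is read once and written at most once, so the whole pass is linear no matter how many elements are removed.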

One of our queries using a custom filter was passing tens of thousands of cells to filterRowCells and filtering a majority of the cells out using Iterator.remove.  After changing the custom filter to use Iterables.removeIf, the query time dropped from 800 ms to 250 ms.
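
A rough way to observe the same effect outside HBase is to run both approaches over a large ArrayList.  The harness below is a hypothetical sketch (RemovalComparison and its methods are my own names, and plain Long timestamps stand in for cells); timings from such a simple loop are only indicative, since JIT warm-up and garbage collection are not controlled.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class RemovalComparison {
    // Builds the timestamps 0, 1, ..., n-1.
    static List<Long> timestamps(int n) {
        List<Long> list = new ArrayList<>(n);
        for (long i = 0; i < n; i++) {
            list.add(i);
        }
        return list;
    }

    // Removes timestamps below the cutoff one at a time; each removal shifts the tail.
    static List<Long> viaIteratorRemove(List<Long> input, long cutoff) {
        List<Long> copy = new ArrayList<>(input);
        Iterator<Long> it = copy.iterator();
        while (it.hasNext()) {
            if (it.next() < cutoff) {
                it.remove();
            }
        }
        return copy;
    }

    // Removes the same timestamps with Collection.removeIf (Java 8+), a single pass.
    static List<Long> viaRemoveIf(List<Long> input, long cutoff) {
        List<Long> copy = new ArrayList<>(input);
        copy.removeIf(ts -> ts < cutoff);
        return copy;
    }

    // Returns the elapsed wall-clock time of one run, in nanoseconds.
    static long nanosFor(Runnable task) {
        long start = System.nanoTime();
        task.run();
        return System.nanoTime() - start;
    }
}
```

For example, calling nanosFor(() -> RemovalComparison.viaIteratorRemove(input, cutoff)) and the removeIf equivalent on a list of tens of thousands of entries, with a cutoff that removes most of them, gives a feel for the gap.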

Since HBase already uses the Iterables class from Guava, I’ve submitted HBASE-16893 and PHOENIX-3393 to change the filters in the HBase and Phoenix codebases to use Iterables.removeIf instead of Iterator.remove.

Adventures in Hardening HBase

When using HBase, it is often desirable to encrypt data in transit between an HBase client and an HBase server.  This might be the case, for example, when storing PII (Personally Identifiable Information) in HBase, or when running HBase in a multi-tenant cloud environment.

Transport encryption is often enabled by configuring HBase to use SASL with GSSAPI/Kerberos to provide data confidentiality and integrity on a per-connection basis.  However, the default implementation of GSSAPI/Kerberos does not seem to make use of AES-NI hardware acceleration.  In our testing, we have seen up to a 50% increase in the P75 measurements for latencies of some of our HBase applications when using GSSAPI/Kerberos encryption versus no encryption.

One workaround is to bypass the encryption used by SASL and use an encryption library that can support AES-NI acceleration.  This effort has already been completed for HDFS (HDFS-6606) and is in progress for Hadoop (HADOOP-10768).  Based on some of this earlier work, similar changes can be made for HBase.

The way that the fix for HADOOP-10768 works is conceptually as follows.  If the Hadoop client has been configured to negotiate a cipher suite in place of the one negotiated by SASL, then the following actions will take place:

  • The client will send the server a set of cipher suites that it supports.
  • The server will negotiate a mutually acceptable cipher suite.
  • At the end of the SASL handshake, the server will generate a pair of encryption keys using the cipher suite and send them to the client via the secure SASL channel.
  • The generated encryption keys, instead of the SASL layer, will be used to encrypt all subsequent traffic between the client and server.
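
The server-side part of these steps can be sketched as below.  This is a minimal illustration under stated assumptions, not the HADOOP-10768 code: the class and method names are hypothetical, the comma-separated suite list is an assumed format, and the 16-byte key length is an assumption for AES-128.

```java
import java.security.SecureRandom;
import java.util.List;

final class CipherNegotiationSketch {
    // Returns the first suite in the client's list that the server also supports,
    // or null when there is no mutually acceptable suite.
    // Assumes the client sends its suites as a comma-separated string.
    static String negotiate(String clientSuites, List<String> serverSuites) {
        for (String candidate : clientSuites.split(",")) {
            String suite = candidate.trim();
            if (!suite.isEmpty() && serverSuites.contains(suite)) {
                return suite;
            }
        }
        return null;
    }

    // Generates a pair of random keys, one for each direction of traffic.
    // The 16-byte length is an assumption (AES-128).
    static byte[][] newKeyPair() {
        SecureRandom random = new SecureRandom();
        byte[][] keys = new byte[2][16];
        random.nextBytes(keys[0]);
        random.nextBytes(keys[1]);
        return keys;
    }
}
```

If negotiate returns null, the natural fallback is to keep using the encryption provided by the SASL layer.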

Originally I was hoping that the work for HADOOP-10768 would be easily portable to the HBase codebase.  It seems that some of the HBase code for SASL support originated from the corresponding Hadoop code, but has since diverged.  For example, when performing the SASL handshake, the Hadoop client and server use protocol buffers to wrap the SASL state and SASL token, whereas the HBase client and server do not use protocol buffers when passing this data.

Instead, in HBase, during the SASL handshake the client sends

  • The integer length of the SASL token
  • The bytes of the SASL token

whereas the server sends

  • An integer which is either 0 for success or 1 for failure
  • In the case of success,
    • The integer length of the SASL token
    • The bytes of the SASL token
  • In the case of failure,
    • A string representing the class of the Exception
    • A string representing an error message

There is one exception to the above scheme.  If the server sends the special integer SWITCH_TO_SIMPLE_AUTH (represented as -88) in place of the length of the SASL token, the rest of the message is ignored and the client falls back to simple authentication instead of completing the SASL handshake.
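
The framing described above can be sketched with a ByteBuffer.  This is an illustration only, not the HBase RPC code: SaslFramingSketch and its methods are hypothetical, and strings are assumed to be encoded as a 4-byte length followed by UTF-8 bytes, which may differ from what HBase actually writes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class SaslFramingSketch {
    static final int SWITCH_TO_SIMPLE_AUTH = -88;

    // Client message: token length, then token bytes.
    static byte[] clientMessage(byte[] token) {
        return ByteBuffer.allocate(4 + token.length)
                .putInt(token.length).put(token).array();
    }

    // Server success: status 0, token length, then token bytes.
    static byte[] serverSuccess(byte[] token) {
        return ByteBuffer.allocate(8 + token.length)
                .putInt(0).putInt(token.length).put(token).array();
    }

    // Server failure: status 1, exception class name, then error message.
    static byte[] serverFailure(String exceptionClass, String message) {
        byte[] cls = exceptionClass.getBytes(StandardCharsets.UTF_8);
        byte[] msg = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(12 + cls.length + msg.length)
                .putInt(1).putInt(cls.length).put(cls).putInt(msg.length).put(msg).array();
    }

    // Returns the server's token, or null when the server asks for simple auth.
    static byte[] readServerResponse(byte[] wire) {
        ByteBuffer in = ByteBuffer.wrap(wire);
        int status = in.getInt();
        if (status != 0) {
            throw new IllegalStateException(readString(in) + ": " + readString(in));
        }
        int length = in.getInt();
        if (length == SWITCH_TO_SIMPLE_AUTH) {
            return null; // the rest of the message is ignored
        }
        byte[] token = new byte[length];
        in.get(token);
        return token;
    }

    private static String readString(ByteBuffer in) {
        byte[] bytes = new byte[in.getInt()];
        in.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

The special value works because a real token length is never negative, so -88 cannot be mistaken for one.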

In order to adapt the fix for HADOOP-10768 for HBase, I decided to use another special integer called USE_NEGOTIATED_CIPHER (represented as -89) for messages related to cipher suite negotiation between client and server.  If the client is configured to negotiate a cipher suite, then at the beginning of the SASL handshake, in place of a message containing only the length and bytes of a SASL token, it will send a message of the form

  • USE_NEGOTIATED_CIPHER (-89)
  • A string representing the acceptable cipher suites
  • The integer length of the SASL token
  • The bytes of the SASL token

And at the end of the SASL handshake, the server will send one additional message of the form

  • A zero for success
  • USE_NEGOTIATED_CIPHER (-89)
  • A string representing the negotiated cipher suite
  • A pair of encryption keys
  • A pair of initialization vectors
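
The client's first message in this extended scheme might be framed and parsed as sketched below.  As before, this is a hypothetical illustration (CipherFramingSketch is not a class in HBASE-16633, and the length-prefixed UTF-8 string encoding is an assumption).  Because -89 can never be a valid token length, a server can tell the two message forms apart from the first integer alone.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class CipherFramingSketch {
    static final int USE_NEGOTIATED_CIPHER = -89;

    // Client's first message: marker, acceptable suites, token length, token bytes.
    static byte[] clientFirstMessage(String suites, byte[] token) {
        byte[] s = suites.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(12 + s.length + token.length)
                .putInt(USE_NEGOTIATED_CIPHER)
                .putInt(s.length).put(s)
                .putInt(token.length).put(token)
                .array();
    }

    // Server side: returns the offered suites, or null if the message is a plain
    // SASL message, in which case the buffer is rewound to the token length.
    static String readOfferedSuites(ByteBuffer in) {
        in.mark();
        if (in.getInt() != USE_NEGOTIATED_CIPHER) {
            in.reset();
            return null;
        }
        byte[] s = new byte[in.getInt()];
        in.get(s);
        return new String(s, StandardCharsets.UTF_8);
    }

    // Reads a length-prefixed SASL token from the current position.
    static byte[] readToken(ByteBuffer in) {
        byte[] token = new byte[in.getInt()];
        in.get(token);
        return token;
    }
}
```

A server would call readOfferedSuites first and then readToken, which handles clients with and without cipher negotiation through the same code path.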

We can turn on DEBUG logging for HBase to see what the client and server SASL negotiation normally looks like, without the custom cipher negotiation.  Here is the client:

Creating SASL GSSAPI client. Server's Kerberos principal name is XXXX
Have sent token of size 688 from initSASLContext.
Will read input token of size 108 for processing by initSASLContext
Will send token of size 0 from initSASLContext.
Will read input token of size 32 for processing by initSASLContext
Will send token of size 32 from initSASLContext.
SASL client context established. Negotiated QoP: auth-conf

And here is the server:

Kerberos principal name is XXXX
Created SASL server with mechanism = GSSAPI
Have read input token of size 688 for processing by saslServer.evaluateResponse()
Will send token of size 108 from saslServer.
Have read input token of size 0 for processing by saslServer.evaluateResponse()
Will send token of size 32 from saslServer.
Have read input token of size 32 for processing by saslServer.evaluateResponse()
SASL server GSSAPI callback: setting canonicalized client ID: XXXX
SASL server context established. Authenticated client: XXXX (auth:SIMPLE). Negotiated QoP is auth-conf

To enable custom cipher negotiation, we set the following HBase configuration parameters for both the client and server (in addition to the properties to enable Kerberos):

<property>
  <name>hbase.rpc.security.crypto.cipher.suites</name> 
  <value>AES/CTR/NoPadding</value>
</property>
<property>
  <name>hbase.rpc.protection</name>
  <value>privacy</value>
</property>

With the above configuration, here is the client (the new actions are the "Will send client ciphers" and "Client using cipher suite" lines):

Creating SASL GSSAPI client. Server's Kerberos principal name is XXXX
Will send client ciphers: AES/CTR/NoPadding
Have sent token of size 651 from initSASLContext.
Will read input token of size 110 for processing by initSASLContext
Will send token of size 0 from initSASLContext.
Will read input token of size 65 for processing by initSASLContext
Will send token of size 65 from initSASLContext.
Client using cipher suite AES/CTR/NoPadding with server
SASL client context established. Negotiated QoP: auth-conf

And here is the server, when using custom cipher negotiation (the new actions are the "Have read client ciphers" and "Server using cipher suite" lines):

Have read client ciphers: AES/CTR/NoPadding
Kerberos principal name is XXXX
Created SASL server with mechanism = GSSAPI
Have read input token of size 651 for processing by saslServer.evaluateResponse()
Will send token of size 110 from saslServer.
Have read input token of size 0 for processing by saslServer.evaluateResponse()
Will send token of size 65 from saslServer.
Have read input token of size 65 for processing by saslServer.evaluateResponse()
SASL server GSSAPI callback: setting canonicalized client ID: XXXX
Server using cipher suite AES/CTR/NoPadding with client
SASL server context established. Authenticated client: XXXX (auth:SIMPLE). Negotiated QoP is auth-conf

Once the cipher suite negotiation is complete, both the client and server will have created an instance of SaslCryptoCodec to perform the encryption. The client will call SaslCryptoCodec.wrap()/unwrap() instead of SaslClient.wrap()/unwrap() while the server will call SaslCryptoCodec.wrap()/unwrap() instead of SaslServer.wrap()/unwrap().  This is the same technique as used in HADOOP-10768.
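
To make the wrap/unwrap idea concrete, here is a minimal sketch of a codec built on the JDK's javax.crypto with AES/CTR/NoPadding.  It is not the SaslCryptoCodec implementation, whose internals are not shown in this post; CtrCodecSketch is a hypothetical class, and it assumes each side holds one key and initialization vector per direction, matching the pairs sent by the server.

```java
import java.security.GeneralSecurityException;
import javax.crypto.Cipher;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;

final class CtrCodecSketch {
    private final Cipher encryptor; // for outbound traffic
    private final Cipher decryptor; // for inbound traffic

    CtrCodecSketch(byte[] outKey, byte[] outIv, byte[] inKey, byte[] inIv) {
        this.encryptor = newCipher(Cipher.ENCRYPT_MODE, outKey, outIv);
        this.decryptor = newCipher(Cipher.DECRYPT_MODE, inKey, inIv);
    }

    private static Cipher newCipher(int mode, byte[] key, byte[] iv) {
        try {
            Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding");
            cipher.init(mode, new SecretKeySpec(key, "AES"), new IvParameterSpec(iv));
            return cipher;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("AES/CTR/NoPadding is not available", e);
        }
    }

    // Encrypts the next chunk of outbound data; the counter state carries over
    // between calls, so messages must be unwrapped in the order they were wrapped.
    byte[] wrap(byte[] plaintext) {
        byte[] out = encryptor.update(plaintext);
        return out == null ? new byte[0] : out;
    }

    // Decrypts the next chunk of inbound data.
    byte[] unwrap(byte[] ciphertext) {
        byte[] out = decryptor.update(ciphertext);
        return out == null ? new byte[0] : out;
    }
}
```

The client would construct its codec with its outbound key first, and the server would construct its codec with the same two key and IV pairs in the opposite order.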

With the above code deployed to our production servers, we can compare the latencies of different encryption modes for one of our HBase applications.  (In order to run clients in different modes we have also patched our HBase servers with the fix for HBASE-14865.)  Below we show the P50, P75, and P95 latencies over a 12-hour period.  In each chart, higher is worse: the top line is an HBase client configured with GSSAPI/Kerberos encryption, the middle line is an HBase client configured with accelerated encryption, and the bottom line is an HBase client configured with no encryption.

[Three screenshots: P50, P75, and P95 latency charts for the three client configurations]

Also, here is the user CPU time for the three differently configured HBase clients (GSSAPI/Kerberos encryption, accelerated encryption, no encryption).

[Screenshot: user CPU time for the three client configurations]

We can see that accelerated encryption provides a significant performance improvement over GSSAPI/Kerberos encryption.  The changes I made to HBase in order to support accelerated encryption are available at HBASE-16633.
