Graph Analytics on HBase with HGraphDB and Spark GraphFrames

In a previous post, I showed how to analyze graphs stored in HGraphDB using Apache Giraph.  Giraph depends on Hadoop, however, and some developers may be using Spark instead.  In this post I will show how to analyze HGraphDB graphs using Apache Spark GraphFrames.

To prepare data stored in HGraphDB for GraphFrames, we need to import vertex and edge data from HGraphDB into Spark DataFrames.  Hortonworks provides a Spark-on-HBase Connector to do just that.  The connector allows custom serde (serializer/deserializer) types to be created by implementing the SHCDataType trait.  The serde for HGraphDB is available here.  (When testing the serde, I ran into some issues with the Spark-on-HBase Connector, for which I submitted pull requests.  Update:  Thanks to Hortonworks, these have been merged, so my fork of the connector is no longer needed.)

To demonstrate how to use HGraphDB with GraphFrames, we first use HGraphDB to create the same graph example that is used in the GraphFrames User Guide.

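import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.Vertex;

// Assumes `graph` is an already opened HGraphDB graph instance.
Vertex a = graph.addVertex(T.id, "a", "name", "Alice", "age", 34);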
Vertex b = graph.addVertex(T.id, "b", "name", "Bob", "age", 36);
Vertex c = graph.addVertex(T.id, "c", "name", "Charlie", "age", 30);
Vertex d = graph.addVertex(T.id, "d", "name", "David", "age", 29);
Vertex e = graph.addVertex(T.id, "e", "name", "Esther", "age", 32);
Vertex f = graph.addVertex(T.id, "f", "name", "Fanny", "age", 36);
Vertex g = graph.addVertex(T.id, "g", "name", "Gabby", "age", 60);
a.addEdge("friend", b);
b.addEdge("follow", c);
c.addEdge("follow", b);
f.addEdge("follow", c);
e.addEdge("follow", f);
e.addEdge("friend", d);
d.addEdge("friend", a);
a.addEdge("friend", e);

Now that the graph is stored in HGraphDB, we need to specify a schema to be used by the Spark-on-HBase Connector for retrieving vertex and edge data.

def vertexCatalog = s"""{
    |"table":{"namespace":"testGraph", "name":"vertices",
    |  "tableCoder":"org.apache.spark.sql.execution.datasources.hbase.types.HGraphDB", "version":"2.0"},
    |"rowkey":"key",
    |"columns":{
      |"id":{"cf":"rowkey", "col":"key", "type":"string"},
      |"name":{"cf":"f", "col":"name", "type":"string"},
      |"age":{"cf":"f", "col":"age", "type":"int"}
    |}
  |}""".stripMargin

def edgeCatalog = s"""{
    |"table":{"namespace":"testGraph", "name":"edges",
    |  "tableCoder":"org.apache.spark.sql.execution.datasources.hbase.types.HGraphDB", "version":"2.0"},
    |"rowkey":"key",
    |"columns":{
      |"id":{"cf":"rowkey", "col":"key", "type":"string"},
      |"relationship":{"cf":"f", "col":"~l", "type":"string"},
      |"src":{"cf":"f", "col":"~f", "type":"string"},
      |"dst":{"cf":"f", "col":"~t", "type":"string"}
    |}
  |}""".stripMargin

Some things to note about this schema:

  • The HGraphDB serde is specified as the tableCoder above.
  • All HGraphDB columns are stored in a column family named f.
  • Vertex and edge labels are stored in a column with qualifier ~l.
  • The source and destination columns have qualifiers ~f and ~t, respectively.
  • All vertex and edge properties are stored in columns with the qualifiers simply being the name of the property.
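
The conventions above are regular enough that a vertex catalog can be generated rather than written by hand.  The following is only a minimal sketch in plain Java: the class CatalogSketch and its vertexCatalog method are hypothetical names introduced here for illustration and are not part of HGraphDB or the Spark-on-HBase Connector.  It assumes every property lives in column family f under a qualifier equal to the property name, as described in the notes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not part of HGraphDB or the connector): builds a catalog
// string with the same layout as the hand-written vertex catalog.
final class CatalogSketch {
    // The HGraphDB serde class name, copied from the catalogs in this post.
    static final String CODER =
        "org.apache.spark.sql.execution.datasources.hbase.types.HGraphDB";

    // properties maps a property name to its catalog type, e.g. "age" -> "int".
    static String vertexCatalog(String namespace, Map<String, String> properties) {
        StringJoiner columns = new StringJoiner(",\n");
        // The row key is exposed as the "id" column.
        columns.add("\"id\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}");
        for (Map.Entry<String, String> p : properties.entrySet()) {
            // Column family is always "f"; the qualifier is the property name.
            columns.add(String.format(
                "\"%s\":{\"cf\":\"f\", \"col\":\"%s\", \"type\":\"%s\"}",
                p.getKey(), p.getKey(), p.getValue()));
        }
        return "{\n"
            + "\"table\":{\"namespace\":\"" + namespace + "\", \"name\":\"vertices\",\n"
            + "  \"tableCoder\":\"" + CODER + "\", \"version\":\"2.0\"},\n"
            + "\"rowkey\":\"key\",\n"
            + "\"columns\":{\n" + columns + "\n}\n"
            + "}";
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("name", "string");
        props.put("age", "int");
        System.out.println(vertexCatalog("testGraph", props));
    }
}
```

An insertion-ordered map keeps the columns in the order they were declared, which makes the generated text easy to compare with the hand-written catalog.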

Now that we have a schema, we can create Spark DataFrames for both the vertices and edges, and then pass these to the GraphFrame factory.

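import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog
import org.graphframes.GraphFrame

// Load an HBase table as a DataFrame using the given catalog.
// Assumes a SQLContext named sqlContext is in scope.
def withCatalog(cat: String): DataFrame = {
  sqlContext
  .read
  .options(Map(HBaseTableCatalog.tableCatalog->cat))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .load()
}
val verticesDataFrame = withCatalog(vertexCatalog)
val edgesDataFrame = withCatalog(edgeCatalog)
val g = GraphFrame(verticesDataFrame, edgesDataFrame)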

With the GraphFrame in hand, we now have full access to the Spark GraphFrame APIs. For instance, here are some arbitrary graph operations from the GraphFrames Quick Start.

// Query: Get in-degree of each vertex.
g.inDegrees.show()

// Query: Count the number of "follow" connections in the graph.
g.edges.filter("relationship = 'follow'").count()

// Run PageRank algorithm, and show results.
val results = g.pageRank.resetProbability(0.01).maxIter(20).run()
results.vertices.select("id", "pagerank").show()
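
As a sanity check on the first two queries, the same numbers can be worked out by hand from the eight edges created earlier.  The sketch below does this in plain Java, with no Spark or HBase involved; the class name ExampleGraphCheck is made up for illustration, and the edge list is simply copied from the graph-creation code in this post.

```java
import java.util.Map;
import java.util.TreeMap;

// Illustrative cross-check only: recomputes in-degrees and the "follow" count
// from a hard-coded copy of the example edges.
final class ExampleGraphCheck {
    // {src, dst, relationship} for the eight edges of the example graph.
    static final String[][] EDGES = {
        {"a", "b", "friend"}, {"b", "c", "follow"}, {"c", "b", "follow"},
        {"f", "c", "follow"}, {"e", "f", "follow"}, {"e", "d", "friend"},
        {"d", "a", "friend"}, {"a", "e", "friend"}
    };

    // Number of incoming edges per destination vertex, sorted by vertex id.
    static Map<String, Integer> inDegrees() {
        Map<String, Integer> degrees = new TreeMap<>();
        for (String[] edge : EDGES) {
            degrees.merge(edge[1], 1, Integer::sum);
        }
        return degrees;
    }

    // Number of edges whose relationship equals the given label.
    static long countByRelationship(String relationship) {
        long count = 0;
        for (String[] edge : EDGES) {
            if (edge[2].equals(relationship)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(inDegrees());                    // {a=1, b=2, c=2, d=1, e=1, f=1}
        System.out.println(countByRelationship("follow"));  // 4
    }
}
```

Vertex g (Gabby) has no incoming edges, so it does not appear in the in-degree map; GraphFrames likewise omits vertices with zero in-degree from inDegrees.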

You can see further graph operations against our example graph (taken from the GraphFrames User Guide) in this test.

As you can see, HGraphDB makes graphs stored in HBase easily accessible by Apache TinkerPop, Apache Giraph, and now Apache Spark GraphFrames.
