Neo4j and Apache Spark
This approach is deprecated in favor of the Neo4j Connector for Apache Spark. This page is being maintained for reference, but is not current or supported. Please consult the Neo4j Connector for Apache Spark for the latest supported connector. |
This page is deprecated in favor of the Neo4j Connector for Apache Spark
You should have a sound understanding of both Apache Spark and Neo4j, each data model, data processing paradigm and APIs to leverage them effectively together.
Intermediate
General Observations
Apache Spark is a clustered, in-memory data processing solution that scales processing of large datasets easily across many machines. It also comes with GraphX and GraphFrames two frameworks for running graph compute operations on your data.
You can integrate with Spark in a variety of ways. Either to pre-process (aggregate, filter, convert) your raw data to be imported into Neo4j.
Spark can also serve as external Graph Compute solution, where you
-
export data of selected subgraphs from Neo4j to Spark,
-
compute the analytic aspects, and
-
write the results back to Neo4j
-
to be used in your Neo4j operations and Cypher queries.
Neo4j itself is capable of running graph processing on medium to large graphs quickly. For instance the graph-processing project demonstrates that we can run PageRank (5 iterations) on the dbpedia dataset (10M nodes, 125M relationships) in 20 seconds as a Neo4j server extension or user defined procedure. Spark might be better suited for larger datasets or more intensive compute operations. |
Neo4j-Spark-Connector
The Neo4j Spark Connector uses the binary Bolt protocol to transfer data from and to a Neo4j server.
The information on this page refers to the old (2.4.5 release) of the spark connector. For more up to date information, an easier and more modern API, consult the Neo4j Connector for Apache Spark. |
It offers Spark-2.0 APIs for RDD, DataFrame, GraphX and GraphFrames, so you’re free to chose how you want to use and process your Neo4j graph data in Apache Spark.
Configure Neo4j-URL, -user and -password via spark.neo4j.bolt.*
Spark Config options.
The general usage is:
-
create
org.neo4j.spark.Neo4j(sc)
-
set
cypher(query,[params])
,nodes(query,[params])
,rels(query,[params])
as direct query, or
pattern("Label1",Seq("REL"),"Label2")
orpattern( ("Label1","prop1"),("REL","prop"),("Label2","prop2") )
-
optionally define
partitions(n)
,batch(size)
,rows(count)
for parallelism -
choose which datatype to return
-
loadRowRdd
,loadNodeRdds
,loadRelRdd
,loadRdd[T]
-
loadDataFrame
,loadDataFrame(schema)
-
loadGraph[VD,ED]
-
loadGraphFrame[VD,ED]
-
Here is a basic example for loading a RDD[Row]
.
org.neo4j.spark.Neo4j(sc).cypher("MATCH (n:Person) RETURN n.name").partitions(5).batch(10000).loadRowRdd
$SPARK_HOME/bin/spark-shell --conf spark.neo4j.bolt.password=<password> \
--packages neo4j-contrib:neo4j-spark-connector:2.0.0-M2,graphframes:graphframes:0.2.0-spark2.0-s_2.11
import org.neo4j.spark._
val neo = Neo4j(sc)
val rdd = neo.cypher("MATCH (n:Person) RETURN id(n) as id ").loadRowRdd
rdd.count
// inferred schema
rdd.first.schema.fieldNames
// => ["id"]
rdd.first.schema("id")
// => StructField(id,LongType,true)
neo.cypher("MATCH (n:Person) RETURN id(n)").loadRdd[Long].mean
// => res30: Double = 236696.5
neo.cypher("MATCH (n:Person) WHERE n.id <= {maxId} RETURN n.id").param("maxId", 10).loadRowRdd.count
// => res34: Long = 10
Similar operations are available for DataFrames and GraphX.
The GraphX integration also allows to write data back to Neo4j with a save
operation.
To use GraphFrames you have to declare it as package. Then you can load a GraphFrame with graph data from Neo4j and run graph algorithms or pattern matchin on it (the latter will be slower than in Neo4j).
import org.neo4j.spark._
val neo = Neo4j(sc)
import org.graphframes._
val graphFrame = neo.pattern(("Person","id"),("KNOWS",null), ("Person","id")).partitions(3).rows(1000).loadGraphFrame
graphFrame.vertices.count
// => 100
graphFrame.edges.count
// => 1000
val pageRankFrame = graphFrame.pageRank.maxIter(5).run()
val ranked = pageRankFrame.vertices
ranked.printSchema()
val top3 = ranked.orderBy(ranked.col("pagerank").desc).take(3)
// => top3: Array[org.apache.spark.sql.Row]
// => Array([236716,70,0.62285...], [236653,7,0.62285...], [236658,12,0.62285])
More examples and details can be found in the docs of the GitHub repository.
Neo4j-Mazerunner
An interest in analytical graph processing led Kenny Bastani to work on an integration solution. It allows to export dedicated datasets, e.g. node or relationship-lists to Spark.
It supports these algorithms:
-
PageRank
-
Closeness Centrality
-
Betweenness Centrality
-
Triangle Counting
-
Connected Components
-
Strongly Connected Components
After running graph processing algorithms the results are written back concurrently and transactionally to Neo4j.
One focus of this approach is on data safety, that’s why it uses a persistent queue (RabbitMQ) to communicate data between Neo4j and Spark.
The infrastructure is set up using Docker containers, there are dedicated containers for Spark, RabbitMQ, HDFS and Neo4j with the Mazerunner Extension.
More details can be found on the project’s GitHub page.
Spark for Data Preprocessing
One example of pre-processing raw data (Chicago Crime dataset) into a format that’s well suited for import into Neo4j, was demonstrated by Mark Needham. He combined a number of functions into a Spark-job that takes the existing data, cleans and aggregates it and outputs fragments which are recombined later to larger files.
The approach is detailed in his blog post: "Spark: Generating CSV Files to import into Neo4j".
Was this page helpful?