Machine learning pipelines: Node classification
This Jupyter notebook is hosted here in the Neo4j Graph Data Science Client Github repository.
The notebook shows the usage of GDS machine learning pipelines with the Python client and the well-known Cora dataset.
The task we cover here is a typical use case in graph machine learning: the classification of nodes given a graph and some node features.
1. Setup
We need a dedicated environment where Neo4j and GDS are available, for example a fresh AuraDS instance (which comes with GDS preinstalled) or Neo4j Desktop with a dedicated database.
Please note that we will be writing to and deleting data from Neo4j.
Once the credentials to access this environment are available, we can
install the graphdatascience
package and import the client class.
%pip install graphdatascience
import os
from graphdatascience import GraphDataScience
When using a local Neo4j setup, the default connection URI is
bolt://localhost:7687
; when using AuraDS, instead, the connection
URI is slightly different as it uses the neo4j+s
protocol. In this
case, the client should also include the aura_ds=True
flag to enable
AuraDS-recommended settings. Check the
Neo4j
GDS Client docs for more details.
# Get Neo4j DB URI, credentials and name from environment if applicable
NEO4J_URI = os.environ.get("NEO4J_URI", "bolt://localhost:7687")
NEO4J_AUTH = None
NEO4J_DB = os.environ.get("NEO4J_DB", "neo4j")
if os.environ.get("NEO4J_USER") and os.environ.get("NEO4J_PASSWORD"):
NEO4J_AUTH = (
os.environ.get("NEO4J_USER"),
os.environ.get("NEO4J_PASSWORD"),
)
gds = GraphDataScience(NEO4J_URI, auth=NEO4J_AUTH, database=NEO4J_DB)
# On AuraDS:
#
# gds = GraphDataScience(NEO4J_URI, auth=NEO4J_AUTH, database=NEO4J_DB, aura_ds=True)
We also need to check that the version of the GDS library is 2.2.0 or newer, because we will use the concept of “context” introduced in GDS 2.2.0.
assert gds.version() >= "2.2.0"
Finally, we import json
to help in writing the Cypher queries used
to load the data, and numpy
and pandas
for further data
processing.
import json
import numpy as np
import pandas as pd
2. Loading the Cora dataset
First of all, we need to load the Cora dataset on Neo4j. The latest versions of the GDS client include the Cora dataset as a ready-to-use graph (see for instance the PyG example notebook); alternatively, the graph construction notebook shows how to project the Cora graph in memory without writing it to Neo4j. In this tutorial, anyway, we use data from CSV files and some Cypher code to run an end-to-end example, from loading the source data into Neo4j to training a model and using it for predictions.
Please note that, if you use the Cora graph loader or the graph construction method on an AuraDS instance, you cannot write the data to the Neo4j database.
The CSV files can be found at the following URIs:
CORA_CONTENT = "https://data.neo4j.com/cora/cora.content"
CORA_CITES = "https://data.neo4j.com/cora/cora.cites"
Upon loading, we need to perform an additional preprocessing step to
convert the subject
field (which is a string in the dataset) into an
integer, because node properties have to be numerical in order to be
projected into a graph; although we could assign consecutive IDs, we
assign an ID other than 0 to the first subject to later show how the
class labels are represented in the model.
We also select a number of nodes to be held out to test the model after it has been trained. NOTE: This is not related to the algorithm test/split ratio.
SUBJECT_TO_ID = {
"Neural_Networks": 100,
"Rule_Learning": 1,
"Reinforcement_Learning": 2,
"Probabilistic_Methods": 3,
"Theory": 4,
"Genetic_Algorithms": 5,
"Case_Based": 6,
}
HOLDOUT_NODES = 10
We can now load the CSV files using the LOAD CSV
Cypher statement
and some basic data transformation:
# Define a string representation of the SUBJECT_TO_ID map using backticks
subject_map = json.dumps(SUBJECT_TO_ID).replace('"', "`")
# Cypher command to load the nodes using `LOAD CSV`, taking care of
# converting the string `subject` field into an integer and
# replacing the node label for the holdout nodes
load_nodes = f"""
LOAD CSV FROM "{CORA_CONTENT}" AS row
WITH
{subject_map} AS subject_to_id,
toInteger(row[0]) AS extId,
row[1] AS subject,
toIntegerList(row[2..]) AS features
MERGE (p:Paper {{extId: extId, subject: subject_to_id[subject], features: features}})
WITH p LIMIT {HOLDOUT_NODES}
REMOVE p:Paper
SET p:UnclassifiedPaper
"""
# Cypher command to load the relationships using `LOAD CSV`
load_relationships = f"""
LOAD CSV FROM "{CORA_CITES}" AS row
MATCH (n), (m)
WHERE n.extId = toInteger(row[0]) AND m.extId = toInteger(row[1])
MERGE (n)-[:CITES]->(m)
"""
# Load nodes and relationships on Neo4j
gds.run_cypher(load_nodes)
gds.run_cypher(load_relationships)
With the data loaded on Neo4j, we can now project a graph including all
the nodes and the CITES
relationship as undirected (and with
SINGLE
aggregation, to skip repeated relationships as a result of
adding the inverse direction).
# Create the projected graph containing both classified and unclassified nodes
G, _ = gds.graph.project(
"cora-graph",
{"Paper": {"properties": ["features", "subject"]}, "UnclassifiedPaper": {"properties": ["features"]}},
{"CITES": {"orientation": "UNDIRECTED", "aggregation": "SINGLE"}},
)
We can finally check the number of nodes and relationships in the newly-projected graph to make sure it has been created correctly:
assert G.node_count() == 2708
assert G.relationship_count() == 10556
3. Pipeline catalog basics
Once the dataset has been loaded, we can define a node classification machine learning pipeline.
# Create the pipeline
node_pipeline, _ = gds.beta.pipeline.nodeClassification.create("cora-pipeline")
We can check that the pipeline has actually been created with the
list
method:
# List all pipelines
gds.beta.pipeline.list()
# Alternatively, get the details of a specific pipeline object
gds.beta.pipeline.list(node_pipeline)
4. Configuring the pipeline
We can now configure the pipeline. As a reminder, we need to:
-
Select a subset of the available node properties to be used as features for the machine learning model
-
Configure the train/test split and the number of folds for k-fold cross-validation (optional)
-
Configure the candidate models for training
-
Configure autotuning (optional) In this example we use Logistic Regression as a candidate model for the training, but other algorithms (such as Random Forest) are available as well. We also set some reasonable starting parameters that can be further tuned according to the needed metrics.
Some hyperparameters such as penalty
can be single values or ranges.
If they are expressed as ranges, autotuning is used to search their best
value.
The configureAutoTuning
method can be used to set the number of
model candidates to try. Here we choose 5 to keep the training time
short.
# "Mark" some node properties that will be used as features
node_pipeline.selectFeatures(["features"])
# If needed, change the train/test split ratio and the number of folds
# for k-fold cross-validation
node_pipeline.configureSplit(testFraction=0.2, validationFolds=5)
# Add a model candidate to train
node_pipeline.addLogisticRegression(maxEpochs=200, penalty=(0.0, 0.5))
# Explicit set the number of trials for autotuning (default = 10)
node_pipeline.configureAutoTuning(maxTrials=5)
5. Training the pipeline
The configured pipeline is now ready to select and train a model. We also run a training estimate, to make sure there are enough resources to run the actual training afterwards.
The Node Classification model supports several evaluation metrics. Here
we use the global metric F1_WEIGHTED
.
NOTE: The concurrency
parameter is explicitly set to 4 (the
default value) for demonstration purposes. The maximum concurrency in
the library is limited to 4 for Neo4j Community Edition.
# Estimate the resources needed for training the model
node_pipeline.train_estimate(
G,
targetNodeLabels=["Paper"],
modelName="cora-pipeline-model",
targetProperty="subject",
metrics=["F1_WEIGHTED"],
randomSeed=42,
concurrency=4,
)
# Perform the actual training
model, stats = node_pipeline.train(
G,
targetNodeLabels=["Paper"],
modelName="cora-pipeline-model",
targetProperty="subject",
metrics=["F1_WEIGHTED"],
randomSeed=42,
concurrency=4,
)
We can inspect the result of the training, for example to print the evaluation metrics of the trained model.
# Uncomment to print all stats
# print(stats.to_json(indent=2))
# Print F1_WEIGHTED metric
stats["modelInfo"]["metrics"]["F1_WEIGHTED"]["test"]
6. Using the model for prediction
After training, the model is ready to classify unclassified data.
One simple way to use the predict
mode is to just stream the result
of the prediction. This can be impractical when a graph is very large,
so it should be only used for experimentation purposes.
predicted = model.predict_stream(
G, modelName="cora-pipeline-model", includePredictedProbabilities=True, targetNodeLabels=["UnclassifiedPaper"]
)
The result of the prediction is a Pandas DataFrame
containing the
predicted class and the predicted probabilities for all the classes for
each node.
predicted
The order of the classes in the predictedProbabilities
field is
given in the model information, and can be used to retrieve the
predicted probability for the predicted class.
Please note that the order in which the classes appear in the
predictedProbabilities
field is somewhat arbitrary, so the correct
way to access each probability is via the class index obtained from the
model, not its position.
# List of class labels
classes = stats["modelInfo"]["classes"]
print("Class labels:", classes)
# Calculate the confidence percentage for the predicted class
predicted["confidence"] = predicted.apply(
lambda row: np.floor(row["predictedProbabilities"][classes.index(row["predictedClass"])] * 100), axis=1
)
predicted
7. Adding a data preprocessing step
The quality of the model can potentially be increased by adding more
features or by using different features altogether. One way is to use
algorithms such as FastRP that create embeddings based on both node
properties and graph features, which can be added via the
addNodeProperty
pipeline method. Such properties are “transient”,
in that they are automatically created and removed by the pipeline
itself.
In this example we also use the contextNodeLabels
parameter to
explicitly set the types of nodes we calculate the embeddings for, and
we include both the classified and the unclassified nodes. This is
useful because the more nodes are used, the better the generated
embeddings are. Although it may seem counterintuitive, unclassified
nodes do not need to be completely unobserved during training (so, for
instance, information on their neighbours can be retained). More
information can be found in graph ML publications such as the
Graph Representation Learning
Book.
node_pipeline_fastrp, _ = gds.beta.pipeline.nodeClassification.create("cora-pipeline-fastrp")
# Add a step in the pipeline that mutates the graph
node_pipeline_fastrp.addNodeProperty(
"fastRP",
mutateProperty="embedding",
embeddingDimension=512,
propertyRatio=1.0,
randomSeed=42,
featureProperties=["features"],
contextNodeLabels=["Paper", "UnclassifiedPaper"],
)
# With the node embeddings available as features, we no longer use the original raw `features`.
node_pipeline_fastrp.selectFeatures(["embedding"])
# Configure the pipeline as before
node_pipeline_fastrp.configureSplit(testFraction=0.2, validationFolds=5)
node_pipeline_fastrp.addLogisticRegression(maxEpochs=200, penalty=(0.0, 0.5))
node_pipeline.configureAutoTuning(maxTrials=5)
The training then proceeds as in the previous section:
# Perform the actual training
model_fastrp, stats_fastrp = node_pipeline_fastrp.train(
G,
targetNodeLabels=["Paper"],
modelName="cora-pipeline-model-fastrp",
targetProperty="subject",
metrics=["F1_WEIGHTED"],
randomSeed=42,
concurrency=4,
)
The F1_WEIGHTED
metrics is better with embeddings:
print(stats_fastrp["modelInfo"]["metrics"]["F1_WEIGHTED"]["test"])
The classification using predict_stream
can be run in the same way:
predicted_fastrp = model_fastrp.predict_stream(
G,
modelName="cora-pipeline-model-fastrp",
includePredictedProbabilities=True,
targetNodeLabels=["UnclassifiedPaper"],
)
print(len(predicted_fastrp))
Instead of streaming the results, the prediction can be run in
mutate
mode to be more performant, especially when the predicted
values are used multiple times. The predicted nodes can be retrieved
using the streamNodeProperty
method with the UnclassifiedPaper
class.
model_fastrp.predict_mutate(
G,
mutateProperty="predictedClass",
modelName="cora-pipeline-model-fastrp",
predictedProbabilityProperty="predictedProbabilities",
targetNodeLabels=["UnclassifiedPaper"],
)
predicted_fastrp = gds.graph.nodeProperty.stream(G, "predictedClass", ["UnclassifiedPaper"])
predicted_fastrp
This is useful to compare the result of classification with the original
subject
value of the test nodes, which must be retrieved from the
Neo4j database since it has been excluded from the projected graph.
# Retrieve node information from Neo4j using the node IDs from the prediction result
nodes = gds.util.asNodes(predicted_fastrp.nodeId.to_list())
# Create a new DataFrame containing node IDs along with node properties
nodes_df = pd.DataFrame([(node.id, node["subject"]) for node in nodes], columns=["nodeId", "subject"])
# Merge with the prediction result on node IDs, to check the predicted value
# against the original subject
#
# NOTE: This could also be replaced by just appending `node["subject"]` as a
# Series since the node order would not change, but a proper merge (or join)
# is clearer and less prone to errors.
predicted_fastrp.merge(nodes_df, on="nodeId")
As we can see, the prediction for all the test nodes is accurate.
8. Writing result back to Neo4j
Having the predicted class written back to the graph, we can now write them back to the Neo4j database.
Please note that this step is not applicable if you are running this notebook on AuraDS.
gds.graph.nodeProperties.write(
G,
node_properties=["predictedClass"],
node_labels=["UnclassifiedPaper"],
)
9. Cleanup
When the graph, the model and the pipeline are no longer needed, they should be dropped to free up memory. This only needs to be done if the Neo4j or AuraDS instance is not restarted, since a restart would clean up all the in-memory content anyway.
model.drop()
model_fastrp.drop()
node_pipeline.drop()
node_pipeline_fastrp.drop()
G.drop()
The Neo4j database instead needs to be cleaned up explicitly if no longer useful:
gds.run_cypher("MATCH (n) WHERE n:Paper OR n:UnclassifiedPaper DETACH DELETE n")
It is good practice to close the client as well:
gds.close()