Periodic Execution
Cypher is great for querying graphs and importing and updating graph structures.
During imports you can use PERIODIC COMMIT to control transaction sizes in memory, but for other graph refactorings it’s not that easy to commit transactions regularly to free memory for new update state.
Procedure Overview
The table below describes the available procedures:
type | qualified name | signature | description |
---|---|---|---|
procedure | apoc.periodic.iterate | apoc.periodic.iterate('statement returning items', 'statement per item', {batchSize:1000,iterateList:true,parallel:false,params:{},concurrency:50,retries:0}) YIELD batches, total | run the second statement for each item returned by the first statement. Returns number of batches and total processed rows |
procedure | apoc.periodic.commit | apoc.periodic.commit(statement,params) | runs the given statement in separate transactions until it returns 0 |
procedure | apoc.periodic.rock_n_roll | apoc.periodic.rock_n_roll('some cypher for iteration', 'some cypher as action on each iteration', 10000) YIELD batches, total | run the action statement in batches over the iterator statement’s results in a separate thread. Returns number of batches and total processed rows |
Periodic Iterate
The apoc.periodic.iterate procedure is helpful when you need to handle large amounts of data for import, refactoring, and other cases that require large transactions.
It provides a way to batch the data by dividing the workload into two parts:
- a data-driven statement: this defines how you select the data that needs to be handled. You can provide a Cypher statement to select from existing graph data, read external data from a file or API, or retrieve data from another datastore.
- an operation statement: this defines what you want done to the selected data. You can execute Cypher to update, create, or delete data, or run other procedures to manipulate and transform values before loading.
The data-driven statement is provided as the first statement and produces a stream of values to be processed.
The operation statement is provided as the second statement and processes one element at a time or (with batchMode: "BATCH") a batch at a time.
The results of the data-driven statement are passed to the operation statement as parameters, so each returned column is automatically available under its name.
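Here is a minimal sketch of this. It assumes Person nodes with a name property and uses a hypothetical nameUpper property as the target. The data-driven statement returns the columns p and name, so the operation statement can refer to both by those names.

```cypher
// The data-driven statement returns two columns: p and name.
// Each row's columns are visible to the operation statement under the same names.
// nameUpper is a hypothetical property used only for illustration.
CALL apoc.periodic.iterate(
  "MATCH (p:Person) WHERE p.name IS NOT NULL RETURN p, p.name AS name",
  "SET p.nameUpper = toUpper(name)",
  {batchSize: 1000})
YIELD batches, total
RETURN batches, total;
```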
name | type | default | description |
---|---|---|---|
batchSize | Long | 10000 | run the specified number of operation statements in a single tx - params: {_count, _batch} |
parallel | boolean | false | run operation statements in parallel (note that statements might deadlock if conflicting) |
retries | Long | 0 | if the operation statement fails with an error, sleep 100ms and retry until retries-count is reached - param {_retry} |
batchMode | String | "BATCH" | how data-driven statements should be processed by the operation statement. Valid values are: "BATCH" (execute the operation statement once per batchSize; the statement is prefixed with UNWIND $_batch AS _batch WITH _batch.field1 AS field1, _batch.field2 AS field2, which extracts each returned field from the $_batch parameter), "SINGLE" (execute the operation statement one row at a time), and "BATCH_SINGLE" (execute the operation statement once per batchSize, leaving the unpacking of $_batch to the operation statement) |
params | Map | {} | externally pass in map of params |
concurrency | Long | 50 | number of concurrent tasks that are generated when using parallel: true |
failedParams | Long | -1 | if set to a non-negative value, each failed batch up to |
In APOC versions 4.0.0.11 and earlier, the following config key controlled how the results of the data-driven statement were batched:
param | default | description |
---|---|---|
iterateList | true | execute operation statements once per batchSize (whole batchSize list is passed in as parameter {_batch}) |
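All of the config options above are passed together as one map in the third argument. The sketch below combines batchSize, retries and params. It assumes a hypothetical born property on Person nodes, a hypothetical Veteran label and a hypothetical cutoff parameter. In the APOC versions we are aware of, values in params can be referenced with the usual $ syntax in the data-driven statement, but treat that as an assumption to check against your version.

```cypher
// cutoff, born and Veteran are hypothetical names used only for illustration.
// $cutoff is supplied through the params map rather than concatenated into the string.
CALL apoc.periodic.iterate(
  "MATCH (p:Person) WHERE p.born < $cutoff RETURN p",
  "SET p:Veteran",
  {batchSize: 10000, retries: 2, params: {cutoff: 1950}})
YIELD batches, total
RETURN batches, total;
```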
Periodic Iterate Examples
Let’s go through some examples.
If you were to add an :Actor label to several million :Person nodes, you could run the following code:
CALL apoc.periodic.iterate(
"MATCH (p:Person) WHERE (p)-[:ACTED_IN]->() RETURN p",
"SET p:Actor",
{batchSize:10000, parallel:true})
Let’s break down the parameters passed to the procedure:
- Our first Cypher statement selects all the Person nodes with an ACTED_IN relationship to another node and returns those persons. This is the data-driven portion where we select the data that we want to change.
- Our second Cypher statement sets the :Actor label on each of the Person nodes selected. This is the operation portion where we apply the change to the data from our first statement.
- Finally, we specify the configuration we want the procedure to use: a batchSize of 10,000, with the statements running in parallel.
Executing this procedure takes all of the Person nodes gathered by the first Cypher statement and updates each of them with the second Cypher statement.
It divides the work into batches, taking 10,000 Person nodes from the stream and updating them in a single transaction.
If we have 30,000 Person nodes in our graph with an ACTED_IN relationship, the work is broken down into 3 batches.
Finally, it runs those batches in parallel, because updating labels or properties on distinct nodes does not conflict.
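You can YIELD the counters the procedure returns to see how the work was split. This sketch assumes the graph holds the 30,000 matching Person nodes from the scenario above. In that case the run should report 3 batches and a total of 30,000 rows.

```cypher
// Same call as above, but returning the procedure's counters.
CALL apoc.periodic.iterate(
  "MATCH (p:Person) WHERE (p)-[:ACTED_IN]->() RETURN p",
  "SET p:Actor",
  {batchSize: 10000, parallel: true})
YIELD batches, total, committedOperations, failedOperations
RETURN batches, total, committedOperations, failedOperations;
```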
For more complex operations like updating or removing relationships, either do not use parallel: true, or make sure that you batch the work so that each subgraph of data is updated in one operation, such as by passing the root nodes (rather than individual relationships) to the operation statement.
If you attempt complex operations, also enable retrying failed operations, e.g. with the retries option.
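The sketch below shows the more cautious configuration for a relationship change. It assumes a hypothetical OLD_REL relationship type that should be removed. The data-driven statement returns root nodes, so all outgoing OLD_REL relationships of one node are deleted in a single operation. parallel is left off, and retries gives failing batches a few more attempts.

```cypher
// OLD_REL is a hypothetical relationship type used only for illustration.
// Return root nodes, not relationships, so each node's subgraph is handled together.
// Relationship deletions run sequentially (parallel: false) and failed batches are retried.
CALL apoc.periodic.iterate(
  "MATCH (p:Person) WHERE (p)-[:OLD_REL]->() RETURN p",
  "MATCH (p)-[r:OLD_REL]->() DELETE r",
  {batchSize: 1000, parallel: false, retries: 3})
YIELD batches, total, retries
RETURN batches, total, retries;
```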
Now let us look at a more complex example.
CALL apoc.periodic.iterate(
"MATCH (o:Order) WHERE o.date > '2016-10-13' RETURN o",
"MATCH (o)-[:HAS_ITEM]->(i) WITH o, sum(i.value) as value SET o.value = value",
{batchSize:100, parallel:true})
Let’s break down the parameters passed to the procedure:
- Our first Cypher statement selects all the Order nodes that have an order date later than October 13, 2016.
- Our second Cypher statement takes each of those orders and finds the items connected to it by a HAS_ITEM relationship. It then sums up the value of those items and stores that sum as a property (o.value) holding the total order value.
- Our configuration batches those nodes into groups of 100 (batchSize:100) and runs the batches in parallel for the second statement to process.
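Batches that run in parallel can fail independently, so it may help to yield the failure counters when running this query. Here is a sketch that reuses the statements above and returns the number of failed batches along with the collected error messages.

```cypher
// Same statements as above; failedBatches and errorMessages report any batch failures.
CALL apoc.periodic.iterate(
  "MATCH (o:Order) WHERE o.date > '2016-10-13' RETURN o",
  "MATCH (o)-[:HAS_ITEM]->(i) WITH o, sum(i.value) AS value SET o.value = value",
  {batchSize: 100, parallel: true})
YIELD batches, total, failedBatches, errorMessages
RETURN batches, total, failedBatches, errorMessages;
```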
Batch mode: BATCH_SINGLE
If our operation statement calls a procedure that takes in a batch of values, we can use batchMode: "BATCH_SINGLE" to get access to a batch of values to pass to that procedure.
When we use BATCH_SINGLE, the operation statement will have access to the $_batch parameter. This parameter contains a list of maps, one per row, keyed by the fields returned in the data-driven statement.
For example, if the data-driven statement is:
RETURN 'mark' AS a, 'michael' AS b
UNION
RETURN 'jennifer' AS a, 'andrea' AS b
The contents of the $_batch parameter passed to the operation statement would be:
[
{a: "mark", b: "michael"},
{a: "jennifer", b: "andrea"}
]
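With BATCH_SINGLE, the operation statement does its own unpacking of $_batch, for example with UNWIND. The sketch below uses the data-driven statement above and a hypothetical Pair label. With a batchSize of 100, both rows fit into a single batch, so we would expect 1 batch and a total of 2 rows.

```cypher
// Pair is a hypothetical label used only for illustration.
// The operation statement unpacks $_batch itself, one map per returned row.
CALL apoc.periodic.iterate(
  "RETURN 'mark' AS a, 'michael' AS b
   UNION
   RETURN 'jennifer' AS a, 'andrea' AS b",
  "UNWIND $_batch AS row
   CREATE (:Pair {first: row.a, second: row.b})",
  {batchMode: "BATCH_SINGLE", batchSize: 100})
YIELD batches, total
RETURN batches, total;
```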
Let’s see an example of this in action. We’ll start by creating 100,000 Person nodes with an id property:
UNWIND range(1,100000) AS id CREATE (:Person {id: id})
We can delete these nodes using the apoc.nodes.delete procedure.
See Deleting Data.
This procedure takes in a list of nodes, which we can extract from the $_batch parameter.
The following query streams all Person nodes and deletes them in batches of 100:
CALL apoc.periodic.iterate(
"MATCH (p:Person) RETURN p",
// Extract `p` variable using list comprehension
"CALL apoc.nodes.delete([item in $_batch | item.p], size($_batch))",
{batchMode: "BATCH_SINGLE", batchSize: 100}
)
YIELD batch, operations;
The contents of the $_batch parameter that is used in the operation statement would be as follows:
[
{p: Node<1>},
{p: Node<2>},
...
]
We can use a list comprehension to extract the p variable from each item in the list.
If we run this query, we’ll see the following output:
batch | operations |
---|---|
{total: 1000, committed: 1000, failed: 0, errors: {}} | {total: 100000, committed: 100000, failed: 0, errors: {}} |
Periodic Commit
Especially for graph processing, it is useful to run a query repeatedly in separate transactions until it no longer processes or generates any results. This lets you iterate in batches over elements that don’t fulfill a condition and update them so that they do afterwards.
As a safety net, the statement used inside apoc.periodic.commit must contain a LIMIT clause.
The query is executed repeatedly in separate transactions until it returns 0.
CALL apoc.periodic.commit(
"MATCH (user:User) WHERE user.city IS NOT NULL
WITH user LIMIT $limit
MERGE (city:City {name: user.city})
MERGE (user)-[:LIVES_IN]->(city)
REMOVE user.city
RETURN count(*)",
{limit: 10000})
Updates | Executions |
---|---|
2000000 | 200 |
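Batched deletion is another common use of this pattern. The sketch below assumes hypothetical :Temp nodes that should be removed 10,000 at a time. Each execution deletes up to $limit nodes and returns how many rows it processed. Once no Temp nodes remain, count(*) returns 0 and the procedure stops.

```cypher
// Temp is a hypothetical label used only for illustration.
// LIMIT bounds each transaction; count(*) is the value checked against 0.
CALL apoc.periodic.commit(
  "MATCH (n:Temp)
   WITH n LIMIT $limit
   DETACH DELETE n
   RETURN count(*)",
  {limit: 10000})
YIELD updates, executions
RETURN updates, executions;
```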