Create a job specification file

The job specification file instructs Dataflow on how to run the import (where to source the data from, how to map it into Neo4j, etc.). It consists of a JSON object with four sections.

Job specification JSON skeleton
{
  "config": { ... },  (1)
  "sources": [  (2)
    { ... }
  ],
  "targets": [  (3)
    { ... }
  ],
  "actions": [  (4)
    { ... }
  ]
}
1 config — Global flags affecting how the import is performed (optional)
2 sources — Data source definitions (relational)
3 targets — Data target definitions (graph: nodes/relationships/Cypher queries)
4 actions — One-off actions (optional)

At a high level, the job fetches data from the sources, transforms it, and imports it into the targets.

{
  "config": {
    "reset_db": true,
    "index_all_properties": false
  },
  "sources": [
    {
      "type": "bigquery",
      "name": "persons",
      "query": "SELECT person_tmdbId, name, bornIn, born, died FROM team-connectors-dev.movies.persons"
    },
    {
      "type": "bigquery",
      "name": "movies",
      "query": "SELECT movieId, title, imdbRating, year FROM team-connectors-dev.movies.movies"
    },
    {
      "type": "bigquery",
      "name": "directed",
      "query": "SELECT movieId, person_tmdbId FROM team-connectors-dev.movies.directed"
    },
    {
      "type": "bigquery",
      "name": "acted_in",
      "query": "SELECT movieId, person_tmdbId, role FROM team-connectors-dev.movies.acted_in"
    }
  ],
  "targets": [
    {
      "node": {
        "source": "persons",
        "name": "Person",
        "mode": "merge",
        "transform": {
          "group": true
        },
        "mappings": {
          "labels": [
            "\"Person\""
          ],
          "properties": {
            "keys": [
              {"person_tmdbId": "id"}
            ],
            "unique": [],
            "indexed": [
              {"name": "name"}
            ],
            "strings": [
              {"bornIn": "born_in"}
            ],
            "dates": [
              {"born": "born"},
              {"died": "died"}
            ]
          }
        }
      }
    },
    {
      "node": {
        "source": "movies",
        "name": "Movies",
        "mode": "merge",
        "transform": {
          "group": true
        },
        "mappings": {
          "labels": [
            "\"Movie\""
          ],
          "properties": {
            "keys": [
              {"movieId": "id"}
            ],
            "unique": [],
            "indexed": [
              {"title": "title"}
            ],
            "floats": [
              {"imdbRating": "rating"},
              {"year": "year"}
            ]
          }
        }
      }
    },
    {
      "edge": {
        "source": "directed",
        "name": "Directed",
        "mode": "merge",
        "transform": {
          "group": true
        },
        "mappings": {
          "type": "\"DIRECTED\"",
          "source": {
            "label": "\"Person\"",
            "key": {"person_tmdbId": "id"}
          },
          "target": {
            "label": "\"Movie\"",
            "key": {"movieId": "id"}
          },
          "properties": {}
        }
      }
    },
    {
      "edge": {
        "source": "acted_in",
        "name": "Acted_in",
        "mode": "merge",
        "transform": {
          "group": true
        },
        "mappings": {
          "type": "\"ACTED_IN\"",
          "source": {
            "label": "\"Person\"",
            "key": {"person_tmdbId": "id"}
          },
          "target": {
            "label": "\"Movie\"",
            "key": {"movieId": "id"}
          },
          "properties": {
            "strings": [
              {"role": "role"}
            ]
          }
        }
      }
    }
  ]
}

Configuration

The config object contains global configuration for the import job. All settings have a default, so you don’t need to specify them unless you wish to alter them.

Configuration settings and their defaults
"config": {
  "reset_db": false,
  "index_all_properties": false,
  "node_target_batch_size": 5000,
  "relationship_target_batch_size": 1000,
  "query_target_batch_size": 1000,
  "node_target_parallelism": 10,
  "relationship_target_parallelism": 1,
  "query_target_parallelism": 1
}
  • reset_db (bool) — Whether to clear the target database before importing. Deletes data, indexes, and constraints.

  • index_all_properties (bool) — Whether to create indexes for all properties. See Cypher® → Search-performance indexes.

  • node_target_batch_size (int) — Number of nodes to be processed for each transaction.

  • relationship_target_batch_size (int) — Number of relationships to be processed for each transaction.

  • query_target_batch_size (int) — Number of rows to be processed for each custom query.

  • node_target_parallelism (int) — Maximum number of concurrent transactions for node targets per worker.

  • relationship_target_parallelism (int) — Maximum number of concurrent transactions for relationship targets per worker. Set values higher than 1 with care, as they may result in deadlocks.

  • query_target_parallelism (int) — Maximum number of concurrent transactions for Cypher query targets per worker. Set values higher than 1 with care, as they may result in deadlocks.
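
Because every setting has a default, a config object only needs to list the keys that differ. The sketch below illustrates that idea in Python, assuming a plain key-by-key overlay. DEFAULTS copies the values listed above, and effective_config is a hypothetical helper for illustration, not part of the Dataflow template.

```python
import json

# Defaults copied from the settings listed above.
DEFAULTS = {
    "reset_db": False,
    "index_all_properties": False,
    "node_target_batch_size": 5000,
    "relationship_target_batch_size": 1000,
    "query_target_batch_size": 1000,
    "node_target_parallelism": 10,
    "relationship_target_parallelism": 1,
    "query_target_parallelism": 1,
}


def effective_config(user_config):
    """Hypothetical helper: overlay the user's settings on the defaults."""
    return {**DEFAULTS, **user_config}


# Only the two settings that differ from the defaults are specified.
user_config = json.loads('{"reset_db": true, "node_target_batch_size": 2000}')
config = effective_config(user_config)
print(json.dumps(config, indent=2))
```

Settings that are left out, such as query_target_batch_size, keep their default value.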

Sources

The sources section contains the definitions of the data sources, as a list. As a rough guideline, think of each table as one source. The importer makes the data surfaced by the sources available to the targets, which map it into Neo4j.

To import a BigQuery dataset, three attributes are compulsory.

{
  "type": "bigquery",
  "name": "movies",
  "query": "SELECT movieId, title FROM team-connectors-dev.movies.movies"
}
  • type (string) — bigquery.

  • name (string) — a human-friendly label for the source (unique among all sources). You will use this to reference the source in the targets section.

  • query (string) — the dataset to extract from BigQuery, as an SQL query. Notice that:

    1. the source BigQuery table can have more columns than what you select in the query;

    2. multiple targets can use the same source, even filtering it for a subset of columns.

Columns of type BIGNUMERIC, GEOGRAPHY, JSON, INTERVAL and STRUCT are not supported.
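
The two rules above (three compulsory attributes, names unique among all sources) can be checked before submitting a job. The following is a minimal sketch of such a check; check_sources is a hypothetical helper, and the messages it produces are not what the template itself reports.

```python
REQUIRED = ("type", "name", "query")


def check_sources(sources):
    """Hypothetical check: return a list of problems found in a sources list."""
    problems = []
    seen = set()
    for index, source in enumerate(sources):
        for attr in REQUIRED:
            if not source.get(attr):
                problems.append(f"source #{index}: missing '{attr}'")
        name = source.get("name")
        if name is not None:
            if name in seen:
                problems.append(f"source #{index}: duplicate name '{name}'")
            seen.add(name)
    return problems


sources = [
    {
        "type": "bigquery",
        "name": "movies",
        "query": "SELECT movieId, title FROM team-connectors-dev.movies.movies",
    },
    # Deliberately broken: reuses the name "movies" and has no query.
    {"type": "bigquery", "name": "movies"},
]
problems = check_sources(sources)
for problem in problems:
    print(problem)
```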

Targets

The targets section contains the definitions of the graph entities that will result from the import.

Neo4j represents objects with nodes (ex. movies, people) and connects them with relationships (ex. ACTED_IN, DIRECTED). Each object in the targets section will generate a corresponding entity (node or relationship) in Neo4j drawing data from a source. It is also possible to run custom Cypher queries.

Targets specification skeleton
"targets": {
  "nodes": [ ... ],
  "relationships": [ ... ],
  "queries": [ ... ]
}

By default, you don’t need to think about dependencies between nodes and relationships. Relationship targets are always processed after the targets corresponding to their start and end node. It is however possible to add other targets as dependencies.
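
The ordering rule can be pictured as a dependency graph: each relationship target implicitly depends on its start and end node targets, and depends_on adds further edges. The sketch below computes one valid order with Python's graphlib (Python 3.9+). It is an illustration of the constraints only; how the template actually schedules targets (for example, running independent targets in parallel) is not covered here, and execution_order is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# Only the attributes relevant for ordering are shown.
targets = {
    "nodes": [
        {"name": "Person"},
        # Explicit extra dependency, added for illustration.
        {"name": "Movies", "depends_on": ["Person"]},
    ],
    "relationships": [
        {
            "name": "Acted_in",
            "start_node_reference": "Person",
            "end_node_reference": "Movies",
        },
    ],
}


def execution_order(targets):
    """Hypothetical helper: one order that respects all dependencies."""
    deps = {}
    for node in targets.get("nodes", []):
        deps[node["name"]] = set(node.get("depends_on", []))
    for rel in targets.get("relationships", []):
        # Implicit dependencies: the start and end node targets.
        implicit = {rel["start_node_reference"], rel["end_node_reference"]}
        deps[rel["name"]] = implicit | set(rel.get("depends_on", []))
    return list(TopologicalSorter(deps).static_order())


order = execution_order(targets)
print(order)
```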

Node objects

Node entities must be grouped in a list keyed nodes inside the targets object.

Node targets specification skeleton
"targets": {
  "nodes": [
    { <nodeSpec1> },
    { <nodeSpec2> },
    ...
  ]
}

Compulsory fields

Each node object must at minimum have attributes source, name, labels, and properties.

{
  "source": "<sourceName>",
  "name": "<targetName>",
  "labels": ["<label1>", "<label2>", ...],
  "properties": [
    {
      "source_field": "<bigQueryColumnName>",
      "target_field": "<neo4jPropertyName>",
      "target_property_type": "<neo4jPropertyType>"
    },
    { <propertyObj2> },
    ...
  ]
}
  • source (string) — The name of the source this target should draw data from. Should match one of the names from the sources objects.

  • name (string) — A human-friendly name for the target (unique among all names).

  • labels (list of strings) — Labels to mark the nodes with.

  • properties (list of objects) — Mapping between source columns and node properties.
    Valid values for target_property_type are: boolean, byte_array (assumes base64 encoding), date, duration, float, integer, local_date, local_datetime, local_time, point, string, zoned_datetime, zoned_time.
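
A node object can be checked against the compulsory fields and the list of valid property types before the job is submitted. This is a minimal sketch; check_node_target is a hypothetical helper, and it only validates target_property_type when the attribute is present, since whether the attribute may be omitted is not stated here.

```python
VALID_TYPES = {
    "boolean", "byte_array", "date", "duration", "float", "integer",
    "local_date", "local_datetime", "local_time", "point", "string",
    "zoned_datetime", "zoned_time",
}
REQUIRED = ("source", "name", "labels", "properties")


def check_node_target(node):
    """Hypothetical check: return a list of problems found in a node object."""
    problems = [f"missing '{attr}'" for attr in REQUIRED if attr not in node]
    for prop in node.get("properties", []):
        ptype = prop.get("target_property_type")
        if ptype is not None and ptype not in VALID_TYPES:
            problems.append(f"{prop.get('target_field')}: invalid type '{ptype}'")
    return problems


node = {
    "source": "persons",
    "name": "Person",
    "labels": ["Person"],
    "properties": [
        {"source_field": "person_tmdbId", "target_field": "id",
         "target_property_type": "string"},
        # Deliberately wrong: "text" is not in the list of valid values.
        {"source_field": "name", "target_field": "name",
         "target_property_type": "text"},
    ],
}
problems = check_node_target(node)
print(problems)
```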

Schema definition

You may create indexes and constraints on the imported nodes through the schema object. The schema setup is equivalent to manually running the relevant CREATE INDEX/CONSTRAINT commands, except they are run automatically ahead of import for each entity type.

If the global config index_all_properties is set to true, all properties will be indexed with range indexes.
Node target schema definition and their defaults
{
  ...
  "schema": {
    "enable_type_constraints": true,
    "key_constraints": [
      {
        "name": "<constraintName>",
        "label": "<label>",
        "properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
        "options": {}
      }
    ],
    "unique_constraints": [
      {
        "name": "<constraintName>",
        "label": "<label>",
        "properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
        "options": {}
      }
    ],
    "existence_constraints": [
      {
        "name": "<constraintName>",
        "label": "<label>",
        "property": "<neo4jPropertyName>"
      }
    ],
    "range_indexes": [
      {
        "name": "<indexName>",
        "label": "<label>",
        "properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...]
      }
    ],
    "text_indexes": [
      {
        "name": "<indexName>",
        "label": "<label>",
        "property": "<neo4jPropertyName>",
        "options": {}
      }
    ],
    "point_indexes": [
      {
        "name": "<indexName>",
        "label": "<label>",
        "property": "<neo4jPropertyName>",
        "options": {}
      }
    ],
    "fulltext_indexes": [
      {
        "name": "<indexName>",
        "labels": ["<label1>", "<label2>", ...],
        "properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
        "options": {}
      }
    ],
    "vector_indexes": [
      {
        "name": "<indexName>",
        "label": "<label>",
        "property": "<neo4jPropertyName>",
        "options": {}
      }
    ]
  }
}

Where the attributes for each object are:

  • name (string) — The name of the index or constraint to be created in Neo4j.

  • label (string) or labels (list of strings) — The label(s) on which the index or constraint is enforced.

  • property (string) or properties (list of strings) — The property or properties on which the index or constraint is enforced.

  • options (object) — The options with which the index or constraint is created (refer to the individual pages for each index and constraint type). Where the skeleton above lists it, options may be omitted, except for vector indexes, where it is mandatory.

Source data must not have null values for key_constraints columns, or they will clash with the node key constraint. If the source is not clean in this respect, consider cleaning it upfront in the related source.query field by excluding all rows that wouldn’t fulfill the constraints (ex. WHERE person_tmdbId IS NOT NULL). Alternatively, use the where attribute in a source transformation.
The options key_constraints and unique_constraints require Neo4j/Aura Enterprise Edition, and do not have any effect when run against a Neo4j Community Edition installation.
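
The null-filtering advice can be derived mechanically from a node object: look up which source columns feed the key constraint properties and exclude rows where any of them is null. The sketch below shows one way to build such a condition; not_null_filter is a hypothetical helper, and the resulting string is meant to be pasted into the source query or into the where attribute of a source transformation.

```python
def not_null_filter(node):
    """Hypothetical helper: SQL condition excluding rows with null key columns."""
    # Map Neo4j property names back to the source columns that feed them.
    column_of = {p["target_field"]: p["source_field"] for p in node["properties"]}
    key_props = [
        prop
        for constraint in node.get("schema", {}).get("key_constraints", [])
        for prop in constraint["properties"]
    ]
    return " AND ".join(f"{column_of[prop]} IS NOT NULL" for prop in key_props)


person = {
    "source": "persons",
    "name": "Person",
    "labels": ["Person"],
    "properties": [
        {"source_field": "person_tmdbId", "target_field": "id",
         "target_property_type": "string"},
        {"source_field": "name", "target_field": "name",
         "target_property_type": "string"},
    ],
    "schema": {
        "key_constraints": [
            {"name": "personIdKey", "label": "Person", "properties": ["id"]}
        ]
    },
}
where = not_null_filter(person)
print(where)
```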

Configuration

Node target config options and their defaults
{
  ...
  "active": true,
  "write_mode": "merge",
  "source_transformations": {
    "enable_grouping": true
  },
  "depends_on": ["<dependencyTargetName1>", "<dependencyTargetName2>", ...]
}
  • active (bool) — Whether the target should be included in the import (default: true).

  • write_mode (string) — The creation mode in Neo4j. Either create or merge (default). See CREATE and MERGE for info on the Cypher clauses behavior.

  • source_transformations (object) — If enable_grouping is set to true, the import will append the SQL clause GROUP BY on all fields specified in key_constraints and properties. If set to false, any duplicate data in the source will be pushed into Neo4j, potentially raising constraints errors or making insertion less efficient. The object can also contain aggregation functions and further fields, see Source transformations.

  • depends_on (list of strings) — The name of the target(s) that should execute before the current one.

Example

A node object example for import of Person nodes
{
  "source": "persons",
  "name": "Person",
  "labels": [ "Person" ],
  "properties": [
    {
      "source_field": "person_tmdbId",
      "target_field": "id",
      "target_property_type": "string"
    },
    {
      "source_field": "name",
      "target_field": "name",
      "target_property_type": "string"
    },
    {
      "source_field": "bornIn",
      "target_field": "bornLocation",
      "target_property_type": "string"
    },
    {
      "source_field": "born",
      "target_field": "bornDate",
      "target_property_type": "local_date"
    },
    {
      "source_field": "died",
      "target_field": "diedDate",
      "target_property_type": "local_date"
    }
  ],
  "schema": {
    "key_constraints": [
      {
        "name": "personIdKey",
        "label": "Person",
        "properties": ["id"]
      }
    ],
    "unique_constraints": [
      {
        "name": "personNameUnique",
        "label": "Person",
        "properties": ["name"]
      }
    ]
  }
}

Relationship objects

Relationship entities must be grouped in a list keyed relationships inside the targets object.

Relationship targets specification skeleton
"targets": {
  ...
  "relationships": [
    { <relationshipSpec1> },
    { <relationshipSpec2> },
    ...
  ]
}

Compulsory fields

Each relationship object must at minimum have attributes source, name, and type.

It must also contain information about which node targets the relationship links together. You provide this through start_node_reference and end_node_reference.

{
  "source": "<sourceName>",
  "name": "<targetName>",
  "type": "<relationshipType>",
  "start_node_reference": "<nodeTargetName>",
  "end_node_reference": "<nodeTargetName>"
}
  • source (string) — The name of the source this target should draw data from. Should match one of the names from the sources objects.

  • name (string) — A human-friendly name for the target (unique among all names).

  • type (string) — Type to assign to the relationship.

  • start_node_reference (string) — The name of the node target that acts as start for the relationship.

  • end_node_reference (string) — The name of the node target that acts as end for the relationship.

keys, unique and mandatory options require Aura or Neo4j Enterprise Edition, and will not have any effect when run against a Neo4j Community Edition installation.
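
Since start_node_reference and end_node_reference must name existing node targets, a misspelled reference is an easy mistake to make. The sketch below lists references that do not resolve; unresolved_references is a hypothetical helper and only the attributes needed for the check are shown.

```python
def unresolved_references(targets):
    """Hypothetical check: node references that match no node target name."""
    node_names = {node["name"] for node in targets.get("nodes", [])}
    problems = []
    for rel in targets.get("relationships", []):
        for attr in ("start_node_reference", "end_node_reference"):
            if rel[attr] not in node_names:
                problems.append((rel["name"], attr, rel[attr]))
    return problems


targets = {
    "nodes": [{"name": "Person"}, {"name": "Movies"}],
    "relationships": [
        {
            "name": "Acted_in",
            "type": "ACTED_IN",
            # Deliberately wrong: the node target is named "Person".
            "start_node_reference": "Persons",
            "end_node_reference": "Movies",
        }
    ],
}
problems = unresolved_references(targets)
print(problems)
```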

Properties

Relationships may also map source columns as properties.

{
  ...
  "properties": [
    {
      "source_field": "<bigQueryColumnName>",
      "target_field": "<neo4jPropertyName>",
      "target_property_type": "<neo4jPropertyType>"
    },
    { <propertyObj2> },
    ...
  ]
}
  • properties (list of objects) — Mapping between source columns and relationship properties.
    Valid values for target_property_type are: boolean, byte_array (assumes base64 encoding), date, duration, float, integer, local_date, local_datetime, local_time, point, string, zoned_datetime, zoned_time.

Schema definition

You may create indexes and constraints on the imported relationships through the schema object. The schema setup is equivalent to manually running the relevant CREATE INDEX/CONSTRAINT commands, except they are run automatically ahead of import for each relationship type.

If the global config index_all_properties is set to true, all properties will be indexed with range indexes.
Relationship target schema definition and their defaults
{
  ...
  "schema": {
    "enable_type_constraints": true,
    "key_constraints": [
      {
        "name": "<constraintName>",
        "type": "<relationshipType>",
        "properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
        "options": {}
      }
    ],
    "unique_constraints": [
      {
        "name": "<constraintName>",
        "type": "<relationshipType>",
        "properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
        "options": {}
      }
    ],
    "existence_constraints": [
      {
        "name": "<constraintName>",
        "type": "<relationshipType>",
        "property": "<neo4jPropertyName>"
      }
    ],
    "range_indexes": [
      {
        "name": "<indexName>",
        "type": "<relationshipType>",
        "properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...]
      }
    ],
    "text_indexes": [
      {
        "name": "<indexName>",
        "type": "<relationshipType>",
        "property": "<neo4jPropertyName>",
        "options": {}
      }
    ],
    "point_indexes": [
      {
        "name": "<indexName>",
        "type": "<relationshipType>",
        "property": "<neo4jPropertyName>",
        "options": {}
      }
    ],
    "fulltext_indexes": [
      {
        "name": "<indexName>",
        "types": ["<relationshipType1>", "<relationshipType2>", ...],
        "properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
        "options": {}
      }
    ],
    "vector_indexes": [
      {
        "name": "<indexName>",
        "type": "<relationshipType>",
        "property": "<neo4jPropertyName>",
        "options": {}
      }
    ]
  }
}

Where the attributes for each object are:

  • name (string) — The name of the index or constraint to be created in Neo4j.

  • type (string) — The relationship type on which the index or constraint is enforced.

  • property (string) or properties (list of strings) — The property or properties on which the index or constraint is enforced.

  • options (object) — The options with which the index or constraint is created (refer to the individual pages for each index and constraint type). Where the skeleton above lists it, options may be omitted, except for vector indexes, where it is mandatory.

Source data must not have null values for key_constraints columns, or they will clash with the relationship key constraint. If the source is not clean in this respect, consider cleaning it upfront in the related source.query field by excluding all rows that wouldn’t fulfill the constraints (ex. WHERE person_tmdbId IS NOT NULL). Alternatively, use the where attribute in a source transformation.
The options key_constraints and unique_constraints require Neo4j/Aura Enterprise Edition, and do not have any effect when run against a Neo4j Community Edition installation.

Configuration

Relationship target config options and their defaults
{
  ...
  "active": true,
  "node_match_mode": "merge",
  "write_mode": "merge",
  "source_transformations": {
    "enable_grouping": true
  },
  "depends_on": ["<dependencyTargetName1>", "<dependencyTargetName2>", ...]
}
  • active (bool) — Whether the target should be included in the import.

  • node_match_mode (string) — What Cypher clause to use to fetch the start/end nodes ahead of creating a relationship between them. Valid values are create, match, or merge (default), respectively resulting in the Cypher clauses CREATE, MATCH, and MERGE.

  • write_mode (string) — The creation mode in Neo4j. Either create or merge (default). See CREATE and MERGE for info on the Cypher clauses behavior.

  • source_transformations (object) — If enable_grouping is set to true, the import will append the SQL clause GROUP BY on all fields specified in key_constraints and properties. If set to false, any duplicate data in the source will be pushed into Neo4j, potentially raising constraints errors or making insertion less efficient. The object can also contain aggregation functions and further fields, see Source transformations.

  • depends_on (list of strings) — The name of the target(s) that should execute before the current one.

Example

A relationship object example for import of ACTED_IN relationships
{
  "source": "acted_in",
  "name": "Acted_in",
  "type": "ACTED_IN",
  "start_node_reference": "Person",
  "end_node_reference": "Movies",
  "properties": [
    {
      "source_field": "role",
      "target_field": "role",
      "target_property_type": "string"
    }
  ]
}

Custom query targets

Custom query targets are useful when the import requires a complex query that does not easily fit into the node/relationship targets format. Query targets receive batches of rows through the variable $rows.

Custom queries must be grouped in a list keyed queries inside the targets object.

Query targets specification skeleton
"targets": {
  ...
  "queries": [
    { <querySpec1> },
    { <querySpec2> },
    ...
  ]
}
Do not use custom queries to run Cypher that does not directly depend on a source; use actions instead. One-off queries, especially if not idempotent, are not fit to use in custom query targets. The reason for this is that queries from targets are run in batches, so a custom query may be run several times depending on the number of $rows batches extracted from the source.
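
To see why one-off statements do not belong in query targets, it helps to count executions. The sketch below splits a made-up source of 2,500 rows into batches of 1,000 (the default query_target_batch_size) and counts how often the query would run. It is a simplified model: with several workers or parallel transactions the real batching may differ, and batches is a hypothetical helper.

```python
def batches(rows, batch_size):
    """Split source rows into consecutive batches of at most batch_size."""
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


rows = [{"person_tmdbId": i} for i in range(2500)]  # made-up source rows

# The target query runs once per batch, receiving that batch as $rows.
batch_sizes = [len(batch) for batch in batches(rows, 1000)]
executions = len(batch_sizes)
print(batch_sizes, executions)
```

In this model a non-idempotent statement such as a plain CREATE of a single log node would run three times and leave three nodes behind, which is why such statements are better placed in actions.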

Compulsory fields

Each query target must at minimum have attributes source, name, and query.

{
  "source": "<sourceName>",
  "name": "<targetName>",
  "query": "<cypherQuery>"
}
  • source (string) — The name of the source this target should draw data from. Should match one of the names from the sources objects.

  • name (string) — A human-friendly name for the target (unique among all names).

  • query (string) — A Cypher query. Data from the source is available as a list in the parameter $rows.

Configuration

Query target config options and their defaults
{
  ...
  "active": true,
  "depends_on": ["<dependencyTargetName1>", "<dependencyTargetName2>", ...]
}
  • active (bool) — Whether the target should be included in the import.

  • depends_on (list of strings) — The name of the target(s) that should execute before the current one.

Example

A query object example for import of Person nodes and setting a date on creation
{
  "name": "Person nodes",
  "source": "persons",
  "query": "UNWIND $rows AS row WITH row WHERE row.person_tmdbId IS NOT NULL MERGE (p:Person {id: row.person_tmdbId, name: row.name, born_in: row.bornIn, born: date(row.born), died: date(row.died)}) ON CREATE SET p.created_time=datetime()"
}

Source transformations

Each node and relationship target can optionally have a source_transformations attribute containing aggregation functions. This can be useful to extract higher-level dimensions from a more granular source. Aggregations result in extra fields that become available for property mappings.

"source_transformations": {
  "enable_grouping": true,
  "aggregations": [
    {
      "expression": "",
      "field_name": ""
    },
    { <aggregationObj2> },
    ...
  ],
  "limit": -1,
  "where": "",
  "order_by": [
    {
      "expression": "column_name",
      "order": "<asc/desc>"
    },
    { <orderObj2> },
    ...
  ]
}
  • enable_grouping (bool) — Must be true for aggregations/where to work.

  • aggregations (list of objects) — Aggregations are specified as SQL queries in the expression attribute, and the result is available as a source column under the name specified in field_name.

  • limit (int) — Caps the number of source rows that are considered for import (defaults to no limit, encoded as -1).

  • where (string) — Filters out source data prior to import (with an SQL WHERE clause format).

  • order_by (list of objects) — Enforces ordering on the source.

Example

A transformation object example on a fictitious data set
{
  "enable_grouping": true,
  "aggregations": [
    {
      "expression": "SUM(unit_price*quantity)",
      "field_name": "total_amount_sold"
    },
    {
      "expression": "SUM(quantity)",
      "field_name": "total_quantity_sold"
    }
  ],
  "limit": 50,
  "where": "sourceId IS NOT NULL"
}
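
To make the effect of this transformation concrete, the sketch below runs an equivalent query by hand against an in-memory SQLite table. Everything about the data is an assumption: the sales table, its rows, and the choice of sourceId as the only mapped (and therefore grouped) column are made up, SQLite stands in for BigQuery, and the SQL that the template actually generates may look different. The ORDER BY is added only to make the printed result deterministic.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (sourceId TEXT, unit_price REAL, quantity INTEGER)")
conn.executemany(
    "INSERT INTO sales VALUES (?, ?, ?)",
    [("a", 2.0, 3), ("a", 1.5, 2), ("b", 10.0, 1), (None, 99.0, 9)],
)

# where -> WHERE, enable_grouping -> GROUP BY, aggregations -> extra columns,
# limit -> LIMIT.
query = """
    SELECT sourceId,
           SUM(unit_price * quantity) AS total_amount_sold,
           SUM(quantity) AS total_quantity_sold
    FROM sales
    WHERE sourceId IS NOT NULL
    GROUP BY sourceId
    ORDER BY sourceId
    LIMIT 50
"""
rows = conn.execute(query).fetchall()
print(rows)
```

The two aggregated columns, total_amount_sold and total_quantity_sold, are the extra fields that then become available as source_field values in property mappings.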

Actions

The actions section contains commands that can be run before or after specific steps of the import process. Each step is called a stage. You may for example submit HTTP requests when steps complete, execute SQL queries on the source, or run Cypher statements on the Neo4j target instance.

Actions specification skeleton
  ...
  "actions": [
    { <actionSpec1> },
    { <actionSpec2> },
    ...
  ]

Each action object must at minimum have the attributes type, name, and stage. Further attributes depend on the action type.

{
  "type": "http",
  "name": "<actionName>",
  "stage": "<stageName>",
  "method": "<get/post>",
  "url": "<targetUrl>",
  "headers": {}
}
  • type (string) — The action type.

  • name (string) — A human-friendly name for the action (unique among all names).

  • stage (string) — At what point of the import the action should run. Valid values are: start, post_sources, pre_nodes, post_nodes, pre_relationships, post_relationships, pre_queries, post_queries, end.

  • method (string) — The HTTP method; either get or post.

  • url (string) — The URL the HTTP request should target.

  • headers (object, optional) — Request headers.

Action example for sending a GET request after import completes
{
  "type": "http",
  "name": "Post load ping",
  "stage": "end",
  "method": "get",
  "url": "https://neo4j.com/success",
  "headers": {
    "secret": "314159",
    "moreSecret": "17320"
  }
}

Cypher action specification
{
  "type": "cypher",
  "name": "<actionName>",
  "stage": "<stageName>",
  "query": "<cypherQuery>",
  "execution_mode": "<transaction/autocommit>"
}
  • type (string) — The action type.

  • name (string) — A human-friendly name for the action (unique among all names).

  • stage (string) — At what point of the import the action should run. Valid values are: start, post_sources, pre_nodes, post_nodes, pre_relationships, post_relationships, pre_queries, post_queries, end.

  • query (string) — The Cypher query to run.

  • execution_mode (string, optional) — Under what mode the query should be executed. Valid values are transaction, autocommit (default: transaction).

Action example for creating an importJob node after import completes
{
  "type": "cypher",
  "name": "Post load log",
  "stage": "end",
  "query": "MERGE (:importJob {date: datetime()})"
}

BigQuery action specification
{
  "type": "bigquery",
  "name": "<actionName>",
  "stage": "<stageName>",
  "sql": "<sqlQuery>"
}
  • type (string) — The action type.

  • name (string) — A human-friendly name for the action (unique among all names).

  • stage (string) — At what point of the import the action should run. Valid values are: start, post_sources, pre_nodes, post_nodes, pre_relationships, post_relationships, pre_queries, post_queries, end.

  • sql (string) — The SQL query to run.

Action example for running an SQL statement after import completes
{
  "type": "bigquery",
  "name": "Post load log",
  "stage": "end",
  "sql": "INSERT INTO logs.imports (time) VALUES (CURRENT_TIMESTAMP())"
}
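
The attribute requirements for the three action types shown above can be summarized as a small check. This is a minimal sketch: check_action is a hypothetical helper, and it treats http, cypher, and bigquery as the only action types purely because those are the ones described here.

```python
STAGES = {
    "start", "post_sources", "pre_nodes", "post_nodes", "pre_relationships",
    "post_relationships", "pre_queries", "post_queries", "end",
}
# Type-specific compulsory attributes, as described above.
REQUIRED_BY_TYPE = {
    "http": ("method", "url"),
    "cypher": ("query",),
    "bigquery": ("sql",),
}


def check_action(action):
    """Hypothetical check: return a list of problems found in an action object."""
    problems = [f"missing '{a}'" for a in ("type", "name", "stage") if a not in action]
    action_type = action.get("type")
    if action_type in REQUIRED_BY_TYPE:
        problems += [
            f"missing '{a}'" for a in REQUIRED_BY_TYPE[action_type] if a not in action
        ]
    elif action_type is not None:
        problems.append(f"unknown type '{action_type}'")
    if "stage" in action and action["stage"] not in STAGES:
        problems.append(f"invalid stage '{action['stage']}'")
    return problems


action = {
    "type": "cypher",
    "name": "Post load log",
    "stage": "after_import",  # deliberately wrong: not a valid stage
}
problems = check_action(action)
print(problems)
```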

Variables

Key-value pairs can be supplied in Dataflow to replace $-prefixed tokens. You can provide parameters in the Options JSON field when creating the Dataflow job, as a JSON object. Variable interpolation works in:

  • BigQuery source query (SQL)

  • Text source URL

  • Custom Cypher target query

  • BigQuery action SQL

  • Cypher action query

  • HTTP GET/POST URL and header values.

Variables must be prefixed by the $ symbol (ex. $limit), and may be used in job specification files and in readQuery or inputFilePattern (source URI) command-line parameters.
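
The substitution can be pictured with Python's string.Template, which happens to use the same $name syntax. This is only an illustration of the idea under stated assumptions: the options object and the limit variable are made up, and the template's own interpolation rules (for example, how it handles tokens without a supplied value) are not described here and may differ.

```python
import json
from string import Template

# Made-up content of the Options JSON field.
options = json.loads('{"limit": "7"}')

query = "SELECT movieId, title FROM team-connectors-dev.movies.movies LIMIT $limit"

# Replace each $token that has a value in options.
rendered = Template(query).safe_substitute(options)
print(rendered)
```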