Adding a folder listener
Combined with other load procedures such as apoc.load.csv or apoc.load.json, a folder listener makes it possible to load files asynchronously.
This example creates nodes from the parameters supplied by the trigger, and fires only when files in csvFolder are created or modified:
CALL apoc.load.directory.async.add('csvImport',
"CALL apoc.load.csv($filePath) yield list WITH list CREATE (n:CsvToNode {content: list, fileName: $fileName, fileDirectory: $fileDirectory, listenEventType: $listenEventType})",
"*.csv", "csvFolder", {listenEventType: ["CREATE", "MODIFY"]})
where $fileName is the name of the file that was created or modified,
$filePath is the relative path of the file, that is, $IMPORT_DIR/csvFolder/[FILENAME.csv],
$fileDirectory is the relative path of the directory, that is, $IMPORT_DIR/csvFolder,
and $listenEventType is the event that fired, that is, CREATE or MODIFY.
Given that our IMPORT_DIR is set to import and the following file, test.csv, is uploaded to the import/csvFolder folder:
name,age
Selma,8
Rana,11
Selina,18
executing MATCH (n:CsvToNode) RETURN properties(n) AS props returns:
| props |
|---|
| { "fileName": "test.csv", "listenEventType": "CREATE", "fileDirectory": "csvFolder", "content": [ "Selma", "8" ] } |
| { "fileName": "test.csv", "listenEventType": "CREATE", "fileDirectory": "csvFolder", "content": [ "Rana", "11" ] } |
| { "fileName": "test.csv", "listenEventType": "CREATE", "fileDirectory": "csvFolder", "content": [ "Selina", "18" ] } |
If we modify test.csv as follows:
name,age
Selma,80
Rana,110
Selina,180
we obtain 3 new nodes with these props:
| props |
|---|
| { "fileName": "test.csv", "listenEventType": "MODIFY", "fileDirectory": "csvFolder", "content": [ "Selma", "80" ] } |
| { "fileName": "test.csv", "listenEventType": "MODIFY", "fileDirectory": "csvFolder", "content": [ "Rana", "110" ] } |
| { "fileName": "test.csv", "listenEventType": "MODIFY", "fileDirectory": "csvFolder", "content": [ "Selina", "180" ] } |
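Because the listener query above uses CREATE, every MODIFY event adds a fresh set of nodes. If updating existing nodes is preferred, one possible variant is sketched here. It is only an illustration: the listener name csvPeople and the label Person are hypothetical, and it assumes every CSV file has a header row with name and age columns, so that the map column yielded by apoc.load.csv can be used.

```cypher
// Hypothetical listener 'csvPeople' and label :Person (illustrative names only).
// Assumes each CSV file has the header row: name,age
// MERGE matches an existing node by name instead of creating a duplicate.
CALL apoc.load.directory.async.add('csvPeople',
"CALL apoc.load.csv($filePath) YIELD map MERGE (p:Person {name: map.name}) SET p.age = toInteger(map.age), p.fileName = $fileName, p.listenEventType = $listenEventType",
"*.csv", "csvFolder", {listenEventType: ["CREATE", "MODIFY"]});
```

With the two versions of test.csv shown above, this variant should leave three Person nodes whose age values come from the most recent version of the file, rather than six nodes.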
List and remove procedures
We can list the registered listeners by executing:
CALL apoc.load.directory.async.list();
For example, if we execute:
CALL apoc.load.directory.async.add('csvImport',
"CALL apoc.load.csv($filePath) yield list WITH list CREATE (n:CsvToNode {content: list, fileName: $fileName, fileDirectory: $fileDirectory, listenEventType: $listenEventType})",
"*.csv", "csvFolder", {listenEventType: ["CREATE", "MODIFY"]})
the result will be:
| name | status | pattern | cypher | urlDir | config | error |
|---|---|---|---|---|---|---|
| "csvImport" | "RUNNING" | "*.csv" | "CALL apoc.load.csv($filePath) yield list WITH list CREATE (n:CsvToNode {content: list, fileName: $fileName, fileDirectory: $fileDirectory, listenEventType: $listenEventType})" | <importUrlDir>/csvFolder | {"listenEventType": ["CREATE", "MODIFY"], "interval": 1000 } | "" |
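The config column in this result shows an interval key next to listenEventType. The sketch below assumes that interval is a polling interval in milliseconds and that it can be passed to the add procedure in the same config map. Both points are inferred from the output above rather than stated in this section, and the listener name csvImportSlow is hypothetical.

```cypher
// Hypothetical listener 'csvImportSlow'.
// Assumption: 'interval' is a polling interval in milliseconds
// (the list output above reports a value of 1000).
CALL apoc.load.directory.async.add('csvImportSlow',
"CALL apoc.load.csv($filePath) yield list WITH list CREATE (n:CsvToNode {content: list, fileName: $fileName})",
"*.csv", "csvFolder", {listenEventType: ["CREATE"], interval: 5000});
```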
We can remove a specific listener by executing the remove procedure with the listener's name as a parameter:
CALL apoc.load.directory.async.remove('csvImport')
The result is the list of the remaining listeners.
We can also remove all listeners with the following procedure (which returns an empty result):
CALL apoc.load.directory.async.removeAll()
Error handling
When a listener fails for some reason, its status field changes from RUNNING to ERROR, and the associated error is output in the error field.
If we execute CALL apoc.load.directory.async.list(), we obtain, for example:
| name | status | pattern | cypher | urlDir | config | error |
|---|---|---|---|---|---|---|
|
|
|
|
|
|
|
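To check only the listeners that have failed, the output of the list procedure can be filtered on its status column. A minimal sketch, assuming the column names are exactly those in the table header above:

```cypher
// Return only listeners whose status has changed to ERROR,
// together with the error message reported for each.
CALL apoc.load.directory.async.list()
YIELD name, status, error
WHERE status = 'ERROR'
RETURN name, error;
```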
Configuration
Please note that to use the apoc.load.directory.async.* procedures, the following config needs to be enabled:
apoc.import.file.enabled=true
The following setting allows you to change the import folder:
dbms.directories.import=import
It is possible to set apoc.import.file.use_neo4j_config=false to search for files using absolute paths:
CALL apoc.load.directory.async.add('test', 'CREATE (n:Test)', '*.csv', 'file:///Users/username/Downloads');
Usage Examples
The apoc.load.directory.async procedures are used to add, remove, and list triggers. Each trigger consists of a listener that observes one or more folders and triggers the execution of a custom Cypher query.