Adding a folder listener
Together with other load procedures such as apoc.load.csv or apoc.load.json, it is possible to load files asynchronously.
This example creates nodes with the parameters supplied by the trigger. The trigger fires only when files in the csvFolder are created or modified:
CALL apoc.load.directory.async.add('csvImport',
"CALL apoc.load.csv($filePath) yield list WITH list CREATE (n:CsvToNode {content: list, fileName: $fileName, fileDirectory: $fileDirectory, listenEventType: $listenEventType})",
"*.csv", "csvFolder" ,{listenEventType: ["CREATE", "MODIFY"]})
where $fileName is the file created/modified, $filePath is the relative path of the file, that is $IMPORT_DIR/csvFolder/[FILENAME.csv], $fileDirectory is the relative path of the directory, that is $IMPORT_DIR/csvFolder, and $listenEventType is the triggered event, that is CREATE or MODIFY.
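The same parameters are available whichever load procedure the listener query calls. As a rough sketch of the apoc.load.json case mentioned at the start of this section, the listener below stores each JSON document as a string. The listener name jsonImport, the jsonFolder directory and the JsonToNode label are hypothetical, and apoc.convert.toJson is used only because a node property cannot hold a nested map:

```cypher
// Hypothetical listener: reacts only to newly created *.json files in jsonFolder.
// apoc.load.json yields one `value` map per document; it is serialised to a string
// so that it can be stored as a single node property.
CALL apoc.load.directory.async.add('jsonImport',
"CALL apoc.load.json($filePath) YIELD value CREATE (n:JsonToNode {content: apoc.convert.toJson(value), fileName: $fileName, fileDirectory: $fileDirectory, listenEventType: $listenEventType})",
"*.json", "jsonFolder", {listenEventType: ["CREATE"]})
```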
Given that our IMPORT_DIR is set to import and the following file, test.csv, is uploaded to the import/csvFolder folder:
name,age
Selma,8
Rana,11
Selina,18
then executing MATCH (n:CsvToNode) RETURN properties(n) as props returns:
props |
---|
{ "fileName": "test.csv", "listenEventType": "CREATE", "fileDirectory": "csvFolder", "content": [ "Selma", "8" ] } |
{ "fileName": "test.csv", "listenEventType": "CREATE", "fileDirectory": "csvFolder", "content": [ "Rana", "11" ] } |
{ "fileName": "test.csv", "listenEventType": "CREATE", "fileDirectory": "csvFolder", "content": [ "Selina", "18" ] } |
If we modify test.csv as follows:
name,age
Selma,80
Rana,110
Selina,180
we obtain 3 new nodes with these props:
props |
---|
{ "fileName": "test.csv", "listenEventType": "MODIFY", "fileDirectory": "csvFolder", "content": [ "Selma", "80" ] } |
{ "fileName": "test.csv", "listenEventType": "MODIFY", "fileDirectory": "csvFolder", "content": [ "Rana", "110" ] } |
{ "fileName": "test.csv", "listenEventType": "MODIFY", "fileDirectory": "csvFolder", "content": [ "Selina", "180" ] } |
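Because the listener query uses CREATE, each MODIFY event adds a fresh set of nodes next to the ones created earlier. If updating in place is preferred, one possible variation is sketched below. It is not part of the original example: the listener name csvUpsert and the Person label are hypothetical, and it assumes the CSV keeps the name,age header so that the map column of apoc.load.csv exposes map.name and map.age:

```cypher
// Hypothetical upsert listener: MERGE on the name column so that a modified file
// updates existing nodes instead of creating duplicates.
CALL apoc.load.directory.async.add('csvUpsert',
"CALL apoc.load.csv($filePath) YIELD map MERGE (p:Person {name: map.name}) SET p.age = toInteger(map.age), p.fileName = $fileName, p.listenEventType = $listenEventType",
"*.csv", "csvFolder", {listenEventType: ["CREATE", "MODIFY"]})
```

With this variation, the second upload of test.csv would be expected to leave three Person nodes with updated ages rather than six nodes.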
List and remove procedures
We can list the registered listeners by executing:
CALL apoc.load.directory.async.list();
For example, if we execute:
CALL apoc.load.directory.async.add('csvImport',
"CALL apoc.load.csv($filePath) yield list WITH list CREATE (n:CsvToNode {content: list, fileName: $fileName, fileDirectory: $fileDirectory, listenEventType: $listenEventType})",
"*.csv", "csvFolder" ,{listenEventType: ["CREATE", "MODIFY"]})
the result will be:
name | status | pattern | cypher | urlDir | config | error |
---|---|---|---|---|---|---|
"csvImport" | "RUNNING" | "*.csv" | "CALL apoc.load.csv($filePath) yield list WITH list CREATE (n:CsvToNode {content: list, fileName: $fileName, fileDirectory: $fileDirectory, listenEventType: $listenEventType})" | <importUrlDir>/csvFolder | {"listenEventType": ["CREATE", "MODIFY"], "interval": 1000 } | "" |
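The config column in this result contains an interval entry alongside listenEventType, even though the add call did not set one. As a hedged sketch, assuming that interval is the polling interval in milliseconds and that it can be passed in the same config map, a listener that checks the folder less often might be registered as follows (the name csvImportSlow and the 5000 value are hypothetical):

```cypher
// Assumption: `interval` is expressed in milliseconds and defaults to 1000,
// as suggested by the config shown in the list output.
CALL apoc.load.directory.async.add('csvImportSlow',
"CALL apoc.load.csv($filePath) YIELD list CREATE (n:CsvToNode {content: list, fileName: $fileName})",
"*.csv", "csvFolder", {listenEventType: ["CREATE", "MODIFY"], interval: 5000})
```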
We can remove a specific listener executing the remove procedure with the given name as a parameter:
CALL apoc.load.directory.async.remove('csvImport')
The result will be the list of the remaining listeners.
Moreover, we can remove everything with the following procedure (which will return an empty result):
CALL apoc.load.directory.async.removeAll()
Error handling
When the listener fails for some reason, its status field changes from RUNNING to ERROR, and the associated error is output.
If we execute CALL apoc.load.directory.async.list(), we obtain, for example:
name | status | pattern | cypher | urlDir | config | error |
---|---|---|---|---|---|---|
|
|
|
|
|
|
|
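To check only for failed listeners, the columns returned by the list procedure can be filtered. This is a minimal sketch that relies only on the name, status and error columns shown in the table header; the exact error text depends on the failure:

```cypher
// Return only the listeners whose status has switched to ERROR,
// together with the message stored in the error column.
CALL apoc.load.directory.async.list()
YIELD name, status, error
WHERE status = 'ERROR'
RETURN name, error
```

A failed listener found this way can then be removed by name with apoc.load.directory.async.remove and added again once the cause is fixed.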
Configuration
Please note that to use the apoc.load.directory.async.* procedures, the following config needs to be enabled:
apoc.import.file.enabled=true
The following setting will allow you to change the import folder:
dbms.directories.import=import
It is possible to set apoc.import.file.use_neo4j_config=false to search for files using absolute paths:
CALL apoc.load.directory.async.add('test', 'CREATE (n:Test)', '*.csv', 'file:///Users/username/Downloads');
Usage Examples
The apoc.load.directory.async procedures are used for managing triggers. Each trigger consists of a listener observing one or more folders, which triggers the execution of a custom Cypher query. The procedures can be used to add, remove and list triggers (for example, apoc.load.directory.async.list).