import getopt
import json
import sys
import time
from threading import Thread
from neo4j import GraphDatabase
class CDCService:
def __init__(self, driver, database, start_cursor=None, selectors=None):
self.driver = driver
self.database = database
self.cursor = start_cursor
if self.cursor is None:
self.cursor = self.current_change_id()
self.selectors = selectors
def apply_change(self, record): (1)
record_dict = {
k: record.get(k)
for k in ('id', 'txId', 'seq', 'event', 'metadata')
}
print(json.dumps(record_dict, indent=2, default=repr))
def query_changes_query(self, tx):
current = self.current_change_id()
result = tx.run('CALL db.cdc.query($cursor, $selectors)', (2)
cursor=self.cursor, selectors=self.selectors)
if result.peek() == None:
self.cursor = current (3)
else:
for record in result:
try:
self.apply_change(record) (4)
except Exception as e:
print('Error whilst applying change', e)
break
self.cursor = record['id'] (5)
def query_changes(self):
with self.driver.session(database=self.database) as session:
session.execute_read(self.query_changes_query)
def earliest_change_id(self): (6)
records, _, _ = self.driver.execute_query(
'CALL db.cdc.earliest', database_=self.database)
return records[0]['id']
def current_change_id(self): (7)
records, _, _ = self.driver.execute_query(
'CALL db.cdc.current', database_=self.database)
return records[0]['id']
def run(self):
while True: (9)
self.query_changes()
time.sleep(0.5)
def main(argv):
# Default values
address = 'neo4j://localhost:7687'
database = 'neo4j'
username = 'neo4j'
password = 'passw0rd'
cursor = None
opts, _ = getopt.getopt(
argv, 'a:d:u:p:f:',
['address=', 'database=', 'username=', 'password=', 'from='])
for opt, arg in opts:
if opt in ('-a', '--address'):
address = arg
elif opt in ('-d', '--database'):
database = arg
elif opt in ('-u', '--username'):
username = arg
elif opt in ('-p', '--password'):
password = arg
elif opt in ('-f', '--from'):
cursor = arg
selectors = [ (8)
# {'select': 'n'}
]
with GraphDatabase.driver(address, auth=(username, password)) as driver:
cdc = CDCService(driver, database, cursor, selectors)
cdc_thread = Thread(target=cdc.run, daemon=True)
cdc_thread.start()
cdc_thread.join()
if __name__ == '__main__':
main(sys.argv[1:])