Example CDC usage in Go

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"sync/atomic"
	"time"

	"github.com/neo4j/neo4j-go-driver/v5/neo4j"
	"github.com/spf13/cobra"
	"github.com/tidwall/pretty"
)

func applyChange(record *neo4j.Record) error { (1)
	jsonOutput, err := json.Marshal(asMap(record))
	if err != nil {
		return fmt.Errorf("unable to jsonify record: %w", err)

	fmt.Println(string(pretty.Color(pretty.Pretty(jsonOutput), pretty.TerminalStyle)))

	return nil

func queryChangeID(ctx context.Context, driver neo4j.DriverWithContext, database string, query string) (string, error) {
	result, err := neo4j.ExecuteQuery(ctx, driver, query, nil, neo4j.EagerResultTransformer, neo4j.ExecuteQueryWithDatabase(database), neo4j.ExecuteQueryWithReadersRouting())
	if err != nil {
		return "", fmt.Errorf("unable to query change identifier: %w", err)

	if len(result.Records) != 1 {
		return "", fmt.Errorf("expected one record, but got %d", len(result.Records))

	id, _, err := neo4j.GetRecordValue[string](result.Records[0], "id")
	if err != nil {
		return "", fmt.Errorf("unable to extract id: %w", err)

	return id, nil

func asMap(record *neo4j.Record) map[string]any {
	result := make(map[string]any, len(record.Keys))

	for i := 0; i < len(record.Keys); i++ {
		result[record.Keys[i]] = record.Values[i]

	return result

type CDCService struct {
	driver    neo4j.DriverWithContext
	database  string
	waitGroup sync.WaitGroup
	cursor    atomic.Pointer[string]
	selectors []any

func (s *CDCService) queryChanges(ctx context.Context) error { (2)
	session := s.driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: s.database})

	current, err := s.currentChangeID(ctx)
	if err != nil {
		return err
	_, err = session.ExecuteRead(ctx, func(tx neo4j.ManagedTransaction) (any, error) {
		result, err := tx.Run(ctx, "CALL cdc.query($from, $selectors)", map[string]any{
			"from":      s.from(),
			"selectors": s.selectors,
		if err != nil {
			return "", err

		var record *neo4j.Record
		if !result.Peek(ctx) {
			s.setFrom(current) (3)
		} else {
			for result.NextRecord(ctx, &record) {
				err := applyChange(record) (4)
				if err != nil {
					return "", fmt.Errorf("error processing record: %w", err)

				id, isNil, err := neo4j.GetRecordValue[string](record, "id")
				if err != nil || isNil {
					return "", fmt.Errorf("missing or invalid id value returned")
				s.setFrom(id) (5)

		return s.from(), nil
	if err != nil {
		return fmt.Errorf("unable to query/process changes: %w", err)

	return nil

func (s *CDCService) earliestChangeID(ctx context.Context) (string, error) { (6)
	return queryChangeID(ctx, s.driver, s.database, "CALL cdc.earliest()")

func (s *CDCService) currentChangeID(ctx context.Context) (string, error) { (7)
	return queryChangeID(ctx, s.driver, s.database, "CALL cdc.current()")

func (s *CDCService) from() string {
	return *s.cursor.Load()

func (s *CDCService) setFrom(from string) {

func (s *CDCService) Start(ctx context.Context) error {
	if s.from() == "" {
		current, err := s.currentChangeID(ctx)
		if err != nil {
			return err

	go func(ctx context.Context) {
		defer func() {

		timer := time.NewTimer(0 * time.Millisecond)
		for {
			select {
			case <-ctx.Done():
			case <-timer.C:
					err := s.queryChanges(ctx)
					if err != nil {
						log.Printf("error querying/processing changes: %v", err)

					timer.Reset(500 * time.Millisecond) (8)

	return nil

func (s *CDCService) WaitForExit() {

func NewCDCService(uri string, username string, password string, database string, from string, selectors []any) (*CDCService, error) {
	driver, err := neo4j.NewDriverWithContext(uri, neo4j.BasicAuth(username, password, ""))
	if err != nil {
		return nil, fmt.Errorf("unable to create driver: %w", err)

	cdc := &CDCService{
		driver:    driver,
		database:  database,
		waitGroup: sync.WaitGroup{},
		cursor:    atomic.Pointer[string]{},
		selectors: selectors,

	return cdc, nil

// Command-line flag values, bound to the root command's flags in main.
var (
	address  string // Bolt URI of the server
	database string // database name
	username string // user to authenticate as
	password string // password to authenticate with
	from     string // change identifier to query changes from
)

func main() {
	rootCmd := &cobra.Command{
		Run: func(cmd *cobra.Command, args []string) {
			ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt)

			selectors := []any{
				//map[string]any{"select": "n", "labels": []string{"Person", "Employee"}}, (9)

			cdc, err := NewCDCService(address, username, password, database, from, selectors)
			if err != nil {

			if err := cdc.Start(ctx); err != nil {


	rootCmd.Flags().StringVarP(&address, "address", "a", "bolt://localhost:7687", "Bolt URI")
	rootCmd.Flags().StringVarP(&database, "database", "d", "", "Database")
	rootCmd.Flags().StringVarP(&username, "username", "u", "neo4j", "Username")
	rootCmd.Flags().StringVarP(&password, "password", "p", "passw0rd", "Password")
	rootCmd.Flags().StringVarP(&from, "from", "f", "", "Change identifier to query changes from")

1 This function is called once for each change.
2 This function fetches the changes from the database.
3 The cursor is moved forward to keep it up-to-date. This may not be necessary in your use case. See Cursor Management for details.
4 applyChange is called once for each change returned by the query.
5 Note that ExecuteRead may retry failing queries. To avoid seeing the same change twice, update the cursor as the changes are applied.
6 Use this function to get the earliest available change id.
7 Use this function to get the current change id.
8 The timer is reset so that queryChanges gets called repeatedly.
9 The results may be filtered to return a subset of changes. The commented-out line would select only node changes that have both the Person and Employee labels.