Example CDC usage in C#

using System.CommandLine;
using System.Text.Json;
using Neo4j.Driver;

#pragma warning disable CS4014

namespace Neo4j.CDC.Sample;

static class Program
    class CDCSample
        private readonly IDriver _driver;
        private readonly string? _database;
        private volatile string? _from;
        private readonly IEnumerable<object> _selectors;
        private readonly ManualResetEventSlim _event;

        public CDCSample(IDriver driver, string? database, string? from, IEnumerable<object>? selectors)
            _driver = driver ?? throw new ArgumentNullException(nameof(driver));
            _database = database;
            _from = from;
            _selectors = selectors ?? Enumerable.Empty<object>();
            _event = new ManualResetEventSlim();

        private static void ApplyChange(IRecord record) (1)
            var jsonText = JsonSerializer.Serialize(record.Values, new JsonSerializerOptions()
                WriteIndented = true


        private async Task QueryChanges(CancellationToken cancellation) (2)
            await using var session = _driver.AsyncSession(ConfigureSession(_database));
            var current = await CurrentChangeId();
            await session.ExecuteReadAsync(async tx =>
                var from = _from;
                var result = await tx.RunAsync("CALL db.cdc.query($from, $selectors)", new
                    selectors = _selectors,

                var processed = 0;
                await foreach (var record in result.WithCancellation(cancellation))
                    ApplyChange(record); (3)
                    _from = record["id"].As<string>(); (4)

                if (processed == 0)
                    _from = current; (5)

        private static Action<SessionConfigBuilder> ConfigureSession(string? database)
            return sc =>
                if (!string.IsNullOrEmpty(database))

        private async Task<string> EarliestChangeId() (6)
            var response = await _driver
                .ExecutableQuery("CALL db.cdc.earliest")
                .WithMap(record => record["id"].As<string>())

            return response.Result[0];

        private async Task<string> CurrentChangeId() (7)
            var response = await _driver
                .ExecutableQuery("CALL db.cdc.current")
                .WithMap(record => record["id"].As<string>())

            return response.Result[0];

        public async Task Start(CancellationToken cancellation)
            if (string.IsNullOrEmpty(_from))
                _from = await CurrentChangeId();

            Task.Factory.StartNew(async () =>
                    while (!cancellation.IsCancellationRequested)
                        await QueryChanges(cancellation);

                        await Task.Delay(TimeSpan.FromMilliseconds(500), cancellation); (8)
            }, cancellation, TaskCreationOptions.LongRunning, TaskScheduler.Current);

        public void WaitForExit()

    static async Task<int> Main(string[] args)
        var uriOption = new Option<string>("--address", () => "bolt://localhost:7687", "Bolt URI");
        var databaseOption = new Option<string?>("--database", () => "", "Database");
        var usernameOption = new Option<string>("--username", () => "neo4j", "Username");
        var passwordOption = new Option<string>("--password", () => "passw0rd", "Password");
        var fromOption = new Option<string?>("--from", () => null, "Change identifier to query changes from");

        var cmd = new RootCommand("Sample CDC application");

        cmd.SetHandler(ctx =>
            var cancellation = ctx.GetCancellationToken();
            var uri = ctx.ParseResult.GetValueForOption(uriOption);
            var database = ctx.ParseResult.GetValueForOption(databaseOption);
            var username = ctx.ParseResult.GetValueForOption(usernameOption);
            var password = ctx.ParseResult.GetValueForOption(passwordOption);
            var from = ctx.ParseResult.GetValueForOption(fromOption);

            DoRootCommand(cancellation, uri!, username!, password!, database, from)

        return await cmd.InvokeAsync(args);

    private static async Task DoRootCommand(CancellationToken cancellation, string uri, string username,
        string password,
        string? database, string? from)
            var selectors = new List<object>
                // new (9)
                // {
                //     select = "n", labels = new[] { "Person", "Employee" }
                // },
            await using var driver = GraphDatabase.Driver(uri, AuthTokens.Basic(username, password));
            var service = new CDCSample(driver, database, from, selectors);

            await service.Start(cancellation);

            await Console.Out.WriteLineAsync("starting...");
            await Console.Out.WriteLineAsync("quitting...");
        catch (Exception e)
            await Console.Error.WriteLineAsync("Error: " + e);
1 This method is called once for each change event. It should be replaced based on your use case.
2 This method fetches the changes from the database.
3 This method is called once for each change.
4 Note that ExecuteReadAsync may retry failing queries. To avoid seeing the same change twice, update the cursor as the changes are applied.
5 The cursor is moved forward to keep it up-to-date. This may not be necessary in your use case. See Cursor Management for details.
6 Use this function to get the earliest available change id.
7 Use this function to get the current change id.
8 Wait for 500 milliseconds so that QueryChanges gets called repeatedly.
9 The results may be filtered to return a subset of changes. The out-commented line would select only node changes that have both Person and Employee labels.