Control results flow with reactive streams
In a reactive flow, consumers dictate the rate at which they consume records from queries, and the driver in turn manages the rate at which records are requested from the server.
An example use case is an application that fetches records from a Neo4j server and performs time-consuming post-processing on each one. If the server were allowed to push records to the client as soon as they are available, the client could be flooded with entries while its processing lags behind. The reactive API ensures that the receiving side is not forced to buffer arbitrary amounts of data.
The driver’s reactive implementation lives in the reactivestreams sub-package and relies on the reactor-core package from Project Reactor.
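To make the demand-driven flow concrete, the following sketch uses the JDK's own java.util.concurrent.Flow interfaces rather than the driver or Reactor, so that it runs without any dependencies. The names BackpressureDemo, OneAtATimeSubscriber, and consume are hypothetical and chosen for illustration, and the slow post-processing is reduced to appending to a list. It is a minimal sketch of a consumer that requests one item at a time, not a description of how the driver is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class BackpressureDemo {

    /** Hypothetical subscriber that asks for one item at a time. */
    static final class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> processed = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // initial demand: a single item
        }

        @Override
        public void onNext(Integer item) {
            processed.add(item);     // stand-in for slow post-processing
            subscription.request(1); // only now signal readiness for the next item
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes 1..count and returns the items in the order the subscriber processed them. */
    static List<Integer> consume(int count) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i); // blocks if the subscriber's buffer is full
            }
        } // close() signals onComplete after the buffered items are delivered
        try {
            subscriber.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for completion", e);
        }
        return subscriber.processed;
    }

    public static void main(String[] args) {
        System.out.println(consume(5)); // prints [1, 2, 3, 4, 5]
    }
}
```

The publisher never hands the subscriber more items than it has asked for, which is the same contract the reactive driver API relies on when it requests records from the server.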
Install dependencies
To use reactive features, you need to add the relevant dependencies to your project first (refer to Reactor → Reference → Getting reactor).
- Add Reactor’s BOM to your pom.xml in a dependencyManagement section. Notice that this is in addition to the regular dependencies section. If a dependencyManagement section already exists in your pom, add only the contents.
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>2023.0.2</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
- Add the reactor-core dependency to the dependencies section. Notice that the version tag is omitted (it is picked up from Reactor’s BOM).
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
</dependency>
Reactive query examples
The driver’s basic concepts are the same as in the synchronous case, but queries are run through a ReactiveSession, and the objects related to querying have a reactive counterpart with the Reactive prefix (for example, ReactiveResult).
Managed transaction with reactive sessions
.executeRead() example
package demo;

import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Record;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.Value;
import org.neo4j.driver.reactivestreams.ReactiveResult;
import org.neo4j.driver.reactivestreams.ReactiveSession;

public class App {

    public static void main(String... args) {
        final String dbUri = "<URI for Neo4j database>";
        final String dbUser = "<Username>";
        final String dbPassword = "<Password>";

        try (var driver = GraphDatabase.driver(dbUri, AuthTokens.basic(dbUser, dbPassword))) {
            driver.verifyConnectivity();

            Flux<Record> records = Flux.usingWhen( // (1)
                Mono.just(driver.session( // (2)
                    ReactiveSession.class, // (3)
                    SessionConfig.builder().withDatabase("neo4j").build()
                )),
                rxSession -> Mono.fromDirect(rxSession.executeRead( // (4)
                    tx -> Mono
                        .fromDirect(tx.run("UNWIND range (1, 5) AS x RETURN x")) // (5)
                        .flatMapMany(ReactiveResult::records) // (6)
                )),
                ReactiveSession::close // (7)
            );

            // block for demonstration purposes
            List<Value> values = records.map(record -> record.get("x")).collectList().block(); // (8)
            System.out.println(values);
        }
    }
}
(1) Flux.usingWhen(resourceSupplier, workerClosure, cleanupFunction) creates a new session, runs queries with it, and finally closes it. It keeps the resource alive for as long as it is needed and lets you specify the cleanup operation to run at the end.
(2) .usingWhen() takes a resource supplier in the form of a Publisher, which is why session creation is wrapped in a Mono.just() call, which creates a Mono from any value.
(3) Session creation is similar to the async case, and the same configuration methods apply. The difference is that the first argument must be ReactiveSession.class, and the return value is a ReactiveSession object.
(4) ReactiveSession.executeRead() runs a read transaction and returns a Publisher of the callback’s return value, which Mono.fromDirect() converts into a Mono.
(5) tx.run() returns a Publisher<ReactiveResult>, which Mono.fromDirect() converts into a Mono.
(6) Before the final result is returned, Mono.flatMapMany() retrieves the records from the result and returns them as a new Flux.
(7) The final cleanup closes the session.
(8) To show the result of the reactive workflow, .block() waits for the flow to complete so that values can be printed. In a real application you wouldn’t block but rather forward the records publisher to your framework of choice, which would process them in a meaningful way.
You may run several queries within the same reactive session through several calls to executeRead() or executeWrite() within the workerClosure.
Implicit transaction with reactive sessions
The following example is very similar to the previous one, except it uses an implicit transaction.
.run() example
package demo;

import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Record;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.Value;
import org.neo4j.driver.reactivestreams.ReactiveResult;
import org.neo4j.driver.reactivestreams.ReactiveSession;

public class App {

    public static void main(String... args) {
        final String dbUri = "<URI for Neo4j database>";
        final String dbUser = "<Username>";
        final String dbPassword = "<Password>";

        try (var driver = GraphDatabase.driver(dbUri, AuthTokens.basic(dbUser, dbPassword))) {
            driver.verifyConnectivity();

            Flux<Record> records = Flux.usingWhen(
                Mono.just(driver.session(
                    ReactiveSession.class,
                    SessionConfig.builder().withDatabase("neo4j").build()
                )),
                rxSession -> Mono
                    .fromDirect(rxSession.run("UNWIND range (1, 5) AS x RETURN x"))
                    .flatMapMany(ReactiveResult::records),
                ReactiveSession::close
            );

            // block for demonstration purposes
            List<Value> values = records.map(record -> record.get("x")).collectList().block();
            System.out.println(values);
        }
    }
}
Always defer session creation
It’s important to remember that in reactive programming a Publisher doesn’t come to life until a Subscriber attaches to it. A Publisher is just an abstract description of your asynchronous process, but it’s only the act of subscribing that triggers the flow of data in the whole chain.
For this reason, always make session creation and destruction part of this chain, rather than creating sessions separately from the query Publisher chain.
Creating sessions outside the chain may result in many open sessions, none doing work and all waiting for a Publisher to use them, potentially exhausting the number of available sessions for your application.
The previous examples use Flux.usingWhen() to address this. By contrast, the following snippet creates the session outside the chain and shows the pattern to avoid.
ReactiveSession rxSession = driver.session(ReactiveSession.class);
Mono<ReactiveResult> rxResult = Mono.fromDirect(rxSession.run("UNWIND range (1, 5) AS x RETURN x"));
// until somebody subscribes to `rxResult`, the Publisher doesn't materialize, but the session is busy!
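The effect of deferring resource creation can be sketched without the driver. The example below uses only the JDK's java.util.concurrent.Flow interfaces, and everything in it is hypothetical: FakeSession is a stand-in that merely counts how many "sessions" are open, and deferred and collect are helper names made up for this illustration. It assumes count is at least 1. The point is that the resource is opened inside subscribe() and closed on completion or cancellation, so nothing is held open while the Publisher sits unused.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

class DeferredResourceDemo {

    // Number of hypothetical sessions currently open.
    static final AtomicInteger OPEN = new AtomicInteger();

    /** Hypothetical stand-in for a session: it only tracks whether it is open. */
    static final class FakeSession implements AutoCloseable {
        FakeSession() { OPEN.incrementAndGet(); }
        @Override public void close() { OPEN.decrementAndGet(); }
    }

    /** Emits 1..count (count >= 1); the session is opened only when a subscriber attaches. */
    static Flow.Publisher<Integer> deferred(int count) {
        return subscriber -> {
            FakeSession session = new FakeSession(); // created at subscription time, not before
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private boolean done;

                @Override
                public void request(long n) {
                    for (long i = 0; i < n && !done; i++) {
                        int item = next++;
                        boolean last = item == count;
                        if (last) {
                            done = true; // set before emitting so a reentrant request() cannot overshoot
                        }
                        subscriber.onNext(item);
                        if (last) {
                            session.close();
                            subscriber.onComplete();
                        }
                    }
                }

                @Override
                public void cancel() {
                    if (!done) {
                        done = true;
                        session.close();
                    }
                }
            });
        };
    }

    /** Subscribes with unbounded demand and returns everything that was emitted. */
    static List<Integer> collect(Flow.Publisher<Integer> publisher) {
        List<Integer> items = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { items.add(item); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        return items; // the publisher above is synchronous, so delivery has finished here
    }

    public static void main(String[] args) {
        Flow.Publisher<Integer> numbers = deferred(5);
        System.out.println("open before subscribe: " + OPEN.get()); // prints open before subscribe: 0
        List<Integer> items = collect(numbers);
        System.out.println(items + ", open after completion: " + OPEN.get()); // prints [1, 2, 3, 4, 5], open after completion: 0
    }
}
```

Building the Publisher costs nothing in this sketch. The session count only rises while a subscriber is actually consuming, which is the property that tying session creation into the chain is meant to give you.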
Glossary
- LTS: A Long Term Support release is one guaranteed to be supported for a number of years. Neo4j 4.4 is LTS, and Neo4j 5 will also have an LTS version.
- Aura: Aura is Neo4j’s fully managed cloud service. It comes with both free and paid plans.
- Cypher: Cypher is Neo4j’s graph query language that lets you retrieve data from the database. It is like SQL, but for graphs.
- APOC: Awesome Procedures On Cypher (APOC) is a library of (many) functions that cannot be easily expressed in Cypher itself.
- Bolt: Bolt is the protocol used for interaction between Neo4j instances and drivers. It listens on port 7687 by default.
- ACID: Atomicity, Consistency, Isolation, Durability (ACID) are properties guaranteeing that database transactions are processed reliably. An ACID-compliant DBMS ensures that the data in the database remains accurate and consistent despite failures.
- eventual consistency: A database is eventually consistent if it provides the guarantee that all cluster members will, at some point in time, store the latest version of the data.
- causal consistency: A database is causally consistent if read and write queries are seen by every member of the cluster in the same order. This is stronger than eventual consistency.
- NULL: The null marker is not a type but a placeholder for absence of value. For more information, see Cypher → Working with null.
- transaction: A transaction is a unit of work that is either committed in its entirety or rolled back on failure. An example is a bank transfer: it involves multiple steps, but they must all succeed or be reverted, to avoid money being subtracted from one account but not added to the other.
- backpressure: Backpressure is a force opposing the flow of data. It ensures that the client is not being overwhelmed by data faster than it can handle.
- transaction function: A transaction function is a callback executed by an executeRead or executeWrite call. The driver automatically re-executes the callback in case of server failure.
- Driver: A Driver object holds the details required to establish connections with a Neo4j database.