Skip to content

REF CURSOR Support #94

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Oct 17, 2022
4 changes: 2 additions & 2 deletions .github/workflows/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,6 @@ echo "HOST=localhost" >> src/test/resources/config.properties
echo "PORT=1521" >> src/test/resources/config.properties
echo "USER=test" >> src/test/resources/config.properties
echo "PASSWORD=test" >> src/test/resources/config.properties
echo "CONNECT_TIMEOUT=60" >> src/test/resources/config.properties
echo "SQL_TIMEOUT=60" >> src/test/resources/config.properties
echo "CONNECT_TIMEOUT=120" >> src/test/resources/config.properties
echo "SQL_TIMEOUT=120" >> src/test/resources/config.properties
mvn clean compile test
42 changes: 36 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -508,12 +508,13 @@ for the out parameters is emitted last, after the `Result` for each cursor.
Oracle R2DBC supports type mappings between Java and SQL for non-standard data
types of Oracle Database.

| Oracle SQL Type | Java Type |
|---------------------------------------------------------------------------------------------------------------------------------------------------------|-----------|
| [JSON](https://docs.oracle.com/en/database/oracle/oracle-database/21/sqlrf/Data-Types.html#GUID-E441F541-BA31-4E8C-B7B4-D2FB8C42D0DF) | `javax.json.JsonObject` or `oracle.sql.json.OracleJsonObject` |
| [DATE](https://docs.oracle.com/en/database/oracle/oracle-database/21/sqlrf/Data-Types.html#GUID-5405B652-C30E-4F4F-9D33-9A4CB2110F1B) | `java.time.LocalDateTime` |
| [INTERVAL DAY TO SECOND](https://docs.oracle.com/en/database/oracle/oracle-database/21/sqlrf/Data-Types.html#GUID-B03DD036-66F8-4BD3-AF26-6D4433EBEC1C) | `java.time.Duration` |
| [INTERVAL YEAR TO MONTH](https://docs.oracle.com/en/database/oracle/oracle-database/21/sqlrf/Data-Types.html#GUID-ED59E1B3-BA8D-4711-B5C8-B0199C676A95) | `java.time.Period` |
| Oracle SQL Type | Java Type |
|---------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------|
| [JSON](https://docs.oracle.com/en/database/oracle/oracle-database/21/sqlrf/Data-Types.html#GUID-E441F541-BA31-4E8C-B7B4-D2FB8C42D0DF) | `javax.json.JsonObject` or `oracle.sql.json.OracleJsonObject` |
| [DATE](https://docs.oracle.com/en/database/oracle/oracle-database/21/sqlrf/Data-Types.html#GUID-5405B652-C30E-4F4F-9D33-9A4CB2110F1B) | `java.time.LocalDateTime` |
| [INTERVAL DAY TO SECOND](https://docs.oracle.com/en/database/oracle/oracle-database/21/sqlrf/Data-Types.html#GUID-B03DD036-66F8-4BD3-AF26-6D4433EBEC1C) | `java.time.Duration` |
| [INTERVAL YEAR TO MONTH](https://docs.oracle.com/en/database/oracle/oracle-database/21/sqlrf/Data-Types.html#GUID-ED59E1B3-BA8D-4711-B5C8-B0199C676A95) | `java.time.Period` |
| [SYS_REFCURSOR](https://docs.oracle.com/en/database/oracle/oracle-database/21/lnpls/static-sql.html#GUID-470A7A99-888A-46C2-BDAF-D4710E650F27) | `io.r2dbc.spi.Result` |
> Unlike the standard SQL type named "DATE", the Oracle Database type named
> "DATE" stores values for year, month, day, hour, minute, and second. The
> standard SQL type only stores year, month, and day. LocalDateTime objects are able
Expand Down Expand Up @@ -553,6 +554,35 @@ prefetched entirely, a smaller prefetch size can be configured using the
option, and the LOB can be consumed as a stream. By mapping LOB columns to
`Blob` or `Clob` objects, the content can be consumed as a reactive stream.

### REF Cursors
Use the `oracle.r2dbc.OracleR2dbcTypes.REF_CURSOR` type to bind `SYS_REFCURSOR` out
parameters:
```java
Publisher<Result> executeProcedure(Connection connection) {
connection.createStatement(
"BEGIN example_procedure(:cursor_parameter); END;")
.bind("cursor_parameter", Parameters.out(OracleR2dbcTypes.REF_CURSOR))
.execute()
}
```
A `SYS_REFCURSOR` out parameter can be mapped to an `io.r2dbc.spi.Result`:
```java
Publisher<Result> mapOutParametersResult(Result outParametersResult) {
return outParametersResult.map(outParameters ->
outParameters.get("cursor_parameter", Result.class));
}
```
The rows of a `SYS_REFCURSOR` may be consumed from the `Result` it is
mapped to:
```java
Publisher<ExampleObject> mapRefCursorRows(Result refCursorResult) {
return refCursorResult.map(row ->
new ExampleObject(
row.get("id_column", Long.class),
row.get("value_column", String.class)));
}
```

## Secure Programming Guidelines
The following security related guidelines should be adhered to when programming
with the Oracle R2DBC Driver.
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/oracle/r2dbc/OracleR2dbcTypes.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/
package oracle.r2dbc;

import io.r2dbc.spi.Result;
import io.r2dbc.spi.Type;
import oracle.sql.json.OracleJsonObject;

Expand Down Expand Up @@ -92,6 +93,12 @@ private OracleR2dbcTypes() {}
public static final Type TIMESTAMP_WITH_LOCAL_TIME_ZONE =
new TypeImpl(LocalDateTime.class, "TIMESTAMP WITH LOCAL TIME ZONE");

/**
* A cursor that is returned by a procedural call.
*/
public static final Type REF_CURSOR =
new TypeImpl(Result.class, "SYS_REFCURSOR");

/**
* Implementation of the {@link Type} SPI.
*/
Expand Down
82 changes: 82 additions & 0 deletions src/main/java/oracle/r2dbc/impl/DependentCounter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package oracle.r2dbc.impl;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

import java.util.concurrent.atomic.AtomicInteger;

/**
* <p>
* A count of resources that depend on another resource to remain open. A
* dependent resource registers itself by incrementing the count, and
* deregisters itself by decrementing the count. The last dependent to
* deregister has the responsibility of subscribing to a {@code Publisher} that
* closes the resource it depended upon.
* </p><p>
* This class is conceptually similar to a {@code java.util.concurrent.Phaser}.
* Parties register by calling {@link #increment()}, and deregister by calling
* {@link #decrement()}. Asynchronous "phase advancement" is then handled by
* the {@code Publisher} which {@code decrement} returns.
* </p><p>
* This class offers a solution for tracking the consumption of
* {@link io.r2dbc.spi.Result} objects that depend on a JDBC statement to remain
* open until each result is consumed. Further explanations can be found in the
* JavaDocs of {@link OracleStatementImpl} and {@link OracleResultImpl}.
* </p>
*/
class DependentCounter {

/** Count of dependents */
private final AtomicInteger count = new AtomicInteger(0);

/** Publisher that closes the depended upon resource */
private final Publisher<Void> closePublisher;

/**
* Constructs a new counter that returns a resource closing publisher to the
* last dependent which unregisters. The counter is initialized with a count
* of zero.
* @param closePublisher Publisher that closes a resource. Not null.
*/
DependentCounter(Publisher<Void> closePublisher) {
this.closePublisher = closePublisher;
}

/**
* Increments the count of dependents by one.
* <em>
* A corresponding call to {@link #decrement()} MUST occur by the dependent
* which has called {@code increment()}
* </em>
*/
void increment() {
count.incrementAndGet();
}

/**
* <p>
* Returns a publisher that decrements the count of dependents by one when
* subscribed to.
* <em>
* A corresponding call to {@link #increment()} MUST have previously occurred
* by the dependent which has called {@code decrement()}
* </em>
* </p><p>
* The dependent which has called this method MUST subscribe to the returned
* published. If the dependent that calls this method is the last dependent to
* do so, then the returned publisher will close the depended upon resource.
* Otherwise, if more dependents remain, the returned publisher does nothing.
* The caller of this method has no way to tell which is the case, so it must
* subscribe to be safe.
* </p>
* @return A publisher that closes the depended upon resource after no
* dependents remain. Not null.
*/
Publisher<Void> decrement() {
return Mono.defer(() ->
count.decrementAndGet() == 0
? Mono.from(closePublisher)
: Mono.empty());
}

}
78 changes: 60 additions & 18 deletions src/main/java/oracle/r2dbc/impl/OracleReadableImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.r2dbc.spi.R2dbcException;

import io.r2dbc.spi.R2dbcType;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Row;
import io.r2dbc.spi.RowMetadata;
import io.r2dbc.spi.Type;
Expand All @@ -37,6 +38,7 @@
import oracle.r2dbc.impl.ReadablesMetadata.RowMetadataImpl;

import java.nio.ByteBuffer;
import java.sql.ResultSet;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.NoSuchElementException;
Expand Down Expand Up @@ -65,19 +67,27 @@ class OracleReadableImpl implements io.r2dbc.spi.Readable {
/** Metadata of the values of this {@code Readable}. */
private final ReadablesMetadata<?> readablesMetadata;

/**
* A collection of results that depend on the JDBC statement which created
* this readable to remain open until all results are consumed.
*/
private final DependentCounter dependentCounter;

/**
* <p>
* Constructs a new {@code Readable} that supplies values of a
* {@code jdbcReadable} and obtains metadata of the values from
* {@code resultMetadata}.
* </p>
*
* @param jdbcReadable Readable values from a JDBC Driver. Not null.
* @param readablesMetadata Metadata of each value. Not null.
* @param adapter Adapts JDBC calls into reactive streams. Not null.
*/
private OracleReadableImpl(
JdbcReadable jdbcReadable, ReadablesMetadata<?> readablesMetadata,
ReactiveJdbcAdapter adapter) {
DependentCounter dependentCounter, JdbcReadable jdbcReadable,
ReadablesMetadata<?> readablesMetadata, ReactiveJdbcAdapter adapter) {
this.dependentCounter = dependentCounter;
this.jdbcReadable = jdbcReadable;
this.readablesMetadata = readablesMetadata;
this.adapter = adapter;
Expand All @@ -96,9 +106,9 @@ private OracleReadableImpl(
* {@code metadata}. Not null.
*/
static Row createRow(
JdbcReadable jdbcReadable, RowMetadataImpl metadata,
ReactiveJdbcAdapter adapter) {
return new RowImpl(jdbcReadable, metadata, adapter);
DependentCounter dependentCounter, JdbcReadable jdbcReadable,
RowMetadataImpl metadata, ReactiveJdbcAdapter adapter) {
return new RowImpl(dependentCounter, jdbcReadable, metadata, adapter);
}
/**
* <p>
Expand All @@ -113,9 +123,10 @@ static Row createRow(
* {@code metadata}. Not null.
*/
static OutParameters createOutParameters(
JdbcReadable jdbcReadable, OutParametersMetadataImpl metadata,
ReactiveJdbcAdapter adapter) {
return new OutParametersImpl(jdbcReadable, metadata, adapter);
DependentCounter dependentCounter, JdbcReadable jdbcReadable,
OutParametersMetadataImpl metadata, ReactiveJdbcAdapter adapter) {
return new OutParametersImpl(
dependentCounter, jdbcReadable, metadata, adapter);
}

/**
Expand Down Expand Up @@ -162,8 +173,8 @@ public <T> T get(String name, Class<T> type) {
/**
* Returns the 0-based index of the value identified by {@code name}. This
* method implements a case-insensitive name match. If more than one
* value has a matching name, this method returns lowest index of all
* matching values.
* value has a matching name, this method returns lowest of all indexes that
* match.
* @param name The name of a value. Not null.
* @return The index of the named value within this {@code Readable}
* @throws NoSuchElementException If no column has a matching name.
Expand Down Expand Up @@ -208,6 +219,9 @@ else if (io.r2dbc.spi.Clob.class.equals(type)) {
else if (LocalDateTime.class.equals(type)) {
value = getLocalDateTime(index);
}
else if (Result.class.equals(type)) {
value = getResult(index);
}
else if (Object.class.equals(type)) {
// Use the default type mapping if Object.class has been specified.
// This method is invoked recursively with the default mapping, so long
Expand Down Expand Up @@ -327,6 +341,36 @@ private LocalDateTime getLocalDateTime(int index) {
}
}

/**
* <p>
* Converts the value of a column at the specified {@code index} to a
* {@code Result}. This method is intended for mapping REF CURSOR values,
* which JDBC will map to a {@link ResultSet}.
* </p><p>
* A REF CURSOR is closed when the JDBC statement that created it is closed.
* To prevent the cursor from getting closed, the Result returned by this
* method is immediately added to the collection of results that depend on the
* JDBC statement.
* </p><p>
* The Result returned by this method is received by user code, and user code
* MUST then fully consume it. The JDBC statement is not closed until the
* result is fully consumed.
* </p>
* @param index 0 based column index
* @return A column value as a {@code Result}, or null if the column value is
* NULL.
*/
private Result getResult(int index) {
ResultSet resultSet = jdbcReadable.getObject(index, ResultSet.class);

if (resultSet == null)
return null;

dependentCounter.increment();
return OracleResultImpl.createQueryResult(
dependentCounter, resultSet, adapter);
}

/**
* Checks if the specified zero-based {@code index} is a valid column index
* for this row. This method is used to verify index value parameters
Expand Down Expand Up @@ -368,10 +412,9 @@ private static final class RowImpl
* @param adapter Adapts JDBC calls into reactive streams. Not null.
*/
private RowImpl(
JdbcReadable jdbcReadable,
RowMetadataImpl metadata,
ReactiveJdbcAdapter adapter) {
super(jdbcReadable, metadata, adapter);
DependentCounter dependentCounter, JdbcReadable jdbcReadable,
RowMetadataImpl metadata, ReactiveJdbcAdapter adapter) {
super(dependentCounter, jdbcReadable, metadata, adapter);
this.metadata = metadata;
}

Expand Down Expand Up @@ -410,10 +453,9 @@ private static final class OutParametersImpl
* @param adapter Adapts JDBC calls into reactive streams. Not null.
*/
private OutParametersImpl(
JdbcReadable jdbcReadable,
OutParametersMetadataImpl metadata,
ReactiveJdbcAdapter adapter) {
super(jdbcReadable, metadata, adapter);
DependentCounter dependentCounter, JdbcReadable jdbcReadable,
OutParametersMetadataImpl metadata, ReactiveJdbcAdapter adapter) {
super(dependentCounter,jdbcReadable, metadata, adapter);
this.metadata = metadata;
}

Expand Down
Loading