Skip to content

Updates for Release 1.3.0 #157

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 87 additions & 112 deletions README.md

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

<groupId>com.oracle.database.r2dbc</groupId>
<artifactId>oracle-r2dbc</artifactId>
<version>1.2.0</version>
<version>1.3.0</version>
<name>oracle-r2dbc</name>
<description>
Oracle R2DBC Driver implementing version 1.0.0 of the R2DBC SPI for Oracle Database.
Expand Down Expand Up @@ -65,9 +65,9 @@

<properties>
<java.version>11</java.version>
<ojdbc.version>23.4.0.24.05</ojdbc.version>
<ojdbc.version>23.6.0.24.10</ojdbc.version>
<r2dbc.version>1.0.0.RELEASE</r2dbc.version>
<reactor.version>3.5.11</reactor.version>
<reactor.version>3.6.11</reactor.version>
<reactive-streams.version>1.0.3</reactive-streams.version>
<junit.version>5.9.1</junit.version>
<spring-jdbc.version>5.3.19</spring-jdbc.version>
Expand Down
14 changes: 8 additions & 6 deletions src/main/java/oracle/r2dbc/impl/OracleBatchImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@
*/
final class OracleBatchImpl implements Batch {

/** The OracleConnectionImpl that created this Batch */
private final OracleConnectionImpl r2dbcConnection;

/** Adapts Oracle JDBC Driver APIs into Reactive Streams APIs */
private final ReactiveJdbcAdapter adapter;

Expand Down Expand Up @@ -83,12 +86,11 @@ final class OracleBatchImpl implements Batch {
* @param jdbcConnection JDBC connection to an Oracle Database. Not null.
* @param adapter Adapts JDBC calls into reactive streams. Not null.
*/
OracleBatchImpl(
Duration timeout, Connection jdbcConnection, ReactiveJdbcAdapter adapter) {
OracleBatchImpl(Duration timeout, OracleConnectionImpl r2dbcConnection) {
this.timeout = timeout;
this.jdbcConnection =
requireNonNull(jdbcConnection, "jdbcConnection is null");
this.adapter = requireNonNull(adapter, "adapter is null");
this.r2dbcConnection = r2dbcConnection;
this.jdbcConnection = r2dbcConnection.jdbcConnection();
this.adapter = r2dbcConnection.adapter();
}

/**
Expand All @@ -103,7 +105,7 @@ public Batch add(String sql) {
requireOpenConnection(jdbcConnection);
requireNonNull(sql, "sql is null");
statements.add(
new OracleStatementImpl(sql, timeout, jdbcConnection, adapter));
new OracleStatementImpl(sql, timeout, r2dbcConnection));
return this;
}

Expand Down
82 changes: 77 additions & 5 deletions src/main/java/oracle/r2dbc/impl/OracleConnectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,17 @@
import java.sql.SQLException;
import java.sql.Savepoint;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import static io.r2dbc.spi.IsolationLevel.READ_COMMITTED;
import static io.r2dbc.spi.IsolationLevel.SERIALIZABLE;
import static io.r2dbc.spi.TransactionDefinition.ISOLATION_LEVEL;
import static io.r2dbc.spi.TransactionDefinition.LOCK_WAIT_TIMEOUT;
import static io.r2dbc.spi.TransactionDefinition.NAME;
import static io.r2dbc.spi.TransactionDefinition.READ_ONLY;
import static oracle.r2dbc.impl.OracleR2dbcExceptions.requireNonNull;
import static oracle.r2dbc.impl.OracleR2dbcExceptions.fromJdbc;
import static oracle.r2dbc.impl.OracleR2dbcExceptions.requireNonNull;
import static oracle.r2dbc.impl.OracleR2dbcExceptions.requireOpenConnection;
import static oracle.r2dbc.impl.OracleR2dbcExceptions.runJdbc;
import static oracle.r2dbc.impl.OracleR2dbcExceptions.toR2dbcException;
Expand Down Expand Up @@ -126,6 +128,12 @@ final class OracleConnectionImpl implements Connection, Lifecycle {
*/
private TransactionDefinition currentTransaction = null;

/**
* A set of tasks that must complete before the {@link #jdbcConnection} is
* closed. The tasks are executed by subscribing to a Publisher.
*/
private final Set<Publisher<?>> closeTasks = ConcurrentHashMap.newKeySet();

/**
* Constructs a new connection that uses the specified {@code adapter} to
* perform database operations with the specified {@code jdbcConnection}.
Expand Down Expand Up @@ -369,7 +377,52 @@ else if (isReadOnly == null && name == null) {
*/
@Override
public Publisher<Void> close() {
return adapter.publishClose(jdbcConnection);

Publisher<Void> closeTasksPublisher = Mono.defer(() -> {
Publisher<?>[] closeTasksArray = closeTasks.toArray(Publisher<?>[]::new);
closeTasks.clear();

return Flux.concatDelayError(closeTasksArray).then();
});

return Flux.concatDelayError(
closeTasksPublisher,
adapter.publishClose(jdbcConnection));
}

/**
* <p>
* Adds a publisher that must be subscribed to and must terminate before
* closing the JDBC connection. This method can be used to ensure that certain
* tasks are completed before the {@link #jdbcConnection()} is closed and
* becomes unusable.
* </p><p>
* The publisher returned by this method emits the same result as the
* publisher passed into this method. However, when the returned publisher
* terminates, it will also remove any reference to the publisher that was
* passed into this method. <i>If the returned publisher is never subscribed
* to, then the reference will not be cleared until the connection is
* closed!</i> So, this method should only be used in cases where the user is
* responsible for subscribing to the returned publisher, in the same way they
* would be responsible for calling close() on an AutoCloseable. If the user
* is not responsible for subscribing, then there is no reasonable way for
* them to reduce the number object references that this method will create;
* They would either need to close this connection, or subscribe to a
* publisher when there is no obligation to do so.
* </p>
*
* @param publisher Publisher that must be subscribed to before closing the
* JDBC connection. Not null.
*
* @return A publisher that emits the same result as the publisher passed into
* this method, and clears any reference to it when terminated. Not null.
*/
<T> Publisher<T> addCloseTask(Publisher<T> publisher) {
closeTasks.add(publisher);

return Publishers.concatTerminal(
publisher,
Mono.fromRunnable(() -> closeTasks.remove(publisher)));
}

/**
Expand Down Expand Up @@ -417,7 +470,7 @@ public Publisher<Void> commitTransaction() {
@Override
public Batch createBatch() {
requireOpenConnection(jdbcConnection);
return new OracleBatchImpl(statementTimeout, jdbcConnection, adapter);
return new OracleBatchImpl(statementTimeout, this);
}

/**
Expand All @@ -441,8 +494,7 @@ public Batch createBatch() {
public Statement createStatement(String sql) {
requireNonNull(sql, "sql is null");
requireOpenConnection(jdbcConnection);
return new OracleStatementImpl(
sql, statementTimeout, jdbcConnection, adapter);
return new OracleStatementImpl(sql, statementTimeout, this);
}

/**
Expand Down Expand Up @@ -826,4 +878,24 @@ public Publisher<Void> preRelease() {
});
}

/**
* Returns the JDBC connection that this R2DBC connection executes database
* calls with.
*
* @return The JDBC connection which backs this R2DBC connection. Not null.
*/
java.sql.Connection jdbcConnection() {
return jdbcConnection;
}

/**
* Returns the adapter that adapts the asynchronous API of the
* {@link #jdbcConnection()} that backs this R2DBC connection.
*
* @return The JDBC connection that backs this R2DBC connection. Not null.
*/
ReactiveJdbcAdapter adapter() {
return adapter;
}

}
94 changes: 50 additions & 44 deletions src/main/java/oracle/r2dbc/impl/OracleReadableImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import oracle.sql.INTERVALYM;
import oracle.sql.TIMESTAMPLTZ;
import oracle.sql.TIMESTAMPTZ;
import org.reactivestreams.Publisher;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -93,6 +94,9 @@
class OracleReadableImpl implements io.r2dbc.spi.Readable {


/** The R2DBC connection that created this readable */
private final OracleConnectionImpl r2dbcConnection;

/** The JDBC connection that created this readable */
private final java.sql.Connection jdbcConnection;

Expand All @@ -117,21 +121,21 @@ class OracleReadableImpl implements io.r2dbc.spi.Readable {
* {@code jdbcReadable} and obtains metadata of the values from
* {@code resultMetadata}.
* </p>
* @param jdbcConnection JDBC connection that created the
* @param r2dbcConnection R2DBC connection that created the
* {@code jdbcReadable}. Not null.
* @param jdbcReadable Readable values from a JDBC Driver. Not null.
* @param readablesMetadata Metadata of each value. Not null.
* @param adapter Adapts JDBC calls into reactive streams. Not null.
*/
private OracleReadableImpl(
java.sql.Connection jdbcConnection, DependentCounter dependentCounter,
JdbcReadable jdbcReadable, ReadablesMetadata<?> readablesMetadata,
ReactiveJdbcAdapter adapter) {
this.jdbcConnection = jdbcConnection;
OracleConnectionImpl r2dbcConnection, DependentCounter dependentCounter,
JdbcReadable jdbcReadable, ReadablesMetadata<?> readablesMetadata) {
this.r2dbcConnection = r2dbcConnection;
this.jdbcConnection = r2dbcConnection.jdbcConnection();
this.dependentCounter = dependentCounter;
this.jdbcReadable = jdbcReadable;
this.readablesMetadata = readablesMetadata;
this.adapter = adapter;
this.adapter = r2dbcConnection.adapter();
}

/**
Expand All @@ -151,11 +155,10 @@ private OracleReadableImpl(
* {@code metadata}. Not null.
*/
static Row createRow(
java.sql.Connection jdbcConnection, DependentCounter dependentCounter,
JdbcReadable jdbcReadable, RowMetadataImpl metadata,
ReactiveJdbcAdapter adapter) {
OracleConnectionImpl r2dbcConnection, DependentCounter dependentCounter,
JdbcReadable jdbcReadable, RowMetadataImpl metadata) {
return new RowImpl(
jdbcConnection, dependentCounter, jdbcReadable, metadata, adapter);
r2dbcConnection, dependentCounter, jdbcReadable, metadata);
}

/**
Expand All @@ -164,7 +167,7 @@ static Row createRow(
* the provided {@code jdbcReadable} and {@code rowMetadata}. The metadata
* object is used to determine the default type mapping of column values.
* </p>
* @param jdbcConnection JDBC connection that created the
* @param r2dbcConnection R2DBC connection that created the
* {@code jdbcReadable}. Not null.
* @param dependentCounter Counter that is increased for each dependent
* {@code Result} created by the returned {@code OutParameters}
Expand All @@ -175,11 +178,10 @@ static Row createRow(
* {@code metadata}. Not null.
*/
static OutParameters createOutParameters(
java.sql.Connection jdbcConnection, DependentCounter dependentCounter,
JdbcReadable jdbcReadable, OutParametersMetadataImpl metadata,
ReactiveJdbcAdapter adapter) {
OracleConnectionImpl r2dbcConnection, DependentCounter dependentCounter,
JdbcReadable jdbcReadable, OutParametersMetadataImpl metadata) {
return new OutParametersImpl(
jdbcConnection, dependentCounter, jdbcReadable, metadata, adapter);
r2dbcConnection, dependentCounter, jdbcReadable, metadata);
}

/**
Expand Down Expand Up @@ -335,11 +337,16 @@ private ByteBuffer getByteBuffer(int index) {
*/
private Blob getBlob(int index) {
java.sql.Blob jdbcBlob = jdbcReadable.getObject(index, java.sql.Blob.class);
return jdbcBlob == null
? null
: OracleLargeObjects.createBlob(
adapter.publishBlobRead(jdbcBlob),
adapter.publishBlobFree(jdbcBlob));

if (jdbcBlob == null)
return null;

Publisher<Void> freePublisher =
r2dbcConnection.addCloseTask(adapter.publishBlobFree(jdbcBlob));

return OracleLargeObjects.createBlob(
adapter.publishBlobRead(jdbcBlob),
freePublisher);
}

/**
Expand Down Expand Up @@ -367,11 +374,15 @@ private Clob getClob(int index) {
jdbcClob = jdbcReadable.getObject(index, java.sql.Clob.class);
}

return jdbcClob == null
? null
: OracleLargeObjects.createClob(
adapter.publishClobRead(jdbcClob),
adapter.publishClobFree(jdbcClob));
if (jdbcClob == null)
return null;

Publisher<Void> freePublisher =
r2dbcConnection.addCloseTask(adapter.publishClobFree(jdbcClob));

return OracleLargeObjects.createClob(
adapter.publishClobRead(jdbcClob),
freePublisher);
}

/**
Expand Down Expand Up @@ -685,11 +696,10 @@ private OracleR2dbcObjectImpl getOracleR2dbcObject(int index) {
return null;

return new OracleR2dbcObjectImpl(
jdbcConnection,
r2dbcConnection,
dependentCounter,
new StructJdbcReadable(oracleStruct),
ReadablesMetadata.createAttributeMetadata(oracleStruct),
adapter);
ReadablesMetadata.createAttributeMetadata(oracleStruct));
}

/**
Expand Down Expand Up @@ -956,7 +966,7 @@ private Result getResult(int index) {

dependentCounter.increment();
return OracleResultImpl.createQueryResult(
dependentCounter, resultSet, adapter);
r2dbcConnection, dependentCounter, resultSet);
}

/**
Expand Down Expand Up @@ -994,17 +1004,16 @@ private static final class RowImpl
* {@code jdbcReadable}, and uses the specified {@code rowMetadata} to
* determine the default type mapping of column values.
* </p>
* @param jdbcConnection JDBC connection that created the
* @param r2dbcConnection R2DBC connection that created the
* {@code jdbcReadable}. Not null.
* @param jdbcReadable Row data from the Oracle JDBC Driver. Not null.
* @param metadata Meta-data for the specified row. Not null.
* @param adapter Adapts JDBC calls into reactive streams. Not null.
*/
private RowImpl(
java.sql.Connection jdbcConnection, DependentCounter dependentCounter,
JdbcReadable jdbcReadable, RowMetadataImpl metadata,
ReactiveJdbcAdapter adapter) {
super(jdbcConnection, dependentCounter, jdbcReadable, metadata, adapter);
OracleConnectionImpl r2dbcConnection, DependentCounter dependentCounter,
JdbcReadable jdbcReadable, RowMetadataImpl metadata) {
super(r2dbcConnection, dependentCounter, jdbcReadable, metadata);
this.metadata = metadata;
}

Expand Down Expand Up @@ -1044,10 +1053,9 @@ private static final class OutParametersImpl
* @param adapter Adapts JDBC calls into reactive streams. Not null.
*/
private OutParametersImpl(
java.sql.Connection jdbcConnection, DependentCounter dependentCounter,
JdbcReadable jdbcReadable, OutParametersMetadataImpl metadata,
ReactiveJdbcAdapter adapter) {
super(jdbcConnection, dependentCounter, jdbcReadable, metadata, adapter);
OracleConnectionImpl r2dbcConnection, DependentCounter dependentCounter,
JdbcReadable jdbcReadable, OutParametersMetadataImpl metadata) {
super(r2dbcConnection, dependentCounter, jdbcReadable, metadata);
this.metadata = metadata;
}

Expand All @@ -1068,20 +1076,18 @@ private final class OracleR2dbcObjectImpl
* {@code jdbcReadable} and obtains metadata of the values from
* {@code outParametersMetaData}.
* </p>
* @param jdbcConnection JDBC connection that created the
* @param r2dbcConnection R2DBC connection that created the
* {@code jdbcReadable}. Not null.
* @param structJdbcReadable Readable values from a JDBC Driver. Not null.
* @param metadata Metadata of each value. Not null.
* @param adapter Adapts JDBC calls into reactive streams. Not null.
*/
private OracleR2dbcObjectImpl(
java.sql.Connection jdbcConnection,
OracleConnectionImpl r2dbcConnection,
DependentCounter dependentCounter,
StructJdbcReadable structJdbcReadable,
OracleR2dbcObjectMetadataImpl metadata,
ReactiveJdbcAdapter adapter) {
super(
jdbcConnection, dependentCounter, structJdbcReadable, metadata, adapter);
OracleR2dbcObjectMetadataImpl metadata) {
super(r2dbcConnection, dependentCounter, structJdbcReadable, metadata);
this.metadata = metadata;
}

Expand Down
Loading
Loading