|
39 | 39 | import java.sql.Connection;
|
40 | 40 | import java.sql.PreparedStatement;
|
41 | 41 | import java.sql.ResultSet;
|
| 42 | +import java.sql.SQLException; |
42 | 43 | import java.sql.SQLType;
|
43 | 44 | import java.sql.SQLWarning;
|
44 | 45 | import java.time.Duration;
|
|
55 | 56 | import java.util.function.Function;
|
56 | 57 | import java.util.stream.IntStream;
|
57 | 58 |
|
| 59 | +import static java.sql.Statement.CLOSE_ALL_RESULTS; |
58 | 60 | import static java.sql.Statement.KEEP_CURRENT_RESULT;
|
59 | 61 | import static java.sql.Statement.RETURN_GENERATED_KEYS;
|
60 | 62 | import static java.util.Objects.requireNonNullElse;
|
@@ -1158,26 +1160,56 @@ private OracleResultImpl getWarnings(OracleResultImpl result) {
|
1158 | 1160 | */
|
1159 | 1161 | private Publisher<Void> deallocate(Collection<OracleResultImpl> results) {
|
1160 | 1162 |
|
1161 |
| - // Close the statement after all results are consumed |
| 1163 | + // Set up a counter that is decremented as each result is consumed. |
1162 | 1164 | AtomicInteger unconsumed = new AtomicInteger(results.size());
|
| 1165 | + |
| 1166 | + // Set up a publisher that decrements the counter, and closes the |
| 1167 | + // statement when it reaches zero |
1163 | 1168 | Publisher<Void> closeStatement = adapter.getLock().run(() -> {
|
1164 | 1169 | if (unconsumed.decrementAndGet() == 0)
|
1165 |
| - preparedStatement.close(); |
| 1170 | + closeStatement(); |
1166 | 1171 | });
|
1167 | 1172 |
|
| 1173 | + // Tell each unconsumed result to decrement the unconsumed count, and then |
| 1174 | + // close the statement when the count reaches zero. |
1168 | 1175 | for (OracleResultImpl result : results) {
|
1169 | 1176 | if (!result.onConsumed(closeStatement))
|
1170 | 1177 | unconsumed.decrementAndGet();
|
1171 | 1178 | }
|
1172 | 1179 |
|
1173 |
| - // If all results have already been consumed, the returned |
1174 |
| - // publisher closes the statement |
| 1180 | + // If there are no results, or all results have already been consumed, |
| 1181 | + // then the returned publisher closes the statement. |
1175 | 1182 | if (unconsumed.get() == 0)
|
1176 |
| - addDeallocation(adapter.getLock().run(preparedStatement::close)); |
| 1183 | + addDeallocation(adapter.getLock().run(this::closeStatement)); |
1177 | 1184 |
|
1178 | 1185 | return deallocators;
|
1179 | 1186 | }
|
1180 | 1187 |
|
| 1188 | + /** |
| 1189 | + * Closes the JDBC {@link #preparedStatement}. This method should only be |
| 1190 | + * called while holding the |
| 1191 | + * {@linkplain ReactiveJdbcAdapter#getLock() connection lock} |
| 1192 | + * @throws SQLException If the statement fails to close. |
| 1193 | + */ |
| 1194 | + private void closeStatement() throws SQLException { |
| 1195 | + try { |
| 1196 | + // Workaround Oracle JDBC bug #34545179: ResultSet references are |
| 1197 | + // retained even when the statement is closed. Calling getMoreResults |
| 1198 | + // with the CLOSE_ALL_RESULTS argument forces the driver to |
| 1199 | + // de-reference them. |
| 1200 | + preparedStatement.getMoreResults(CLOSE_ALL_RESULTS); |
| 1201 | + } |
| 1202 | + catch (SQLException sqlException) { |
| 1203 | + // It may be the case that the JDBC connection was closed, and so the |
| 1204 | + // statement was closed with it. Check for this, and ignore the |
| 1205 | + // SQLException if so. |
| 1206 | + if (!jdbcConnection.isClosed()) |
| 1207 | + throw sqlException; |
| 1208 | + } |
| 1209 | + |
| 1210 | + preparedStatement.close(); |
| 1211 | + } |
| 1212 | + |
1181 | 1213 | /**
|
1182 | 1214 | * Sets the {@code value} of a {@code preparedStatement} parameter at the
|
1183 | 1215 | * specified {@code index}. If a non-null {@code type} is provided, then it is
|
@@ -1454,7 +1486,7 @@ private JdbcBatch(
|
1454 | 1486 | */
|
1455 | 1487 | @Override
|
1456 | 1488 | protected Publisher<Void> bind() {
|
1457 |
| - @SuppressWarnings({"unchecked","rawtypes"}) |
| 1489 | + @SuppressWarnings({"unchecked"}) |
1458 | 1490 | Publisher<Void>[] bindPublishers = new Publisher[batchSize];
|
1459 | 1491 | for (int i = 0; i < batchSize; i++) {
|
1460 | 1492 | bindPublishers[i] = Flux.concat(
|
|
0 commit comments