|
16 | 16 | package org.springframework.data.couchbase.core;
|
17 | 17 |
|
18 | 18 | import com.couchbase.client.core.api.query.CoreQueryContext;
|
| 19 | +import com.couchbase.client.core.api.query.CoreQueryOptions; |
19 | 20 | import reactor.core.publisher.Flux;
|
20 | 21 | import reactor.core.publisher.Mono;
|
21 | 22 |
|
|
28 | 29 | import org.springframework.data.couchbase.core.support.TemplateUtils;
|
29 | 30 | import org.springframework.util.Assert;
|
30 | 31 |
|
31 |
| -import com.couchbase.client.core.io.CollectionIdentifier; |
32 | 32 | import com.couchbase.client.java.ReactiveScope;
|
33 | 33 | import com.couchbase.client.java.codec.JsonSerializer;
|
34 | 34 | import com.couchbase.client.java.query.QueryOptions;
|
35 | 35 | import com.couchbase.client.java.query.QueryScanConsistency;
|
36 | 36 | import com.couchbase.client.java.query.ReactiveQueryResult;
|
37 |
| -import com.couchbase.client.java.transactions.AttemptContextReactiveAccessor; |
38 | 37 | import com.couchbase.client.java.transactions.TransactionQueryOptions;
|
39 | 38 | import com.couchbase.client.java.transactions.TransactionQueryResult;
|
40 | 39 |
|
@@ -77,9 +76,9 @@ static class ReactiveFindByQuerySupport<T> implements ReactiveFindByQuery<T> {
|
77 | 76 | private final ReactiveTemplateSupport support;
|
78 | 77 |
|
79 | 78 | ReactiveFindByQuerySupport(final ReactiveCouchbaseTemplate template, final Class<?> domainType,
|
80 |
| - final Class<T> returnType, final Query query, final QueryScanConsistency scanConsistency, final String scope, |
81 |
| - final String collection, final QueryOptions options, final String[] distinctFields, final String[] fields, |
82 |
| - final ReactiveTemplateSupport support) { |
| 79 | + final Class<T> returnType, final Query query, final QueryScanConsistency scanConsistency, |
| 80 | + final String scope, final String collection, final QueryOptions options, final String[] distinctFields, |
| 81 | + final String[] fields, final ReactiveTemplateSupport support) { |
83 | 82 | Assert.notNull(domainType, "domainType must not be null!");
|
84 | 83 | Assert.notNull(returnType, "returnType must not be null!");
|
85 | 84 | this.template = template;
|
@@ -192,10 +191,16 @@ public Flux<T> all() {
|
192 | 191 | return pArgs.getScope() == null ? clientFactory.getCluster().reactive().query(statement, opts)
|
193 | 192 | : rs.query(statement, opts);
|
194 | 193 | } else {
|
195 |
| - TransactionQueryOptions opts = buildTransactionOptions(pArgs.getOptions()); |
| 194 | + TransactionQueryOptions options = buildTransactionOptions(pArgs.getOptions()); |
196 | 195 | JsonSerializer jSer = clientFactory.getCluster().environment().jsonSerializer();
|
197 |
| - return AttemptContextReactiveAccessor.createReactiveTransactionAttemptContext(s.get().getCore(), jSer) |
198 |
| - .query(OptionsBuilder.queryContext(pArgs.getScope(), pArgs.getCollection(), rs.bucketName()) == null ? null : rs, statement, opts); |
| 196 | + CoreQueryOptions opts = options != null ? options.builder().build() : null; |
| 197 | + return s.get().getCore() |
| 198 | + .queryBlocking(statement, |
| 199 | + pArgs.getScope() == null ? null |
| 200 | + : CoreQueryContext.of(rs.bucketName(), pArgs.getScope()), |
| 201 | + opts, false) |
| 202 | + .map(response -> new TransactionQueryResult(response, jSer)); |
| 203 | + |
199 | 204 | }
|
200 | 205 | });
|
201 | 206 |
|
@@ -255,10 +260,15 @@ public Mono<Long> count() {
|
255 | 260 | return pArgs.getScope() == null ? clientFactory.getCluster().reactive().query(statement, opts)
|
256 | 261 | : rs.query(statement, opts);
|
257 | 262 | } else {
|
258 |
| - TransactionQueryOptions opts = buildTransactionOptions(pArgs.getOptions()); |
| 263 | + TransactionQueryOptions options = buildTransactionOptions(pArgs.getOptions()); |
259 | 264 | JsonSerializer jSer = clientFactory.getCluster().environment().jsonSerializer();
|
260 |
| - return AttemptContextReactiveAccessor.createReactiveTransactionAttemptContext(s.get().getCore(), jSer) |
261 |
| - .query(OptionsBuilder.queryContext(pArgs.getScope(), pArgs.getCollection(), rs.bucketName()) == null ? null : rs, statement, opts); |
| 265 | + CoreQueryOptions opts = options != null ? options.builder().build() : null; |
| 266 | + return s.get().getCore() |
| 267 | + .queryBlocking(statement, |
| 268 | + pArgs.getScope() == null ? null |
| 269 | + : CoreQueryContext.of(rs.bucketName(), pArgs.getScope()), |
| 270 | + opts, false) |
| 271 | + .map(response -> new TransactionQueryResult(response, jSer)); |
262 | 272 | }
|
263 | 273 | });
|
264 | 274 |
|
|
0 commit comments