Skip to content

Commit dd3a69c

Browse files
Adopt to Reactor 2022 changes.
Closes: #1285
1 parent ce68431 commit dd3a69c

File tree

3 files changed

+85
-39
lines changed

3 files changed

+85
-39
lines changed

spring-data-r2dbc/src/main/java/org/springframework/data/r2dbc/repository/query/AbstractR2dbcQuery.java

+4-6
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,12 @@
1818
import reactor.core.publisher.Mono;
1919

2020
import org.reactivestreams.Publisher;
21-
2221
import org.springframework.data.mapping.model.EntityInstantiators;
2322
import org.springframework.data.r2dbc.convert.R2dbcConverter;
2423
import org.springframework.data.r2dbc.core.R2dbcEntityOperations;
2524
import org.springframework.data.r2dbc.repository.query.R2dbcQueryExecution.ResultProcessingConverter;
2625
import org.springframework.data.r2dbc.repository.query.R2dbcQueryExecution.ResultProcessingExecution;
2726
import org.springframework.data.relational.repository.query.RelationalParameterAccessor;
28-
import org.springframework.data.relational.repository.query.RelationalParametersParameterAccessor;
2927
import org.springframework.data.repository.query.ParameterAccessor;
3028
import org.springframework.data.repository.query.RepositoryQuery;
3129
import org.springframework.data.repository.query.ResultProcessor;
@@ -41,6 +39,7 @@
4139
*
4240
* @author Mark Paluch
4341
* @author Stephen Cohen
42+
* @author Christoph Strobl
4443
*/
4544
public abstract class AbstractR2dbcQuery implements RepositoryQuery {
4645

@@ -83,13 +82,12 @@ public R2dbcQueryMethod getQueryMethod() {
8382
*/
8483
public Object execute(Object[] parameters) {
8584

86-
RelationalParameterAccessor parameterAccessor = new RelationalParametersParameterAccessor(method, parameters);
87-
88-
return createQuery(parameterAccessor).flatMapMany(it -> executeQuery(parameterAccessor, it));
85+
Mono<R2dbcParameterAccessor> resolveParameters = new R2dbcParameterAccessor(method, parameters).resolveParameters();
86+
return resolveParameters.flatMapMany(it -> createQuery(it).flatMapMany(foo -> executeQuery(it, foo)));
8987
}
9088

9189
@SuppressWarnings("unchecked")
92-
private Publisher<?> executeQuery(RelationalParameterAccessor parameterAccessor, PreparedOperation<?> operation) {
90+
private Publisher<?> executeQuery(R2dbcParameterAccessor parameterAccessor, PreparedOperation<?> operation) {
9391

9492
ResultProcessor processor = method.getResultProcessor().withDynamicProjection(parameterAccessor);
9593

spring-data-r2dbc/src/main/java/org/springframework/data/r2dbc/repository/query/R2dbcParameterAccessor.java

+64-33
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,15 @@
1515
*/
1616
package org.springframework.data.r2dbc.repository.query;
1717

18+
import org.reactivestreams.Publisher;
1819
import reactor.core.publisher.Flux;
1920
import reactor.core.publisher.Mono;
20-
import reactor.core.publisher.MonoProcessor;
2121

2222
import java.util.ArrayList;
2323
import java.util.List;
24+
import java.util.Map;
25+
import java.util.Optional;
26+
import java.util.concurrent.ConcurrentHashMap;
2427

2528
import org.springframework.data.relational.repository.query.RelationalParametersParameterAccessor;
2629
import org.springframework.data.repository.util.ReactiveWrapperConverters;
@@ -31,11 +34,12 @@
3134
* to reactive parameter wrapper types upon creation. This class performs synchronization when accessing parameters.
3235
*
3336
* @author Mark Paluch
37+
* @author Christoph Strobl
3438
*/
3539
class R2dbcParameterAccessor extends RelationalParametersParameterAccessor {
3640

3741
private final Object[] values;
38-
private final List<MonoProcessor<?>> subscriptions;
42+
private final R2dbcQueryMethod method;
3943

4044
/**
4145
* Creates a new {@link R2dbcParameterAccessor}.
@@ -45,37 +49,7 @@ public R2dbcParameterAccessor(R2dbcQueryMethod method, Object... values) {
4549
super(method, values);
4650

4751
this.values = values;
48-
this.subscriptions = new ArrayList<>(values.length);
49-
50-
for (int i = 0; i < values.length; i++) {
51-
52-
Object value = values[i];
53-
54-
if (value == null || !ReactiveWrappers.supports(value.getClass())) {
55-
subscriptions.add(null);
56-
continue;
57-
}
58-
59-
if (ReactiveWrappers.isSingleValueType(value.getClass())) {
60-
subscriptions.add(ReactiveWrapperConverters.toWrapper(value, Mono.class).toProcessor());
61-
} else {
62-
subscriptions.add(ReactiveWrapperConverters.toWrapper(value, Flux.class).collectList().toProcessor());
63-
}
64-
}
65-
}
66-
67-
/* (non-Javadoc)
68-
* @see org.springframework.data.repository.query.ParametersParameterAccessor#getValue(int)
69-
*/
70-
@SuppressWarnings("unchecked")
71-
@Override
72-
protected <T> T getValue(int index) {
73-
74-
if (subscriptions.get(index) != null) {
75-
return (T) subscriptions.get(index).block();
76-
}
77-
78-
return super.getValue(index);
52+
this.method = method;
7953
}
8054

8155
/* (non-Javadoc)
@@ -97,4 +71,61 @@ public Object[] getValues() {
9771
public Object getBindableValue(int index) {
9872
return getValue(getParameters().getBindableParameter(index).getIndex());
9973
}
74+
75+
/**
76+
* Resolve parameters that were provided through reactive wrapper types. Flux is collected into a list, values from
77+
* Mono's are used directly.
78+
*
79+
* @return
80+
*/
81+
@SuppressWarnings("unchecked")
82+
public Mono<R2dbcParameterAccessor> resolveParameters() {
83+
84+
boolean hasReactiveWrapper = false;
85+
86+
for (Object value : values) {
87+
if (value == null || !ReactiveWrappers.supports(value.getClass())) {
88+
continue;
89+
}
90+
91+
hasReactiveWrapper = true;
92+
break;
93+
}
94+
95+
if (!hasReactiveWrapper) {
96+
return Mono.just(this);
97+
}
98+
99+
Object[] resolved = new Object[values.length];
100+
Map<Integer, Optional<?>> holder = new ConcurrentHashMap<>();
101+
List<Publisher<?>> publishers = new ArrayList<>();
102+
103+
for (int i = 0; i < values.length; i++) {
104+
105+
Object value = resolved[i] = values[i];
106+
if (value == null || !ReactiveWrappers.supports(value.getClass())) {
107+
continue;
108+
}
109+
110+
if (ReactiveWrappers.isSingleValueType(value.getClass())) {
111+
112+
int index = i;
113+
publishers.add(ReactiveWrapperConverters.toWrapper(value, Mono.class) //
114+
.map(Optional::of) //
115+
.defaultIfEmpty(Optional.empty()) //
116+
.doOnNext(it -> holder.put(index, (Optional<?>) it)));
117+
} else {
118+
119+
int index = i;
120+
publishers.add(ReactiveWrapperConverters.toWrapper(value, Flux.class) //
121+
.collectList() //
122+
.doOnNext(it -> holder.put(index, Optional.of(it))));
123+
}
124+
}
125+
126+
return Flux.merge(publishers).then().thenReturn(resolved).map(values -> {
127+
holder.forEach((index, v) -> values[index] = v.orElse(null));
128+
return new R2dbcParameterAccessor(method, values);
129+
});
130+
}
100131
}

spring-data-r2dbc/src/test/java/org/springframework/data/r2dbc/repository/query/PartTreeR2dbcQueryUnitTests.java

+17
Original file line numberDiff line numberDiff line change
@@ -734,6 +734,20 @@ void createQueryWithPessimisticReadLock() throws Exception {
734734
.where("users.first_name = $1 AND (users.age = $2) FOR SHARE OF users");
735735
}
736736

737+
@Test // GH-1285
738+
void bindsParametersFromPublisher() throws Exception {
739+
740+
R2dbcQueryMethod queryMethod = getQueryMethod("findByFirstName", Mono.class);
741+
PartTreeR2dbcQuery r2dbcQuery = new PartTreeR2dbcQuery(queryMethod, operations, r2dbcConverter, dataAccessStrategy);
742+
R2dbcParameterAccessor accessor = new R2dbcParameterAccessor(queryMethod, new Object[] { Mono.just("John") });
743+
744+
PreparedOperation<?> preparedOperation = createQuery(r2dbcQuery, accessor.resolveParameters().block());
745+
BindTarget bindTarget = mock(BindTarget.class);
746+
preparedOperation.bindTo(bindTarget);
747+
748+
verify(bindTarget, times(1)).bind(0, "John");
749+
}
750+
737751
private PreparedOperation<?> createQuery(R2dbcQueryMethod queryMethod, PartTreeR2dbcQuery r2dbcQuery,
738752
Object... parameters) {
739753
return createQuery(r2dbcQuery, getAccessor(queryMethod, parameters));
@@ -927,6 +941,9 @@ interface UserRepository extends Repository<User, Long> {
927941
Mono<Integer> deleteByFirstName(String firstName);
928942

929943
Mono<Long> countByFirstName(String firstName);
944+
945+
Mono<User> findByFirstName(Mono<String> firstName);
946+
930947
}
931948

932949
@Table("users")

0 commit comments

Comments
 (0)