Skip to content

Commit b45569f

Browse files
Remove duplicated request tracker callback arguments
1 parent 81ecc8c commit b45569f

21 files changed

+362
-525
lines changed

Diff for: core/src/main/java/com/datastax/dse/driver/api/core/graph/GraphExecutionInfo.java

+6
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,12 @@ default DriverExecutionProfile getExecutionProfile() {
6969
*/
7070
int getSuccessfulExecutionIndex();
7171

72+
/** @return Exception raised by the driver to the application. */
73+
@Nullable
74+
default Throwable getDriverError() {
75+
return null;
76+
}
77+
7278
/**
7379
* The errors encountered on previous coordinators, if any.
7480
*

Diff for: core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java

+17-29
Original file line numberDiff line numberDiff line change
@@ -629,7 +629,7 @@ public void operationComplete(@NonNull Future<java.lang.Void> future) {
629629
Throwable error = future.cause();
630630
if (error instanceof EncoderException
631631
&& error.getCause() instanceof FrameTooLongException) {
632-
trackNodeError(node, error.getCause(), null);
632+
trackNodeError(error.getCause(), null);
633633
lock.lock();
634634
try {
635635
abort(error.getCause(), false);
@@ -646,7 +646,7 @@ public void operationComplete(@NonNull Future<java.lang.Void> future) {
646646
.getMetricUpdater()
647647
.incrementCounter(DefaultNodeMetric.UNSENT_REQUESTS, executionProfile.getName());
648648
recordError(node, error);
649-
trackNodeError(node, error.getCause(), null);
649+
trackNodeError(error.getCause(), null);
650650
sendRequest(statement, null, executionIndex, retryCount, scheduleSpeculativeExecution);
651651
}
652652
} else {
@@ -767,11 +767,11 @@ public void onResponse(@NonNull Frame response) {
767767
} else {
768768
IllegalStateException error =
769769
new IllegalStateException("Unexpected response " + responseMessage);
770-
trackNodeError(node, error, response);
770+
trackNodeError(error, response);
771771
abort(error, false);
772772
}
773773
} catch (Throwable t) {
774-
trackNodeError(node, t, response);
774+
trackNodeError(t, response);
775775
abort(t, false);
776776
}
777777
} finally {
@@ -915,7 +915,7 @@ private void processErrorResponse(@NonNull Error errorMessage, @NonNull Frame fr
915915
if (error instanceof BootstrappingException) {
916916
LOG.trace("[{}] {} is bootstrapping, trying next node", logPrefix, node);
917917
recordError(node, error);
918-
trackNodeError(node, error, frame);
918+
trackNodeError(error, frame);
919919
sendRequest(statement, null, executionIndex, retryCount, false);
920920
} else if (error instanceof QueryValidationException
921921
|| error instanceof FunctionFailureException
@@ -927,7 +927,7 @@ private void processErrorResponse(@NonNull Error errorMessage, @NonNull Frame fr
927927
NodeMetricUpdater metricUpdater = ((DefaultNode) node).getMetricUpdater();
928928
metricUpdater.incrementCounter(
929929
DefaultNodeMetric.OTHER_ERRORS, executionProfile.getName());
930-
trackNodeError(node, error, frame);
930+
trackNodeError(error, frame);
931931
abort(error, true);
932932
} else {
933933
try {
@@ -1066,7 +1066,7 @@ private void processUnprepared(@NonNull Unprepared errorMessage) {
10661066
+ "This usually happens when you run a 'USE...' query after "
10671067
+ "the statement was prepared.",
10681068
Bytes.toHexString(idToReprepare), Bytes.toHexString(repreparedId)));
1069-
trackNodeError(node, illegalStateException, null);
1069+
trackNodeError(illegalStateException, null);
10701070
fatalError = illegalStateException;
10711071
} else {
10721072
LOG.trace(
@@ -1085,18 +1085,18 @@ private void processUnprepared(@NonNull Unprepared errorMessage) {
10851085
|| prepareError instanceof FunctionFailureException
10861086
|| prepareError instanceof ProtocolError) {
10871087
LOG.trace("[{}] Unrecoverable error on re-prepare, rethrowing", logPrefix);
1088-
trackNodeError(node, prepareError, null);
1088+
trackNodeError(prepareError, null);
10891089
fatalError = prepareError;
10901090
}
10911091
}
10921092
} else if (exception instanceof RequestThrottlingException) {
1093-
trackNodeError(node, exception, null);
1093+
trackNodeError(exception, null);
10941094
fatalError = exception;
10951095
}
10961096
if (fatalError == null) {
10971097
LOG.trace("[{}] Re-prepare failed, trying next node", logPrefix);
10981098
recordError(node, exception);
1099-
trackNodeError(node, exception, null);
1099+
trackNodeError(exception, null);
11001100
sendRequest(statement, null, executionIndex, retryCount, false);
11011101
}
11021102
}
@@ -1124,18 +1124,18 @@ private void processRetryVerdict(@NonNull RetryVerdict verdict, @NonNull Throwab
11241124
switch (verdict.getRetryDecision()) {
11251125
case RETRY_SAME:
11261126
recordError(node, error);
1127-
trackNodeError(node, error, null);
1127+
trackNodeError(error, null);
11281128
sendRequest(
11291129
verdict.getRetryRequest(statement), node, executionIndex, retryCount + 1, false);
11301130
break;
11311131
case RETRY_NEXT:
11321132
recordError(node, error);
1133-
trackNodeError(node, error, null);
1133+
trackNodeError(error, null);
11341134
sendRequest(
11351135
verdict.getRetryRequest(statement), null, executionIndex, retryCount + 1, false);
11361136
break;
11371137
case RETHROW:
1138-
trackNodeError(node, error, null);
1138+
trackNodeError(error, null);
11391139
abort(error, true);
11401140
break;
11411141
case IGNORE:
@@ -1448,18 +1448,13 @@ private void reenableAutoReadIfNeeded() {
14481448

14491449
// ERROR HANDLING
14501450

1451-
private void trackNodeError(
1452-
@NonNull Node node, @NonNull Throwable error, @Nullable Frame frame) {
1451+
private void trackNodeError(@NonNull Throwable error, @Nullable Frame frame) {
14531452
if (nodeErrorReported.compareAndSet(false, true)) {
14541453
long latencyNanos = System.nanoTime() - this.messageStartTimeNanos;
14551454
context
14561455
.getRequestTracker()
14571456
.onNodeError(
1458-
this.statement,
1459-
error,
14601457
latencyNanos,
1461-
executionProfile,
1462-
node,
14631458
createExecutionInfo(error).withServerResponse(frame).build(),
14641459
logPrefix);
14651460
}
@@ -1584,23 +1579,16 @@ private void completeResultSetFuture(
15841579
}
15851580

15861581
if (nodeSuccessReported.compareAndSet(false, true)) {
1587-
context
1588-
.getRequestTracker()
1589-
.onNodeSuccess(
1590-
statement, nodeLatencyNanos, executionProfile, node, executionInfo, logPrefix);
1582+
context.getRequestTracker().onNodeSuccess(nodeLatencyNanos, executionInfo, logPrefix);
15911583
}
1592-
context
1593-
.getRequestTracker()
1594-
.onSuccess(
1595-
statement, totalLatencyNanos, executionProfile, node, executionInfo, logPrefix);
1584+
context.getRequestTracker().onSuccess(totalLatencyNanos, executionInfo, logPrefix);
15961585
}
15971586
} else {
15981587
Throwable error = (Throwable) pageOrError;
15991588
if (future.completeExceptionally(error)) {
16001589
context
16011590
.getRequestTracker()
1602-
.onError(
1603-
statement, error, totalLatencyNanos, executionProfile, node, null, logPrefix);
1591+
.onError(totalLatencyNanos, createExecutionInfo(error).build(), logPrefix);
16041592
if (error instanceof DriverTimeoutException) {
16051593
throttler.signalTimeout(ContinuousRequestHandlerBase.this);
16061594
session

Diff for: core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphExecutionInfoConverter.java

+11
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,12 @@ public int getSuccessfulExecutionIndex() {
8484
return graphExecutionInfo.getSuccessfulExecutionIndex();
8585
}
8686

87+
@Nullable
88+
@Override
89+
public Throwable getDriverError() {
90+
return graphExecutionInfo.getDriverError();
91+
}
92+
8793
@NonNull
8894
@Override
8995
public List<Entry<Node, Throwable>> getErrors() {
@@ -172,6 +178,11 @@ public int getSuccessfulExecutionIndex() {
172178
return executionInfo.getSuccessfulExecutionIndex();
173179
}
174180

181+
@Override
182+
public Throwable getDriverError() {
183+
return executionInfo.getDriverError();
184+
}
185+
175186
@Override
176187
public List<Entry<Node, Throwable>> getErrors() {
177188
return executionInfo.getErrors();

Diff for: core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java

+32-31
Original file line numberDiff line numberDiff line change
@@ -372,20 +372,8 @@ private void setFinalResult(
372372
completionTimeNanos = System.nanoTime();
373373
totalLatencyNanos = completionTimeNanos - startTimeNanos;
374374
long nodeLatencyNanos = completionTimeNanos - callback.nodeStartTimeNanos;
375-
requestTracker.onNodeSuccess(
376-
callback.statement,
377-
nodeLatencyNanos,
378-
executionProfile,
379-
callback.node,
380-
executionInfo,
381-
logPrefix);
382-
requestTracker.onSuccess(
383-
callback.statement,
384-
totalLatencyNanos,
385-
executionProfile,
386-
callback.node,
387-
executionInfo,
388-
logPrefix);
375+
requestTracker.onNodeSuccess(nodeLatencyNanos, executionInfo, logPrefix);
376+
requestTracker.onSuccess(totalLatencyNanos, executionInfo, logPrefix);
389377
}
390378
if (sessionMetricUpdater.isEnabled(
391379
DseSessionMetric.GRAPH_REQUESTS, executionProfile.getName())) {
@@ -471,8 +459,7 @@ private void setFinalError(
471459
cancelScheduledTasks();
472460
if (!(requestTracker instanceof NoopRequestTracker)) {
473461
long latencyNanos = System.nanoTime() - startTimeNanos;
474-
requestTracker.onError(
475-
statement, error, latencyNanos, executionProfile, node, executionInfo, logPrefix);
462+
requestTracker.onError(latencyNanos, executionInfo, logPrefix);
476463
}
477464
if (error instanceof DriverTimeoutException) {
478465
throttler.signalTimeout(this);
@@ -534,7 +521,7 @@ public void operationComplete(Future<java.lang.Void> future) {
534521
Throwable error = future.cause();
535522
if (error instanceof EncoderException
536523
&& error.getCause() instanceof FrameTooLongException) {
537-
trackNodeError(node, error.getCause(), NANOTIME_NOT_MEASURED_YET);
524+
trackNodeError(this, error.getCause(), NANOTIME_NOT_MEASURED_YET, null);
538525
setFinalError(statement, error.getCause(), node, execution);
539526
} else {
540527
LOG.trace(
@@ -543,7 +530,7 @@ public void operationComplete(Future<java.lang.Void> future) {
543530
channel,
544531
error);
545532
recordError(node, error);
546-
trackNodeError(node, error, NANOTIME_NOT_MEASURED_YET);
533+
trackNodeError(this, error, NANOTIME_NOT_MEASURED_YET, null);
547534
((DefaultNode) node)
548535
.getMetricUpdater()
549536
.incrementCounter(DefaultNodeMetric.UNSENT_REQUESTS, executionProfile.getName());
@@ -651,38 +638,39 @@ public void onResponse(Frame responseFrame) {
651638
setFinalResult((Result) responseMessage, responseFrame, this);
652639
} else if (responseMessage instanceof Error) {
653640
LOG.trace("[{}] Got error response, processing", logPrefix);
654-
processErrorResponse((Error) responseMessage);
641+
processErrorResponse((Error) responseMessage, responseFrame);
655642
} else {
656643
trackNodeError(
657-
node,
644+
this,
658645
new IllegalStateException("Unexpected response " + responseMessage),
659-
nodeResponseTimeNanos);
646+
nodeResponseTimeNanos,
647+
responseFrame);
660648
setFinalError(
661649
statement,
662650
new IllegalStateException("Unexpected response " + responseMessage),
663651
node,
664652
execution);
665653
}
666654
} catch (Throwable t) {
667-
trackNodeError(node, t, nodeResponseTimeNanos);
655+
trackNodeError(this, t, nodeResponseTimeNanos, responseFrame);
668656
setFinalError(statement, t, node, execution);
669657
}
670658
}
671659

672-
private void processErrorResponse(Error errorMessage) {
660+
private void processErrorResponse(Error errorMessage, Frame responseFrame) {
673661
CoordinatorException error = Conversions.toThrowable(node, errorMessage, context);
674662
NodeMetricUpdater metricUpdater = ((DefaultNode) node).getMetricUpdater();
675663
if (error instanceof BootstrappingException) {
676664
LOG.trace("[{}] {} is bootstrapping, trying next node", logPrefix, node);
677665
recordError(node, error);
678-
trackNodeError(node, error, NANOTIME_NOT_MEASURED_YET);
666+
trackNodeError(this, error, NANOTIME_NOT_MEASURED_YET, responseFrame);
679667
sendRequest(statement, null, queryPlan, execution, retryCount, false);
680668
} else if (error instanceof QueryValidationException
681669
|| error instanceof FunctionFailureException
682670
|| error instanceof ProtocolError) {
683671
LOG.trace("[{}] Unrecoverable error, rethrowing", logPrefix);
684672
metricUpdater.incrementCounter(DefaultNodeMetric.OTHER_ERRORS, executionProfile.getName());
685-
trackNodeError(node, error, NANOTIME_NOT_MEASURED_YET);
673+
trackNodeError(this, error, NANOTIME_NOT_MEASURED_YET, responseFrame);
686674
setFinalError(statement, error, node, execution);
687675
} else {
688676
RetryPolicy retryPolicy = Conversions.resolveRetryPolicy(context, executionProfile);
@@ -757,7 +745,7 @@ private void processRetryVerdict(RetryVerdict verdict, Throwable error) {
757745
switch (verdict.getRetryDecision()) {
758746
case RETRY_SAME:
759747
recordError(node, error);
760-
trackNodeError(node, error, NANOTIME_NOT_MEASURED_YET);
748+
trackNodeError(this, error, NANOTIME_NOT_MEASURED_YET, null);
761749
sendRequest(
762750
verdict.getRetryRequest(statement),
763751
node,
@@ -768,7 +756,7 @@ private void processRetryVerdict(RetryVerdict verdict, Throwable error) {
768756
break;
769757
case RETRY_NEXT:
770758
recordError(node, error);
771-
trackNodeError(node, error, NANOTIME_NOT_MEASURED_YET);
759+
trackNodeError(this, error, NANOTIME_NOT_MEASURED_YET, null);
772760
sendRequest(
773761
verdict.getRetryRequest(statement),
774762
null,
@@ -778,7 +766,7 @@ private void processRetryVerdict(RetryVerdict verdict, Throwable error) {
778766
false);
779767
break;
780768
case RETHROW:
781-
trackNodeError(node, error, NANOTIME_NOT_MEASURED_YET);
769+
trackNodeError(this, error, NANOTIME_NOT_MEASURED_YET, null);
782770
setFinalError(statement, error, node, execution);
783771
break;
784772
case IGNORE:
@@ -857,16 +845,29 @@ void cancel() {
857845
* measured. If {@link #NANOTIME_NOT_MEASURED_YET}, it hasn't and we need to measure it now
858846
* (this is to avoid unnecessary calls to System.nanoTime)
859847
*/
860-
private void trackNodeError(Node node, Throwable error, long nodeResponseTimeNanos) {
848+
private void trackNodeError(
849+
NodeResponseCallback callback, Throwable error, long nodeResponseTimeNanos, Frame frame) {
861850
if (requestTracker instanceof NoopRequestTracker) {
862851
return;
863852
}
864853
if (nodeResponseTimeNanos == NANOTIME_NOT_MEASURED_YET) {
865854
nodeResponseTimeNanos = System.nanoTime();
866855
}
856+
ExecutionInfo executionInfo =
857+
DefaultExecutionInfo.builder(
858+
callback.statement,
859+
callback.node,
860+
startedSpeculativeExecutionsCount.get(),
861+
callback.execution,
862+
error,
863+
errors,
864+
session,
865+
context,
866+
callback.executionProfile)
867+
.withServerResponse(null, frame)
868+
.build();
867869
long latencyNanos = nodeResponseTimeNanos - this.nodeStartTimeNanos;
868-
requestTracker.onNodeError(
869-
statement, error, latencyNanos, executionProfile, node, null, logPrefix);
870+
requestTracker.onNodeError(latencyNanos, executionInfo, logPrefix);
870871
}
871872

872873
@Override

Diff for: core/src/main/java/com/datastax/oss/driver/api/core/cql/ExecutionInfo.java

+1
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ default DriverExecutionProfile getExecutionProfile() {
120120
@NonNull
121121
List<Map.Entry<Node, Throwable>> getErrors();
122122

123+
/** @return Exception raised by the driver to the application. */
123124
@Nullable
124125
default Throwable getDriverError() {
125126
return null;

0 commit comments

Comments
 (0)