Commit a766334

George Wangensteen authored and sourcegraph-bot committed

mongo: SERVER-71230 Add baton support to async rpc senders

Commit: 99cf1c7c97eb67ad7505140e20cab9d1c23d0b0c
1 parent 0880890 commit a766334

File tree

11 files changed: +320 −42 lines changed

mongo/src/mongo/executor/async_rpc.cpp

Lines changed: 12 additions & 5 deletions
@@ -53,18 +53,25 @@ class AsyncRPCRunnerImpl : public AsyncRPCRunner {
         Targeter* targeter,
         OperationContext* opCtx,
         std::shared_ptr<TaskExecutor> exec,
-        CancellationToken token) final {
+        CancellationToken token,
+        BatonHandle baton) final {
+        auto proxyExec = std::make_shared<ProxyingExecutor>(baton, exec);
         auto targetsUsed = std::make_shared<std::vector<HostAndPort>>();
         return targeter->resolve(token)
-            .thenRunOn(exec)
-            .then([dbName, cmdBSON, opCtx, exec = std::move(exec), token, targetsUsed](
-                      std::vector<HostAndPort> targets) {
+            .thenRunOn(proxyExec)
+            .then([dbName,
+                   cmdBSON,
+                   opCtx,
+                   exec = std::move(exec),
+                   token,
+                   baton = std::move(baton),
+                   targetsUsed](std::vector<HostAndPort> targets) {
                 invariant(targets.size(),
                           "Successful targeting implies there are hosts to target.");
                 *targetsUsed = targets;
                 executor::RemoteCommandRequestOnAny executorRequest(
                     targets, dbName.toString(), cmdBSON, rpc::makeEmptyMetadata(), opCtx);
-                return exec->scheduleRemoteCommandOnAny(executorRequest, token);
+                return exec->scheduleRemoteCommandOnAny(executorRequest, token, std::move(baton));
             })
             .onError([targetsUsed](Status s) -> StatusWith<TaskExecutor::ResponseOnAnyStatus> {
                 // If there was a scheduling error or other local error before the
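The change above threads an optional BatonHandle from the runner down to scheduleRemoteCommandOnAny, so a remote command and its continuations can run on the caller's baton rather than on an executor thread, falling back to the executor when no baton is supplied. A minimal standalone model of that dispatch decision, using std:: types in place of the real mongo classes (BatonLike, ExecutorLike, and scheduleOn are illustrative names, not code from this tree):

#include <functional>
#include <iostream>
#include <memory>

// Stand-ins for mongo's BatonHandle and TaskExecutor: both can run a task,
// but the baton runs it on the operation's own thread when one is attached.
struct BatonLike {
    void schedule(std::function<void()> task) {
        std::cout << "baton path (operation thread)\n";
        task();
    }
};

struct ExecutorLike {
    void schedule(std::function<void()> task) {
        std::cout << "executor path (pool thread)\n";
        task();
    }
};

// The pattern this commit applies throughout: prefer the baton when the
// caller supplied one, otherwise fall back to the task executor.
void scheduleOn(std::shared_ptr<BatonLike> baton,
                std::shared_ptr<ExecutorLike> exec,
                std::function<void()> task) {
    if (baton)
        return baton->schedule(std::move(task));
    exec->schedule(std::move(task));
}

int main() {
    auto exec = std::make_shared<ExecutorLike>();
    scheduleOn(nullptr, exec, [] { std::cout << "ran without baton\n"; });
    scheduleOn(std::make_shared<BatonLike>(), exec,
               [] { std::cout << "ran with baton\n"; });
}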

mongo/src/mongo/executor/async_rpc.h

Lines changed: 48 additions & 4 deletions
@@ -86,12 +86,14 @@ struct AsyncRPCOptions {
     AsyncRPCOptions(CommandType cmd,
                     std::shared_ptr<executor::TaskExecutor> exec,
                     CancellationToken token,
-                    std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>())
-        : cmd{cmd}, exec{exec}, token{token}, retryPolicy{retryPolicy} {}
+                    std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>(),
+                    BatonHandle baton = nullptr)
+        : cmd{cmd}, exec{exec}, token{token}, retryPolicy{retryPolicy}, baton{std::move(baton)} {}
     CommandType cmd;
     std::shared_ptr<executor::TaskExecutor> exec;
     CancellationToken token;
     std::shared_ptr<RetryPolicy> retryPolicy;
+    BatonHandle baton;
 };
 
 /**
@@ -124,7 +126,22 @@ class AsyncRPCRunner {
         Targeter* targeter,
         OperationContext* opCtx,
         std::shared_ptr<TaskExecutor> exec,
-        CancellationToken token) = 0;
+        CancellationToken token,
+        BatonHandle baton) = 0;
+    ExecutorFuture<AsyncRPCInternalResponse> _sendCommand(StringData dbName,
+                                                          BSONObj cmdBSON,
+                                                          Targeter* targeter,
+                                                          OperationContext* opCtx,
+                                                          std::shared_ptr<TaskExecutor> exec,
+                                                          CancellationToken token) {
+        return _sendCommand(std::move(dbName),
+                            std::move(cmdBSON),
+                            std::move(targeter),
+                            std::move(opCtx),
+                            std::move(exec),
+                            std::move(token),
+                            nullptr);
+    }
     static AsyncRPCRunner* get(ServiceContext* serviceContext);
     static void set(ServiceContext* serviceContext, std::unique_ptr<AsyncRPCRunner> theRunner);
 };
@@ -154,6 +171,31 @@ struct RetryDelayAsBackoff {
     RetryPolicy* _policy;
 };
 
+class ProxyingExecutor : public OutOfLineExecutor,
+                         public std::enable_shared_from_this<ProxyingExecutor> {
+public:
+    ProxyingExecutor(BatonHandle baton, std::shared_ptr<TaskExecutor> executor)
+        : _baton{std::move(baton)}, _executor{std::move(executor)} {}
+
+    void schedule(Task func) override {
+        if (_baton)
+            return _baton->schedule(std::move(func));
+        return _executor->schedule(std::move(func));
+    }
+
+    ExecutorFuture<void> sleepFor(Milliseconds duration, const CancellationToken& token) {
+        auto deadline = Date_t::now() + duration;
+        if (auto netBaton = _baton ? _baton->networking() : nullptr; netBaton) {
+            return netBaton->waitUntil(deadline, token).thenRunOn(shared_from_this());
+        }
+        return _executor->sleepFor(duration, token);
+    }
+
+private:
+    BatonHandle _baton;
+    std::shared_ptr<TaskExecutor> _executor;
+};
+
 template <typename CommandType>
 ExecutorFuture<AsyncRPCResponse<typename CommandType::Reply>> sendCommandWithRunner(
     BSONObj cmdBSON,
@@ -162,6 +204,7 @@ ExecutorFuture<AsyncRPCResponse<typename CommandType::Reply>> sendCommandWithRun
     OperationContext* opCtx,
     std::unique_ptr<Targeter> targeter) {
     using ReplyType = AsyncRPCResponse<typename CommandType::Reply>;
+    auto proxyExec = std::make_shared<ProxyingExecutor>(options->baton, options->exec);
     auto tryBody = [=, targeter = std::move(targeter)] {
         // Execute the command after extracting the db name and bson from the CommandType.
         // Wrapping this function allows us to separate the CommandType parsing logic from the
@@ -181,7 +224,8 @@ ExecutorFuture<AsyncRPCResponse<typename CommandType::Reply>> sendCommandWithRun
                 return shouldStopRetry;
             })
             .withBackoffBetweenIterations(RetryDelayAsBackoff(options->retryPolicy.get()))
-            .on(options->exec, CancellationToken::uncancelable());
+            .on(proxyExec, CancellationToken::uncancelable());
+
     return std::move(resFuture)
         .then([](detail::AsyncRPCInternalResponse r) -> ReplyType {
             auto res = CommandType::Reply::parseSharingOwnership(IDLParserContext("AsyncRPCRunner"),
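ProxyingExecutor is the heart of the change: work that was previously scheduled directly on the TaskExecutor now goes through the proxy, which runs it on the baton when one is set and on the executor otherwise; sleepFor similarly routes retry backoff through the networking baton's deadline wait when available. A rough standalone model of that sleepFor routing, with invented stand-in types (NetBatonLike, ExecutorLike, sleepThenRun), not the real reactor interfaces:

#include <chrono>
#include <functional>
#include <iostream>
#include <memory>

using namespace std::chrono;

// Invented stand-ins: a networking-capable baton exposes a deadline-based
// wait; the executor exposes a generic sleep. Neither is the real interface.
struct NetBatonLike {
    void waitUntil(steady_clock::time_point, std::function<void()> then) {
        std::cout << "backoff timer armed on networking baton\n";
        then();
    }
};

struct ExecutorLike {
    void sleepFor(milliseconds, std::function<void()> then) {
        std::cout << "backoff timer armed on executor\n";
        then();
    }
};

// Mirrors the routing in ProxyingExecutor::sleepFor above: use the baton's
// networking reactor for the delay when one is attached, else the executor.
void sleepThenRun(std::shared_ptr<NetBatonLike> netBaton,
                  ExecutorLike& exec,
                  milliseconds d,
                  std::function<void()> then) {
    if (netBaton)
        return netBaton->waitUntil(steady_clock::now() + d, std::move(then));
    exec.sleepFor(d, std::move(then));
}

int main() {
    ExecutorLike exec;
    sleepThenRun(nullptr, exec, 10ms, [] { std::cout << "retry now\n"; });
    sleepThenRun(std::make_shared<NetBatonLike>(), exec, 10ms,
                 [] { std::cout << "retry now\n"; });
}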

mongo/src/mongo/executor/async_rpc_test.cpp

Lines changed: 75 additions & 0 deletions
@@ -44,6 +44,7 @@
 #include "mongo/executor/thread_pool_task_executor_test_fixture.h"
 #include "mongo/rpc/topology_version_gen.h"
 #include "mongo/unittest/bson_test_util.h"
+#include "mongo/unittest/thread_assertion_monitor.h"
 #include "mongo/unittest/unittest.h"
 #include "mongo/util/duration.h"
 #include "mongo/util/future.h"
@@ -412,6 +413,44 @@ TEST_F(AsyncRPCTestFixture, ExecutorShutdown) {
     ASSERT(ErrorCodes::isA<ErrorCategory::CancellationError>(extraInfo->asLocal()));
 }
 
+TEST_F(AsyncRPCTestFixture, BatonTest) {
+    std::unique_ptr<Targeter> targeter = std::make_unique<LocalHostTargeter>();
+    auto retryPolicy = std::make_shared<NeverRetryPolicy>();
+    HelloCommand helloCmd;
+    HelloCommandReply helloReply = HelloCommandReply(TopologyVersion(OID::gen(), 0));
+    initializeCommand(helloCmd);
+    auto opCtxHolder = makeOperationContext();
+    auto baton = opCtxHolder->getBaton();
+    auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>(
+        helloCmd, getExecutorPtr(), _cancellationToken);
+    options->baton = baton;
+    auto resultFuture = sendCommand(options, opCtxHolder.get(), std::move(targeter));
+
+    Notification<void> seenNetworkRequest;
+    unittest::ThreadAssertionMonitor monitor;
+    // This thread will respond to the request we sent via sendCommand above.
+    auto networkResponder = monitor.spawn([&] {
+        onCommand([&](const auto& request) {
+            ASSERT(request.cmdObj["hello"]);
+            seenNetworkRequest.set();
+            monitor.notifyDone();
+            return helloReply.toBSON();
+        });
+    });
+    // Wait on the opCtx until networkResponder has observed the network request.
+    // While we block on the opCtx, the current thread should run jobs scheduled
+    // on the baton, including enqueuing the network request via `sendCommand` above.
+    seenNetworkRequest.get(opCtxHolder.get());
+
+    networkResponder.join();
+    // Wait on the opCtx again to allow the current thread, via the baton, to propagate
+    // the network response up into the resultFuture.
+    AsyncRPCResponse res = resultFuture.get(opCtxHolder.get());
+
+    ASSERT_BSONOBJ_EQ(res.response.toBSON(), helloReply.toBSON());
+    ASSERT_EQ(HostAndPort("localhost", serverGlobalParams.port), res.targetUsed);
+}
+
 /*
  * Basic Targeter that returns the host that invoked it.
  */
@@ -507,6 +546,42 @@ TEST_F(AsyncRPCTestFixture, FailedTargeting) {
     ASSERT(extraInfo->isLocal());
     ASSERT_EQ(extraInfo->asLocal(), targeterFailStatus);
 }
+TEST_F(AsyncRPCTestFixture, BatonShutdownExecutorAlive) {
+    std::unique_ptr<Targeter> targeter = std::make_unique<LocalHostTargeter>();
+    auto retryPolicy = std::make_shared<TestRetryPolicy>();
+    const auto maxNumRetries = 5;
+    const auto retryDelay = Milliseconds(10);
+    retryPolicy->setMaxNumRetries(maxNumRetries);
+    for (int i = 0; i < maxNumRetries; ++i)
+        retryPolicy->pushRetryDelay(retryDelay);
+    HelloCommand helloCmd;
+    HelloCommandReply helloReply = HelloCommandReply(TopologyVersion(OID::gen(), 0));
+    initializeCommand(helloCmd);
+    auto opCtxHolder = makeOperationContext();
+    auto subBaton = opCtxHolder->getBaton()->makeSubBaton();
+    auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>(
+        helloCmd, getExecutorPtr(), _cancellationToken);
+    options->baton = *subBaton;
+    auto resultFuture = sendCommand(options, opCtxHolder.get(), std::move(targeter));
+
+    subBaton.shutdown();
+
+    auto error = resultFuture.getNoThrow().getStatus();
+    auto expectedDetachError = Status(ErrorCodes::ShutdownInProgress, "Baton detached");
+    auto expectedOuterReason = "Remote command execution failed due to executor shutdown";
+
+    ASSERT_EQ(error.code(), ErrorCodes::RemoteCommandExecutionError);
+    ASSERT_EQ(error.reason(), expectedOuterReason);
+
+    auto extraInfo = error.extraInfo<AsyncRPCErrorInfo>();
+    ASSERT(extraInfo);
+
+    ASSERT(extraInfo->isLocal());
+    auto localError = extraInfo->asLocal();
+    ASSERT_EQ(localError, expectedDetachError);
+
+    ASSERT_EQ(0, retryPolicy->getNumRetriesPerformed());
+}
 
 TEST_F(AsyncRPCTestFixture, SendTxnCommandWithoutTxnRouterAppendsNoTxnFields) {
     ShardId shardId("shard");
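The two new tests pin down the scheduling contract: BatonTest verifies that a thread blocked on the opCtx drives baton-scheduled work itself, and BatonShutdownExecutorAlive verifies that shutting down the baton surfaces as a RemoteCommandExecutionError wrapping a ShutdownInProgress "Baton detached" status, with zero retries performed. A toy model of the first property, the run-queued-jobs-while-waiting loop that lets BatonTest make progress on a single thread (BatonQueue and its members are invented for illustration):

#include <deque>
#include <functional>
#include <iostream>

// Toy model of why BatonTest works: while a thread "blocks on the opCtx",
// it drains jobs queued on its baton, so the RPC makes progress without a
// separate executor thread. BatonQueue and these names are invented here.
struct BatonQueue {
    std::deque<std::function<void()>> jobs;

    void schedule(std::function<void()> f) {
        jobs.push_back(std::move(f));
    }

    // Stand-in for waiting on the opCtx: instead of sleeping, run queued
    // baton jobs until the predicate is satisfied.
    void waitUntil(std::function<bool()> pred) {
        while (!pred() && !jobs.empty()) {
            auto job = std::move(jobs.front());
            jobs.pop_front();
            job();  // e.g. enqueue the network request, or deliver its response
        }
    }
};

int main() {
    BatonQueue baton;
    bool responded = false;
    baton.schedule([] { std::cout << "baton job: send network request\n"; });
    baton.schedule([&] {
        std::cout << "baton job: deliver response\n";
        responded = true;
    });
    baton.waitUntil([&] { return responded; });  // drains both jobs inline
}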

mongo/src/mongo/executor/hedged_async_rpc.h

Lines changed: 9 additions & 7 deletions
@@ -115,22 +115,23 @@ SemiFuture<AsyncRPCResponse<typename CommandType::Reply>> sendHedgedCommand(
     std::shared_ptr<executor::TaskExecutor> exec,
     CancellationToken token,
     std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>(),
-    ReadPreferenceSetting readPref = ReadPreferenceSetting(ReadPreference::PrimaryOnly)) {
+    ReadPreferenceSetting readPref = ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+    BatonHandle baton = nullptr) {
     using SingleResponse = AsyncRPCResponse<typename CommandType::Reply>;
 
     // Set up cancellation token to cancel remaining hedged operations.
     CancellationSource hedgeCancellationToken{token};
     auto targetsAttempted = std::make_shared<std::vector<HostAndPort>>();
+    auto proxyExec = std::make_shared<detail::ProxyingExecutor>(baton, exec);
     auto tryBody = [=, targeter = std::move(targeter)] {
         return targeter->resolve(token)
-            .thenRunOn(exec)
+            .thenRunOn(proxyExec)
             .onError([](Status status) -> StatusWith<std::vector<HostAndPort>> {
                 // Targeting error; rewrite it to a RemoteCommandExecutionError and skip
                 // command execution body. We'll retry if the policy indicates to.
                 return Status{AsyncRPCErrorInfo(status), status.reason()};
             })
-            .then([cmd, opCtx, exec, token, hedgeCancellationToken, readPref, targetsAttempted](
-                      std::vector<HostAndPort> targets) {
+            .then([=](std::vector<HostAndPort> targets) {
                 invariant(targets.size(),
                           "Successful targeting implies there are hosts to target.");
                 *targetsAttempted = targets;
@@ -146,8 +147,9 @@ SemiFuture<AsyncRPCResponse<typename CommandType::Reply>> sendHedgedCommand(
                     std::unique_ptr<Targeter> t = std::make_unique<FixedTargeter>(targets[i]);
                     auto options = std::make_shared<AsyncRPCOptions<CommandType>>(
                         cmd, exec, hedgeCancellationToken.token());
-                    requests.emplace_back(
-                        sendCommand(options, opCtx, std::move(t)).thenRunOn(exec));
+                    options->baton = baton;
+                    requests.push_back(
+                        sendCommand(options, opCtx, std::move(t)).thenRunOn(proxyExec));
                 }
 
                 /**
@@ -189,7 +191,7 @@ SemiFuture<AsyncRPCResponse<typename CommandType::Reply>> sendHedgedCommand(
                 !retryPolicy->recordAndEvaluateRetry(swResponse.getStatus());
             })
             .withBackoffBetweenIterations(detail::RetryDelayAsBackoff(retryPolicy.get()))
-            .on(exec, CancellationToken::uncancelable())
+            .on(proxyExec, CancellationToken::uncancelable())
             // We go inline here to intercept executor-shutdown errors and re-write them
             // so that the API always returns RemoteCommandExecutionError. Additionally,
             // we need to make sure we cancel outstanding requests.
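With these hunks, every stage of a hedged send (targeting, each per-host sub-command, and the retry loop's backoff) runs through the shared proxy, so one baton can drive all hedged attempts. A sketch of a call site using the new trailing parameter; it assumes the leading arguments (command, opCtx, targeter) keep their existing form and that exec, token, and helloCmd are set up as in the tests above:

// Hypothetical call site (sketch only): pass the opCtx's baton as the new
// trailing argument so hedged attempts and their retries run on it.
auto future = sendHedgedCommand(helloCmd,
                                opCtx,
                                std::move(targeter),
                                exec,
                                token,
                                std::make_shared<NeverRetryPolicy>(),
                                ReadPreferenceSetting(ReadPreference::PrimaryOnly),
                                opCtx->getBaton());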
