Skip to content

Commit 90e631a

Browse files
committed
CDRIVER-2883 Support sharded transactions recovery token
1 parent b75358d commit 90e631a

11 files changed

+80
-84
lines changed

.gdbinit

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@ define __printbson
1616
set $bson = ((uint32_t*)$arg0)
1717
printf " (len=%d)\n", $bson[0]
1818
printf "{\n"
19-
__printelements ($bson+1) 0 1
19+
__printElements ($bson+1) 0 1
2020
printf "\n}\n"
2121
end
2222

23-
define __printelements
23+
define __printElements
2424
set $data = ((uint8_t*)$arg0)
2525
set $isDocument = $arg1
2626
set $depth = $arg2

src/libmongoc/doc/mongoc_read_prefs_t.rst

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ A read preference abstraction
88
Synopsis
99
--------
1010

11-
:symbol:`mongoc_read_prefs_t` provides an abstraction on top of the MongoDB connection read prefences. It allows for hinting to the driver which nodes in a replica set should be accessed first.
11+
:symbol:`mongoc_read_prefs_t` provides an abstraction on top of the MongoDB connection read preferences. It allows for hinting to the driver which nodes in a replica set should be accessed first.
1212

1313
You can specify a read preference mode on connection objects, database objects, collection objects, or per-operation. Generally, it makes the most sense to stick with the global default, ``MONGOC_READ_PRIMARY``. All of the other modes come with caveats that won't be covered in great detail here.
1414

src/libmongoc/src/mongoc/mongoc-client-session-private.h

+1
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ struct _mongoc_client_session_t {
7777
uint32_t operation_increment;
7878
uint32_t client_generation;
7979
uint32_t server_id;
80+
bson_t *recovery_token;
8081

8182
/* For testing only */
8283
int64_t with_txn_timeout_ms;

src/libmongoc/src/mongoc/mongoc-client-session.c

+4
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,9 @@ txn_commit (mongoc_client_session_t *session,
209209
_mongoc_bson_init_if_set (reply);
210210

211211
BSON_APPEND_INT32 (&cmd, "commitTransaction", 1);
212+
if (session->recovery_token) {
213+
BSON_APPEND_DOCUMENT (&cmd, "recoveryToken", session->recovery_token);
214+
}
212215

213216
retry:
214217
if (!mongoc_client_session_append (session, &opts, err_ptr)) {
@@ -1423,6 +1426,7 @@ mongoc_client_session_destroy (mongoc_client_session_t *session)
14231426
txn_opts_cleanup (&session->txn.opts);
14241427

14251428
bson_destroy (&session->cluster_time);
1429+
bson_destroy (session->recovery_token);
14261430
bson_free (session);
14271431

14281432
EXIT;

src/libmongoc/src/mongoc/mongoc-cluster.c

+24-5
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,14 @@ handle_not_master_error (mongoc_cluster_t *cluster,
506506
}
507507
}
508508

509+
bool
510+
_in_sharded_txn (const mongoc_client_session_t *session)
511+
{
512+
return session && _mongoc_client_session_in_txn (session) &&
513+
_mongoc_topology_get_type (session->client->topology) ==
514+
MONGOC_TOPOLOGY_SHARDED;
515+
}
516+
509517
/*
510518
*--------------------------------------------------------------------------
511519
*
@@ -542,6 +550,7 @@ mongoc_cluster_run_command_monitored (mongoc_cluster_t *cluster,
542550
bson_t reply_local;
543551
bson_error_t error_local;
544552
int32_t compressor_id;
553+
bson_iter_t iter;
545554

546555
server_stream = cmd->server_stream;
547556
server_id = server_stream->sd->id;
@@ -612,6 +621,19 @@ mongoc_cluster_run_command_monitored (mongoc_cluster_t *cluster,
612621

613622
handle_not_master_error (cluster, server_id, reply);
614623

624+
if (retval && _in_sharded_txn (cmd->session) &&
625+
bson_iter_init_find (&iter, reply, "recoveryToken")) {
626+
bson_free (cmd->session->recovery_token);
627+
if (BSON_ITER_HOLDS_DOCUMENT (&iter)) {
628+
cmd->session->recovery_token =
629+
bson_new_from_data (bson_iter_value (&iter)->value.v_doc.data,
630+
bson_iter_value (&iter)->value.v_doc.data_len);
631+
} else {
632+
MONGOC_ERROR ("Malformed recovery token from server");
633+
cmd->session->recovery_token = NULL;
634+
}
635+
}
636+
615637
if (reply == &reply_local) {
616638
bson_destroy (&reply_local);
617639
}
@@ -2153,11 +2175,8 @@ _mongoc_cluster_select_server_id (mongoc_client_session_t *cs,
21532175
bson_error_t *error)
21542176
{
21552177
uint32_t server_id;
2156-
bool in_txn;
21572178

2158-
in_txn = _mongoc_client_session_in_txn (cs);
2159-
if (in_txn && _mongoc_topology_get_type (cs->client->topology) ==
2160-
MONGOC_TOPOLOGY_SHARDED) {
2179+
if (_in_sharded_txn (cs)) {
21612180
server_id = cs->server_id;
21622181
if (!server_id) {
21632182
server_id = mongoc_topology_select_server_id (
@@ -2170,7 +2189,7 @@ _mongoc_cluster_select_server_id (mongoc_client_session_t *cs,
21702189
/* Transactions Spec: Additionally, any non-transaction operation using a
21712190
* pinned ClientSession MUST unpin the session and the operation MUST
21722191
* perform normal server selection. */
2173-
if (cs && !in_txn) {
2192+
if (cs && !_mongoc_client_session_in_txn (cs)) {
21742193
_mongoc_client_session_unpin (cs);
21752194
}
21762195
}

src/libmongoc/tests/json-test-operations.c

+14-1
Original file line numberDiff line numberDiff line change
@@ -945,6 +945,8 @@ find_and_modify (mongoc_collection_t *collection,
945945
*/
946946
if (r) {
947947
bson_lookup_value (reply, "value", &value);
948+
} else {
949+
value_init_from_doc (&value, reply);
948950
}
949951

950952
check_result (test, operation, r, &value, &error);
@@ -1658,9 +1660,20 @@ json_test_operation (json_test_ctx_t *ctx,
16581660
test_error ("unrecognized session operation name %s", op_name);
16591661
}
16601662
} else if (!strcmp (obj_name, "testRunner")) {
1663+
/* We don't use reply, but we need to initialize it for the test runner */
1664+
bson_init (reply);
16611665
if (!strcmp (op_name, "assertSessionPinned")) {
1662-
bson_init (reply);
16631666
res = (0 != mongoc_client_session_get_server_id (session));
1667+
} else if (!strcmp (op_name, "assertSessionUnpinned")) {
1668+
res = (0 == mongoc_client_session_get_server_id (session));
1669+
} else if (!strcmp (op_name, "targetedFailPoint")) {
1670+
mongoc_client_t *client;
1671+
1672+
client = mongoc_client_new_from_uri (ctx->client->uri);
1673+
activate_fail_point (
1674+
client, session->server_id, operation, "arguments.failPoint");
1675+
1676+
mongoc_client_destroy (client);
16641677
} else {
16651678
test_error ("unrecognized session operation name %s", op_name);
16661679
}

src/libmongoc/tests/json-test.c

+27-9
Original file line numberDiff line numberDiff line change
@@ -991,14 +991,7 @@ execute_test (const json_test_config_t *config,
991991
ASSERT_OR_PRINT (server_id, error);
992992

993993
if (bson_has_field (test, "failPoint")) {
994-
bson_t command;
995-
bool r;
996-
997-
bson_lookup_doc (test, "failPoint", &command);
998-
ASSERT_CMPSTR (_mongoc_get_command_name (&command), "configureFailPoint");
999-
r = mongoc_client_command_simple_with_server_id (
1000-
client, "admin", &command, NULL, server_id, NULL, &error);
1001-
ASSERT_OR_PRINT (r, error);
994+
activate_fail_point (client, server_id, test, "failPoint");
1002995
}
1003996

1004997
json_test_ctx_init (&ctx, test, client, db, collection, config);
@@ -1043,6 +1036,27 @@ execute_test (const json_test_config_t *config,
10431036
}
10441037

10451038

1039+
void
1040+
activate_fail_point (mongoc_client_t *client,
1041+
const uint32_t server_id,
1042+
const bson_t *test,
1043+
const char *key)
1044+
{
1045+
bson_t command;
1046+
bson_error_t error;
1047+
bool r;
1048+
1049+
BSON_ASSERT (client);
1050+
BSON_ASSERT (server_id);
1051+
1052+
bson_lookup_doc (test, key, &command);
1053+
1054+
ASSERT_CMPSTR (_mongoc_get_command_name (&command), "configureFailPoint");
1055+
r = mongoc_client_command_simple_with_server_id (
1056+
client, "admin", &command, NULL, server_id, NULL, &error);
1057+
ASSERT_OR_PRINT (r, error);
1058+
}
1059+
10461060
/*
10471061
*-----------------------------------------------------------------------
10481062
*
@@ -1186,6 +1200,7 @@ run_json_general_test (const json_test_config_t *config)
11861200
uint32_t server_id;
11871201
bson_error_t error;
11881202
bool r;
1203+
bson_iter_t uri_iter;
11891204

11901205
ASSERT (BSON_ITER_HOLDS_DOCUMENT (&tests_iter));
11911206
bson_iter_bson (&tests_iter, &test);
@@ -1209,7 +1224,10 @@ run_json_general_test (const json_test_config_t *config)
12091224

12101225
bson_free (selected_test);
12111226

1212-
uri = test_framework_get_uri ();
1227+
uri = bson_iter_init_find (&uri_iter, &test, "useMultipleMongoses")
1228+
? mongoc_uri_new ("mongodb://localhost:27017,localhost:27018/")
1229+
: test_framework_get_uri ();
1230+
12131231
if (bson_iter_init_find (&client_opts_iter, &test, "clientOptions")) {
12141232
bson_t client_opts;
12151233

src/libmongoc/tests/json-test.h

+7-3
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,10 @@ typedef struct _json_test_config_t {
4040
bool command_monitoring_allow_subset;
4141
} json_test_config_t;
4242

43-
#define JSON_TEST_CONFIG_INIT {NULL, NULL, NULL, NULL, false}
43+
#define JSON_TEST_CONFIG_INIT \
44+
{ \
45+
NULL, NULL, NULL, NULL, false \
46+
}
4447

4548
bson_t *
4649
get_bson_from_json_file (char *filename);
@@ -75,8 +78,9 @@ server_type_from_test (const char *type);
7578

7679
void
7780
activate_fail_point (mongoc_client_t *client,
78-
uint32_t server_id,
79-
const bson_t *opts);
81+
const uint32_t server_id,
82+
const bson_t *test,
83+
const char *key);
8084

8185
void
8286
deactivate_fail_points (mongoc_client_t *client, uint32_t server_id);

0 commit comments

Comments
 (0)