Skip to content

MONGOCRYPT-723 support $lookup #954

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions integrating.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,14 @@ All contexts.

#### State: `MONGOCRYPT_CTX_NEED_MONGO_COLLINFO` ####

<a name="multi-collection-commands"></a>
> [!IMPORTANT]
> **Multi-collection commands**: prior to 1.13.0, drivers were expected to pass _at most one result_ from `listCollections`. In 1.13.0, drivers are expected to pass _all results_ from `listCollections` to support multi-collection commands (e.g. aggregate with `$lookup`).
>
> Drivers must call `mongocrypt_setopt_enable_multiple_collinfo` to indicate the new behavior is implemented and opt-in to support for multi-collection commands.
>
> The opt-in is avoids the following example scenario: Driver upgrades to 1.13.0 and does not update the behavior. A multi-collection command requests schemas for both `db.c1` and `db.c2`. Driver only passes the result for `db.c1` (even though `db.c2` has a result). libmongocrypt mistakenly assumes `db.c2` has no schema.

**libmongocrypt needs**...

A result from a listCollections cursor.
Expand All @@ -148,7 +156,7 @@ A result from a listCollections cursor.

1. Run listCollections on the encrypted MongoClient with the filter
provided by `mongocrypt_ctx_mongo_op`
2. Return the first result (if any) with `mongocrypt_ctx_mongo_feed` or proceed to the next step if nothing was returned.
2. Pass all results (if any) with calls to `mongocrypt_ctx_mongo_feed` or proceed to the next step if nothing was passed. Results may be passed in any order.
3. Call `mongocrypt_ctx_mongo_done`

**Applies to...**
Expand All @@ -157,6 +165,8 @@ auto encrypt

#### State: `MONGOCRYPT_CTX_NEED_MONGO_COLLINFO_WITH_DB` ####

See [note](#multi-collection-commands) about multi-collection commands.

**libmongocrypt needs**...

Results from a listCollections cursor from a specified database.
Expand All @@ -165,7 +175,7 @@ Results from a listCollections cursor from a specified database.

1. Run listCollections on the encrypted MongoClient with the filter
provided by `mongocrypt_ctx_mongo_op` on the database provided by `mongocrypt_ctx_mongo_db`.
2. Return the first result (if any) with `mongocrypt_ctx_mongo_feed` or proceed to the next step if nothing was returned.
2. Pass all results (if any) with calls to `mongocrypt_ctx_mongo_feed` or proceed to the next step if nothing was passed. Results may be passed in any order.
3. Call `mongocrypt_ctx_mongo_done`

**Applies to...**
Expand Down
3 changes: 3 additions & 0 deletions src/mc-schema-broker-private.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ void mc_schema_broker_use_rangev2(mc_schema_broker_t *sb);
// Returns error if two requests have different databases (not-yet supported).
bool mc_schema_broker_request(mc_schema_broker_t *sb, const char *db, const char *coll, mongocrypt_status_t *status);

// mc_schema_broker_has_multiple_requests returns true if there are requests for multiple unique collections
bool mc_schema_broker_has_multiple_requests(const mc_schema_broker_t *sb);

// mc_schema_broker_append_listCollections_filter appends a filter to use with the listCollections command.
// Example: { "name": { "$in": [ "coll1", "coll2" ] } }
// The filter matches all not-yet-satisfied collections.
Expand Down
5 changes: 5 additions & 0 deletions src/mc-schema-broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ bool mc_schema_broker_request(mc_schema_broker_t *sb, const char *db, const char
return true;
}

bool mc_schema_broker_has_multiple_requests(const mc_schema_broker_t *sb) {
BSON_ASSERT_PARAM(sb);
return sb->ll_len > 1;
}

void mc_schema_broker_destroy(mc_schema_broker_t *sb) {
if (!sb) {
return;
Expand Down
239 changes: 220 additions & 19 deletions src/mongocrypt-ctx-encrypt.c
Original file line number Diff line number Diff line change
Expand Up @@ -2220,9 +2220,161 @@ static bool needs_ismaster_check(mongocrypt_ctx_t *ctx) {
BSON_ASSERT_PARAM(ctx);

bool using_mongocryptd = !ectx->bypass_query_analysis && !ctx->crypt->csfle.okay;
// The "create" and "createIndexes" command require an isMaster check when
// using mongocryptd. See MONGOCRYPT-429.
return using_mongocryptd && (0 == strcmp(ectx->cmd_name, "create") || 0 == strcmp(ectx->cmd_name, "createIndexes"));

if (!using_mongocryptd) {
return false;
}

if (mc_schema_broker_has_multiple_requests(ectx->sb)) {
// Only mongocryptd 8.1 (wire version 26) supports multiple schemas with csfleEncryptionSchemas.
return true;
}
// MONGOCRYPT-429: The "create" and "createIndexes" command are only supported on mongocrypt 6.0 (wire version 17).
if (0 == strcmp(ectx->cmd_name, "create") || 0 == strcmp(ectx->cmd_name, "createIndexes")) {
return true;
}

return false;
}

// `find_collections_in_pipeline` finds other collection names in an aggregate pipeline that may need schemas.
static bool find_collections_in_pipeline(mc_schema_broker_t *sb,
bson_iter_t pipeline_iter,
const char *db,
mstr_view path,
mongocrypt_status_t *status) {
bson_iter_t array_iter;
if (!BSON_ITER_HOLDS_ARRAY(&pipeline_iter) || !bson_iter_recurse(&pipeline_iter, &array_iter)) {
CLIENT_ERR("failed to recurse pipeline at path: %s", path.data);
return false;
}

while (bson_iter_next(&array_iter)) {
bson_iter_t stage_iter;
const char *stage_key = bson_iter_key(&array_iter);

if (!BSON_ITER_HOLDS_DOCUMENT(&array_iter) || !bson_iter_recurse(&array_iter, &stage_iter)
|| !bson_iter_next(&stage_iter)) {
CLIENT_ERR("failed to recurse stage at path: %s.%s", path.data, stage_key);
return false;
}

const char *stage = bson_iter_key(&stage_iter);
// Check for $lookup.
if (0 == strcmp(stage, "$lookup")) {
bson_iter_t lookup_iter;
if (!BSON_ITER_HOLDS_DOCUMENT(&stage_iter) || !bson_iter_recurse(&stage_iter, &lookup_iter)) {
CLIENT_ERR("failed to recurse $lookup at path: %s.%s", path.data, stage_key);
return false;
}

while (bson_iter_next(&lookup_iter)) {
const char *field = bson_iter_key(&lookup_iter);
if (0 == strcmp(field, "from")) {
if (!BSON_ITER_HOLDS_UTF8(&lookup_iter)) {
CLIENT_ERR("expected string, but '%s' for 'from' field at path: %s.%s",
mc_bson_type_to_string(bson_iter_type(&lookup_iter)),
path.data,
stage_key);
return false;
}
const char *from = bson_iter_utf8(&lookup_iter, NULL);
if (!mc_schema_broker_request(sb, db, from, status)) {
return false;
}
}

if (0 == strcmp(field, "pipeline")) {
mstr subpath = mstr_append(path, mstrv_lit("."));
mstr_inplace_append(&subpath, mstrv_view_cstr(stage_key));
mstr_inplace_append(&subpath, mstrv_lit(".$lookup.pipeline"));
if (!find_collections_in_pipeline(sb, lookup_iter, db, subpath.view, status)) {
mstr_free(subpath);
return false;
}
mstr_free(subpath);
}
}
}

// Check for $facet.
if (0 == strcmp(stage, "$facet")) {
bson_iter_t facet_iter;
if (!BSON_ITER_HOLDS_DOCUMENT(&stage_iter) || !bson_iter_recurse(&stage_iter, &facet_iter)) {
CLIENT_ERR("failed to recurse $facet at path: %s.%s", path.data, stage_key);
return false;
}

while (bson_iter_next(&facet_iter)) {
const char *field = bson_iter_key(&facet_iter);
mstr subpath = mstr_append(path, mstrv_lit("."));
mstr_inplace_append(&subpath, mstrv_view_cstr(stage_key));
mstr_inplace_append(&subpath, mstrv_lit(".$facet."));
mstr_inplace_append(&subpath, mstrv_view_cstr(field));
if (!find_collections_in_pipeline(sb, facet_iter, db, subpath.view, status)) {
mstr_free(subpath);
return false;
}
mstr_free(subpath);
}
}

// Check for $unionWith.
if (0 == strcmp(stage, "$unionWith")) {
bson_iter_t unionWith_iter;
if (!BSON_ITER_HOLDS_DOCUMENT(&stage_iter) || !bson_iter_recurse(&stage_iter, &unionWith_iter)) {
CLIENT_ERR("failed to recurse $unionWith at path: %s.%s", path.data, stage_key);
return false;
}

while (bson_iter_next(&unionWith_iter)) {
const char *field = bson_iter_key(&unionWith_iter);
if (0 == strcmp(field, "coll")) {
if (!BSON_ITER_HOLDS_UTF8(&unionWith_iter)) {
CLIENT_ERR("expected string, but got '%s' for 'coll' field at path: %s.%s",
mc_bson_type_to_string(bson_iter_type(&unionWith_iter)),
path.data,
stage_key);
return false;
}
const char *coll = bson_iter_utf8(&unionWith_iter, NULL);
if (!mc_schema_broker_request(sb, db, coll, status)) {
return false;
}
}

if (0 == strcmp(field, "pipeline")) {
mstr subpath = mstr_append(path, mstrv_lit("."));
mstr_inplace_append(&subpath, mstrv_view_cstr(stage_key));
mstr_inplace_append(&subpath, mstrv_lit(".$unionWith.pipeline"));
if (!find_collections_in_pipeline(sb, unionWith_iter, db, subpath.view, status)) {
mstr_free(subpath);
return false;
}
mstr_free(subpath);
}
}
}
}

return true;
}

static bool
find_collections_in_agg(mongocrypt_binary_t *cmd, mc_schema_broker_t *sb, const char *db, mongocrypt_status_t *status) {
bson_t cmd_bson;
if (!_mongocrypt_binary_to_bson(cmd, &cmd_bson)) {
CLIENT_ERR("failed to convert command to BSON");
return false;
}

bson_iter_t iter;
if (!bson_iter_init_find(&iter, &cmd_bson, "pipeline")) {
// Command may be malformed. Let server error.
return true;
}

return find_collections_in_pipeline(sb, iter, db, mstrv_lit("aggregate.pipeline"), status);
}

bool mongocrypt_ctx_encrypt_init(mongocrypt_ctx_t *ctx, const char *db, int32_t db_len, mongocrypt_binary_t *cmd) {
Expand Down Expand Up @@ -2313,6 +2465,22 @@ bool mongocrypt_ctx_encrypt_init(mongocrypt_ctx_t *ctx, const char *db, int32_t
}
}

if (0 == strcmp(ectx->cmd_name, "aggregate")) {
if (!find_collections_in_agg(cmd, ectx->sb, ectx->cmd_db, ctx->status)) {
_mongocrypt_ctx_fail(ctx);
return false;
}

if (mc_schema_broker_has_multiple_requests(ectx->sb)) {
if (!ctx->crypt->multiple_collinfo_enabled) {
return _mongocrypt_ctx_fail_w_msg(ctx,
"aggregate includes a $lookup stage, but libmongocrypt is not "
"configured to support encrypting a "
"command with multiple collections");
}
}
}

if (ctx->opts.kek.provider.aws.region || ctx->opts.kek.provider.aws.cmk) {
return _mongocrypt_ctx_fail_w_msg(ctx, "aws masterkey options must not be set");
}
Expand Down Expand Up @@ -2341,11 +2509,8 @@ bool mongocrypt_ctx_encrypt_init(mongocrypt_ctx_t *ctx, const char *db, int32_t
bson_free(cmd_val);
}

/* The "create" and "createIndexes" command require sending an isMaster
* request to mongocryptd. */
// Check if an isMaster request to mongocryptd is needed to detect feature support:
if (needs_ismaster_check(ctx)) {
/* We are using mongocryptd. We need to ensure that mongocryptd
* maxWireVersion >= 17. */
ectx->ismaster.needed = true;
ctx->state = MONGOCRYPT_CTX_NEED_MONGO_MARKINGS;
return true;
Expand All @@ -2355,6 +2520,10 @@ bool mongocrypt_ctx_encrypt_init(mongocrypt_ctx_t *ctx, const char *db, int32_t
}

#define WIRE_VERSION_SERVER_6 17
#define WIRE_VERSION_SERVER_8_1 26
// The crypt_shared version format is defined in mongo_crypt-v1.h.
// Example: server 6.2.1 is encoded as 0x0006000200010000
#define CRYPT_SHARED_8_1 0x0008000100000000ull

/* mongocrypt_ctx_encrypt_ismaster_done is called when:
* 1. The max wire version of mongocryptd is known.
Expand All @@ -2367,20 +2536,52 @@ static bool mongocrypt_ctx_encrypt_ismaster_done(mongocrypt_ctx_t *ctx) {

ectx->ismaster.needed = false;

/* The "create" and "createIndexes" command require bypassing on mongocryptd
* older than version 6.0. */
if (needs_ismaster_check(ctx)) {
if (ectx->ismaster.maxwireversion < WIRE_VERSION_SERVER_6) {
// Bypass auto encryption.
// Satisfy schema request with an empty schema.
if (!mc_schema_broker_satisfy_remaining_with_empty_schemas(ectx->sb,
NULL /* do not cache */,
ctx->status)) {
return _mongocrypt_ctx_fail(ctx);
// MONGOCRYPT-429: "create" and "createIndexes" require bypassing on mongocryptd older than version 6.0.
if (0 == strcmp(ectx->cmd_name, "create") || 0 == strcmp(ectx->cmd_name, "createIndexes")) {
if (ectx->ismaster.maxwireversion < WIRE_VERSION_SERVER_6) {
// Bypass auto encryption.
// Satisfy schema request with an empty schema.
if (!mc_schema_broker_satisfy_remaining_with_empty_schemas(ectx->sb,
NULL /* do not cache */,
ctx->status)) {
return _mongocrypt_ctx_fail(ctx);
}
ctx->nothing_to_do = true;
ctx->state = MONGOCRYPT_CTX_READY;
return true;
}
}

if (mc_schema_broker_has_multiple_requests(ectx->sb)) {
// Ensure mongocryptd supports multiple schemas.
if (ectx->ismaster.maxwireversion < WIRE_VERSION_SERVER_8_1) {
mongocrypt_status_t *status = ctx->status;
CLIENT_ERR("Encrypting '%s' requires multiple schemas. Detected mongocryptd with wire version %" PRId32
", but need %" PRId32 ". Upgrade mongocryptd to 8.1 or newer.",
ectx->cmd_name,
ectx->ismaster.maxwireversion,
WIRE_VERSION_SERVER_8_1);
_mongocrypt_ctx_fail(ctx);
return false;
}
}
}

if (ctx->crypt->csfle.okay) {
if (mc_schema_broker_has_multiple_requests(ectx->sb)) {
// Ensure crypt_shared supports multiple schemas.
uint64_t version = ctx->crypt->csfle.get_version();
const char *version_str = ctx->crypt->csfle.get_version_str();
if (version < CRYPT_SHARED_8_1) {
mongocrypt_status_t *status = ctx->status;
CLIENT_ERR("Encrypting '%s' requires multiple schemas. Detected crypt_shared with version %s, but "
"need 8.1. Upgrade crypt_shared to 8.1 or newer.",
ectx->cmd_name,
version_str);
_mongocrypt_ctx_fail(ctx);
return false;
}
ctx->nothing_to_do = true;
ctx->state = MONGOCRYPT_CTX_READY;
return true;
}
}

Expand Down
1 change: 1 addition & 0 deletions src/mongocrypt-private.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ struct _mongocrypt_t {
/// Pointer to the global csfle_lib object. Should not be freed directly.
mongo_crypt_v1_lib *csfle_lib;
bool retry_enabled;
bool multiple_collinfo_enabled;
};

typedef enum {
Expand Down
6 changes: 6 additions & 0 deletions src/mongocrypt.c
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@ bool mongocrypt_setopt_retry_kms(mongocrypt_t *crypt, bool enable) {
return true;
}

bool mongocrypt_setopt_enable_multiple_collinfo(mongocrypt_t *crypt) {
ASSERT_MONGOCRYPT_PARAM_UNINIT(crypt);
crypt->multiple_collinfo_enabled = true;
return true;
}

bool mongocrypt_setopt_kms_provider_aws(mongocrypt_t *crypt,
const char *aws_access_key_id,
int32_t aws_access_key_id_len,
Expand Down
11 changes: 11 additions & 0 deletions src/mongocrypt.h
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,17 @@ bool mongocrypt_setopt_log_handler(mongocrypt_t *crypt, mongocrypt_log_fn_t log_
MONGOCRYPT_EXPORT
bool mongocrypt_setopt_retry_kms(mongocrypt_t *crypt, bool enable);

/**
* Enable support for multiple collection schemas. Required to support $lookup.
*
* @param[in] crypt The @ref mongocrypt_t object.
* @pre @ref mongocrypt_init has not been called on @p crypt.
* @returns A boolean indicating success. If false, an error status is set.
* Retrieve it with @ref mongocrypt_ctx_status
*/
MONGOCRYPT_EXPORT
bool mongocrypt_setopt_enable_multiple_collinfo(mongocrypt_t *crypt);

/**
* Configure an AWS KMS provider on the @ref mongocrypt_t object.
*
Expand Down
Loading