diff --git a/.gitignore b/.gitignore index 327c395c50..ababfa711b 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,7 @@ install_manifest.txt compile_commands.json CTestTestfile.cmake _deps +.clangd # Ignore generated binaries -- any file without extension # Ignore all diff --git a/include/ydb-cpp-sdk/client/import/import.h b/include/ydb-cpp-sdk/client/import/import.h index 9840b0cee6..dd2b6a4ceb 100644 --- a/include/ydb-cpp-sdk/client/import/import.h +++ b/include/ydb-cpp-sdk/client/import/import.h @@ -41,6 +41,7 @@ struct TImportFromS3Settings : public TOperationRequestSettings PermissionNames; @@ -139,6 +141,9 @@ struct TModifyPermissionsSettings : public TOperationRequestSettings{action, permissions}); } + + TModifyPermissionsSettings() = default; + explicit TModifyPermissionsSettings(const ::Ydb::Scheme::ModifyPermissionsRequest& request); }; class TSchemeClient { diff --git a/include/ydb-cpp-sdk/library/yql_common/issue/yql_issue.h b/include/ydb-cpp-sdk/library/yql_common/issue/yql_issue.h index 02b814c65d..30ccd1869d 100644 --- a/include/ydb-cpp-sdk/library/yql_common/issue/yql_issue.h +++ b/include/ydb-cpp-sdk/library/yql_common/issue/yql_issue.h @@ -58,13 +58,18 @@ struct TPosition { class TTextWalker { public: - TTextWalker(TPosition& position) + TTextWalker(TPosition& position, bool utf8Aware) : Position(position) + , Utf8Aware(utf8Aware) , HaveCr(false) , LfCount(0) { } + static inline bool IsUtf8Intermediate(char c) { + return (c & 0xC0) == 0x80; + } + template TTextWalker& Advance(const T& buf) { for (char c : buf) { @@ -77,6 +82,7 @@ class TTextWalker { private: TPosition& Position; + const bool Utf8Aware; bool HaveCr; uint32_t LfCount; }; diff --git a/src/api/grpc/draft/ydb_ymq_v1.proto b/src/api/grpc/draft/ydb_ymq_v1.proto index 6e00aa2b72..3080f38db7 100644 --- a/src/api/grpc/draft/ydb_ymq_v1.proto +++ b/src/api/grpc/draft/ydb_ymq_v1.proto @@ -10,14 +10,19 @@ option java_package = "com.yandex.ydb.ymq.v1"; service YmqService { - rpc GetQueueUrl(GetQueueUrlRequest) returns (GetQueueUrlResponse); - rpc CreateQueue(CreateQueueRequest) returns (CreateQueueResponse); - rpc SendMessage(SendMessageRequest) returns (SendMessageResponse); - rpc ReceiveMessage(ReceiveMessageRequest) returns (ReceiveMessageResponse); - rpc GetQueueAttributes(GetQueueAttributesRequest) returns (GetQueueAttributesResponse); - rpc ListQueues(ListQueuesRequest) returns (ListQueuesResponse); - rpc DeleteMessage(DeleteMessageRequest) returns (DeleteMessageResponse); - rpc PurgeQueue(PurgeQueueRequest) returns (PurgeQueueResponse); - rpc DeleteQueue(DeleteQueueRequest) returns (DeleteQueueResponse); - rpc ChangeMessageVisibility(ChangeMessageVisibilityRequest) returns (ChangeMessageVisibilityResponse); + rpc YmqGetQueueUrl(GetQueueUrlRequest) returns (GetQueueUrlResponse); + rpc YmqCreateQueue(CreateQueueRequest) returns (CreateQueueResponse); + rpc YmqSendMessage(SendMessageRequest) returns (SendMessageResponse); + rpc YmqReceiveMessage(ReceiveMessageRequest) returns (ReceiveMessageResponse); + rpc YmqGetQueueAttributes(GetQueueAttributesRequest) returns (GetQueueAttributesResponse); + rpc YmqListQueues(ListQueuesRequest) returns (ListQueuesResponse); + rpc YmqDeleteMessage(DeleteMessageRequest) returns (DeleteMessageResponse); + rpc YmqPurgeQueue(PurgeQueueRequest) returns (PurgeQueueResponse); + rpc YmqDeleteQueue(DeleteQueueRequest) returns (DeleteQueueResponse); + rpc YmqChangeMessageVisibility(ChangeMessageVisibilityRequest) returns (ChangeMessageVisibilityResponse); + rpc YmqSetQueueAttributes(SetQueueAttributesRequest) returns (SetQueueAttributesResponse); + rpc YmqSendMessageBatch(SendMessageBatchRequest) returns (SendMessageBatchResponse); + rpc YmqDeleteMessageBatch(DeleteMessageBatchRequest) returns (DeleteMessageBatchResponse); + rpc YmqChangeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest) returns (ChangeMessageVisibilityBatchResponse); + rpc YmqListDeadLetterSourceQueues(ListDeadLetterSourceQueuesRequest) returns (ListDeadLetterSourceQueuesResponse); } diff --git a/src/api/grpc/ydb_cms_v1.proto b/src/api/grpc/ydb_cms_v1.proto index b8b11a277e..1b82703e9e 100644 --- a/src/api/grpc/ydb_cms_v1.proto +++ b/src/api/grpc/ydb_cms_v1.proto @@ -27,4 +27,7 @@ service CmsService { // Describe supported database options. rpc DescribeDatabaseOptions(Cms.DescribeDatabaseOptionsRequest) returns (Cms.DescribeDatabaseOptionsResponse); + + // Get resources scale recommendation for database. + rpc GetScaleRecommendation(Cms.GetScaleRecommendationRequest) returns (Cms.GetScaleRecommendationResponse); } diff --git a/src/api/protos/draft/fq.proto b/src/api/protos/draft/fq.proto index 23d17be0a2..7e9e7ac0cd 100644 --- a/src/api/protos/draft/fq.proto +++ b/src/api/protos/draft/fq.proto @@ -442,6 +442,7 @@ message DataStreams { string endpoint = 3 [(Ydb.length).le = 1024]; string database = 4 [(Ydb.length).le = 1024]; bool secure = 5; + bool shared_reading = 6; } message Monitoring { diff --git a/src/api/protos/draft/ymq.proto b/src/api/protos/draft/ymq.proto index ba05168fe6..70a10c6b5e 100644 --- a/src/api/protos/draft/ymq.proto +++ b/src/api/protos/draft/ymq.proto @@ -25,7 +25,7 @@ message ChangeMessageVisibilityResult { message ChangeMessageVisibilityBatchRequestEntry { string id = 1; string receipt_handle = 2; - int32 visibility_timeout = 3; + optional int32 visibility_timeout = 3; } message ChangeMessageVisibilityBatchRequest { @@ -128,7 +128,7 @@ message GetQueueAttributesResult { message GetQueueUrlRequest { Ydb.Operations.OperationParams operation_params = 1; string queue_name = 2; - string queue_owner_aws_account_id = 3; + optional string queue_owner_aws_account_id = 3; } message GetQueueUrlResponse { @@ -141,9 +141,9 @@ message GetQueueUrlResult { message ListQueuesRequest { Ydb.Operations.OperationParams operation_params = 1; - int64 max_results = 2; - string next_token = 3; - string queue_name_prefix = 4; + optional int64 max_results = 2; + optional string next_token = 3; + optional string queue_name_prefix = 4; } message ListQueuesResponse { @@ -178,13 +178,13 @@ message MessageAttribute { message ReceiveMessageRequest { Ydb.Operations.OperationParams operation_params = 1; repeated string attribute_names = 2; - int32 max_number_of_messages = 3; + optional int32 max_number_of_messages = 3; repeated string message_attribute_names = 4; repeated string message_system_attribute_names = 5; string queue_url = 6; - string receive_request_attempt_id = 7; - int32 visibility_timeout = 8; - int32 wait_time_seconds = 9; + optional string receive_request_attempt_id = 7; + optional int32 visibility_timeout = 8; + optional int32 wait_time_seconds = 9; } message ReceiveMessageResponse { @@ -207,11 +207,11 @@ message ReceiveMessageResult { message SendMessageRequest { Ydb.Operations.OperationParams operation_params = 1; - int32 delay_seconds = 2; + optional int32 delay_seconds = 2; map message_attributes = 3; string message_body = 4; - string message_deduplication_id = 5; - string message_group_id = 6; + optional string message_deduplication_id = 5; + optional string message_group_id = 6; map message_system_attributes = 7; string queue_url = 8; } @@ -228,15 +228,6 @@ message SendMessageResult { string sequence_number = 5; } -message SendMessageBatchRequest { - Ydb.Operations.OperationParams operation_params = 1; - repeated SendMessageRequest entries = 2; -} - -message SendMessageBatchResponse { - Ydb.Operations.Operation operation = 1; -} - message BatchResultErrorEntry { string code = 1; string id = 2; @@ -244,15 +235,66 @@ message BatchResultErrorEntry { string message = 4; } +message SendMessageBatchRequestEntry { + string id = 1; + optional int32 delay_seconds = 2; + map message_attributes = 3; + string message_body = 4; + optional string message_deduplication_id = 5; + optional string message_group_id = 6; + map message_system_attributes = 7; + string queue_url = 8; +} + message SendMessageBatchResultEntry { - string md5_of_message_attributes = 1; - string md5_of_message_body= 2; - string md5_of_message_system_attributes= 3; - string message_id = 4; - string sequence_number = 5; + string id = 1; + string md5_of_message_body = 2; + string message_id = 3; + string md5_of_message_attributes = 4; + string md5_of_message_system_attributes = 5; + string sequence_number = 6; +} + +message SendMessageBatchRequest { + Ydb.Operations.OperationParams operation_params = 1; + repeated SendMessageBatchRequestEntry entries = 2; + string queue_url = 3; +} + +message SendMessageBatchResponse { + Ydb.Operations.Operation operation = 1; } message SendMessageBatchResult { repeated BatchResultErrorEntry failed = 1; repeated SendMessageBatchResultEntry successful = 2; } + +message SetQueueAttributesRequest { + Ydb.Operations.OperationParams operation_params = 1; + map attributes = 2; + string queue_url = 3; +} + +message SetQueueAttributesResponse { + Ydb.Operations.Operation operation = 1; +} + +message SetQueueAttributesResult { +} + +message ListDeadLetterSourceQueuesRequest { + Ydb.Operations.OperationParams operation_params = 1; + optional int32 max_results = 2; + optional string next_token = 3; + string queue_url = 4; +} + +message ListDeadLetterSourceQueuesResponse { + Ydb.Operations.Operation operation = 1; +} + +message ListDeadLetterSourceQueuesResult { + string next_token = 1; + repeated string queue_urls = 2; +} diff --git a/src/api/protos/ydb_cms.proto b/src/api/protos/ydb_cms.proto index 3e718b1919..14b639104d 100644 --- a/src/api/protos/ydb_cms.proto +++ b/src/api/protos/ydb_cms.proto @@ -4,7 +4,9 @@ option cc_enable_arenas = true; package Ydb.Cms; option java_package = "com.yandex.ydb.cms"; +import "src/api/protos/ydb_issue_message.proto"; import "src/api/protos/ydb_operation.proto"; +import "src/api/protos/ydb_status_codes.proto"; // A set of uniform storage units. // Single storage unit can be thought of as a reserved part of a RAID. @@ -104,6 +106,23 @@ message DatabaseQuotas { repeated StorageQuotas storage_quotas = 6; } +// A policy that is used for resource scale recommendation. If multiple are used, +// recommender combines them to recommend the largest scale. +message ScaleRecommenderPolicy { + // Policy that tracks metric and reactively recommend to adjust resources scale + // to keep metric close to the specified target value. + message TargetTrackingPolicy { + oneof target { + // A percentage of compute resources' average CPU utilization. + uint32 average_cpu_utilization_percent = 1; + } + } + + oneof policy { + TargetTrackingPolicy target_tracking_policy = 1; + } +} + // Request to create a new database. For successfull creation // specified database shouldn't exist. At least one storage // unit should be requested for the database. @@ -129,6 +148,8 @@ message CreateDatabaseRequest { string idempotency_key = 9; // Optional quotas for the database DatabaseQuotas database_quotas = 10; + // Optional scale recommender policies + repeated ScaleRecommenderPolicy scale_recommender_policies = 11; } message CreateDatabaseResponse { @@ -179,6 +200,8 @@ message GetDatabaseStatusResult { SchemaOperationQuotas schema_operation_quotas = 9; // Current quotas for the database DatabaseQuotas database_quotas = 10; + // Current scale recommender policies + repeated ScaleRecommenderPolicy scale_recommender_policies = 11; } // Change resources allocated for database. @@ -207,6 +230,8 @@ message AlterDatabaseRequest { DatabaseQuotas database_quotas = 11; // Alter attributes. Leave the value blank to drop an attribute. map alter_attributes = 12; + // Alter scale recommender policies. + repeated ScaleRecommenderPolicy scale_recommender_policies = 13; } message AlterDatabaseResponse { @@ -271,3 +296,16 @@ message DescribeDatabaseOptionsResult { repeated AvailabilityZoneDescription availability_zones = 2; repeated ComputationalUnitDescription computational_units = 3; } + +// Get resources scale recommendation for database. +message GetScaleRecommendationRequest { + // Required. Full path to database's home dir. + string path = 1; +} + +message GetScaleRecommendationResponse { + StatusIds.StatusCode status = 1; + repeated Ydb.Issue.IssueMessage issues = 2; + // Recommended resources scale to be allocated for database. + Resources recommended_resources = 3; +} diff --git a/src/api/protos/ydb_import.proto b/src/api/protos/ydb_import.proto index 5946cc5aa8..a01b9a4860 100644 --- a/src/api/protos/ydb_import.proto +++ b/src/api/protos/ydb_import.proto @@ -42,7 +42,8 @@ message ImportFromS3Settings { The object name begins with 'source_prefix'. This prefix is followed by: * '/data_PartNumber', where 'PartNumber' represents the index of the part, starting at zero; - * '/scheme.pb' - object with information about scheme, indexes, etc. + * '/scheme.pb' - object with information about scheme, indexes, etc; + * '/permissions.pb' - object with information about ACL and owner. */ string source_prefix = 1 [(required) = true]; @@ -67,6 +68,10 @@ message ImportFromS3Settings { // details: https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html // it is especially useful for custom s3 implementations bool disable_virtual_addressing = 10; + + // Prevent importing of ACL and owner. If true, objects are created with empty ACL + // and their owner will be the user who started the import. + bool no_acl = 11; } message ImportFromS3Result { diff --git a/src/client/CMakeLists.txt b/src/client/CMakeLists.txt index 0ff13dc8a5..6e469a5884 100644 --- a/src/client/CMakeLists.txt +++ b/src/client/CMakeLists.txt @@ -28,4 +28,3 @@ add_subdirectory(table) add_subdirectory(topic) add_subdirectory(types) add_subdirectory(value) -add_subdirectory(ymq) diff --git a/src/client/federated_topic/ut/basic_usage_ut.cpp b/src/client/federated_topic/ut/basic_usage_ut.cpp index 1a98937b28..909fd7bd9c 100644 --- a/src/client/federated_topic/ut/basic_usage_ut.cpp +++ b/src/client/federated_topic/ut/basic_usage_ut.cpp @@ -10,7 +10,7 @@ #include #include -#include +#include #include #include diff --git a/src/client/federated_topic/ut/fds_mock.h b/src/client/federated_topic/ut/fds_mock/fds_mock.h similarity index 98% rename from src/client/federated_topic/ut/fds_mock.h rename to src/client/federated_topic/ut/fds_mock/fds_mock.h index 365cd0d962..9f9df0ceb2 100644 --- a/src/client/federated_topic/ut/fds_mock.h +++ b/src/client/federated_topic/ut/fds_mock/fds_mock.h @@ -2,7 +2,7 @@ #include "util/string/builder.h" #include -#include +#include #include #include diff --git a/src/client/federated_topic/ut/ya.make b/src/client/federated_topic/ut/ya.make deleted file mode 100644 index 8688c7519d..0000000000 --- a/src/client/federated_topic/ut/ya.make +++ /dev/null @@ -1,38 +0,0 @@ -UNITTEST_FOR(ydb/public/sdk/cpp/client/ydb_federated_topic) - -IF (SANITIZER_TYPE == "thread" OR WITH_VALGRIND) - TIMEOUT(1200) - SIZE(LARGE) - TAG(ya:fat) -ELSE() - TIMEOUT(600) - SIZE(MEDIUM) -ENDIF() - -FORK_SUBTESTS() - -PEERDIR( - library/cpp/testing/gmock_in_unittest - ydb/core/testlib/default - ydb/public/lib/json_value - ydb/public/lib/yson_value - ydb/public/sdk/cpp/client/ydb_driver - ydb/public/sdk/cpp/client/ydb_persqueue_public - ydb/public/sdk/cpp/client/ydb_persqueue_public/include - ydb/public/sdk/cpp/client/ydb_persqueue_public/ut/ut_utils - - ydb/public/sdk/cpp/client/ydb_topic/codecs - ydb/public/sdk/cpp/client/ydb_topic - ydb/public/sdk/cpp/client/ydb_topic/impl - - ydb/public/sdk/cpp/client/ydb_federated_topic - ydb/public/sdk/cpp/client/ydb_federated_topic/impl -) - -YQL_LAST_ABI_VERSION() - -SRCS( - basic_usage_ut.cpp -) - -END() diff --git a/src/client/import/import.cpp b/src/client/import/import.cpp index 1db747edd5..6eaa52b16d 100644 --- a/src/client/import/import.cpp +++ b/src/client/import/import.cpp @@ -154,6 +154,10 @@ TFuture TImportClient::ImportFromS3(const TImportFromS3Se request.mutable_settings()->set_number_of_retries(settings.NumberOfRetries_.value()); } + if (settings.NoACL_) { + request.mutable_settings()->set_no_acl(settings.NoACL_.value()); + } + request.mutable_settings()->set_disable_virtual_addressing(!settings.UseVirtualAddressing_); return Impl_->ImportFromS3(std::move(request), settings); diff --git a/src/client/persqueue_public/impl/persqueue_impl.h b/src/client/persqueue_public/impl/persqueue_impl.h index a653e3c328..0f90c4f961 100644 --- a/src/client/persqueue_public/impl/persqueue_impl.h +++ b/src/client/persqueue_public/impl/persqueue_impl.h @@ -57,7 +57,7 @@ class TPersQueueClient::TImpl : public TClientImplCommonset_max_active_partitions(settings.PartitionsCount_); + props.mutable_auto_partitioning_settings()->set_max_active_partitions(*settings.MaxPartitionsCount_); autoPartitioningSettingsDefined = true; } if (settings.AutoPartitioningStrategy_.has_value()) { @@ -77,9 +77,7 @@ class TPersQueueClient::TImpl : public TClientImplCommonset_min_active_partitions(settings.PartitionsCount_); } diff --git a/src/client/persqueue_public/ut/basic_usage_ut.cpp b/src/client/persqueue_public/ut/basic_usage_ut.cpp index a7fad7491b..06bfefb801 100644 --- a/src/client/persqueue_public/ut/basic_usage_ut.cpp +++ b/src/client/persqueue_public/ut/basic_usage_ut.cpp @@ -71,7 +71,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { std::visit(TOverloaded { [&](TReadSessionEvent::TDataReceivedEvent& event) { for (auto& message: event.GetMessages()) { - TString sourceId = message.GetMessageGroupId(); + std::string sourceId = message.GetMessageGroupId(); ui32 seqNo = message.GetSeqNo(); UNIT_ASSERT_VALUES_EQUAL(readMessageCount + 1, seqNo); ++readMessageCount; @@ -190,16 +190,16 @@ Y_UNIT_TEST_SUITE(BasicUsage) { auto readSession = client.CreateReadSession(readSettings); auto event = readSession->GetEvent(true); - UNIT_ASSERT(event.Defined()); + UNIT_ASSERT(event.has_value()); auto& createPartitionStream = std::get(*event); createPartitionStream.Confirm(); UNIT_CHECK_GENERATED_EXCEPTION(readSession->GetEvent(true, 0), TContractViolation); - UNIT_CHECK_GENERATED_EXCEPTION(readSession->GetEvents(true, Nothing(), 0), TContractViolation); + UNIT_CHECK_GENERATED_EXCEPTION(readSession->GetEvents(true, std::nullopt, 0), TContractViolation); event = readSession->GetEvent(true, 1); - UNIT_ASSERT(event.Defined()); + UNIT_ASSERT(event.has_value()); auto& dataReceived = std::get(*event); dataReceived.Commit(); @@ -380,7 +380,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { std::visit(TOverloaded { [&](TReadSessionEvent::TDataReceivedEvent& event) { for (auto& message: event.GetMessages()) { - TString sourceId = message.GetMessageGroupId(); + std::string sourceId = message.GetMessageGroupId(); ui32 seqNo = message.GetSeqNo(); UNIT_ASSERT_VALUES_EQUAL(readMessageCount + 1, seqNo); ++readMessageCount; @@ -447,7 +447,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { public: TBrokenCredentialsProvider() {} virtual ~TBrokenCredentialsProvider() {} - TStringType GetAuthInfo() const { + std::string GetAuthInfo() const { ythrow yexception() << "exception during creation"; return ""; } @@ -463,7 +463,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { return std::make_shared(); } - virtual TStringType GetClientIdentity() const { + virtual std::string GetClientIdentity() const { return "abacaba"; } }; diff --git a/src/client/persqueue_public/ut/read_session_ut.cpp b/src/client/persqueue_public/ut/read_session_ut.cpp index 4cab328fa6..8ab0db9712 100644 --- a/src/client/persqueue_public/ut/read_session_ut.cpp +++ b/src/client/persqueue_public/ut/read_session_ut.cpp @@ -330,7 +330,7 @@ struct TMockReadSessionProcessor : public TMockProcessorFactory* metadata, TReadCallback callback) override { + void ReadInitialMetadata(std::unordered_multimap* metadata, TReadCallback callback) override { Y_UNUSED(metadata); Y_UNUSED(callback); UNIT_ASSERT_C(false, "This method is not expected to be called"); @@ -578,7 +578,7 @@ class TSynchronousExecutor : public ::IExecutor { } }; -extern TLogFormatter NYdb::GetPrefixLogFormatter(const TString& prefix); // Defined in ydb.cpp. +extern TLogFormatter NYdb::GetPrefixLogFormatter(const std::string& prefix); // Defined in ydb.cpp. TReadSessionImplTestSetup::TReadSessionImplTestSetup() { Settings @@ -662,7 +662,7 @@ void TReadSessionImplTestSetup::SuccessfulInit(bool hasInitRequest) { TPartitionStream::TPtr TReadSessionImplTestSetup::CreatePartitionStream(const TString& topic, const TString& cluster, ui64 partition, ui64 assignId) { MockProcessor->AddServerResponse(TMockReadSessionProcessor::TServerReadInfo().CreatePartitionStream(topic, cluster, partition, assignId)); // Callback will be called. - TMaybe event = EventsQueue->GetEvent(true); + std::optional event = EventsQueue->GetEvent(true); UNIT_ASSERT(event); UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TCreatePartitionStreamEvent); auto& createEvent = std::get(*event); @@ -673,7 +673,7 @@ TPartitionStream::TPtr TReadSessionImplTestSetup::CreatePartitionStream(const TS } void TReadSessionImplTestSetup::AssertNoEvents() { - TMaybe event = GetEventsQueue()->GetEvent(false); + std::optional event = GetEventsQueue()->GetEvent(false); UNIT_ASSERT(!event); } @@ -691,7 +691,7 @@ Y_UNIT_TEST_SUITE(PersQueueSdkReadSessionTest) { TDeferredCommit dc; // Event 1: create partition stream. { - TMaybe event = session->GetEvent(true); + std::optional event = session->GetEvent(true); UNIT_ASSERT(event); UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TCreatePartitionStreamEvent); std::get(*event).Confirm(); @@ -699,7 +699,7 @@ Y_UNIT_TEST_SUITE(PersQueueSdkReadSessionTest) { } // Event 2: data. { - TMaybe event = session->GetEvent(true); + std::optional event = session->GetEvent(true); UNIT_ASSERT(event); UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TDataReceivedEvent); TReadSessionEvent::TDataReceivedEvent& dataEvent = std::get(*event); @@ -715,7 +715,7 @@ Y_UNIT_TEST_SUITE(PersQueueSdkReadSessionTest) { setup.WriteToTopic({"message3"}); // Event 3: data. { - TMaybe event = session->GetEvent(true); + std::optional event = session->GetEvent(true); UNIT_ASSERT(event); UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TDataReceivedEvent); TReadSessionEvent::TDataReceivedEvent& dataEvent = std::get(*event); @@ -736,14 +736,14 @@ Y_UNIT_TEST_SUITE(PersQueueSdkReadSessionTest) { // Event 4: commit ack. if (commit) { // (commit && close) branch check is broken with current TReadSession::Close quick fix - TMaybe event = session->GetEvent(!close); // Event is expected to be already in queue if closed. + std::optional event = session->GetEvent(!close); // Event is expected to be already in queue if closed. UNIT_ASSERT(event); Cerr << "commit ack or close event " << DebugString(*event) << Endl; UNIT_ASSERT(std::holds_alternative(*event) || std::holds_alternative(*event)); } if (close && !commit) { - TMaybe event = session->GetEvent(false); + std::optional event = session->GetEvent(false); UNIT_ASSERT(event); Cerr << "close event " << DebugString(*event) << Endl; UNIT_ASSERT(std::holds_alternative(*event)); @@ -775,7 +775,7 @@ Y_UNIT_TEST_SUITE(PersQueueSdkReadSessionTest) { std::shared_ptr session = \ setup.GetPersQueueClient().CreateReadSession(settings); \ session->WaitEvent().Wait(); \ - TMaybe event = \ + std::optional event = \ session->GetEvent(true); \ UNIT_ASSERT(event); \ Cerr << DebugString(*event) << Endl; \ @@ -812,7 +812,7 @@ Y_UNIT_TEST_SUITE(PersQueueSdkReadSessionTest) { // Set policy with max retries == 3. settings.RetryPolicy(NYdb::NPersQueue::IRetryPolicy::GetExponentialBackoffPolicy(TDuration::MilliSeconds(10), TDuration::MilliSeconds(10), TDuration::MilliSeconds(100), 3)); std::shared_ptr session = setup.GetPersQueueClient().CreateReadSession(settings); - TMaybe event = session->GetEvent(true); + std::optional event = session->GetEvent(true); UNIT_ASSERT(event); Cerr << DebugString(*event) << Endl; UNIT_ASSERT_EVENT_TYPE(*event, TSessionClosedEvent); @@ -827,7 +827,7 @@ Y_UNIT_TEST_SUITE(PersQueueSdkReadSessionTest) { // Success. { std::shared_ptr session = setup.GetPersQueueClient().CreateReadSession(settings); - TMaybe event = session->GetEvent(true); + std::optional event = session->GetEvent(true); UNIT_ASSERT(event); Cerr << DebugString(*event) << Endl; UNIT_ASSERT_NOT_EVENT_TYPE(*event, TSessionClosedEvent); @@ -837,7 +837,7 @@ Y_UNIT_TEST_SUITE(PersQueueSdkReadSessionTest) { { settings.AppendClusters("unknown_cluster"); std::shared_ptr session = setup.GetPersQueueClient().CreateReadSession(settings); - TMaybe event = session->GetEvent(true); + std::optional event = session->GetEvent(true); UNIT_ASSERT(event); Cerr << DebugString(*event) << Endl; UNIT_ASSERT_EVENT_TYPE(*event, TSessionClosedEvent); @@ -847,7 +847,7 @@ Y_UNIT_TEST_SUITE(PersQueueSdkReadSessionTest) { { settings.ReadOriginal({"unknown_cluster"}); std::shared_ptr session = setup.GetPersQueueClient().CreateReadSession(settings); - TMaybe event = session->GetEvent(true); + std::optional event = session->GetEvent(true); UNIT_ASSERT(event); Cerr << DebugString(*event) << Endl; UNIT_ASSERT_EVENT_TYPE(*event, TSessionClosedEvent); @@ -861,7 +861,7 @@ Y_UNIT_TEST_SUITE(PersQueueSdkReadSessionTest) { // Event 1: create partition stream. { - TMaybe event = session->GetEvent(true); + std::optional event = session->GetEvent(true); UNIT_ASSERT(event); UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TCreatePartitionStreamEvent); std::get(*event).Confirm(); @@ -869,8 +869,8 @@ Y_UNIT_TEST_SUITE(PersQueueSdkReadSessionTest) { } // Event 2: receive data. - auto GetDataEvent = [&](const TString& content) -> TMaybe { - TMaybe event = session->GetEvent(true); + auto GetDataEvent = [&](const TString& content) -> std::optional { + std::optional event = session->GetEvent(true); UNIT_ASSERT(event); UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TDataReceivedEvent); TReadSessionEvent::TDataReceivedEvent& dataEvent = std::get(*event); @@ -880,7 +880,7 @@ Y_UNIT_TEST_SUITE(PersQueueSdkReadSessionTest) { return event; }; - TMaybe dataEvents[2]; + std::optional dataEvents[2]; dataEvents[0] = GetDataEvent("message1"); @@ -899,7 +899,7 @@ Y_UNIT_TEST_SUITE(PersQueueSdkReadSessionTest) { // Commit and check that other events will come. for (int i = 0; i < 2; ++i) { std::get(*dataEvents[i]).Commit(); - TMaybe event = session->GetEvent(true); + std::optional event = session->GetEvent(true); UNIT_ASSERT(event); Y_ASSERT(std::holds_alternative(*event)); Cerr << DebugString(*event) << Endl; @@ -1105,7 +1105,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) { setup.MockProcessor->AddServerResponse(TMockReadSessionProcessor::TServerReadInfo().CreatePartitionStream()); // Callback will be called. { - TVector events = setup.EventsQueue->GetEvents(true); + std::vector events = setup.EventsQueue->GetEvents(true); UNIT_ASSERT_VALUES_EQUAL(events.size(), 1); UNIT_ASSERT_EVENT_TYPE(events[0], TReadSessionEvent::TCreatePartitionStreamEvent); auto& event = std::get(events[0]); @@ -1146,7 +1146,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) { // Check destroy event. if (!forceful) { - TMaybe event = setup.EventsQueue->GetEvent(true); + std::optional event = setup.EventsQueue->GetEvent(true); UNIT_ASSERT(event); UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TDestroyPartitionStreamEvent); auto& destroyEvent = std::get(*event); @@ -1165,7 +1165,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) { // Check closed event. { - TMaybe event = setup.EventsQueue->GetEvent(true); + std::optional event = setup.EventsQueue->GetEvent(true); UNIT_ASSERT(event); UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TPartitionStreamClosedEvent); auto& closedEvent = std::get(*event); @@ -1200,7 +1200,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) { } for (ui64 i = 1; i <= 2; ) { - TMaybe event = setup.EventsQueue->GetEvent(true); + std::optional event = setup.EventsQueue->GetEvent(true); UNIT_ASSERT(event); UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TDataReceivedEvent); auto& dataEvent = std::get(*event); @@ -1226,7 +1226,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) { // Exception was passed during decompression. { - TMaybe event = setup.EventsQueue->GetEvent(true); + std::optional event = setup.EventsQueue->GetEvent(true); UNIT_ASSERT(event); UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TDataReceivedEvent); auto& dataEvent = std::get(*event); @@ -1254,7 +1254,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) { .Batch("src_id") .CompressMessage(1, data, codec)); - TMaybe event = setup.EventsQueue->GetEvent(true); + std::optional event = setup.EventsQueue->GetEvent(true); UNIT_ASSERT(event); UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TDataReceivedEvent); auto& dataEvent = std::get(*event); @@ -1360,7 +1360,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) { ui64 prevSeqNo = 41; for (size_t i = 0; i < batches; ++i) { Cerr << "Getting new event" << Endl; - TMaybe event = setup.EventsQueue->GetEvent(true, batchLimit); + std::optional event = setup.EventsQueue->GetEvent(true, batchLimit); UNIT_ASSERT(event); UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TDataReceivedEvent); Cerr << DebugString(*event) << Endl; @@ -1500,7 +1500,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) { } { - TVector events = setup.EventsQueue->GetEvents(true); + std::vector events = setup.EventsQueue->GetEvents(true); UNIT_ASSERT_VALUES_EQUAL(events.size(), 1); UNIT_ASSERT_EVENT_TYPE(events[0], TReadSessionEvent::TDataReceivedEvent); TReadSessionEvent::TDataReceivedEvent& dataEvent = std::get(events[0]); @@ -1509,7 +1509,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) { } { - TVector events = setup.EventsQueue->GetEvents(true); + std::vector events = setup.EventsQueue->GetEvents(true); UNIT_ASSERT_VALUES_EQUAL(events.size(), 1); UNIT_ASSERT_EVENT_TYPE(events[0], TReadSessionEvent::TDataReceivedEvent); TReadSessionEvent::TDataReceivedEvent& dataEvent = std::get(events[0]); @@ -1540,7 +1540,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) { stream->RequestStatus(); - TMaybe event = setup.EventsQueue->GetEvent(true); + std::optional event = setup.EventsQueue->GetEvent(true); UNIT_ASSERT(event); UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TPartitionStreamStatusEvent); auto& statusEvent = std::get(*event); @@ -1591,7 +1591,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) { })); for (int i = 0; i < 2; ) { - TMaybe event = setup.EventsQueue->GetEvent(true); + std::optional event = setup.EventsQueue->GetEvent(true); UNIT_ASSERT(event); UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TDataReceivedEvent); TReadSessionEvent::TDataReceivedEvent& dataEvent = std::get(*event); @@ -1616,7 +1616,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) { .Batch("src_id") .CompressMessage(1, "message1")); - TMaybe event = setup.EventsQueue->GetEvent(true); + std::optional event = setup.EventsQueue->GetEvent(true); UNIT_ASSERT(event); UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TDataReceivedEvent); TReadSessionEvent::TDataReceivedEvent& dataEvent = std::get(*event); @@ -1693,7 +1693,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) { setup.MockProcessor->AddServerResponse(TMockReadSessionProcessor::TServerReadInfo() .ForcefulReleasePartitionStream()); - TMaybe event = setup.EventsQueue->GetEvent(true); + std::optional event = setup.EventsQueue->GetEvent(true); UNIT_ASSERT(event); UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TPartitionStreamClosedEvent); calledPromise.GetFuture().Wait(); @@ -1724,7 +1724,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) { .Batch("src_id") .CompressMessage(1, "message1")); - TMaybe event = setup.EventsQueue->GetEvent(true); + std::optional event = setup.EventsQueue->GetEvent(true); UNIT_ASSERT(event); UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TDataReceivedEvent); diff --git a/src/client/persqueue_public/ut/retry_policy_ut.cpp b/src/client/persqueue_public/ut/retry_policy_ut.cpp index 819c43f6b8..9739f6fb63 100644 --- a/src/client/persqueue_public/ut/retry_policy_ut.cpp +++ b/src/client/persqueue_public/ut/retry_policy_ut.cpp @@ -56,10 +56,10 @@ Y_UNIT_TEST_SUITE(RetryPolicy) { helper.Policy->Initialize(); helper.Policy->ExpectFatalBreakDown(); helper.EventLoop->AllowStop(); - auto f1 = helper.Write(false); helper.Setup->KickTablets(); - helper.Write(false); + helper.Setup->WaitForTabletsDown(); + auto f1 = helper.Write(false); helper.EventLoop->WaitForStop(); UNIT_ASSERT(!f1.HasValue()); helper.Setup = nullptr; @@ -316,7 +316,7 @@ Y_UNIT_TEST_SUITE(RetryPolicy) { Cerr << "===Data event\n"; auto& clusterName = event.GetPartitionStream()->GetCluster(); for (auto& message: event.GetMessages()) { - TString sourceId = message.GetMessageGroupId(); + auto sourceId = TString{message.GetMessageGroupId()}; ui32 seqNo = message.GetSeqNo(); if (sourceId == sourceId1) { UNIT_ASSERT_VALUES_EQUAL(seqNo, seqNoByClusterSrc1[clusterName] + 1); diff --git a/src/client/persqueue_public/ut/ut_utils/data_plane_helpers.cpp b/src/client/persqueue_public/ut/ut_utils/data_plane_helpers.cpp index 0ef9b8750f..216c743428 100644 --- a/src/client/persqueue_public/ut/ut_utils/data_plane_helpers.cpp +++ b/src/client/persqueue_public/ut/ut_utils/data_plane_helpers.cpp @@ -51,7 +51,7 @@ namespace NKikimr::NPersQueueTests { std::optional partitionGroup, std::optional codec, std::optional reconnectOnFailure, - THashMap sessionMeta + std::unordered_map sessionMeta ) { auto settings = TWriteSessionSettings().Path(topic).MessageGroupId(sourceId); if (partitionGroup) settings.PartitionGroupId(*partitionGroup); @@ -84,7 +84,7 @@ namespace NKikimr::NPersQueueTests { auto future = reader->WaitEvent(); future.Wait(timeout); - TMaybe event = reader->GetEvent(false, 1); + std::optional event = reader->GetEvent(false, 1); if (!event) return {}; if (auto dataEvent = std::get_if(&*event)) { diff --git a/src/client/persqueue_public/ut/ut_utils/data_plane_helpers.h b/src/client/persqueue_public/ut/ut_utils/data_plane_helpers.h index 8d6f782e4d..4357cccefb 100644 --- a/src/client/persqueue_public/ut/ut_utils/data_plane_helpers.h +++ b/src/client/persqueue_public/ut/ut_utils/data_plane_helpers.h @@ -4,6 +4,8 @@ #include #include +#include + namespace NKikimr::NPersQueueTests { std::shared_ptr CreateWriter( @@ -34,7 +36,7 @@ namespace NKikimr::NPersQueueTests { std::optional partitionGroup = {}, std::optional codec = {}, std::optional reconnectOnFailure = {}, - THashMap sessionMeta = {} + std::unordered_map sessionMeta = {} ); std::shared_ptr CreateReader( diff --git a/src/client/persqueue_public/ut/ut_utils/sdk_test_setup.h b/src/client/persqueue_public/ut/ut_utils/sdk_test_setup.h index b65bbfd80d..cc39afff38 100644 --- a/src/client/persqueue_public/ut/ut_utils/sdk_test_setup.h +++ b/src/client/persqueue_public/ut/ut_utils/sdk_test_setup.h @@ -88,7 +88,7 @@ class SDKTestSetup { } } - static TString GetTestTopic() { + static std::string GetTestTopic() { return "test-topic"; } diff --git a/src/client/persqueue_public/ut/ut_utils/ut_utils.h b/src/client/persqueue_public/ut/ut_utils/ut_utils.h index c850d891d4..f61a2857be 100644 --- a/src/client/persqueue_public/ut/ut_utils/ut_utils.h +++ b/src/client/persqueue_public/ut/ut_utils/ut_utils.h @@ -45,7 +45,7 @@ class TPersQueueYdbSdkTestSetup : public ::NPersQueue::SDKTestSetup { NYdb::TDriverConfig cfg; cfg.SetEndpoint(TStringBuilder() << "localhost:" << Server.GrpcPort); cfg.SetDatabase("/Root"); - cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); + cfg.SetLog(std::unique_ptr(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG).Release())); Driver = MakeHolder(cfg); } return *Driver; @@ -168,7 +168,7 @@ struct TYDBClientEventLoop : public ::NPersQueue::IClientEventLoop { } Y_ABORT_UNLESS(continueToken); - TMaybe seqNo = Nothing(); + std::optional seqNo = std::nullopt; if (!AutoSeqNo) { seqNo = acknowledgeableMessage.SequenceNumber; Log << TLOG_INFO << "[" << sourceId << "] Write messages with sequence numbers " @@ -377,7 +377,7 @@ class TYdbPqTestExecutor : public IAsyncExecutor { Stop = true; Thread.Join(); } - void PostImpl(TVector&& fs) override { + void PostImpl(std::vector&& fs) override { for (auto& f : fs) { TasksQueue.Enqueue(std::move(f)); } diff --git a/src/client/scheme/scheme.cpp b/src/client/scheme/scheme.cpp index 41bc072877..78c2786a46 100644 --- a/src/client/scheme/scheme.cpp +++ b/src/client/scheme/scheme.cpp @@ -17,10 +17,15 @@ namespace NScheme { using namespace NThreading; using namespace Ydb::Scheme; +TPermissions::TPermissions(const ::Ydb::Scheme::Permissions& proto) + : Subject(proto.subject()) + , PermissionNames(proto.permission_names().begin(), proto.permission_names().end()) +{} + void TPermissions::SerializeTo(::Ydb::Scheme::Permissions& proto) const { proto.set_subject(TStringType{Subject}); for (const auto& name : PermissionNames) { - *proto.mutable_permission_names()->Add() = name; + proto.add_permission_names(TStringType{name}); } } @@ -132,7 +137,28 @@ void TSchemeEntry::Out(IOutputStream& out) const { void TSchemeEntry::SerializeTo(::Ydb::Scheme::ModifyPermissionsRequest& request) const { request.mutable_actions()->Add()->set_change_owner(TStringType{Owner}); for (const auto& permission : Permissions) { - permission.SerializeTo(*request.mutable_actions()->Add()->mutable_set()); + permission.SerializeTo(*request.mutable_actions()->Add()->mutable_grant()); + } +} + +TModifyPermissionsSettings::TModifyPermissionsSettings(const ::Ydb::Scheme::ModifyPermissionsRequest& request) { + for (const auto& action : request.actions()) { + switch (action.action_case()) { + case Ydb::Scheme::PermissionsAction::kGrant: + AddGrantPermissions(action.grant()); + break; + case Ydb::Scheme::PermissionsAction::kRevoke: + AddRevokePermissions(action.revoke()); + break; + case Ydb::Scheme::PermissionsAction::kSet: + AddSetPermissions(action.set()); + break; + case Ydb::Scheme::PermissionsAction::kChangeOwner: + AddChangeOwner(action.change_owner()); + break; + case Ydb::Scheme::PermissionsAction::ACTION_NOT_SET: + break; + } } } diff --git a/src/client/topic/impl/common.h b/src/client/topic/impl/common.h index aa95f90c00..026bee1818 100644 --- a/src/client/topic/impl/common.h +++ b/src/client/topic/impl/common.h @@ -381,7 +381,7 @@ class TBaseSessionEventsQueue : public ISignalable { void WaitEventsImpl() { // Assumes that we're under lock. Posteffect: HasEventsImpl() is true. while (!HasEventsImpl()) { - std::unique_lock lk(Mutex, std::defer_lock); + std::unique_lock lk(Mutex, std::adopt_lock); CondVar.wait(lk); } } diff --git a/src/client/topic/impl/write_session_impl.cpp b/src/client/topic/impl/write_session_impl.cpp index 43c41f2faa..35acf14452 100644 --- a/src/client/topic/impl/write_session_impl.cpp +++ b/src/client/topic/impl/write_session_impl.cpp @@ -417,10 +417,6 @@ void TWriteSessionImpl::InitWriter() { // No Lock, very initial start - no race ThrowFatalError("ProducerId != MessageGroupId scenario is currently not supported"); } CompressionExecutor = Settings.CompressionExecutor_; - IExecutor::TPtr executor; - executor = CreateSyncExecutor(); - executor->Start(); - Executor = std::move(executor); Settings.CompressionExecutor_->Start(); Settings.EventHandlers_.HandlersExecutor_->Start(); @@ -1225,7 +1221,6 @@ void TWriteSessionImpl::ResetForRetryImpl() { } if (!OriginalMessagesToSend.empty() && OriginalMessagesToSend.front().Id < minId) minId = OriginalMessagesToSend.front().Id; - MinUnsentId = minId; Y_ABORT_UNLESS(PackedMessagesToSend.size() == totalPackedMessages); Y_ABORT_UNLESS(OriginalMessagesToSend.size() == totalOriginalMessages); } diff --git a/src/client/topic/impl/write_session_impl.h b/src/client/topic/impl/write_session_impl.h index 8f3283858f..6a496da233 100644 --- a/src/client/topic/impl/write_session_impl.h +++ b/src/client/topic/impl/write_session_impl.h @@ -415,10 +415,7 @@ class TWriteSessionImpl : public TContinuationTokenIssuer, TWriteSessionSettings Settings; std::shared_ptr Client; std::shared_ptr Connections; - std::string TargetCluster; - std::string InitialCluster; - std::string CurrentCluster; - std::string PreferredClusterByCDS; + std::shared_ptr ConnectionFactory; TDbDriverStatePtr DbDriverState; std::string PrevToken; @@ -438,7 +435,6 @@ class TWriteSessionImpl : public TContinuationTokenIssuer, std::shared_ptr ServerMessage; // Server message to write server response to. std::string SessionId; - IExecutor::TPtr Executor; IExecutor::TPtr CompressionExecutor; size_t MemoryUsage = 0; //!< Estimated amount of memory used bool FirstTokenSent = false; @@ -461,10 +457,8 @@ class TWriteSessionImpl : public TContinuationTokenIssuer, ui32 PartitionId = 0; TPartitionLocation PreferredPartitionLocation = {}; uint64_t NextId = 0; - uint64_t MinUnsentId = 1; std::optional InitSeqNo; std::optional AutoSeqNoMode; - bool ValidateSeqNoMode = false; NThreading::TPromise InitSeqNoPromise; bool InitSeqNoSetDone = false; diff --git a/src/client/topic/ut/basic_usage_ut.cpp b/src/client/topic/ut/basic_usage_ut.cpp index c1a7bf4a70..02c1383816 100644 --- a/src/client/topic/ut/basic_usage_ut.cpp +++ b/src/client/topic/ut/basic_usage_ut.cpp @@ -16,6 +16,8 @@ #include #include +#include + #include namespace NYdb::NTopic::NTests { @@ -99,7 +101,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { NYdb::TDriverConfig cfg; cfg.SetEndpoint(TStringBuilder() << "invalid:" << setup.GetServer().GrpcPort); cfg.SetDatabase("/Invalid"); - cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); + cfg.SetLog(std::unique_ptr(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG).Release())); auto driver = NYdb::TDriver(cfg); { @@ -113,13 +115,13 @@ Y_UNIT_TEST_SUITE(BasicUsage) { auto writeSession = client.CreateWriteSession(writeSettings); auto event = writeSession->GetEvent(true); - UNIT_ASSERT(event.Defined() && std::holds_alternative(event.GetRef())); + UNIT_ASSERT(event && std::holds_alternative(event.value())); } { auto settings = TTopicClientSettings() .Database({"/Root"}) - .DiscoveryEndpoint({TStringBuilder() << "localhost:" << setup.GetServer().GrpcPort}); + .DiscoveryEndpoint("localhost:" + std::to_string(setup.GetServer().GrpcPort)); TTopicClient client(driver, settings); @@ -130,7 +132,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { auto writeSession = client.CreateWriteSession(writeSettings); auto event = writeSession->GetEvent(true); - UNIT_ASSERT(event.Defined() && !std::holds_alternative(event.GetRef())); + UNIT_ASSERT(event && !std::holds_alternative(event.value())); } } @@ -171,13 +173,13 @@ Y_UNIT_TEST_SUITE(BasicUsage) { auto readSession = client.CreateReadSession(readSettings); auto event = readSession->GetEvent(true); - UNIT_ASSERT(event.Defined()); + UNIT_ASSERT(event.has_value()); auto& startPartitionSession = std::get(*event); startPartitionSession.Confirm(); event = readSession->GetEvent(true); - UNIT_ASSERT(event.Defined()); + UNIT_ASSERT(event.has_value()); auto& dataReceived = std::get(*event); dataReceived.Commit(); @@ -234,16 +236,16 @@ Y_UNIT_TEST_SUITE(BasicUsage) { auto readSession = client.CreateReadSession(readSettings); auto event = readSession->GetEvent(true); - UNIT_ASSERT(event.Defined()); + UNIT_ASSERT(event.has_value()); auto& startPartitionSession = std::get(*event); startPartitionSession.Confirm(); UNIT_CHECK_GENERATED_EXCEPTION(readSession->GetEvent(true, 0), TContractViolation); - UNIT_CHECK_GENERATED_EXCEPTION(readSession->GetEvents(true, Nothing(), 0), TContractViolation); + UNIT_CHECK_GENERATED_EXCEPTION(readSession->GetEvents(true, std::nullopt, 0), TContractViolation); event = readSession->GetEvent(true, 1); - UNIT_ASSERT(event.Defined()); + UNIT_ASSERT(event.has_value()); auto& dataReceived = std::get(*event); dataReceived.Commit(); @@ -335,7 +337,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { auto description = result.GetConsumerDescription(); UNIT_ASSERT(description.GetPartitions().size() == 1); auto stats = description.GetPartitions().front().GetPartitionConsumerStats(); - UNIT_ASSERT(stats.Defined()); + UNIT_ASSERT(stats.has_value()); UNIT_ASSERT(stats->GetCommittedOffset() == 50); } @@ -695,7 +697,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { auto description = result.GetTopicDescription(); UNIT_ASSERT(description.GetPartitions().size() == 1); auto stats = description.GetPartitions().front().GetPartitionStats(); - UNIT_ASSERT(stats.Defined()); + UNIT_ASSERT(stats.has_value()); UNIT_ASSERT_VALUES_EQUAL(stats->GetEndOffset(), count); } @@ -779,7 +781,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { std::visit(TOverloaded { [&](TReadSessionEvent::TDataReceivedEvent& event) { for (auto& message: event.GetMessages()) { - TString sourceId = message.GetMessageGroupId(); + std::string sourceId = message.GetMessageGroupId(); ui32 seqNo = message.GetSeqNo(); UNIT_ASSERT_VALUES_EQUAL(readMessageCount + 1, seqNo); ++readMessageCount; @@ -829,11 +831,11 @@ Y_UNIT_TEST_SUITE(TSettingsValidation) { auto client = setup.MakeClient(); ui64 producerIndex = 0u; - auto runTest = [&](TString producer, TString msgGroup, const TMaybe& useDedup, bool useSeqNo, EExpectedTestResult result) ->bool + auto runTest = [&](TString producer, TString msgGroup, const std::optional& useDedup, bool useSeqNo, EExpectedTestResult result) ->bool { TWriteSessionSettings writeSettings; writeSettings.Path(setup.GetTopicPath()).Codec(NTopic::ECodec::RAW); - TString useDedupStr = useDedup.Defined() ? ToString(*useDedup) : ""; + TString useDedupStr = useDedup.has_value() ? ToString(*useDedup) : ""; if (producer) { producer += ToString(producerIndex); } @@ -848,7 +850,7 @@ Y_UNIT_TEST_SUITE(TSettingsValidation) { << useDedupStr << ", manual SeqNo: " << useSeqNo << Endl; try { - if (useDedup.Defined()) { + if (useDedup.has_value()) { writeSettings.DeduplicationEnabled(useDedup); } auto session = client.CreateWriteSession(writeSettings); @@ -857,12 +859,12 @@ Y_UNIT_TEST_SUITE(TSettingsValidation) { ui64 written = 0; while (written < 10) { auto event = session->GetEvent(true); - if (std::holds_alternative(event.GetRef())) { + if (std::holds_alternative(event.value())) { auto closed = std::get(*event); Cerr << "Session failed with error: " << closed.DebugString() << Endl; UNIT_ASSERT(result == EExpectedTestResult::FAIL_ON_RPC); return false; - } else if (std::holds_alternative(event.GetRef())) { + } else if (std::holds_alternative(event.value())) { token = std::move(std::get(*event).ContinuationToken); if (useSeqNo) { session->Write(std::move(*token), "data", seqNo++); @@ -948,10 +950,10 @@ Y_UNIT_TEST_SUITE(TSettingsValidation) { auto readSession = client.CreateReadSession(readSettings); auto event = readSession->GetEvent(true); - UNIT_ASSERT(event.Defined()); + UNIT_ASSERT(event.has_value()); auto& closeEvent = std::get(*event); - UNIT_ASSERT(closeEvent.DebugString().Contains("Too small max memory usage")); + UNIT_ASSERT(closeEvent.DebugString().contains("Too small max memory usage")); } } // Y_UNIT_TEST_SUITE(TSettingsValidation) diff --git a/src/client/topic/ut/describe_topic_ut.cpp b/src/client/topic/ut/describe_topic_ut.cpp index 75258d97c7..a6d3ec0e1a 100644 --- a/src/client/topic/ut/describe_topic_ut.cpp +++ b/src/client/topic/ut/describe_topic_ut.cpp @@ -144,7 +144,7 @@ namespace NYdb::NTopic::NTests { UNIT_ASSERT_GT(consumerStats->GetLastReadOffset(), 0); UNIT_ASSERT_GT(consumerStats->GetCommittedOffset(), 0); - UNIT_ASSERT_GE(consumerStats->GetReadSessionId(), 0); + UNIT_ASSERT_GE(TString{consumerStats->GetReadSessionId()}, 0); UNIT_ASSERT_VALUES_EQUAL(consumerStats->GetReaderName(), ""); } else { UNIT_ASSERT_VALUES_EQUAL(stats->GetStartOffset(), 0); @@ -296,9 +296,9 @@ namespace NYdb::NTopic::NTests { // Event 1: start partition session { - TMaybe event = readSession->GetEvent(true); + std::optional event = readSession->GetEvent(true); UNIT_ASSERT(event); - auto startPartitionSession = std::get_if(event.Get()); + auto startPartitionSession = std::get_if(&event.value()); UNIT_ASSERT_C(startPartitionSession, DebugString(*event)); startPartitionSession->Confirm(); @@ -306,9 +306,9 @@ namespace NYdb::NTopic::NTests { // Event 2: data received { - TMaybe event = readSession->GetEvent(true); + std::optional event = readSession->GetEvent(true); UNIT_ASSERT(event); - auto dataReceived = std::get_if(event.Get()); + auto dataReceived = std::get_if(&event.value()); UNIT_ASSERT_C(dataReceived, DebugString(*event)); dataReceived->Commit(); @@ -316,9 +316,9 @@ namespace NYdb::NTopic::NTests { // Event 3: commit acknowledgement { - TMaybe event = readSession->GetEvent(true); + std::optional event = readSession->GetEvent(true); UNIT_ASSERT(event); - auto commitOffsetAck = std::get_if(event.Get()); + auto commitOffsetAck = std::get_if(&event.value()); UNIT_ASSERT_C(commitOffsetAck, DebugString(*event)); @@ -363,7 +363,7 @@ namespace NYdb::NTopic::NTests { if (allowUpdateRow) { acl.AddAccess(NACLib::EAccessType::Allow, NACLib::UpdateRow, authToken); } - setup.GetServer().AnnoyingClient->ModifyACL("/Root", TEST_TOPIC, acl.SerializeAsString()); + setup.GetServer().AnnoyingClient->ModifyACL("/Root", TString{TEST_TOPIC}, acl.SerializeAsString()); return client.DescribePartition(existingTopic ? TEST_TOPIC : "bad-topic", testPartitionId, settings).GetValueSync(); } @@ -409,7 +409,7 @@ namespace NYdb::NTopic::NTests { if (resultStatus == EStatus::SUCCESS) { auto& p = result.GetPartitionDescription().GetPartition(); UNIT_ASSERT(p.GetActive()); - UNIT_ASSERT(p.GetPartitionLocation().Defined()); + UNIT_ASSERT(p.GetPartitionLocation().has_value()); } } } diff --git a/src/client/topic/ut/local_partition_ut.cpp b/src/client/topic/ut/local_partition_ut.cpp index a5b6718016..94165c9b38 100644 --- a/src/client/topic/ut/local_partition_ut.cpp +++ b/src/client/topic/ut/local_partition_ut.cpp @@ -83,16 +83,16 @@ namespace NYdb::NTopic::NTests { auto readSession = client.CreateReadSession(CreateReadSessionSettings()); - TMaybe event = readSession->GetEvent(true); + std::optional event = readSession->GetEvent(true); UNIT_ASSERT(event); - auto startPartitionSession = std::get_if(event.Get()); + auto startPartitionSession = std::get_if(&event.value()); UNIT_ASSERT_C(startPartitionSession, DebugString(*event)); startPartitionSession->Confirm(); event = readSession->GetEvent(true); UNIT_ASSERT(event); - auto dataReceived = std::get_if(event.Get()); + auto dataReceived = std::get_if(&event.value()); UNIT_ASSERT_C(dataReceived, DebugString(*event)); dataReceived->Commit(); @@ -103,7 +103,7 @@ namespace NYdb::NTopic::NTests { event = readSession->GetEvent(true); UNIT_ASSERT(event); - auto commitOffsetAck = std::get_if(event.Get()); + auto commitOffsetAck = std::get_if(&event.value()); UNIT_ASSERT_C(commitOffsetAck, DebugString(*event)); UNIT_ASSERT_VALUES_EQUAL(commitOffsetAck->GetCommittedOffset(), expectedCommitedOffset); } @@ -391,7 +391,7 @@ namespace NYdb::NTopic::NTests { discovery.SetGoodEndpoints(*setup); auto driverConfig = CreateConfig(*setup, discovery.GetDiscoveryAddr()); auto* tracingBackend = new TTracingBackend(); - driverConfig.SetLog(CreateCompositeLogBackend({new TStreamLogBackend(&Cerr), tracingBackend})); + driverConfig.SetLog(std::unique_ptr(CreateCompositeLogBackend({new TStreamLogBackend(&Cerr), tracingBackend}).Release())); TDriver driver(driverConfig); TTopicClient client(driver); auto sessionSettings = TWriteSessionSettings() @@ -424,7 +424,7 @@ namespace NYdb::NTopic::NTests { discovery.SetGoodEndpoints(*setup); auto driverConfig = CreateConfig(*setup, discovery.GetDiscoveryAddr()); auto* tracingBackend = new TTracingBackend(); - driverConfig.SetLog(CreateCompositeLogBackend({new TStreamLogBackend(&Cerr), tracingBackend})); + driverConfig.SetLog(std::unique_ptr(CreateCompositeLogBackend({new TStreamLogBackend(&Cerr), tracingBackend}).Release())); TDriver driver(driverConfig); TTopicClient client(driver); auto retryPolicy = std::make_shared(); @@ -471,13 +471,13 @@ namespace NYdb::NTopic::NTests { // But at first we only add node 1 to the discovery service. auto setup = CreateSetup(TEST_CASE_NAME, 2, /* createTopic = */ false); setup->GetServer().AnnoyingClient->MarkNodeInHive(setup->GetServer().GetRuntime(), 0, false); - setup->CreateTopic(TEST_TOPIC, TEST_CONSUMER, 1); + setup->CreateTopic(TString{TEST_TOPIC}, TEST_CONSUMER, 1); setup->GetServer().AnnoyingClient->MarkNodeInHive(setup->GetServer().GetRuntime(), 0, true); TMockDiscoveryService discovery; discovery.SetEndpoints(setup->GetRuntime().GetNodeId(0), 1, setup->GetServer().GrpcPort); auto driverConfig = CreateConfig(*setup, discovery.GetDiscoveryAddr()); auto* tracingBackend = new TTracingBackend(); - driverConfig.SetLog(CreateCompositeLogBackend({new TStreamLogBackend(&Cerr), tracingBackend})); + driverConfig.SetLog(std::unique_ptr(CreateCompositeLogBackend({new TStreamLogBackend(&Cerr), tracingBackend}).Release())); TDriver driver(driverConfig); TTopicClient client(driver); auto retryPolicy = std::make_shared(); @@ -522,7 +522,7 @@ namespace NYdb::NTopic::NTests { discovery.SetEndpoints(setup->GetRuntime().GetNodeId(0), 1, 0); auto driverConfig = CreateConfig(*setup, discovery.GetDiscoveryAddr()); auto* tracingBackend = new TTracingBackend(); - driverConfig.SetLog(CreateCompositeLogBackend({new TStreamLogBackend(&Cerr), tracingBackend})); + driverConfig.SetLog(std::unique_ptr(CreateCompositeLogBackend({new TStreamLogBackend(&Cerr), tracingBackend}).Release())); TDriver driver(driverConfig); TTopicClient client(driver); auto retryPolicy = std::make_shared(); @@ -560,13 +560,13 @@ namespace NYdb::NTopic::NTests { auto setup = CreateSetup(TEST_CASE_NAME, 2, /* createTopic = */ false); // Make the node 1 unavailable. setup->GetServer().AnnoyingClient->MarkNodeInHive(setup->GetServer().GetRuntime(), 0, false); - setup->CreateTopic(TEST_TOPIC, TEST_CONSUMER, 2); + setup->CreateTopic(TString{TEST_TOPIC}, TEST_CONSUMER, 2); TMockDiscoveryService discovery; discovery.SetGoodEndpoints(*setup); auto driverConfig = CreateConfig(*setup, discovery.GetDiscoveryAddr()); auto* tracingBackend = new TTracingBackend(); - driverConfig.SetLog(CreateCompositeLogBackend({new TStreamLogBackend(&Cerr), tracingBackend})); + driverConfig.SetLog(std::unique_ptr(CreateCompositeLogBackend({new TStreamLogBackend(&Cerr), tracingBackend}).Release())); TDriver driver(driverConfig); TTopicClient client(driver); auto retryPolicy = std::make_shared(); @@ -625,14 +625,14 @@ namespace NYdb::NTopic::NTests { // Allow UpdateRow only, no DescribeSchema permission. NACLib::TDiffACL acl; acl.AddAccess(NACLib::EAccessType::Allow, NACLib::UpdateRow, authToken); - setup->GetServer().AnnoyingClient->ModifyACL("/Root", TEST_TOPIC, acl.SerializeAsString()); + setup->GetServer().AnnoyingClient->ModifyACL("/Root", TString{TEST_TOPIC}, acl.SerializeAsString()); } TMockDiscoveryService discovery; discovery.SetGoodEndpoints(*setup); auto* tracingBackend = new TTracingBackend(); auto driverConfig = CreateConfig(*setup, discovery.GetDiscoveryAddr()) - .SetLog(CreateCompositeLogBackend({new TStreamLogBackend(&Cerr), tracingBackend})) + .SetLog(std::unique_ptr(CreateCompositeLogBackend({new TStreamLogBackend(&Cerr), tracingBackend}).Release())) .SetAuthToken(authToken); TDriver driver(driverConfig); TTopicClient client(driver); @@ -662,13 +662,13 @@ namespace NYdb::NTopic::NTests { Y_UNIT_TEST(WithoutPartitionWithSplit) { auto setup = CreateSetupForSplitMerge(TEST_CASE_NAME); - setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 1, 100); + setup.CreateTopic(TString{TEST_TOPIC}, TEST_CONSUMER, 1, 100); TMockDiscoveryService discovery; discovery.SetGoodEndpoints(setup); auto driverConfig = CreateConfig(setup, discovery.GetDiscoveryAddr()); auto* tracingBackend = new TTracingBackend(); - driverConfig.SetLog(CreateCompositeLogBackend({new TStreamLogBackend(&Cerr), tracingBackend})); + driverConfig.SetLog(std::unique_ptr(CreateCompositeLogBackend({new TStreamLogBackend(&Cerr), tracingBackend}).Release())); TDriver driver(driverConfig); TTopicClient client(driver); auto writeSettings = TWriteSessionSettings() diff --git a/src/client/topic/ut/topic_to_table_ut.cpp b/src/client/topic/ut/topic_to_table_ut.cpp index 3373f965dc..d323d3c1fe 100644 --- a/src/client/topic/ut/topic_to_table_ut.cpp +++ b/src/client/topic/ut/topic_to_table_ut.cpp @@ -61,19 +61,21 @@ class TFixture : public NUnitTest::TBaseFixture { const TString& topic, const TString& groupId, NTable::TTransaction& tx); - void CreateTopic(const TString& path = TEST_TOPIC, + void CreateTopic(const TString& path = TString{TEST_TOPIC}, const TString& consumer = TEST_CONSUMER, size_t partitionCount = 1, std::optional maxPartitionCount = std::nullopt); + void DescribeTopic(const TString& path); + void WriteToTopicWithInvalidTxId(bool invalidTxId); TTopicWriteSessionPtr CreateTopicWriteSession(const TString& topicPath, const TString& messageGroupId, - TMaybe partitionId); + std::optional partitionId); TTopicWriteSessionContext& GetTopicWriteSession(const TString& topicPath, const TString& messageGroupId, - TMaybe partitionId); + std::optional partitionId); TTopicReadSessionPtr CreateTopicReadSession(const TString& topicPath, const TString& consumerName, @@ -86,7 +88,7 @@ class TFixture : public NUnitTest::TBaseFixture { const TString& messageGroupId, const TString& message, NTable::TTransaction* tx = nullptr, - TMaybe partitionId = Nothing()); + std::optional partitionId = std::nullopt); TVector ReadFromTopic(const TString& topicPath, const TString& consumerName, const TDuration& duration, @@ -152,6 +154,8 @@ class TFixture : public NUnitTest::TBaseFixture { void TestTxWithBigBlobs(const TTestTxWithBigBlobsParams& params); + void WriteMessagesInTx(size_t big, size_t small); + const TDriver& GetDriver() const; void CheckTabletKeys(const TString& topicName); @@ -326,6 +330,11 @@ void TFixture::CreateTopic(const TString& path, Setup->CreateTopic(path, consumer, partitionCount, maxPartitionCount); } +void TFixture::DescribeTopic(const TString& path) +{ + Setup->DescribeTopic(path); +} + const TDriver& TFixture::GetDriver() const { return *Driver; @@ -344,8 +353,8 @@ void TFixture::WriteToTopicWithInvalidTxId(bool invalidTxId) auto writeSession = client.CreateWriteSession(options); auto event = writeSession->GetEvent(true); - UNIT_ASSERT(event.Defined() && std::holds_alternative(event.GetRef())); - auto token = std::move(std::get(event.GetRef()).ContinuationToken); + UNIT_ASSERT(event && std::holds_alternative(event.value())); + auto token = std::move(std::get(event.value()).ContinuationToken); NTopic::TWriteMessage params("message"); params.Tx(tx); @@ -361,8 +370,8 @@ void TFixture::WriteToTopicWithInvalidTxId(bool invalidTxId) while (true) { event = writeSession->GetEvent(true); - UNIT_ASSERT(event.Defined()); - auto& v = event.GetRef(); + UNIT_ASSERT(event.has_value()); + auto& v = event.value(); if (auto e = std::get_if(&v); e) { UNIT_ASSERT(false); } else if (auto e = std::get_if(&v); e) { @@ -451,8 +460,8 @@ Y_UNIT_TEST_F(WriteToTopic_Invalid_Tx, TFixture) Y_UNIT_TEST_F(WriteToTopic_Two_WriteSession, TFixture) { TString topicPath[2] = { - TEST_TOPIC, - TEST_TOPIC + "_2" + TString{TEST_TOPIC}, + TString{TEST_TOPIC} + "_2" }; CreateTopic(topicPath[1]); @@ -470,8 +479,8 @@ Y_UNIT_TEST_F(WriteToTopic_Two_WriteSession, TFixture) params.Tx(tx); auto event = ws->GetEvent(true); - UNIT_ASSERT(event.Defined() && std::holds_alternative(event.GetRef())); - auto token = std::move(std::get(event.GetRef()).ContinuationToken); + UNIT_ASSERT(event && std::holds_alternative(event.value())); + auto token = std::move(std::get(event.value()).ContinuationToken); ws->Write(std::move(token), std::move(params)); }; @@ -499,7 +508,7 @@ Y_UNIT_TEST_F(WriteToTopic_Two_WriteSession, TFixture) } } - auto& v = event.GetRef(); + auto& v = event.value(); if (auto e = std::get_if(&v); e) { ++acks; } else if (auto e = std::get_if(&v); e) { @@ -514,7 +523,7 @@ Y_UNIT_TEST_F(WriteToTopic_Two_WriteSession, TFixture) auto TFixture::CreateTopicWriteSession(const TString& topicPath, const TString& messageGroupId, - TMaybe partitionId) -> TTopicWriteSessionPtr + std::optional partitionId) -> TTopicWriteSessionPtr { NTopic::TTopicClient client(GetDriver()); NTopic::TWriteSessionSettings options; @@ -528,7 +537,7 @@ auto TFixture::CreateTopicWriteSession(const TString& topicPath, auto TFixture::GetTopicWriteSession(const TString& topicPath, const TString& messageGroupId, - TMaybe partitionId) -> TTopicWriteSessionContext& + std::optional partitionId) -> TTopicWriteSessionContext& { std::pair key(topicPath, messageGroupId); auto i = TopicWriteSessions.find(key); @@ -659,7 +668,7 @@ void TFixture::WriteToTopic(const TString& topicPath, const TString& messageGroupId, const TString& message, NTable::TTransaction* tx, - TMaybe partitionId) + std::optional partitionId) { TTopicWriteSessionContext& context = GetTopicWriteSession(topicPath, messageGroupId, partitionId); context.WaitForContinuationToken(); @@ -694,7 +703,7 @@ TVector TFixture::ReadFromTopic(const TString& topicPath, if (auto* e = std::get_if(&event)) { Cerr << e->HasCompressedMessages() << " " << e->GetMessagesCount() << Endl; for (auto& m : e->GetMessages()) { - messages.push_back(m.GetData()); + messages.push_back(TString{m.GetData()}); } if (!tx) { @@ -1120,6 +1129,8 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_6, TFixture) UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #1"); UNIT_ASSERT_VALUES_EQUAL(messages[1], "message #2"); } + + DescribeTopic("topic_A"); } Y_UNIT_TEST_F(WriteToTopic_Demo_7, TFixture) @@ -1333,7 +1344,7 @@ void TFixture::WaitForTheTabletToDeleteTheWriteInfo(const TActorId& actorId, for (size_t i = 0; i < info.TxWritesSize(); ++i) { auto& writeInfo = info.GetTxWrites(i); UNIT_ASSERT(writeInfo.HasWriteId()); - if (NPQ::GetWriteId(writeInfo) == writeId) { + if ((NPQ::GetWriteId(writeInfo) == writeId) && writeInfo.HasOriginalPartitionId()) { found = true; break; } @@ -1611,21 +1622,22 @@ void TFixture::TestTxWithBigBlobs(const TTestTxWithBigBlobsParams& params) for (size_t i = 0; i < params.OldHeadCount; ++i) { WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(100'000, 'x')); + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); ++oldHeadMsgCount; } for (size_t i = 0; i < params.BigBlobsCount; ++i) { - WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(7'900'000, 'x'), &tx); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(7'000'000, 'x'), &tx); + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); ++bigBlobMsgCount; } for (size_t i = 0; i < params.NewHeadCount; ++i) { WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(100'000, 'x'), &tx); + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); ++newHeadMsgCount; } - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - if (params.RestartMode == ERestartBeforeCommit) { RestartPQTablet("topic_A", 0); } @@ -1654,7 +1666,7 @@ void TFixture::TestTxWithBigBlobs(const TTestTxWithBigBlobsParams& params) start += oldHeadMsgCount; for (size_t i = 0; i < bigBlobMsgCount; ++i) { - UNIT_ASSERT_VALUES_EQUAL(messages[start + i].size(), 7'900'000); + UNIT_ASSERT_VALUES_EQUAL(messages[start + i].size(), 7'000'000); } start += bigBlobMsgCount; @@ -1921,6 +1933,90 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_28, TFixture) UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2); } +void TFixture::WriteMessagesInTx(size_t big, size_t small) +{ + CreateTopic("topic_A", TEST_CONSUMER); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + for (size_t i = 0; i < big; ++i) { + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(7'000'000, 'x'), &tx, 0); + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); + } + + for (size_t i = 0; i < small; ++i) { + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(16'384, 'x'), &tx, 0); + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); + } + + CommitTx(tx, EStatus::SUCCESS); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_29, TFixture) +{ + WriteMessagesInTx(1, 0); + WriteMessagesInTx(1, 0); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_30, TFixture) +{ + WriteMessagesInTx(1, 0); + WriteMessagesInTx(0, 1); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_31, TFixture) +{ + WriteMessagesInTx(1, 0); + WriteMessagesInTx(1, 1); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_32, TFixture) +{ + WriteMessagesInTx(0, 1); + WriteMessagesInTx(1, 0); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_33, TFixture) +{ + WriteMessagesInTx(0, 1); + WriteMessagesInTx(0, 1); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_34, TFixture) +{ + WriteMessagesInTx(0, 1); + WriteMessagesInTx(1, 1); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_35, TFixture) +{ + WriteMessagesInTx(1, 1); + WriteMessagesInTx(1, 0); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_36, TFixture) +{ + WriteMessagesInTx(1, 1); + WriteMessagesInTx(0, 1); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_37, TFixture) +{ + WriteMessagesInTx(1, 1); + WriteMessagesInTx(1, 1); +} + + +Y_UNIT_TEST_F(WriteToTopic_Demo_38, TFixture) +{ + WriteMessagesInTx(2, 202); + WriteMessagesInTx(2, 200); + WriteMessagesInTx(0, 1); + WriteMessagesInTx(4, 0); + WriteMessagesInTx(0, 1); +} + } } diff --git a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp index 806fe50b28..34de5301c8 100644 --- a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp +++ b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp @@ -52,6 +52,18 @@ void TTopicSdkTestSetup::CreateTopic(const TString& path, const TString& consume Server.WaitInit(path); } +void TTopicSdkTestSetup::DescribeTopic(const TString& path) +{ + TTopicClient client(MakeDriver()); + + TDescribeTopicSettings settings; + settings.IncludeStats(true); + settings.IncludeLocation(true); + + auto status = client.DescribeTopic(path, settings).GetValueSync(); + UNIT_ASSERT(status.IsSuccess()); +} + TString TTopicSdkTestSetup::GetEndpoint() const { return "localhost:" + ToString(Server.GrpcPort); } @@ -86,7 +98,7 @@ TDriverConfig TTopicSdkTestSetup::MakeDriverConfig() const config.SetEndpoint(GetEndpoint()); config.SetDatabase(GetDatabase()); config.SetAuthToken("root@builtin"); - config.SetLog(MakeHolder(&Cerr)); + config.SetLog(std::make_unique(&Cerr)); return config; } diff --git a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h index 0dc2eb744c..349b5ab655 100644 --- a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h +++ b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h @@ -8,7 +8,7 @@ namespace NYdb::NTopic::NTests { #define TEST_CASE_NAME (this->Name_) -inline static const TString TEST_TOPIC = "test-topic"; +inline static const std::string TEST_TOPIC = "test-topic"; inline static const TString TEST_CONSUMER = "test-consumer"; inline static const TString TEST_MESSAGE_GROUP_ID = "test-message_group_id"; @@ -16,13 +16,15 @@ class TTopicSdkTestSetup { public: TTopicSdkTestSetup(const TString& testCaseName, const NKikimr::Tests::TServerSettings& settings = MakeServerSettings(), bool createTopic = true); - void CreateTopic(const TString& path = TEST_TOPIC, const TString& consumer = TEST_CONSUMER, size_t partitionCount = 1, + void CreateTopic(const TString& path = TString{TEST_TOPIC}, const TString& consumer = TEST_CONSUMER, size_t partitionCount = 1, std::optional maxPartitionCount = std::nullopt); - void CreateTopicWithAutoscale(const TString& path = TEST_TOPIC, const TString& consumer = TEST_CONSUMER, size_t partitionCount = 1, + void CreateTopicWithAutoscale(const TString& path = TString{TEST_TOPIC}, const TString& consumer = TEST_CONSUMER, size_t partitionCount = 1, size_t maxPartitionCount = 100); + void DescribeTopic(const TString& path = TEST_TOPIC); + TString GetEndpoint() const; - TString GetTopicPath(const TString& name = TEST_TOPIC) const; + TString GetTopicPath(const TString& name = TString{TEST_TOPIC}) const; TString GetTopicParent() const; TString GetDatabase() const; diff --git a/src/client/ymq/CMakeLists.txt b/src/client/ymq/CMakeLists.txt deleted file mode 100644 index 29c0bf0441..0000000000 --- a/src/client/ymq/CMakeLists.txt +++ /dev/null @@ -1,19 +0,0 @@ -_ydb_sdk_add_library(client-ymq) - -target_link_libraries(client-ymq - PUBLIC - yutil - grpc-client - string_utils-url - api-grpc-draft - library-operation_id - impl-ydb_internal-make_request - client-ydb_driver -) - -target_sources(client-ymq - PRIVATE - ymq.cpp -) - -_ydb_sdk_make_client_component(Ymq client-ymq) diff --git a/src/client/ymq/ymq.cpp b/src/client/ymq/ymq.cpp deleted file mode 100644 index a2cefdaa15..0000000000 --- a/src/client/ymq/ymq.cpp +++ /dev/null @@ -1,137 +0,0 @@ -#include "ymq.h" - -#define INCLUDE_YDB_INTERNAL_H -#include -#undef INCLUDE_YDB_INTERNAL_H - -#include -#include - -#include - -#include - -namespace NYdb::Ymq::V1 { - - class TYmqClient::TImpl : public TClientImplCommon { - public: - TImpl(std::shared_ptr &&connections, const TCommonClientSettings &settings) - : TClientImplCommon(std::move(connections), settings) {} - - template - auto MakeResultExtractor(NThreading::TPromise promise) { - return [promise = std::move(promise)] - (google::protobuf::Any *any, TPlainStatus status) mutable { - std::unique_ptr result; - if (any) { - result.reset(new TProtoResult); - any->UnpackTo(result.get()); - } - - promise.SetValue( - TResultWrapper( - TStatus(std::move(status)), - std::move(result))); - }; - } - - template - NThreading::TFuture> CallImpl(const TSettings& settings, TAsyncCall grpcCall, TFillRequestFn fillRequest) { - using TResultWrapper = TProtoResultWrapper; - auto request = MakeOperationRequest(settings); - fillRequest(request); - - auto promise = NThreading::NewPromise(); - auto future = promise.GetFuture(); - - auto extractor = MakeResultExtractor(std::move(promise)); - - Connections_->RunDeferred( - std::move(request), - std::move(extractor), - grpcCall, - DbDriverState_, - INITIAL_DEFERRED_CALL_DELAY, - TRpcRequestSettings::Make(settings)); - - return future; - - } - - template - NThreading::TFuture> CallImpl(const TSettings& settings, TAsyncCall grpcCall) { - return CallImpl(settings, grpcCall, [](TProtoRequest&) {}); - } - - TAsyncGetQueueUrlResult GetQueueUrl(const std::string &queueName, TGetQueueUrlSettings settings) { - return CallImpl(settings, - &Ydb::Ymq::V1::YmqService::Stub::AsyncGetQueueUrl, - [&](Ydb::Ymq::V1::GetQueueUrlRequest& req) { - req.set_queue_name(TStringType{queueName}); - } - ); - } - - TAsyncCreateQueueResult CreateQueue(const std::string &queueName, TCreateQueueSettings settings) { - return CallImpl(settings, - &Ydb::Ymq::V1::YmqService::Stub::AsyncCreateQueue, - [&](Ydb::Ymq::V1::CreateQueueRequest& req) { - req.set_queue_name(TStringType{queueName}); - } - ); - } - - TAsyncSendMessageResult SendMessage(const std::string &queueUrl, const std::string &body, TSendMessageSettings settings) { - return CallImpl(settings, - &Ydb::Ymq::V1::YmqService::Stub::AsyncSendMessage, - [&](Ydb::Ymq::V1::SendMessageRequest& req) { - req.set_queue_url(TStringType{queueUrl}); - req.set_message_body(TStringType{body}); - } - ); - } - - template - NThreading::TFuture> DoProtoRequest(const TProtoRequest& proto, TMethod method, const TProtoRequestSettings& settings) { - return CallImpl(settings, method, - [&](TProtoRequest& req) { - req.CopyFrom(proto); - }); - } - }; - - TYmqClient::TYmqClient(const TDriver& driver, const TCommonClientSettings& settings) - : Impl_(new TImpl(CreateInternalInterface(driver), settings)) - { - } - - TAsyncGetQueueUrlResult TYmqClient::GetQueueUrl(const std::string& path, TGetQueueUrlSettings& settings) { - return Impl_->GetQueueUrl(path, settings); - } - - template - NThreading::TFuture> TYmqClient::DoProtoRequest(const TProtoRequest& request, TMethod method, TProtoRequestSettings settings) { - return Impl_->DoProtoRequest(request, method, settings); - } - - template NThreading::TFuture> TYmqClient::DoProtoRequest - < - Ydb::Ymq::V1::GetQueueUrlRequest, - Ydb::Ymq::V1::GetQueueUrlResponse, - Ydb::Ymq::V1::GetQueueUrlResult, - decltype(&Ydb::Ymq::V1::YmqService::Stub::AsyncGetQueueUrl) - >( - const Ydb::Ymq::V1::GetQueueUrlRequest& request, - decltype(&Ydb::Ymq::V1::YmqService::Stub::AsyncGetQueueUrl) method, - TProtoRequestSettings settings - ); -} diff --git a/src/client/ymq/ymq.h b/src/client/ymq/ymq.h deleted file mode 100644 index 1590911d81..0000000000 --- a/src/client/ymq/ymq.h +++ /dev/null @@ -1,74 +0,0 @@ -#pragma once - -#include - -#include - -namespace NYdb::Ymq::V1 { - - template - class TProtoResultWrapper : public NYdb::TStatus { - friend class TYmqClient; - - private: - TProtoResultWrapper( - NYdb::TStatus&& status, - std::unique_ptr result) - : TStatus(std::move(status)) - , Result(std::move(result)) - { } - - public: - const TProtoResult& GetResult() const { - Y_ABORT_UNLESS(Result, "Uninitialized result"); - return *Result; - } - - private: - std::unique_ptr Result; - }; - - enum EStreamMode { - ESM_PROVISIONED = 1, - ESM_ON_DEMAND = 2, - }; - - using TGetQueueUrlResult = TProtoResultWrapper; - using TAsyncGetQueueUrlResult = NThreading::TFuture; - - using TCreateQueueResult = TProtoResultWrapper; - using TAsyncCreateQueueResult = NThreading::TFuture; - - using TSendMessageResult = TProtoResultWrapper; - using TAsyncSendMessageResult = NThreading::TFuture; - - struct TDataRecord { - std::string Data; - std::string PartitionKey; - std::string ExplicitHashDecimal; - }; - - struct TGetQueueUrlSettings : public NYdb::TOperationRequestSettings {}; - struct TCreateQueueSettings : public NYdb::TOperationRequestSettings {}; - struct TSendMessageSettings : public NYdb::TOperationRequestSettings {}; - struct TProtoRequestSettings : public NYdb::TOperationRequestSettings {}; - - class TYmqClient { - class TImpl; - - public: - TYmqClient(const NYdb::TDriver& driver, const NYdb::TCommonClientSettings& settings = NYdb::TCommonClientSettings()); - - TAsyncGetQueueUrlResult GetQueueUrl(const std::string& queueName, TGetQueueUrlSettings& getQueueUrlSettings); - TAsyncCreateQueueResult CreateQueue(const std::string& queueName, TCreateQueueSettings& createQueueSettings); - - template - NThreading::TFuture> DoProtoRequest(const TProtoRequest& request, TMethod method, TProtoRequestSettings settings = TProtoRequestSettings()); - - NThreading::TFuture DiscoveryCompleted(); - - private: - std::shared_ptr Impl_; - }; - -} diff --git a/src/library/uuid/uuid.cpp b/src/library/uuid/uuid.cpp index aa12c7664b..6177535dc0 100644 --- a/src/library/uuid/uuid.cpp +++ b/src/library/uuid/uuid.cpp @@ -43,6 +43,20 @@ void UuidToString(ui16 dw[8], IOutputStream& out) { WriteHex(dw[7], out, true); } +std::string UuidBytesToString(const std::string& in) { + TStringStream ss; + + UuidBytesToString(TString(in), ss); + + return std::string(ss.Str()); +} + +void UuidBytesToString(const std::string& in, IOutputStream& out) { + ui16 dw[8]; + std::memcpy(dw, in.data(), sizeof(dw)); + NUuid::UuidToString(dw, out); +} + void UuidHalfsToByteString(ui64 low, ui64 hi, IOutputStream& out) { union { char bytes[16]; diff --git a/src/library/uuid/uuid.h b/src/library/uuid/uuid.h index 0cbdc08588..8a8d1faa8a 100644 --- a/src/library/uuid/uuid.h +++ b/src/library/uuid/uuid.h @@ -13,6 +13,8 @@ namespace NUuid { static constexpr ui32 UUID_LEN = 16; +std::string UuidBytesToString(const std::string& in); +void UuidBytesToString(const std::string& in, IOutputStream& out); void UuidToString(ui16 dw[8], IOutputStream& out); void UuidHalfsToByteString(ui64 low, ui64 hi, IOutputStream& out); diff --git a/src/library/yql_common/decimal/yql_decimal.h b/src/library/yql_common/decimal/yql_decimal.h index eee22f9038..771d1691da 100644 --- a/src/library/yql_common/decimal/yql_decimal.h +++ b/src/library/yql_common/decimal/yql_decimal.h @@ -17,8 +17,8 @@ namespace NDecimal { #endif #ifdef DONT_USE_NATIVE_INT128 -using TInt128 = TWide; -using TUint128 = TWide; +using TInt128 = TWide; +using TUint128 = TWide; #else using TInt128 = signed __int128; using TUint128 = unsigned __int128; @@ -313,10 +313,10 @@ class TDecimalRemainder { TInt128 Do(TInt128 left, TRight right) const { if constexpr (std::is_signed::value) { - if (right >= +Bound || right <= -Bound) + if (TInt128(right) >= +Bound || TInt128(right) <= -Bound) return left; } else { - if (right >= Bound) + if (TInt128(right) >= Bound) return left; } diff --git a/src/library/yql_common/issue/yql_issue.cpp b/src/library/yql_common/issue/yql_issue.cpp index de3274fc27..2bb6b15caf 100644 --- a/src/library/yql_common/issue/yql_issue.cpp +++ b/src/library/yql_common/issue/yql_issue.cpp @@ -24,7 +24,7 @@ void SanitizeNonAscii(std::string& s) { const unsigned char* i = reinterpret_cast(s.data()); const unsigned char* end = i + s.size(); while (i < end) { - wchar32 rune; + char32_t rune; size_t runeLen; const RECODE_RESULT result = SafeReadUTF8Char(rune, runeLen, i, end); if (result == RECODE_OK) { @@ -52,13 +52,18 @@ TTextWalker& TTextWalker::Advance(char c) { return *this; } + uint32_t charDistance = 1; + if (Utf8Aware && IsUtf8Intermediate(c)) { + charDistance = 0; + } + // either not '\r' or second '\r' if (LfCount) { Position.Row += LfCount; - Position.Column = 1; + Position.Column = charDistance; LfCount = 0; } else { - Position.Column += 1 + (HaveCr && c != '\r'); + Position.Column += charDistance + (HaveCr && c != '\r'); } HaveCr = (c == '\r'); return *this; diff --git a/tests/unit/library/yql_common/issue/yql_issue_ut.cpp b/tests/unit/library/yql_common/issue/yql_issue_ut.cpp index f7451c006d..1f0d2d02b1 100644 --- a/tests/unit/library/yql_common/issue/yql_issue_ut.cpp +++ b/tests/unit/library/yql_common/issue/yql_issue_ut.cpp @@ -92,7 +92,7 @@ Y_UNIT_TEST_SUITE(TextWalkerTest) { TPosition pos; pos.Row = 1; - TTextWalker walker(pos); + TTextWalker walker(pos, false); walker.Advance("a\r\taa"sv); UNIT_ASSERT_VALUES_EQUAL(pos, TPosition(5, 1)); @@ -104,7 +104,7 @@ Y_UNIT_TEST_SUITE(TextWalkerTest) { TPosition pos; pos.Row = 1; - TTextWalker walker(pos); + TTextWalker walker(pos, false); walker.Advance("a\raa\r"sv); UNIT_ASSERT_VALUES_EQUAL(pos, TPosition(4, 1)); walker.Advance('\n'); @@ -118,6 +118,28 @@ Y_UNIT_TEST_SUITE(TextWalkerTest) { walker.Advance('a'); UNIT_ASSERT_VALUES_EQUAL(pos, TPosition(1, 3)); } + + Y_UNIT_TEST(UnicodeTest) { + { + TPosition pos; + pos.Row = 1; + + TTextWalker walker(pos, false); + walker.Advance(TStringBuf("привет")); + + UNIT_ASSERT_VALUES_EQUAL(pos, TPosition(12, 1)); + } + + { + TPosition pos; + pos.Row = 1; + + TTextWalker walker(pos, true); + walker.Advance(TStringBuf("привет")); + + UNIT_ASSERT_VALUES_EQUAL(pos, TPosition(6, 1)); + } + } } Y_UNIT_TEST_SUITE(ToOneLineStringTest) {