Skip to content

Commit 6302e92

Browse files
Merge 7c956b5 into df0fa56
2 parents df0fa56 + 7c956b5 commit 6302e92

File tree

5 files changed

+95
-25
lines changed

5 files changed

+95
-25
lines changed

ydb/core/persqueue/writer/writer.cpp

+8-12
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ namespace NKikimr::NPQ {
3333
#define INFO(message) LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::PQ_WRITE_PROXY, LOG_PREFIX << message);
3434
#define ERROR(message) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::PQ_WRITE_PROXY, LOG_PREFIX << message);
3535

36-
static const ui64 WRITE_BLOCK_SIZE = 4_KB;
36+
static const ui64 WRITE_BLOCK_SIZE = 4_KB;
3737

3838
TString TEvPartitionWriter::TEvInitResult::TSuccess::ToString() const {
3939
auto out = TStringBuilder() << "Success {"
@@ -106,7 +106,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
106106
using EErrorCode = TEvPartitionWriter::TEvWriteResponse::EErrorCode;
107107

108108
static constexpr size_t MAX_QUOTA_INFLIGHT = 3;
109-
109+
110110
static void FillHeader(NKikimrClient::TPersQueuePartitionRequest& request,
111111
ui32 partitionId, const TActorId& pipeClient)
112112
{
@@ -274,11 +274,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
274274

275275
auto& request = *ev->Record.MutablePartitionRequest();
276276
auto& cmd = *request.MutableCmdGetOwnership();
277-
if (Opts.UseDeduplication) {
278-
cmd.SetOwner(SourceId);
279-
} else {
280-
cmd.SetOwner(CreateGuidAsString());
281-
}
277+
cmd.SetOwner(SourceId);
282278
cmd.SetForce(true);
283279

284280
SetWriteId(request);
@@ -724,16 +720,16 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
724720
ReceivedQuota.insert(ReceivedQuota.end(), PendingQuota.begin(), PendingQuota.end());
725721
PendingQuota.clear();
726722

727-
ProcessQuotaAndWrite();
723+
ProcessQuotaAndWrite();
728724

729725
break;
730726

731727
case EWakeupTag::RlNoResource:
732-
// Re-requesting the quota. We do this until we get a quota.
728+
// Re-requesting the quota. We do this until we get a quota.
733729
// We do not request a quota with a long waiting time because the writer may already be a destroyer, and the quota will still be waiting to be received.
734730
RequestDataQuota(PendingQuotaAmount, ctx);
735731
break;
736-
732+
737733
default:
738734
Y_VERIFY_DEBUG_S(false, "Unsupported tag: " << static_cast<ui64>(tag));
739735
}
@@ -754,7 +750,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
754750
, TabletId(tabletId)
755751
, PartitionId(partitionId)
756752
, ExpectedGeneration(opts.ExpectedGeneration)
757-
, SourceId(opts.SourceId)
753+
, SourceId(opts.UseDeduplication ? opts.SourceId : CreateGuidAsString())
758754
, Opts(opts)
759755
{
760756
if (Opts.MeteringMode) {
@@ -840,7 +836,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
840836
IActor* CreatePartitionWriter(const TActorId& client,
841837
// const NKikimrSchemeOp::TPersQueueGroupDescription& config,
842838
ui64 tabletId,
843-
ui32 partitionId,
839+
ui32 partitionId,
844840
const TPartitionWriterOpts& opts) {
845841
return new TPartitionWriter(client, tabletId, partitionId, opts);
846842
}

ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h

-1
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,6 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,
317317
};
318318

319319
THandleResult OnErrorImpl(NYdb::TPlainStatus&& status); // true - should Start(), false - should Close(), empty - no action
320-
321320
public:
322321
TWriteSessionImpl(const TWriteSessionSettings& settings,
323322
std::shared_ptr<TTopicClient::TImpl> client,

ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp

+23-3
Original file line numberDiff line numberDiff line change
@@ -697,9 +697,29 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
697697
UNIT_ASSERT_VALUES_EQUAL(stats->GetEndOffset(), count);
698698

699699
}
700+
} // Y_UNIT_TEST_SUITE(BasicUsage)
700701

702+
Y_UNIT_TEST_SUITE(TSettingsValidation) {
703+
Y_UNIT_TEST(TWriteSessionProducerSettings) {
704+
TTopicSdkTestSetup setup(TEST_CASE_NAME);
705+
TTopicClient client = setup.MakeClient();
701706

707+
{
708+
auto writeSettings = TWriteSessionSettings()
709+
.Path(TEST_TOPIC)
710+
.ProducerId("something")
711+
.DeduplicationEnabled(false);
712+
try {
713+
auto writeSession = client.CreateWriteSession(writeSettings);
714+
auto event = writeSession->GetEvent(true);
715+
UNIT_ASSERT(event.Defined());
716+
auto* closedEvent = std::get_if<TSessionClosedEvent>(&event.GetRef());
717+
UNIT_ASSERT(closedEvent);
718+
} catch (NYdb::TContractViolation&) {
719+
//pass
720+
}
721+
}
722+
}
723+
} // Y_UNIT_TEST_SUITE(TSettingsValidation)
702724

703-
}
704-
705-
}
725+
} // namespace

ydb/services/persqueue_v1/actors/write_session_actor.ipp

+15-9
Original file line numberDiff line numberDiff line change
@@ -389,11 +389,12 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWriteInit::TPt
389389
// 1.2. non-empty partition_id (explicit partitioning)
390390
// 1.3. non-empty partition_with_generation (explicit partitioning && direct write to partition host)
391391
// 2. Empty producer id (no deduplication, partition is selected using round-robin).
392-
bool isScenarioSupported =
392+
bool isScenarioSupported =
393393
!InitRequest.producer_id().empty() && (
394-
InitRequest.has_message_group_id() && InitRequest.message_group_id() == InitRequest.producer_id() ||
394+
InitRequest.has_message_group_id() && InitRequest.message_group_id() == InitRequest.producer_id() ||
395395
InitRequest.has_partition_id() ||
396-
InitRequest.has_partition_with_generation()) ||
396+
InitRequest.has_partition_with_generation())
397+
||
397398
InitRequest.producer_id().empty();
398399

399400
if (!isScenarioSupported) {
@@ -424,7 +425,6 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWriteInit::TPt
424425
return InitRequest.has_message_group_id() ? InitRequest.message_group_id() : InitRequest.producer_id();
425426
}
426427
}();
427-
428428
LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session request cookie: " << Cookie << " " << InitRequest.ShortDebugString() << " from " << PeerName);
429429
if (!UseDeduplication) {
430430
LOG_DEBUG_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session request cookie: " << Cookie << ". Disable deduplication for empty producer id");
@@ -467,8 +467,9 @@ template<bool UseMigrationProtocol>
467467
void TWriteSessionActor<UseMigrationProtocol>::InitAfterDiscovery(const TActorContext& ctx) {
468468
Y_UNUSED(ctx);
469469

470-
if (SourceId.empty()) {
471-
Y_ABORT_UNLESS(!UseDeduplication);
470+
if (SourceId.empty() && UseDeduplication) {
471+
CloseSession("Internal server error: got empty SourceId with enabled deduplication", PersQueue::ErrorCode::ERROR, ctx);
472+
return;
472473
}
473474

474475
InitMeta = GetInitialDataChunk(InitRequest, FullConverter->GetClientsideName(), PeerName); // ToDo[migration] - check?
@@ -835,9 +836,14 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
835836
OwnerCookie = result.GetResult().OwnerCookie;
836837

837838
const auto& maxSeqNo = result.GetResult().SourceIdInfo.GetSeqNo();
838-
if (!UseDeduplication) {
839-
Y_ABORT_UNLESS(maxSeqNo == 0);
840-
}
839+
840+
// ToDo: uncomment after fixing KIKIMR-21124
841+
// if (!UseDeduplication) {
842+
// if (maxSeqNo != 0) {
843+
// return CloseSession("Internal server error: have maxSeqNo != with deduplication disabled",
844+
// PersQueue::ErrorCode::ERROR, ctx);
845+
// }
846+
// }
841847

842848
OwnerCookie = result.GetResult().OwnerCookie;
843849
MakeAndSentInitResponse(maxSeqNo, ctx);

ydb/services/persqueue_v1/persqueue_ut.cpp

+49
Original file line numberDiff line numberDiff line change
@@ -6699,6 +6699,55 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
66996699
}
67006700
}
67016701

6702+
Y_UNIT_TEST(DisableWrongSettings) {
6703+
NPersQueue::TTestServer server;
6704+
server.EnableLogs({NKikimrServices::PQ_READ_PROXY, NKikimrServices::BLACKBOX_VALIDATOR });
6705+
server.EnableLogs({NKikimrServices::PERSQUEUE}, NActors::NLog::EPriority::PRI_INFO);
6706+
TString topicFullName = "rt3.dc1--acc--topic1";
6707+
auto driver = SetupTestAndGetDriver(server, topicFullName, 3);
6708+
6709+
std::shared_ptr<grpc::Channel> Channel_;
6710+
std::unique_ptr<Ydb::Topic::V1::TopicService::Stub> TopicStubP_;
6711+
{
6712+
Channel_ = grpc::CreateChannel("localhost:" + ToString(server.GrpcPort), grpc::InsecureChannelCredentials());
6713+
TopicStubP_ = Ydb::Topic::V1::TopicService::NewStub(Channel_);
6714+
}
6715+
6716+
{
6717+
grpc::ClientContext rcontext1;
6718+
auto writeStream1 = TopicStubP_->StreamWrite(&rcontext1);
6719+
UNIT_ASSERT(writeStream1);
6720+
Ydb::Topic::StreamWriteMessage::FromClient req;
6721+
Ydb::Topic::StreamWriteMessage::FromServer resp;
6722+
6723+
req.mutable_init_request()->set_path("acc/topic1");
6724+
req.mutable_init_request()->set_message_group_id("some-group");
6725+
if (!writeStream1->Write(req)) {
6726+
ythrow yexception() << "write fail";
6727+
}
6728+
UNIT_ASSERT(writeStream1->Read(&resp));
6729+
Cerr << "===Got response: " << resp.ShortDebugString() << Endl;
6730+
UNIT_ASSERT(resp.status() == Ydb::StatusIds::BAD_REQUEST);
6731+
}
6732+
{
6733+
grpc::ClientContext rcontext1;
6734+
auto writeStream1 = TopicStubP_->StreamWrite(&rcontext1);
6735+
UNIT_ASSERT(writeStream1);
6736+
Ydb::Topic::StreamWriteMessage::FromClient req;
6737+
Ydb::Topic::StreamWriteMessage::FromServer resp;
6738+
6739+
req.mutable_init_request()->set_path("acc/topic1");
6740+
req.mutable_init_request()->set_message_group_id("some-group");
6741+
req.mutable_init_request()->set_producer_id("producer");
6742+
if (!writeStream1->Write(req)) {
6743+
ythrow yexception() << "write fail";
6744+
}
6745+
UNIT_ASSERT(writeStream1->Read(&resp));
6746+
Cerr << "===Got response: " << resp.ShortDebugString() << Endl;
6747+
UNIT_ASSERT(resp.status() == Ydb::StatusIds::BAD_REQUEST);
6748+
}
6749+
}
6750+
67026751
Y_UNIT_TEST(DisableDeduplication) {
67036752
NPersQueue::TTestServer server;
67046753
TString topicFullName = "rt3.dc1--topic1";

0 commit comments

Comments
 (0)