Skip to content

Commit 224aa6b

Browse files
Merge e367b83 into f2b72c6
2 parents f2b72c6 + e367b83 commit 224aa6b

File tree

5 files changed

+96
-26
lines changed

5 files changed

+96
-26
lines changed

ydb/core/persqueue/writer/writer.cpp

+9-13
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ namespace NKikimr::NPQ {
3333
#define INFO(message) LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::PQ_WRITE_PROXY, LOG_PREFIX << message);
3434
#define ERROR(message) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::PQ_WRITE_PROXY, LOG_PREFIX << message);
3535

36-
static const ui64 WRITE_BLOCK_SIZE = 4_KB;
36+
static const ui64 WRITE_BLOCK_SIZE = 4_KB;
3737

3838
TString TEvPartitionWriter::TEvInitResult::TSuccess::ToString() const {
3939
auto out = TStringBuilder() << "Success {"
@@ -106,7 +106,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
106106
using EErrorCode = TEvPartitionWriter::TEvWriteResponse::EErrorCode;
107107

108108
static constexpr size_t MAX_QUOTA_INFLIGHT = 3;
109-
109+
110110
static void FillHeader(NKikimrClient::TPersQueuePartitionRequest& request,
111111
ui32 partitionId, const TActorId& pipeClient)
112112
{
@@ -272,12 +272,9 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
272272
void GetOwnership() {
273273
auto ev = MakeRequest(PartitionId, PipeClient);
274274

275-
auto& cmd = *ev->Record.MutablePartitionRequest()->MutableCmdGetOwnership();
276-
if (Opts.UseDeduplication) {
277-
cmd.SetOwner(SourceId);
278-
} else {
279-
cmd.SetOwner(CreateGuidAsString());
280-
}
275+
auto& request = *ev->Record.MutablePartitionRequest();
276+
auto& cmd = *request.MutableCmdGetOwnership();
277+
cmd.SetOwner(SourceId);
281278
cmd.SetForce(true);
282279

283280
NTabletPipe::SendData(SelfId(), PipeClient, ev.Release());
@@ -741,15 +738,14 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
741738
PendingQuota.clear();
742739

743740
ProcessQuota();
744-
745741
break;
746742

747743
case EWakeupTag::RlNoResource:
748-
// Re-requesting the quota. We do this until we get a quota.
744+
// Re-requesting the quota. We do this until we get a quota.
749745
// We do not request a quota with a long waiting time because the writer may already be a destroyer, and the quota will still be waiting to be received.
750746
RequestDataQuota(PendingQuotaAmount, ctx);
751747
break;
752-
748+
753749
default:
754750
Y_VERIFY_DEBUG_S(false, "Unsupported tag: " << static_cast<ui64>(tag));
755751
}
@@ -770,7 +766,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
770766
, TabletId(tabletId)
771767
, PartitionId(partitionId)
772768
, ExpectedGeneration(opts.ExpectedGeneration)
773-
, SourceId(opts.SourceId)
769+
, SourceId(opts.UseDeduplication ? opts.SourceId : CreateGuidAsString())
774770
, Opts(opts)
775771
{
776772
if (Opts.MeteringMode) {
@@ -856,7 +852,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
856852
IActor* CreatePartitionWriter(const TActorId& client,
857853
// const NKikimrSchemeOp::TPersQueueGroupDescription& config,
858854
ui64 tabletId,
859-
ui32 partitionId,
855+
ui32 partitionId,
860856
const TPartitionWriterOpts& opts) {
861857
return new TPartitionWriter(client, tabletId, partitionId, opts);
862858
}

ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h

-1
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,6 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,
317317
};
318318

319319
THandleResult OnErrorImpl(NYdb::TPlainStatus&& status); // true - should Start(), false - should Close(), empty - no action
320-
321320
public:
322321
TWriteSessionImpl(const TWriteSessionSettings& settings,
323322
std::shared_ptr<TTopicClient::TImpl> client,

ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp

+23-3
Original file line numberDiff line numberDiff line change
@@ -697,9 +697,29 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
697697
UNIT_ASSERT_VALUES_EQUAL(stats->GetEndOffset(), count);
698698

699699
}
700+
} // Y_UNIT_TEST_SUITE(BasicUsage)
700701

702+
Y_UNIT_TEST_SUITE(TSettingsValidation) {
703+
Y_UNIT_TEST(TWriteSessionProducerSettings) {
704+
TTopicSdkTestSetup setup(TEST_CASE_NAME);
705+
TTopicClient client = setup.MakeClient();
701706

707+
{
708+
auto writeSettings = TWriteSessionSettings()
709+
.Path(TEST_TOPIC)
710+
.ProducerId("something")
711+
.DeduplicationEnabled(false);
712+
try {
713+
auto writeSession = client.CreateWriteSession(writeSettings);
714+
auto event = writeSession->GetEvent(true);
715+
UNIT_ASSERT(event.Defined());
716+
auto* closedEvent = std::get_if<TSessionClosedEvent>(&event.GetRef());
717+
UNIT_ASSERT(closedEvent);
718+
} catch (NYdb::TContractViolation&) {
719+
//pass
720+
}
721+
}
722+
}
723+
} // Y_UNIT_TEST_SUITE(TSettingsValidation)
702724

703-
}
704-
705-
}
725+
} // namespace

ydb/services/persqueue_v1/actors/write_session_actor.ipp

+15-9
Original file line numberDiff line numberDiff line change
@@ -389,11 +389,12 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWriteInit::TPt
389389
// 1.2. non-empty partition_id (explicit partitioning)
390390
// 1.3. non-empty partition_with_generation (explicit partitioning && direct write to partition host)
391391
// 2. Empty producer id (no deduplication, partition is selected using round-robin).
392-
bool isScenarioSupported =
392+
bool isScenarioSupported =
393393
!InitRequest.producer_id().empty() && (
394-
InitRequest.has_message_group_id() && InitRequest.message_group_id() == InitRequest.producer_id() ||
394+
InitRequest.has_message_group_id() && InitRequest.message_group_id() == InitRequest.producer_id() ||
395395
InitRequest.has_partition_id() ||
396-
InitRequest.has_partition_with_generation()) ||
396+
InitRequest.has_partition_with_generation())
397+
||
397398
InitRequest.producer_id().empty();
398399

399400
if (!isScenarioSupported) {
@@ -424,7 +425,6 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWriteInit::TPt
424425
return InitRequest.has_message_group_id() ? InitRequest.message_group_id() : InitRequest.producer_id();
425426
}
426427
}();
427-
428428
LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session request cookie: " << Cookie << " " << InitRequest.ShortDebugString() << " from " << PeerName);
429429
if (!UseDeduplication) {
430430
LOG_DEBUG_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session request cookie: " << Cookie << ". Disable deduplication for empty producer id");
@@ -467,8 +467,9 @@ template<bool UseMigrationProtocol>
467467
void TWriteSessionActor<UseMigrationProtocol>::InitAfterDiscovery(const TActorContext& ctx) {
468468
Y_UNUSED(ctx);
469469

470-
if (SourceId.empty()) {
471-
Y_ABORT_UNLESS(!UseDeduplication);
470+
if (SourceId.empty() && UseDeduplication) {
471+
CloseSession("Internal server error: got empty SourceId with enabled deduplication", PersQueue::ErrorCode::ERROR, ctx);
472+
return;
472473
}
473474

474475
InitMeta = GetInitialDataChunk(InitRequest, FullConverter->GetClientsideName(), PeerName); // ToDo[migration] - check?
@@ -835,9 +836,14 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
835836
OwnerCookie = result.GetResult().OwnerCookie;
836837

837838
const auto& maxSeqNo = result.GetResult().SourceIdInfo.GetSeqNo();
838-
if (!UseDeduplication) {
839-
Y_ABORT_UNLESS(maxSeqNo == 0);
840-
}
839+
840+
// ToDo: uncomment after fixing KIKIMR-21124
841+
// if (!UseDeduplication) {
842+
// if (maxSeqNo != 0) {
843+
// return CloseSession("Internal server error: have maxSeqNo != with deduplication disabled",
844+
// PersQueue::ErrorCode::ERROR, ctx);
845+
// }
846+
// }
841847

842848
OwnerCookie = result.GetResult().OwnerCookie;
843849
MakeAndSentInitResponse(maxSeqNo, ctx);

ydb/services/persqueue_v1/persqueue_ut.cpp

+49
Original file line numberDiff line numberDiff line change
@@ -6694,6 +6694,55 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
66946694
}
66956695
}
66966696

6697+
Y_UNIT_TEST(DisableWrongSettings) {
6698+
NPersQueue::TTestServer server;
6699+
server.EnableLogs({NKikimrServices::PQ_READ_PROXY, NKikimrServices::BLACKBOX_VALIDATOR });
6700+
server.EnableLogs({NKikimrServices::PERSQUEUE}, NActors::NLog::EPriority::PRI_INFO);
6701+
TString topicFullName = "rt3.dc1--acc--topic1";
6702+
auto driver = SetupTestAndGetDriver(server, topicFullName, 3);
6703+
6704+
std::shared_ptr<grpc::Channel> Channel_;
6705+
std::unique_ptr<Ydb::Topic::V1::TopicService::Stub> TopicStubP_;
6706+
{
6707+
Channel_ = grpc::CreateChannel("localhost:" + ToString(server.GrpcPort), grpc::InsecureChannelCredentials());
6708+
TopicStubP_ = Ydb::Topic::V1::TopicService::NewStub(Channel_);
6709+
}
6710+
6711+
{
6712+
grpc::ClientContext rcontext1;
6713+
auto writeStream1 = TopicStubP_->StreamWrite(&rcontext1);
6714+
UNIT_ASSERT(writeStream1);
6715+
Ydb::Topic::StreamWriteMessage::FromClient req;
6716+
Ydb::Topic::StreamWriteMessage::FromServer resp;
6717+
6718+
req.mutable_init_request()->set_path("acc/topic1");
6719+
req.mutable_init_request()->set_message_group_id("some-group");
6720+
if (!writeStream1->Write(req)) {
6721+
ythrow yexception() << "write fail";
6722+
}
6723+
UNIT_ASSERT(writeStream1->Read(&resp));
6724+
Cerr << "===Got response: " << resp.ShortDebugString() << Endl;
6725+
UNIT_ASSERT(resp.status() == Ydb::StatusIds::BAD_REQUEST);
6726+
}
6727+
{
6728+
grpc::ClientContext rcontext1;
6729+
auto writeStream1 = TopicStubP_->StreamWrite(&rcontext1);
6730+
UNIT_ASSERT(writeStream1);
6731+
Ydb::Topic::StreamWriteMessage::FromClient req;
6732+
Ydb::Topic::StreamWriteMessage::FromServer resp;
6733+
6734+
req.mutable_init_request()->set_path("acc/topic1");
6735+
req.mutable_init_request()->set_message_group_id("some-group");
6736+
req.mutable_init_request()->set_producer_id("producer");
6737+
if (!writeStream1->Write(req)) {
6738+
ythrow yexception() << "write fail";
6739+
}
6740+
UNIT_ASSERT(writeStream1->Read(&resp));
6741+
Cerr << "===Got response: " << resp.ShortDebugString() << Endl;
6742+
UNIT_ASSERT(resp.status() == Ydb::StatusIds::BAD_REQUEST);
6743+
}
6744+
}
6745+
66976746
Y_UNIT_TEST(DisableDeduplication) {
66986747
NPersQueue::TTestServer server;
66996748
TString topicFullName = "rt3.dc1--topic1";

0 commit comments

Comments
 (0)