@@ -2581,11 +2581,21 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
2581
2581
Process ();
2582
2582
}
2583
2583
2584
- void OnPrepared (IKqpTransactionManager::TPrepareResult&& preparedInfo, ui64 dataSize ) override {
2584
+ void OnPrepared (IKqpTransactionManager::TPrepareResult&& preparedInfo, ui64) override {
2585
2585
if (State != EState::PREPARING) {
2586
2586
return ;
2587
2587
}
2588
- Y_UNUSED (preparedInfo, dataSize);
2588
+ if (!preparedInfo.Coordinator || (TxManager->GetCoordinator () && preparedInfo.Coordinator != TxManager->GetCoordinator ())) {
2589
+ CA_LOG_E (" Handle TEvWriteResult: unable to select coordinator. Tx canceled, actorId: " << SelfId ()
2590
+ << " , previously selected coordinator: " << TxManager->GetCoordinator ()
2591
+ << " , coordinator selected at propose result: " << preparedInfo.Coordinator );
2592
+
2593
+ TxProxyMon->TxResultAborted ->Inc ();
2594
+ ReplyErrorAndDie (NYql::NDqProto::StatusIds::CANCELLED,
2595
+ NKikimrIssues::TIssuesIds::TX_DECLINED_IMPLICIT_COORDINATOR,
2596
+ " Unable to choose coordinator." );
2597
+ return ;
2598
+ }
2589
2599
if (TxManager->ConsumePrepareTransactionResult (std::move (preparedInfo))) {
2590
2600
OnOperationFinished (Counters->BufferActorPrepareLatencyHistogram );
2591
2601
TxManager->StartExecute ();
@@ -2596,11 +2606,10 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
2596
2606
Process ();
2597
2607
}
2598
2608
2599
- void OnCommitted (ui64 shardId, ui64 dataSize ) override {
2609
+ void OnCommitted (ui64 shardId, ui64) override {
2600
2610
if (State != EState::COMMITTING) {
2601
2611
return ;
2602
2612
}
2603
- Y_UNUSED (dataSize);
2604
2613
if (TxManager->ConsumeCommitResult (shardId)) {
2605
2614
CA_LOG_D (" Committed TxId=" << TxId.value_or (0 ));
2606
2615
OnOperationFinished (Counters->BufferActorCommitLatencyHistogram );
@@ -2615,8 +2624,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
2615
2624
}
2616
2625
}
2617
2626
2618
- void OnMessageAcknowledged (ui64 dataSize) override {
2619
- Y_UNUSED (dataSize);
2627
+ void OnMessageAcknowledged (ui64) override {
2620
2628
Process ();
2621
2629
}
2622
2630
@@ -2641,7 +2649,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
2641
2649
ReplyErrorAndDie (statusCode, std::move (issues));
2642
2650
}
2643
2651
2644
- void ReplyErrorAndDie (NYql::NDqProto::StatusIds::StatusCode statusCode, NYql::EYqlIssueCode id, const TString& message, const NYql::TIssues& subIssues = {}) {
2652
+ void ReplyErrorAndDie (NYql::NDqProto::StatusIds::StatusCode statusCode, auto id, const TString& message, const NYql::TIssues& subIssues = {}) {
2645
2653
BufferWriteActorState.EndError (message);
2646
2654
BufferWriteActor.EndError (message);
2647
2655
0 commit comments