@@ -104,14 +104,12 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request
104
104
}
105
105
106
106
TOperation::TPtr operation = new TOperation (txId);
107
- Operations[txId] = operation; // record is erased at ApplyOnExecute if all parts are done at propose
108
107
109
108
for (const auto & transaction : record.GetTransaction ()) {
110
109
auto quotaResult = operation->ConsumeQuota (transaction, context);
111
110
if (quotaResult.Status != NKikimrScheme::StatusSuccess) {
112
111
response.Reset (new TProposeResponse (quotaResult.Status , ui64 (txId), ui64 (selfId)));
113
112
response->SetError (quotaResult.Status , quotaResult.Reason );
114
- Operations.erase (txId);
115
113
return std::move (response);
116
114
}
117
115
}
@@ -131,7 +129,6 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request
131
129
if (splitResult.Status != NKikimrScheme::StatusSuccess) {
132
130
response.Reset (new TProposeResponse (splitResult.Status , ui64 (txId), ui64 (selfId)));
133
131
response->SetError (splitResult.Status , splitResult.Reason );
134
- Operations.erase (txId);
135
132
return std::move (response);
136
133
}
137
134
@@ -140,11 +137,15 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request
140
137
141
138
const TString owner = record.HasOwner () ? record.GetOwner () : BUILTIN_ACL_ROOT;
142
139
140
+ bool prevProposeUndoSafe = true ;
141
+
142
+ Operations[txId] = operation; // record is erased at ApplyOnExecute if all parts are done at propose
143
+
143
144
for (const auto & transaction : transactions) {
144
145
auto parts = operation->ConstructParts (transaction, context);
145
146
146
147
if (parts.size () > 1 ) {
147
- // les't allow altering impl index tables as part of consistent operation
148
+ // allow altering impl index tables as part of consistent operation
148
149
context.IsAllowedPrivateTables = true ;
149
150
}
150
151
@@ -198,25 +199,77 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request
198
199
<< " , with reason: " << response->Record .GetReason ()
199
200
<< " , tx message: " << SecureDebugString (record));
200
201
201
- context.OnComplete = {}; // recreate
202
- context.DbChanges = {};
202
+ AbortOperationPropose (txId, context);
203
203
204
- for (auto & toAbort : operation->Parts ) {
205
- toAbort->AbortPropose (context);
206
- }
204
+ return std::move (response);
205
+ }
207
206
208
- context. MemChanges . UnDo (context. SS );
209
- context.OnComplete . ApplyOnExecute (context. SS , context. GetTxc (), context. Ctx );
210
- Operations. erase (txId) ;
207
+ // Check suboperations for undo safety. Log first unsafe suboperation in the schema transaction.
208
+ if (prevProposeUndoSafe && ! context.IsUndoChangesSafe ()) {
209
+ prevProposeUndoSafe = false ;
211
210
212
- return std::move (response);
211
+ LOG_WARN_S (context.Ctx , NKikimrServices::FLAT_TX_SCHEMESHARD,
212
+ " Operation part proposed ok, but propose itself is undo unsafe"
213
+ << " , suboperation type: " << NKikimrSchemeOp::EOperationType_Name (part->GetTransaction ().GetOperationType ())
214
+ << " , opId: " << part->GetOperationId ()
215
+ << " , at schemeshard: " << selfId
216
+ );
213
217
}
214
218
}
215
219
}
216
220
217
221
return std::move (response);
218
222
}
219
223
224
+ void TSchemeShard::AbortOperationPropose (const TTxId txId, TOperationContext& context) {
225
+ Y_ABORT_UNLESS (Operations.contains (txId));
226
+ TOperation::TPtr operation = Operations.at (txId);
227
+
228
+ // Drop operation side effects, undo memory changes
229
+ // (Local db changes were already applied)
230
+ context.OnComplete = {};
231
+ context.DbChanges = {};
232
+
233
+ for (auto & i : operation->Parts ) {
234
+ i->AbortPropose (context);
235
+ }
236
+
237
+ context.MemChanges .UnDo (context.SS );
238
+
239
+ // And remove aborted operation from existence
240
+ Operations.erase (txId);
241
+ }
242
+
243
+ void AbortOperation (TOperationContext& context, const TTxId txId, const TString& reason) {
244
+ LOG_ERROR_S (context.Ctx , NKikimrServices::FLAT_TX_SCHEMESHARD, " TTxOperationPropose Execute"
245
+ << " , txId: " << txId
246
+ << " , operation is rejected and all changes reverted"
247
+ << " , " << reason
248
+ << " , at schemeshard: " << context.SS ->SelfTabletId ()
249
+ );
250
+
251
+ context.GetTxc ().DB .RollbackChanges ();
252
+ context.SS ->AbortOperationPropose (txId, context);
253
+ }
254
+
255
+ bool IsCommitRedoSizeOverLimit (TString* reason, TOperationContext& context) {
256
+ // MaxCommitRedoMB is the ICB control shared with NTabletFlatExecutor::TExecutor.
257
+ // We subtract from MaxCommitRedoMB additional 1MB for anything extra
258
+ // that executor/tablet may (or may not) add under the hood
259
+ const ui64 limitBytes = (context.SS ->MaxCommitRedoMB - 1 ) << 20 ; // MB to bytes
260
+ const ui64 commitRedoBytes = context.GetTxc ().DB .GetCommitRedoBytes ();
261
+ if (commitRedoBytes >= limitBytes) {
262
+ *reason = TStringBuilder ()
263
+ << " local tx commit redo size generated by IgniteOperation() is more than allowed limit: "
264
+ << " commit redo size " << commitRedoBytes
265
+ << " , limit " << limitBytes
266
+ << " , excess " << (commitRedoBytes - limitBytes)
267
+ ;
268
+ return true ;
269
+ }
270
+ return false ;
271
+ }
272
+
220
273
struct TSchemeShard ::TTxOperationPropose: public NTabletFlatExecutor::TTransactionBase<TSchemeShard> {
221
274
using TBase = NTabletFlatExecutor::TTransactionBase<TSchemeShard>;
222
275
@@ -236,6 +289,7 @@ struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransacti
236
289
237
290
bool Execute (NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) override {
238
291
TTabletId selfId = Self->SelfTabletId ();
292
+ auto txId = TTxId (Request->Get ()->Record .GetTxId ());
239
293
240
294
LOG_DEBUG_S (ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
241
295
" TTxOperationPropose Execute"
@@ -246,7 +300,6 @@ struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransacti
246
300
247
301
auto [userToken, tokenParseError] = ParseUserToken (Request->Get ()->Record .GetUserToken ());
248
302
if (tokenParseError) {
249
- auto txId = Request->Get ()->Record .GetTxId ();
250
303
Response = MakeHolder<TProposeResponse>(NKikimrScheme::StatusInvalidParameter, ui64 (txId), ui64 (selfId), " Failed to parse user token" );
251
304
return true ;
252
305
}
@@ -258,10 +311,52 @@ struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransacti
258
311
TStorageChanges dbChanges;
259
312
TOperationContext context{Self, txc, ctx, OnComplete, memChanges, dbChanges, std::move (userToken)};
260
313
314
+ // NOTE: Successful IgniteOperation will leave created operation in Self->Operations and accumulated changes in the context.
315
+ // Unsuccessful IgniteOperation will leave no operation and context will also be clean.
261
316
Response = Self->IgniteOperation (*Request->Get (), context);
262
317
263
- OnComplete.ApplyOnExecute (Self, txc, ctx);
318
+ // NOTE: Successfully created operation also must be checked for the size of this local tx.
319
+ //
320
+ // Limitation on a commit redo size of local transactions is imposed at the tablet executor level
321
+ // (See ydb/core/tablet_flat/flat_executor.cpp, NTabletFlatExecutor::TExecutor::ExecuteTransaction()).
322
+ // And a tablet violating that limit is considered broken and will be stopped unconditionally and immediately.
323
+ //
324
+ // So even if operation was ignited successfully, it's local tx size still must be checked
325
+ // as a precaution measure to avoid infinite loop of schemeshard restarting, attempting to propose
326
+ // persisted operation again, hitting commit redo size limit and restarting again.
327
+ //
328
+ // On unsuccessful check, local tx should be rolled back, operation should be rejected and
329
+ // all accumulated changes dropped or reverted.
330
+ //
331
+
332
+ // Actually build commit redo (dbChanges could be empty)
264
333
dbChanges.Apply (Self, txc, ctx);
334
+
335
+ if (Self->Operations .contains (txId)) {
336
+ Y_ABORT_UNLESS (Response->IsDone () || Response->IsAccepted () || Response->IsConditionalAccepted ());
337
+
338
+ // Check local tx commit redo size
339
+ TString reason;
340
+ if (IsCommitRedoSizeOverLimit (&reason, context)) {
341
+ Response = MakeHolder<TProposeResponse>(NKikimrScheme::StatusSchemeError, ui64 (txId), ui64 (selfId), reason);
342
+
343
+ AbortOperation (context, txId, reason);
344
+
345
+ if (!context.IsUndoChangesSafe ()) {
346
+ LOG_ERROR_S (context.Ctx , NKikimrServices::FLAT_TX_SCHEMESHARD, " TTxOperationPropose Execute"
347
+ << " , opId: " << txId
348
+ << " , operation should be rejected and all changes be reverted"
349
+ << " , but context.IsUndoChangesSafe is false, which means some direct writes have been done"
350
+ << " , message: " << SecureDebugString (Request->Get ()->Record )
351
+ << " , at schemeshard: " << context.SS ->SelfTabletId ()
352
+ );
353
+ }
354
+ }
355
+ }
356
+
357
+ // Apply accumulated changes (changes could be empty)
358
+ OnComplete.ApplyOnExecute (Self, txc, ctx);
359
+
265
360
return true ;
266
361
}
267
362
0 commit comments