Skip to content

Commit 129f0f1

Browse files
authored
EvWrite: Immediate & Prepare (#10913)
1 parent cd279f5 commit 129f0f1

31 files changed

+3110
-590
lines changed

ydb/core/kqp/common/buffer/buffer.h

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#pragma once
2+
3+
#include <ydb/core/scheme/scheme_tabledefs.h>
4+
#include <ydb/core/kqp/common/kqp_tx_manager.h>
5+
6+
namespace NKikimr {
7+
namespace NKqp {
8+
9+
struct TKqpBufferWriterSettings {
10+
TActorId SessionActorId;
11+
IKqpTransactionManagerPtr TxManager;
12+
};
13+
14+
NActors::IActor* CreateKqpBufferWriterActor(TKqpBufferWriterSettings&& settings);
15+
16+
}
17+
}

ydb/core/kqp/common/buffer/events.cpp

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
#include "events.h"
2+
3+
namespace NKikimr {
4+
namespace NKqp {
5+
6+
TEvKqpBuffer::TEvError::TEvError(
7+
const TString& message,
8+
NYql::NDqProto::StatusIds::StatusCode statusCode,
9+
const NYql::TIssues& subIssues)
10+
: Message(message)
11+
, StatusCode(statusCode)
12+
, SubIssues(subIssues) {
13+
}
14+
15+
}
16+
}

ydb/core/kqp/common/buffer/events.h

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
#pragma once
2+
3+
#include <ydb/core/kqp/common/simple/kqp_event_ids.h>
4+
#include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h>
5+
#include <ydb/library/yql/public/issue/yql_issue.h>
6+
7+
8+
namespace NKikimr {
9+
namespace NKqp {
10+
11+
struct TEvKqpBuffer {
12+
13+
struct TEvPrepare : public TEventLocal<TEvPrepare, TKqpBufferWriterEvents::EvPrepare> {
14+
TActorId ExecuterActorId;
15+
};
16+
17+
struct TEvCommit : public TEventLocal<TEvCommit, TKqpBufferWriterEvents::EvCommit> {
18+
TActorId ExecuterActorId;
19+
ui64 TxId;
20+
};
21+
22+
struct TEvRollback : public TEventLocal<TEvRollback, TKqpBufferWriterEvents::EvRollback> {
23+
TActorId ExecuterActorId;
24+
};
25+
26+
struct TEvFlush : public TEventLocal<TEvFlush, TKqpBufferWriterEvents::EvFlush> {
27+
TActorId ExecuterActorId;
28+
};
29+
30+
struct TEvResult : public TEventLocal<TEvResult, TKqpBufferWriterEvents::EvResult> {
31+
};
32+
33+
struct TEvError : public TEventLocal<TEvError, TKqpBufferWriterEvents::EvError> {
34+
TString Message;
35+
NYql::NDqProto::StatusIds::StatusCode StatusCode;
36+
NYql::TIssues SubIssues;
37+
38+
TEvError(const TString& message, NYql::NDqProto::StatusIds::StatusCode statusCode, const NYql::TIssues& subIssues);
39+
};
40+
41+
struct TEvTerminate : public TEventLocal<TEvTerminate, TKqpBufferWriterEvents::EvTerminate> {
42+
};
43+
44+
};
45+
46+
}
47+
}

ydb/core/kqp/common/buffer/ya.make

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
events.cpp
5+
)
6+
7+
PEERDIR(
8+
ydb/core/kqp/common/simple
9+
ydb/library/yql/public/issue
10+
)
11+
12+
YQL_LAST_ABI_VERSION()
13+
14+
END()

ydb/core/kqp/common/kqp_tx.h

+6-6
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <ydb/core/base/feature_flags.h>
44
#include <ydb/core/kqp/common/kqp_yql.h>
5+
#include <ydb/core/kqp/common/kqp_tx_manager.h>
56
#include <ydb/core/kqp/gateway/kqp_gateway.h>
67
#include <ydb/core/kqp/provider/yql_kikimr_provider.h>
78

@@ -121,12 +122,6 @@ struct TDeferredEffects {
121122
friend class TKqpTransactionContext;
122123
};
123124

124-
struct TTableInfo {
125-
bool IsOlap = false;
126-
THashSet<TStringBuf> Pathes;
127-
};
128-
129-
130125
class TShardIdToTableInfo {
131126
public:
132127
const TTableInfo& Get(ui64 shardId) const {
@@ -204,6 +199,8 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
204199
void Finish() final {
205200
YQL_ENSURE(DeferredEffects.Empty());
206201
YQL_ENSURE(!Locks.HasLocks());
202+
YQL_ENSURE(!TxManager);
203+
YQL_ENSURE(!BufferActorId);
207204

208205
FinishTime = TInstant::Now();
209206

@@ -350,6 +347,9 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
350347
bool NeedUncommittedChangesFlush = false;
351348
THashSet<NKikimr::TTableId> ModifiedTablesSinceLastFlush;
352349

350+
TActorId BufferActorId;
351+
IKqpTransactionManagerPtr TxManager = nullptr;
352+
353353
TShardIdToTableInfoPtr ShardIdToTableInfo = std::make_shared<TShardIdToTableInfo>();
354354
};
355355

0 commit comments

Comments
 (0)