Skip to content

Commit b76c6f8

Browse files
authored
YQ-3892 fix data race in json filters (#11827)
1 parent a03cc3d commit b76c6f8

File tree

3 files changed

+21
-1
lines changed

3 files changed

+21
-1
lines changed

ydb/core/fq/libs/row_dispatcher/common.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ class TPureCalcProgramFactory : public IPureCalcProgramFactory {
2121
return it->second;
2222
}
2323

24+
TGuard<TMutex> LockFactory() const override {
25+
return Guard(FactoryMutex);
26+
}
27+
2428
private:
2529
void CreateFactory(const TSettings& settings) {
2630
ProgramFactories.insert({settings, NYql::NPureCalc::MakeProgramFactory(
@@ -31,6 +35,7 @@ class TPureCalcProgramFactory : public IPureCalcProgramFactory {
3135

3236
private:
3337
std::map<TSettings, NYql::NPureCalc::IProgramFactoryPtr> ProgramFactories;
38+
TMutex FactoryMutex;
3439
};
3540

3641
} // anonymous namespace

ydb/core/fq/libs/row_dispatcher/common.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22

33
#include <util/generic/ptr.h>
4+
#include <util/system/mutex.h>
45

56
#include <ydb/library/yql/public/purecalc/common/fwd.h>
67

@@ -18,6 +19,9 @@ class IPureCalcProgramFactory : public TThrRefBase {
1819

1920
public:
2021
virtual NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) const = 0;
22+
23+
// Before creating purecalc program factory should be locked
24+
virtual TGuard<TMutex> LockFactory() const = 0;
2125
};
2226

2327
IPureCalcProgramFactory::TPtr CreatePureCalcProgramFactory();

ydb/core/fq/libs/row_dispatcher/json_filter.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,9 +279,13 @@ class TJsonFilter::TImpl {
279279
TCallback callback,
280280
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
281281
const IPureCalcProgramFactory::TSettings& factorySettings)
282-
: Sql(GenerateSql(whereFilter, factorySettings)) {
282+
: PureCalcProgramFactory(pureCalcProgramFactory)
283+
, Sql(GenerateSql(whereFilter, factorySettings)) {
283284
Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal");
284285

286+
// Shared factory may change during compilation, so it should be locked
287+
auto guard = pureCalcProgramFactory->LockFactory();
288+
285289
// Program should be stateless because input values
286290
// allocated on another allocator and should be released
287291
LOG_ROW_DISPATCHER_DEBUG("Creating program...");
@@ -304,6 +308,12 @@ class TJsonFilter::TImpl {
304308
return Sql;
305309
}
306310

311+
~TImpl() {
312+
auto guard = PureCalcProgramFactory->LockFactory();
313+
InputConsumer.Reset();
314+
Program.Reset();
315+
}
316+
307317
private:
308318
TString GenerateSql(const TString& whereFilter, const IPureCalcProgramFactory::TSettings& factorySettings) {
309319
TStringStream str;
@@ -317,6 +327,7 @@ class TJsonFilter::TImpl {
317327
}
318328

319329
private:
330+
const IPureCalcProgramFactory::TPtr PureCalcProgramFactory;
320331
THolder<NYql::NPureCalc::TPushStreamProgram<TFilterInputSpec, TFilterOutputSpec>> Program;
321332
THolder<NYql::NPureCalc::IConsumer<TInputType>> InputConsumer;
322333
const TString Sql;

0 commit comments

Comments
 (0)