Skip to content

Commit a89660c

Browse files
Merge 9abfdac into 2f89980
2 parents 2f89980 + 9abfdac commit a89660c

File tree

23 files changed

+1469
-32
lines changed

23 files changed

+1469
-32
lines changed

build/conf/compilers/gnu_compiler.conf

+7
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,13 @@ when ($MSAN_TRACK_ORIGIN == "yes") {
8484

8585
when ($ARCH_XTENSA == "yes") {
8686
FSTACK=
87+
CFLAGS+=-Wno-c++14-extensions
88+
when ($ARCH_XTENSA_HIFI4 == "yes") {
89+
CFLAGS+=-Wno-c++1z-extensions
90+
}
91+
otherwise {
92+
CFLAGS+=-Wno-c++17-extensions
93+
}
8794
}
8895

8996
when ($OS_EMSCRIPTEN == "yes") {

ydb/ci/rightlib.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
23e9865bb938b83e7e32b670ba055c407f75494b
1+
796e6186c6652f49958e68c7eb0f06c52827e702

yql/essentials/core/dq_integration/yql_dq_integration.h

+17-4
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ class TJsonValue;
2121

2222
namespace NYql {
2323

24-
struct TDqSettings;
2524
class TTransformationPipeline;
2625

2726
namespace NCommon {
@@ -45,12 +44,26 @@ class IDqIntegration {
4544
public:
4645
virtual ~IDqIntegration() {}
4746

48-
virtual ui64 Partition(const TDqSettings& config, size_t maxPartitions, const TExprNode& node,
49-
TVector<TString>& partitions, TString* clusterName, TExprContext& ctx, bool canFallback) = 0;
47+
struct TPartitionSettings {
48+
TMaybe<ui64> DataSizePerJob;
49+
size_t MaxPartitions = 0;
50+
TMaybe<bool> EnableComputeActor;
51+
bool CanFallback = false;
52+
};
53+
54+
virtual ui64 Partition(const TExprNode& node, TVector<TString>& partitions, TString* clusterName, TExprContext& ctx, const TPartitionSettings& settings) = 0;
5055
virtual bool CheckPragmas(const TExprNode& node, TExprContext& ctx, bool skipIssues = false) = 0;
5156
virtual bool CanRead(const TExprNode& read, TExprContext& ctx, bool skipIssues = true) = 0;
5257
virtual TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TVector<const TExprNode*>& nodes, TExprContext& ctx) = 0;
53-
virtual TExprNode::TPtr WrapRead(const TDqSettings& config, const TExprNode::TPtr& read, TExprContext& ctx) = 0;
58+
59+
struct TWrapReadSettings {
60+
TMaybe<TString> WatermarksMode;
61+
TMaybe<ui64> WatermarksGranularityMs;
62+
TMaybe<ui64> WatermarksLateArrivalDelayMs;
63+
TMaybe<bool> WatermarksEnableIdlePartitions;
64+
};
65+
66+
virtual TExprNode::TPtr WrapRead(const TExprNode::TPtr& read, TExprContext& ctx, const TWrapReadSettings& settings) = 0;
5467
virtual TMaybe<TOptimizerStatistics> ReadStatistics(const TExprNode::TPtr& readWrap, TExprContext& ctx) = 0;
5568
virtual TExprNode::TPtr RecaptureWrite(const TExprNode::TPtr& write, TExprContext& ctx) = 0;
5669

yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp

+2-3
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@
22

33
namespace NYql {
44

5-
ui64 TDqIntegrationBase::Partition(const TDqSettings&, size_t, const TExprNode&,
6-
TVector<TString>&, TString*, TExprContext&, bool) {
5+
ui64 TDqIntegrationBase::Partition(const TExprNode&, TVector<TString>&, TString*, TExprContext&, const TPartitionSettings& ) {
76
return 0;
87
}
98

@@ -22,7 +21,7 @@ TMaybe<ui64> TDqIntegrationBase::EstimateReadSize(ui64, ui32, const TVector<cons
2221
return Nothing();
2322
}
2423

25-
TExprNode::TPtr TDqIntegrationBase::WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext&) {
24+
TExprNode::TPtr TDqIntegrationBase::WrapRead(const TExprNode::TPtr& read, TExprContext&, const TWrapReadSettings& ) {
2625
return read;
2726
}
2827

yql/essentials/providers/common/dq/yql_dq_integration_impl.h

+2-3
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,11 @@ namespace NYql {
66

77
class TDqIntegrationBase: public IDqIntegration {
88
public:
9-
ui64 Partition(const TDqSettings& config, size_t maxPartitions, const TExprNode& node,
10-
TVector<TString>& partitions, TString* clusterName, TExprContext& ctx, bool canFallback) override;
9+
ui64 Partition(const TExprNode& node, TVector<TString>& partitions, TString* clusterName, TExprContext& ctx, const TPartitionSettings& settings) override;
1110
bool CheckPragmas(const TExprNode& node, TExprContext& ctx, bool skipIssues) override;
1211
bool CanRead(const TExprNode& read, TExprContext& ctx, bool skipIssues) override;
1312
TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TVector<const TExprNode*>& nodes, TExprContext& ctx) override;
14-
TExprNode::TPtr WrapRead(const TDqSettings& config, const TExprNode::TPtr& read, TExprContext& ctx) override;
13+
TExprNode::TPtr WrapRead(const TExprNode::TPtr& read, TExprContext& ctx, const TWrapReadSettings& settings) override;
1514
TMaybe<TOptimizerStatistics> ReadStatistics(const TExprNode::TPtr& readWrap, TExprContext& ctx) override;
1615
TExprNode::TPtr RecaptureWrite(const TExprNode::TPtr& write, TExprContext& ctx) override;
1716
void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) override;

yql/essentials/providers/pg/provider/yql_pg_dq_integration.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class TPgDqIntegration: public TDqIntegrationBase {
2525
return Nothing();
2626
}
2727

28-
ui64 Partition(const TDqSettings&, size_t, const TExprNode&, TVector<TString>& partitions, TString*, TExprContext&, bool) override {
28+
ui64 Partition(const TExprNode&, TVector<TString>& partitions, TString*, TExprContext&, const TPartitionSettings&) override {
2929
partitions.clear();
3030
partitions.emplace_back();
3131
return 0ULL;

yt/cpp/mapreduce/client/init.cpp

+13-9
Original file line numberDiff line numberDiff line change
@@ -166,27 +166,32 @@ NLogging::ELogLevel ToCoreLogLevel(ILogger::ELevel level)
166166
Y_ABORT();
167167
}
168168

169-
void CommonInitialize(int, const char**)
169+
void CommonInitialize(TGuard<TMutex>& g)
170170
{
171171
auto logLevelStr = to_lower(TConfig::Get()->LogLevel);
172172
ILogger::ELevel logLevel;
173173

174174
if (!TryFromString(logLevelStr, logLevel)) {
175175
Cerr << "Invalid log level: " << TConfig::Get()->LogLevel << Endl;
176+
g.Release();
176177
exit(1);
177178
}
178179

179180
auto logPath = TConfig::Get()->LogPath;
180-
ILoggerPtr logger;
181181
if (logPath.empty()) {
182-
logger = CreateStdErrLogger(logLevel);
182+
if (TConfig::Get()->LogUseCore) {
183+
auto coreLoggingConfig = NLogging::TLogManagerConfig::CreateStderrLogger(ToCoreLogLevel(logLevel));
184+
NLogging::TLogManager::Get()->Configure(coreLoggingConfig);
185+
SetUseCoreLog();
186+
} else {
187+
auto logger = CreateStdErrLogger(logLevel);
188+
SetLogger(logger);
189+
}
183190
} else {
184-
logger = CreateFileLogger(logLevel, logPath, /*append*/ true);
185-
186191
auto coreLoggingConfig = NLogging::TLogManagerConfig::CreateLogFile(logPath, ToCoreLogLevel(logLevel));
187192
NLogging::TLogManager::Get()->Configure(coreLoggingConfig);
193+
SetUseCoreLog();
188194
}
189-
SetLogger(logger);
190195
}
191196

192197
void NonJobInitialize(const TInitializeOptions& options)
@@ -281,8 +286,7 @@ void JoblessInitialize(const TInitializeOptions& options)
281286
{
282287
auto g = Guard(InitializeLock);
283288

284-
static const char* fakeArgv[] = {"unknown..."};
285-
NDetail::CommonInitialize(1, fakeArgv);
289+
NDetail::CommonInitialize(g);
286290
NDetail::NonJobInitialize(options);
287291
NDetail::ElevateInitStatus(NDetail::EInitStatus::JoblessInitialization);
288292
}
@@ -291,7 +295,7 @@ void Initialize(int argc, const char* argv[], const TInitializeOptions& options)
291295
{
292296
auto g = Guard(InitializeLock);
293297

294-
NDetail::CommonInitialize(argc, argv);
298+
NDetail::CommonInitialize(g);
295299

296300
NDetail::ElevateInitStatus(NDetail::EInitStatus::FullInitialization);
297301

yt/cpp/mapreduce/interface/config.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ void TConfig::Reset()
194194
ApiVersion = GetEnv("YT_VERSION", "v3");
195195
LogLevel = GetEnv("YT_LOG_LEVEL", "error");
196196
LogPath = GetEnv("YT_LOG_PATH");
197+
LogUseCore = GetBool("YT_LOG_USE_CORE", false);
197198

198199
ContentEncoding = GetEncoding("YT_CONTENT_ENCODING");
199200
AcceptEncoding = GetEncoding("YT_ACCEPT_ENCODING");

yt/cpp/mapreduce/interface/config.h

+12
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,18 @@ struct TConfig
8181
TString LogLevel;
8282
TString LogPath;
8383

84+
///
85+
/// For historical reasons, the mapreduce client uses its own logging system.
86+
///
87+
/// If this option is set to true, the library switches to yt/yt/core logging by default.
88+
/// But if the user calls @ref NYT::SetLogger, the library switches back to the logger provided by the user
89+
/// (except for messages from yt/yt/core).
90+
///
91+
/// This is a temporary option. In the future it will be true by default, and then removed.
92+
///
93+
/// https://st.yandex-team.ru/YT-23645
94+
bool LogUseCore = false;
95+
8496
// Compression for data that is sent to YT cluster.
8597
EEncoding ContentEncoding;
8698

yt/cpp/mapreduce/interface/logging/logger.cpp

+6-1
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,12 @@ ILoggerPtr GetLogger()
182182
return Logger;
183183
}
184184

185+
void SetUseCoreLog()
186+
{
187+
auto guard = TWriteGuard(LoggerMutex);
188+
Logger = nullptr;
189+
}
190+
185191
////////////////////////////////////////////////////////////////////////////////
186192

187193
}
188-

yt/cpp/mapreduce/interface/logging/logger.h

+4
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ using ILoggerPtr = ::TIntrusivePtr<ILogger>;
3030
void SetLogger(ILoggerPtr logger);
3131
ILoggerPtr GetLogger();
3232

33+
void SetUseCoreLog();
34+
3335
ILoggerPtr CreateStdErrLogger(ILogger::ELevel cutLevel);
3436
ILoggerPtr CreateFileLogger(ILogger::ELevel cutLevel, const TString& path, bool append = false);
3537

@@ -40,4 +42,6 @@ ILoggerPtr CreateFileLogger(ILogger::ELevel cutLevel, const TString& path, bool
4042
*/
4143
ILoggerPtr CreateBufferedFileLogger(ILogger::ELevel cutLevel, const TString& path, bool append = false);
4244

45+
////////////////////////////////////////////////////////////////////////////////
46+
4347
} // namespace NYT

yt/cpp/mapreduce/interface/logging/yt_log.cpp

+21-10
Original file line numberDiff line numberDiff line change
@@ -26,23 +26,34 @@ class TLogManager
2626
::TSourceLocation sourceLocation,
2727
TStringBuf anchorMessage) override
2828
{
29+
if (auto* defaultLogManager = GetDefaultLogManager()) {
30+
defaultLogManager->RegisterStaticAnchor(anchor, sourceLocation, anchorMessage);
31+
}
2932
auto guard = Guard(Mutex_);
3033
anchor->SourceLocation = sourceLocation;
3134
anchor->AnchorMessage = anchorMessage;
3235
}
3336

34-
void UpdateAnchor(TLoggingAnchor* /*position*/) override
35-
{ }
37+
void UpdateAnchor(TLoggingAnchor* anchor) override
38+
{
39+
if (auto* defaultLogManager = GetDefaultLogManager()) {
40+
defaultLogManager->UpdateAnchor(anchor);
41+
}
42+
}
3643

3744
void Enqueue(TLogEvent&& event) override
3845
{
39-
auto message = TString(event.MessageRef.ToStringBuf());
40-
LogMessage(
41-
ToImplLevel(event.Level),
42-
::TSourceLocation(event.SourceFile, event.SourceLine),
43-
"%.*s",
44-
event.MessageRef.size(),
45-
event.MessageRef.begin());
46+
if (auto logger = GetLogger()) {
47+
LogMessage(
48+
logger,
49+
ToImplLevel(event.Level),
50+
::TSourceLocation(event.SourceFile, event.SourceLine),
51+
"%.*s",
52+
event.MessageRef.size(),
53+
event.MessageRef.begin());
54+
} else if (auto* defaultLogManager = GetDefaultLogManager()) {
55+
defaultLogManager->Enqueue(std::move(event));
56+
}
4657
}
4758

4859
const TLoggingCategory* GetCategory(TStringBuf categoryName) override
@@ -81,7 +92,7 @@ class TLogManager
8192
}
8293
}
8394

84-
static void LogMessage(ILogger::ELevel level, const ::TSourceLocation& sourceLocation, const char* format, ...)
95+
static void LogMessage(const ILoggerPtr& logger, ILogger::ELevel level, const ::TSourceLocation& sourceLocation, const char* format, ...)
8596
{
8697
va_list args;
8798
va_start(args, format);

yt/yt/client/formats/config.cpp

+8
Original file line numberDiff line numberDiff line change
@@ -352,4 +352,12 @@ void TSkiffFormatConfig::Register(TRegistrar registrar)
352352

353353
////////////////////////////////////////////////////////////////////////////////
354354

355+
void TYamlFormatConfig::Register(TRegistrar registrar)
356+
{
357+
registrar.Parameter("write_uint_tag", &TThis::WriteUintTag)
358+
.Default(false);
359+
}
360+
361+
////////////////////////////////////////////////////////////////////////////////
362+
355363
} // namespace NYT::NFormats

yt/yt/client/formats/config.h

+20
Original file line numberDiff line numberDiff line change
@@ -415,4 +415,24 @@ DEFINE_REFCOUNTED_TYPE(TSkiffFormatConfig)
415415

416416
////////////////////////////////////////////////////////////////////////////////
417417

418+
class TYamlFormatConfig
419+
: public NYTree::TYsonStruct
420+
{
421+
public:
422+
//! Write explicit tag "!yt/uint64" for uint64 data type.
423+
//! Use this option if you want to preserve information about
424+
//! the original YT type (without it, numbers in range [0, 2^63-1]
425+
//! will always be written as integers).
426+
//! Option has no effect for parsing.
427+
bool WriteUintTag;
428+
429+
REGISTER_YSON_STRUCT(TYamlFormatConfig);
430+
431+
static void Register(TRegistrar registrar);
432+
};
433+
434+
DEFINE_REFCOUNTED_TYPE(TYamlFormatConfig)
435+
436+
////////////////////////////////////////////////////////////////////////////////
437+
418438
} // namespace NYT::NFormats

yt/yt/client/formats/public.h

+2
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ DEFINE_ENUM(EFormatType,
5858
(WebJson)
5959
(Skiff)
6060
(Arrow)
61+
(Yaml)
6162
);
6263

6364
////////////////////////////////////////////////////////////////////////////////
@@ -76,6 +77,7 @@ DECLARE_REFCOUNTED_CLASS(TProtobufTableConfig)
7677
DECLARE_REFCOUNTED_CLASS(TProtobufFormatConfig)
7778
DECLARE_REFCOUNTED_CLASS(TWebJsonFormatConfig)
7879
DECLARE_REFCOUNTED_CLASS(TSkiffFormatConfig)
80+
DECLARE_REFCOUNTED_CLASS(TYamlFormatConfig)
7981

8082
DECLARE_REFCOUNTED_STRUCT(IYamrConsumer)
8183

yt/yt/library/formats/format.cpp

+37
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
#include "schemaless_writer_adapter.h"
1313
#include "skiff_parser.h"
1414
#include "skiff_writer.h"
15+
#include "yaml_parser.h"
16+
#include "yaml_writer.h"
1517
#include "yamred_dsv_parser.h"
1618
#include "yamred_dsv_writer.h"
1719
#include "yamr_parser.h"
@@ -108,6 +110,18 @@ std::unique_ptr<IFlushableYsonConsumer> CreateConsumerForDsv(
108110
};
109111
}
110112

113+
std::unique_ptr<IFlushableYsonConsumer> CreateConsumerForYaml(
114+
EDataType dataType,
115+
const IAttributeDictionary& attributes,
116+
IZeroCopyOutput* output)
117+
{
118+
if (dataType != EDataType::Structured) {
119+
THROW_ERROR_EXCEPTION("YAML is supported only for structured data");
120+
}
121+
auto config = ConvertTo<TYamlFormatConfigPtr>(&attributes);
122+
return CreateYamlWriter(output, DataTypeToYsonType(dataType), config);
123+
}
124+
111125
class TTableParserAdapter
112126
: public IParser
113127
{
@@ -161,6 +175,8 @@ std::unique_ptr<IFlushableYsonConsumer> CreateConsumerForFormat(
161175
return CreateConsumerForJson(dataType, format.Attributes(), output);
162176
case EFormatType::Dsv:
163177
return CreateConsumerForDsv(dataType, format.Attributes(), output);
178+
case EFormatType::Yaml:
179+
return CreateConsumerForYaml(dataType, format.Attributes(), output);
164180
default:
165181
THROW_ERROR_EXCEPTION("Unsupported output format %Qlv",
166182
format.GetType());
@@ -408,6 +424,21 @@ TYsonProducer CreateProducerForJson(
408424
});
409425
}
410426

427+
TYsonProducer CreateProducerForYaml(
428+
EDataType dataType,
429+
const IAttributeDictionary& attributes,
430+
IInputStream* input)
431+
{
432+
if (dataType != EDataType::Structured) {
433+
THROW_ERROR_EXCEPTION("YAML is supported only for structured data");
434+
}
435+
auto ysonType = DataTypeToYsonType(dataType);
436+
auto config = ConvertTo<TYamlFormatConfigPtr>(&attributes);
437+
return BIND([=] (IYsonConsumer* consumer) {
438+
ParseYaml(input, consumer, config, ysonType);
439+
});
440+
}
441+
411442
TYsonProducer CreateProducerForYson(EDataType dataType, IInputStream* input)
412443
{
413444
auto ysonType = DataTypeToYsonType(dataType);
@@ -429,6 +460,8 @@ TYsonProducer CreateProducerForFormat(const TFormat& format, EDataType dataType,
429460
return CreateProducerForYamredDsv(dataType, format.Attributes(), input);
430461
case EFormatType::SchemafulDsv:
431462
return CreateProducerForSchemafulDsv(dataType, format.Attributes(), input);
463+
case EFormatType::Yaml:
464+
return CreateProducerForYaml(dataType, format.Attributes(), input);
432465
default:
433466
THROW_ERROR_EXCEPTION("Unsupported input format %Qlv",
434467
format.GetType());
@@ -489,6 +522,10 @@ std::unique_ptr<IParser> CreateParserForFormat(const TFormat& format, EDataType
489522
auto config = ConvertTo<TSchemafulDsvFormatConfigPtr>(&format.Attributes());
490523
return CreateParserForSchemafulDsv(consumer, config);
491524
}
525+
case EFormatType::Yaml:
526+
// We can only get here with EDataType::Tabular, so throw a specific error about supporting
527+
// only structured data in YAML.
528+
THROW_ERROR_EXCEPTION("YAML is supported only for structured data");
492529
default:
493530
THROW_ERROR_EXCEPTION("Unsupported input format %Qlv",
494531
format.GetType());

0 commit comments

Comments
 (0)