Skip to content

Moved csv parsing in import file cmd to YDB CLI and supported pg-types (only row tables) #514

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 13 additions & 15 deletions ydb/public/lib/json_value/ydb_json_value.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -677,22 +677,20 @@ namespace {
ValueBuilder.Decimal(jsonValue.GetString());
break;

case TTypeParser::ETypeKind::Pg: {
TPgType pgType(""); // TODO: correct type?
if (jsonValue.GetType() == NJson::JSON_STRING) {
ValueBuilder.Pg(TPgValue(TPgValue::VK_TEXT, jsonValue.GetString(), pgType));
} else if (jsonValue.GetType() == NJson::JSON_NULL) {
ValueBuilder.Pg(TPgValue(TPgValue::VK_NULL, {}, pgType));
} else {
EnsureType(jsonValue, NJson::JSON_ARRAY);
if (jsonValue.GetArray().size() != 1) {
ThrowFatalError(TStringBuilder() << "Pg type should be encoded as array with size 1, but not " << jsonValue.GetArray().size());
}
auto& innerJsonValue = jsonValue.GetArray().at(0);
EnsureType(innerJsonValue, NJson::JSON_STRING);
auto binary = JsonStringToBinaryString(innerJsonValue.GetString());
ValueBuilder.Pg(TPgValue(TPgValue::VK_BINARY, binary, pgType));
case TTypeParser::ETypeKind::Pg:
if (jsonValue.GetType() == NJson::JSON_STRING) {
ValueBuilder.Pg(TPgValue(TPgValue::VK_TEXT, jsonValue.GetString(), TypeParser.GetPg()));
} else if (jsonValue.GetType() == NJson::JSON_NULL) {
ValueBuilder.Pg(TPgValue(TPgValue::VK_NULL, {}, TypeParser.GetPg()));
} else {
EnsureType(jsonValue, NJson::JSON_ARRAY);
if (jsonValue.GetArray().size() != 1) {
ThrowFatalError(TStringBuilder() << "Pg type should be encoded as array with size 1, but not " << jsonValue.GetArray().size());
}
auto& innerJsonValue = jsonValue.GetArray().at(0);
EnsureType(innerJsonValue, NJson::JSON_STRING);
auto binary = JsonStringToBinaryString(innerJsonValue.GetString());
ValueBuilder.Pg(TPgValue(TPgValue::VK_BINARY, binary, TypeParser.GetPg()));
}
break;

Expand Down
4 changes: 3 additions & 1 deletion ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,9 @@ int TCommandImportFromCsv::Run(TConfig& config) {
settings.Header(Header);
settings.NewlineDelimited(NewlineDelimited);
settings.HeaderRow(HeaderRow);
settings.NullValue(NullValue);
if (config.ParseResult->Has("null-value")) {
settings.NullValue(NullValue);
}

if (Delimiter.size() != 1) {
throw TMisuseException()
Expand Down
136 changes: 100 additions & 36 deletions ydb/public/lib/ydb_cli/common/csv_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ namespace {

class TCsvToYdbConverter {
public:
explicit TCsvToYdbConverter(TTypeParser& parser)
explicit TCsvToYdbConverter(TTypeParser& parser, const std::optional<TString>& nullValue)
: Parser(parser)
, NullValue(nullValue)
{
}

Expand Down Expand Up @@ -40,12 +41,12 @@ class TCsvToYdbConverter {
size_t cnt;
try {
auto value = StringToArithmetic<T>(token, cnt);
if (cnt != token.Size() || value < std::numeric_limits<T>::min() || value > std::numeric_limits<T>::max()) {
if (cnt != token.Size() || value < std::numeric_limits<T>::lowest() || value > std::numeric_limits<T>::max()) {
throw yexception();
}
return static_cast<T>(value);
} catch (std::exception& e) {
throw TMisuseException() << "Expected " << Parser.GetPrimitive() << " value, recieved: \"" << token << "\".";
throw TMisuseException() << "Expected " << Parser.GetPrimitive() << " value, recieved: \"" << token << "\".";
}
}

Expand Down Expand Up @@ -105,15 +106,30 @@ class TCsvToYdbConverter {
case EPrimitiveType::DyNumber:
Builder.DyNumber(token);
break;
case EPrimitiveType::Date:
Builder.Date(TInstant::Days(GetArithmetic<ui16>(token)));
case EPrimitiveType::Date: {
TInstant date;
if (!TInstant::TryParseIso8601(token, date)) {
date = TInstant::Days(GetArithmetic<ui16>(token));
}
Builder.Date(date);
break;
case EPrimitiveType::Datetime:
Builder.Datetime(TInstant::Seconds(GetArithmetic<ui32>(token)));
}
case EPrimitiveType::Datetime: {
TInstant datetime;
if (!TInstant::TryParseIso8601(token, datetime)) {
datetime = TInstant::Seconds(GetArithmetic<ui32>(token));
}
Builder.Datetime(datetime);
break;
case EPrimitiveType::Timestamp:
Builder.Timestamp(TInstant::MicroSeconds(GetArithmetic<ui64>(token)));
}
case EPrimitiveType::Timestamp: {
TInstant timestamp;
if (!TInstant::TryParseIso8601(token, timestamp)) {
timestamp = TInstant::MicroSeconds(GetArithmetic<ui64>(token));
}
Builder.Timestamp(timestamp);
break;
}
case EPrimitiveType::Interval:
Builder.Interval(GetArithmetic<i64>(token));
break;
Expand All @@ -133,17 +149,17 @@ class TCsvToYdbConverter {

void BuildValue(TStringBuf token) {
switch (Parser.GetKind()) {
case TTypeParser::ETypeKind::Primitive:
case TTypeParser::ETypeKind::Primitive: {
BuildPrimitive(TString(token));
break;

case TTypeParser::ETypeKind::Decimal:
}
case TTypeParser::ETypeKind::Decimal: {
Builder.Decimal(TString(token));
break;

case TTypeParser::ETypeKind::Optional:
}
case TTypeParser::ETypeKind::Optional: {
Parser.OpenOptional();
if (token == NullValue) {
if (NullValue && token == NullValue) {
Builder.EmptyOptional(GetType());
} else {
Builder.BeginOptional();
Expand All @@ -152,23 +168,31 @@ class TCsvToYdbConverter {
}
Parser.CloseOptional();
break;

case TTypeParser::ETypeKind::Null:
}
case TTypeParser::ETypeKind::Null: {
EnsureNull(token);
break;

case TTypeParser::ETypeKind::Void:
}
case TTypeParser::ETypeKind::Void: {
EnsureNull(token);
break;

case TTypeParser::ETypeKind::Tagged:
}
case TTypeParser::ETypeKind::Tagged: {
Parser.OpenTagged();
Builder.BeginTagged(Parser.GetTag());
BuildValue(token);
Builder.EndTagged();
Parser.CloseTagged();
break;

}
case TTypeParser::ETypeKind::Pg: {
if (NullValue && token == NullValue) {
Builder.Pg(TPgValue(TPgValue::VK_NULL, {}, Parser.GetPg()));
} else {
Builder.Pg(TPgValue(TPgValue::VK_TEXT, TString(token), Parser.GetPg()));
}
break;
}
default:
throw TMisuseException() << "Unsupported type kind: " << Parser.GetKind();
}
Expand Down Expand Up @@ -200,6 +224,10 @@ class TCsvToYdbConverter {
Parser.CloseTagged();
break;

case TTypeParser::ETypeKind::Pg:
typeBuilder.Pg(Parser.GetPg());
break;

default:
throw TMisuseException() << "Unsupported type kind: " << Parser.GetKind();
}
Expand All @@ -222,6 +250,9 @@ class TCsvToYdbConverter {
}

void EnsureNull(TStringBuf token) const {
if (!NullValue) {
throw TMisuseException() << "Expected null value instead of \"" << token << "\", but null value is not set.";
}
if (token != NullValue) {
throw TMisuseException() << "Expected null value: \"" << NullValue << "\", recieved: \"" << token << "\".";
}
Expand All @@ -234,28 +265,42 @@ class TCsvToYdbConverter {

private:
TTypeParser& Parser;
const TString NullValue = "";
const std::optional<TString> NullValue = "";
TValueBuilder Builder;
};

}

TCsvParser::TCsvParser(TString&& headerRow, const char delimeter, const std::map<TString, TType>& paramTypes, const std::map<TString, TString>& paramSources)
TCsvParser::TCsvParser(TString&& headerRow, const char delimeter, const std::optional<TString>& nullValue,
const std::map<TString, TType>* paramTypes,
const std::map<TString, TString>* paramSources)
: HeaderRow(std::move(headerRow))
, Delimeter(delimeter)
, NullValue(nullValue)
, ParamTypes(paramTypes)
, ParamSources(paramSources)
{
NCsvFormat::CsvSplitter splitter(HeaderRow, Delimeter);
Header = static_cast<TVector<TString>>(splitter);
}

TValue TCsvParser::FieldToValue(TTypeParser& parser, TStringBuf token) {
TCsvToYdbConverter converter(parser);
TCsvParser::TCsvParser(TVector<TString>&& header, const char delimeter, const std::optional<TString>& nullValue,
const std::map<TString, TType>* paramTypes,
const std::map<TString, TString>* paramSources)
: Header(std::move(header))
, Delimeter(delimeter)
, NullValue(nullValue)
, ParamTypes(paramTypes)
, ParamSources(paramSources)
{
}

TValue TCsvParser::FieldToValue(TTypeParser& parser, TStringBuf token) const {
TCsvToYdbConverter converter(parser, NullValue);
return converter.Convert(token);
}

void TCsvParser::GetParams(TString&& data, TParamsBuilder& builder) {
void TCsvParser::GetParams(TString&& data, TParamsBuilder& builder) const {
NCsvFormat::CsvSplitter splitter(data, Delimeter);
auto headerIt = Header.begin();
do {
Expand All @@ -264,14 +309,16 @@ void TCsvParser::GetParams(TString&& data, TParamsBuilder& builder) {
throw TMisuseException() << "Header contains less fields than data. Header: \"" << HeaderRow << "\", data: \"" << data << "\"";
}
TString fullname = "$" + *headerIt;
auto paramIt = ParamTypes.find(fullname);
if (paramIt == ParamTypes.end()) {
auto paramIt = ParamTypes->find(fullname);
if (paramIt == ParamTypes->end()) {
++headerIt;
continue;
}
auto paramSource = ParamSources.find(fullname);
if (paramSource != ParamSources.end()) {
throw TMisuseException() << "Parameter " << fullname << " value found in more than one source: stdin, " << paramSource->second << ".";
if (ParamSources) {
auto paramSource = ParamSources->find(fullname);
if (paramSource != ParamSources->end()) {
throw TMisuseException() << "Parameter " << fullname << " value found in more than one source: stdin, " << paramSource->second << ".";
}
}
TTypeParser parser(paramIt->second);
builder.AddParam(fullname, FieldToValue(parser, token));
Expand All @@ -283,27 +330,30 @@ void TCsvParser::GetParams(TString&& data, TParamsBuilder& builder) {
}
}

void TCsvParser::GetValue(TString&& data, const TType& type, TValueBuilder& builder) {
void TCsvParser::GetValue(TString&& data, TValueBuilder& builder, const TType& type) const {
NCsvFormat::CsvSplitter splitter(data, Delimeter);
auto headerIt = Header.begin();
auto headerIt = Header.cbegin();
std::map<TString, TStringBuf> fields;
do {
TStringBuf token = splitter.Consume();
if (headerIt == Header.end()) {
if (headerIt == Header.cend()) {
throw TMisuseException() << "Header contains less fields than data. Header: \"" << HeaderRow << "\", data: \"" << data << "\"";
}
fields[*headerIt] = token;
++headerIt;
} while (splitter.Step());

if (headerIt != Header.end()) {
if (headerIt != Header.cend()) {
throw TMisuseException() << "Header contains more fields than data. Header: \"" << HeaderRow << "\", data: \"" << data << "\"";
}
builder.BeginStruct();
TTypeParser parser(type);
parser.OpenStruct();
while (parser.TryNextMember()) {
TString name = parser.GetMemberName();
if (name == "__ydb_skip_column_name") {
continue;
}
auto fieldIt = fields.find(name);
if (fieldIt == fields.end()) {
throw TMisuseException() << "No member \"" << name << "\" in csv string for YDB struct type";
Expand All @@ -314,5 +364,19 @@ void TCsvParser::GetValue(TString&& data, const TType& type, TValueBuilder& buil
builder.EndStruct();
}

TType TCsvParser::GetColumnsType() const {
TTypeBuilder builder;
builder.BeginStruct();
for (const auto& colName : Header) {
if (ParamTypes->find(colName) != ParamTypes->end()) {
builder.AddMember(colName, ParamTypes->at(colName));
} else {
builder.AddMember("__ydb_skip_column_name", TTypeBuilder().Build());
}
}
builder.EndStruct();
return builder.Build();
}

}
}
29 changes: 22 additions & 7 deletions ydb/public/lib/ydb_cli/common/csv_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,34 @@ namespace NConsoleClient {

class TCsvParser {
public:
TCsvParser(TString&& headerRow, const char delimeter, const std::map<TString, TType>& paramTypes, const std::map<TString, TString>& paramSources);
TCsvParser() = default;

void GetParams(TString&& data, TParamsBuilder& builder);
void GetValue(TString&& data, const TType& type, TValueBuilder& builder);
TCsvParser(const TCsvParser&) = delete;
TCsvParser(TCsvParser&&) = default;
TCsvParser& operator=(const TCsvParser&) = delete;
TCsvParser& operator=(TCsvParser&&) = default;
~TCsvParser() = default;

TCsvParser(TString&& headerRow, const char delimeter, const std::optional<TString>& nullValue,
const std::map<TString, TType>* paramTypes = nullptr,
const std::map<TString, TString>* paramSources = nullptr);
TCsvParser(TVector<TString>&& header, const char delimeter, const std::optional<TString>& nullValue,
const std::map<TString, TType>* paramTypes = nullptr,
const std::map<TString, TString>* paramSources = nullptr);

void GetParams(TString&& data, TParamsBuilder& builder) const;
void GetValue(TString&& data, TValueBuilder& builder, const TType& type) const;
TType GetColumnsType() const;

private:
TValue FieldToValue(TTypeParser& parser, TStringBuf token);
TValue FieldToValue(TTypeParser& parser, TStringBuf token) const;

TVector<TString> Header;
TString HeaderRow;
const char Delimeter;
const std::map<TString, TType>& ParamTypes;
const std::map<TString, TString>& ParamSources;
char Delimeter;
std::optional<TString> NullValue;
const std::map<TString, TType>* ParamTypes;
const std::map<TString, TString>* ParamSources;
};

}
Expand Down
Loading