|
126 | 126 | LOG_TRACE_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, "TS3ReadCoroImpl: " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId \
|
127 | 127 | << " [" << Path << "]. RETRY{ Offset: " << RetryStuff->Offset << ", Delay: " << RetryStuff->NextRetryDelay << ", RequestId: " << RetryStuff->RequestId << "}. " << stream)
|
128 | 128 |
|
129 |
| -#define THROW_ARROW_NOT_OK(status) \ |
| 129 | +#define THROW_ARROW_NOT_OK(code, status) \ |
130 | 130 | do \
|
131 | 131 | { \
|
132 | 132 | if (::arrow::Status _s = (status); !_s.ok()) \
|
133 |
| - throw yexception() << _s.ToString(); \ |
| 133 | + ythrow TCodeLineException(code) << _s.ToString(); \ |
134 | 134 | } while (false)
|
135 | 135 |
|
136 | 136 | namespace NYql::NDq {
|
@@ -575,7 +575,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
|
575 | 575 | void HandleEvent(TEvS3Provider::TEvReadResult2::THandle& event) {
|
576 | 576 |
|
577 | 577 | if (event.Get()->Failure) {
|
578 |
| - throw yexception() << event.Get()->Issues.ToOneLineString(); |
| 578 | + ythrow TCodeLineException(NYql::NDqProto::StatusIds::INTERNAL_ERROR) << event.Get()->Issues.ToOneLineString(); |
579 | 579 | }
|
580 | 580 | auto readyRange = event.Get()->ReadRange;
|
581 | 581 | LOG_CORO_D("Download FINISHED [" << readyRange.Offset << "-" << readyRange.Length << "], cookie: " << event.Cookie);
|
@@ -773,9 +773,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
|
773 | 773 | if (StopIfConsumedEnough(numRows)) {
|
774 | 774 | isCancelled = true;
|
775 | 775 | }
|
776 |
| - if (!status.ok()) { |
777 |
| - throw yexception() << status.ToString(); |
778 |
| - } |
| 776 | + ThrowParquetNotOk(status); |
779 | 777 | SourceContext->UpdateProgress(downloadedBytes, decodedBytes, table->num_rows());
|
780 | 778 | if (RawInflightSize) {
|
781 | 779 | RawInflightSize->Sub(downloadedBytes);
|
@@ -862,9 +860,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
|
862 | 860 | if (StopIfConsumedEnough(numRows)) {
|
863 | 861 | isCancelled = true;
|
864 | 862 | }
|
865 |
| - if (!status.ok()) { |
866 |
| - throw yexception() << status.ToString(); |
867 |
| - } |
| 863 | + ThrowParquetNotOk(status); |
868 | 864 | SourceContext->UpdateProgress(downloadedBytes, decodedBytes, table->num_rows());
|
869 | 865 | if (isCancelled) {
|
870 | 866 | LOG_CORO_D("RunCoroBlockArrowParserOverFile - STOPPED ON SATURATION");
|
@@ -1177,6 +1173,11 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
|
1177 | 1173 | // Stop any activity instantly
|
1178 | 1174 | RetryStuff->Cancel();
|
1179 | 1175 | return;
|
| 1176 | + } catch (const TCodeLineException& err) { |
| 1177 | + LOG_CORO_E(err.what()); |
| 1178 | + Issues.AddIssue(err.GetRawMessage()); |
| 1179 | + FatalCode = static_cast<NYql::NDqProto::StatusIds::StatusCode>(err.Code); |
| 1180 | + RetryStuff->Cancel(); |
1180 | 1181 | } catch (const std::exception& err) {
|
1181 | 1182 | Issues.AddIssue(TIssue(err.what()));
|
1182 | 1183 | FatalCode = NYql::NDqProto::StatusIds::INTERNAL_ERROR;
|
@@ -2029,11 +2030,11 @@ NDB::DataTypePtr MetaToClickHouse(const TType* type, NSerialization::TSerializat
|
2029 | 2030 | return std::make_shared<const NDB::DataTypeDecimal<NDB::Decimal128>>(precision, scale);
|
2030 | 2031 | }
|
2031 | 2032 | default:
|
2032 |
| - throw yexception() << "Unsupported data slot in MetaToClickHouse: " << slot; |
| 2033 | + ythrow TCodeLineException(NYql::NDqProto::StatusIds::INTERNAL_ERROR) << "Unsupported data slot in MetaToClickHouse: " << slot; |
2033 | 2034 | }
|
2034 | 2035 | }
|
2035 | 2036 | default:
|
2036 |
| - throw yexception() << "Unsupported type kind in MetaToClickHouse: " << type->GetKindAsStr(); |
| 2037 | + ythrow TCodeLineException(NYql::NDqProto::StatusIds::INTERNAL_ERROR) << "Unsupported type kind in MetaToClickHouse: " << type->GetKindAsStr(); |
2037 | 2038 | }
|
2038 | 2039 | return nullptr;
|
2039 | 2040 | }
|
@@ -2104,17 +2105,18 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
|
2104 | 2105 | if (hasDirectories) {
|
2105 | 2106 | auto pathPatternValue = settings.find("pathpattern");
|
2106 | 2107 | if (pathPatternValue == settings.cend()) {
|
2107 |
| - ythrow yexception() << "'pathpattern' must be configured for directory listing"; |
| 2108 | + ythrow TCodeLineException(NYql::NDqProto::StatusIds::INTERNAL_ERROR) |
| 2109 | + << "'pathpattern' must be configured for directory listing"; |
2108 | 2110 | }
|
2109 | 2111 | pathPattern = pathPatternValue->second;
|
2110 | 2112 |
|
2111 | 2113 | auto pathPatternVariantValue = settings.find("pathpatternvariant");
|
2112 | 2114 | if (pathPatternVariantValue == settings.cend()) {
|
2113 |
| - ythrow yexception() |
| 2115 | + ythrow TCodeLineException(NYql::NDqProto::StatusIds::INTERNAL_ERROR) |
2114 | 2116 | << "'pathpatternvariant' must be configured for directory listing";
|
2115 | 2117 | }
|
2116 | 2118 | if (!TryFromString(pathPatternVariantValue->second, pathPatternVariant)) {
|
2117 |
| - ythrow yexception() |
| 2119 | + ythrow TCodeLineException(NYql::NDqProto::StatusIds::INTERNAL_ERROR) |
2118 | 2120 | << "Unknown 'pathpatternvariant': " << pathPatternVariantValue->second;
|
2119 | 2121 | }
|
2120 | 2122 | }
|
@@ -2197,13 +2199,16 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
|
2197 | 2199 | std::shared_ptr<arrow::DataType> dataType;
|
2198 | 2200 |
|
2199 | 2201 | YQL_ENSURE(ConvertArrowType(memberType, dataType, true), "Unsupported arrow type");
|
2200 |
| - THROW_ARROW_NOT_OK(builder.AddField(std::make_shared<arrow::Field>(std::string(memberName), dataType, memberType->IsOptional()))); |
| 2202 | + THROW_ARROW_NOT_OK( |
| 2203 | + NYql::NDqProto::StatusIds::INTERNAL_ERROR, |
| 2204 | + builder.AddField(std::make_shared<arrow::Field>(std::string(memberName), dataType, memberType->IsOptional())) |
| 2205 | + ); |
2201 | 2206 | readSpec->ColumnReorder.push_back(i);
|
2202 | 2207 | readSpec->RowSpec.emplace(memberName, memberType);
|
2203 | 2208 | }
|
2204 | 2209 |
|
2205 | 2210 | auto res = builder.Finish();
|
2206 |
| - THROW_ARROW_NOT_OK(res.status()); |
| 2211 | + THROW_ARROW_NOT_OK(NYql::NDqProto::StatusIds::INTERNAL_ERROR, res.status()); |
2207 | 2212 | readSpec->ArrowSchema = std::move(res).ValueOrDie();
|
2208 | 2213 | } else {
|
2209 | 2214 | readSpec->CHColumns.resize(structType->GetMembersCount());
|
|
0 commit comments