1
1
#include " kqp_stream_lookup_actor.h"
2
2
3
- #include < ydb/library/actors/core/actor_bootstrapped.h>
4
-
5
3
#include < ydb/core/actorlib_impl/long_timer.h>
6
4
#include < ydb/core/base/tablet_pipecache.h>
7
5
#include < ydb/core/engine/minikql/minikql_engine_host.h>
8
6
#include < ydb/core/kqp/common/kqp_resolve.h>
7
+ #include < ydb/core/kqp/common/kqp_event_ids.h>
9
8
#include < ydb/core/kqp/gateway/kqp_gateway.h>
9
+ #include < ydb/core/kqp/runtime/kqp_scan_data.h>
10
+ #include < ydb/core/kqp/runtime/kqp_stream_lookup_worker.h>
10
11
#include < ydb/core/protos/kqp_stats.pb.h>
11
12
#include < ydb/core/tx/scheme_cache/scheme_cache.h>
12
- #include < ydb/core/kqp/common/kqp_event_ids.h>
13
+
14
+ #include < ydb/library/actors/core/actor_bootstrapped.h>
13
15
#include < ydb/library/yql/public/issue/yql_issue_message.h>
14
- #include < ydb/core/kqp/runtime/kqp_scan_data.h>
15
- #include < ydb/core/kqp/runtime/kqp_stream_lookup_worker.h>
16
16
#include < ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h>
17
+ #include < ydb/library/wilson_ids/wilson.h>
17
18
18
19
namespace NKikimr {
19
20
namespace NKqp {
@@ -25,24 +26,22 @@ static constexpr ui64 MAX_SHARD_RETRIES = 10;
25
26
26
27
class TKqpStreamLookupActor : public NActors ::TActorBootstrapped<TKqpStreamLookupActor>, public NYql::NDq::IDqComputeActorAsyncInput {
27
28
public:
28
- TKqpStreamLookupActor (ui64 inputIndex, NYql::NDq::TCollectStatsLevel statsLevel, const NUdf::TUnboxedValue& input,
29
- const NActors::TActorId& computeActorId, const NMiniKQL::TTypeEnvironment& typeEnv,
30
- const NMiniKQL::THolderFactory& holderFactory, std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc,
31
- const NYql::NDqProto::TTaskInput& inputDesc, NKikimrKqp::TKqpStreamLookupSettings&& settings,
29
+ TKqpStreamLookupActor (NYql::NDq::IDqAsyncIoFactory::TInputTransformArguments&& args, NKikimrKqp::TKqpStreamLookupSettings&& settings,
32
30
TIntrusivePtr<TKqpCounters> counters)
33
- : LogPrefix(TStringBuilder() << " StreamLookupActor, inputIndex: " << inputIndex << " , CA Id " << computeActorId )
34
- , InputIndex(inputIndex )
35
- , Input(input )
36
- , ComputeActorId(computeActorId )
37
- , TypeEnv(typeEnv )
38
- , Alloc(alloc )
31
+ : LogPrefix(TStringBuilder() << " StreamLookupActor, inputIndex: " << args.InputIndex << " , CA Id " << args.ComputeActorId )
32
+ , InputIndex(args.InputIndex )
33
+ , Input(args.TransformInput )
34
+ , ComputeActorId(args.ComputeActorId )
35
+ , TypeEnv(args.TypeEnv )
36
+ , Alloc(args.Alloc )
39
37
, Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId())
40
38
, LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>())
41
39
, SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT)
42
- , StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), typeEnv, holderFactory, inputDesc ))
40
+ , StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), args.TypeEnv, args.HolderFactory, args.InputDesc ))
43
41
, Counters(counters)
42
+ , LookupActorSpan(TWilsonKqp::LookupActor, std::move(args.TraceId), " LookupActor" )
44
43
{
45
- IngressStats.Level = statsLevel ;
44
+ IngressStats.Level = args. StatsLevel ;
46
45
}
47
46
48
47
virtual ~TKqpStreamLookupActor () {
@@ -174,6 +173,8 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
174
173
175
174
Send (MakePipePeNodeCacheID (false ), new TEvPipeCache::TEvUnlink (0 ));
176
175
TActorBootstrapped<TKqpStreamLookupActor>::PassAway ();
176
+
177
+ LookupActorSpan.End ();
177
178
}
178
179
179
180
i64 GetAsyncInputData (NKikimr::NMiniKQL::TUnboxedValueBatch& batch, TMaybe<TInstant>&, bool & finished, i64 freeSpace) final {
@@ -234,10 +235,15 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
234
235
void Handle (TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
235
236
CA_LOG_D (" TEvResolveKeySetResult was received for table: " << StreamLookupWorker->GetTablePath ());
236
237
if (ev->Get ()->Request ->ErrorCount > 0 ) {
237
- return RuntimeError (TStringBuilder () << " Failed to get partitioning for table: "
238
- << StreamLookupWorker->GetTablePath (), NYql::NDqProto::StatusIds::SCHEME_ERROR);
238
+ TString errorMsg = TStringBuilder () << " Failed to get partitioning for table: "
239
+ << StreamLookupWorker->GetTablePath ();
240
+ LookupActorStateSpan.EndError (errorMsg);
241
+
242
+ return RuntimeError (errorMsg, NYql::NDqProto::StatusIds::SCHEME_ERROR);
239
243
}
240
244
245
+ LookupActorStateSpan.EndOk ();
246
+
241
247
auto & resultSet = ev->Get ()->Request ->ResultSet ;
242
248
YQL_ENSURE (resultSet.size () == 1 , " Expected one result for range [NULL, +inf)" );
243
249
Partitioning = resultSet[0 ].KeyDescription ->Partitioning ;
@@ -342,8 +348,11 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
342
348
<< " was resolved: " << !!Partitioning);
343
349
344
350
if (!Partitioning) {
345
- RuntimeError (TStringBuilder () << " Failed to resolve shards for table: " << StreamLookupWorker->GetTablePath ()
346
- << " (request timeout exceeded)" , NYql::NDqProto::StatusIds::TIMEOUT);
351
+ TString errorMsg = TStringBuilder () << " Failed to resolve shards for table: " << StreamLookupWorker->GetTablePath ()
352
+ << " (request timeout exceeded)" ;
353
+ LookupActorStateSpan.EndError (errorMsg);
354
+
355
+ RuntimeError (errorMsg, NYql::NDqProto::StatusIds::TIMEOUT);
347
356
}
348
357
}
349
358
@@ -392,7 +401,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
392
401
record.SetResultFormat (NKikimrDataEvents::FORMAT_CELLVEC);
393
402
394
403
Send (MakePipePeNodeCacheID (false ), new TEvPipeCache::TEvForward (request.Release (), shardId, true ),
395
- IEventHandle::FlagTrackDelivery);
404
+ IEventHandle::FlagTrackDelivery, 0 , LookupActorSpan. GetTraceId () );
396
405
397
406
read .State = EReadState::Running;
398
407
@@ -438,6 +447,9 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
438
447
keyColumnTypes, TVector<TKeyDesc::TColumnOp>{}));
439
448
440
449
Counters->IteratorsShardResolve ->Inc ();
450
+ LookupActorStateSpan = NWilson::TSpan (TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId (),
451
+ " WaitForShardsResolve" , NWilson::EFlags::AUTO_END);
452
+
441
453
Send (MakeSchemeCacheID (), new TEvTxProxySchemeCache::TEvInvalidateTable (StreamLookupWorker->GetTableId (), {}));
442
454
Send (MakeSchemeCacheID (), new TEvTxProxySchemeCache::TEvResolveKeySet (request));
443
455
@@ -467,6 +479,11 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
467
479
468
480
NYql::TIssues issues;
469
481
issues.AddIssue (std::move (issue));
482
+
483
+ if (LookupActorSpan) {
484
+ LookupActorSpan.EndError (issues.ToOneLineString ());
485
+ }
486
+
470
487
Send (ComputeActorId, new TEvAsyncInputError (InputIndex, std::move (issues), statusCode));
471
488
}
472
489
@@ -495,17 +512,15 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
495
512
ui64 ReadBytesCount = 0 ;
496
513
497
514
TIntrusivePtr<TKqpCounters> Counters;
515
+ NWilson::TSpan LookupActorSpan;
516
+ NWilson::TSpan LookupActorStateSpan;
498
517
};
499
518
500
519
} // namespace
501
520
502
- std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor (ui64 inputIndex,
503
- NYql::NDq::TCollectStatsLevel statsLevel, const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId,
504
- const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
505
- std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, const NYql::NDqProto::TTaskInput& inputDesc,
521
+ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor (NYql::NDq::IDqAsyncIoFactory::TInputTransformArguments&& args,
506
522
NKikimrKqp::TKqpStreamLookupSettings&& settings, TIntrusivePtr<TKqpCounters> counters) {
507
- auto actor = new TKqpStreamLookupActor (inputIndex, statsLevel, input, computeActorId, typeEnv, holderFactory,
508
- alloc, inputDesc, std::move (settings), counters);
523
+ auto actor = new TKqpStreamLookupActor (std::move (args), std::move (settings), counters);
509
524
return {actor, actor};
510
525
}
511
526
0 commit comments