|
| 1 | + |
| 2 | +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_log.h> |
| 3 | +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.h> |
| 4 | +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.h> |
| 5 | +#include <library/cpp/testing/unittest/registar.h> |
| 6 | + |
| 7 | +#undef IS_CTX_LOG_PRIORITY_ENABLED |
| 8 | +#define IS_CTX_LOG_PRIORITY_ENABLED(actorCtxOrSystem, priority, component, sampleBy) false |
| 9 | +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h> |
| 10 | + |
| 11 | +namespace NYql::NDq { |
| 12 | + |
| 13 | +Y_UNIT_TEST_SUITE(TComputeActorAsyncInputHelperTest) { |
| 14 | + |
| 15 | + struct TDummyDqComputeActorAsyncInput: IDqComputeActorAsyncInput { |
| 16 | + TDummyDqComputeActorAsyncInput() { |
| 17 | + Batch.emplace_back(NUdf::TUnboxedValue{}); |
| 18 | + Batch.emplace_back(NUdf::TUnboxedValue{}); |
| 19 | + } |
| 20 | + ui64 GetInputIndex() const override { |
| 21 | + return 4; |
| 22 | + } |
| 23 | + |
| 24 | + const TDqAsyncStats& GetIngressStats() const override{ |
| 25 | + static TDqAsyncStats stats; |
| 26 | + return stats; |
| 27 | + } |
| 28 | + |
| 29 | + i64 GetAsyncInputData( |
| 30 | + NKikimr::NMiniKQL::TUnboxedValueBatch& batch, |
| 31 | + TMaybe<TInstant>& watermark, |
| 32 | + bool& finished, |
| 33 | + i64 freeSpace) override |
| 34 | + { |
| 35 | + Y_ABORT_IF(Batch.empty()); |
| 36 | + batch = Batch; |
| 37 | + Y_UNUSED(watermark); |
| 38 | + Y_UNUSED(finished); |
| 39 | + Y_UNUSED(freeSpace); |
| 40 | + return 2; |
| 41 | + } |
| 42 | + |
| 43 | + // Checkpointing. |
| 44 | + void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TSourceState& state) override { |
| 45 | + Y_UNUSED(checkpoint); |
| 46 | + Y_UNUSED(state); |
| 47 | + } |
| 48 | + void CommitState(const NDqProto::TCheckpoint& checkpoint) override { |
| 49 | + Y_UNUSED(checkpoint); |
| 50 | + } |
| 51 | + void LoadState(const NDqProto::TSourceState& state) override { |
| 52 | + Y_UNUSED(state); |
| 53 | + } |
| 54 | + |
| 55 | + void PassAway() override {} |
| 56 | + NKikimr::NMiniKQL::TUnboxedValueBatch Batch; |
| 57 | + }; |
| 58 | + |
| 59 | + struct TDummyAsyncInputHelper: TComputeActorAsyncInputHelper{ |
| 60 | + using TComputeActorAsyncInputHelper::TComputeActorAsyncInputHelper; |
| 61 | + i64 GetFreeSpace() const override{ |
| 62 | + return 10; |
| 63 | + } |
| 64 | + void AsyncInputPush(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space, bool finished) override{ |
| 65 | + batch.clear(); |
| 66 | + Y_UNUSED(space); |
| 67 | + Y_UNUSED(finished); |
| 68 | + return; |
| 69 | + } |
| 70 | + }; |
| 71 | + |
| 72 | + Y_UNIT_TEST(PollAsyncInput) { |
| 73 | + NKikimr::NMiniKQL::TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, true); |
| 74 | + TDummyDqComputeActorAsyncInput input; |
| 75 | + TDummyAsyncInputHelper helper("MyPrefix", 13, NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED); |
| 76 | + helper.AsyncInput = &input; |
| 77 | + TDqComputeActorMetrics metrics{NMonitoring::TDynamicCounterPtr{}}; |
| 78 | + TDqComputeActorWatermarks watermarks(NActors::TActorIdentity{NActors::TActorId{}}, TTxId{}, 7); |
| 79 | + auto result = helper.PollAsyncInput(metrics, watermarks, 20); |
| 80 | + UNIT_ASSERT(result && EResumeSource::CAPollAsync == *result); |
| 81 | + } |
| 82 | +} |
| 83 | + |
| 84 | +} //namespace NYql::NDq |
0 commit comments