Skip to content

Commit 27d8720

Browse files
what-the-fawkArseny Bolotnikov
and
Arseny Bolotnikov
authored
Http start query (#11759)
Co-authored-by: Arseny Bolotnikov <[email protected]>
1 parent dfda963 commit 27d8720

File tree

5 files changed

+150
-6
lines changed

5 files changed

+150
-6
lines changed

ydb/core/fq/libs/http_api_client/http_client.py

+18-4
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
from .query_results import YQResults
1313

14-
MAX_RETRY_FOR_SESSION = 4
14+
MAX_RETRY_FOR_SESSION = 100
1515
BACK_OFF_FACTOR = 0.3
1616
TIME_BETWEEN_RETRIES = 1000
1717
ERROR_CODES = (500, 502, 504)
@@ -150,6 +150,22 @@ def create_query(
150150
self._validate_http_error(response, expected_code=expected_code)
151151
return response.json()["id"]
152152

153+
def start_query(
154+
self,
155+
query_id: str,
156+
request_id=None,
157+
idempotency_key: str | None = None,
158+
expected_code: int = 204,
159+
):
160+
response = self.session.post(
161+
self._compose_api_url(f"/api/fq/v1/queries/{query_id}/start"),
162+
headers=self._build_headers(idempotency_key=idempotency_key, request_id=request_id),
163+
params=self._build_params(),
164+
)
165+
166+
self._validate_http_error(response, expected_code)
167+
return response
168+
153169
def get_query_status(self, query_id, request_id=None, expected_code=200) -> Any:
154170
response = self.session.get(
155171
self._compose_api_url(f"/api/fq/v1/queries/{query_id}/status"),
@@ -272,9 +288,7 @@ def get_query_result_set(self, query_id: str, result_set_index: int, raw_format:
272288

273289
return YQResults(result).results
274290

275-
def get_query_all_result_sets(
276-
self, query_id: str, result_set_count: int, raw_format: bool = False
277-
) -> Any:
291+
def get_query_all_result_sets(self, query_id: str, result_set_count: int, raw_format: bool = False) -> Any:
278292
result = []
279293
for i in range(0, result_set_count):
280294
r = self.get_query_result_set(query_id, result_set_index=i, raw_format=raw_format)

ydb/core/public_http/fq_handlers.h

+75-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
#include <ydb/library/services/services.pb.h>
1111
#include <ydb/core/public_http/protos/fq.pb.h>
1212

13+
#include <type_traits>
14+
1315
namespace NKikimr::NPublicHttp {
1416

1517
using namespace NActors;
@@ -249,6 +251,7 @@ void SetIdempotencyKey(T& dst, const TString& key) {
249251

250252
template <typename GrpcProtoRequestType, typename HttpProtoRequestType, typename GrpcProtoResultType, typename HttpProtoResultType, typename GrpcProtoResponseType>
251253
class TGrpcCallWrapper : public TActorBootstrapped<TGrpcCallWrapper<GrpcProtoRequestType, HttpProtoRequestType, GrpcProtoResultType, HttpProtoResultType, GrpcProtoResponseType>> {
254+
protected:
252255
THttpRequestContext RequestContext;
253256

254257
typedef std::function<std::unique_ptr<NGRpcService::TEvProxyRuntimeEvent>(TIntrusivePtr<NYdbGrpc::IRequestContextBase> ctx)> TGrpcProxyEventFactory;
@@ -278,6 +281,10 @@ class TGrpcCallWrapper : public TActorBootstrapped<TGrpcCallWrapper<GrpcProtoReq
278281
}
279282

280283
void Bootstrap(const TActorContext& ctx) {
284+
BootstrapWrapper(ctx);
285+
}
286+
287+
virtual void BootstrapWrapper(const TActorContext& ctx) {
281288
auto grpcRequest = std::make_unique<TGrpcProtoRequestType>();
282289
if (Parse(*grpcRequest)) {
283290
TIntrusivePtr<TGrpcRequestContextWrapper> requestContext = new TGrpcRequestContextWrapper(RequestContext, std::move(grpcRequest), &SendReply);
@@ -354,7 +361,6 @@ class TGrpcCallWrapper : public TActorBootstrapped<TGrpcCallWrapper<GrpcProtoReq
354361
auto* httpResult = google::protobuf::Arena::CreateMessage<FQHttp::Error>(resp->GetArena());
355362
FqConvert(typedResponse->operation(), *httpResult);
356363
FqPackToJson(json, *httpResult, jsonSettings);
357-
358364
requestContext.ResponseBadRequestJson(typedResponse->operation().status(), json.Str());
359365
return;
360366
}
@@ -396,4 +402,72 @@ DECLARE_YQ_GRPC_ACTOR(GetQueryStatus, GetQueryStatus);
396402
DECLARE_YQ_GRPC_ACTOR_WIHT_EMPTY_RESULT(StopQuery, ControlQuery);
397403
DECLARE_YQ_GRPC_ACTOR(GetResultData, GetResultData);
398404

405+
class TJsonStartQuery : public TGrpcCallWrapper<FederatedQuery::DescribeQueryRequest, FQHttp::GetQueryRequest, FederatedQuery::ModifyQueryResult, google::protobuf::Empty, FederatedQuery::ModifyQueryResponse> {
406+
public:
407+
typedef TGrpcCallWrapper<FederatedQuery::DescribeQueryRequest, FQHttp::GetQueryRequest, FederatedQuery::ModifyQueryResult, google::protobuf::Empty, FederatedQuery::ModifyQueryResponse> TGrpcCallWrapperBase;
408+
409+
TJsonStartQuery(const THttpRequestContext& ctx)
410+
: TGrpcCallWrapperBase(ctx, &NGRpcService::CreateFederatedQueryDescribeQueryRequestOperationCall)
411+
{}
412+
413+
void BootstrapWrapper(const TActorContext& ctx) override {
414+
415+
auto describeRequest = std::make_unique<FederatedQuery::DescribeQueryRequest>();
416+
if (!Parse(*describeRequest)) {
417+
this->Die(ctx);
418+
return;
419+
}
420+
421+
TProtoStringType queryId = describeRequest->Getquery_id();
422+
TIntrusivePtr<TGrpcRequestContextWrapper> requestContext = MakeIntrusive<TGrpcRequestContextWrapper>(
423+
RequestContext,
424+
std::move(describeRequest),
425+
[query_id = std::move(queryId), actorSystem = TActivationContext::ActorSystem()](const THttpRequestContext& requestContext, const TJsonSettings& jsonSettings, NProtoBuf::Message* resp, ui32 status) {
426+
427+
Y_ABORT_UNLESS(resp);
428+
Y_ABORT_UNLESS(resp->GetArena());
429+
430+
auto* typedResponse = static_cast<FederatedQuery::DescribeQueryResponse*>(resp);
431+
if (!typedResponse->operation().result().template Is<FederatedQuery::DescribeQueryResult>()) {
432+
TStringStream json;
433+
auto httpResult = std::unique_ptr<FQHttp::Error>(new FQHttp::Error());
434+
FqConvert(typedResponse->operation(), *httpResult);
435+
FqPackToJson(json, *httpResult, jsonSettings);
436+
requestContext.ResponseBadRequestJson(typedResponse->operation().status(), json.Str());
437+
return;
438+
}
439+
440+
std::unique_ptr<FederatedQuery::DescribeQueryResult> describeResult = std::unique_ptr<FederatedQuery::DescribeQueryResult>(new FederatedQuery::DescribeQueryResult());
441+
if (!typedResponse->operation().result().UnpackTo(&*describeResult)) {
442+
requestContext.ResponseBadRequest(Ydb::StatusIds::INTERNAL_ERROR, "Error in response unpack");
443+
return;
444+
}
445+
446+
// modify
447+
auto modifyRequest = std::unique_ptr<FederatedQuery::ModifyQueryRequest>(new FederatedQuery::ModifyQueryRequest());
448+
449+
modifyRequest->set_query_id(query_id);
450+
*modifyRequest->mutable_content() = describeResult->query().content();
451+
modifyRequest->set_execute_mode(::FederatedQuery::ExecuteMode::RUN);
452+
modifyRequest->set_state_load_mode(::FederatedQuery::StateLoadMode::STATE_LOAD_MODE_UNSPECIFIED);
453+
modifyRequest->set_previous_revision(describeResult->query().meta().Getlast_job_query_revision());
454+
modifyRequest->set_idempotency_key(requestContext.GetIdempotencyKey());
455+
456+
TIntrusivePtr<TGrpcRequestContextWrapper> requestContextModify = new TGrpcRequestContextWrapper(
457+
requestContext,
458+
std::move(modifyRequest),
459+
TGrpcCallWrapper<FederatedQuery::ModifyQueryRequest, int, FederatedQuery::ModifyQueryResult, google::protobuf::Empty, FederatedQuery::ModifyQueryResponse>::SendReply
460+
);
461+
462+
// new event -> new EventFactory
463+
actorSystem->Send(NGRpcService::CreateGRpcRequestProxyId(), NGRpcService::CreateFederatedQueryModifyQueryRequestOperationCall(std::move(requestContextModify)).release());
464+
});
465+
466+
ctx.Send(NGRpcService::CreateGRpcRequestProxyId(), EventFactory(std::move(requestContext)).release());
467+
this->Die(ctx);
468+
}
469+
};
470+
471+
#undef TGrpcCallWrapperBase
472+
399473
} // namespace NKikimr::NPublicHttp

ydb/core/public_http/http_service.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ namespace {
5757
Router.RegisterHandler(HTTP_METHOD_GET, "/api/fq/v1/queries/{query_id}/status", CreateHttpHandler<TJsonGetQueryStatus>());
5858
Router.RegisterHandler(HTTP_METHOD_GET, "/api/fq/v1/queries/{query_id}/results/{result_set_index}", CreateHttpHandler<TJsonGetResultData>());
5959
Router.RegisterHandler(HTTP_METHOD_POST, "/api/fq/v1/queries/{query_id}/stop", CreateHttpHandler<TJsonStopQuery>());
60+
Router.RegisterHandler(HTTP_METHOD_POST, "/api/fq/v1/queries/{query_id}/start", CreateHttpHandler<TJsonStartQuery>());
6061
}
6162

6263
void Bootstrap(const TActorContext& ctx) {

ydb/core/public_http/openapi/openapi.yaml

+25
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,31 @@ paths:
185185
required: true
186186
schema:
187187
type: string
188+
'/queries/{query_id}/start':
189+
post:
190+
responses:
191+
'204':
192+
description: No Content
193+
'400':
194+
description: Bad Request
195+
content:
196+
application/json:
197+
schema:
198+
$ref: '#/components/schemas/GenericError'
199+
parameters:
200+
- $ref: '#/components/parameters/Idempotency-Key'
201+
- $ref: '#/components/parameters/Authorization'
202+
- $ref: '#/components/parameters/x-request-id'
203+
- $ref: '#/components/parameters/db'
204+
- $ref: '#/components/parameters/project'
205+
summary: start stopped query
206+
operationId: start-query
207+
parameters:
208+
- name: query_id
209+
in: path
210+
required: true
211+
schema:
212+
type: string
188213
'/queries/{query_id}/results/{result_set_index}':
189214
parameters:
190215
- name: query_id

ydb/tests/fq/http_api/test_http_api.py

+31-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ def test_simple_analytics_query(self):
7676
assert len(query_id) == 20
7777

7878
status = client.get_query_status(query_id)
79-
assert status in ["FAILED", "RUNNING", "COMPLETED"]
79+
assert status in ["STARTING", "RUNNING", "COMPLETED", "COMPLETING"]
8080

8181
wait_for_query_status(client, query_id, ["COMPLETED"])
8282
query_json = client.get_query(query_id)
@@ -98,6 +98,14 @@ def test_simple_analytics_query(self):
9898
response = client.stop_query(query_id)
9999
assert response.status_code == 204
100100

101+
response = client.start_query(query_id)
102+
assert response.status_code == 204
103+
104+
assert client.get_query_status(query_id) in ["STARTING", "RUNNING", "COMPLETED", "COMPLETING"]
105+
106+
response = client.stop_query(query_id)
107+
assert response.status_code == 204
108+
101109
def test_empty_query(self):
102110
with self.create_client() as client:
103111
with pytest.raises(
@@ -228,6 +236,28 @@ def test_stop_idempotency(self):
228236
self.streaming_over_kikimr.compute_plane.start()
229237
c.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER)
230238

239+
def test_restart_idempotency(self):
240+
c = FederatedQueryClient("my_folder", streaming_over_kikimr=self.streaming_over_kikimr)
241+
self.streaming_over_kikimr.compute_plane.stop()
242+
query_id = c.create_query("select1", "select 1").result.query_id
243+
c.wait_query_status(query_id, fq.QueryMeta.STARTING)
244+
245+
with self.create_client() as client:
246+
response1 = client.stop_query(query_id, idempotency_key="Z")
247+
assert response1.status_code == 204
248+
249+
response2 = client.start_query(query_id, idempotency_key="Z")
250+
assert response2.status_code == 204
251+
252+
response2 = client.start_query(query_id, idempotency_key="Z")
253+
assert response2.status_code == 204
254+
255+
response1 = client.stop_query(query_id, idempotency_key="Z")
256+
assert response1.status_code == 204
257+
258+
self.streaming_over_kikimr.compute_plane.start()
259+
c.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
260+
231261
def test_simple_streaming_query(self):
232262
self.init_topics("simple_streaming_query", create_output=False)
233263
c = FederatedQueryClient("my_folder", streaming_over_kikimr=self.streaming_over_kikimr)

0 commit comments

Comments
 (0)