Skip to content

Commit 971b56e

Browse files
Merge afec12b into 9367424
2 parents 9367424 + afec12b commit 971b56e

File tree

5 files changed

+158
-8
lines changed

5 files changed

+158
-8
lines changed

ydb/core/fq/libs/http_api_client/http_client.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,21 @@ def create_query(
149149

150150
self._validate_http_error(response, expected_code=expected_code)
151151
return response.json()["id"]
152+
153+
def start_query(
154+
self,
155+
query_id: str,
156+
idempotency_key: str | None = None,
157+
expected_code: int = 204,
158+
):
159+
response = self.session.get(
160+
self._compose_api_url(f"/api/fq/v1/queries/{query_id}/start"),
161+
headers=self._build_headers(idempotency_key=idempotency_key),
162+
params=self._build_params(),
163+
)
164+
165+
self._validate_http_error(response, expected_code)
166+
return response.json()["status"]
152167

153168
def get_query_status(self, query_id, request_id=None, expected_code=200) -> Any:
154169
response = self.session.get(

ydb/core/public_http/fq_handlers.h

Lines changed: 75 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ void SetIdempotencyKey(T& dst, const TString& key) {
249249

250250
template <typename GrpcProtoRequestType, typename HttpProtoRequestType, typename GrpcProtoResultType, typename HttpProtoResultType, typename GrpcProtoResponseType>
251251
class TGrpcCallWrapper : public TActorBootstrapped<TGrpcCallWrapper<GrpcProtoRequestType, HttpProtoRequestType, GrpcProtoResultType, HttpProtoResultType, GrpcProtoResponseType>> {
252+
protected:
252253
THttpRequestContext RequestContext;
253254

254255
typedef std::function<std::unique_ptr<NGRpcService::TEvProxyRuntimeEvent>(TIntrusivePtr<NYdbGrpc::IRequestContextBase> ctx)> TGrpcProxyEventFactory;
@@ -349,14 +350,13 @@ class TGrpcCallWrapper : public TActorBootstrapped<TGrpcCallWrapper<GrpcProtoReq
349350
Y_ABORT_UNLESS(resp->GetArena());
350351
Y_UNUSED(status);
351352
auto* typedResponse = static_cast<TGrpcProtoResponseType*>(resp);
352-
if (!typedResponse->operation().result().template Is<TGrpcProtoResultType>()) {
353-
TStringStream json;
354-
auto* httpResult = google::protobuf::Arena::CreateMessage<FQHttp::Error>(resp->GetArena());
355-
FqConvert(typedResponse->operation(), *httpResult);
356-
FqPackToJson(json, *httpResult, jsonSettings);
357-
358-
requestContext.ResponseBadRequestJson(typedResponse->operation().status(), json.Str());
359-
return;
353+
if (!typedResponse->operation().result().template Is<TGrpcProtoResultType>()) {
354+
TStringStream json;
355+
auto* httpResult = google::protobuf::Arena::CreateMessage<FQHttp::Error>(resp->GetArena());
356+
FqConvert(typedResponse->operation(), *httpResult);
357+
FqPackToJson(json, *httpResult, jsonSettings);
358+
requestContext.ResponseBadRequestJson(typedResponse->operation().status(), json.Str());
359+
return;
360360
}
361361

362362
auto* grpcResult = google::protobuf::Arena::CreateMessage<TGrpcProtoResultType>(resp->GetArena());
@@ -396,4 +396,71 @@ DECLARE_YQ_GRPC_ACTOR(GetQueryStatus, GetQueryStatus);
396396
DECLARE_YQ_GRPC_ACTOR_WIHT_EMPTY_RESULT(StopQuery, ControlQuery);
397397
DECLARE_YQ_GRPC_ACTOR(GetResultData, GetResultData);
398398

399+
#define TGrpcCallWrapperBase TGrpcCallWrapper<FederatedQuery::DescribeQueryRequest, FQHttp::GetQueryRequest, FederatedQuery::ModifyQueryResult, google::protobuf::Empty, FederatedQuery::ModifyQueryResponse>
400+
401+
class TStartQueryRequest : public TGrpcCallWrapperBase {
402+
public:
403+
TStartQueryRequest(const THttpRequestContext& ctx)
404+
: TGrpcCallWrapperBase(ctx, &NGRpcService::CreateFederatedQueryDescribeQueryRequestOperationCall)
405+
{}
406+
407+
void Bootstrap(const TActorContext& ctx) {
408+
auto describeRequest = std::make_unique<FederatedQuery::DescribeQueryRequest>();
409+
if (!Parse(*describeRequest)) {
410+
this->Die(ctx);
411+
return;
412+
}
413+
414+
TIntrusivePtr<TGrpcRequestContextWrapper> requestContext = new TGrpcRequestContextWrapper(
415+
RequestContext,
416+
std::move(describeRequest),
417+
[&, this, query_id = describeRequest->Getquery_id()](const THttpRequestContext& requestContext, const TJsonSettings& jsonSettings, NProtoBuf::Message* resp, ui32 status) {
418+
Y_ABORT_UNLESS(resp);
419+
Y_ABORT_UNLESS(resp->GetArena());
420+
421+
auto* typedResponse = static_cast<FederatedQuery::DescribeQueryResponse*>(resp);
422+
if (!typedResponse->operation().result().template Is<FederatedQuery::DescribeQueryResult>()) {
423+
TStringStream json;
424+
auto* httpResult = google::protobuf::Arena::CreateMessage<FQHttp::Error>(resp->GetArena());
425+
FqConvert(typedResponse->operation(), *httpResult);
426+
FqPackToJson(json, *httpResult, jsonSettings);
427+
requestContext.ResponseBadRequestJson(typedResponse->operation().status(), json.Str());
428+
this->Die(ctx);
429+
return;
430+
}
431+
432+
FederatedQuery::DescribeQueryResult* describeResult = google::protobuf::Arena::CreateMessage<FederatedQuery::DescribeQueryResult>(resp->GetArena());
433+
typedResponse->operation().result().UnpackTo(describeResult);
434+
435+
436+
// modify
437+
auto modifyRequest = std::unique_ptr<FederatedQuery::ModifyQueryRequest>(google::protobuf::Arena::CreateMessage<FederatedQuery::ModifyQueryRequest>(resp->GetArena()));
438+
439+
modifyRequest->set_query_id(query_id);
440+
auto content = describeResult->Getquery().content();
441+
modifyRequest->set_allocated_content(&content);
442+
modifyRequest->set_execute_mode(::FederatedQuery::ExecuteMode::RUN);
443+
modifyRequest->set_allocated_disposition(nullptr);
444+
modifyRequest->set_state_load_mode(::FederatedQuery::StateLoadMode::STATE_LOAD_MODE_UNSPECIFIED);
445+
modifyRequest->set_previous_revision(describeResult->Getquery().meta().Getlast_job_query_revision());
446+
modifyRequest->set_idempotency_key(requestContext.GetIdempotencyKey());
447+
448+
TIntrusivePtr<TGrpcRequestContextWrapper> requestContextModify = new TGrpcRequestContextWrapper(
449+
this->RequestContext,
450+
std::move(modifyRequest),
451+
TGrpcCallWrapper<FederatedQuery::ModifyQueryRequest, int, FederatedQuery::ModifyQueryResult, google::protobuf::Empty, FederatedQuery::ModifyQueryResponse>::SendReply
452+
);
453+
454+
// new event -> new EventFactory
455+
EventFactory = &NGRpcService::CreateFederatedQueryModifyQueryRequestOperationCall;
456+
ctx.Send(NGRpcService::CreateGRpcRequestProxyId(), EventFactory(requestContextModify).release());
457+
this->Die(ctx);
458+
});
459+
460+
ctx.Send(NGRpcService::CreateGRpcRequestProxyId(), EventFactory(requestContext).release());
461+
}
462+
};
463+
464+
#undef TGrpcCallWrapperBase
465+
399466
} // namespace NKikimr::NPublicHttp

ydb/core/public_http/http_service.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ namespace {
5757
Router.RegisterHandler(HTTP_METHOD_GET, "/api/fq/v1/queries/{query_id}/status", CreateHttpHandler<TJsonGetQueryStatus>());
5858
Router.RegisterHandler(HTTP_METHOD_GET, "/api/fq/v1/queries/{query_id}/results/{result_set_index}", CreateHttpHandler<TJsonGetResultData>());
5959
Router.RegisterHandler(HTTP_METHOD_POST, "/api/fq/v1/queries/{query_id}/stop", CreateHttpHandler<TJsonStopQuery>());
60+
Router.RegisterHandler(HTTP_METHOD_POST, "/api/fq/v1/queries/{query_id}/start", CreateHttpHandler<TStartQueryRequest>());
6061
}
6162

6263
void Bootstrap(const TActorContext& ctx) {

ydb/core/public_http/openapi/openapi.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,31 @@ paths:
185185
required: true
186186
schema:
187187
type: string
188+
'/queries/{query_id}/start':
189+
post:
190+
responses:
191+
'204':
192+
description: No Content
193+
'400':
194+
description: Bad Request
195+
content:
196+
application/json:
197+
schema:
198+
$ref: '#/components/schemas/GenericError'
199+
parameters:
200+
- $ref: '#/components/parameters/Idempotency-Key'
201+
- $ref: '#/components/parameters/Authorization'
202+
- $ref: '#/components/parameters/x-request-id'
203+
- $ref: '#/components/parameters/db'
204+
- $ref: '#/components/parameters/project'
205+
summary: start stopped query
206+
operationId: start-query
207+
parameters:
208+
- name: query_id
209+
in: path
210+
required: true
211+
schema:
212+
type: string
188213
'/queries/{query_id}/results/{result_set_index}':
189214
parameters:
190215
- name: query_id

ydb/tests/fq/http_api/test_http_api.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,12 @@ def test_simple_analytics_query(self):
9898
response = client.stop_query(query_id)
9999
assert response.status_code == 204
100100

101+
response = client.start_query(query_id)
102+
assert response.status_code == 204
103+
104+
response = client.stop_query(query_id)
105+
assert response.status_code == 204
106+
101107
def test_empty_query(self):
102108
with self.create_client() as client:
103109
with pytest.raises(
@@ -109,6 +115,14 @@ def test_empty_query(self):
109115
): # noqa
110116
client.create_query()
111117

118+
def test_invalid_id(self):
119+
with self.create_client() as client:
120+
resp = client.stop_quert(query_id="nevalidno")
121+
assert resp.status_code == 404
122+
123+
resp = client.start_query(query_id="snova nevalidno")
124+
assert resp.status_code == 404
125+
112126
def test_warning(self):
113127
with self.create_client() as client:
114128
query_id = client.create_query(query_text="select 10000000000000000000+1")
@@ -228,6 +242,31 @@ def test_stop_idempotency(self):
228242
self.streaming_over_kikimr.compute_plane.start()
229243
c.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER)
230244

245+
def test_restart_idempotency(self):
246+
c = FederatedQueryClient("my_folder", streaming_over_kikimr=self.streaming_over_kikimr)
247+
self.streaming_over_kikimr.compute_plane.stop()
248+
query_id = c.create_query("select1", "select 1").result.query_id
249+
c.wait_query_status(query_id, fq.QueryMeta.STARTING)
250+
251+
with self.create_client() as client:
252+
response1 = client.stop_query(query_id, idempotency_key="Z")
253+
assert response1.status_code == 204
254+
255+
response2 = client.start_query(query_id, idempotency_key="Z")
256+
assert response2.status_code == 204
257+
258+
response2 = client.start_query(query_id, idempotency_key="Z")
259+
assert response2.status_code == 204
260+
261+
response2 = client.start_query(query_id)
262+
assert response2.status_code == 400
263+
264+
response1 = client.stop_query(query_id, idempotency_key="Z")
265+
assert response1.status_code == 204
266+
267+
self.streaming_over_kikimr.compute_plane.start()
268+
c.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER)
269+
231270
def test_simple_streaming_query(self):
232271
self.init_topics("simple_streaming_query", create_output=False)
233272
c = FederatedQueryClient("my_folder", streaming_over_kikimr=self.streaming_over_kikimr)
@@ -259,6 +298,9 @@ def test_simple_streaming_query(self):
259298
response = client.stop_query(query_id)
260299
assert response.status_code == 204
261300

301+
response = client.start_query(query_id)
302+
assert response.status_code == 204
303+
262304
wait_for_query_status(client, query_id, ["FAILED"])
263305

264306
query_json2 = client.get_query(query_id)

0 commit comments

Comments
 (0)