Skip to content

Commit 78c36f9

Browse files
Merge f51c50f into a18f18d
2 parents a18f18d + f51c50f commit 78c36f9

File tree

4 files changed

+128
-9
lines changed

4 files changed

+128
-9
lines changed

ydb/core/fq/libs/http_api_client/http_client.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,21 @@ def create_query(
149149

150150
self._validate_http_error(response, expected_code=expected_code)
151151
return response.json()["id"]
152+
153+
def restart_query(
154+
self,
155+
query_id: str,
156+
idempotency_key: str | None = None,
157+
expected_code: int = 204,
158+
):
159+
response = self.session.get(
160+
self._compose_api_url(f"/api/fq/v1/queries/{query_id}/start"),
161+
headers=self._build_headers(idempotency_key=idempotency_key),
162+
params=self._build_params(),
163+
)
164+
165+
self._validate_http_error(response, expected_code)
166+
return response.json()["status"]
152167

153168
def get_query_status(self, query_id, request_id=None, expected_code=200) -> Any:
154169
response = self.session.get(

ydb/core/public_http/fq_handlers.h

Lines changed: 71 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ void SetIdempotencyKey(T& dst, const TString& key) {
249249

250250
template <typename GrpcProtoRequestType, typename HttpProtoRequestType, typename GrpcProtoResultType, typename HttpProtoResultType, typename GrpcProtoResponseType>
251251
class TGrpcCallWrapper : public TActorBootstrapped<TGrpcCallWrapper<GrpcProtoRequestType, HttpProtoRequestType, GrpcProtoResultType, HttpProtoResultType, GrpcProtoResponseType>> {
252+
protected:
252253
THttpRequestContext RequestContext;
253254

254255
typedef std::function<std::unique_ptr<NGRpcService::TEvProxyRuntimeEvent>(TIntrusivePtr<NYdbGrpc::IRequestContextBase> ctx)> TGrpcProxyEventFactory;
@@ -344,20 +345,22 @@ class TGrpcCallWrapper : public TActorBootstrapped<TGrpcCallWrapper<GrpcProtoReq
344345
RequestContext.ResponseBadRequest(Ydb::StatusIds::BAD_REQUEST, error);
345346
}
346347

348+
#define CheckTypedResponse(typedResponse, ResultType) \
349+
if (!typedResponse->operation().result().template Is<ResultType>()) { \
350+
TStringStream json; \
351+
auto* httpResult = google::protobuf::Arena::CreateMessage<FQHttp::Error>(resp->GetArena()); \
352+
FqConvert(typedResponse->operation(), *httpResult); \
353+
FqPackToJson(json, *httpResult, jsonSettings); \
354+
requestContext.ResponseBadRequestJson(typedResponse->operation().status(), json.Str()); \
355+
return; \
356+
}
357+
347358
static void SendReply(const THttpRequestContext& requestContext, const TJsonSettings& jsonSettings, NProtoBuf::Message* resp, ui32 status) {
348359
Y_ABORT_UNLESS(resp);
349360
Y_ABORT_UNLESS(resp->GetArena());
350361
Y_UNUSED(status);
351362
auto* typedResponse = static_cast<TGrpcProtoResponseType*>(resp);
352-
if (!typedResponse->operation().result().template Is<TGrpcProtoResultType>()) {
353-
TStringStream json;
354-
auto* httpResult = google::protobuf::Arena::CreateMessage<FQHttp::Error>(resp->GetArena());
355-
FqConvert(typedResponse->operation(), *httpResult);
356-
FqPackToJson(json, *httpResult, jsonSettings);
357-
358-
requestContext.ResponseBadRequestJson(typedResponse->operation().status(), json.Str());
359-
return;
360-
}
363+
CheckTypedResponse(typedResponse, TGrpcProtoResultType);
361364

362365
auto* grpcResult = google::protobuf::Arena::CreateMessage<TGrpcProtoResultType>(resp->GetArena());
363366
typedResponse->operation().result().UnpackTo(grpcResult);
@@ -396,4 +399,63 @@ DECLARE_YQ_GRPC_ACTOR(GetQueryStatus, GetQueryStatus);
396399
DECLARE_YQ_GRPC_ACTOR_WIHT_EMPTY_RESULT(StopQuery, ControlQuery);
397400
DECLARE_YQ_GRPC_ACTOR(GetResultData, GetResultData);
398401

402+
#define TGrpcCallWrapperBase TGrpcCallWrapper<FederatedQuery::DescribeQueryRequest, FQHttp::GetQueryRequest, FederatedQuery::ModifyQueryResult, google::protobuf::Empty, FederatedQuery::ModifyQueryResponse>
403+
404+
class TRestartQueryRequest : public TGrpcCallWrapperBase {
405+
public:
406+
TRestartQueryRequest(const THttpRequestContext& ctx)
407+
: TGrpcCallWrapperBase(ctx, &NGRpcService::CreateFederatedQueryDescribeQueryRequestOperationCall)
408+
{}
409+
410+
void Bootstrap(const TActorContext& ctx) {
411+
auto describeRequest = std::make_unique<FederatedQuery::DescribeQueryRequest>();
412+
if (!Parse(*describeRequest)) {
413+
this->Die(ctx);
414+
return;
415+
}
416+
417+
TIntrusivePtr<TGrpcRequestContextWrapper> requestContext = new TGrpcRequestContextWrapper(
418+
RequestContext,
419+
std::move(describeRequest),
420+
[&, this, query_id = describeRequest->Getquery_id()](const THttpRequestContext& requestContext, const TJsonSettings& jsonSettings, NProtoBuf::Message* resp, ui32 status) {
421+
Y_ABORT_UNLESS(resp);
422+
Y_ABORT_UNLESS(resp->GetArena());
423+
424+
auto* typedResponse = static_cast<FederatedQuery::DescribeQueryResponse*>(resp);
425+
CheckTypedResponse(typedResponse, FederatedQuery::DescribeQueryResult);
426+
427+
FederatedQuery::DescribeQueryResult* describeResult = google::protobuf::Arena::CreateMessage<FederatedQuery::DescribeQueryResult>(resp->GetArena());
428+
typedResponse->operation().result().UnpackTo(describeResult);
429+
430+
431+
// modify
432+
auto modifyRequest = std::unique_ptr<FederatedQuery::ModifyQueryRequest>(google::protobuf::Arena::CreateMessage<FederatedQuery::ModifyQueryRequest>(resp->GetArena()));
433+
434+
modifyRequest->set_query_id(query_id);
435+
auto content = describeResult->Getquery().content();
436+
modifyRequest->set_allocated_content(&content);
437+
modifyRequest->set_execute_mode(::FederatedQuery::ExecuteMode::RUN);
438+
modifyRequest->set_allocated_disposition(nullptr);
439+
modifyRequest->set_state_load_mode(::FederatedQuery::StateLoadMode::FROM_LAST_CHECKPOINT);
440+
modifyRequest->set_previous_revision(describeResult->Getquery().meta().Getlast_job_query_revision());
441+
modifyRequest->set_idempotency_key(requestContext.GetIdempotencyKey());
442+
443+
TIntrusivePtr<TGrpcRequestContextWrapper> requestContextModify = new TGrpcRequestContextWrapper(
444+
this->RequestContext,
445+
std::move(modifyRequest),
446+
TGrpcCallWrapper<FederatedQuery::ModifyQueryRequest, int, FederatedQuery::ModifyQueryResult, google::protobuf::Empty, FederatedQuery::ModifyQueryResponse>::SendReply
447+
);
448+
449+
// новый тип события, по идее нужна новая фабрика
450+
EventFactory = &NGRpcService::CreateFederatedQueryModifyQueryRequestOperationCall;
451+
ctx.Send(NGRpcService::CreateGRpcRequestProxyId(), EventFactory(requestContextModify).release());
452+
this->Die(ctx);
453+
});
454+
455+
ctx.Send(NGRpcService::CreateGRpcRequestProxyId(), EventFactory(requestContext).release());
456+
}
457+
};
458+
459+
#undef TGrpcCallWrapperBase
460+
399461
} // namespace NKikimr::NPublicHttp

ydb/core/public_http/http_service.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ namespace {
5757
Router.RegisterHandler(HTTP_METHOD_GET, "/api/fq/v1/queries/{query_id}/status", CreateHttpHandler<TJsonGetQueryStatus>());
5858
Router.RegisterHandler(HTTP_METHOD_GET, "/api/fq/v1/queries/{query_id}/results/{result_set_index}", CreateHttpHandler<TJsonGetResultData>());
5959
Router.RegisterHandler(HTTP_METHOD_POST, "/api/fq/v1/queries/{query_id}/stop", CreateHttpHandler<TJsonStopQuery>());
60+
Router.RegisterHandler(HTTP_METHOD_POST, "/api/fq/v1/queries/{query_id}/start", CreateHttpHandler<TRestartQueryRequest>());
6061
}
6162

6263
void Bootstrap(const TActorContext& ctx) {

ydb/tests/fq/http_api/test_http_api.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,12 @@ def test_simple_analytics_query(self):
9898
response = client.stop_query(query_id)
9999
assert response.status_code == 204
100100

101+
response = client.restart_query(query_id)
102+
assert response.status_code == 204
103+
104+
response = client.stop_query(query_id)
105+
assert response.status_code == 204
106+
101107
def test_empty_query(self):
102108
with self.create_client() as client:
103109
with pytest.raises(
@@ -109,6 +115,14 @@ def test_empty_query(self):
109115
): # noqa
110116
client.create_query()
111117

118+
def test_invalid_id(self):
119+
with self.create_client() as client:
120+
resp = client.stop_quert(query_id="nevalidno")
121+
assert resp.status_code == 404
122+
123+
resp = client.restart_query(query_id="snova nevalidno")
124+
assert resp.status_code == 404
125+
112126
def test_warning(self):
113127
with self.create_client() as client:
114128
query_id = client.create_query(query_text="select 10000000000000000000+1")
@@ -228,6 +242,30 @@ def test_stop_idempotency(self):
228242
self.streaming_over_kikimr.compute_plane.start()
229243
c.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER)
230244

245+
def test_restart_idempotency(self):
246+
c = FederatedQueryClient("my_folder", streaming_over_kikimr=self.streaming_over_kikimr)
247+
self.streaming_over_kikimr.compute_plane.stop()
248+
query_id = c.create_query("select1", "select 1").result.query_id
249+
c.wait_query_status(query_id, fq.QueryMeta.STARTING)
250+
251+
with self.create_client() as client:
252+
response1 = client.stop_query(query_id, idempotency_key="Z")
253+
assert response1.status_code == 204
254+
255+
response2 = client.restart_query(query_id, idempotency_key="Z")
256+
assert response2.status_code == 204
257+
258+
response2 = client.restart_query(query_id, idempotency_key="Z")
259+
assert response2.status_code == 204
260+
261+
response2 = client.restart_query(query_id)
262+
assert response2.status_code == 400
263+
264+
265+
self.streaming_over_kikimr.compute_plane.start()
266+
c.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER)
267+
pass
268+
231269
def test_simple_streaming_query(self):
232270
self.init_topics("simple_streaming_query", create_output=False)
233271
c = FederatedQueryClient("my_folder", streaming_over_kikimr=self.streaming_over_kikimr)
@@ -259,6 +297,9 @@ def test_simple_streaming_query(self):
259297
response = client.stop_query(query_id)
260298
assert response.status_code == 204
261299

300+
response = client.restart_query(query_id)
301+
assert response.status_code == 204
302+
262303
wait_for_query_status(client, query_id, ["FAILED"])
263304

264305
query_json2 = client.get_query(query_id)

0 commit comments

Comments
 (0)