Skip to content

Commit 2972578

Browse files
Merge 0c1d0f2 into 3d45056
2 parents 3d45056 + 0c1d0f2 commit 2972578

File tree

6 files changed

+190
-11
lines changed

6 files changed

+190
-11
lines changed

ydb/core/fq/libs/http_api_client/http_client.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
from .query_results import YQResults
1313

14-
MAX_RETRY_FOR_SESSION = 4
14+
MAX_RETRY_FOR_SESSION = 100
1515
BACK_OFF_FACTOR = 0.3
1616
TIME_BETWEEN_RETRIES = 1000
1717
ERROR_CODES = (500, 502, 504)
@@ -149,6 +149,21 @@ def create_query(
149149

150150
self._validate_http_error(response, expected_code=expected_code)
151151
return response.json()["id"]
152+
153+
def start_query(
154+
self,
155+
query_id: str,
156+
idempotency_key: str | None = None,
157+
expected_code: int = 204,
158+
):
159+
response = self.session.get(
160+
self._compose_api_url(f"/api/fq/v1/queries/{query_id}/start"),
161+
headers=self._build_headers(idempotency_key=idempotency_key),
162+
params=self._build_params(),
163+
)
164+
165+
self._validate_http_error(response, expected_code)
166+
return response.json()["status"]
152167

153168
def get_query_status(self, query_id, request_id=None, expected_code=200) -> Any:
154169
response = self.session.get(

ydb/core/public_http/fq_handlers.h

Lines changed: 99 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
#include <ydb/library/services/services.pb.h>
1111
#include <ydb/core/public_http/protos/fq.pb.h>
1212

13+
#include <type_traits>
14+
1315
namespace NKikimr::NPublicHttp {
1416

1517
using namespace NActors;
@@ -249,6 +251,7 @@ void SetIdempotencyKey(T& dst, const TString& key) {
249251

250252
template <typename GrpcProtoRequestType, typename HttpProtoRequestType, typename GrpcProtoResultType, typename HttpProtoResultType, typename GrpcProtoResponseType>
251253
class TGrpcCallWrapper : public TActorBootstrapped<TGrpcCallWrapper<GrpcProtoRequestType, HttpProtoRequestType, GrpcProtoResultType, HttpProtoResultType, GrpcProtoResponseType>> {
254+
protected:
252255
THttpRequestContext RequestContext;
253256

254257
typedef std::function<std::unique_ptr<NGRpcService::TEvProxyRuntimeEvent>(TIntrusivePtr<NYdbGrpc::IRequestContextBase> ctx)> TGrpcProxyEventFactory;
@@ -278,6 +281,11 @@ class TGrpcCallWrapper : public TActorBootstrapped<TGrpcCallWrapper<GrpcProtoReq
278281
}
279282

280283
void Bootstrap(const TActorContext& ctx) {
284+
Bootstrap_wrapper(ctx);
285+
}
286+
287+
virtual void Bootstrap_wrapper(const TActorContext& ctx) {
288+
std::cerr << "Call base\n";
281289
auto grpcRequest = std::make_unique<TGrpcProtoRequestType>();
282290
if (Parse(*grpcRequest)) {
283291
TIntrusivePtr<TGrpcRequestContextWrapper> requestContext = new TGrpcRequestContextWrapper(RequestContext, std::move(grpcRequest), &SendReply);
@@ -349,14 +357,13 @@ class TGrpcCallWrapper : public TActorBootstrapped<TGrpcCallWrapper<GrpcProtoReq
349357
Y_ABORT_UNLESS(resp->GetArena());
350358
Y_UNUSED(status);
351359
auto* typedResponse = static_cast<TGrpcProtoResponseType*>(resp);
352-
if (!typedResponse->operation().result().template Is<TGrpcProtoResultType>()) {
353-
TStringStream json;
354-
auto* httpResult = google::protobuf::Arena::CreateMessage<FQHttp::Error>(resp->GetArena());
355-
FqConvert(typedResponse->operation(), *httpResult);
356-
FqPackToJson(json, *httpResult, jsonSettings);
357-
358-
requestContext.ResponseBadRequestJson(typedResponse->operation().status(), json.Str());
359-
return;
360+
if (!typedResponse->operation().result().template Is<TGrpcProtoResultType>()) {
361+
TStringStream json;
362+
auto* httpResult = google::protobuf::Arena::CreateMessage<FQHttp::Error>(resp->GetArena());
363+
FqConvert(typedResponse->operation(), *httpResult);
364+
FqPackToJson(json, *httpResult, jsonSettings);
365+
requestContext.ResponseBadRequestJson(typedResponse->operation().status(), json.Str());
366+
return;
360367
}
361368

362369
auto* grpcResult = google::protobuf::Arena::CreateMessage<TGrpcProtoResultType>(resp->GetArena());
@@ -396,4 +403,88 @@ DECLARE_YQ_GRPC_ACTOR(GetQueryStatus, GetQueryStatus);
396403
DECLARE_YQ_GRPC_ACTOR_WIHT_EMPTY_RESULT(StopQuery, ControlQuery);
397404
DECLARE_YQ_GRPC_ACTOR(GetResultData, GetResultData);
398405

406+
#define TGrpcCallWrapperBase TGrpcCallWrapper<FederatedQuery::DescribeQueryRequest, FQHttp::GetQueryRequest, FederatedQuery::ModifyQueryResult, google::protobuf::Empty, FederatedQuery::ModifyQueryResponse>
407+
408+
class TStartQueryRequest : public TGrpcCallWrapperBase {
409+
public:
410+
TStartQueryRequest(const THttpRequestContext& ctx)
411+
: TGrpcCallWrapperBase(ctx, &NGRpcService::CreateFederatedQueryDescribeQueryRequestOperationCall)
412+
{}
413+
414+
void Bootstrap_wrapper(const TActorContext& ctx) override {
415+
std::cerr << "Call derived\n";
416+
417+
auto describeRequest = std::make_unique<FederatedQuery::DescribeQueryRequest>();
418+
if (!Parse(*describeRequest)) {
419+
this->Die(ctx);
420+
return;
421+
}
422+
423+
TIntrusivePtr<TGrpcRequestContextWrapper> requestContext = new TGrpcRequestContextWrapper(
424+
RequestContext,
425+
std::move(describeRequest),
426+
[&, this, query_id = describeRequest->Getquery_id()](const THttpRequestContext& requestContext, const TJsonSettings& jsonSettings, NProtoBuf::Message* resp, ui32 status) {
427+
requestContext.ResponseBadRequest(Ydb::StatusIds::INTERNAL_ERROR, "Debug response-1"); return;
428+
Y_ABORT_UNLESS(resp);
429+
requestContext.ResponseBadRequest(Ydb::StatusIds::INTERNAL_ERROR, "Debug response-2"); return;
430+
Y_ABORT_UNLESS(resp->GetArena());
431+
requestContext.ResponseBadRequest(Ydb::StatusIds::INTERNAL_ERROR, "Debug response-3"); return;
432+
433+
auto* typedResponse = static_cast<FederatedQuery::DescribeQueryResponse*>(resp);
434+
if (!typedResponse->operation().result().template Is<FederatedQuery::DescribeQueryResult>()) {
435+
TStringStream json;
436+
auto* httpResult = google::protobuf::Arena::CreateMessage<FQHttp::Error>(resp->GetArena());
437+
FqConvert(typedResponse->operation(), *httpResult);
438+
FqPackToJson(json, *httpResult, jsonSettings);
439+
requestContext.ResponseBadRequestJson(typedResponse->operation().status(), json.Str());
440+
this->Die(ctx);
441+
return;
442+
}
443+
444+
requestContext.ResponseBadRequest(Ydb::StatusIds::INTERNAL_ERROR, "Debug response1"); return;
445+
446+
FederatedQuery::DescribeQueryResult* describeResult = google::protobuf::Arena::CreateMessage<FederatedQuery::DescribeQueryResult>(resp->GetArena());
447+
if(!typedResponse->operation().result().UnpackTo(describeResult)) {
448+
requestContext.ResponseBadRequest(Ydb::StatusIds::INTERNAL_ERROR, "Error in response unpack");
449+
this->Die(ctx);
450+
return;
451+
}
452+
453+
requestContext.ResponseBadRequest(Ydb::StatusIds::INTERNAL_ERROR, "Debug response2"); return;
454+
455+
456+
// modify
457+
auto modifyRequest = std::unique_ptr<FederatedQuery::ModifyQueryRequest>(google::protobuf::Arena::CreateMessage<FederatedQuery::ModifyQueryRequest>(resp->GetArena()));
458+
459+
modifyRequest->set_query_id(query_id);
460+
auto content = describeResult->Getquery().content();
461+
modifyRequest->set_allocated_content(&content);
462+
modifyRequest->set_execute_mode(::FederatedQuery::ExecuteMode::RUN);
463+
modifyRequest->set_allocated_disposition(nullptr);
464+
modifyRequest->set_state_load_mode(::FederatedQuery::StateLoadMode::STATE_LOAD_MODE_UNSPECIFIED);
465+
modifyRequest->set_previous_revision(describeResult->Getquery().meta().Getlast_job_query_revision());
466+
modifyRequest->set_idempotency_key(requestContext.GetIdempotencyKey());
467+
468+
requestContext.ResponseBadRequest(Ydb::StatusIds::INTERNAL_ERROR, "Debug response3"); return;
469+
470+
TIntrusivePtr<TGrpcRequestContextWrapper> requestContextModify = new TGrpcRequestContextWrapper(
471+
this->RequestContext,
472+
std::move(modifyRequest),
473+
TGrpcCallWrapper<FederatedQuery::ModifyQueryRequest, int, FederatedQuery::ModifyQueryResult, google::protobuf::Empty, FederatedQuery::ModifyQueryResponse>::SendReply
474+
);
475+
476+
requestContext.ResponseBadRequest(Ydb::StatusIds::INTERNAL_ERROR, "Debug response4"); return;
477+
478+
// new event -> new EventFactory
479+
EventFactory = &NGRpcService::CreateFederatedQueryModifyQueryRequestOperationCall;
480+
ctx.Send(NGRpcService::CreateGRpcRequestProxyId(), EventFactory(requestContextModify).release());
481+
this->Die(ctx);
482+
});
483+
484+
ctx.Send(NGRpcService::CreateGRpcRequestProxyId(), EventFactory(requestContext).release());
485+
}
486+
};
487+
488+
#undef TGrpcCallWrapperBase
489+
399490
} // namespace NKikimr::NPublicHttp

ydb/core/public_http/http_service.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ namespace {
5757
Router.RegisterHandler(HTTP_METHOD_GET, "/api/fq/v1/queries/{query_id}/status", CreateHttpHandler<TJsonGetQueryStatus>());
5858
Router.RegisterHandler(HTTP_METHOD_GET, "/api/fq/v1/queries/{query_id}/results/{result_set_index}", CreateHttpHandler<TJsonGetResultData>());
5959
Router.RegisterHandler(HTTP_METHOD_POST, "/api/fq/v1/queries/{query_id}/stop", CreateHttpHandler<TJsonStopQuery>());
60+
Router.RegisterHandler(HTTP_METHOD_POST, "/api/fq/v1/queries/{query_id}/start", CreateHttpHandler<TStartQueryRequest>());
6061
}
6162

6263
void Bootstrap(const TActorContext& ctx) {

ydb/core/public_http/openapi/openapi.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,31 @@ paths:
185185
required: true
186186
schema:
187187
type: string
188+
'/queries/{query_id}/start':
189+
post:
190+
responses:
191+
'204':
192+
description: No Content
193+
'400':
194+
description: Bad Request
195+
content:
196+
application/json:
197+
schema:
198+
$ref: '#/components/schemas/GenericError'
199+
parameters:
200+
- $ref: '#/components/parameters/Idempotency-Key'
201+
- $ref: '#/components/parameters/Authorization'
202+
- $ref: '#/components/parameters/x-request-id'
203+
- $ref: '#/components/parameters/db'
204+
- $ref: '#/components/parameters/project'
205+
summary: start stopped query
206+
operationId: start-query
207+
parameters:
208+
- name: query_id
209+
in: path
210+
required: true
211+
schema:
212+
type: string
188213
'/queries/{query_id}/results/{result_set_index}':
189214
parameters:
190215
- name: query_id

ydb/library/login/login.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -547,7 +547,7 @@ void TLoginProvider::TImpl::GenerateKeyPair(TString& publicKey, TString& private
547547
TString TLoginProvider::TImpl::GenerateHash(const TString& password) {
548548
char salt[SALT_SIZE];
549549
char hash[HASH_SIZE];
550-
RAND_bytes(reinterpret_cast<unsigned char*>(salt), SALT_SIZE);
550+
// RAND_bytes(reinterpret_cast<unsigned char*>(salt), SALT_SIZE);
551551
ArgonHasher->Hash(
552552
reinterpret_cast<const ui8*>(password.data()),
553553
password.size(),

ydb/tests/fq/http_api/test_http_api.py

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from yaml.loader import SafeLoader
1111

1212
import library.python.retry as retry
13+
import time
1314

1415
from test_base import TestBase
1516
from ydb.core.fq.libs.http_api_client.http_client import YQHttpClientConfig, YQHttpClient, YQHttpClientException
@@ -19,7 +20,11 @@
1920

2021
@retry.retry(retry.RetryConf().upto(10))
2122
def wait_for_query_status(client, query_id, statuses):
22-
status = client.get_query_status(query_id)
23+
while True:
24+
status = client.get_query_status(query_id)
25+
if status in statuses:
26+
return status
27+
time.sleep(1)
2328
if status not in statuses:
2429
raise Exception(f"Status {status} is not in {statuses}")
2530
return status
@@ -98,6 +103,12 @@ def test_simple_analytics_query(self):
98103
response = client.stop_query(query_id)
99104
assert response.status_code == 204
100105

106+
response = client.start_query(query_id)
107+
assert response.status_code == 204
108+
109+
response = client.stop_query(query_id)
110+
assert response.status_code == 204
111+
101112
def test_empty_query(self):
102113
with self.create_client() as client:
103114
with pytest.raises(
@@ -109,6 +120,14 @@ def test_empty_query(self):
109120
): # noqa
110121
client.create_query()
111122

123+
def test_invalid_id(self):
124+
with self.create_client() as client:
125+
resp = client.stop_quert(query_id="nevalidno")
126+
assert resp.status_code == 404
127+
128+
resp = client.start_query(query_id="snova nevalidno")
129+
assert resp.status_code == 404
130+
112131
def test_warning(self):
113132
with self.create_client() as client:
114133
query_id = client.create_query(query_text="select 10000000000000000000+1")
@@ -228,6 +247,31 @@ def test_stop_idempotency(self):
228247
self.streaming_over_kikimr.compute_plane.start()
229248
c.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER)
230249

250+
def test_restart_idempotency(self):
251+
c = FederatedQueryClient("my_folder", streaming_over_kikimr=self.streaming_over_kikimr)
252+
self.streaming_over_kikimr.compute_plane.stop()
253+
query_id = c.create_query("select1", "select 1").result.query_id
254+
c.wait_query_status(query_id, fq.QueryMeta.STARTING)
255+
256+
with self.create_client() as client:
257+
response1 = client.stop_query(query_id, idempotency_key="Z")
258+
assert response1.status_code == 204
259+
260+
response2 = client.start_query(query_id, idempotency_key="Z")
261+
assert response2.status_code == 204
262+
263+
response2 = client.start_query(query_id, idempotency_key="Z")
264+
assert response2.status_code == 204
265+
266+
response2 = client.start_query(query_id)
267+
assert response2.status_code == 400
268+
269+
response1 = client.stop_query(query_id, idempotency_key="Z")
270+
assert response1.status_code == 204
271+
272+
self.streaming_over_kikimr.compute_plane.start()
273+
c.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER)
274+
231275
def test_simple_streaming_query(self):
232276
self.init_topics("simple_streaming_query", create_output=False)
233277
c = FederatedQueryClient("my_folder", streaming_over_kikimr=self.streaming_over_kikimr)
@@ -259,6 +303,9 @@ def test_simple_streaming_query(self):
259303
response = client.stop_query(query_id)
260304
assert response.status_code == 204
261305

306+
response = client.start_query(query_id)
307+
assert response.status_code == 204
308+
262309
wait_for_query_status(client, query_id, ["FAILED"])
263310

264311
query_json2 = client.get_query(query_id)

0 commit comments

Comments
 (0)