Skip to content

Commit ff32cf3

Browse files
nshestakovniksavelievAlek5andr-KotovsiarheivesialouFloatingCrowbar
authored andcommitted
Topics improvements for 24-3 (ydb-platform#8605)
Co-authored-by: niksaveliev <[email protected]> Co-authored-by: Alek5andr-Kotov <[email protected]> Co-authored-by: Sergey Veselov <[email protected]> Co-authored-by: FloatingCrowbar <[email protected]>
1 parent 7bc58fd commit ff32cf3

File tree

135 files changed

+7845
-1721
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

135 files changed

+7845
-1721
lines changed

ydb/core/grpc_services/local_rpc/local_rpc.h

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,12 @@ class TLocalRpcCtx : public TLocalRpcCtxImpl<TRpc, TCbWrapper, IsOperation> {
153153
if (key == NYdb::YDB_DATABASE_HEADER) {
154154
return GetDatabaseName();
155155
}
156-
return TMaybe<TString>{};
156+
auto valueIt = PeerMeta.find(key);
157+
return valueIt == PeerMeta.end() ? Nothing() : TMaybe<TString>(valueIt->second);
158+
}
159+
160+
void PutPeerMeta(const TString& key, const TString& value) {
161+
PeerMeta.insert_or_assign(key, value);
157162
}
158163

159164
TVector<TStringBuf> FindClientCert() const override {
@@ -278,6 +283,7 @@ class TLocalRpcCtx : public TLocalRpcCtxImpl<TRpc, TCbWrapper, IsOperation> {
278283
const bool InternalCall;
279284
TIntrusiveConstPtr<NACLib::TUserToken> InternalToken;
280285
const TString EmptySerializedTokenMessage_;
286+
TMap<TString, TString> PeerMeta;
281287
google::protobuf::Arena Arena;
282288
};
283289

@@ -318,6 +324,41 @@ NThreading::TFuture<typename TRpc::TResponse> DoLocalRpc(typename TRpc::TRequest
318324
return DoLocalRpc<TRpc>(std::move(proto), database, token, Nothing(), actorSystem, internalCall);
319325
}
320326

327+
template<typename TRpc>
328+
NThreading::TFuture<typename TRpc::TResponse> DoLocalRpc(
329+
typename TRpc::TRequest&& proto,
330+
const TString& database,
331+
const TMaybe<TString>& token,
332+
const TMaybe<TString>& requestType,
333+
TActorSystem* actorSystem,
334+
const TMap<TString, TString>& peerMeta,
335+
bool internalCall = false
336+
)
337+
{
338+
auto promise = NThreading::NewPromise<typename TRpc::TResponse>();
339+
340+
SetRequestSyncOperationMode(proto);
341+
342+
using TCbWrapper = TPromiseWrapper<typename TRpc::TResponse>;
343+
auto req = new TLocalRpcCtx<TRpc, TCbWrapper>(
344+
std::move(proto),
345+
TCbWrapper(promise),
346+
database,
347+
token,
348+
requestType,
349+
internalCall
350+
);
351+
352+
for (const auto& [key, value] : peerMeta) {
353+
req->PutPeerMeta(key, value);
354+
}
355+
356+
auto actor = TRpc::CreateRpcActor(req);
357+
actorSystem->Register(actor, TMailboxType::HTSwap, actorSystem->AppData<TAppData>()->UserPoolId);
358+
359+
return promise.GetFuture();
360+
}
361+
321362
template<typename TRpc>
322363
TActorId DoLocalRpcSameMailbox(typename TRpc::TRequest&& proto, std::function<void(typename TRpc::TResponse)>&& cb,
323364
const TString& database, const TMaybe<TString>& token, const TMaybe<TString>& requestType,

ydb/core/grpc_services/service_ymq.h

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
#pragma once
2+
#include <memory>
3+
4+
namespace NActors {
5+
struct TActorId;
6+
}
7+
8+
namespace NKikimr {
9+
namespace NGRpcService {
10+
11+
class IRequestOpCtx;
12+
class IFacilityProvider;
13+
14+
void DoYmqGetQueueUrlRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
15+
void DoYmqCreateQueueRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
16+
void DoYmqSendMessageRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
17+
void DoYmqReceiveMessageRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
18+
void DoYmqGetQueueAttributesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
19+
void DoYmqListQueuesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
20+
void DoYmqDeleteMessageRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
21+
void DoYmqPurgeQueueRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
22+
void DoYmqDeleteQueueRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
23+
void DoYmqChangeMessageVisibilityRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
24+
void DoYmqSetQueueAttributesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
25+
void DoYmqSendMessageBatchRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
26+
void DoYmqDeleteMessageBatchRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
27+
void DoYmqChangeMessageVisibilityBatchRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
28+
void DoYmqListDeadLetterSourceQueuesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
29+
}
30+
}

ydb/core/http_proxy/events.h

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,40 @@ namespace NKikimr::NHttpProxy {
148148
};
149149
};
150150

151+
enum TEv {
152+
EvYmqCloudAuthResponse
153+
};
154+
155+
struct TEvYmqCloudAuthResponse: public TEventLocal<
156+
TEvYmqCloudAuthResponse,
157+
EvYmqCloudAuthResponse> {
158+
struct TError {
159+
TString ErrorCode;
160+
ui32 HttpStatusCode;
161+
TString Message;
162+
};
163+
164+
bool IsSuccess;
165+
166+
TString CloudId;
167+
TString FolderId;
168+
TString Sid;
169+
170+
TMaybe<TError> Error;
171+
172+
TEvYmqCloudAuthResponse(const TString& cloudId, const TString& folderId, const TString& sid)
173+
: IsSuccess(true)
174+
, CloudId(cloudId)
175+
, FolderId(folderId)
176+
, Sid(sid)
177+
, Error(Nothing())
178+
{}
179+
180+
TEvYmqCloudAuthResponse(TError& error)
181+
: IsSuccess(false)
182+
, Error(error)
183+
{}
184+
};
151185

152186
inline TActorId MakeAccessServiceID() {
153187
static const char x[12] = "accss_srvce";
@@ -184,6 +218,11 @@ namespace NKikimr::NHttpProxy {
184218
return TActorId(0, TStringBuf(x, 12));
185219
}
186220

221+
inline TActorId MakeFolderServiceID() {
222+
static const char x[12] = "folder_svc";
223+
return TActorId(0, TStringBuf(x, 12));
224+
}
225+
187226
#define LOG_SP_ERROR_S(actorCtxOrSystem, component, stream) LOG_ERROR_S(actorCtxOrSystem, component, LogPrefix() << " " << stream)
188227
#define LOG_SP_WARN_S(actorCtxOrSystem, component, stream) LOG_WARN_S(actorCtxOrSystem, component, LogPrefix() << " " << stream)
189228
#define LOG_SP_INFO_S(actorCtxOrSystem, component, stream) LOG_INFO_S(actorCtxOrSystem, component, LogPrefix() << " " << stream)

0 commit comments

Comments
 (0)