Skip to content

Commit 475b837

Browse files
committed
feat(microservices) add capability to use RegExp in Kafka Events
closes nestjs#3083
1 parent 21fb46e commit 475b837

File tree

4 files changed

+55
-8
lines changed

4 files changed

+55
-8
lines changed

packages/microservices/interfaces/message-handler.interface.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,8 @@ export interface MessageHandler<TInput = any, TContext = any, TResult = any> {
44
(data: TInput, ctx?: TContext): Promise<Observable<TResult>>;
55
isEventHandler?: boolean;
66
}
7+
8+
export interface RegExpMessageHandler {
9+
pattern: RegExp;
10+
messageHandler: MessageHandler;
11+
}

packages/microservices/server/server-kafka.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,11 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
9898
}
9999

100100
public async bindEvents(consumer: Consumer) {
101-
const registeredPatterns = [...this.messageHandlers.keys()];
102-
const subscribeToPattern = async (pattern: string) =>
101+
const registeredPatterns = [
102+
...this.messageHandlers.keys(),
103+
...this.regExpMessageHandlers.map(handler => handler.pattern),
104+
];
105+
const subscribeToPattern = async (pattern: string | RegExp) =>
103106
consumer.subscribe({
104107
topic: pattern,
105108
});

packages/microservices/server/server.ts

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import {
2222
NatsOptions,
2323
ReadPacket,
2424
RedisOptions,
25+
RegExpMessageHandler,
2526
RmqOptions,
2627
TcpOptions,
2728
WritePacket,
@@ -34,6 +35,7 @@ import { NO_EVENT_HANDLER } from '../constants';
3435

3536
export abstract class Server {
3637
protected readonly messageHandlers = new Map<string, MessageHandler>();
38+
protected readonly regExpMessageHandlers = new Array<RegExpMessageHandler>();
3739
protected readonly logger = new Logger(Server.name);
3840
protected serializer: ConsumerSerializer;
3941
protected deserializer: ConsumerDeserializer;
@@ -43,9 +45,13 @@ export abstract class Server {
4345
callback: MessageHandler,
4446
isEventHandler = false,
4547
) {
46-
const route = transformPatternToRoute(pattern);
47-
callback.isEventHandler = isEventHandler;
48-
this.messageHandlers.set(route, callback);
48+
if (pattern.constructor.name === 'RegExp') {
49+
this.regExpMessageHandlers.push({ pattern, messageHandler: callback });
50+
} else {
51+
const route = transformPatternToRoute(pattern);
52+
callback.isEventHandler = isEventHandler;
53+
this.messageHandlers.set(route, callback);
54+
}
4955
}
5056

5157
public getHandlers(): Map<string, MessageHandler> {
@@ -54,9 +60,20 @@ export abstract class Server {
5460

5561
public getHandlerByPattern(pattern: string): MessageHandler | null {
5662
const route = this.getRouteFromPattern(pattern);
57-
return this.messageHandlers.has(route)
58-
? this.messageHandlers.get(route)
59-
: null;
63+
let handler = null;
64+
// Try to find the message handler by name
65+
if (this.messageHandlers.has(route)) {
66+
return this.messageHandlers.get(route);
67+
}
68+
69+
// If it was not found, iterate through the Regular Expression handlers
70+
this.regExpMessageHandlers.forEach(regExpHandler => {
71+
if (regExpHandler.pattern.exec(route) !== null) {
72+
handler = regExpHandler.messageHandler;
73+
}
74+
});
75+
76+
return handler;
6077
}
6178

6279
public send(

packages/microservices/test/server/server.spec.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { Observable, of, throwError as _throw } from 'rxjs';
33
import * as sinon from 'sinon';
44
import { Server } from '../../server/server';
55
import * as Utils from '../../utils';
6+
import { RegExpMessageHandler } from '../../interfaces';
67

78
class TestServer extends Server {
89
public listen(callback: () => void) {}
@@ -197,6 +198,27 @@ describe('Server', () => {
197198
});
198199
});
199200

201+
describe('when handler exists and was added with a RegExp', () => {
202+
beforeEach(() => {
203+
sandbox.stub(server as any, 'regExpMessageHandlers').value([
204+
({
205+
pattern: /.*el.*/,
206+
messageHandler: callback,
207+
} as unknown) as RegExpMessageHandler,
208+
]);
209+
});
210+
211+
it('should return expected handler', () => {
212+
messageHandlersHasSpy.returns(false);
213+
214+
const value = server.getHandlerByPattern(handlerRoute);
215+
216+
expect(messageHandlersHasSpy.called).to.be.true;
217+
expect(messageHandlersGetSpy.called).to.be.false;
218+
expect(value).to.be.equal(callback);
219+
});
220+
});
221+
200222
describe('when handler does not exists', () => {
201223
it('should return null', () => {
202224
messageHandlersHasSpy.returns(false);

0 commit comments

Comments
 (0)