Skip to content

Commit feed223

Browse files
committed
feat(microservices): Allow RegExps in Patterns for Kafka
closes nestjs#3083
1 parent eb892ca commit feed223

File tree

6 files changed

+72
-15
lines changed

6 files changed

+72
-15
lines changed

Diff for: integration/microservices/e2e/sum-kafka.spec.ts

+15-1
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,21 @@ describe.skip('Kafka transport', function () {
9696
.send()
9797
.end(() => {
9898
setTimeout(() => {
99-
expect(KafkaController.IS_NOTIFIED).to.be.true;
99+
expect(KafkaMessagesController.IS_NOTIFIED).to.be.true;
100+
done();
101+
}, 1000);
102+
});
103+
});
104+
105+
it(`/POST (async event notification)`, done => {
106+
request(server)
107+
.post('/notifyRegex')
108+
.send()
109+
.end(() => {
110+
setTimeout(() => {
111+
expect(KafkaMessagesController.IS_REGEX_NOTIFIED).to.be.eq(
112+
'regex.notify.test-0',
113+
);
100114
done();
101115
}, 1000);
102116
});

Diff for: integration/microservices/src/kafka/kafka.controller.ts

+5-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import { UserDto } from './dtos/user.dto';
1515
@Controller()
1616
export class KafkaController implements OnModuleInit, OnModuleDestroy {
1717
protected readonly logger = new Logger(KafkaController.name);
18-
static IS_NOTIFIED = false;
1918
static MATH_SUM = 0;
2019

2120
@Client({
@@ -133,6 +132,11 @@ export class KafkaController implements OnModuleInit, OnModuleDestroy {
133132
return this.client.emit('notify', { notify: true });
134133
}
135134

135+
@Post('notifyRegex')
136+
async sendRegexNotification(): Promise<any> {
137+
return this.client.emit('regex.notify.test-0', { notify: true });
138+
}
139+
136140
// Complex data to send.
137141
@Post('/user')
138142
@HttpCode(200)

Diff for: integration/microservices/src/kafka/kafka.messages.controller.ts

+13-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
import { Controller, Logger } from '@nestjs/common';
2-
import { EventPattern, MessagePattern } from '@nestjs/microservices';
2+
import {
3+
Ctx,
4+
EventPattern,
5+
KafkaContext,
6+
MessagePattern,
7+
} from '@nestjs/microservices';
38
import { BusinessDto } from './dtos/business.dto';
49
import { UserDto } from './dtos/user.dto';
510
import { BusinessEntity } from './entities/business.entity';
@@ -10,6 +15,7 @@ import { KafkaController } from './kafka.controller';
1015
export class KafkaMessagesController {
1116
protected readonly logger = new Logger(KafkaMessagesController.name);
1217
static IS_NOTIFIED = false;
18+
static IS_REGEX_NOTIFIED: any = false;
1319

1420
@MessagePattern('math.sum.sync.kafka.message')
1521
mathSumSyncKafkaMessage(data: any) {
@@ -53,7 +59,12 @@ export class KafkaMessagesController {
5359

5460
@EventPattern('notify')
5561
eventHandler(data: any) {
56-
KafkaController.IS_NOTIFIED = data.value.notify;
62+
KafkaMessagesController.IS_NOTIFIED = data.value.notify;
63+
}
64+
65+
@EventPattern(/regex\.notify\.test-[0-9]*/)
66+
regexHandler(data: any, @Ctx() context: KafkaContext) {
67+
KafkaMessagesController.IS_REGEX_NOTIFIED = context.getTopic();
5768
}
5869

5970
// Complex data to send.

Diff for: packages/microservices/server/server-kafka.ts

+15-3
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import { KafkaLogger, KafkaParser } from '../helpers';
2828
import {
2929
CustomTransportStrategy,
3030
KafkaOptions,
31+
MessageHandler,
3132
OutgoingResponse,
3233
ReadPacket,
3334
} from '../interfaces';
@@ -49,6 +50,8 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
4950
protected clientId: string;
5051
protected groupId: string;
5152

53+
protected registeredPatterns: any[];
54+
5255
constructor(protected readonly options: KafkaOptions['options']) {
5356
super();
5457

@@ -120,13 +123,12 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
120123
}
121124

122125
public async bindEvents(consumer: Consumer) {
123-
const registeredPatterns = [...this.messageHandlers.keys()];
124126
const consumerSubscribeOptions = this.options.subscribe || {};
125127

126-
if (registeredPatterns.length > 0) {
128+
if (this.registeredPatterns.length > 0) {
127129
await this.consumer.subscribe({
128130
...consumerSubscribeOptions,
129-
topics: registeredPatterns,
131+
topics: this.registeredPatterns,
130132
});
131133
}
132134

@@ -317,4 +319,14 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
317319
protected initializeDeserializer(options: KafkaOptions['options']) {
318320
this.deserializer = options?.deserializer ?? new KafkaRequestDeserializer();
319321
}
322+
323+
public addHandler(
324+
pattern: any,
325+
callback: MessageHandler,
326+
isEventHandler: boolean = false,
327+
extras: Record<string, any> = {},
328+
) {
329+
this.registeredPatterns.push(pattern);
330+
super.addHandler(pattern, callback, isEventHandler, extras);
331+
}
320332
}

Diff for: packages/microservices/test/server/server-kafka.spec.ts

+21-8
Original file line numberDiff line numberDiff line change
@@ -180,10 +180,26 @@ describe('ServerKafka', () => {
180180
await server.listen(callback);
181181

182182
const pattern = 'test';
183-
const handler = sinon.spy();
184-
(server as any).messageHandlers = objectToMap({
185-
[pattern]: handler,
186-
});
183+
(server as any).registeredPatterns = [pattern];
184+
185+
await server.bindEvents((server as any).consumer);
186+
187+
expect(subscribe.called).to.be.true;
188+
expect(
189+
subscribe.calledWith({
190+
topics: [pattern],
191+
}),
192+
).to.be.true;
193+
194+
expect(run.called).to.be.true;
195+
expect(connect.called).to.be.true;
196+
});
197+
it('should call subscribe and run on consumer when there are RegExp messageHandlers', async () => {
198+
(server as any).logger = new NoopLogger();
199+
await server.listen(callback);
200+
201+
const pattern = /test/;
202+
(server as any).registeredPatterns = [pattern];
187203

188204
await server.bindEvents((server as any).consumer);
189205

@@ -204,10 +220,7 @@ describe('ServerKafka', () => {
204220
await server.listen(callback);
205221

206222
const pattern = 'test';
207-
const handler = sinon.spy();
208-
(server as any).messageHandlers = objectToMap({
209-
[pattern]: handler,
210-
});
223+
(server as any).registeredPatterns = [pattern];
211224

212225
await server.bindEvents((server as any).consumer);
213226

Diff for: packages/microservices/utils/transform-pattern.utils.ts

+3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ export function transformPatternToRoute(pattern: MsPattern): string {
2424
if (!isObject(pattern)) {
2525
return pattern;
2626
}
27+
if (pattern instanceof RegExp) {
28+
return pattern.source;
29+
}
2730

2831
const sortedKeys = Object.keys(pattern).sort((a, b) =>
2932
('' + a).localeCompare(b),

0 commit comments

Comments
 (0)