Skip to content

Commit f752c63

Browse files
committed
Minor change in poll method.
The issue is about sending POST requests for committing ofsets in `poll` method even if no messages in `poll` response. The backend sends back an error about invalid parameters of committing offsets requests in this case. This fix checks the count of messages in the poll response and sends committing requests only if messages are retrieved. Relates-To: OLPEDGE-2663 Signed-off-by: Oleksii Zubko <[email protected]>
1 parent f3f336c commit f752c63

File tree

2 files changed

+141
-30
lines changed

2 files changed

+141
-30
lines changed

@here/olp-sdk-dataservice-read/lib/client/StreamLayerClient.ts

+36-30
Original file line numberDiff line numberDiff line change
@@ -200,37 +200,43 @@ export class StreamLayerClient {
200200
// Update xCorrelationId
201201
this.xCorrelationId = consumeResponse.xCorrelationId;
202202

203-
// Commit offsets
204-
const latestOffsets: { [key: string]: number } = consumeResponse.data
205-
.map(msg => msg.offset)
206-
.reduce(
207-
(
208-
acc: { [key: string]: number },
209-
curr: StreamApi.StreamOffset
210-
) => {
211-
acc[curr.partition] =
212-
acc[curr.partition] > curr.offset
213-
? acc[curr.partition]
214-
: curr.offset;
215-
return acc;
203+
if (consumeResponse.data.length > 0) {
204+
// Commit offsets
205+
const latestOffsets: {
206+
[key: string]: number;
207+
} = consumeResponse.data
208+
.map(msg => msg.offset)
209+
.reduce(
210+
(
211+
acc: { [key: string]: number },
212+
curr: StreamApi.StreamOffset
213+
) => {
214+
acc[curr.partition] =
215+
acc[curr.partition] > curr.offset
216+
? acc[curr.partition]
217+
: curr.offset;
218+
return acc;
219+
},
220+
{}
221+
);
222+
223+
await StreamApi.doCommitOffsets(requestBuilder, {
224+
commitOffsets: {
225+
offsets: Object.keys(latestOffsets).map(key => ({
226+
partition: +key,
227+
offset: latestOffsets[key]
228+
}))
216229
},
217-
{}
218-
);
219-
220-
await StreamApi.doCommitOffsets(requestBuilder, {
221-
commitOffsets: {
222-
offsets: Object.keys(latestOffsets).map(key => ({
223-
partition: +key,
224-
offset: latestOffsets[key]
225-
}))
226-
},
227-
subscriptionId: request.getSubscriptionId(),
228-
mode: request.getMode(),
229-
layerId: this.layerId,
230-
xCorrelationId: this.xCorrelationId
231-
}).catch(async error => {
232-
console.log(`Commit offsets unsuccessful, error=${error.message}`);
233-
});
230+
subscriptionId: request.getSubscriptionId(),
231+
mode: request.getMode(),
232+
layerId: this.layerId,
233+
xCorrelationId: this.xCorrelationId
234+
}).catch(async error => {
235+
console.log(
236+
`Commit offsets unsuccessful, error=${error.message}`
237+
);
238+
});
239+
}
234240

235241
return Promise.resolve(consumeResponse.data);
236242
}

@here/olp-sdk-dataservice-read/test/unit/StreamLayerClient.test.ts

+105
Original file line numberDiff line numberDiff line change
@@ -619,4 +619,109 @@ describe("StreamLayerClient", function() {
619619
);
620620
});
621621
});
622+
623+
it("Should not post offsets if no data", async function() {
624+
const headers = new Headers();
625+
headers.append("X-Correlation-Id", "9141392.f96875c-9422-4df4-bdfj");
626+
const mockResponse = ({
627+
headers,
628+
json: function() {
629+
return {
630+
messages: []
631+
};
632+
}
633+
} as unknown) as Response;
634+
pollStub.callsFake(
635+
(builder: any, params: any): Promise<Response> => {
636+
return Promise.resolve(mockResponse);
637+
}
638+
);
639+
640+
let settings = sandbox.createStubInstance(core.OlpClientSettings);
641+
const subscribtionId = await streamLayerClient.subscribe(
642+
new dataServiceRead.SubscribeRequest().withMode("serial")
643+
);
644+
const request = new dataServiceRead.PollRequest().withSubscriptionId(
645+
subscribtionId
646+
);
647+
const messages = await streamLayerClient.poll(request);
648+
649+
assert.isDefined(messages);
650+
expect(messages.length).to.be.equal(0);
651+
expect(commitOffsetsStub.notCalled);
652+
});
653+
654+
it("Should not throw if commiting offsets failed", async function() {
655+
const headers = new Headers();
656+
headers.append("X-Correlation-Id", "9141392.f96875c-9422-4df4-bdfj");
657+
const mockResponse = ({
658+
headers,
659+
json: function() {
660+
return {
661+
messages: [
662+
{
663+
metaData: {
664+
partition: "314010583",
665+
checksum: "ff7494d6f17da702862e550c907c0a91",
666+
compressedDataSize: 152417,
667+
dataSize: 100500,
668+
data: "",
669+
dataHandle:
670+
"iVBORw0-Lf9HdIZBfNEiKAA-AAAE-lFTkSuQmCC",
671+
timestamp: 1517916706
672+
},
673+
offset: {
674+
partition: 7,
675+
offset: 38562
676+
}
677+
},
678+
{
679+
metaData: {
680+
partition: "314010584",
681+
checksum: "ff7494d6f17da702862e550c907c0a91",
682+
dataSize: 100500,
683+
data:
684+
"7n348c7y49nry394y39yv39y384tvn3984tvn34ty034ynt3yvt983ny",
685+
dataHandle: "",
686+
timestamp: 1517916707
687+
},
688+
offset: {
689+
partition: 8,
690+
offset: 38563
691+
}
692+
}
693+
]
694+
};
695+
}
696+
} as unknown) as Response;
697+
pollStub.callsFake(
698+
(builder: any, params: any): Promise<Response> => {
699+
return Promise.resolve(mockResponse);
700+
}
701+
);
702+
703+
commitOffsetsStub.callsFake(
704+
(builder: any, params: any): Promise<Response> => {
705+
return Promise.reject(new Error("fake error"));
706+
}
707+
);
708+
709+
let settings = sandbox.createStubInstance(core.OlpClientSettings);
710+
const params = {
711+
catalogHrn: mockedHRN,
712+
layerId: mockedLayerId,
713+
settings: (settings as unknown) as core.OlpClientSettings
714+
};
715+
const subscribtionId = await streamLayerClient.subscribe(
716+
new dataServiceRead.SubscribeRequest().withMode("serial")
717+
);
718+
const request = new dataServiceRead.PollRequest().withSubscriptionId(
719+
subscribtionId
720+
);
721+
const messages = await streamLayerClient.poll(request);
722+
723+
assert.isDefined(messages);
724+
expect(messages.length).to.be.equal(2);
725+
expect(messages[0].metaData.dataSize).to.be.equal(100500);
726+
});
622727
});

0 commit comments

Comments
 (0)