Skip to content

Commit 60b81b0

Browse files
georgesjamousacinader
authored andcommitted
using per-key basis queue (parse-community#5420)
* adding KeyPromiseQueue * nit * removing secondary object and using a tuple * using array * nits * some tests * Minor refinements * removing old adapter * dummy change, travis test not found * travis test missing, dummy change * revrting mistake * reverting mistake * indentation fix * additional tests for coverage * extending coverage * nits * fixing mistake * better code
1 parent 156309a commit 60b81b0

File tree

4 files changed

+249
-83
lines changed

4 files changed

+249
-83
lines changed

spec/RedisCacheAdapter.spec.js

+105
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,48 @@ describe_only(() => {
4545
.then(done);
4646
});
4747

48+
it('should not store value for ttl=0', done => {
49+
const cache = new RedisCacheAdapter(null, 5);
50+
51+
cache
52+
.put(KEY, VALUE, 0)
53+
.then(() => cache.get(KEY))
54+
.then(value => expect(value).toEqual(null))
55+
.then(done);
56+
});
57+
58+
it('should not expire when ttl=Infinity', done => {
59+
const cache = new RedisCacheAdapter(null, 1);
60+
61+
cache
62+
.put(KEY, VALUE, Infinity)
63+
.then(() => cache.get(KEY))
64+
.then(value => expect(value).toEqual(VALUE))
65+
.then(wait.bind(null, 1))
66+
.then(() => cache.get(KEY))
67+
.then(value => expect(value).toEqual(VALUE))
68+
.then(done);
69+
});
70+
71+
it('should fallback to default ttl', done => {
72+
const cache = new RedisCacheAdapter(null, 1);
73+
let promise = Promise.resolve();
74+
75+
[-100, null, undefined, 'not number', true].forEach(ttl => {
76+
promise = promise.then(() =>
77+
cache
78+
.put(KEY, VALUE, ttl)
79+
.then(() => cache.get(KEY))
80+
.then(value => expect(value).toEqual(VALUE))
81+
.then(wait.bind(null, 2))
82+
.then(() => cache.get(KEY))
83+
.then(value => expect(value).toEqual(null))
84+
);
85+
});
86+
87+
promise.then(done);
88+
});
89+
4890
it('should find un-expired records', done => {
4991
const cache = new RedisCacheAdapter(null, 5);
5092

@@ -58,3 +100,66 @@ describe_only(() => {
58100
.then(done);
59101
});
60102
});
103+
104+
describe_only(() => {
105+
return process.env.PARSE_SERVER_TEST_CACHE === 'redis';
106+
})('RedisCacheAdapter/KeyPromiseQueue', function() {
107+
const KEY1 = 'key1';
108+
const KEY2 = 'key2';
109+
const VALUE = 'hello';
110+
111+
// number of chained ops on a single key
112+
function getQueueCountForKey(cache, key) {
113+
return cache.queue.queue[key][0];
114+
}
115+
116+
// total number of queued keys
117+
function getQueueCount(cache) {
118+
return Object.keys(cache.queue.queue).length;
119+
}
120+
121+
it('it should clear completed operations from queue', done => {
122+
const cache = new RedisCacheAdapter({ ttl: NaN });
123+
124+
// execute a bunch of operations in sequence
125+
let promise = Promise.resolve();
126+
for (let index = 1; index < 100; index++) {
127+
promise = promise.then(() => {
128+
const key = `${index}`;
129+
return cache
130+
.put(key, VALUE)
131+
.then(() => expect(getQueueCount(cache)).toEqual(0))
132+
.then(() => cache.get(key))
133+
.then(() => expect(getQueueCount(cache)).toEqual(0))
134+
.then(() => cache.clear())
135+
.then(() => expect(getQueueCount(cache)).toEqual(0));
136+
});
137+
}
138+
139+
// at the end the queue should be empty
140+
promise.then(() => expect(getQueueCount(cache)).toEqual(0)).then(done);
141+
});
142+
143+
it('it should count per key chained operations correctly', done => {
144+
const cache = new RedisCacheAdapter({ ttl: NaN });
145+
146+
let key1Promise = Promise.resolve();
147+
let key2Promise = Promise.resolve();
148+
for (let index = 1; index < 100; index++) {
149+
key1Promise = cache.put(KEY1, VALUE);
150+
key2Promise = cache.put(KEY2, VALUE);
151+
// per key chain should be equal to index, which is the
152+
// total number of operations on that key
153+
expect(getQueueCountForKey(cache, KEY1)).toEqual(index);
154+
expect(getQueueCountForKey(cache, KEY2)).toEqual(index);
155+
// the total keys counts should be equal to the different keys
156+
// we have currently being processed.
157+
expect(getQueueCount(cache)).toEqual(2);
158+
}
159+
160+
// at the end the queue should be empty
161+
Promise.all([key1Promise, key2Promise])
162+
.then(() => expect(getQueueCount(cache)).toEqual(0))
163+
.then(done);
164+
});
165+
});

src/Adapters/Cache/RedisCacheAdapter.js

-83
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// KeyPromiseQueue is a simple promise queue
2+
// used to queue operations per key basis.
3+
// Once the tail promise in the key-queue fulfills,
4+
// the chain on that key will be cleared.
5+
export class KeyPromiseQueue {
6+
constructor() {
7+
this.queue = {};
8+
}
9+
10+
enqueue(key, operation) {
11+
const tuple = this.beforeOp(key);
12+
const toAwait = tuple[1];
13+
const nextOperation = toAwait.then(operation);
14+
const wrappedOperation = nextOperation.then(result => {
15+
this.afterOp(key);
16+
return result;
17+
});
18+
tuple[1] = wrappedOperation;
19+
return wrappedOperation;
20+
}
21+
22+
beforeOp(key) {
23+
let tuple = this.queue[key];
24+
if (!tuple) {
25+
tuple = [0, Promise.resolve()];
26+
this.queue[key] = tuple;
27+
}
28+
tuple[0]++;
29+
return tuple;
30+
}
31+
32+
afterOp(key) {
33+
const tuple = this.queue[key];
34+
if (!tuple) {
35+
return;
36+
}
37+
tuple[0]--;
38+
if (tuple[0] <= 0) {
39+
delete this.queue[key];
40+
return;
41+
}
42+
}
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
import redis from 'redis';
2+
import logger from '../../../logger';
3+
import { KeyPromiseQueue } from './KeyPromiseQueue';
4+
5+
const DEFAULT_REDIS_TTL = 30 * 1000; // 30 seconds in milliseconds
6+
const FLUSH_DB_KEY = '__flush_db__';
7+
8+
function debug() {
9+
logger.debug.apply(logger, ['RedisCacheAdapter', ...arguments]);
10+
}
11+
12+
const isValidTTL = ttl => typeof ttl === 'number' && ttl > 0;
13+
14+
export class RedisCacheAdapter {
15+
constructor(redisCtx, ttl = DEFAULT_REDIS_TTL) {
16+
this.ttl = isValidTTL(ttl) ? ttl : DEFAULT_REDIS_TTL;
17+
this.client = redis.createClient(redisCtx);
18+
this.queue = new KeyPromiseQueue();
19+
}
20+
21+
get(key) {
22+
debug('get', key);
23+
return this.queue.enqueue(
24+
key,
25+
() =>
26+
new Promise(resolve => {
27+
this.client.get(key, function(err, res) {
28+
debug('-> get', key, res);
29+
if (!res) {
30+
return resolve(null);
31+
}
32+
resolve(JSON.parse(res));
33+
});
34+
})
35+
);
36+
}
37+
38+
put(key, value, ttl = this.ttl) {
39+
value = JSON.stringify(value);
40+
debug('put', key, value, ttl);
41+
42+
if (ttl === 0) {
43+
// ttl of zero is a logical no-op, but redis cannot set expire time of zero
44+
return this.queue.enqueue(key, () => Promise.resolve());
45+
}
46+
47+
if (ttl === Infinity) {
48+
return this.queue.enqueue(
49+
key,
50+
() =>
51+
new Promise(resolve => {
52+
this.client.set(key, value, function() {
53+
resolve();
54+
});
55+
})
56+
);
57+
}
58+
59+
if (!isValidTTL(ttl)) {
60+
ttl = this.ttl;
61+
}
62+
63+
return this.queue.enqueue(
64+
key,
65+
() =>
66+
new Promise(resolve => {
67+
this.client.psetex(key, ttl, value, function() {
68+
resolve();
69+
});
70+
})
71+
);
72+
}
73+
74+
del(key) {
75+
debug('del', key);
76+
return this.queue.enqueue(
77+
key,
78+
() =>
79+
new Promise(resolve => {
80+
this.client.del(key, function() {
81+
resolve();
82+
});
83+
})
84+
);
85+
}
86+
87+
clear() {
88+
debug('clear');
89+
return this.queue.enqueue(
90+
FLUSH_DB_KEY,
91+
() =>
92+
new Promise(resolve => {
93+
this.client.flushdb(function() {
94+
resolve();
95+
});
96+
})
97+
);
98+
}
99+
}
100+
101+
export default RedisCacheAdapter;

0 commit comments

Comments
 (0)