Skip to content

Commit d7caed7

Browse files
committed
Use expire in case of deregistration event miss
1 parent d960ec2 commit d7caed7

File tree

1 file changed

+26
-6
lines changed

1 file changed

+26
-6
lines changed

core/core-service-discovery/src/main/java/ai/wanaku/core/service/discovery/ValkeyRegistry.java

+26-6
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import ai.wanaku.api.types.management.State;
44
import io.quarkus.arc.lookup.LookupIfProperty;
5-
import io.quarkus.arc.properties.IfBuildProperty;
65
import io.valkey.StreamEntryID;
76
import io.valkey.params.XAddParams;
87
import io.valkey.resps.StreamEntry;
@@ -26,6 +25,8 @@
2625
import java.util.List;
2726
import java.util.Map;
2827
import java.util.Set;
28+
29+
import org.eclipse.microprofile.config.inject.ConfigProperty;
2930
import org.jboss.logging.Logger;
3031

3132
/**
@@ -48,6 +49,15 @@ public class ValkeyRegistry implements ServiceRegistry {
4849
@Inject
4950
JedisPool jedisPool;
5051

52+
/**
53+
* In case of a deregister event is not triggered (for example, kill -9),
54+
* The entry in Valkey will be removed after expireTime.
55+
*
56+
* NB {wanaku.service.expire-time} > {wanaku.service.provider.registration.interval}
57+
*/
58+
@ConfigProperty(name = "wanaku.service.expire-time", defaultValue = "60")
59+
private int expireTime;
60+
5161
/**
5262
* Registers a new service with the given configurations.
5363
*
@@ -67,13 +77,17 @@ public void register(ServiceTarget serviceTarget, Map<String, String> configurat
6777
LOG.infof("Service %s with target %s registered", serviceTarget.getService(), serviceTarget.toAddress());
6878

6979
for (var entry : configurations.entrySet()) {
70-
if (jedis.hget(serviceTarget.getService(), entry.getKey()) == null) {
80+
if (!jedis.hexists(serviceTarget.getService(), entry.getKey())) {
7181
LOG.infof("Registering configuration %s for service %s", entry.getKey(), serviceTarget.getService());
7282
Configuration configuration = new Configuration();
7383
configuration.setDescription(entry.getValue());
7484
jedis.hset(serviceTarget.getService(), entry.getKey(), configuration.toJson());
7585
}
7686
}
87+
88+
jedis.expire(serviceTarget.getService(), expireTime);
89+
// Need to wait for https://github.com/valkey-io/valkey/issues/640
90+
// jedis.expireMember(serviceKey, serviceTarget.getService(), EXPIRE_TIME);
7791
} catch (Exception e) {
7892
LOG.errorf(e, "Failed to register service %s: %s", serviceTarget.getService(), e.getMessage());
7993
}
@@ -176,7 +190,9 @@ public Map<String, Service> getEntries(ServiceType serviceType) {
176190
for (String key : services) {
177191
Service service = newService(jedis, key);
178192

179-
entries.put(key, service);
193+
if (service != null) {
194+
entries.put(key, service);
195+
}
180196
}
181197

182198
} catch (Exception e) {
@@ -223,8 +239,15 @@ private static Service newService(Jedis jedis, String key) {
223239
* @return A Service object representing the created service.
224240
*/
225241
private static Service toService(Jedis jedis, String key, Set<String> configs) {
242+
String address = jedis.hget(key, ReservedKeys.WANAKU_TARGET_ADDRESS);
243+
if (address == null) {
244+
return null;
245+
}
246+
226247
Service service = new Service();
227248

249+
service.setTarget(address);
250+
228251
Map<String, Configuration> configurationMap = new HashMap<>();
229252

230253
for (String config : configs) {
@@ -238,9 +261,6 @@ private static Service toService(Jedis jedis, String key, Set<String> configs) {
238261
configurations.setConfigurations(configurationMap);
239262
service.setConfigurations(configurations);
240263

241-
String address = jedis.hget(key, ReservedKeys.WANAKU_TARGET_ADDRESS);
242-
service.setTarget(address);
243-
244264
return service;
245265
}
246266

0 commit comments

Comments
 (0)