1
1
package ai .wanaku .core .service .discovery ;
2
2
3
+ import ai .wanaku .api .types .management .State ;
4
+ import io .valkey .StreamEntryID ;
5
+ import io .valkey .params .XAddParams ;
6
+ import io .valkey .resps .StreamEntry ;
3
7
import jakarta .enterprise .context .ApplicationScoped ;
4
8
import jakarta .enterprise .event .Observes ;
5
9
import jakarta .inject .Inject ;
13
17
import io .quarkus .runtime .ShutdownEvent ;
14
18
import io .valkey .Jedis ;
15
19
import io .valkey .JedisPool ;
20
+
21
+ import java .time .Instant ;
22
+ import java .util .ArrayList ;
16
23
import java .util .HashMap ;
24
+ import java .util .List ;
17
25
import java .util .Map ;
18
26
import java .util .Set ;
19
27
import org .jboss .logging .Logger ;
@@ -46,6 +54,10 @@ public class ValkeyRegistry implements ServiceRegistry {
46
54
@ Override
47
55
public void register (ServiceTarget serviceTarget , Map <String , String > configurations ) {
48
56
try (io .valkey .Jedis jedis = jedisPool .getResource ()) {
57
+ // Register the service on the specific set
58
+ String serviceKey = ReservedKeys .getServiceKey (serviceTarget .getServiceType ());
59
+ jedis .sadd (serviceKey , serviceTarget .getService ());
60
+
49
61
jedis .hset (serviceTarget .getService (), ReservedKeys .WANAKU_TARGET_ADDRESS , serviceTarget .toAddress ());
50
62
jedis .hset (serviceTarget .getService (), ReservedKeys .WANAKU_TARGET_TYPE , serviceTarget .getServiceType ().asValue ());
51
63
@@ -64,17 +76,70 @@ public void register(ServiceTarget serviceTarget, Map<String, String> configurat
64
76
* Deregisters a service with the given name.
65
77
*
66
78
* @param service The name of the service to deregister.
79
+ * @param serviceType the type of service to deregister
67
80
*/
68
81
@ Override
69
- public void deregister (String service ) {
82
+ public void deregister (String service , ServiceType serviceType ) {
70
83
try (io .valkey .Jedis jedis = jedisPool .getResource ()) {
71
- jedis .del (service );
84
+ String serviceKey = ReservedKeys .getServiceKey (serviceType );
85
+ jedis .srem (serviceKey , service );
86
+
72
87
LOG .infof ("Service %s registered" , service );
73
88
} catch (Exception e ) {
74
89
LOG .errorf (e , "Failed to register service %s: %s" , service , e .getMessage ());
75
90
}
76
91
}
77
92
93
+
94
+ @ Override
95
+ public void saveState (String service , boolean healthy , String message ) {
96
+ try (io .valkey .Jedis jedis = jedisPool .getResource ()) {
97
+ Map <String , String > state = Map .of ("service" , service , "healthy" ,
98
+ Boolean .toString (healthy ), "message" , (healthy ? "healthy" : message ));
99
+
100
+ jedis .xadd (stateKey (service ), state , XAddParams .xAddParams ());
101
+ } catch (Exception e ) {
102
+ LOG .errorf (e , "Failed to save state for %s: %s" , service , e .getMessage ());
103
+ }
104
+ }
105
+
106
+ @ Override
107
+ public List <State > getState (String service , int count ) {
108
+ try (io .valkey .Jedis jedis = jedisPool .getResource ()) {
109
+
110
+ String stateKey = stateKey (service );
111
+ Instant now = Instant .now ();
112
+ long endEpoch = now .toEpochMilli ();
113
+ long startEpoch = now .minusSeconds (60 ).toEpochMilli ();
114
+
115
+ List <StreamEntry > streamEntries = jedis .xrange (stateKey , new StreamEntryID (startEpoch ), new StreamEntryID (endEpoch ));
116
+
117
+ List <State > states = new ArrayList <>(streamEntries .size ());
118
+
119
+ for (StreamEntry streamEntry : streamEntries ) {
120
+ LOG .debugf ("Entry %s" , streamEntry );
121
+
122
+ Map <String , String > fields = streamEntry .getFields ();
123
+ String serviceName = fields .get ("service" );
124
+ String message = fields .get ("message" );
125
+ String healthy = fields .get ("healthy" );
126
+
127
+ State state = new State (serviceName , Boolean .parseBoolean (healthy ), message );
128
+ states .add (state );
129
+ }
130
+
131
+ return states ;
132
+ } catch (Exception e ) {
133
+ LOG .errorf (e , "Failed to get state for %s: %s" , service , e .getMessage ());
134
+ }
135
+
136
+ return List .of ();
137
+ }
138
+
139
+ private static String stateKey (String service ) {
140
+ return "state:" + service ;
141
+ }
142
+
78
143
/**
79
144
* Retrieves a service with the given name.
80
145
*
@@ -98,14 +163,13 @@ public Service getService(String service) {
98
163
public Map <String , Service > getEntries (ServiceType serviceType ) {
99
164
Map <String , Service > entries = new HashMap <>();
100
165
try (io .valkey .Jedis jedis = jedisPool .getResource ()) {
101
- Set <String > keys = jedis .keys ("*" );
102
- for (String key : keys ) {
103
- String sType = jedis .hget (key , ReservedKeys .WANAKU_TARGET_TYPE );
104
- if (serviceType .asValue ().equals (sType )) {
105
- Service service = newService (jedis , key );
106
-
107
- entries .put (key , service );
108
- }
166
+ String serviceKey = ReservedKeys .getServiceKey (serviceType );
167
+ Set <String > services = jedis .smembers (serviceKey );
168
+
169
+ for (String key : services ) {
170
+ Service service = newService (jedis , key );
171
+
172
+ entries .put (key , service );
109
173
}
110
174
111
175
} catch (Exception e ) {
0 commit comments