Skip to content

Commit 148c342

Browse files
brandonvinchadlwilson
authored andcommitted
Implement agent reuse, toggled by a cluster profile config option
1 parent d4f3db3 commit 148c342

36 files changed

+867
-399
lines changed

src/main/java/cd/go/contrib/elasticagent/AgentInstances.java

+14-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import cd.go.contrib.elasticagent.executors.ServerPingRequestExecutor;
2020
import cd.go.contrib.elasticagent.requests.CreateAgentRequest;
2121

22+
import java.util.Optional;
23+
import java.util.function.Function;
2224

2325
/**
2426
* Plugin implementors should implement these methods to interface to your cloud.
@@ -36,7 +38,7 @@ public interface AgentInstances<T> {
3638
* @param pluginRequest the plugin request object
3739
* @param consoleLogAppender
3840
*/
39-
T create(CreateAgentRequest request, PluginSettings settings, PluginRequest pluginRequest, ConsoleLogAppender consoleLogAppender) throws Exception;
41+
Optional<T> requestCreateAgent(CreateAgentRequest request, PluginSettings settings, PluginRequest pluginRequest, ConsoleLogAppender consoleLogAppender) throws Exception;
4042

4143
/**
4244
* This message is sent when the plugin needs to terminate the agent instance.
@@ -84,5 +86,16 @@ public interface AgentInstances<T> {
8486
* @param agentId the elastic agent id
8587
*/
8688
T find(String agentId);
89+
90+
/**
91+
* Atomically update the agent instance for the given <code>agentId</code>.
92+
* <code>computeFn</code> is called with the current agent instance if it exists,
93+
* or null if it doesn't exist. <code>computeFn</code> should return a new agent instance
94+
* that represents its new state.
95+
* @param agentId
96+
* @param computeFn
97+
* @return
98+
*/
99+
T updateAgent(String agentId, Function<T, T> computeFn);
87100
}
88101

src/main/java/cd/go/contrib/elasticagent/KubernetesAgentInstances.java

+137-44
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,25 @@
1818

1919
import cd.go.contrib.elasticagent.model.JobIdentifier;
2020
import cd.go.contrib.elasticagent.requests.CreateAgentRequest;
21+
import cd.go.contrib.elasticagent.KubernetesInstance.AgentState;
22+
import cd.go.contrib.elasticagent.utils.Util;
2123
import io.fabric8.kubernetes.api.model.Pod;
22-
import io.fabric8.kubernetes.api.model.PodList;
2324
import io.fabric8.kubernetes.client.KubernetesClient;
2425

2526
import java.net.SocketTimeoutException;
2627
import java.time.Duration;
2728
import java.time.Instant;
28-
import java.util.ArrayList;
29-
import java.util.Map;
29+
import java.util.*;
3030
import java.util.concurrent.ConcurrentHashMap;
31-
import java.util.concurrent.Semaphore;
31+
import java.util.function.Function;
32+
import java.util.stream.Collectors;
3233

3334
import static cd.go.contrib.elasticagent.KubernetesPlugin.LOG;
3435
import static java.text.MessageFormat.format;
3536

3637
public class KubernetesAgentInstances implements AgentInstances<KubernetesInstance> {
37-
private final ConcurrentHashMap<String, KubernetesInstance> instances = new ConcurrentHashMap<>();
38+
private final ConcurrentHashMap<String, KubernetesInstance> instances;
3839
public Clock clock = Clock.DEFAULT;
39-
final Semaphore semaphore = new Semaphore(0, true);
4040

4141
private KubernetesClientFactory factory;
4242
private KubernetesInstanceFactory kubernetesInstanceFactory;
@@ -50,55 +50,127 @@ public KubernetesAgentInstances(KubernetesClientFactory factory) {
5050
}
5151

5252
public KubernetesAgentInstances(KubernetesClientFactory factory, KubernetesInstanceFactory kubernetesInstanceFactory) {
53+
this(factory, kubernetesInstanceFactory, Collections.emptyMap());
54+
}
55+
56+
public KubernetesAgentInstances(KubernetesClientFactory factory, KubernetesInstanceFactory kubernetesInstanceFactory, Map<String, KubernetesInstance> initialInstances) {
5357
this.factory = factory;
5458
this.kubernetesInstanceFactory = kubernetesInstanceFactory;
59+
this.instances = new ConcurrentHashMap<>(initialInstances);
5560
}
5661

5762
@Override
58-
public KubernetesInstance create(CreateAgentRequest request, PluginSettings settings, PluginRequest pluginRequest, ConsoleLogAppender consoleLogAppender) {
59-
final Integer maxAllowedContainers = settings.getMaxPendingPods();
63+
public Optional<KubernetesInstance> requestCreateAgent(CreateAgentRequest request, PluginSettings settings, PluginRequest pluginRequest, ConsoleLogAppender consoleLogAppender) {
64+
final Integer maxAllowedPods = settings.getMaxPendingPods();
6065
synchronized (instances) {
61-
refreshAll(settings);
62-
doWithLockOnSemaphore(new SetupSemaphore(maxAllowedContainers, instances, semaphore));
63-
consoleLogAppender.accept("Waiting to create agent pod.");
64-
if (semaphore.tryAcquire()) {
65-
return createKubernetesInstance(request, settings, pluginRequest, consoleLogAppender);
66+
if (instances.size() < maxAllowedPods) {
67+
return requestCreateAgentHelper(request, settings, pluginRequest, consoleLogAppender);
6668
} else {
67-
String message = format("[Create Agent Request] The number of pending kubernetes pods is currently at the maximum permissible limit ({0}). Total kubernetes pods ({1}). Not creating any more containers.", maxAllowedContainers, instances.size());
69+
String message = String.format("[Create Agent Request] The number of pending kubernetes pods is currently at the maximum permissible limit (%s). Total kubernetes pods (%s). Not creating any more pods.",
70+
maxAllowedPods,
71+
instances.size());
6872
LOG.warn(message);
6973
consoleLogAppender.accept(message);
70-
return null;
74+
return Optional.empty();
7175
}
7276
}
7377
}
7478

75-
private void doWithLockOnSemaphore(Runnable runnable) {
76-
synchronized (semaphore) {
77-
runnable.run();
79+
private List<KubernetesInstance> findPodsEligibleForReuse(CreateAgentRequest request) {
80+
Long jobId = request.jobIdentifier().getJobId();
81+
String jobElasticConfigHash = KubernetesInstanceFactory.agentConfigHash(
82+
request.clusterProfileProperties(), request.elasticProfileProperties());
83+
84+
List<KubernetesInstance> eligiblePods = new ArrayList<>();
85+
86+
for (KubernetesInstance instance : instances.values()) {
87+
if (instance.getJobId().equals(jobId)) {
88+
eligiblePods.add(instance);
89+
continue;
90+
}
91+
92+
String podElasticConfigHash = instance.getPodAnnotations().get(KubernetesInstance.ELASTIC_CONFIG_HASH);
93+
boolean sameElasticConfig = Objects.equals(podElasticConfigHash, jobElasticConfigHash);
94+
boolean instanceIsIdle = instance.getAgentState().equals(KubernetesInstance.AgentState.Idle);
95+
boolean podIsRunning = instance.getPodState().equals(PodState.Running);
96+
boolean isReusable = sameElasticConfig && instanceIsIdle && podIsRunning;
97+
98+
LOG.info(
99+
"[reuse] Is pod {} reusable for job {}? {}. Job has {}={}; pod has {}={}, agentState={}, podState={}",
100+
instance.getPodName(),
101+
jobId,
102+
isReusable,
103+
KubernetesInstance.ELASTIC_CONFIG_HASH,
104+
jobElasticConfigHash,
105+
KubernetesInstance.ELASTIC_CONFIG_HASH,
106+
podElasticConfigHash,
107+
instance.getAgentState(),
108+
instance.getPodState()
109+
);
110+
111+
if (isReusable) {
112+
eligiblePods.add(instance);
113+
}
78114
}
115+
116+
return eligiblePods;
79117
}
80118

81-
private KubernetesInstance createKubernetesInstance(CreateAgentRequest request, PluginSettings settings, PluginRequest pluginRequest, ConsoleLogAppender consoleLogAppender) {
119+
120+
private Optional<KubernetesInstance> requestCreateAgentHelper(
121+
CreateAgentRequest request,
122+
PluginSettings settings,
123+
PluginRequest pluginRequest,
124+
ConsoleLogAppender consoleLogAppender) {
82125
JobIdentifier jobIdentifier = request.jobIdentifier();
83-
if (isAgentCreatedForJob(jobIdentifier.getJobId())) {
84-
String message = format("[Create Agent Request] Request for creating an agent for Job Identifier [{0}] has already been scheduled. Skipping current request.", jobIdentifier);
85-
LOG.warn(message);
86-
consoleLogAppender.accept(message);
87-
return null;
126+
Long jobId = jobIdentifier.getJobId();
127+
128+
// Agent reuse disabled - create a new pod only if one hasn't already been created for this job ID.
129+
if (!settings.getEnableAgentReuse()) {
130+
// Already created a pod for this job ID.
131+
if (isAgentCreatedForJob(jobId)) {
132+
String message = format("[Create Agent Request] Request for creating an agent for Job Identifier [{0}] has already been scheduled. Skipping current request.", jobIdentifier);
133+
LOG.warn(message);
134+
consoleLogAppender.accept(message);
135+
return Optional.empty();
136+
}
137+
// No pod created yet for this job ID. Create one.
138+
KubernetesClient client = factory.client(settings);
139+
KubernetesInstance instance = kubernetesInstanceFactory.create(request, settings, client, pluginRequest);
140+
consoleLogAppender.accept(String.format("Created pod: %s", instance.getPodName()));
141+
instance = instance.toBuilder().agentState(AgentState.Building).build();
142+
register(instance);
143+
consoleLogAppender.accept(String.format("Agent pod %s created. Waiting for it to register to the GoCD server.", instance.getPodName()));
144+
return Optional.of(instance);
88145
}
89146

90-
KubernetesClient client = factory.client(settings);
91-
KubernetesInstance instance = kubernetesInstanceFactory.create(request, settings, client, pluginRequest);
92-
consoleLogAppender.accept(String.format("Creating pod: %s", instance.name()));
93-
register(instance);
94-
consoleLogAppender.accept(String.format("Agent pod %s created. Waiting for it to register to the GoCD server.", instance.name()));
147+
// Agent reuse enabled - look for any extant pods that match this job,
148+
// and create a new one only if there are none.
149+
List<KubernetesInstance> reusablePods = findPodsEligibleForReuse(request);
150+
LOG.info("[reuse] Found {} pods eligible for reuse for CreateAgentRequest for job {}: {}",
151+
reusablePods.size(),
152+
jobId,
153+
reusablePods.stream().map(pod -> pod.getPodName()).collect(Collectors.toList()));
95154

96-
return instance;
155+
if (reusablePods.isEmpty()) {
156+
KubernetesClient client = factory.client(settings);
157+
KubernetesInstance instance = kubernetesInstanceFactory.create(request, settings, client, pluginRequest);
158+
consoleLogAppender.accept(String.format("Created pod: %s", instance.getPodName()));
159+
instance = instance.toBuilder().agentState(AgentState.Building).build();
160+
register(instance);
161+
consoleLogAppender.accept(String.format("Agent pod %s created. Waiting for it to register to the GoCD server.", instance.getPodName()));
162+
return Optional.of(instance);
163+
} else {
164+
String message = String.format("[reuse] Not creating a new pod - found %s eligible for reuse.", reusablePods.size());
165+
consoleLogAppender.accept(message);
166+
LOG.info(message);
167+
return Optional.empty();
168+
}
97169
}
98170

99171
private boolean isAgentCreatedForJob(Long jobId) {
100172
for (KubernetesInstance instance : instances.values()) {
101-
if (instance.jobId().equals(jobId)) {
173+
if (instance.getJobId().equals(jobId)) {
102174
return true;
103175
}
104176
}
@@ -111,7 +183,7 @@ public void terminate(String agentId, PluginSettings settings) {
111183
KubernetesInstance instance = instances.get(agentId);
112184
if (instance != null) {
113185
KubernetesClient client = factory.client(settings);
114-
instance.terminate(client);
186+
client.pods().withName(instance.getPodName()).delete();
115187
} else {
116188
LOG.warn(format("Requested to terminate an instance that does not exist {0}.", agentId));
117189
}
@@ -140,56 +212,77 @@ public Agents instancesCreatedAfterTimeout(PluginSettings settings, Agents agent
140212
continue;
141213
}
142214

143-
if (clock.now().isAfter(instance.createdAt().plus(settings.getAutoRegisterPeriod()))) {
215+
if (clock.now().isAfter(instance.getCreatedAt().plus(settings.getAutoRegisterPeriod()))) {
144216
oldAgents.add(agent);
145217
}
146218
}
147219
return new Agents(oldAgents);
148220
}
149221

222+
public List<Pod> listAgentPods(KubernetesClient client) {
223+
if (client == null) {
224+
throw new IllegalArgumentException("client is null");
225+
}
226+
return client.pods()
227+
.withLabel(Constants.KUBERNETES_POD_KIND_LABEL_KEY, Constants.KUBERNETES_POD_KIND_LABEL_VALUE)
228+
.list()
229+
.getItems();
230+
}
231+
150232
@Override
151233
public void refreshAll(PluginSettings properties) {
152234
LOG.debug("[Refresh Instances] Syncing k8s elastic agent pod information for cluster {}.", properties);
153-
PodList list = null;
235+
List<Pod> pods = null;
154236
try {
155237
KubernetesClient client = factory.client(properties);
156-
list = client.pods().list();
238+
pods = listAgentPods(client);
157239
} catch (Exception e) {
158240
LOG.error("Error occurred while trying to list kubernetes pods:", e);
159241

160242
if (e.getCause() instanceof SocketTimeoutException) {
161243
LOG.error("Error caused due to SocketTimeoutException. This generally happens due to stale kubernetes client. Clearing out existing kubernetes client and creating a new one!");
162244
factory.clearOutExistingClient();
163245
KubernetesClient client = factory.client(properties);
164-
list = client.pods().list();
246+
pods = listAgentPods(client);
165247
}
166248
}
167249

168-
if (list == null) {
250+
if (pods == null) {
169251
LOG.info("Did not find any running kubernetes pods.");
170252
return;
171253
}
172254

255+
Map<String, KubernetesInstance> oldInstances = Map.copyOf(instances);
173256
instances.clear();
174-
for (Pod pod : list.getItems()) {
175-
Map<String, String> podLabels = pod.getMetadata().getLabels();
176-
if (podLabels != null) {
177-
if (Constants.KUBERNETES_POD_KIND_LABEL_VALUE.equals(podLabels.get(Constants.KUBERNETES_POD_KIND_LABEL_KEY))) {
178-
register(kubernetesInstanceFactory.fromKubernetesPod(pod));
179-
}
257+
258+
for (Pod pod : pods) {
259+
String podName = pod.getMetadata().getName();
260+
// preserve pod's agent state
261+
KubernetesInstance newInstance = kubernetesInstanceFactory.fromKubernetesPod(pod);
262+
KubernetesInstance oldInstance = oldInstances.get(podName);
263+
if (oldInstance != null) {
264+
AgentState oldAgentState = oldInstances.get(podName).getAgentState();
265+
newInstance = newInstance.toBuilder().agentState(oldAgentState).build();
266+
LOG.debug("[reuse] Preserved AgentState {} upon refresh of pod {}", oldAgentState, podName);
180267
}
268+
register(newInstance);
181269
}
182270

183271
LOG.info(String.format("[refresh-pod-state] Pod information successfully synced. All(Running/Pending) pod count is %d.", instances.size()));
184272
}
185273

274+
@Override
275+
public KubernetesInstance updateAgent(String agentId, Function<KubernetesInstance, KubernetesInstance> updateFn) {
276+
return instances.compute(agentId, (_agentId, instance) -> updateFn.apply(instance));
277+
}
278+
186279
@Override
187280
public KubernetesInstance find(String agentId) {
188281
return instances.get(agentId);
189282
}
190283

191284
public void register(KubernetesInstance instance) {
192-
instances.put(instance.name(), instance);
285+
instances.put(instance.getPodName(), instance);
193286
}
194287

195288
private KubernetesAgentInstances unregisteredAfterTimeout(PluginSettings settings, Agents knownAgents) throws Exception {

0 commit comments

Comments
 (0)