Skip to content

Commit d4f94c1

Browse files
authored
Merge pull request #2518 from lukas-krecan/es9
Support for ES9
2 parents b8fe471 + d401853 commit d4f94c1

File tree

7 files changed

+399
-1
lines changed

7 files changed

+399
-1
lines changed

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
<module>providers/mongo/shedlock-provider-mongo</module>
4949
<module>providers/mongo/shedlock-provider-mongo-reactivestreams</module>
5050
<module>providers/elasticsearch/shedlock-provider-elasticsearch8</module>
51+
<module>providers/elasticsearch/shedlock-provider-elasticsearch9</module>
5152
<module>providers/opensearch/shedlock-provider-opensearch</module>
5253
<module>providers/opensearch/shedlock-provider-opensearch-java</module>
5354
<module>providers/couchbase/shedlock-provider-couchbase-javaclient3</module>

providers/elasticsearch/shedlock-provider-elasticsearch8/src/test/java/net/javacrumbs/shedlock/provider/elasticsearch8/ElasticsearchLockProviderTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public class ElasticsearchLockProviderTest extends AbstractLockProviderIntegrati
5858

5959
@Container
6060
private static final ElasticsearchContainer container =
61-
new ElasticsearchContainer(DEFAULT_IMAGE_NAME.withTag("7.17.5")).withPassword("elastic1234");
61+
new ElasticsearchContainer(DEFAULT_IMAGE_NAME.withTag("7.17.28")).withPassword("elastic1234");
6262

6363
private ElasticsearchClient client;
6464
private ElasticsearchLockProvider lockProvider;
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
3+
<parent>
4+
<artifactId>shedlock-parent</artifactId>
5+
<groupId>net.javacrumbs.shedlock</groupId>
6+
<version>6.3.2-SNAPSHOT</version>
7+
<relativePath>../../../pom.xml</relativePath>
8+
</parent>
9+
<modelVersion>4.0.0</modelVersion>
10+
11+
<artifactId>shedlock-provider-elasticsearch9</artifactId>
12+
<version>6.3.2-SNAPSHOT</version>
13+
14+
<properties>
15+
<elasticsearch.version>9.0.0</elasticsearch.version>
16+
</properties>
17+
18+
<dependencies>
19+
<dependency>
20+
<groupId>net.javacrumbs.shedlock</groupId>
21+
<artifactId>shedlock-core</artifactId>
22+
<version>${project.version}</version>
23+
</dependency>
24+
25+
<dependency>
26+
<groupId>co.elastic.clients</groupId>
27+
<artifactId>elasticsearch-java</artifactId>
28+
<version>${elasticsearch.version}</version>
29+
</dependency>
30+
31+
<dependency>
32+
<groupId>net.javacrumbs.shedlock</groupId>
33+
<artifactId>shedlock-test-support</artifactId>
34+
<version>${project.version}</version>
35+
<scope>test</scope>
36+
</dependency>
37+
38+
<dependency>
39+
<groupId>org.testcontainers</groupId>
40+
<artifactId>junit-jupiter</artifactId>
41+
<version>${test-containers.ver}</version>
42+
<scope>test</scope>
43+
</dependency>
44+
45+
<dependency>
46+
<groupId>org.testcontainers</groupId>
47+
<artifactId>elasticsearch</artifactId>
48+
<version>${test-containers.ver}</version>
49+
<scope>test</scope>
50+
</dependency>
51+
52+
<dependency>
53+
<groupId>ch.qos.logback</groupId>
54+
<artifactId>logback-classic</artifactId>
55+
<version>${logback.ver}</version>
56+
<scope>test</scope>
57+
</dependency>
58+
59+
<dependency>
60+
<groupId>commons-logging</groupId>
61+
<artifactId>commons-logging</artifactId>
62+
<version>1.3.5</version>
63+
<scope>test</scope>
64+
</dependency>
65+
</dependencies>
66+
67+
<build>
68+
<plugins>
69+
<plugin>
70+
<groupId>org.apache.maven.plugins</groupId>
71+
<artifactId>maven-jar-plugin</artifactId>
72+
<configuration>
73+
<archive>
74+
<manifestEntries>
75+
<Automatic-Module-Name>
76+
net.javacrumbs.shedlock.provider.elasticsearch9
77+
</Automatic-Module-Name>
78+
</manifestEntries>
79+
</archive>
80+
</configuration>
81+
</plugin>
82+
</plugins>
83+
</build>
84+
85+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
/**
2+
* Copyright 2009 the original author or authors.
3+
*
4+
* <p>Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
5+
* except in compliance with the License. You may obtain a copy of the License at
6+
*
7+
* <p>http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
10+
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
11+
* express or implied. See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package net.javacrumbs.shedlock.provider.elasticsearch9;
15+
16+
import static net.javacrumbs.shedlock.core.ClockProvider.now;
17+
import static net.javacrumbs.shedlock.support.Utils.getHostname;
18+
19+
import co.elastic.clients.elasticsearch.ElasticsearchClient;
20+
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
21+
import co.elastic.clients.elasticsearch._types.Refresh;
22+
import co.elastic.clients.elasticsearch._types.Result;
23+
import co.elastic.clients.elasticsearch.core.UpdateRequest;
24+
import co.elastic.clients.elasticsearch.core.UpdateResponse;
25+
import co.elastic.clients.json.JsonData;
26+
import java.io.IOException;
27+
import java.time.Instant;
28+
import java.util.Collections;
29+
import java.util.Map;
30+
import java.util.Optional;
31+
import net.javacrumbs.shedlock.core.AbstractSimpleLock;
32+
import net.javacrumbs.shedlock.core.LockConfiguration;
33+
import net.javacrumbs.shedlock.core.LockProvider;
34+
import net.javacrumbs.shedlock.core.SimpleLock;
35+
import net.javacrumbs.shedlock.support.LockException;
36+
import net.javacrumbs.shedlock.support.annotation.NonNull;
37+
38+
/**
39+
* It uses a collection that contains documents like this:
40+
*
41+
* <pre>
42+
* {
43+
* "name" : "lock name",
44+
* "lockUntil" : {
45+
* "type": "date",
46+
* "format": "epoch_millis"
47+
* },
48+
* "lockedAt" : {
49+
* "type": "date",
50+
* "format": "epoch_millis"
51+
* }:
52+
* "lockedBy" : "hostname"
53+
* }
54+
* </pre>
55+
*
56+
* <p>
57+
* lockedAt and lockedBy are just for troubleshooting and are not read by the
58+
* code
59+
*
60+
* <ol>
61+
* <li>Attempts to insert a new lock record. As an optimization, we keep
62+
* in-memory track of created lock records. If the record has been inserted,
63+
* returns lock.
64+
* <li>We will try to update lock record using filter _id == name AND lock_until
65+
* &lt;= now
66+
* <li>If the update succeeded (1 updated document), we have the lock. If the
67+
* update failed (0 updated documents) somebody else holds the lock
68+
* <li>When unlocking, lock_until is set to now.
69+
* </ol>
70+
*/
71+
public class ElasticsearchLockProvider implements LockProvider {
72+
static final String SCHEDLOCK_DEFAULT_INDEX = "shedlock";
73+
static final String LOCK_UNTIL = "lockUntil";
74+
static final String LOCKED_AT = "lockedAt";
75+
static final String LOCKED_BY = "lockedBy";
76+
static final String NAME = "name";
77+
78+
private static final String UPDATE_SCRIPT = "if (ctx._source." + LOCK_UNTIL + " <= " + "params." + LOCKED_AT
79+
+ ") { " + "ctx._source." + LOCKED_BY + " = params." + LOCKED_BY + "; " + "ctx._source." + LOCKED_AT
80+
+ " = params." + LOCKED_AT + "; " + "ctx._source." + LOCK_UNTIL + " = params." + LOCK_UNTIL + "; "
81+
+ "} else { " + "ctx.op = 'none' " + "}";
82+
83+
private final ElasticsearchClient client;
84+
private final String hostname;
85+
private final String index;
86+
87+
private ElasticsearchLockProvider(@NonNull ElasticsearchClient client, @NonNull String index) {
88+
this.client = client;
89+
this.hostname = getHostname();
90+
this.index = index;
91+
}
92+
93+
public ElasticsearchLockProvider(@NonNull ElasticsearchClient client) {
94+
this(client, SCHEDLOCK_DEFAULT_INDEX);
95+
}
96+
97+
@Override
98+
@NonNull
99+
public Optional<SimpleLock> lock(@NonNull LockConfiguration lockConfiguration) {
100+
try {
101+
Instant now = now();
102+
Instant lockAtMostUntil = lockConfiguration.getLockAtMostUntil();
103+
Map<String, JsonData> lockObject = lockObject(lockConfiguration.getName(), lockAtMostUntil, now);
104+
105+
// The object exist only to have some type we can work with
106+
Lock pojo = new Lock(lockConfiguration.getName(), hostname, now, lockAtMostUntil);
107+
108+
UpdateRequest<Lock, Lock> updateRequest = UpdateRequest.of(ur -> ur.index(index)
109+
.id(lockConfiguration.getName())
110+
.refresh(Refresh.True)
111+
.script(sc -> sc.lang("painless")
112+
.source(builder -> builder.scriptString(UPDATE_SCRIPT))
113+
.params(lockObject))
114+
.upsert(pojo));
115+
116+
UpdateResponse<Lock> res = client.update(updateRequest, Lock.class);
117+
if (res.result() != Result.NoOp) {
118+
return Optional.of(new ElasticsearchSimpleLock(lockConfiguration));
119+
} else { // nothing happened
120+
return Optional.empty();
121+
}
122+
} catch (IOException | ElasticsearchException e) {
123+
if ((e instanceof ElasticsearchException && ((ElasticsearchException) e).status() == 409)) {
124+
return Optional.empty();
125+
} else {
126+
throw new LockException("Unexpected exception occurred", e);
127+
}
128+
}
129+
}
130+
131+
private Map<String, JsonData> lockObject(String name, Instant lockUntil, Instant lockedAt) {
132+
return Map.of(
133+
NAME,
134+
JsonData.of(name),
135+
LOCKED_BY,
136+
JsonData.of(hostname),
137+
LOCKED_AT,
138+
JsonData.of(lockedAt.toEpochMilli()),
139+
LOCK_UNTIL,
140+
JsonData.of(lockUntil.toEpochMilli()));
141+
}
142+
143+
private final class ElasticsearchSimpleLock extends AbstractSimpleLock {
144+
145+
private ElasticsearchSimpleLock(LockConfiguration lockConfiguration) {
146+
super(lockConfiguration);
147+
}
148+
149+
@Override
150+
public void doUnlock() {
151+
// Set lockUtil to now or lockAtLeastUntil whichever is later
152+
try {
153+
Map<String, JsonData> lockObject = Collections.singletonMap(
154+
"unlockTime",
155+
JsonData.of(lockConfiguration.getUnlockTime().toEpochMilli()));
156+
157+
UpdateRequest<Lock, Lock> updateRequest = UpdateRequest.of(ur -> ur.index(index)
158+
.id(lockConfiguration.getName())
159+
.refresh(Refresh.True)
160+
.script(sc -> sc.lang("painless")
161+
.source(builder -> builder.scriptString("ctx._source.lockUntil = params.unlockTime"))
162+
.params(lockObject)));
163+
client.update(updateRequest, Lock.class);
164+
} catch (IOException | ElasticsearchException e) {
165+
throw new LockException("Unexpected exception occurred", e);
166+
}
167+
}
168+
}
169+
170+
private record Lock(String name, String lockedBy, long lockedAt, long lockUntil) {
171+
Lock(String name, String lockedBy, Instant lockedAt, Instant lockUntil) {
172+
this(name, lockedBy, lockedAt.toEpochMilli(), lockUntil.toEpochMilli());
173+
}
174+
}
175+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/**
2+
* Copyright 2009 the original author or authors.
3+
*
4+
* <p>Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
5+
* except in compliance with the License. You may obtain a copy of the License at
6+
*
7+
* <p>http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
10+
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
11+
* express or implied. See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package net.javacrumbs.shedlock.provider.elasticsearch9;
15+
16+
import static net.javacrumbs.shedlock.provider.elasticsearch9.ElasticsearchLockProvider.LOCKED_AT;
17+
import static net.javacrumbs.shedlock.provider.elasticsearch9.ElasticsearchLockProvider.LOCKED_BY;
18+
import static net.javacrumbs.shedlock.provider.elasticsearch9.ElasticsearchLockProvider.LOCK_UNTIL;
19+
import static net.javacrumbs.shedlock.provider.elasticsearch9.ElasticsearchLockProvider.NAME;
20+
import static net.javacrumbs.shedlock.provider.elasticsearch9.ElasticsearchLockProvider.SCHEDLOCK_DEFAULT_INDEX;
21+
import static org.assertj.core.api.Assertions.assertThat;
22+
import static org.assertj.core.api.Assertions.fail;
23+
24+
import co.elastic.clients.elasticsearch.ElasticsearchClient;
25+
import co.elastic.clients.elasticsearch.core.GetRequest;
26+
import co.elastic.clients.elasticsearch.core.GetResponse;
27+
import java.io.IOException;
28+
import java.util.Date;
29+
import java.util.Map;
30+
import net.javacrumbs.shedlock.core.LockProvider;
31+
import net.javacrumbs.shedlock.test.support.AbstractLockProviderIntegrationTest;
32+
import org.junit.jupiter.api.BeforeEach;
33+
import org.testcontainers.elasticsearch.ElasticsearchContainer;
34+
import org.testcontainers.junit.jupiter.Container;
35+
import org.testcontainers.junit.jupiter.Testcontainers;
36+
import org.testcontainers.utility.DockerImageName;
37+
38+
@Testcontainers
39+
public class ElasticsearchLockProviderTest extends AbstractLockProviderIntegrationTest {
40+
41+
private static final DockerImageName DEFAULT_IMAGE_NAME =
42+
DockerImageName.parse("docker.elastic.co/elasticsearch/elasticsearch");
43+
44+
@Container
45+
private static final ElasticsearchContainer container =
46+
new ElasticsearchContainer(DEFAULT_IMAGE_NAME.withTag("7.17.28"));
47+
48+
private ElasticsearchClient client;
49+
private ElasticsearchLockProvider lockProvider;
50+
51+
@BeforeEach
52+
public void setUp() {
53+
client = ElasticsearchClient.of(
54+
b -> b.host("http://" + container.getHttpHostAddress()).usernameAndPassword("elastic", "changeme"));
55+
lockProvider = new ElasticsearchLockProvider(client);
56+
}
57+
58+
@Override
59+
protected LockProvider getLockProvider() {
60+
return lockProvider;
61+
}
62+
63+
@Override
64+
protected void assertUnlocked(String lockName) {
65+
GetRequest request =
66+
GetRequest.of(gr -> gr.index(SCHEDLOCK_DEFAULT_INDEX).id(lockName));
67+
try {
68+
GetResponse<Map> response = client.get(request, Map.class);
69+
Map source = response.source();
70+
assertThat(new Date((Long) source.get(LOCK_UNTIL))).isBeforeOrEqualTo(now());
71+
assertThat(new Date((Long) source.get(LOCKED_AT))).isBeforeOrEqualTo(now());
72+
assertThat((String) source.get(LOCKED_BY)).isNotBlank();
73+
assertThat((String) source.get(NAME)).isEqualTo(lockName);
74+
} catch (IOException e) {
75+
fail("Call to embedded ES failed.");
76+
}
77+
}
78+
79+
@Override
80+
protected void assertLocked(String lockName) {
81+
GetRequest request =
82+
GetRequest.of(gr -> gr.index(SCHEDLOCK_DEFAULT_INDEX).id(lockName));
83+
try {
84+
GetResponse<Map> response = client.get(request, Map.class);
85+
Map source = response.source();
86+
assertThat(new Date((Long) source.get(LOCK_UNTIL))).isAfter(now());
87+
assertThat(new Date((Long) source.get(LOCKED_AT))).isBeforeOrEqualTo(now());
88+
assertThat((String) source.get(LOCKED_BY)).isNotBlank();
89+
assertThat((String) source.get(NAME)).isEqualTo(lockName);
90+
} catch (IOException e) {
91+
fail("Call to embedded ES failed.");
92+
}
93+
}
94+
95+
private Date now() {
96+
return new Date();
97+
}
98+
}

0 commit comments

Comments
 (0)