Skip to content

Commit f86d9c5

Browse files
committed
When configuring a partition writer, detect whether the configured index is an alias.
If it is an alias, validate it for writing: it must either point to a single index or have one index with the write-index flag set. Fixes #1197
1 parent b8b7c57 commit f86d9c5

File tree

4 files changed

+285
-9
lines changed

4 files changed

+285
-9
lines changed

mr/src/itest/java/org/elasticsearch/hadoop/integration/rest/AbstractRestSaveTest.java

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,32 @@
2222
import java.util.Map;
2323
import java.util.Scanner;
2424

25+
import org.apache.commons.logging.Log;
26+
import org.apache.commons.logging.LogFactory;
27+
import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
2528
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
29+
import org.elasticsearch.hadoop.cfg.Settings;
30+
import org.elasticsearch.hadoop.mr.RestUtils;
31+
import org.elasticsearch.hadoop.mr.WritableBytesConverter;
32+
import org.elasticsearch.hadoop.mr.WritableValueWriter;
33+
import org.elasticsearch.hadoop.rest.InitializationUtils;
2634
import org.elasticsearch.hadoop.rest.Resource;
2735
import org.elasticsearch.hadoop.rest.RestClient;
2836
import org.elasticsearch.hadoop.rest.RestRepository;
37+
import org.elasticsearch.hadoop.rest.RestService;
2938
import org.elasticsearch.hadoop.serialization.builder.JdkValueWriter;
39+
import org.elasticsearch.hadoop.serialization.field.MapWritableFieldExtractor;
3040
import org.elasticsearch.hadoop.util.BytesArray;
3141
import org.elasticsearch.hadoop.util.TestSettings;
3242
import org.elasticsearch.hadoop.util.TrackingBytesArray;
43+
import org.junit.Assert;
44+
import org.junit.BeforeClass;
3345
import org.junit.Test;
3446

3547
public class AbstractRestSaveTest {
3648

49+
private static final Log LOG = LogFactory.getLog(AbstractRestSaveTest.class);
50+
3751
@Test
3852
public void testBulkWrite() throws Exception {
3953
TestSettings testSettings = new TestSettings("rest/savebulk");
@@ -69,4 +83,122 @@ public void testEmptyBulkWrite() throws Exception {
6983
restRepo.close();
7084
client.close();
7185
}
86+
87+
@BeforeClass
88+
public static void createAliasTestIndices() throws Exception {
89+
RestUtils.put("alias_index1", ("{" +
90+
"\"settings\": {" +
91+
"\"number_of_shards\": 3," +
92+
"\"number_of_replicas\": 0" +
93+
"}" +
94+
"}'").getBytes());
95+
96+
RestUtils.put("alias_index2", ("{" +
97+
"\"settings\": {" +
98+
"\"number_of_shards\": 3," +
99+
"\"number_of_replicas\": 0" +
100+
"}" +
101+
"}'").getBytes());
102+
}
103+
104+
@Test
105+
public void testCreatePartitionWriterWithPatternedIndex() throws Exception {
106+
Settings settings = new TestSettings();
107+
settings.setProperty(ConfigurationOptions.ES_RESOURCE, "alias_index{id}/doc");
108+
InitializationUtils.setValueWriterIfNotSet(settings, WritableValueWriter.class, LOG);
109+
InitializationUtils.setBytesConverterIfNeeded(settings, WritableBytesConverter.class, LOG);
110+
InitializationUtils.setFieldExtractorIfNotSet(settings, MapWritableFieldExtractor.class, LOG);
111+
RestService.PartitionWriter writer = RestService.createWriter(settings, 1, 3, LOG);
112+
writer.close();
113+
}
114+
115+
@Test
116+
public void testCreatePartitionWriterWithSingleIndex() throws Exception {
117+
Settings settings = new TestSettings();
118+
settings.setProperty(ConfigurationOptions.ES_RESOURCE, "alias_index1/doc");
119+
InitializationUtils.setValueWriterIfNotSet(settings, WritableValueWriter.class, LOG);
120+
InitializationUtils.setBytesConverterIfNeeded(settings, WritableBytesConverter.class, LOG);
121+
InitializationUtils.setFieldExtractorIfNotSet(settings, MapWritableFieldExtractor.class, LOG);
122+
RestService.PartitionWriter writer = RestService.createWriter(settings, 1, 3, LOG);
123+
writer.close();
124+
}
125+
126+
@Test
127+
public void testCreatePartitionWriterWithSingleAlias() throws Exception {
128+
RestUtils.postData("_aliases", ("{" +
129+
"\"actions\": [" +
130+
"{" +
131+
"\"add\": {" +
132+
"\"index\": \"alias_index1\"," +
133+
"\"alias\": \"single_alias\"" +
134+
"}" +
135+
"}" +
136+
"]" +
137+
"}").getBytes());
138+
139+
Settings settings = new TestSettings();
140+
settings.setProperty(ConfigurationOptions.ES_RESOURCE, "single_alias/doc");
141+
InitializationUtils.setValueWriterIfNotSet(settings, WritableValueWriter.class, LOG);
142+
InitializationUtils.setBytesConverterIfNeeded(settings, WritableBytesConverter.class, LOG);
143+
InitializationUtils.setFieldExtractorIfNotSet(settings, MapWritableFieldExtractor.class, LOG);
144+
RestService.PartitionWriter writer = RestService.createWriter(settings, 1, 3, LOG);
145+
writer.close();
146+
}
147+
148+
@Test(expected = EsHadoopIllegalArgumentException.class)
149+
public void testCreatePartitionWriterWithMultipleAliases() throws Exception {
150+
RestUtils.postData("_aliases", ("{" +
151+
"\"actions\": [" +
152+
"{" +
153+
"\"add\": {" +
154+
"\"index\": \"alias_index1\"," +
155+
"\"alias\": \"multi_alias\"" +
156+
"}" +
157+
"}," +
158+
"{" +
159+
"\"add\": {" +
160+
"\"index\": \"alias_index2\"," +
161+
"\"alias\": \"multi_alias\"" +
162+
"}" +
163+
"}" +
164+
"]" +
165+
"}").getBytes());
166+
167+
Settings settings = new TestSettings();
168+
settings.setProperty(ConfigurationOptions.ES_RESOURCE, "multi_alias/doc");
169+
InitializationUtils.setValueWriterIfNotSet(settings, WritableValueWriter.class, LOG);
170+
InitializationUtils.setBytesConverterIfNeeded(settings, WritableBytesConverter.class, LOG);
171+
InitializationUtils.setFieldExtractorIfNotSet(settings, MapWritableFieldExtractor.class, LOG);
172+
RestService.createWriter(settings, 1, 3, LOG);
173+
Assert.fail("Should not be able to read data from multi_alias run");
174+
}
175+
176+
@Test
177+
public void testCreatePartitionWriterWithWritableMultipleAliases() throws Exception {
178+
RestUtils.postData("_aliases", ("{" +
179+
"\"actions\": [" +
180+
"{" +
181+
"\"add\": {" +
182+
"\"index\": \"alias_index1\"," +
183+
"\"alias\": \"multi_alias_writable\"," +
184+
"\"is_write_index\": true" +
185+
"}" +
186+
"}," +
187+
"{" +
188+
"\"add\": {" +
189+
"\"index\": \"alias_index2\"," +
190+
"\"alias\": \"multi_alias_writable\"" +
191+
"}" +
192+
"}" +
193+
"]" +
194+
"}").getBytes());
195+
196+
Settings settings = new TestSettings();
197+
settings.setProperty(ConfigurationOptions.ES_RESOURCE, "multi_alias_writable/doc");
198+
InitializationUtils.setValueWriterIfNotSet(settings, WritableValueWriter.class, LOG);
199+
InitializationUtils.setBytesConverterIfNeeded(settings, WritableBytesConverter.class, LOG);
200+
InitializationUtils.setFieldExtractorIfNotSet(settings, MapWritableFieldExtractor.class, LOG);
201+
RestService.PartitionWriter writer = RestService.createWriter(settings, 1, 3, LOG);
202+
writer.close();
203+
}
72204
}

mr/src/main/java/org/elasticsearch/hadoop/rest/RestService.java

Lines changed: 112 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -598,11 +598,49 @@ public static PartitionWriter createWriter(Settings settings, long currentSplit,
598598
IndexExtractor iformat = ObjectUtils.instantiate(settings.getMappingIndexExtractorClassName(), settings);
599599
iformat.compile(resource.toString());
600600

601-
RestRepository repository = (iformat.hasPattern() ? initMultiIndices(settings, currentSplit, resource, log) : initSingleIndex(settings, currentSplit, resource, log));
602-
601+
// Create the partition writer and its client
602+
RestRepository repository;
603+
if (iformat.hasPattern()) {
604+
// Can't be sure if a multi-index pattern will resolve to indices or aliases
605+
// during the job. It's better to trust the user and discover any issues the
606+
// hard way at runtime.
607+
repository = initMultiIndices(settings, currentSplit, resource, log);
608+
} else {
609+
// Determine if the configured index is an alias.
610+
RestClient bootstrap = new RestClient(settings);
611+
GetAliasesRequestBuilder.Response response = null;
612+
try {
613+
response = new GetAliasesRequestBuilder(bootstrap).aliases(resource.index()).execute();
614+
} catch (EsHadoopInvalidRequest remoteException) {
615+
// For now, the get alias call throws if it does not find an alias that matches. Just log and continue.
616+
if (log.isDebugEnabled()) {
617+
log.debug(String.format("Provided index name [%s] is not an alias. Reason: [%s]",
618+
resource.index(), remoteException.getMessage()));
619+
}
620+
} finally {
621+
bootstrap.close();
622+
}
623+
// Validate the alias for writing, or pin to a single index shard.
624+
if (response != null && response.hasAliases()) {
625+
repository = initAliasWrite(response, settings, currentSplit, resource, log);
626+
} else {
627+
repository = initSingleIndex(settings, currentSplit, resource, log);
628+
}
629+
}
603630
return new PartitionWriter(settings, currentSplit, totalSplits, repository);
604631
}
605632

633+
/**
634+
* Validate and configure a rest repository for writing to an index.
635+
* The index is potentially created if it does not exist, and the
636+
* client is pinned to a node that hosts one of the index's primary
637+
* shards based on its currentInstance number.
638+
* @param settings Job settings
639+
* @param currentInstance Partition number
640+
* @param resource Configured write resource
641+
* @param log Logger to use
642+
* @return The RestRepository to be used by the partition writer
643+
*/
606644
private static RestRepository initSingleIndex(Settings settings, long currentInstance, Resource resource, Log log) {
607645
if (log.isDebugEnabled()) {
608646
log.debug(String.format("Resource [%s] resolves as a single index", resource));
@@ -638,9 +676,7 @@ private static RestRepository initSingleIndex(Settings settings, long currentIns
638676
}
639677

640678
// no routing necessary; select the relevant target shard/node
641-
Map<ShardInfo, NodeInfo> targetShards = Collections.emptyMap();
642-
643-
targetShards = repository.getWriteTargetPrimaryShards(settings.getNodesClientOnly());
679+
Map<ShardInfo, NodeInfo> targetShards = repository.getWriteTargetPrimaryShards(settings.getNodesClientOnly());
644680
repository.close();
645681

646682
Assert.isTrue(!targetShards.isEmpty(),
@@ -675,6 +711,15 @@ private static RestRepository initSingleIndex(Settings settings, long currentIns
675711
return repository;
676712
}
677713

714+
/**
715+
* Creates a RestRepository for use with a multi-index resource pattern. The client is left pinned
716+
* to the original node that it was pinned to since the shard locations cannot be determined at all.
717+
* @param settings Job settings
718+
* @param currentInstance Partition number
719+
* @param resource Configured write resource
720+
* @param log Logger to use
721+
* @return The RestRepository to be used by the partition writer
722+
*/
678723
private static RestRepository initMultiIndices(Settings settings, long currentInstance, Resource resource, Log log) {
679724
if (log.isDebugEnabled()) {
680725
log.debug(String.format("Resource [%s] resolves as an index pattern", resource));
@@ -688,4 +733,66 @@ private static RestRepository initMultiIndices(Settings settings, long currentIn
688733

689734
return new RestRepository(settings);
690735
}
736+
737+
/**
738+
* Validate and configure a rest repository for writing to an alias backed by a valid write-index.
739+
* This validation only checks that an alias is valid at time of job start, and makes no guarantees
740+
* about the alias changing during the execution.
741+
* @param response Response from the get alias call
742+
* @param settings Job settings
743+
* @param currentInstance Partition number
744+
* @param resource Configured write resource
745+
* @param log Logger to use
746+
* @return The RestRepository to be used by the partition writer
747+
*/
748+
private static RestRepository initAliasWrite(GetAliasesRequestBuilder.Response response, Settings settings, long currentInstance,
749+
Resource resource, Log log) {
750+
if (log.isDebugEnabled()) {
751+
log.debug(String.format("Resource [%s] resolves as an index alias", resource));
752+
}
753+
754+
// indexName -> aliasName -> alias definition
755+
Map<String, Map<String, IndicesAliases.Alias>> indexAliasTable = response.getIndices().getAll();
756+
757+
if (indexAliasTable.size() < 1) {
758+
// Sanity check
759+
throw new EsHadoopIllegalArgumentException("Cannot initialize alias write resource [" + resource.index() +
760+
"] if it does not have any alias entries.");
761+
} else if (indexAliasTable.size() > 1) {
762+
// Multiple indices, validate that one index-alias relation has its write index flag set
763+
String currentWriteIndex = null;
764+
for (Map.Entry<String, Map<String, IndicesAliases.Alias>> indexRow : indexAliasTable.entrySet()) {
765+
String indexName = indexRow.getKey();
766+
Map<String, IndicesAliases.Alias> aliases = indexRow.getValue();
767+
IndicesAliases.Alias aliasInfo = aliases.get(resource.index());
768+
if (aliasInfo.isWriteIndex()) {
769+
currentWriteIndex = indexName;
770+
break;
771+
}
772+
}
773+
if (currentWriteIndex == null) {
774+
throw new EsHadoopIllegalArgumentException("Attempting to write to alias [" + resource.index() + "], " +
775+
"but detected multiple indices [" + indexAliasTable.size() + "] with no write index selected. " +
776+
"Bailing out...");
777+
} else {
778+
if (log.isDebugEnabled()) {
779+
log.debug(String.format("Writing to currently configured write-index [%s]", currentWriteIndex));
780+
}
781+
}
782+
} else {
783+
// Single index in the alias, but we should still not pin the nodes
784+
if (log.isDebugEnabled()) {
785+
log.debug(String.format("Writing to the alias's single configured index [%s]", indexAliasTable.keySet().iterator().next()));
786+
}
787+
}
788+
789+
// alias-index write - since we don't know beforehand what concrete index will be used at any
790+
// given time during the job, use an already selected node
791+
String node = SettingsUtils.getPinnedNode(settings);
792+
if (log.isDebugEnabled()) {
793+
log.debug(String.format("Partition writer instance [%s] assigned to [%s]", currentInstance, node));
794+
}
795+
796+
return new RestRepository(settings);
797+
}
691798
}

mr/src/main/java/org/elasticsearch/hadoop/rest/request/GetAliasesRequestBuilder.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,25 @@
2929

3030
public class GetAliasesRequestBuilder extends RequestBuilder<GetAliasesRequestBuilder.Response> {
3131
private final List<String> indices = new ArrayList<String>();
32+
private final List<String> aliases = new ArrayList<String>();
3233

3334
public GetAliasesRequestBuilder(RestClient client) {
3435
super(client);
3536
}
3637

38+
/**
39+
* The aliases to filter down in the response.
40+
*/
41+
public GetAliasesRequestBuilder aliases(String... values) {
42+
Collections.addAll(aliases, values);
43+
return this;
44+
}
45+
46+
/**
47+
* The indices to find aliases for.
48+
* Any aliases placed here are resolved on the server side to the indices that they contain.
49+
* If no indices are given, we default to "_all"
50+
*/
3751
public GetAliasesRequestBuilder indices(String... values) {
3852
Collections.addAll(indices, values);
3953
return this;
@@ -48,6 +62,9 @@ public Response execute() {
4862
path.append("_all");
4963
}
5064
path.append("/_alias");
65+
if (aliases.size() > 0) {
66+
path.append("/").append(StringUtils.concatenate(aliases));
67+
}
5168
return new Response((Map<String, Object>) client.get(path.toString(), null));
5269
}
5370

@@ -61,5 +78,9 @@ public Response(Map<String, Object> map) {
6178
public IndicesAliases getIndices() {
6279
return indicesAliases;
6380
}
81+
82+
public boolean hasAliases() {
83+
return indicesAliases.getAll().size() > 0;
84+
}
6485
}
6586
}

0 commit comments

Comments
 (0)