20
20
package org .elasticsearch .action .admin .indices .mapping .create ;
21
21
22
22
import com .google .inject .Inject ;
23
+ import org .elasticsearch .ExceptionsHelper ;
23
24
import org .elasticsearch .action .ActionListener ;
24
25
import org .elasticsearch .action .Actions ;
25
26
import org .elasticsearch .action .TransportActions ;
27
+ import org .elasticsearch .action .admin .indices .create .CreateIndexRequest ;
28
+ import org .elasticsearch .action .admin .indices .create .CreateIndexResponse ;
29
+ import org .elasticsearch .action .admin .indices .create .TransportCreateIndexAction ;
26
30
import org .elasticsearch .action .support .BaseAction ;
27
31
import org .elasticsearch .cluster .ClusterService ;
28
32
import org .elasticsearch .cluster .metadata .MetaDataService ;
33
+ import org .elasticsearch .indices .IndexAlreadyExistsException ;
29
34
import org .elasticsearch .threadpool .ThreadPool ;
30
35
import org .elasticsearch .transport .*;
31
36
import org .elasticsearch .util .io .VoidStreamable ;
32
37
import org .elasticsearch .util .settings .Settings ;
33
38
34
39
import java .io .IOException ;
40
+ import java .util .concurrent .CountDownLatch ;
41
+ import java .util .concurrent .TimeUnit ;
35
42
36
43
/**
37
44
* @author kimchy (Shay Banon)
@@ -44,21 +51,54 @@ public class TransportCreateMappingAction extends BaseAction<CreateMappingReques
44
51
45
52
private final MetaDataService metaDataService ;
46
53
54
+ private final TransportCreateIndexAction createIndexAction ;
55
+
47
56
private final ThreadPool threadPool ;
48
57
58
+ private final boolean autoCreateIndex ;
59
+
49
60
@ Inject public TransportCreateMappingAction (Settings settings , TransportService transportService , ClusterService clusterService ,
50
- ThreadPool threadPool , MetaDataService metaDataService ) {
61
+ ThreadPool threadPool , MetaDataService metaDataService , TransportCreateIndexAction createIndexAction ) {
51
62
super (settings );
52
63
this .transportService = transportService ;
53
64
this .clusterService = clusterService ;
54
65
this .threadPool = threadPool ;
55
66
this .metaDataService = metaDataService ;
67
+ this .createIndexAction = createIndexAction ;
68
+ this .autoCreateIndex = settings .getAsBoolean ("action.autoCreateIndex" , true );
56
69
57
70
transportService .registerHandler (TransportActions .Admin .Indices .Mapping .CREATE , new TransportHandler ());
58
71
}
59
72
60
73
@ Override protected void doExecute (final CreateMappingRequest request , final ActionListener <CreateMappingResponse > listener ) {
61
74
final String [] indices = Actions .processIndices (clusterService .state (), request .indices ());
75
+ if (autoCreateIndex ) {
76
+ final CountDownLatch latch = new CountDownLatch (indices .length );
77
+ for (String index : indices ) {
78
+ if (!clusterService .state ().metaData ().hasIndex (index )) {
79
+ createIndexAction .execute (new CreateIndexRequest (index ), new ActionListener <CreateIndexResponse >() {
80
+ @ Override public void onResponse (CreateIndexResponse result ) {
81
+ latch .countDown ();
82
+ }
83
+
84
+ @ Override public void onFailure (Throwable e ) {
85
+ if (ExceptionsHelper .unwrapCause (e ) instanceof IndexAlreadyExistsException ) {
86
+ latch .countDown ();
87
+ } else {
88
+ listener .onFailure (e );
89
+ }
90
+ }
91
+ });
92
+ } else {
93
+ latch .countDown ();
94
+ }
95
+ }
96
+ try {
97
+ latch .await (10 , TimeUnit .SECONDS );
98
+ } catch (InterruptedException e ) {
99
+ // ignore
100
+ }
101
+ }
62
102
if (clusterService .state ().nodes ().localNodeMaster ()) {
63
103
threadPool .execute (new Runnable () {
64
104
@ Override public void run () {
0 commit comments