9
9
import org .apache .logging .log4j .LogManager ;
10
10
import org .apache .logging .log4j .Logger ;
11
11
import org .apache .logging .log4j .message .ParameterizedMessage ;
12
- import org .elasticsearch .ResourceAlreadyExistsException ;
13
12
import org .elasticsearch .action .ActionListener ;
14
- import org .elasticsearch .action .admin .indices .alias .Alias ;
15
- import org .elasticsearch .action .admin .indices .create .CreateIndexResponse ;
13
+ import org .elasticsearch .action .DocWriteRequest ;
16
14
import org .elasticsearch .action .index .IndexRequest ;
17
15
import org .elasticsearch .client .Client ;
18
- import org .elasticsearch .cluster .ClusterState ;
19
- import org .elasticsearch .cluster .metadata .IndexAbstraction ;
16
+ import org .elasticsearch .cluster .metadata .Metadata ;
20
17
import org .elasticsearch .cluster .service .ClusterService ;
21
18
import org .elasticsearch .common .settings .Settings ;
22
19
import org .elasticsearch .common .xcontent .ToXContent ;
27
24
28
25
import static org .elasticsearch .xpack .core .ilm .LifecycleSettings .SLM_HISTORY_INDEX_ENABLED_SETTING ;
29
26
import static org .elasticsearch .xpack .core .slm .history .SnapshotLifecycleTemplateRegistry .INDEX_TEMPLATE_VERSION ;
27
+ import static org .elasticsearch .xpack .core .slm .history .SnapshotLifecycleTemplateRegistry .SLM_TEMPLATE_NAME ;
30
28
31
29
/**
32
30
* Records Snapshot Lifecycle Management actions as represented by {@link SnapshotHistoryItem} into an index
35
33
public class SnapshotHistoryStore {
36
34
private static final Logger logger = LogManager .getLogger (SnapshotHistoryStore .class );
37
35
38
- public static final String SLM_HISTORY_INDEX_PREFIX = ".slm-history-" + INDEX_TEMPLATE_VERSION + "-" ;
39
- public static final String SLM_HISTORY_ALIAS = ".slm-history-" + INDEX_TEMPLATE_VERSION ;
36
+ public static final String SLM_HISTORY_DATA_STREAM = "slm-history-" + INDEX_TEMPLATE_VERSION ;
40
37
41
38
private final Client client ;
42
39
private final ClusterService clusterService ;
@@ -59,85 +56,29 @@ public void putAsync(SnapshotHistoryItem item) {
59
56
SLM_HISTORY_INDEX_ENABLED_SETTING .getKey (), item );
60
57
return ;
61
58
}
62
- logger .trace ("about to index snapshot history item in index [{}]: [{}]" , SLM_HISTORY_ALIAS , item );
63
- ensureHistoryIndex (client , clusterService .state (), ActionListener .wrap (createdIndex -> {
64
- try (XContentBuilder builder = XContentFactory .jsonBuilder ()) {
65
- item .toXContent (builder , ToXContent .EMPTY_PARAMS );
66
- IndexRequest request = new IndexRequest (SLM_HISTORY_ALIAS )
67
- .source (builder );
68
- client .index (request , ActionListener .wrap (indexResponse -> {
69
- logger .debug ("successfully indexed snapshot history item with id [{}] in index [{}]: [{}]" ,
70
- indexResponse .getId (), SLM_HISTORY_ALIAS , item );
71
- }, exception -> {
72
- logger .error (new ParameterizedMessage ("failed to index snapshot history item in index [{}]: [{}]" ,
73
- SLM_HISTORY_ALIAS , item ), exception );
74
- }));
75
- } catch (IOException exception ) {
76
- logger .error (new ParameterizedMessage ("failed to index snapshot history item in index [{}]: [{}]" ,
77
- SLM_HISTORY_ALIAS , item ), exception );
78
- }
79
- }, ex -> logger .error (new ParameterizedMessage ("failed to ensure SLM history index exists, not indexing history item [{}]" ,
80
- item ), ex )));
81
- }
82
-
83
- /**
84
- * Checks if the SLM history index exists, and if not, creates it.
85
- *
86
- * @param client The client to use to create the index if needed
87
- * @param state The current cluster state, to determine if the alias exists
88
- * @param andThen Called after the index has been created. `onResponse` called with `true` if the index was created,
89
- * `false` if it already existed.
90
- */
91
- static void ensureHistoryIndex (Client client , ClusterState state , ActionListener <Boolean > andThen ) {
92
- final String initialHistoryIndexName = SLM_HISTORY_INDEX_PREFIX + "000001" ;
93
- final IndexAbstraction slmHistory = state .metadata ().getIndicesLookup ().get (SLM_HISTORY_ALIAS );
94
- final IndexAbstraction initialHistoryIndex = state .metadata ().getIndicesLookup ().get (initialHistoryIndexName );
95
-
96
- if (slmHistory == null && initialHistoryIndex == null ) {
97
- // No alias or index exists with the expected names, so create the index with appropriate alias
98
- client .admin ().indices ().prepareCreate (initialHistoryIndexName )
99
- .setWaitForActiveShards (1 )
100
- .addAlias (new Alias (SLM_HISTORY_ALIAS )
101
- .writeIndex (true )
102
- .isHidden (true ))
103
- .execute (new ActionListener <CreateIndexResponse >() {
104
- @ Override
105
- public void onResponse (CreateIndexResponse response ) {
106
- andThen .onResponse (true );
107
- }
108
-
109
- @ Override
110
- public void onFailure (Exception e ) {
111
- if (e instanceof ResourceAlreadyExistsException ) {
112
- // The index didn't exist before we made the call, there was probably a race - just ignore this
113
- logger .debug ("index [{}] was created after checking for its existence, likely due to a concurrent call" ,
114
- initialHistoryIndexName );
115
- andThen .onResponse (false );
116
- } else {
117
- andThen .onFailure (e );
118
- }
119
- }
120
- });
121
- } else if (slmHistory == null ) {
122
- // alias does not exist but initial index does, something is broken
123
- andThen .onFailure (new IllegalStateException ("SLM history index [" + initialHistoryIndexName +
124
- "] already exists but does not have alias [" + SLM_HISTORY_ALIAS + "]" ));
125
- } else if (slmHistory .getType () == IndexAbstraction .Type .ALIAS ) {
126
- if (slmHistory .getWriteIndex () != null ) {
127
- // The alias exists and has a write index, so we're good
128
- andThen .onResponse (false );
129
- } else {
130
- // The alias does not have a write index, so we can't index into it
131
- andThen .onFailure (new IllegalStateException ("SLM history alias [" + SLM_HISTORY_ALIAS + "does not have a write index" ));
132
- }
133
- } else if (slmHistory .getType () != IndexAbstraction .Type .ALIAS ) {
134
- // This is not an alias, error out
135
- andThen .onFailure (new IllegalStateException ("SLM history alias [" + SLM_HISTORY_ALIAS +
136
- "] already exists as " + slmHistory .getType ().getDisplayName ()));
137
- } else {
138
- logger .error ("unexpected IndexOrAlias for [{}]: [{}]" , SLM_HISTORY_ALIAS , slmHistory );
139
- // (slmHistory.isAlias() == true) but (slmHistory instanceof Alias == false)?
140
- assert false : SLM_HISTORY_ALIAS + " cannot be both an alias and not an alias simultaneously" ;
59
+ logger .trace ("about to index snapshot history item in data stream [{}]: [{}]" , SLM_HISTORY_DATA_STREAM , item );
60
+ Metadata metadata = clusterService .state ().getMetadata ();
61
+ if (metadata .dataStreams ().containsKey (SLM_HISTORY_DATA_STREAM ) == false &&
62
+ metadata .templatesV2 ().containsKey (SLM_TEMPLATE_NAME ) == false ) {
63
+ logger .error (new ParameterizedMessage ("failed to index snapshot history item, data stream [{}] and template [{}] don't exist" ,
64
+ SLM_HISTORY_DATA_STREAM , SLM_TEMPLATE_NAME ));
65
+ return ;
66
+ }
67
+ try (XContentBuilder builder = XContentFactory .jsonBuilder ()) {
68
+ item .toXContent (builder , ToXContent .EMPTY_PARAMS );
69
+ IndexRequest request = new IndexRequest (SLM_HISTORY_DATA_STREAM )
70
+ .opType (DocWriteRequest .OpType .CREATE )
71
+ .source (builder );
72
+ client .index (request , ActionListener .wrap (indexResponse -> {
73
+ logger .debug ("successfully indexed snapshot history item with id [{}] in data stream [{}]: [{}]" ,
74
+ indexResponse .getId (), SLM_HISTORY_DATA_STREAM , item );
75
+ }, exception -> {
76
+ logger .error (new ParameterizedMessage ("failed to index snapshot history item in data stream [{}]: [{}]" ,
77
+ SLM_HISTORY_DATA_STREAM , item ), exception );
78
+ }));
79
+ } catch (IOException exception ) {
80
+ logger .error (new ParameterizedMessage ("failed to index snapshot history item in data stream [{}]: [{}]" ,
81
+ SLM_HISTORY_DATA_STREAM , item ), exception );
141
82
}
142
83
}
143
84
}
0 commit comments