@@ -57,51 +57,102 @@ private BulkByScrollParallelizationHelper() {}
57
57
*
58
58
* If slices are set as {@code "auto"}, this class will resolve that to a specific number based on characteristics of the source
59
59
* indices. A request with {@code "auto"} slices may end up being sliced or unsliced.
60
+ *
61
+ * This method is equivalent to calling {@link #initTaskState} followed by {@link #executeSlicedAction}
60
62
*/
61
63
static <Request extends AbstractBulkByScrollRequest <Request >> void startSlicedAction (
62
- Request request ,
63
- BulkByScrollTask task ,
64
- ActionType <BulkByScrollResponse > action ,
65
- ActionListener <BulkByScrollResponse > listener ,
66
- Client client ,
67
- DiscoveryNode node ,
68
- Runnable workerAction ) {
64
+ Request request ,
65
+ BulkByScrollTask task ,
66
+ ActionType <BulkByScrollResponse > action ,
67
+ ActionListener <BulkByScrollResponse > listener ,
68
+ Client client ,
69
+ DiscoveryNode node ,
70
+ Runnable workerAction ) {
71
+ initTaskState (task , request , client , new ActionListener <Void >() {
72
+ @ Override
73
+ public void onResponse (Void aVoid ) {
74
+ executeSlicedAction (task , request , action , listener , client , node , workerAction );
75
+ }
76
+
77
+ @ Override
78
+ public void onFailure (Exception e ) {
79
+ listener .onFailure (e );
80
+ }
81
+ });
82
+ }
83
+
84
+ /**
85
+ * Takes an action and a {@link BulkByScrollTask} and runs it with regard to whether this task is a
86
+ * leader or worker.
87
+ *
88
+ * If this task is a worker, the worker action in the given {@link Runnable} will be started on the local
89
+ * node. If the task is a leader (i.e. the number of slices is more than 1), then a subrequest will be
90
+ * created for each slice and sent.
91
+ *
92
+ * This method can only be called after the task state is initialized {@link #initTaskState}.
93
+ */
94
+ static <Request extends AbstractBulkByScrollRequest <Request >> void executeSlicedAction (
95
+ BulkByScrollTask task ,
96
+ Request request ,
97
+ ActionType <BulkByScrollResponse > action ,
98
+ ActionListener <BulkByScrollResponse > listener ,
99
+ Client client ,
100
+ DiscoveryNode node ,
101
+ Runnable workerAction ) {
102
+ if (task .isLeader ()) {
103
+ sendSubRequests (client , action , node .getId (), task , request , listener );
104
+ } else if (task .isWorker ()) {
105
+ workerAction .run ();
106
+ } else {
107
+ throw new AssertionError ("Task should have been initialized at this point." );
108
+ }
109
+ }
69
110
70
- if (request .getSlices () == AbstractBulkByScrollRequest .AUTO_SLICES ) {
111
+ /**
112
+ * Takes a {@link BulkByScrollTask} and ensures that its initial task state (leader or worker) is set.
113
+ *
114
+ * If slices are set as {@code "auto"}, this method will resolve that to a specific number based on
115
+ * characteristics of the source indices. A request with {@code "auto"} slices may end up being sliced or
116
+ * unsliced. This method does not execute the action. In order to execute the action see
117
+ * {@link #executeSlicedAction}
118
+ */
119
+ static <Request extends AbstractBulkByScrollRequest <Request >> void initTaskState (
120
+ BulkByScrollTask task ,
121
+ Request request ,
122
+ Client client ,
123
+ ActionListener <Void > listener ) {
124
+ int configuredSlices = request .getSlices ();
125
+ if (configuredSlices == AbstractBulkByScrollRequest .AUTO_SLICES ) {
71
126
ClusterSearchShardsRequest shardsRequest = new ClusterSearchShardsRequest ();
72
127
shardsRequest .indices (request .getSearchRequest ().indices ());
73
- client .admin ().cluster ().searchShards (shardsRequest , ActionListener .wrap (
74
- response -> {
75
- int actualNumSlices = countSlicesBasedOnShards (response );
76
- sliceConditionally (request , task , action , listener , client , node , workerAction , actualNumSlices );
77
- },
78
- listener ::onFailure
79
- ));
128
+ client .admin ().cluster ().searchShards (shardsRequest , new ActionListener <ClusterSearchShardsResponse >() {
129
+ @ Override
130
+ public void onResponse (ClusterSearchShardsResponse response ) {
131
+ setWorkerCount (request , task , countSlicesBasedOnShards (response ));
132
+ listener .onResponse (null );
133
+ }
134
+
135
+ @ Override
136
+ public void onFailure (Exception e ) {
137
+ listener .onFailure (e );
138
+ }
139
+ });
80
140
} else {
81
- sliceConditionally (request , task , action , listener , client , node , workerAction , request .getSlices ());
141
+ setWorkerCount (request , task , configuredSlices );
142
+ listener .onResponse (null );
82
143
}
83
144
}
84
145
85
- private static <Request extends AbstractBulkByScrollRequest <Request >> void sliceConditionally (
86
- Request request ,
87
- BulkByScrollTask task ,
88
- ActionType <BulkByScrollResponse > action ,
89
- ActionListener <BulkByScrollResponse > listener ,
90
- Client client ,
91
- DiscoveryNode node ,
92
- Runnable workerAction ,
93
- int slices ) {
94
-
146
+ private static <Request extends AbstractBulkByScrollRequest <Request >> void setWorkerCount (
147
+ Request request ,
148
+ BulkByScrollTask task ,
149
+ int slices ) {
95
150
if (slices > 1 ) {
96
151
task .setWorkerCount (slices );
97
- sendSubRequests (client , action , node .getId (), task , request , listener );
98
152
} else {
99
153
SliceBuilder sliceBuilder = request .getSearchRequest ().source ().slice ();
100
- Integer sliceId = sliceBuilder == null
101
- ? null
102
- : sliceBuilder .getId ();
154
+ Integer sliceId = sliceBuilder == null ? null : sliceBuilder .getId ();
103
155
task .setWorker (request .getRequestsPerSecond (), sliceId );
104
- workerAction .run ();
105
156
}
106
157
}
107
158
0 commit comments