@@ -166,20 +166,66 @@ public boolean execute() {
166
166
* 将source中的startSlot到endSlot迁移到target
167
167
*
168
168
*/
169
- public boolean migrateSlot (long appId , long appAuditId , InstanceInfo sourceInstanceInfo , InstanceInfo targetInstanceInfo , int startSlot , int endSlot , boolean isPipelineMigrate ) {
169
+ // public boolean migrateSlotOld(long appId, long appAuditId, InstanceInfo sourceInstanceInfo, InstanceInfo targetInstanceInfo, int startSlot, int endSlot, PipelineEnum pipelineEnum) {
170
+ // long startTime = System.currentTimeMillis();
171
+ // InstanceReshardProcess instanceReshardProcess = saveInstanceReshardProcess(appId, appAuditId, sourceInstanceInfo, targetInstanceInfo, startSlot, endSlot, pipelineEnum);
172
+ // //源和目标Jedis
173
+ // Jedis sourceJedis = redisCenter.getJedis(appId, sourceInstanceInfo.getIp(), sourceInstanceInfo.getPort(), defaultTimeout, defaultTimeout);
174
+ // Jedis targetJedis = redisCenter.getJedis(appId, targetInstanceInfo.getIp(), targetInstanceInfo.getPort(), defaultTimeout, defaultTimeout);
175
+ // //逐个slot迁移
176
+ // boolean hasError = false;
177
+ // for (int slot = startSlot; slot <= endSlot; slot++) {
178
+ // long slotStartTime = System.currentTimeMillis();
179
+ // try {
180
+ // instanceReshardProcessDao.updateMigratingSlot(instanceReshardProcess.getId(), slot);
181
+ // //num是迁移key的总数
182
+ // int num = migrateSlotData(appId, sourceJedis, targetJedis, slot, pipelineEnum);
183
+ // instanceReshardProcessDao.increaseFinishSlotNum(instanceReshardProcess.getId());
184
+ // logger.warn("clusterReshard:{}->{}, slot={}, keys={}, costTime={} ms", sourceInstanceInfo.getHostPort(),
185
+ // targetInstanceInfo.getHostPort(), slot, num, (System.currentTimeMillis() - slotStartTime));
186
+ // } catch (Exception e) {
187
+ // logger.error(e.getMessage(), e);
188
+ // hasError = true;
189
+ // break;
190
+ // }
191
+ // }
192
+ // long endTime = System.currentTimeMillis();
193
+ // logger.warn("clusterReshard:{}->{}, slot:{}->{}, costTime={} ms", sourceInstanceInfo.getHostPort(),
194
+ // targetInstanceInfo.getHostPort(), startSlot, endSlot, (endTime - startTime));
195
+ // if (hasError) {
196
+ // instanceReshardProcessDao.updateStatus(instanceReshardProcess.getId(), ReshardStatusEnum.ERROR.getValue());
197
+ // return false;
198
+ // } else {
199
+ // instanceReshardProcessDao.updateStatus(instanceReshardProcess.getId(), ReshardStatusEnum.FINISH.getValue());
200
+ // instanceReshardProcessDao.updateEndTime(instanceReshardProcess.getId(), new Date());
201
+ // return true;
202
+ // }
203
+ // }
204
+
205
+ /**
206
+ * 将source中的startSlot到endSlot迁移到target
207
+ *
208
+ */
209
+ public boolean migrateSlot (InstanceReshardProcess instanceReshardProcess ) {
210
+ long appId = instanceReshardProcess .getAppId ();
211
+ int migratingSlot = instanceReshardProcess .getMigratingSlot ();
212
+ int endSlot = instanceReshardProcess .getEndSlot ();
213
+ int isPipeline = instanceReshardProcess .getIsPipeline ();
214
+ InstanceInfo sourceInstanceInfo = instanceReshardProcess .getSourceInstanceInfo ();
215
+ InstanceInfo targetInstanceInfo = instanceReshardProcess .getTargetInstanceInfo ();
216
+
170
217
long startTime = System .currentTimeMillis ();
171
- InstanceReshardProcess instanceReshardProcess = saveInstanceReshardProcess (appId , appAuditId , sourceInstanceInfo , targetInstanceInfo , startSlot , endSlot );
172
218
//源和目标Jedis
173
219
Jedis sourceJedis = redisCenter .getJedis (appId , sourceInstanceInfo .getIp (), sourceInstanceInfo .getPort (), defaultTimeout , defaultTimeout );
174
220
Jedis targetJedis = redisCenter .getJedis (appId , targetInstanceInfo .getIp (), targetInstanceInfo .getPort (), defaultTimeout , defaultTimeout );
175
221
//逐个slot迁移
176
222
boolean hasError = false ;
177
- for (int slot = startSlot ; slot <= endSlot ; slot ++) {
223
+ for (int slot = migratingSlot ; slot <= endSlot ; slot ++) {
178
224
long slotStartTime = System .currentTimeMillis ();
179
225
try {
180
226
instanceReshardProcessDao .updateMigratingSlot (instanceReshardProcess .getId (), slot );
181
227
//num是迁移key的总数
182
- int num = migrateSlotData (appId , sourceJedis , targetJedis , slot , isPipelineMigrate );
228
+ int num = migrateSlotData (appId , sourceJedis , targetJedis , slot , isPipeline );
183
229
instanceReshardProcessDao .increaseFinishSlotNum (instanceReshardProcess .getId ());
184
230
logger .warn ("clusterReshard:{}->{}, slot={}, keys={}, costTime={} ms" , sourceInstanceInfo .getHostPort (),
185
231
targetInstanceInfo .getHostPort (), slot , num , (System .currentTimeMillis () - slotStartTime ));
@@ -191,7 +237,7 @@ public boolean migrateSlot(long appId, long appAuditId, InstanceInfo sourceInsta
191
237
}
192
238
long endTime = System .currentTimeMillis ();
193
239
logger .warn ("clusterReshard:{}->{}, slot:{}->{}, costTime={} ms" , sourceInstanceInfo .getHostPort (),
194
- targetInstanceInfo .getHostPort (), startSlot , endSlot , (endTime - startTime ));
240
+ targetInstanceInfo .getHostPort (), migratingSlot , endSlot , (endTime - startTime ));
195
241
if (hasError ) {
196
242
instanceReshardProcessDao .updateStatus (instanceReshardProcess .getId (), ReshardStatusEnum .ERROR .getValue ());
197
243
return false ;
@@ -202,45 +248,11 @@ public boolean migrateSlot(long appId, long appAuditId, InstanceInfo sourceInsta
202
248
}
203
249
}
204
250
205
-
206
- /**
207
- * 保存进度
208
- * @param appId
209
- * @param appAuditId
210
- * @param sourceInstanceInfo
211
- * @param targetInstanceInfo
212
- * @param startSlot
213
- * @param endSlot
214
- * @return
215
- */
216
- private InstanceReshardProcess saveInstanceReshardProcess (long appId , long appAuditId ,
217
- InstanceInfo sourceInstanceInfo , InstanceInfo targetInstanceInfo , int startSlot , int endSlot ) {
218
- Date now = new Date ();
219
- InstanceReshardProcess instanceReshardProcess = new InstanceReshardProcess ();
220
- instanceReshardProcess .setAppId (appId );
221
- instanceReshardProcess .setAuditId (appAuditId );
222
- instanceReshardProcess .setFinishSlotNum (0 );
223
- instanceReshardProcess .setSourceInstanceId (sourceInstanceInfo .getId ());
224
- instanceReshardProcess .setTargetInstanceId (targetInstanceInfo .getId ());
225
- instanceReshardProcess .setMigratingSlot (startSlot );
226
- instanceReshardProcess .setStartSlot (startSlot );
227
- instanceReshardProcess .setEndSlot (endSlot );
228
- instanceReshardProcess .setStatus (ReshardStatusEnum .RUNNING .getValue ());
229
- instanceReshardProcess .setStartTime (now );
230
- //用status控制显示结束时间
231
- instanceReshardProcess .setEndTime (now );
232
- instanceReshardProcess .setCreateTime (now );
233
- instanceReshardProcess .setUpdateTime (now );
234
-
235
- instanceReshardProcessDao .save (instanceReshardProcess );
236
- return instanceReshardProcess ;
237
- }
238
-
239
251
/**
240
252
* 迁移slot数据,并稳定slot配置
241
253
* @throws Exception
242
254
*/
243
- private int moveSlotData (final long appId , final Jedis source , final Jedis target , final int slot , boolean isPipelineMigrate ) throws Exception {
255
+ private int moveSlotData (final long appId , final Jedis source , final Jedis target , final int slot , int isPipeline ) throws Exception {
244
256
int num = 0 ;
245
257
while (true ) {
246
258
final Set <String > keys = new HashSet <String >();
@@ -322,7 +334,7 @@ public boolean execute() {
322
334
* MIGRATE host port key destination-db timeout [COPY] [REPLACE]
323
335
* CLUSTER SETSLOT <slot> NODE <node_id> 将槽 slot 指派给 node_id 指定的节点,如果槽已经指派给另一个节点,那么先让另一个节点删除该槽>,然后再进行指派。
324
336
*/
325
- private int migrateSlotData (long appId , final Jedis source , final Jedis target , final int slot , boolean isPipelineMigrate ) {
337
+ private int migrateSlotData (long appId , final Jedis source , final Jedis target , final int slot , int isPipeline ) {
326
338
int num = 0 ;
327
339
final String sourceNodeId = getNodeId (appId , source );
328
340
final String targetNodeId = getNodeId (appId , target );
@@ -357,7 +369,7 @@ public boolean execute() {
357
369
}
358
370
359
371
try {
360
- num = moveSlotData (appId , source , target , slot , isPipelineMigrate );
372
+ num = moveSlotData (appId , source , target , slot , isPipeline );
361
373
} catch (Exception e ) {
362
374
isError = true ;
363
375
logger .error (e .getMessage (), e );
0 commit comments