diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index cbe7516b0ede0..e48f712a16030 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -2479,6 +2479,21 @@ public void setBalancerBandwidth(long bandwidth) throws IOException { } } + /** + * Requests the namenode to tell all datanodes to use a new bandwidth value. + * @param bandwidth Bandwidth in bytes per second for all datanodes. + * @param type DataNode bandwidth type. + * @throws IOException If an I/O error occurred + * + * @see ClientProtocol#setDataNodeBandwidth(long, String) + */ + public void setDataNodeBandwidth(long bandwidth, String type) throws IOException { + checkOpen(); + try (TraceScope ignored = tracer.newScope("setDataNodeBandwidth-" + type)) { + namenode.setDataNodeBandwidth(bandwidth, type); + } + } + /** * @see ClientProtocol#finalizeUpgrade() */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index dac205158d0f4..171b52c530e2e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -2088,12 +2088,23 @@ public Token<DelegationTokenIdentifier> getDelegationToken(String renewer) * bandwidth to be used by a datanode during balancing. * * @param bandwidth Balancer bandwidth in bytes per second for all datanodes. - * @throws IOException + * @throws IOException If an I/O error occurred */ public void setBalancerBandwidth(long bandwidth) throws IOException { dfs.setBalancerBandwidth(bandwidth); } + /** + * Requests the namenode to tell all datanodes to reset the bandwidth of the specified type. + * + * @param bandwidth Bandwidth in bytes per second for all datanodes. + * @param type DataNode bandwidth type. + * @throws IOException If an I/O error occurred + */ + public void setDataNodeBandwidth(long bandwidth, String type) throws IOException { + dfs.setDataNodeBandwidth(bandwidth, type); + } + /** * Get a canonical service name for this file system. If the URI is logical, * the hostname part of the URI will be returned.
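For reference, a minimal caller-side sketch of the new client API. This is not part of the patch; the URI and bandwidth value are placeholders, and error handling is elided:

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class SetDataNodeBandwidthExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder URI; substitute the cluster's real fs.defaultFS.
    try (FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), conf)) {
      DistributedFileSystem dfs = (DistributedFileSystem) fs;
      // Ask every datanode to cap its "transfer" throttler at 64 MiB/s.
      // Per the BPOfferService hunk further down, a value of 0 removes the throttler.
      dfs.setDataNodeBandwidth(64L * 1024 * 1024, "transfer");
    }
  }
}
```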
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ViewDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ViewDistributedFileSystem.java index ff2b2ba6e35e3..0f1462138a566 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ViewDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ViewDistributedFileSystem.java @@ -1057,6 +1057,16 @@ public void setBalancerBandwidth(long bandwidth) throws IOException { defaultDFS.setBalancerBandwidth(bandwidth); } + @Override + public void setDataNodeBandwidth(long bandwidth, String type) throws IOException { + if (this.vfs == null) { + super.setDataNodeBandwidth(bandwidth, type); + return; + } + checkDefaultDFS(defaultDFS, "setDataNodeBandwidth"); + defaultDFS.setDataNodeBandwidth(bandwidth, type); + } + @Override public String getCanonicalServiceName() { if (this.vfs == null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index b56b7916ff798..c1d58efe11749 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -1050,6 +1050,16 @@ CorruptFileBlocks listCorruptFileBlocks(String path, String cookie) @Idempotent void setBalancerBandwidth(long bandwidth) throws IOException; + /** + * Tell all datanodes to reset the bandwidth of the specified type. + * + * @param bandwidth Bandwidth in bytes per second for all datanodes. + * @param type DataNode bandwidth type. + * @throws IOException If an I/O error occurred + */ + @Idempotent + void setDataNodeBandwidth(long bandwidth, String type) throws IOException; + /** * Get the file info for a specific file or directory. 
* @param src The string representation of the path to the file diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index 78d2b312b4f56..aaf69afacaf1c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -189,6 +189,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SaveNamespaceRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetBalancerBandwidthRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetDataNodeBandwidthRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetOwnerRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetPermissionRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetQuotaRequestProto; @@ -976,6 +977,16 @@ public void setBalancerBandwidth(long bandwidth) throws IOException { ipc(() -> rpcProxy.setBalancerBandwidth(null, req)); } + @Override + public void setDataNodeBandwidth(long bandwidth, String type) throws IOException { + SetDataNodeBandwidthRequestProto req = + SetDataNodeBandwidthRequestProto.newBuilder() + .setBandwidth(bandwidth) + .setType(type) + .build(); + ipc(() -> rpcProxy.setDataNodeBandwidth(null, req)); + } + @Override public boolean isMethodSupported(String methodName) throws IOException { return RpcClientUtil.isMethodSupported(rpcProxy, diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto index 18595dbcbef86..2d2a6296da23e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto @@ -772,9 +772,17 @@ message SetBalancerBandwidthRequestProto { required int64 bandwidth = 1; } +message SetDataNodeBandwidthRequestProto { + required int64 bandwidth = 1; + required string type = 2; +} + message SetBalancerBandwidthResponseProto { // void response } +message SetDataNodeBandwidthResponseProto { // void response +} + message GetDataEncryptionKeyRequestProto { // no parameters } @@ -999,6 +1007,8 @@ service ClientNamenodeProtocol { returns(hadoop.common.CancelDelegationTokenResponseProto); rpc setBalancerBandwidth(SetBalancerBandwidthRequestProto) returns(SetBalancerBandwidthResponseProto); + rpc setDataNodeBandwidth(SetDataNodeBandwidthRequestProto) + returns(SetDataNodeBandwidthResponseProto); rpc getDataEncryptionKey(GetDataEncryptionKeyRequestProto) returns(GetDataEncryptionKeyResponseProto); rpc createSnapshot(CreateSnapshotRequestProto) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index 361bdf21fb443..1fa667c1cf4ca 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -1314,6 +1314,16 @@ public void setBalancerBandwidth(long bandwidth) throws IOException { rpcClient.invokeConcurrent(nss, method, true, false); } + @Override + public void setDataNodeBandwidth(long bandwidth, String type) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED); + + RemoteMethod method = new RemoteMethod("setDataNodeBandwidth", + new Class<?>[] {long.class, String.class}, bandwidth, type); + final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent(nss, method, true, false); + } + /** * Recursively get all the locations for the path. * For example, there are some mount points: diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 77bebab4ade71..ad68d15aab829 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -1639,6 +1639,11 @@ public void setBalancerBandwidth(long bandwidth) throws IOException { clientProto.setBalancerBandwidth(bandwidth); } + @Override // ClientProtocol + public void setDataNodeBandwidth(long bandwidth, String type) throws IOException { + clientProto.setDataNodeBandwidth(bandwidth, type); + } + @Override // ClientProtocol public ContentSummary getContentSummary(String path) throws IOException { return clientProto.getContentSummary(path); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java index ddbfdc9727c3a..5919fb051614a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java @@ -2205,6 +2205,15 @@ public void testSetBalancerBandwidth() throws Exception { }, 100, 60 * 1000); } + @Test + public void testSetDataNodeBandwidth() throws Exception { + routerProtocol.setDataNodeBandwidth(1000L, "transfer"); + ArrayList<DataNode> datanodes = cluster.getCluster().getDataNodes(); + GenericTestUtils.waitFor(() -> { + return datanodes.get(0).getTransferBandwidth() == 1000L; + }, 100, 60 * 1000); + } + @Test public void testAddClientIpPortToCallerContext() throws IOException { GenericTestUtils.LogCapturer auditLog = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index 47629d87a58ea..956964341d8c8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -225,6 +225,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SaveNamespaceResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetBalancerBandwidthRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetBalancerBandwidthResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetDataNodeBandwidthRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetDataNodeBandwidthResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetOwnerRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetOwnerResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetPermissionRequestProto; @@ -415,6 +417,9 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements protected static final SetBalancerBandwidthResponseProto VOID_SETBALANCERBANDWIDTH_RESPONSE = SetBalancerBandwidthResponseProto.newBuilder().build(); + protected static final SetDataNodeBandwidthResponseProto VOID_SETDATANODEBANDWIDTH_RESPONSE = + SetDataNodeBandwidthResponseProto.newBuilder().build(); + protected static final SetAclResponseProto VOID_SETACL_RESPONSE = SetAclResponseProto.getDefaultInstance(); @@ -1254,6 +1259,18 @@ public SetBalancerBandwidthResponseProto setBalancerBandwidth( } } + @Override + public SetDataNodeBandwidthResponseProto setDataNodeBandwidth( + RpcController controller, SetDataNodeBandwidthRequestProto req) + throws ServiceException { + try { + server.setDataNodeBandwidth(req.getBandwidth(), req.getType()); + return VOID_SETDATANODEBANDWIDTH_RESPONSE; + } catch (IOException e) { + throw new ServiceException(e); + } + } + @Override public GetDataEncryptionKeyResponseProto getDataEncryptionKey( RpcController controller, GetDataEncryptionKeyRequestProto request) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index b7cee876623bb..dee14c7f33154 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation; import org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos.KeyValueProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DataNodeBandwidthCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECReconstructionCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto; @@ -90,6 +91,7 @@ import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature; import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand; +import org.apache.hadoop.hdfs.server.protocol.DataNodeBandwidthCommand; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import 
org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand; import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand; @@ -458,6 +460,8 @@ public static DatanodeCommand convert(DatanodeCommandProto proto) { switch (proto.getCmdType()) { case BalancerBandwidthCommand: return PBHelper.convert(proto.getBalancerCmd()); + case DataNodeBandwidthCommand: + return PBHelper.convert(proto.getBandwidthCmd()); case BlockCommand: return PBHelper.convert(proto.getBlkCmd()); case BlockRecoveryCommand: @@ -483,6 +487,13 @@ public static BalancerBandwidthCommandProto convert( .setBandwidth(bbCmd.getBalancerBandwidthValue()).build(); } + public static DataNodeBandwidthCommandProto convert( + DataNodeBandwidthCommand bbCmd) { + return DataNodeBandwidthCommandProto.newBuilder() + .setBandwidth(bbCmd.getDataNodeBandwidthValue()) + .setType(bbCmd.getDataNodeBandwidthType()).build(); + } + public static KeyUpdateCommandProto convert(KeyUpdateCommand cmd) { return KeyUpdateCommandProto.newBuilder() .setKeys(convert(cmd.getExportedKeys())).build(); @@ -572,6 +583,11 @@ public static DatanodeCommandProto convert(DatanodeCommand datanodeCommand) { .setBalancerCmd( PBHelper.convert((BalancerBandwidthCommand) datanodeCommand)); break; + case DatanodeProtocol.DNA_DATANODEBANDWIDTHUPDATE: + builder.setCmdType(DatanodeCommandProto.Type.DataNodeBandwidthCommand) + .setBandwidthCmd( + PBHelper.convert((DataNodeBandwidthCommand) datanodeCommand)); + break; case DatanodeProtocol.DNA_ACCESSKEYUPDATE: builder .setCmdType(DatanodeCommandProto.Type.KeyUpdateCommand) @@ -709,6 +725,12 @@ public static BalancerBandwidthCommand convert( return new BalancerBandwidthCommand(balancerCmd.getBandwidth()); } + public static DataNodeBandwidthCommand convert( + DataNodeBandwidthCommandProto bandwidthCmd) { + return new DataNodeBandwidthCommand(bandwidthCmd.getBandwidth(), + bandwidthCmd.getType()); + } + public static ReceivedDeletedBlockInfoProto convert( ReceivedDeletedBlockInfo receivedDeletedBlockInfo) { ReceivedDeletedBlockInfoProto.Builder builder = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index 1ec63e0ca83da..1cc124cd28f17 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -193,6 +193,8 @@ public Type getType() { // specified datanode, this value will be set back to 0. private long bandwidth; + private final Map<String, Long> throttlers = new HashMap<>(); + /** A queue of blocks to be replicated by this datanode */ private final BlockQueue<BlockTargetPair> replicateBlocks = new BlockQueue<>(); @@ -1027,6 +1029,21 @@ public synchronized void setBalancerBandwidth(long bandwidth) { this.bandwidth = bandwidth; } + /** + * @return pending bandwidth values, keyed by throttler type, for this datanode + */ + public synchronized Map<String, Long> getDataNodeBandwidth() { + return this.throttlers; + } +
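The two PBHelper conversions above are symmetric, so a round-trip check is the natural unit test. A sketch in the JUnit 4 style used elsewhere in the module (the test class itself is illustrative, not part of the patch):

```java
import static org.junit.Assert.assertEquals;

import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DataNodeBandwidthCommandProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.server.protocol.DataNodeBandwidthCommand;
import org.junit.Test;

public class TestDataNodeBandwidthCommandConversion {
  @Test
  public void testRoundTrip() {
    DataNodeBandwidthCommand cmd = new DataNodeBandwidthCommand(1000000L, "read");
    // Command -> proto -> command should preserve both fields.
    DataNodeBandwidthCommandProto proto = PBHelper.convert(cmd);
    DataNodeBandwidthCommand back = PBHelper.convert(proto);
    assertEquals(cmd.getDataNodeBandwidthValue(), back.getDataNodeBandwidthValue());
    assertEquals(cmd.getDataNodeBandwidthType(), back.getDataNodeBandwidthType());
  }
}
```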
+ /** + * @param dnBandwidth Bandwidth in bytes per second for this datanode. + * @param type DataNode bandwidth type. + */ + public synchronized void setDataNodeBandwidth(long dnBandwidth, String type) { + this.throttlers.put(type, dnBandwidth); + } + @Override public String dumpDatanode() { StringBuilder sb = new StringBuilder(super.dumpDatanode()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 01f1af9624d05..534041d8d20a8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -1930,8 +1930,7 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg, if (!pendingList.isEmpty()) { // If the block is deleted, the block size will become // BlockCommand.NO_ACK (LONG.MAX_VALUE) . This kind of block we don't - // need - // to send for replication or reconstruction + // need to send for replication or reconstruction Iterator<BlockTargetPair> iterator = pendingList.iterator(); while (iterator.hasNext()) { BlockTargetPair cmd = iterator.next(); @@ -1962,9 +1961,7 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg, cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blockPoolId, blks)); } - // cache commands addCacheCommands(blockPoolId, nodeinfo, cmds); - // key update command blockManager.addKeyUpdateCommand(cmds, nodeinfo); // check for balancer bandwidth update @@ -1974,6 +1971,13 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg, nodeinfo.setBalancerBandwidth(0); } + if (!nodeinfo.getDataNodeBandwidth().isEmpty()) { + for (Map.Entry<String, Long> entry : nodeinfo.getDataNodeBandwidth().entrySet()) { + cmds.add(new DataNodeBandwidthCommand(entry.getValue(), entry.getKey())); + } + nodeinfo.getDataNodeBandwidth().clear(); + } + Preconditions.checkNotNull(slowPeerTracker, "slowPeerTracker should not be un-assigned"); if (slowPeerTracker.isSlowPeerTrackerEnabled()) { @@ -2085,7 +2089,25 @@ public void setBalancerBandwidth(long bandwidth) throws IOException { } } } - +
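The heartbeat hunk above drains the staged map into commands with at-most-once delivery: repeated calls for the same type before the next heartbeat overwrite one another, and the map is cleared once the commands are built. A standalone illustration of that contract (plain Java, not patch code):

```java
import java.util.HashMap;
import java.util.Map;

public class StagingMapDemo {
  public static void main(String[] args) {
    Map<String, Long> pending = new HashMap<>();
    pending.put("read", 1000000L);
    pending.put("read", 2000000L); // overwrites: only the latest value per type survives
    System.out.println(pending);   // {read=2000000} -> exactly one command is emitted
    pending.clear();               // mirrors the clear() after the heartbeat response
  }
}
```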
+ /** + * Tell all datanodes to reset the bandwidth of the specified type. + * + * A system administrator can tune the datanode bandwidth dynamically by calling + * "dfsadmin -setDataNodeBandwidth newbandwidth -type <type>" + * + * @param bandwidth Bandwidth in bytes per second for all datanodes. + * @param type DataNode bandwidth type. + * @throws IOException If an I/O error occurred + */ + public void setDataNodeBandwidth(long bandwidth, String type) throws IOException { + synchronized (this) { + for (DatanodeDescriptor nodeInfo : datanodeMap.values()) { + nodeInfo.setDataNodeBandwidth(bandwidth, type); + } + } + } + public void markAllDatanodesStaleAndSetKeyUpdateIfNeed() { LOG.info("Marking all datanodes as stale and schedule update block token if need."); synchronized (this) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index 11489e919c493..5b0bc26f75e1c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.server.protocol.*; import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus; +import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.thirdparty.com.google.common.base.Joiner; import org.apache.hadoop.util.Lists; import org.apache.hadoop.util.Sets; @@ -806,6 +807,40 @@ assert getBlockPoolId().equals(bp) : dxcs.balanceThrottler.setBandwidth(bandwidth); } break; + case DatanodeProtocol.DNA_DATANODEBANDWIDTHUPDATE: + LOG.info("DatanodeCommand action: DNA_DATANODEBANDWIDTHUPDATE"); + long dnBandwidth = + ((DataNodeBandwidthCommand) cmd).getDataNodeBandwidthValue(); + long oldBandwidth; + String type = + ((DataNodeBandwidthCommand) cmd).getDataNodeBandwidthType(); + if (dnBandwidth >= 0) { + DataXceiverServer dxcs = dn.getXferServer(); + if ("transfer".equals(type)) { + oldBandwidth = dxcs.getTransferThrottler() == null ? 0 : + dxcs.getTransferThrottler().getBandwidth(); + DataTransferThrottler transferThrottler = dnBandwidth == 0 ? null : + new DataTransferThrottler(dnBandwidth); + dxcs.setTransferThrottler(transferThrottler); + } else if ("write".equals(type)) { + oldBandwidth = dxcs.getWriteThrottler() == null ? 0 : + dxcs.getWriteThrottler().getBandwidth(); + DataTransferThrottler writeThrottler = dnBandwidth == 0 ? null : + new DataTransferThrottler(dnBandwidth); + dxcs.setWriteThrottler(writeThrottler); + } else if ("read".equals(type)) { + oldBandwidth = dxcs.getReadThrottler() == null ? 0 : + dxcs.getReadThrottler().getBandwidth(); + DataTransferThrottler readThrottler = dnBandwidth == 0 ? 
null : + new DataTransferThrottler(dnBandwidth); + dxcs.setReadThrottler(readThrottler); + } else { + break; + } + LOG.info("Updated " + type + " throttler bandwidth from " + + oldBandwidth + " bytes/s to " + dnBandwidth + " bytes/s."); + } + break; case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION: LOG.info("DatanodeCommand action: DNA_ERASURE_CODING_RECOVERY"); Collection<BlockECReconstructionInfo> ecTasks = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index ef778791cfd9c..a49534b2f18cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -4004,7 +4004,31 @@ public long getBalancerBandwidth() { (DataXceiverServer) this.dataXceiverServer.getRunnable(); return dxcs.balanceThrottler.getBandwidth(); } - + + @VisibleForTesting + public long getTransferBandwidth() { + if (this.xserver.getTransferThrottler() != null) { + return this.xserver.getTransferThrottler().getBandwidth(); + } + return 0; + } + + @VisibleForTesting + public long getWriteBandwidth() { + if (this.xserver.getWriteThrottler() != null) { + return this.xserver.getWriteThrottler().getBandwidth(); + } + return 0; + } + + @VisibleForTesting + public long getReadBandwidth() { + if (this.xserver.getReadThrottler() != null) { + return this.xserver.getReadThrottler().getBandwidth(); + } + return 0; + } + public DNConf getDnConf() { return dnConf; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index f359d86df7b2a..4bb3d0f50f5e6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -5188,6 +5188,14 @@ void setBalancerBandwidth(long bandwidth) throws IOException { logAuditEvent(true, operationName, null); } + void setDataNodeBandwidth(long bandwidth, String type) throws IOException { + String operationName = "setDataNodeBandwidth-" + type; + checkOperation(OperationCategory.WRITE); + checkSuperuserPrivilege(operationName); + getBlockManager().getDatanodeManager().setDataNodeBandwidth(bandwidth, type); + logAuditEvent(true, operationName, null); + } + boolean setSafeMode(SafeModeAction action) throws IOException { String operationName = action.toString().toLowerCase(); boolean error = false; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 76100e032964d..6cf959e9800ec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -1499,7 +1499,25 @@ public void setBalancerBandwidth(long bandwidth) throws IOException { checkNNStartup(); namesystem.setBalancerBandwidth(bandwidth); } - +
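The handler above installs org.apache.hadoop.hdfs.util.DataTransferThrottler instances: a positive value replaces the throttler of that type, 0 uninstalls it, and negative values are ignored. A sketch of how an installed throttler is consumed on an I/O path (buffer size and loop are illustrative):

```java
import org.apache.hadoop.hdfs.util.DataTransferThrottler;

public class ThrottlerDemo {
  public static void main(String[] args) {
    // Cap at 64 MiB/s; throttle(n) blocks the caller once more bytes than
    // the configured bandwidth have moved within the current period.
    DataTransferThrottler throttler = new DataTransferThrottler(64L * 1024 * 1024);
    byte[] buf = new byte[8192];
    for (int i = 0; i < 1000; i++) {
      // ... transfer buf here ...
      throttler.throttle(buf.length);
    }
    System.out.println("effective cap: " + throttler.getBandwidth() + " bytes/s");
  }
}
```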
+ /** + * Tell all datanodes to reset the bandwidth of the specified type. + * @param bandwidth Bandwidth in bytes per second for all datanodes. + * @param type DataNode bandwidth type. + * @throws IOException If an I/O error occurred + */ + @Override // ClientProtocol + public void setDataNodeBandwidth(long bandwidth, String type) throws IOException { + if (bandwidth > HdfsServerConstants.MAX_BANDWIDTH_PER_DATANODE) { + throw new IllegalArgumentException( + "Bandwidth should not exceed maximum limit " + + HdfsServerConstants.MAX_BANDWIDTH_PER_DATANODE + + " bytes per second"); + } + checkNNStartup(); + namesystem.setDataNodeBandwidth(bandwidth, type); + } + @Override // ClientProtocol public ContentSummary getContentSummary(String path) throws IOException { checkNNStartup(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DataNodeBandwidthCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DataNodeBandwidthCommand.java new file mode 100644 index 0000000000000..9cc96e8e89930 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DataNodeBandwidthCommand.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocol; + +/** + * DataNode bandwidth command instructs each datanode to update the maximum + * network bandwidth used by the throttler of the specified type. + */ +public class DataNodeBandwidthCommand extends DatanodeCommand { + private final static long DBC_DEFAULTBANDWIDTH = 0L; + + private final long bandwidth; + private final String type; + + /** + * DataNode Bandwidth Command constructor. Sets bandwidth to 0 and type to null. + */ + DataNodeBandwidthCommand() { + this(DBC_DEFAULTBANDWIDTH, null); + } + + /** + * DataNode Bandwidth Command constructor. + * + * @param bandwidth Bandwidth in bytes per second. + * @param type DataNode bandwidth type. + */ + public DataNodeBandwidthCommand(long bandwidth, String type) { + super(DatanodeProtocol.DNA_DATANODEBANDWIDTHUPDATE); + this.bandwidth = bandwidth; + this.type = type; + } + + /** + * Get current value of the max bandwidth in bytes per second. + * + * @return DataNode bandwidth in bytes per second for this datanode. + */ + public long getDataNodeBandwidthValue() { + return this.bandwidth; + } + + /** + * Get current value of bandwidth type. + * + * @return datanode bandwidth type. 
+ */ + public String getDataNodeBandwidthType() { + return this.type; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java index 3caeb8b08cee8..41d491a8388e9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java @@ -81,6 +81,7 @@ public interface DatanodeProtocol { final static int DNA_ERASURE_CODING_RECONSTRUCTION = 11; // erasure coding reconstruction command int DNA_BLOCK_STORAGE_MOVEMENT = 12; // block storage movement command int DNA_DROP_SPS_WORK_COMMAND = 13; // drop sps work command + int DNA_DATANODEBANDWIDTHUPDATE = 14; // update datanode bandwidth /** * Register Datanode. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java index f02f6fc8a7996..3447d65bd469d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java @@ -461,6 +461,7 @@ static int run(DistributedFileSystem dfs, String[] argv, int idx) throws IOExcep "\t[-getVolumeReport datanode_host:ipc_port]\n" + "\t[-deleteBlockPool datanode_host:ipc_port blockpoolId [force]]\n"+ "\t[-setBalancerBandwidth <bandwidth in bytes per second>]\n" + + "\t[-setDataNodeBandwidth <bandwidth> -type <transfer|write|read>]\n" + "\t[-getBalancerBandwidth <datanode_host:ipc_port>]\n" + "\t[-fetchImage <local directory>]\n" + "\t[-allowSnapshot <snapshotDir>]\n" + @@ -1131,6 +1132,54 @@ public int setBalancerBandwidth(String[] argv, int idx) throws IOException { return exitCode; } + /** + * Command to ask the active namenode to set the datanode bandwidth. + * Usage: hdfs dfsadmin -setDataNodeBandwidth {@literal <bandwidth> -type <transfer|write|read>} + * @param argv List of command line parameters. + * @param idx The index of the command that is being processed. + * @exception IOException If an I/O error occurred + */ + public int setDataNodeBandwidth(String[] argv, int idx) throws IOException { + + long bandwidth; + int exitCode = -1; + String type; + List<String> types = Arrays.asList("transfer", "write", "read"); + + try { + List<String> args = new ArrayList<>(Arrays.asList(argv)); + type = StringUtils.popOptionWithArgument("-type", args); + bandwidth = StringUtils.TraditionalBinaryPrefix.string2long(args.get(idx)); + } catch (Exception e) { + System.err.println("Exception: " + e.getMessage()); + System.err.println("Usage: hdfs dfsadmin" + + " [-setDataNodeBandwidth <bandwidth>" + + " -type <transfer|write|read>]"); + return exitCode; + } + + if (bandwidth < 0) { + System.err.println("Bandwidth should be a non-negative integer"); + return exitCode; + } + + if (!types.contains(type)) { + System.err.println("Bandwidth type should be in <transfer|write|read>"); + return exitCode; + } + DistributedFileSystem dfs = getDFS(); + try { + dfs.setDataNodeBandwidth(bandwidth, type); + System.out.println("DataNode " + type + " bandwidth is set to " + bandwidth); + } catch (IOException ioe) { + System.err.println("Failed to set DataNode " + type + " bandwidth."); + throw ioe; + } + exitCode = 0; + + return exitCode; + } +
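The command above leans on two existing helpers from org.apache.hadoop.util.StringUtils. A standalone illustration (not patch code) of what they do with a typical argv:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.util.StringUtils;

public class DfsAdminParsingDemo {
  public static void main(String[] args) {
    String[] argv = {"-setDataNodeBandwidth", "100m", "-type", "transfer"};
    List<String> rest = new ArrayList<>(Arrays.asList(argv));
    // Removes "-type transfer" from the working copy and returns "transfer".
    String type = StringUtils.popOptionWithArgument("-type", rest);
    // Binary suffixes are accepted: "100m" -> 104857600.
    long bandwidth = StringUtils.TraditionalBinaryPrefix.string2long(rest.get(1));
    System.out.println(type + " -> " + bandwidth);
  }
}
```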
/** * Command to get balancer bandwidth for the given datanode. Usage: hdfs * dfsadmin -getBalancerBandwidth {@literal <datanode_host:ipc_port>} @@ -1318,6 +1367,11 @@ private void printHelp(String cmd) { "\tduring HDFS block balancing.\n\n" + "\t--- NOTE: This value is not persistent on the DataNode.---\n"; + String setDataNodeBandwidth = "-setDataNodeBandwidth <bandwidth> -type <transfer|write|read>:\n" + + "\tChanges the bandwidth of the throttler of the given type for each datanode.\n" + + "\tThe types of throttlers include transfer, write, and read.\n\n" + + "\t--- NOTE: The new value is not persistent on the DataNode.---\n"; + String fetchImage = "-fetchImage <local directory>:\n" + "\tDownloads the most recent fsimage from the Name Node and saves it in" + "\tthe specified local directory.\n"; @@ -1414,6 +1468,8 @@ private void printHelp(String cmd) { System.out.println(deleteBlockPool); } else if ("setBalancerBandwidth".equals(cmd)) { System.out.println(setBalancerBandwidth); + } else if ("setDataNodeBandwidth".equals(cmd)) { + System.out.println(setDataNodeBandwidth); } else if ("getBalancerBandwidth".equals(cmd)) { System.out.println(getBalancerBandwidth); } else if ("fetchImage".equals(cmd)) { @@ -1462,6 +1518,7 @@ private void printHelp(String cmd) { System.out.println(refreshNamenodes); System.out.println(deleteBlockPool); System.out.println(setBalancerBandwidth); + System.out.println(setDataNodeBandwidth); System.out.println(getBalancerBandwidth); System.out.println(fetchImage); System.out.println(allowSnapshot); @@ -2353,6 +2410,10 @@ private static void printUsage(String cmd) { } else if ("-setBalancerBandwidth".equals(cmd)) { System.err.println("Usage: hdfs dfsadmin" + " [-setBalancerBandwidth <bandwidth in bytes per second>]"); + } else if ("-setDataNodeBandwidth".equals(cmd)) { + System.err.println("Usage: hdfs dfsadmin" + + " [-setDataNodeBandwidth <bandwidth>" + + " -type <transfer|write|read>]"); } else if ("-getBalancerBandwidth".equalsIgnoreCase(cmd)) { System.err.println("Usage: hdfs dfsadmin" + " [-getBalancerBandwidth <datanode_host:ipc_port>]"); @@ -2511,6 +2572,11 @@ public int run(String[] argv) { printUsage(cmd); return exitCode; } + } else if ("-setDataNodeBandwidth".equals(cmd)) { + if (argv.length != 4) { + printUsage(cmd); + return exitCode; + } } else if ("-getBalancerBandwidth".equalsIgnoreCase(cmd)) { if (argv.length != 2) { printUsage(cmd); @@ -2603,6 +2669,8 @@ public int run(String[] argv) { exitCode = deleteBlockPool(argv, i); } else if ("-setBalancerBandwidth".equals(cmd)) { exitCode = setBalancerBandwidth(argv, i); + } else if ("-setDataNodeBandwidth".equals(cmd)) { + exitCode = setDataNodeBandwidth(argv, i); } else if ("-getBalancerBandwidth".equals(cmd)) { exitCode = getBalancerBandwidth(argv, i); } else if ("-fetchImage".equals(cmd)) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto index c537ce3ae494c..10eb5d2002515 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto @@ -60,6 +60,7 @@ message DatanodeCommandProto { NullDatanodeCommand = 7; BlockIdCommand = 8; BlockECReconstructionCommand = 9; + DataNodeBandwidthCommand = 10; } required Type cmdType = 1; // Type of the command @@ -74,6 +75,7 @@ optional RegisterCommandProto registerCmd = 7; optional BlockIdCommandProto blkIdCmd = 8; optional BlockECReconstructionCommandProto blkECReconstructionCmd = 9; + optional DataNodeBandwidthCommandProto bandwidthCmd = 10; } /** @@ -86,6 +88,17 @@ message BalancerBandwidthCommandProto { required uint64 bandwidth = 1; } +/** + * Command sent from namenode to 
datanode to set the + * maximum bandwidth. + */ +message DataNodeBandwidthCommandProto { + + // Maximum bandwidth to be used by datanode + required uint64 bandwidth = 1; + required string type = 2; // DataNode bandwidth type: transfer, write or read +} + /** * Command to instruct datanodes to perform certain action * on the given set of blocks. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md index c065eb4c8d697..2d7d0f07fac8b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md @@ -388,6 +388,7 @@ Usage: hdfs dfsadmin [-getVolumeReport datanodehost:port] hdfs dfsadmin [-deleteBlockPool datanode-host:port blockpoolId [force]] hdfs dfsadmin [-setBalancerBandwidth <bandwidth in bytes per second>] + hdfs dfsadmin [-setDataNodeBandwidth <bandwidth in bytes per second> -type <type>] hdfs dfsadmin [-getBalancerBandwidth <datanode_host:ipc_port>] hdfs dfsadmin [-fetchImage <local directory>] hdfs dfsadmin [-allowSnapshot <snapshotDir>] @@ -427,6 +428,7 @@ Usage: | `-getVolumeReport` datanodehost:port | For the given datanode, get the volume report. | | `-deleteBlockPool` datanode-host:port blockpoolId [force] | If force is passed, block pool directory for the given blockpool id on the given datanode is deleted along with its contents, otherwise the directory is deleted only if it is empty. The command will fail if datanode is still serving the block pool. Refer to refreshNamenodes to shutdown a block pool service on a datanode. | | `-setBalancerBandwidth` \<bandwidth in bytes per second\> | Changes the network bandwidth used by each datanode during HDFS block balancing. \<bandwidth\> is the maximum number of bytes per second that will be used by each datanode. This value overrides the dfs.datanode.balance.bandwidthPerSec parameter. NOTE: The new value is not persistent on the DataNode. | +| `-setDataNodeBandwidth` \<bandwidth in bytes per second\> `-type` \<type\> | Changes the bandwidth of the throttler of the given type for each datanode. The types of throttlers include transfer, write, and read. NOTE: The new value is not persistent on the DataNode. | | `-getBalancerBandwidth` \<datanode_host:ipc_port\> | Get the network bandwidth (in bytes per second) for the given datanode. This is the maximum network bandwidth used by the datanode during HDFS block balancing.| | `-fetchImage` \<local directory\> | Downloads the most recent fsimage from the NameNode and saves it in the specified local directory. | | `-allowSnapshot` \<snapshotDir\> | Allowing snapshots of a directory to be created. If the operation completes successfully, the directory becomes snapshottable. See the [HDFS Snapshot Documentation](./HdfsSnapshots.html) for more information. 
| diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java index 669224818f07c..c61b6571ed31a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java @@ -338,6 +338,12 @@ private void verifyOpsUsingClosedClient(DFSClient dfsClient) { } catch (IOException ioe) { GenericTestUtils.assertExceptionContains("Filesystem closed", ioe); } + try { + dfsClient.setDataNodeBandwidth(1000L, "transfer"); + fail("setDataNodeBandwidth using a closed filesystem!"); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains("Filesystem closed", ioe); + } try { dfsClient.finalizeUpgrade(); fail("finalizeUpgrade using a closed filesystem!"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLoggerWithCommands.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLoggerWithCommands.java index d74637501474d..0aef179e1909f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLoggerWithCommands.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLoggerWithCommands.java @@ -1042,6 +1042,36 @@ public void testSetBalancerBandwidth() throws Exception { } } + @Test + public void testSetDataNodeBandwidth() throws Exception { + String auditLogString = + ".*allowed=true.*cmd=setDataNodeBandwidth.*"; + FSNamesystem fsNamesystem = spy(cluster.getNamesystem()); + when(fsNamesystem.isExternalInvocation()).thenReturn(true); + Server.Call call = spy(new Server.Call( + 1, 1, null, null, RPC.RpcKind.RPC_BUILTIN, new byte[] {1, 2, 3})); + when(call.getRemoteUser()).thenReturn( + UserGroupInformation.createRemoteUser(System.getProperty("user.name"))); + Server.getCurCall().set(call); + try { + fsNamesystem.setDataNodeBandwidth(10, "transfer"); + verifyAuditLogs(auditLogString); + } catch (Exception e) { + fail("setDataNodeBandwidth threw exception!"); + } + when(call.getRemoteUser()).thenReturn( + UserGroupInformation.createRemoteUser("theDoctor")); + try { + fsNamesystem.setDataNodeBandwidth(10, "transfer"); + fail( + "setDataNodeBandwidth should have thrown AccessControlException!"); + } catch (AccessControlException ace) { + auditLogString = + ".*allowed=false.*cmd=setDataNodeBandwidth.*"; + verifyAuditLogs(auditLogString); + } + } + @Test public void testRefreshNodes() throws Exception { String auditLogString = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java index f7b0df352c0d5..9b705c285a232 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java @@ -1142,6 +1142,33 @@ public void testSetBalancerBandwidth() throws Exception { new String[]{"-setBalancerBandwidth", "-10m"})); } + @Test + public void testSetDataNodeBandwidth() throws Exception { + final DFSAdmin dfsAdmin = new DFSAdmin(conf); + + // Test basic case: 10000 + assertEquals(0, ToolRunner.run(dfsAdmin, + new 
String[]{"-setDataNodeBandwidth", "10000", "-type", "transfer"})); + assertEquals(0, ToolRunner.run(dfsAdmin, + new String[]{"-setDataNodeBandwidth", "10000", "-type", "write"})); + assertEquals(0, ToolRunner.run(dfsAdmin, + new String[]{"-setDataNodeBandwidth", "10000", "-type", "read"})); + + Thread.sleep(3 * 1000); + + // verify bandwidth + assertEquals(10000, datanode.getTransferBandwidth()); + assertEquals(10000, datanode.getWriteBandwidth()); + assertEquals(10000, datanode.getReadBandwidth()); + + // Test negative numbers + assertEquals(-1, ToolRunner.run(dfsAdmin, + new String[]{"-setBalancerBandwidth", "-10000", "-type", "transfer"})); + // Test error type + assertEquals(-1, ToolRunner.run(dfsAdmin, + new String[]{"-setBalancerBandwidth", "10000", "-type", "errorType"})); + } + @Test(timeout = 300000L) public void testCheckNumOfBlocksInReportCommand() throws Exception { DistributedFileSystem dfs = cluster.getFileSystem(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdminWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdminWithHA.java index 3d4ddfe88eb14..afa77d37c7849 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdminWithHA.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdminWithHA.java @@ -425,6 +425,65 @@ public void testSetNegativeBalancerBandwidth() throws Exception { assertEquals("Negative bandwidth value must fail the command", -1, exitCode); } + @Test (timeout = 30000) + public void testSetDataNodeBandwidth() throws Exception { + setUpHaCluster(false); + cluster.getDfsCluster().transitionToActive(0); + + int exitCode = admin.run(new String[] {"-setDataNodeBandwidth", "10", "-type", "transfer"}); + assertEquals(err.toString().trim(), 0, exitCode); + String message = "DataNode transfer bandwidth is set to 10"; + assertOutputMatches(message + newLine); + } + + @Test (timeout = 30000) + public void testSetDataNodeBandwidthNN1UpNN2Down() throws Exception { + setUpHaCluster(false); + cluster.getDfsCluster().shutdownNameNode(1); + cluster.getDfsCluster().transitionToActive(0); + int exitCode = admin.run(new String[] {"-setDataNodeBandwidth", "10", "-type", "transfer"}); + assertEquals(err.toString().trim(), 0, exitCode); + String message = "DataNode transfer bandwidth is set to 10"; + assertOutputMatches(message + newLine); + } + + @Test (timeout = 30000) + public void testSetDataNodeBandwidthNN1DownNN2Up() throws Exception { + setUpHaCluster(false); + cluster.getDfsCluster().shutdownNameNode(0); + cluster.getDfsCluster().transitionToActive(1); + int exitCode = admin.run(new String[] {"-setDataNodeBandwidth", "10", "-type", "transfer"}); + assertEquals(err.toString().trim(), 0, exitCode); + String message = "DataNode transfer bandwidth is set to 10"; + assertOutputMatches(message + newLine); + } + + @Test + public void testSetDataNodeBandwidthNN1DownNN2Down() throws Exception { + setUpHaCluster(false); + cluster.getDfsCluster().shutdownNameNode(0); + cluster.getDfsCluster().shutdownNameNode(1); + int exitCode = admin.run(new String[] {"-setDataNodeBandwidth", "10", "-type", "transfer"}); + assertNotEquals(err.toString().trim(), 0, exitCode); + String message = "DataNode transfer bandwidth is set failed." 
+ newLine + + ".*" + newLine; + assertOutputMatches(message); + } + + @Test (timeout = 30000) + public void testSetNegativeDataNodeBandwidth() throws Exception { + setUpHaCluster(false); + int exitCode = admin.run(new String[] {"-setDataNodeBandwidth", "-10", "-type", "transfer"}); + assertEquals("Negative bandwidth value must fail the command", -1, exitCode); + } + + @Test (timeout = 30000) + public void testSetErrorTypeDataNodeBandwidth() throws Exception { + setUpHaCluster(false); + int exitCode = admin.run(new String[] {"-setDataNodeBandwidth", "10", "-type", "errortype"}); + assertEquals("Invalid bandwidth type must fail the command", -1, exitCode); + } + @Test (timeout = 30000) public void testMetaSave() throws Exception { setUpHaCluster(false); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestViewFileSystemOverloadSchemeWithDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestViewFileSystemOverloadSchemeWithDFSAdmin.java index 2f821cf277118..d50159a2047e9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestViewFileSystemOverloadSchemeWithDFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestViewFileSystemOverloadSchemeWithDFSAdmin.java @@ -290,4 +290,24 @@ public void testSetBalancerBandwidth() throws Exception { assertOutMsg("Balancer bandwidth is set to 1000", 0); assertEquals(0, ret); } + + /** + * Tests setDataNodeBandwidth with ViewFSOverloadScheme. + */ + @Test + public void testSetDataNodeBandwidth() throws Exception { + final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER); + addMountLinks(defaultFSURI.getHost(), + new String[] {HDFS_USER_FOLDER, LOCAL_FOLDER }, + new String[] {hdfsTargetPath.toUri().toString(), + localTargetDir.toURI().toString() }, + conf); + final DFSAdmin dfsAdmin = new DFSAdmin(conf); + redirectStream(); + int ret = ToolRunner.run(dfsAdmin, + new String[] {"-fs", defaultFSURI.toString(), "-setDataNodeBandwidth", + "1000", "-type", "transfer"}); + assertOutMsg("DataNode transfer bandwidth is set to 1000", 0); + assertEquals(0, ret); + } } \ No newline at end of file
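One compatibility note to close on: the RPC is new, so a client invoking it against a NameNode that predates this patch will fail. A hedged guard built on the translator's existing isMethodSupported hook (shown in the ClientNamenodeProtocolTranslatorPB hunk above); how the translator handle is obtained is environment-specific, so this helper is illustrative, not part of the patch:

```java
import java.io.IOException;

import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;

public final class BandwidthRpcGuard {
  private BandwidthRpcGuard() {}

  // Probe first, then call; an older NameNode reports the method as absent.
  public static boolean setIfSupported(ClientNamenodeProtocolTranslatorPB nn,
      long bandwidth, String type) throws IOException {
    if (!nn.isMethodSupported("setDataNodeBandwidth")) {
      return false;
    }
    nn.setDataNodeBandwidth(bandwidth, type);
    return true;
  }
}
```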