Skip to content

HDFS-17785. DFSAdmin supports setting the bandwidth for DataNode. #7705

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2479,6 +2479,21 @@ public void setBalancerBandwidth(long bandwidth) throws IOException {
}
}

/**
 * Requests the namenode to tell all datanodes to use a new bandwidth value.
 *
 * @param bandwidth Bandwidth in bytes per second for all datanodes.
 * @param type DataNode bandwidth type.
 * @throws IOException If an I/O error occurred
 *
 * @see ClientProtocol#setDataNodeBandwidth(long, String)
 */
public void setDataNodeBandwidth(long bandwidth, String type) throws IOException {
  // Fail fast if this client has already been closed.
  checkOpen();
  // Scope name includes the type so traces for different bandwidth types
  // can be distinguished.
  try (TraceScope ignored = tracer.newScope("setDataNodeBandwidth-" + type)) {
    namenode.setDataNodeBandwidth(bandwidth, type);
  }
}

/**
* @see ClientProtocol#finalizeUpgrade()
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2088,12 +2088,23 @@ public Token<DelegationTokenIdentifier> getDelegationToken(String renewer)
* bandwidth to be used by a datanode during balancing.
*
* @param bandwidth Balancer bandwidth in bytes per second for all datanodes.
* @throws IOException
* @throws IOException If an I/O error occurred
*/
public void setBalancerBandwidth(long bandwidth) throws IOException {
  // Thin delegation: the wrapped DFSClient issues the RPC to the namenode.
  dfs.setBalancerBandwidth(bandwidth);
}

/**
 * Requests the namenode to tell all datanodes to use a new bandwidth value
 * for the specified bandwidth type.
 *
 * @param bandwidth Bandwidth in bytes per second for all datanodes.
 * @param type DataNode bandwidth type.
 * @throws IOException If an I/O error occurred
 */
public void setDataNodeBandwidth(long bandwidth, String type) throws IOException {
  dfs.setDataNodeBandwidth(bandwidth, type);
}

/**
* Get a canonical service name for this file system. If the URI is logical,
* the hostname part of the URI will be returned.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1057,6 +1057,16 @@ public void setBalancerBandwidth(long bandwidth) throws IOException {
defaultDFS.setBalancerBandwidth(bandwidth);
}

@Override
public void setDataNodeBandwidth(long bandwidth, String type) throws IOException {
  // Without a ViewFileSystem overlay, behave exactly like
  // DistributedFileSystem.
  if (this.vfs == null) {
    super.setDataNodeBandwidth(bandwidth, type);
    return;
  }
  // With an overlay, route the call to the default namespace only.
  // NOTE(review): checkDefaultDFS presumably rejects a missing default DFS
  // with an exception naming this operation — confirm against its impl.
  checkDefaultDFS(defaultDFS, "setDataNodeBandwidth");
  defaultDFS.setDataNodeBandwidth(bandwidth, type);
}

@Override
public String getCanonicalServiceName() {
if (this.vfs == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,16 @@ CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
@Idempotent
void setBalancerBandwidth(long bandwidth) throws IOException;

/**
 * Tell all datanodes to use a new bandwidth value for the specified
 * bandwidth type.
 *
 * @param bandwidth Bandwidth in bytes per second for all datanodes.
 * @param type DataNode bandwidth type.
 * @throws IOException If an I/O error occurred
 */
@Idempotent
void setDataNodeBandwidth(long bandwidth, String type) throws IOException;

/**
* Get the file info for a specific file or directory.
* @param src The string representation of the path to the file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SaveNamespaceRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetBalancerBandwidthRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetDataNodeBandwidthRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetOwnerRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetPermissionRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetQuotaRequestProto;
Expand Down Expand Up @@ -976,6 +977,16 @@ public void setBalancerBandwidth(long bandwidth) throws IOException {
ipc(() -> rpcProxy.setBalancerBandwidth(null, req));
}

@Override
public void setDataNodeBandwidth(long bandwidth, String type) throws IOException {
  // Translate the arguments into the wire-format request and invoke the
  // RPC through the proxy; ipc() unwraps ServiceException into IOException.
  final SetDataNodeBandwidthRequestProto request =
      SetDataNodeBandwidthRequestProto.newBuilder()
          .setBandwidth(bandwidth)
          .setType(type)
          .build();
  ipc(() -> rpcProxy.setDataNodeBandwidth(null, request));
}

@Override
public boolean isMethodSupported(String methodName) throws IOException {
return RpcClientUtil.isMethodSupported(rpcProxy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -772,9 +772,17 @@ message SetBalancerBandwidthRequestProto {
required int64 bandwidth = 1;
}

// Request asking the namenode to have all datanodes apply a new bandwidth
// value for the given bandwidth type (HDFS-17785).
message SetDataNodeBandwidthRequestProto {
  required int64 bandwidth = 1; // Bandwidth in bytes per second.
  required string type = 2;     // DataNode bandwidth type.
}

message SetBalancerBandwidthResponseProto { // void response
}

message SetDataNodeBandwidthResponseProto { // void response
}

message GetDataEncryptionKeyRequestProto { // no parameters
}

Expand Down Expand Up @@ -999,6 +1007,8 @@ service ClientNamenodeProtocol {
returns(hadoop.common.CancelDelegationTokenResponseProto);
rpc setBalancerBandwidth(SetBalancerBandwidthRequestProto)
returns(SetBalancerBandwidthResponseProto);
rpc setDataNodeBandwidth(SetDataNodeBandwidthRequestProto)
returns(SetDataNodeBandwidthResponseProto);
rpc getDataEncryptionKey(GetDataEncryptionKeyRequestProto)
returns(GetDataEncryptionKeyResponseProto);
rpc createSnapshot(CreateSnapshotRequestProto)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1314,6 +1314,16 @@ public void setBalancerBandwidth(long bandwidth) throws IOException {
rpcClient.invokeConcurrent(nss, method, true, false);
}

@Override
public void setDataNodeBandwidth(long bandwidth, String type) throws IOException {
  // NOTE(review): UNCHECKED appears to mirror setBalancerBandwidth above —
  // confirm this admin op should bypass the read/write operation check.
  rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);

  // Fan the call out concurrently to every known namespace so that all
  // namenodes (and therefore all datanodes) receive the new value.
  RemoteMethod method = new RemoteMethod("setDataNodeBandwidth",
      new Class<?>[] {long.class, String.class}, bandwidth, type);
  final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
  rpcClient.invokeConcurrent(nss, method, true, false);
}

/**
* Recursively get all the locations for the path.
* For example, there are some mount points:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1639,6 +1639,11 @@ public void setBalancerBandwidth(long bandwidth) throws IOException {
clientProto.setBalancerBandwidth(bandwidth);
}

@Override // ClientProtocol
public void setDataNodeBandwidth(long bandwidth, String type) throws IOException {
  // Delegate to the ClientProtocol implementation module.
  clientProto.setDataNodeBandwidth(bandwidth, type);
}

@Override // ClientProtocol
public ContentSummary getContentSummary(String path) throws IOException {
return clientProto.getContentSummary(path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2205,6 +2205,15 @@ public void testSetBalancerBandwidth() throws Exception {
}, 100, 60 * 1000);
}

@Test
public void testSetDataNodeBandwidth() throws Exception {
  // Push a new transfer bandwidth through the router, then wait until the
  // first datanode reports it (delivered asynchronously via heartbeat).
  final long expectedBandwidth = 1000L;
  routerProtocol.setDataNodeBandwidth(expectedBandwidth, "transfer");
  final ArrayList<DataNode> datanodes = cluster.getCluster().getDataNodes();
  GenericTestUtils.waitFor(
      () -> datanodes.get(0).getTransferBandwidth() == expectedBandwidth,
      100, 60 * 1000);
}

@Test
public void testAddClientIpPortToCallerContext() throws IOException {
GenericTestUtils.LogCapturer auditLog =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SaveNamespaceResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetBalancerBandwidthRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetBalancerBandwidthResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetDataNodeBandwidthRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetDataNodeBandwidthResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetOwnerRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetOwnerResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetPermissionRequestProto;
Expand Down Expand Up @@ -415,6 +417,9 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
protected static final SetBalancerBandwidthResponseProto VOID_SETBALANCERBANDWIDTH_RESPONSE =
SetBalancerBandwidthResponseProto.newBuilder().build();

// Shared immutable void response reused for every setDataNodeBandwidth RPC.
protected static final SetDataNodeBandwidthResponseProto VOID_SETDATANODEBANDWIDTH_RESPONSE =
    SetDataNodeBandwidthResponseProto.newBuilder().build();

protected static final SetAclResponseProto VOID_SETACL_RESPONSE =
SetAclResponseProto.getDefaultInstance();

Expand Down Expand Up @@ -1254,6 +1259,18 @@ public SetBalancerBandwidthResponseProto setBalancerBandwidth(
}
}

@Override
public SetDataNodeBandwidthResponseProto setDataNodeBandwidth(
    RpcController controller, SetDataNodeBandwidthRequestProto req)
    throws ServiceException {
  // Unpack the request, delegate to the namenode implementation, and wrap
  // any IOException in a ServiceException per the protobuf service contract.
  try {
    final long bandwidth = req.getBandwidth();
    final String type = req.getType();
    server.setDataNodeBandwidth(bandwidth, type);
  } catch (IOException e) {
    throw new ServiceException(e);
  }
  return VOID_SETDATANODEBANDWIDTH_RESPONSE;
}

@Override
public GetDataEncryptionKeyResponseProto getDataEncryptionKey(
RpcController controller, GetDataEncryptionKeyRequestProto request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
import org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos.KeyValueProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DataNodeBandwidthCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECReconstructionCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
Expand Down Expand Up @@ -90,6 +91,7 @@
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
import org.apache.hadoop.hdfs.server.protocol.DataNodeBandwidthCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
Expand Down Expand Up @@ -458,6 +460,8 @@ public static DatanodeCommand convert(DatanodeCommandProto proto) {
switch (proto.getCmdType()) {
case BalancerBandwidthCommand:
return PBHelper.convert(proto.getBalancerCmd());
case DataNodeBandwidthCommand:
return PBHelper.convert(proto.getBandwidthCmd());
case BlockCommand:
return PBHelper.convert(proto.getBlkCmd());
case BlockRecoveryCommand:
Expand All @@ -483,6 +487,13 @@ public static BalancerBandwidthCommandProto convert(
.setBandwidth(bbCmd.getBalancerBandwidthValue()).build();
}

/** Convert a DataNodeBandwidthCommand into its protobuf wire form. */
public static DataNodeBandwidthCommandProto convert(
    DataNodeBandwidthCommand cmd) {
  DataNodeBandwidthCommandProto.Builder builder =
      DataNodeBandwidthCommandProto.newBuilder();
  builder.setBandwidth(cmd.getDataNodeBandwidthValue());
  builder.setType(cmd.getDataNodeBandwidthType());
  return builder.build();
}

public static KeyUpdateCommandProto convert(KeyUpdateCommand cmd) {
return KeyUpdateCommandProto.newBuilder()
.setKeys(convert(cmd.getExportedKeys())).build();
Expand Down Expand Up @@ -572,6 +583,11 @@ public static DatanodeCommandProto convert(DatanodeCommand datanodeCommand) {
.setBalancerCmd(
PBHelper.convert((BalancerBandwidthCommand) datanodeCommand));
break;
case DatanodeProtocol.DNA_DATANODEBANDWIDTHUPDATE:
builder.setCmdType(DatanodeCommandProto.Type.DataNodeBandwidthCommand)
.setBandwidthCmd(
PBHelper.convert((DataNodeBandwidthCommand) datanodeCommand));
break;
case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
builder
.setCmdType(DatanodeCommandProto.Type.KeyUpdateCommand)
Expand Down Expand Up @@ -709,6 +725,12 @@ public static BalancerBandwidthCommand convert(
return new BalancerBandwidthCommand(balancerCmd.getBandwidth());
}

/** Convert a protobuf bandwidth command into a DataNodeBandwidthCommand. */
public static DataNodeBandwidthCommand convert(
    DataNodeBandwidthCommandProto proto) {
  final long bandwidth = proto.getBandwidth();
  final String type = proto.getType();
  return new DataNodeBandwidthCommand(bandwidth, type);
}

public static ReceivedDeletedBlockInfoProto convert(
ReceivedDeletedBlockInfo receivedDeletedBlockInfo) {
ReceivedDeletedBlockInfoProto.Builder builder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ public Type getType() {
// specified datanode, this value will be set back to 0.
private long bandwidth;

private Map<String, Long> throttlers = new HashMap<>();

/** A queue of blocks to be replicated by this datanode */
private final BlockQueue<BlockTargetPair> replicateBlocks =
new BlockQueue<>();
Expand Down Expand Up @@ -1027,6 +1029,21 @@ public synchronized void setBalancerBandwidth(long bandwidth) {
this.bandwidth = bandwidth;
}

/**
 * Get the pending bandwidth updates for this datanode, keyed by bandwidth
 * type.
 *
 * NOTE(review): this returns the live internal map, not a copy — the
 * heartbeat handler iterates it and then clears it after queueing commands,
 * so callers share (and mutate) this descriptor's state.
 *
 * @return bandwidth throttlers (type -> bytes per second) for this datanode
 */
public synchronized Map<String, Long> getDataNodeBandwidth() {
  return this.throttlers;
}

/**
 * Queue a bandwidth update of the given type for this datanode; the value
 * is delivered to the datanode in its next heartbeat response.
 *
 * @param dnBandwidth Bandwidth in bytes per second for this datanode
 * @param type DataNode bandwidth type.
 */
public synchronized void setDataNodeBandwidth(long dnBandwidth, String type) {
  this.throttlers.put(type, dnBandwidth);
}

@Override
public String dumpDatanode() {
StringBuilder sb = new StringBuilder(super.dumpDatanode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1930,8 +1930,7 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
if (!pendingList.isEmpty()) {
// If the block is deleted, the block size will become
// BlockCommand.NO_ACK (LONG.MAX_VALUE) . This kind of block we don't
// need
// to send for replication or reconstruction
// need to send for replication or reconstruction
Iterator<BlockTargetPair> iterator = pendingList.iterator();
while (iterator.hasNext()) {
BlockTargetPair cmd = iterator.next();
Expand Down Expand Up @@ -1962,9 +1961,7 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blockPoolId,
blks));
}
// cache commands
addCacheCommands(blockPoolId, nodeinfo, cmds);
// key update command
blockManager.addKeyUpdateCommand(cmds, nodeinfo);

// check for balancer bandwidth update
Expand All @@ -1974,6 +1971,13 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
nodeinfo.setBalancerBandwidth(0);
}

if (!nodeinfo.getDataNodeBandwidth().isEmpty()) {
for (Map.Entry<String, Long> entry : nodeinfo.getDataNodeBandwidth().entrySet()){
cmds.add(new DataNodeBandwidthCommand(entry.getValue(), entry.getKey()));
}
nodeinfo.getDataNodeBandwidth().clear();
}

Preconditions.checkNotNull(slowPeerTracker, "slowPeerTracker should not be un-assigned");

if (slowPeerTracker.isSlowPeerTrackerEnabled()) {
Expand Down Expand Up @@ -2085,7 +2089,25 @@ public void setBalancerBandwidth(long bandwidth) throws IOException {
}
}
}


/**
 * Tell all datanodes to use a new bandwidth value for the specified
 * bandwidth type.
 *
 * A system administrator can tune the datanode bandwidth dynamically by
 * calling "dfsadmin -setDataNodeBandwidth newbandwidth -type
 * {@literal <transfer|write|read>}".
 *
 * @param bandwidth Bandwidth in bytes per second for all datanodes.
 * @param type DataNode bandwidth type.
 * @throws IOException If an I/O error occurred
 */
public void setDataNodeBandwidth(long bandwidth, String type) throws IOException {
  // Queue the update on every known datanode descriptor; the value is
  // pushed to each datanode via its next heartbeat response.
  synchronized(this) {
    for (DatanodeDescriptor nodeInfo : datanodeMap.values()) {
      nodeInfo.setDataNodeBandwidth(bandwidth, type);
    }
  }
}

public void markAllDatanodesStaleAndSetKeyUpdateIfNeed() {
LOG.info("Marking all datanodes as stale and schedule update block token if need.");
synchronized (this) {
Expand Down
Loading