
Commit 3bfa77a

committed
add torch distribution
1 parent 6c4100c commit 3bfa77a

File tree: 7 files changed (+259 −4 lines changed)
examples/basic_tutorials/cifar10_cnn_torch_dist.py (+178)
@@ -0,0 +1,178 @@
+#! /usr/bin/python
+# -*- coding: utf-8 -*-
+
+import os
+# os.environ['TL_BACKEND'] = 'paddle'
+# os.environ['TL_BACKEND'] = 'tensorflow'
+# os.environ['TL_BACKEND'] = 'mindspore'
+os.environ['TL_BACKEND'] = 'torch'
+
+import time
+from tensorlayerx.dataflow import Dataset, DataLoader
+from tensorlayerx.vision.transforms import (
+    Compose, Resize, RandomFlipHorizontal, RandomContrast, RandomBrightness, StandardizePerImage, RandomCrop
+)
+from tensorlayerx.model import TrainOneStep
+from tensorlayerx.nn import Module
+import tensorlayerx as tlx
+from tensorlayerx.nn import (Conv2d, Linear, Flatten, MaxPool2d, BatchNorm2d)
+import argparse
+
+parser = argparse.ArgumentParser()
+parser.add_argument("--local_rank", type=int, default=-1,
+                    help="For distributed training: local_rank")
+args = parser.parse_args()
+# enable debug logging
+tlx.logging.set_verbosity(tlx.logging.DEBUG)
+
+tlx.ops.set_device(device='MLU', id=args.local_rank)
+tlx.ops.distributed_init(backend="cncl")
+# ################## Download and prepare the CIFAR10 dataset ##################
+# This is just some way of getting the CIFAR10 dataset from an online location
+# and loading it into numpy arrays with shape [32, 32, 3].
+X_train, y_train, X_test, y_test = tlx.files.load_cifar10_dataset(shape=(-1, 32, 32, 3), plotable=False)
+
+# ################## CIFAR10 dataset ##################
+# We define a Dataset class for loading CIFAR10 images and labels.
+class make_dataset(Dataset):
+
+    def __init__(self, data, label, transforms):
+        self.data = data
+        self.label = label
+        self.transforms = transforms
+
+    def __getitem__(self, idx):
+        x = self.data[idx].astype('uint8')
+        y = self.label[idx].astype('int64')
+        x = self.transforms(x)
+
+        return x, y
+
+    def __len__(self):
+
+        return len(self.label)
+
+# We define the CIFAR10 image preprocessing pipeline.
+train_transforms = Compose(  # combine multiple operations sequentially
+    [
+        RandomCrop(size=[24, 24]),  # randomly crop each image to shape [24, 24]
+        RandomFlipHorizontal(),  # randomly flip each image horizontally
+        RandomBrightness(brightness_factor=(0.5, 1.5)),  # randomly adjust brightness within the range (0.5, 1.5)
+        RandomContrast(contrast_factor=(0.5, 1.5)),  # randomly adjust contrast within the range (0.5, 1.5)
+        StandardizePerImage()  # normalize the values of each image to [-1, 1]
+    ]
+)
+
+test_transforms = Compose([Resize(size=(24, 24)), StandardizePerImage()])
+
+# We use DataLoader to batch and shuffle the data, and to turn it into an iterator.
+train_dataset = make_dataset(data=X_train, label=y_train, transforms=train_transforms)
+test_dataset = make_dataset(data=X_test, label=y_test, transforms=test_transforms)
+
+train_dataset = DataLoader(train_dataset, batch_size=128, shuffle=True)
+test_dataset = DataLoader(test_dataset, batch_size=128)
+
+# ################## CNN network ##################
+class CNN(Module):
+
+    def __init__(self):
+        super(CNN, self).__init__()
+        # Parameter initialization method
+        W_init = tlx.nn.initializers.truncated_normal(stddev=5e-2)
+        W_init2 = tlx.nn.initializers.truncated_normal(stddev=0.04)
+        b_init2 = tlx.nn.initializers.constant(value=0.1)
+
+        # 2D convolution: padding 'SAME', kernel size [5, 5], stride [1, 1], 3 in channels, 64 out channels
+        self.conv1 = Conv2d(64, (5, 5), (1, 1), padding='SAME', W_init=W_init, b_init=None, name='conv1', in_channels=3)
+        # 2D batch normalization, with ReLU on the output.
+        self.bn = BatchNorm2d(num_features=64, act=tlx.nn.ReLU)
+        # 2D max-pooling layer.
+        self.maxpool1 = MaxPool2d((3, 3), (2, 2), padding='SAME', name='pool1')
+
+        self.conv2 = Conv2d(
+            64, (5, 5), (1, 1), padding='SAME', act=tlx.nn.ReLU, W_init=W_init, name='conv2', in_channels=64
+        )
+        self.maxpool2 = MaxPool2d((3, 3), (2, 2), padding='SAME', name='pool2')
+        # Flatten 2D data to 1D data
+        self.flatten = Flatten(name='flatten')
+        # Linear layer with 384 units, with ReLU on the output.
+        self.linear1 = Linear(384, act=tlx.nn.ReLU, W_init=W_init2, b_init=b_init2, name='linear1relu', in_features=2304)
+        self.linear2 = Linear(192, act=tlx.nn.ReLU, W_init=W_init2, b_init=b_init2, name='linear2relu', in_features=384)
+        self.linear3 = Linear(10, act=None, W_init=W_init2, name='output', in_features=192)
+
+    # We define the forward computation process.
+    def forward(self, x):
+        z = self.conv1(x)
+        z = self.bn(z)
+        z = self.maxpool1(z)
+        z = self.conv2(z)
+        z = self.maxpool2(z)
+        z = self.flatten(z)
+        z = self.linear1(z)
+        z = self.linear2(z)
+        z = self.linear3(z)
+        return z
+
+
+# get the network
+net = CNN()
+
+# training settings
+n_epoch = 500
+learning_rate = 0.0001
+print_freq = 5
+n_step_epoch = int(len(y_train) / 128)
+n_step = n_epoch * n_step_epoch
+shuffle_buffer_size = 128
+# Get the trainable parameters
+train_weights = net.trainable_weights
+# Define the optimizer: here we use Adam.
+optimizer = tlx.optimizers.Adam(learning_rate)
+# Define the evaluation metric.
+metrics = tlx.metrics.Accuracy()
+
+# Define the loss calculation process
+class WithLoss(Module):
+
+    def __init__(self, net, loss_fn):
+        super(WithLoss, self).__init__()
+        self._net = net
+        self._loss_fn = loss_fn
+
+    def forward(self, data, label):
+        out = self._net(data)
+        loss = self._loss_fn(out, label)
+        return loss
+
+
+net_with_loss = WithLoss(net.mlu(), loss_fn=tlx.losses.softmax_cross_entropy_with_logits).mlu()
+model = tlx.ops.distributed_model(net_with_loss, device_ids=[args.local_rank],
+                                  output_device=args.local_rank,
+                                  find_unused_parameters=True)
+# Initialize one-step training
+#net_with_train = TrainOneStep(net_with_loss, optimizer, train_weights)
+net_with_train = TrainOneStep(model, optimizer, train_weights)
+
+# Custom training loop
+for epoch in range(n_epoch):
+    start_time = time.time()
+    # Set the network to training mode
+    net.set_train()
+    train_loss, train_acc, n_iter = 0, 0, 0
+    # Get training data and labels
+    for X_batch, y_batch in train_dataset:
+        # Compute the loss; the gradient update is applied automatically
+        _loss_ce = net_with_train(X_batch.mlu(), y_batch.mlu())
+        train_loss += _loss_ce
+
+        n_iter += 1
+        _logits = net(X_batch.mlu())
+        # Calculate accuracy
+        metrics.update(_logits, y_batch.mlu())
+        train_acc += metrics.result()
+        metrics.reset()
+        if (n_iter % 100 == 0):
+            print("Epoch {} of {} took {}".format(epoch + 1, n_epoch, time.time() - start_time))
+            print("rank {} train loss: {}".format(args.local_rank, train_loss / n_iter))
+            print("rank {} train acc: {}".format(args.local_rank, train_acc / n_iter))
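
Note that the example builds test_dataset but never evaluates on it. A held-out evaluation pass in the same style, a minimal sketch (not part of this commit) assuming the same MLU setup and that tlx Modules expose set_eval(), could look like:

# Sketch only: evaluate the trained network on the held-out set.
net.set_eval()
test_acc, n_test = 0, 0
for X_batch, y_batch in test_dataset:
    _logits = net(X_batch.mlu())            # forward pass on the MLU device
    metrics.update(_logits, y_batch.mlu())
    test_acc += metrics.result()
    metrics.reset()
    n_test += 1
print("rank {} test acc: {}".format(args.local_rank, test_acc / n_test))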

examples/basic_tutorials/test_dist.sh (+2)

@@ -0,0 +1,2 @@
+export MLU_VISIBLE_DEVICES=0,1
+python -m tensorlayerx.distributed.launch --nproc_per_node=2 cifar10_cnn_torch_dist.py

tensorlayerx/backend/ops/__init__.py (+2)

@@ -207,6 +207,8 @@
 from .load_backend import eye
 from .load_backend import einsum
 from .load_backend import set_device
+from .load_backend import distributed_init
+from .load_backend import distributed_model
 from .load_backend import get_device
 from .load_backend import scatter_update
 from .load_backend import to_device
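
These re-exports make the new helpers reachable from the top-level ops namespace. A minimal sketch, assuming the torch backend is selected via TL_BACKEND:

# Sketch: the two new symbols resolve through tlx.ops once the torch backend loads.
import os
os.environ['TL_BACKEND'] = 'torch'

import tensorlayerx as tlx

init_fn = tlx.ops.distributed_init    # wraps torch.distributed.init_process_group
wrap_fn = tlx.ops.distributed_model   # wraps torch.nn.parallel.DistributedDataParallel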

tensorlayerx/backend/ops/load_backend.py (+4)

@@ -75,6 +75,10 @@
     from .torch_nn import *
     from .torch_backend import *
     import torch
+    try:
+        import torch_mlu
+    except:
+        pass
     BACKEND_VERSION = torch.__version__
     sys.stderr.write('Using PyTorch backend.\n')
 elif BACKEND == 'oneflow':
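
The try/except keeps the torch backend importable on machines without Cambricon's torch_mlu package. A quick availability probe, a sketch assuming an MLU build of PyTorch, would be:

import torch

# Sketch: torch_mlu registers the 'mlu' device type with torch when present.
try:
    import torch_mlu
    has_mlu = torch.mlu.is_available()
except ImportError:
    has_mlu = False

print('MLU available:', has_mlu)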

tensorlayerx/backend/ops/torch_backend.py (+38 −4)

@@ -72,6 +72,8 @@ def zeros(shape, dtype=None, device = None):
         device = torch.device('cpu')
     elif device == 'gpu':
         device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
+    elif device == 'mlu':
+        device = torch.device('mlu:0' if torch.mlu.is_available() else 'cpu')
     return torch.zeros(size=shape, dtype=dtype, device = device)

@@ -95,6 +97,8 @@ def ones(shape, dtype=None, device = None):
         device = torch.device('cpu')
     elif device == 'gpu':
         device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
+    elif device == 'mlu':
+        device = torch.device('mlu:0' if torch.mlu.is_available() else 'cpu')
     return torch.ones(size=shape, dtype=dtype, device = device)

@@ -120,6 +124,8 @@ def constant(value, dtype=None, shape=None, device =None):
         device = torch.device('cpu')
     elif device == 'gpu':
         device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
+    elif device == 'mlu':
+        device = torch.device('mlu:0' if torch.mlu.is_available() else 'cpu')
     w = torch.empty(size=shape, dtype=dtype, device = device)
     return torch.nn.init.constant_(w, value)

@@ -1687,6 +1693,10 @@ def set_seed(seed):

     torch.manual_seed(seed)
     torch.cuda.manual_seed_all(seed)
+    try:
+        torch.mlu.manual_seed_all(seed)
+    except:
+        pass
     np.random.seed(seed)
     random.seed(seed)
     torch.backends.cudnn.deterministic = True

@@ -1745,6 +1755,23 @@ def set_device(device = 'GPU', id = 0):
     if device == 'GPU':
         torch.set_default_tensor_type('torch.cuda.FloatTensor')
         torch.cuda.set_device(id)
+    if device == 'MLU':
+        torch.set_default_tensor_type('torch.mlu.FloatTensor')
+        torch.mlu.set_device(id)
+
+def distributed_init(backend="cncl"):
+    torch.distributed.init_process_group(backend=backend)
+
+def distributed_model(module, device_ids=None, output_device=None,
+                      dim=0, broadcast_buffers=True, process_group=None, bucket_cap_mb=25,
+                      find_unused_parameters=False, check_reduction=False, gradient_as_bucket_view=False):
+    return torch.nn.parallel.DistributedDataParallel(module, device_ids=device_ids,
+                                                     output_device=output_device,
+                                                     dim=dim, broadcast_buffers=broadcast_buffers,
+                                                     process_group=process_group, bucket_cap_mb=bucket_cap_mb,
+                                                     find_unused_parameters=find_unused_parameters,
+                                                     check_reduction=check_reduction,
+                                                     gradient_as_bucket_view=gradient_as_bucket_view)

 def scatter_update(tensor, indices, updates):
     tensor = torch.tensor(tensor)

@@ -1756,16 +1783,23 @@ def scatter_update(tensor, indices, updates):
 def get_device():
     try:
         id = torch.cuda.current_device()
-        device = 'GPU:' + str(id)
-        return device
+        device = 'GPU:' + str(id)
     except:
         device = 'CPU'
-        return device
+
+    try:
+        id = torch.mlu.current_device()
+        device = 'MLU:' + str(id)
+    except:
+        device = 'CPU'
+    return device

-def to_device(tensor, device='GPU', id=0):
+def to_device(tensor, device='MLU', id=0):
     device = device.lower()
     if device == 'gpu':
         device = 'cuda' + ':' + str(id)
+    if device == 'mlu':
+        device = 'mlu' + ':' + str(id)
     tensor = tensor.detach().to(device)
     return tensor
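
Taken together, these additions mirror the call sequence used in the new example script. A condensed sketch, assuming a two-process cncl launch where the launcher passes --local_rank, and that net_with_loss, optimizer, and train_weights are built as in the example:

import argparse
import tensorlayerx as tlx
from tensorlayerx.model import TrainOneStep

parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", type=int, default=-1)
args = parser.parse_args()

# Bind this process to one MLU and join the cncl process group.
tlx.ops.set_device(device='MLU', id=args.local_rank)
tlx.ops.distributed_init(backend='cncl')

# Wrap the loss-bearing module in DistributedDataParallel, then train one step at a time.
model = tlx.ops.distributed_model(net_with_loss, device_ids=[args.local_rank],
                                  output_device=args.local_rank,
                                  find_unused_parameters=True)
trainer = TrainOneStep(model, optimizer, train_weights)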

tensorlayerx/distributed/__init__.py (+1)

@@ -0,0 +1 @@
+from .launch import *

tensorlayerx/distributed/launch.py (+34)

@@ -0,0 +1,34 @@
+import os
+BACKEND = 'torch'
+
+
+# Set backend based on TL_BACKEND.
+if 'TL_BACKEND' in os.environ:
+    backend = os.environ['TL_BACKEND']
+    if backend:
+        BACKEND = backend
+
+
+def main(args=None):
+    if BACKEND == 'torch':
+        from torch.distributed.run import get_args_parser, run
+        def parse_args(args):
+            parser = get_args_parser()
+            parser.add_argument(
+                "--use_env",
+                default=False,
+                action="store_true",
+                help="Use environment variable to pass "
+                "'local rank'. For legacy reasons, the default value is False. "
+                "If set to True, the script will not pass "
+                "--local_rank as argument, and will instead set LOCAL_RANK.",
+            )
+            return parser.parse_args(args)
+        args = parse_args(args)
+        run(args)
+    else:
+        raise NotImplementedError("This backend:{} is not supported".format(BACKEND))
+
+
+if __name__ == "__main__":
+    main()
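
The launcher defers to torch.distributed.run, so it is invoked exactly like torch's own launcher (see test_dist.sh above). Since it keeps torch's legacy --local_rank behaviour, and --use_env switches to the LOCAL_RANK environment variable instead, a launched script can accept its rank either way. A small sketch, not part of this commit:

import argparse
import os

parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", type=int, default=-1)
args = parser.parse_args()

# Fall back to LOCAL_RANK when the launcher was started with --use_env.
local_rank = args.local_rank if args.local_rank >= 0 else int(os.environ.get("LOCAL_RANK", "0"))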
