@@ -80,23 +80,33 @@ For this example, we will first have a helper function that generates some CSV f
            row_data['label'] = random.randint(0, 9)
            writer.writerow(row_data)

- Next, we will build our DataPipes to read and parse through the generated CSV files:
+ Next, we will build our DataPipes to read and parse through the generated CSV files. Note that we prefer to
+ pass defined functions to DataPipes rather than lambda functions, because the former are serializable with ``pickle``.
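+
+ As a quick illustration (a minimal sketch, separate from the pipeline itself; the helper name ``double`` is
+ made up for this aside), a module-level function can be pickled while a lambda cannot:
+
+ .. code:: python
+
+     import pickle
+
+     def double(x):
+         return x * 2
+
+     pickle.dumps(double)           # OK: a named function is pickled by its qualified name
+     pickle.dumps(lambda x: x * 2)  # raises pickle.PicklingError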
+
.. code:: python

    import numpy as np
    import torchdata.datapipes as dp

+     def filter_for_data(filename):
+         return "sample_data" in filename and filename.endswith(".csv")
+
+     def row_processor(row):
+         return {"label": np.array(row[0], np.int32), "data": np.array(row[1:], dtype=np.float64)}
+
    def build_datapipes(root_dir="."):
        datapipe = dp.iter.FileLister(root_dir)
-         datapipe = datapipe.filter(filter_fn=lambda filename: "sample_data" in filename and filename.endswith(".csv"))
-         datapipe = dp.iter.FileOpener(datapipe, mode='rt')
+         datapipe = datapipe.filter(filter_fn=filter_for_data)
+         datapipe = datapipe.open_files(mode='rt')
        datapipe = datapipe.parse_csv(delimiter=",", skip_lines=1)
-         datapipe = datapipe.map(lambda row: {"label": np.array(row[0], np.int32),
-                                              "data": np.array(row[1:], dtype=np.float64)})
+         datapipe = datapipe.map(row_processor)
+         # We will also have to set `shuffle=True` later in the DataLoader
+         datapipe = datapipe.shuffle()
        return datapipe
- Lastly, we will put everything together in ``'__main__'`` and pass the DataPipe into the DataLoader.
+ Lastly, we will put everything together in ``'__main__'`` and pass the DataPipe into the DataLoader. Note that
+ if you choose to use ``Batcher`` while also setting ``batch_size > 1`` for the DataLoader, your samples will be
+ batched more than once; you should choose one or the other.
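+
+ For illustration, here is a minimal sketch of the ``Batcher`` route (an aside, not used in the rest of this
+ example): batch and collate inside the DataPipe graph, and disable the DataLoader's own batching instead.
+
+ .. code:: python
+
+     # Assuming `datapipe` came from `build_datapipes()` above
+     datapipe = datapipe.batch(5).collate()  # functional forms of Batcher and Collator
+     dl = DataLoader(dataset=datapipe, batch_size=None)  # batch_size=None disables automatic batching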
.. code:: python
@@ -105,20 +115,65 @@ Lastly, we will put everything together in ``'__main__'`` and pass the DataPipe
    if __name__ == '__main__':
        num_files_to_generate = 3
        for i in range(num_files_to_generate):
-             generate_csv(file_label=i)
+             generate_csv(file_label=i, num_rows=10, num_features=3)
        datapipe = build_datapipes()
-         dl = DataLoader(dataset=datapipe, batch_size=50, shuffle=True)
+         dl = DataLoader(dataset=datapipe, shuffle=True, batch_size=5, num_workers=2)
        first = next(iter(dl))
        labels, features = first['label'], first['data']
        print(f"Labels batch shape: {labels.size()}")
        print(f"Feature batch shape: {features.size()}")
+         print(f"{labels = }\n{features = }")
+         n_sample = 0
+         for row in iter(dl):
+             n_sample += 1
+         print(f"{n_sample = }")
The following statements will be printed to show the shapes and values of a single batch of labels and features, followed by the number of batches yielded by the ``DataLoader``.
.. code::

-     Labels batch shape: 50
-     Feature batch shape: torch.Size([50, 20])
+     Labels batch shape: torch.Size([5])
+     Feature batch shape: torch.Size([5, 3])
+     labels = tensor([8, 9, 5, 9, 7], dtype=torch.int32)
+     features = tensor([[0.2867, 0.5973, 0.0730],
+                        [0.7890, 0.9279, 0.7392],
+                        [0.8930, 0.7434, 0.0780],
+                        [0.8225, 0.4047, 0.0800],
+                        [0.1655, 0.0323, 0.5561]], dtype=torch.float64)
+     n_sample = 12
144
+
+ The reason why ``n_sample = 12`` is that ``ShardingFilter`` (``datapipe.sharding_filter()``) was not used, so
+ each worker independently returns all samples. In this case, there are 10 rows per file and 3 files, with a
+ batch size of 5, which gives 6 batches per worker. With 2 workers, we get 12 total batches from the ``DataLoader``.
148
+
149
+ In order for DataPipe sharding to work with ``DataLoader``, we need to add the following:
150
+
+ .. code:: python
+
+     def build_datapipes(root_dir="."):
+         datapipe = ...
+         # Add the following line to `build_datapipes`
+         datapipe = datapipe.sharding_filter()
+         return datapipe
+
+     def worker_init_fn(worker_id):
+         info = torch.utils.data.get_worker_info()
+         num_workers = info.num_workers
+         datapipe = info.dataset
+         torch.utils.data.graph_settings.apply_sharding(datapipe, num_workers, worker_id)
+
+     # Pass `worker_init_fn` into `DataLoader` within '__main__'
+     ...
+     dl = DataLoader(dataset=datapipe, shuffle=True, batch_size=5, num_workers=2, worker_init_fn=worker_init_fn)
+     ...
169
+
170
+ When we re-run, each worker now only yields its own shard of the samples (15 of the 30), which gives
+ 3 batches per worker and 6 batches in total:
+
+ .. code::
+
+     ...
+     n_sample = 6
+
You can find more DataPipe implementation examples for various research domains `on this page <torchexamples.html>`_.