-
Notifications
You must be signed in to change notification settings - Fork 2.2k
/
Copy pathdata_loader.py
565 lines (497 loc) · 24.4 KB
/
data_loader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
#
# SPDX-FileCopyrightText: Copyright (c) 1993-2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import contextlib
from collections import OrderedDict
from polygraphy import constants, func, mod, util
from polygraphy.comparator.struct import RunResults
from polygraphy.datatype import DataType
from polygraphy.exception import DataTypeConversionException, PolygraphyException
from polygraphy.json import save_json
from polygraphy.logger import G_LOGGER, LogMode
np = mod.lazy_import("numpy")
torch = mod.lazy_import("torch")
class ArraySampler:
def __init__(self, data_loader_backend_module, seed):
"""
Args:
data_loader_backend_module (str):
The module specifying the array type to use to generate arrays.
Can be either "numpy" or "torch".
seed (int):
The seed to use when generating random inputs.
"""
self.rng = None
VALID_ARRAY_MODULES = ["numpy", "torch"]
if data_loader_backend_module not in VALID_ARRAY_MODULES:
G_LOGGER.critical(
f"Invalid `data_loader_backend_module`. Note: got: {data_loader_backend_module} but valid modules are: {VALID_ARRAY_MODULES}"
)
self.data_loader_backend_module = data_loader_backend_module
if self.data_loader_backend_module == "numpy":
self.rng = np.random.RandomState(seed)
elif self.data_loader_backend_module == "torch":
self.rng = torch.Generator()
self.rng.manual_seed(seed)
def sample_integer(self, shape, dtype, low, high):
"""
Samples an array containing integral values in the range [low, high], inclusive
"""
dtype = (
DataType.to_dtype(
DataType.from_dtype(dtype), self.data_loader_backend_module
)
if dtype is not None
else dtype
)
if self.data_loader_backend_module == "numpy":
return np.array(
self.rng.randint(low=low, high=high + 1, size=shape, dtype=dtype)
)
elif self.data_loader_backend_module == "torch":
return torch.randint(low, high + 1, shape, generator=self.rng, dtype=dtype)
def sample_float(self, shape, dtype, fmin, fmax):
"""
Samples an array containing float values in the range [fmin, fmax], inclusive
"""
# Special handling for infinite lower/upper bounds
# Without this, two infinities will collapse into a NaN, resulting in no infinities
# in the final output.
scale = fmax - fmin
shift = fmin
if util.is_inf(fmin):
scale = fmin
shift = 0
if util.is_inf(fmax):
scale = fmax
dtype = (
DataType.to_dtype(
DataType.from_dtype(dtype), self.data_loader_backend_module
)
if dtype is not None
else dtype
)
if self.data_loader_backend_module == "numpy":
return np.array(
(self.rng.random_sample(size=shape) * scale + shift).astype(dtype)
)
elif self.data_loader_backend_module == "torch":
return torch.rand(shape, generator=self.rng, dtype=dtype)
def constant_array(self, shape, dtype):
dtype = (
DataType.to_dtype(
DataType.from_dtype(dtype), self.data_loader_backend_module
)
if dtype is not None
else dtype
)
if self.data_loader_backend_module == "numpy":
return np.array(shape, dtype=dtype)
elif self.data_loader_backend_module == "torch":
return torch.tensor(shape, dtype=dtype)
@mod.export()
class DataLoader:
"""
Generates synthetic input data.
"""
def __init__(
self,
seed=None,
iterations=None,
input_metadata=None,
int_range=None,
float_range=None,
val_range=None,
data_loader_backend_module=None,
):
"""
Args:
seed (int):
The seed to use when generating random inputs.
Defaults to ``util.constants.DEFAULT_SEED``.
iterations (int):
The number of iterations for which to supply data.
Defaults to 1.
input_metadata (TensorMetadata):
A mapping of input names to their corresponding shapes and data types.
This will be used to determine what shapes to supply for inputs with dynamic shape, as
well as to set the data type of the generated inputs.
If either dtype or shape are None, then the value will be automatically determined.
For input shape tensors, i.e. inputs whose *value* describes a shape in the model, the
provided shape will be used to populate the values of the inputs, rather than to determine
their shape.
val_range (Union[Tuple[number], Dict[str, Tuple[number]]]):
A tuple containing exactly 2 numbers, indicating the minimum and maximum values (inclusive)
the data loader should generate.
If either value in the tuple is None, the default will be used for that value.
If None is provided instead of a tuple, then the default values will be used for both the
minimum and maximum.
This can be specified on a per-input basis using a dictionary. In that case,
use an empty string ("") as the key to specify default range for inputs not explicitly listed.
Defaults to (0.0, 1.0).
data_loader_backend_module (str):
A string denoting what module to use to construct the input data arrays. Currently supports
"numpy" and "torch".
Defaults to "numpy".
int_range (Tuple[int]):
[DEPRECATED - Use val_range instead]
A tuple containing exactly 2 integers, indicating the minimum and maximum integer values (inclusive)
the data loader should generate. If either value in the tuple is None, the default will be used
for that value.
If None is provided instead of a tuple, then the default values will be used for both the
minimum and maximum.
float_range (Tuple[float]):
[DEPRECATED - Use val_range instead]
A tuple containing exactly 2 floats, indicating the minimum and maximum float values (inclusive)
the data loader should generate. If either value in the tuple is None, the default will be used
for that value.
If None is provided instead of a tuple, then the default values will be used for both the
minimum and maximum.
"""
def default_tuple(tup, default):
if tup is None or (
not isinstance(tup, tuple) and not isinstance(tup, list)
):
return default
new_tup = []
for elem, default_elem in zip(tup, default):
new_tup.append(util.default(elem, default_elem))
return tuple(new_tup)
self.seed = util.default(seed, constants.DEFAULT_SEED)
self.iterations = util.default(iterations, 1)
self.user_input_metadata = util.default(input_metadata, {})
self.data_loader_backend_module = util.default(
data_loader_backend_module, "numpy"
)
self._int_range_set = int_range is not None
if self._int_range_set:
mod.warn_deprecated(
"The int_range parameter in DataLoader", "val_range", remove_in="0.50.0"
)
self._int_range = default_tuple(int_range, (1, 25))
self._float_range_set = float_range is not None
if self._float_range_set:
mod.warn_deprecated(
"The float_range parameter in DataLoader",
"val_range",
remove_in="0.50.0",
)
self._float_range = default_tuple(float_range, (-1.0, 1.0))
self.input_metadata = None
self.default_val_range = default_tuple(val_range, (0.0, 1.0))
self.val_range = util.default(val_range, self.default_val_range)
if self.user_input_metadata:
G_LOGGER.info(
f"Will generate inference input data according to provided TensorMetadata: {self.user_input_metadata}"
)
def __repr__(self):
return util.make_repr(
"DataLoader",
seed=self.seed,
iterations=self.iterations,
input_metadata=self.user_input_metadata or None,
int_range=self._int_range,
float_range=self._float_range,
val_range=self.val_range,
data_loader_backend_module=self.data_loader_backend_module,
)[0]
def _get_range(self, name, cast_type):
if cast_type == int and self._int_range_set:
return self._int_range
elif cast_type == float and self._float_range_set:
return self._float_range
tup = util.value_or_from_dict(self.val_range, name, self.default_val_range)
return tuple(cast_type(val) for val in tup)
def __getitem__(self, index):
"""
Generates random input data.
May update the DataLoader's `input_metadata` attribute.
Args:
index (int):
Since this class behaves like an iterable, it takes an index parameter.
Generated data is guaranteed to be the same for the same index.
Returns:
OrderedDict[str, Union[numpy.ndarray, torch.Tensor]]: A mapping of input names to input numpy buffers.
"""
if index >= self.iterations:
raise IndexError()
G_LOGGER.verbose(f"Generating data using numpy seed: {self.seed + index}")
array_sampler = ArraySampler(self.data_loader_backend_module, self.seed + index)
def get_static_shape(name, shape):
static_shape = shape
if util.is_shape_dynamic(shape):
if shape.min is not None:
static_shape = shape.min
elif shape.max is not None:
static_shape = shape.max
else:
static_shape = util.override_dynamic_shape(shape)
if static_shape != shape:
if not util.is_valid_shape_override(static_shape, shape):
G_LOGGER.critical(
f"Input tensor: {name} | Cannot override original shape: {shape} to {static_shape}"
)
G_LOGGER.warning(
f"Input tensor: {name} [shape={shape}] | Will generate data of shape: {static_shape}.\n"
f"If this is incorrect, please provide a custom data loader.",
mode=LogMode.ONCE,
)
return static_shape
# Whether the user provided the values for a shape tensor input,
# rather than the shape of the input.
# If the shape is 1D, and has a value equal to the rank of the provided default shape, it is
# likely to be a shape tensor, and so its value, not shape, should be overriden.
# Note that this is a hack needed for older versions of TensorRT. Ideally, we wouldn't care
# whether the input is a shape tensor or not.
def is_shape_tensor(name, dtype):
if name not in self.input_metadata or name not in self.user_input_metadata:
return False
_, shape = self.input_metadata[name]
if (
(dtype is not None and not DataType.from_dtype(dtype).is_integral)
or util.is_shape_dynamic(shape)
or len(shape) != 1
):
return False
user_shape = self.user_input_metadata[name].shape
# Shape of shape cannot be dynamic.
return not util.is_shape_dynamic(user_shape) and len(user_shape) == shape[0]
def generate_buffer(name, dtype, shape):
if is_shape_tensor(name, dtype):
buffer = array_sampler.constant_array(shape, dtype)
G_LOGGER.info(
f"Assuming {name} is a shape tensor. Setting input values to: {buffer}. "
"If these values are not correct, please set it correctly in 'input_metadata' or by providing --input-shapes",
mode=LogMode.ONCE,
)
elif dtype is not None and (
DataType.from_dtype(dtype).is_integral
or DataType.from_dtype(dtype) == DataType.BOOL
):
imin, imax = self._get_range(
name,
cast_type=int if DataType.from_dtype(dtype).is_integral else bool,
)
G_LOGGER.verbose(
f"Input tensor: {name} | Generating input data in range: [{imin}, {imax}]",
mode=LogMode.ONCE,
)
# high is 1 greater than the max int drawn.
buffer = array_sampler.sample_integer(shape, dtype, imin, imax)
else:
fmin, fmax = self._get_range(name, cast_type=float)
G_LOGGER.verbose(
f"Input tensor: {name} | Generating input data in range: [{fmin}, {fmax}]",
mode=LogMode.ONCE,
)
buffer = array_sampler.sample_float(shape, dtype, fmin, fmax)
return buffer
if self.input_metadata is None and self.user_input_metadata is not None:
self.input_metadata = self.user_input_metadata
buffers = OrderedDict()
# Expand wildcards to inputs in user_input_metadata
inp_to_user_inp, unmatched_user_inps = util.match_keys(list(self.user_input_metadata), list(self.input_metadata))
# Warn about unused metadata
if unmatched_user_inps:
for name in unmatched_user_inps:
if util.contains_wildcard(name):
msg = f"Input tensor wildcard: {name} | Metadata was provided, but none of the matched inputs exist in one or more runners."
G_LOGGER.warning(msg)
else:
msg = f"Input tensor: {name} | Metadata was provided, but the input does not exist in one or more runners."
close_match = util.find_str_in_iterable(
name, self.input_metadata.keys()
)
if close_match:
msg += f"\nMaybe you meant to set: {close_match}?"
G_LOGGER.warning(msg)
for name, (dtype, shape) in self.input_metadata.items():
try:
dtype = (
DataType.to_dtype(
DataType.from_dtype(dtype), self.data_loader_backend_module
)
if dtype is not None
else None
)
except DataTypeConversionException:
G_LOGGER.critical(
f"Could not convert data type: {dtype} to {self.data_loader_backend_module}, so the default data loader cannot generate a {self.data_loader_backend_module} array for input: {name}. "
f"Please use a custom data loader to provide inputs. "
)
if name in inp_to_user_inp:
user_dtype, user_shape = self.user_input_metadata[inp_to_user_inp[name]]
dtype = util.default(user_dtype, dtype)
dtype = (
DataType.to_dtype(
DataType.from_dtype(dtype), self.data_loader_backend_module
)
if dtype is not None
else None
)
is_valid_shape_override = (
user_shape is not None
and util.is_valid_shape_override(user_shape, shape)
)
if util.is_shape_dynamic(user_shape):
G_LOGGER.warning(
f"Input tensor: {name} [shape={shape}] | Provided input shape: {user_shape} is dynamic.\nDynamic shapes cannot be used to generate inference data. Will use default shape instead.\nTo avoid this, please provide a fixed shape to the data loader. "
)
elif not is_valid_shape_override and not is_shape_tensor(name, dtype):
G_LOGGER.warning(
f"Input tensor: {name} [shape={shape}] | Cannot use provided custom shape: {user_shape} to override tensor shape. Will use default shape instead.",
mode=LogMode.ONCE,
)
else:
shape = util.default(user_shape, shape)
static_shape = get_static_shape(name, shape)
buffers[name] = generate_buffer(name, dtype, shape=static_shape)
# Warn about unused val_range
if not isinstance(self.val_range, tuple):
util.check_sequence_contains(
self.val_range.keys(),
list(self.input_metadata.keys()) + [""],
name="val_range",
log_func=G_LOGGER.warning,
check_missing=False,
)
return buffers
# Caches data loaded by a DataLoader for use across multiple runners.
class DataLoaderCache:
def __init__(self, data_loader, save_inputs_path=None):
self.data_loader = data_loader
self.cache = [] # List[OrderedDict[str, numpy.ndarray]]
self.save_inputs_path = save_inputs_path
@func.constantmethod
def __getitem__(self, iteration):
"""
Load the specified iteration from the cache if present, or load it from the data loader.
Args:
iteration (int): The iteration whose data to retrieve.
"""
if iteration >= len(self.cache):
raise IndexError()
# Attempts to match existing input buffers to the requested input_metadata
def coerce_cached_input(index, name, dtype, shape):
cached_feed_dict = self.cache[iteration]
cached_name = util.find_str_in_iterable(
name, cached_feed_dict.keys(), index
)
if cached_name is None:
G_LOGGER.critical(
f"Input tensor: {name} | Does not exist in the data loader cache."
)
if cached_name != name:
G_LOGGER.warning(
f"Input tensor: {name} | Buffer name ({cached_name}) does not match expected input name ({name})."
)
buffer = cached_feed_dict[cached_name]
if dtype != util.array.dtype(buffer):
G_LOGGER.warning(
f"Input tensor: {name} | Buffer dtype ({util.array.dtype(buffer)}) does not match expected input dtype ({dtype}), attempting to cast. "
)
try:
np_type = DataType.to_dtype(dtype, "numpy")
except:
pass
else:
type_info = None
if dtype.is_integral:
type_info = np.iinfo(np_type)
elif dtype.is_floating:
type_info = np.finfo(np_type)
if type_info is not None and util.array.any(
(buffer < type_info.min) | (buffer > type_info.max)
):
G_LOGGER.warning(
f"Some values in this input are out of range of {dtype}. Unexpected behavior may ensue!"
)
buffer = util.array.cast(buffer, dtype)
if not util.is_valid_shape_override(util.array.shape(buffer), shape):
G_LOGGER.warning(
f"Input tensor: {name} | Buffer shape ({util.array.shape(buffer)}) does not match expected input shape ({shape}). "
f"Attempting to transpose/reshape. "
)
buffer = util.try_match_shape(buffer, shape)
if util.array.dtype(buffer) != dtype or not util.is_valid_shape_override(
util.array.shape(buffer), shape
):
G_LOGGER.critical(
f"Input tensor: {name} | Cannot reuse input data due to mismatch in shape or data type.\n"
f"Note: Cached input: [dtype={util.array.dtype(buffer)}, shape={util.array.shape(buffer)}], "
f"Requested input: [dtype={dtype}, shape={shape}]"
)
return buffer
feed_dict = OrderedDict()
# Reload from data loader if needed
data_loader_feed_dict = None
for index, (name, (dtype, shape)) in enumerate(self.input_metadata.items()):
try:
buffer = coerce_cached_input(
index, name, DataType.from_dtype(dtype), shape
)
except PolygraphyException:
G_LOGGER.warning(
f"Could not use buffer previously cached from data loader for input: {name}. Attempting to reload inputs from the data loader.\nNote that this will only work if the data loader supports random access.\nPlease refer to warnings above for details on why the previously generated input buffer didn't work. "
)
try:
if data_loader_feed_dict is None:
data_loader_feed_dict = self.data_loader[iteration]
buffer = data_loader_feed_dict[name]
except:
G_LOGGER.critical(
"Could not reload inputs from data loader. Are the runners running the same model? "
"If not, please rewrite the data loader to support random access."
)
feed_dict[name] = buffer
return feed_dict
def set_input_metadata(self, input_metadata):
"""
Set the input metadata for the data loader.
Args:
input_metadata (TensorMetadata):
Input Metadata, including shape and type information. The cache may attempt to transform inputs to
match the specified input_metadata when data already in the cache does not exactly match.
"""
self.input_metadata = input_metadata
with contextlib.suppress(AttributeError):
self.data_loader.input_metadata = input_metadata
if not self.cache:
G_LOGGER.verbose("Loading inputs from data loader")
self.cache = list(self.data_loader)
def _is_feed_dict(inp):
if isinstance(inp, RunResults):
return False
try:
for name, _ in inp.items():
if not isinstance(name, str):
return False
except:
return False
else:
return True
if not self.cache:
G_LOGGER.warning("Data loader did not yield any input data.")
elif not _is_feed_dict(self.cache[0]):
G_LOGGER.critical(
f"Data loader returned an object that cannot be recognized as a feed_dict (Dict[str, Union[np.ndarray, torch.Tensor, DeviceView]]):"
f"\nNote: The object was:\n{self.cache[0]}.\n"
f"\nHint: If this is a `RunReults` object (e.g. generated with `--save-outputs`), try using the "
f"`data to-input` tool to convert it to a feed_dict compatible format. "
)
# Only save inputs the first time the cache is generated
if self.save_inputs_path is not None:
save_json(self.cache, self.save_inputs_path, "inference input data")