41
41
42
42
# Named tuple with details for each output of an Analyzer.
43
43
_AnalyzerOutputInfo = collections .namedtuple (
44
- 'AnalyzerOutputInfo' , ['name' , 'dtype' , ' is_asset' ])
44
+ 'AnalyzerOutputInfo' , ['name' , 'is_asset' ])
45
45
46
46
47
47
# NOTE: this code is designed so that Analyzer is pickleable, and in particular
52
52
# of a PTransform in our implementation of tf.Transform on Beam currently, so
53
53
# we must avoid directly putting `Tensor`s inside `Analyzer`, and instead use
54
54
# tensor names.
55
+ #
56
+ # Due to these pickling issues and also logical separation of TensorFlow and
57
+ # numpy code, the spec should also not contain TensorFlow dtypes but rather
58
+ # their numpy equivalents.
55
59
class Analyzer (object ):
56
60
"""An operation-like class for full-pass analyses of data.
57
61
@@ -91,7 +95,7 @@ def __init__(self, inputs, output_dtype_shape_and_is_asset, spec, name):
91
95
raise ValueError (('Tensor {} cannot represent an asset, because it '
92
96
'is not a string.' ).format (output_tensor .name ))
93
97
self ._output_infos .append (_AnalyzerOutputInfo (
94
- output_tensor .name , output_tensor . dtype , is_asset ))
98
+ output_tensor .name , is_asset ))
95
99
self ._spec = spec
96
100
tf .add_to_collection (ANALYZER_COLLECTION , self )
97
101
@@ -201,11 +205,18 @@ def combine_analyzer(inputs, output_dtypes, output_shapes, combiner_spec, name):
201
205
202
206
203
207
class _NumPyCombinerSpec (CombinerSpec ):
204
- """Combines the PCollection only on the 0th dimension using nparray."""
208
+ """Combines the PCollection only on the 0th dimension using nparray.
209
+
210
+ Args:
211
+ fn: The numpy function representing the reduction to be done.
212
+ reduce_instance_dims: Whether to reduce across non-batch dimensions.
213
+ output_dtypes: The numpy dtypes to cast the outputs to, one per output.
214
+ """
205
215
206
- def __init__ (self , fn , reduce_instance_dims ):
216
+ def __init__ (self , fn , reduce_instance_dims , output_dtypes ):
207
217
self ._fn = fn
208
218
self ._reduce_instance_dims = reduce_instance_dims
219
+ self ._output_dtypes = output_dtypes
209
220
210
221
def create_accumulator (self ):
211
222
return None
@@ -232,7 +243,13 @@ def merge_accumulators(self, accumulators):
232
243
for sub_accumulators in zip (* accumulators )]
233
244
234
245
def extract_output (self , accumulator ):
235
- return accumulator
246
+ if accumulator is None :
247
+ return None
248
+ # For each output, cast that output to the specified type. Note there will
249
+ # be one output for each input tensor to the analyzer.
250
+ return [sub_accumulator .astype (output_dtype )
251
+ for sub_accumulator , output_dtype
252
+ in zip (accumulator , self ._output_dtypes )]
236
253
237
254
238
255
def _numeric_combine (inputs , fn , reduce_instance_dims = True , name = None ):
@@ -266,11 +283,10 @@ def _numeric_combine(inputs, fn, reduce_instance_dims=True, name=None):
266
283
# shape.
267
284
shapes = [x .shape .as_list ()[1 :] if x .shape .dims is not None else None
268
285
for x in inputs ]
286
+ spec = _NumPyCombinerSpec (fn , reduce_instance_dims ,
287
+ [x .dtype .as_numpy_dtype for x in inputs ])
269
288
return combine_analyzer (
270
- inputs ,
271
- [x .dtype for x in inputs ],
272
- shapes ,
273
- _NumPyCombinerSpec (fn , reduce_instance_dims ),
289
+ inputs , [x .dtype for x in inputs ], shapes , spec ,
274
290
name if name is not None else fn .__name__ )
275
291
276
292
@@ -615,22 +631,17 @@ def quantiles(x, num_buckets, epsilon, name=None):
615
631
616
632
with tf .name_scope (name , 'quantiles' ):
617
633
spec = _QuantilesSpec (epsilon , num_buckets )
618
- quantile_boundaries = Analyzer (
634
+ return Analyzer (
619
635
[x ], [(spec .bucket_dtype , [1 , None ], False )], spec ,
620
636
'quantiles' ).outputs [0 ]
621
637
622
- # The Analyzer returns a 2d matrix of 1*num_buckets. Below, we remove
623
- # the first dimension and return the boundaries as a simple 1d list.
624
- return quantile_boundaries [0 :1 ]
625
-
626
638
627
639
class _CovarianceCombinerSpec (CombinerSpec ):
628
640
"""Combines the PCollection to compute the biased covariance matrix."""
629
641
630
- def __init__ (self , dtype = tf .float64 ):
642
+ def __init__ (self , numpy_dtype = np .float64 ):
631
643
"""Store the dtype for np arrays/matrices for precision."""
632
- self ._output_dtype = dtype
633
- self ._np_dtype = dtype .as_numpy_dtype
644
+ self ._numpy_dtype = numpy_dtype
634
645
635
646
def create_accumulator (self ):
636
647
"""Create an accumulator with all zero entries."""
@@ -663,9 +674,9 @@ def add_input(self, accumulator, batch_values):
663
674
batch_cross_terms = np .matmul (
664
675
np .transpose (batch_value ),
665
676
batch_value
666
- ).astype (self ._np_dtype )
677
+ ).astype (self ._numpy_dtype )
667
678
668
- batch_sum = np .array (np .sum (batch_value , axis = 0 ), self ._np_dtype )
679
+ batch_sum = np .array (np .sum (batch_value , axis = 0 ), self ._numpy_dtype )
669
680
batch_count = np .shape (batch_value )[0 ]
670
681
671
682
if accumulator is None :
@@ -725,7 +736,7 @@ def covariance(x, dtype, name=None):
725
736
Args:
726
737
x: A rank-2 `Tensor`, 0th dim are rows, 1st dim are indices in each input
727
738
vector.
728
- dtype: numpy dtype of entries in the returned matrix.
739
+ dtype: TensorFlow dtype of entries in the returned matrix.
729
740
name: (Optional) A name for this operation.
730
741
731
742
Raises:
@@ -743,17 +754,17 @@ def covariance(x, dtype, name=None):
743
754
input_dim = x .shape .as_list ()[1 ]
744
755
shape = (input_dim , input_dim )
745
756
746
- spec = _CovarianceCombinerSpec (dtype )
757
+ spec = _CovarianceCombinerSpec (dtype . as_numpy_dtype )
747
758
return combine_analyzer (
748
759
[x ], [dtype ], [shape ], spec ,
749
760
name if name is not None else 'covariance' )[0 ]
750
761
751
762
752
763
class _PCACombinerSpec (_CovarianceCombinerSpec ):
753
764
754
- def __init__ (self , output_dim = None , dtype = tf .float64 ):
765
+ def __init__ (self , output_dim = None , numpy_dtype = np .float64 ):
755
766
"""Store the PCA output dimension, and the numpy dtype for precision."""
756
- super (_PCACombinerSpec , self ).__init__ (dtype = dtype )
767
+ super (_PCACombinerSpec , self ).__init__ (numpy_dtype = numpy_dtype )
757
768
self ._output_dim = output_dim
758
769
759
770
def extract_output (self , accumulator ):
@@ -844,7 +855,7 @@ def pca(x, output_dim, dtype, name=None):
844
855
Args:
845
856
x: A rank-2 `Tensor`, 0th dim are rows, 1st dim are indices in row vectors.
846
857
output_dim: The PCA output dimension (number of eigenvectors to return).
847
- dtype: numpy dtype of entries in the returned matrix.
858
+ dtype: TensorFlow dtype of entries in the returned matrix.
848
859
name: (Optional) A name for this operation.
849
860
850
861
Raises:
@@ -862,7 +873,7 @@ def pca(x, output_dim, dtype, name=None):
862
873
input_dim = x .shape .as_list ()[1 ]
863
874
shape = (input_dim , output_dim )
864
875
865
- spec = _PCACombinerSpec (output_dim , dtype )
876
+ spec = _PCACombinerSpec (output_dim , dtype . as_numpy_dtype )
866
877
return combine_analyzer (
867
878
[x ], [dtype ], [shape ], spec ,
868
879
name if name is not None else 'pca' )[0 ]
0 commit comments