Skip to content

Commit 91210a2

Browse files
authored
Fix cumprod to work properly with Integer columns. (#1750)
Basically, this PR addressed #1739 (review) `cumprod` for `DataFrame` & `Series` & `GroupBy` isn't working properly with integer columns. ```python >>> pdf A B C 0 2.0 1.0 1 1 5.0 NaN 2 2 1.0 1.0 3 3 2.0 4.0 4 4 4.0 9.0 5 >>> pdf.cumprod() A B C 0 2.0 1.0 1 1 10.0 NaN 2 2 10.0 1.0 6 3 20.0 4.0 24 4 80.0 36.0 120 >>> ks.from_pandas(pdf).cumprod() A B C 0 2.0 1.0 1.0 1 10.0 NaN 2.0 2 10.0 1.0 6.0 3 20.0 4.0 24.0 4 80.0 36.0 120.0 ``` This PR addressed it and also addressed the related tests. ```python >>> pdf.cumprod() A B C 0.986323 2.0 1.0 1 0.297507 10.0 NaN 2 0.617855 10.0 1.0 6 0.711719 20.0 4.0 24 0.290114 80.0 36.0 120 >>> ks.from_pandas(pdf).cumprod() A B C 0.986323 2.0 1.0 1 0.297507 10.0 NaN 2 0.617855 10.0 1.0 6 0.711719 20.0 4.0 24 0.290114 80.0 36.0 120 ```
1 parent 6492dd2 commit 91210a2

File tree

4 files changed

+35
-14
lines changed

4 files changed

+35
-14
lines changed

databricks/koalas/groupby.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -824,11 +824,11 @@ def cumprod(self):
824824
By default, iterates over rows and finds the sum in each column.
825825
826826
>>> df.groupby("A").cumprod().sort_index()
827-
B C
828-
0 NaN 4.0
829-
1 0.1 12.0
830-
2 2.0 24.0
831-
3 10.0 1.0
827+
B C
828+
0 NaN 4
829+
1 0.1 12
830+
2 2.0 24
831+
3 10.0 1
832832
833833
It works as below in Series.
834834

databricks/koalas/series.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5230,8 +5230,10 @@ def _cum(self, func, skipna, part_cols=(), ascending=True):
52305230
def _cumprod(self, skipna, part_cols=()):
52315231
from pyspark.sql.functions import pandas_udf
52325232

5233+
data_type = self.spark.data_type
5234+
52335235
def cumprod(scol):
5234-
@pandas_udf(returnType=self.spark.data_type)
5236+
@pandas_udf(returnType=data_type)
52355237
def negative_check(s):
52365238
assert len(s) == 0 or ((s > 0) | (s.isnull())).all(), (
52375239
"values should be bigger than 0: %s" % s
@@ -5241,7 +5243,10 @@ def negative_check(s):
52415243
return F.sum(F.log(negative_check(scol)))
52425244

52435245
kser = self._cum(cumprod, skipna, part_cols)
5244-
return kser._with_new_scol(F.exp(kser.spark.column))
5246+
result = kser._with_new_scol(F.exp(kser.spark.column))
5247+
if isinstance(data_type, IntegralType):
5248+
result = result.spark.transform(lambda col: F.round(col).cast(LongType()))
5249+
return result
52455250

52465251
# ----------------------------------------------------------------------
52475252
# Accessor Methods

databricks/koalas/tests/test_dataframe.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2273,13 +2273,22 @@ def _test_cumprod(self, pdf, kdf):
22732273
self.assert_eq(pdf.cumprod().sum(), kdf.cumprod().sum(), almost=True)
22742274

22752275
def test_cumprod(self):
2276-
pdf = pd.DataFrame(
2277-
[[2.0, 1.0], [5, None], [1.0, 1.0], [2.0, 4.0], [4.0, 9.0]],
2278-
columns=list("AB"),
2279-
index=np.random.rand(5),
2280-
)
2281-
kdf = ks.from_pandas(pdf)
2282-
self._test_cumprod(pdf, kdf)
2276+
if LooseVersion(pyspark.__version__) >= LooseVersion("2.4"):
2277+
pdf = pd.DataFrame(
2278+
[[2.0, 1.0, 1], [5, None, 2], [1.0, 1.0, 3], [2.0, 4.0, 4], [4.0, 9.0, 5]],
2279+
columns=list("ABC"),
2280+
index=np.random.rand(5),
2281+
)
2282+
kdf = ks.from_pandas(pdf)
2283+
self._test_cumprod(pdf, kdf)
2284+
else:
2285+
pdf = pd.DataFrame(
2286+
[[2, 1, 1], [5, 1, 2], [1, 1, 3], [2, 4, 4], [4, 9, 5]],
2287+
columns=list("ABC"),
2288+
index=np.random.rand(5),
2289+
)
2290+
kdf = ks.from_pandas(pdf)
2291+
self._test_cumprod(pdf, kdf)
22832292

22842293
def test_cumprod_multiindex_columns(self):
22852294
arrays = [np.array(["A", "A", "B", "B"]), np.array(["one", "two", "one", "two"])]

databricks/koalas/tests/test_series.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -979,6 +979,13 @@ def test_cumprod(self):
979979
self.assert_eq(pser.cumprod(skipna=False), kser.cumprod(skipna=False))
980980
self.assert_eq(pser.cumprod().sum(), kser.cumprod().sum())
981981

982+
# with integer type
983+
pser = pd.Series([1, 10, 1, 4, 9])
984+
kser = ks.from_pandas(pser)
985+
self.assert_eq(pser.cumprod(), kser.cumprod())
986+
self.assert_eq(pser.cumprod(skipna=False), kser.cumprod(skipna=False))
987+
self.assert_eq(pser.cumprod().sum(), kser.cumprod().sum())
988+
982989
# with reversed index
983990
pser.index = [4, 3, 2, 1, 0]
984991
kser = ks.from_pandas(pser)

0 commit comments

Comments
 (0)