Skip to content

Commit

Permalink
SNOW-1023211: Support resample with rule offset W, ME, YE with …
Browse files Browse the repository at this point in the history
…`closed = left` (#2254)

SNOW-1023211

This PR adds support for `resample` by rule offset `W`, `ME`, `YE` with
parameter `closed = left`.

Note: Snowpark pandas `resample` implementation uses the Snowflake SQL
function `TIME_SLICE` which assumes the left hand side of the given
interval is closed, meaning support using `closed = right` parameter is
non-trivial.

---------

Signed-off-by: Naren Krishna <[email protected]>
  • Loading branch information
sfc-gh-nkrishna committed Sep 20, 2024
1 parent af97654 commit d2d3e97
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 49 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
- Added support for passing parameter `include_describe` to `Session.query_history`.
- Added support for `DatetimeIndex.mean` and `DatetimeIndex.std` methods.
- Added support for `Resampler.asfreq`.
- Added support for `resample` frequency `W`, `ME`, `YE` with `closed = "left"`.

#### Bug Fixes

Expand Down
14 changes: 8 additions & 6 deletions docs/source/modin/supported/dataframe_supported.rst
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ Methods
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``applymap`` | P | | ``N`` if ``na_action == "ignore"`` |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``asfreq`` | P | ``how``, ``normalize``, | See ``resample`` |
| | | ``fill_value`` | |
| ``asfreq`` | P | ``how``, ``normalize``, | Only DatetimeIndex is supported and its ``freq`` |
| | | ``fill_value`` | will be lost. Only ``rule`` frequencies 's', 'min',|
| | | | 'h', and 'D' are supported. |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``asof`` | N | | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
Expand Down Expand Up @@ -347,10 +348,11 @@ Methods
| ``replace`` | P | ``copy`` is ignored, ``method``, | |
| | | ``limit`` | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``resample`` | P | ``axis``, ``closed``, ``label``, | Only DatetimeIndex is supported and its ``freq`` |
| | | ``convention``, ``kind``, ``on`` | will be lost. Only ``rule`` frequencies 's', 'min',|
| | | , ``level``, ``origin``, | 'h', and 'D' are supported. |
| | | , ``offset``, ``group_keys`` | |
| ``resample`` | P | ``axis``, ``label``, | Only DatetimeIndex is supported and its ``freq`` |
| | | ``convention``, ``kind``, ``on`` | will be lost. ``rule`` frequencies 's', 'min', |
| | | , ``level``, ``origin``, | 'h', and 'D' are supported. ``rule`` frequencies |
| | | , ``offset``, ``group_keys`` | 'W', 'ME', and 'YE' are supported with |
| | | | `closed = "left"` |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``reset_index`` | Y | | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
Expand Down
14 changes: 8 additions & 6 deletions docs/source/modin/supported/series_supported.rst
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,9 @@ Methods
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``argsort`` | N | | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``asfreq`` | P | ``how``, ``normalize``, | See ``resample`` |
| | | ``fill_value`` | |
| ``asfreq`` | P | ``how``, ``normalize``, | Only DatetimeIndex is supported and its ``freq`` |
| | | ``fill_value`` | will be lost. Only ``rule`` frequencies 's', 'min',|
| | | | 'h', and 'D' are supported. |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``asof`` | N | | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
Expand Down Expand Up @@ -340,10 +341,11 @@ Methods
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``replace`` | P | ``method``, ``limit`` | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``resample`` | P | ``axis``, ``closed``, ``label``, | Only DatetimeIndex is supported and its ``freq`` |
| | | ``convention``, ``kind``, ``on`` | will be lost. Only ``rule`` frequencies 's', 'min',|
| | | , ``level``, ``origin``, | 'h', and 'D' are supported. |
| | | , ``offset``, ``group_keys`` | |
| ``resample`` | P | ``axis``, ``label``, | Only DatetimeIndex is supported and its ``freq`` |
| | | ``convention``, ``kind``, ``on`` | will be lost. ``rule`` frequencies 's', 'min', |
| | | , ``level``, ``origin``, | 'h', and 'D' are supported. ``rule`` frequencies |
| | | , ``offset``, ``group_keys`` | 'W', 'ME', and 'YE' are supported with |
| | | | `closed = "left"` |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``reset_index`` | Y | | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
Expand Down
65 changes: 45 additions & 20 deletions src/snowflake/snowpark/modin/plugin/_internal/resample_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from typing import Any, Literal, NoReturn, Optional, Union

import pandas as native_pd
from pandas._libs.lib import no_default
from pandas._libs.tslibs import to_offset
from pandas._typing import Frequency
Expand All @@ -16,6 +15,7 @@
builtin,
dateadd,
datediff,
last_day,
lit,
to_timestamp_ntz,
)
Expand Down Expand Up @@ -47,7 +47,9 @@
"last",
]
IMPLEMENTED_MISC_METHODS = ["ffill"]
SUPPORTED_RESAMPLE_RULES = ["day", "hour", "second", "minute"]
SUPPORTED_RESAMPLE_RULES = ("second", "minute", "hour", "day", "week", "month", "year")
RULE_SECOND_TO_DAY = ("second", "minute", "hour", "day")
RULE_WEEK_TO_YEAR = ("week", "quarter", "month", "year")


# https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#dateoffset-objects
Expand Down Expand Up @@ -82,9 +84,21 @@
"ns",
]

SNOWFLAKE_SUPPORTED_DATEOFFSETS = ["W", "ME", "QE", "QS", "YS", "D", "h", "min", "s"]
SNOWFLAKE_SUPPORTED_DATEOFFSETS = [
"s",
"min",
"h",
"D",
"W",
"MS",
"ME",
"QS",
"QE",
"YS",
"YE",
]

IMPLEMENTED_DATEOFFSET_STRINGS = ["min", "s", "h", "D"]
IMPLEMENTED_DATEOFFSET_STRINGS = ["s", "min", "h", "D", "W", "ME", "YE"]

UNSUPPORTED_DATEOFFSET_STRINGS = list(
# sort so that tests that generate test cases from this last always use the
Expand Down Expand Up @@ -134,7 +148,6 @@ def rule_to_snowflake_width_and_slice_unit(rule: Frequency) -> tuple[int, str]:

rule_code = offset.rule_code
slice_width = offset.n

if rule_code == "s":
slice_unit = "second"
elif rule_code == "min":
Expand All @@ -143,16 +156,16 @@ def rule_to_snowflake_width_and_slice_unit(rule: Frequency) -> tuple[int, str]:
slice_unit = "hour"
elif rule_code == "D":
slice_unit = "day"
elif rule_code[0] == "W": # pragma: no cover
elif rule_code[0] == "W":
# treat codes like W-MON and W-SUN as "week":
slice_unit = "week"
elif rule_code == "ME": # pragma: no cover
elif rule_code == "ME":
slice_unit = "month"
elif rule_code[0] == "QE": # pragma: no cover
# treat codes like Q-DEC and Q-JAN as "quarter":
elif rule_code[0:2] == "QE": # pragma: no cover
# treat codes like QE-DEC and QE-JAN as "quarter":
slice_unit = "quarter"
elif rule_code[0] == "YE": # pragma: no cover
# treat codes like A-DEC and A-JAN as "year":
elif rule_code[0:2] == "YE":
# treat codes like YE-DEC and YE-JAN as "year":
slice_unit = "year"
else:
raise NotImplementedError(
Expand Down Expand Up @@ -204,9 +217,7 @@ def validate_resample_supported_by_snowflake(
"""
rule = resample_kwargs.get("rule")

_, slice_unit = rule_to_snowflake_width_and_slice_unit(
rule # type: ignore[arg-type]
)
_, slice_unit = rule_to_snowflake_width_and_slice_unit(rule)

if slice_unit not in SUPPORTED_RESAMPLE_RULES:
_argument_not_implemented("rule", rule)
Expand All @@ -216,8 +227,13 @@ def validate_resample_supported_by_snowflake(
_argument_not_implemented("axis", axis)

closed = resample_kwargs.get("closed")
if closed is not None: # pragma: no cover
if closed not in ("left", None) and slice_unit in RULE_SECOND_TO_DAY:
_argument_not_implemented("closed", closed)
if slice_unit in RULE_WEEK_TO_YEAR:
if closed != "left":
ErrorMessage.not_implemented(
f"resample with rule offset {rule} is only implemented with closed='left'"
)

label = resample_kwargs.get("label")
if label is not None: # pragma: no cover
Expand Down Expand Up @@ -376,8 +392,7 @@ def perform_resample_binning_on_frame(
# Time slices in Snowflake are aligned to snowflake_timeslice_alignment_date,
# so we must normalize input datetimes.
normalization_amt = (
native_pd.to_datetime(start_date)
- native_pd.to_datetime(SNOWFLAKE_TIMESLICE_ALIGNMENT_DATE)
pd.to_datetime(start_date) - pd.to_datetime(SNOWFLAKE_TIMESLICE_ALIGNMENT_DATE)
).total_seconds()

# Subtract the normalization amount in seconds from the input datetime.
Expand All @@ -399,7 +414,12 @@ def perform_resample_binning_on_frame(

# Call time_slice on the normalized datetime column with the slice_width and slice_unit.
# time_slice is not supported for timestamps with timezones, only TIMESTAMP_NTZ
normalized_dates_set_to_bins = time_slice(normalized_dates, slice_width, slice_unit)
normalized_dates_set_to_bins = time_slice(
column=normalized_dates,
slice_length=slice_width,
date_or_time_part=slice_unit,
start_or_end="start" if slice_unit in RULE_SECOND_TO_DAY else "end",
)
# frame:
# data_col
# date
Expand All @@ -414,8 +434,13 @@ def perform_resample_binning_on_frame(
# 1970-01-10 9

# Add the normalization amount in seconds back to the input datetime for the correct result.
unnormalized_dates_set_to_bins = dateadd(
"second", lit(normalization_amt), normalized_dates_set_to_bins
unnormalized_dates_set_to_bins = (
dateadd("second", lit(normalization_amt), normalized_dates_set_to_bins)
if slice_unit in RULE_SECOND_TO_DAY
else last_day(
dateadd("second", lit(normalization_amt), normalized_dates_set_to_bins),
slice_unit,
)
)
# frame:
# data_col
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
is_char,
is_null,
lag,
last_day,
last_value,
lead,
least,
Expand Down Expand Up @@ -255,6 +256,8 @@
)
from snowflake.snowpark.modin.plugin._internal.resample_utils import (
IMPLEMENTED_AGG_METHODS,
RULE_SECOND_TO_DAY,
RULE_WEEK_TO_YEAR,
fill_missing_resample_bins_for_frame,
get_expected_resample_bins_frame,
get_snowflake_quoted_identifier_for_resample_index_col,
Expand Down Expand Up @@ -11900,6 +11903,12 @@ def asfreq(
"Snowpark pandas `asfreq` does not support parameters `how`, `normalize`, or `fill_value`."
)

_, slice_unit = rule_to_snowflake_width_and_slice_unit(freq)
if slice_unit not in RULE_SECOND_TO_DAY:
ErrorMessage.not_implemented(
"Snowpark pandas `asfreq` does not yet support frequencies week, month, quarter, or year"
)

resample_kwargs = {
"rule": freq,
"axis": 0,
Expand Down Expand Up @@ -11974,7 +11983,7 @@ def resample(

rule = resample_kwargs.get("rule")

_, slice_unit = rule_to_snowflake_width_and_slice_unit(rule)
slice_width, slice_unit = rule_to_snowflake_width_and_slice_unit(rule)

min_max_index_column_quoted_identifier = (
frame.ordered_dataframe.generate_snowflake_quoted_identifiers(
Expand All @@ -11990,14 +11999,45 @@ def resample(
# For instance, if rule='3D' and the earliest date is
# 2020-03-01 1:00:00, the first date should be 2020-03-01,
# which is what date_trunc gives us.
start_date, end_date = frame.ordered_dataframe.agg(
date_trunc(slice_unit, min_(snowflake_index_column_identifier)).as_(
min_max_index_column_quoted_identifier[0]
),
date_trunc(slice_unit, max_(snowflake_index_column_identifier)).as_(
min_max_index_column_quoted_identifier[1]
),
).collect()[0]
if slice_unit in RULE_SECOND_TO_DAY:
# `slice_unit` in 'second', 'minute', 'hour', 'day'
start_date, end_date = frame.ordered_dataframe.agg(
date_trunc(slice_unit, min_(snowflake_index_column_identifier)).as_(
min_max_index_column_quoted_identifier[0]
),
date_trunc(slice_unit, max_(snowflake_index_column_identifier)).as_(
min_max_index_column_quoted_identifier[1]
),
).collect()[0]
else:
assert slice_unit in RULE_WEEK_TO_YEAR
# `slice_unit` in 'week', 'month', 'quarter', or 'year'. Set the start and end dates
# to the last day of the given `slice_unit`. Use the right bin edge by adding a `slice_width`
# of the given `slice_unit` to the first and last date of the index.
start_date, end_date = frame.ordered_dataframe.agg(
last_day(
date_trunc(
slice_unit,
dateadd(
slice_unit,
pandas_lit(slice_width),
min_(snowflake_index_column_identifier),
),
),
slice_unit,
).as_(min_max_index_column_quoted_identifier[0]),
last_day(
date_trunc(
slice_unit,
dateadd(
slice_unit,
pandas_lit(slice_width),
max_(snowflake_index_column_identifier),
),
),
slice_unit,
).as_(min_max_index_column_quoted_identifier[1]),
).collect()[0]

if resample_method in ("ffill", "bfill"):
expected_frame = get_expected_resample_bins_frame(
Expand Down
6 changes: 3 additions & 3 deletions tests/integ/modin/resample/test_resample.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def test_resample_with_varying_freq_and_interval(freq, interval, agg_func):
{"A": np.random.randn(15)},
index=native_pd.date_range("2020-01-01", periods=15, freq=f"1{freq}"),
),
lambda df: getattr(df.resample(rule=rule), agg_func)(),
lambda df: getattr(df.resample(rule=rule, closed="left"), agg_func)(),
check_freq=False,
)

Expand Down Expand Up @@ -102,7 +102,7 @@ def test_resample_missing_data_upsample(agg_func, freq):
rule = f"1{freq}"
eval_snowpark_pandas_result(
*create_test_dfs({"A": np.random.randn(10)}, index=date_data),
lambda df: getattr(df.resample(rule=rule), agg_func)(),
lambda df: getattr(df.resample(rule=rule, closed="left"), agg_func)(),
check_freq=False,
)

Expand Down Expand Up @@ -159,7 +159,7 @@ def test_resample_series(freq, interval, agg_func):
range(15),
index=native_pd.date_range("2020-01-01", periods=15, freq=f"1{freq}"),
),
lambda ser: getattr(ser.resample(rule=rule), agg_func)(),
lambda ser: getattr(ser.resample(rule=rule, closed="left"), agg_func)(),
check_freq=False,
)

Expand Down
Loading

0 comments on commit d2d3e97

Please sign in to comment.