SNOW-1458135 Implement DataFrame and Series initialization with lazy Index objects #2137

Open
wants to merge 60 commits into
base: main
Changes from 13 commits
Commits
2094c3f
Update Series and DataFrame constructors to handle lazy Index objects…
sfc-gh-vbudati Aug 21, 2024
97a7229
Merge branch 'main' into vbudati/SNOW-1458135-df-series-init-with-laz…
sfc-gh-vbudati Aug 21, 2024
1979257
update changelog
sfc-gh-vbudati Aug 21, 2024
5dbb76d
add more tests
sfc-gh-vbudati Aug 21, 2024
7de467f
fix minor bug
sfc-gh-vbudati Aug 21, 2024
5dd06fd
fix isocalendar docstring error
sfc-gh-vbudati Aug 21, 2024
8b94462
truncate tests, update changelog wording, reduce 2 queries to one que…
sfc-gh-vbudati Aug 22, 2024
c89dc5d
Merge branch 'main' into vbudati/SNOW-1458135-df-series-init-with-laz…
sfc-gh-vbudati Aug 22, 2024
a2089b8
Merge branch 'main' into vbudati/SNOW-1458135-df-series-init-with-laz…
sfc-gh-vbudati Aug 22, 2024
a9376c1
Get rid of the join performed when only index is an Index object and …
sfc-gh-vbudati Aug 22, 2024
420a5ac
Add back the index join query to DataFrame/Series constructor, update…
sfc-gh-vbudati Aug 22, 2024
66d634c
Update tests
sfc-gh-vbudati Aug 23, 2024
f277041
Merge branch 'main' into vbudati/SNOW-1458135-df-series-init-with-laz…
sfc-gh-vbudati Aug 23, 2024
6a2cb79
added edge case logic, fix test query count
sfc-gh-vbudati Aug 23, 2024
df96f4a
Merge branch 'main' into vbudati/SNOW-1458135-df-series-init-with-laz…
sfc-gh-vbudati Aug 23, 2024
f971b0d
more test fixes
sfc-gh-vbudati Aug 23, 2024
13db956
Merge branch 'main' into vbudati/SNOW-1458135-df-series-init-with-laz…
sfc-gh-vbudati Aug 23, 2024
8c78f8d
fix dict case
sfc-gh-vbudati Aug 23, 2024
7970101
more test case fixes
sfc-gh-vbudati Aug 24, 2024
f3de1c3
correct the logic for series created with dict and index
sfc-gh-vbudati Aug 26, 2024
2447022
Merge branch 'main' into vbudati/SNOW-1458135-df-series-init-with-laz…
sfc-gh-vbudati Aug 26, 2024
82728bf
fix query counts
sfc-gh-vbudati Aug 26, 2024
23587a4
Merge branch 'vbudati/SNOW-1458135-df-series-init-with-lazy-index' of…
sfc-gh-vbudati Aug 26, 2024
1577ddc
fix join count
sfc-gh-vbudati Aug 26, 2024
8903f60
refactor series and df
sfc-gh-vbudati Sep 4, 2024
668c889
merge main into current branch
sfc-gh-vbudati Sep 4, 2024
67a07c1
refactor dataframe and series constructors
sfc-gh-vbudati Sep 4, 2024
a4351ba
Merge branch 'main' into vbudati/SNOW-1458135-df-series-init-with-laz…
sfc-gh-vbudati Sep 4, 2024
1453680
fix docstring tests
sfc-gh-vbudati Sep 5, 2024
f39e751
Merge branch 'main' into vbudati/SNOW-1458135-df-series-init-with-laz…
sfc-gh-vbudati Sep 5, 2024
b73f027
fix some tests
sfc-gh-vbudati Sep 6, 2024
c6fc05d
Merge branch 'main' into vbudati/SNOW-1458135-df-series-init-with-laz…
sfc-gh-vbudati Sep 6, 2024
d422f86
replace series constructor
sfc-gh-vbudati Sep 6, 2024
1ea5d00
fix tests
sfc-gh-vbudati Sep 9, 2024
024acd8
Merge branch 'main' into vbudati/SNOW-1458135-df-series-init-with-laz…
sfc-gh-vbudati Sep 9, 2024
f4a80f3
fix loc and iloc tests
sfc-gh-vbudati Sep 9, 2024
ce1ffa6
fix test
sfc-gh-vbudati Sep 9, 2024
00d2a8b
fix test
sfc-gh-vbudati Sep 9, 2024
cb91849
fix last valid index error
sfc-gh-vbudati Sep 9, 2024
d9fdbb0
remove stuff unnecessarily commented out
sfc-gh-vbudati Sep 9, 2024
3d5b785
explain high query count
sfc-gh-vbudati Sep 9, 2024
7f9dbaa
rewrite binary op test, fix coverage
sfc-gh-vbudati Sep 9, 2024
6de9f49
fix tests
sfc-gh-vbudati Sep 11, 2024
c2fb474
Merge branch 'main' into vbudati/SNOW-1458135-df-series-init-with-laz…
sfc-gh-vbudati Sep 11, 2024
2274d1e
remove print statements and unnecessary comments
sfc-gh-vbudati Sep 11, 2024
9eef8d7
fix tests
sfc-gh-vbudati Sep 11, 2024
cc09403
increase coverage
sfc-gh-vbudati Sep 12, 2024
10c3954
try to move out common logic, add more tests
sfc-gh-vbudati Sep 14, 2024
64dda24
Merge branch 'main' into vbudati/SNOW-1458135-df-series-init-with-laz…
sfc-gh-vbudati Sep 14, 2024
da56734
update df init
sfc-gh-vbudati Sep 14, 2024
8b47e17
moved common logic out, fixed some tests
sfc-gh-vbudati Sep 16, 2024
fa4eb09
remove unnecessary diffs
sfc-gh-vbudati Sep 16, 2024
db28630
fix doctest and couple of tests
sfc-gh-vbudati Sep 17, 2024
17be4c3
apply feedback to simplify logic
sfc-gh-vbudati Sep 18, 2024
301f47f
Merge branch 'main' into vbudati/SNOW-1458135-df-series-init-with-laz…
sfc-gh-vbudati Sep 18, 2024
2eb14a7
update query counts to use constants
sfc-gh-vbudati Sep 18, 2024
95065f7
Merge branch 'vbudati/SNOW-1458135-df-series-init-with-lazy-index' of…
sfc-gh-vbudati Sep 18, 2024
d9bbd9b
remove docstring update, add docstrings for helper functions
sfc-gh-vbudati Sep 18, 2024
f40c5b4
try to break down df init into three steps: data, columns, and index
sfc-gh-vbudati Sep 20, 2024
8cce409
merge main into current branch
sfc-gh-vbudati Sep 20, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -60,6 +60,7 @@
- Added support for `pd.merge_asof`.
- Added support for `Series.dt.normalize` and `DatetimeIndex.normalize`.
- Added support for `Index.is_boolean`, `Index.is_integer`, `Index.is_floating`, `Index.is_numeric`, and `Index.is_object`.
- Added support for constructing `Series` and `DataFrame` objects with the lazy `Index` object as `data`, `index`, and `columns` arguments.

#### Bug Fixes

98 changes: 82 additions & 16 deletions src/snowflake/snowpark/modin/pandas/dataframe.py
@@ -152,19 +152,35 @@ def __init__(
# Siblings are other dataframes that share the same query compiler. We
# use this list to update inplace when there is a shallow copy.
from snowflake.snowpark.modin.pandas.utils import try_convert_index_to_native
from snowflake.snowpark.modin.plugin.extensions.index import Index

self._siblings = []

if isinstance(index, DataFrame): # pandas raises the same error
raise ValueError("Index data must be 1-dimensional")

# Engine.subscribe(_update_engine)
if isinstance(data, Index):
# If the data is an Index object, we need to convert it to a DataFrame to make sure
# that the values are in the correct format -- as a data column, not an index column.
# Additionally, if an index is provided, converting it to an Index object ensures that
# its values are an index column.
# We set the column name if it is not in the provided Index `data`.
if data.name is None:
new_name = 0 if columns is None else columns[0]
else:
new_name = data.name
query_compiler = data.to_frame(index=False, name=new_name)._query_compiler
if index is not None:
index = index if isinstance(index, Index) else Index(index)
query_compiler = query_compiler.create_qc_with_index_data_and_qc_index(
index._query_compiler
)

if isinstance(data, (DataFrame, Series)):
self._query_compiler = data._query_compiler.copy()
if index is not None and any(i not in data.index for i in index):
ErrorMessage.not_implemented(
"Passing non-existant columns or index values to constructor not"
+ " yet implemented."
) # pragma: no cover
if isinstance(data, Series):
# We set the column name if it is not in the provided Series
# We set the column name if it is not in the provided Series `data`.
if data.name is None:
self.columns = [0] if columns is None else columns
# If the columns provided are not in the named Series, pandas clears
@@ -174,22 +190,61 @@ def __init__(
self.__constructor__(columns=columns)
)._query_compiler
if index is not None:
# The `index` parameter is used to select the rows from `data` that will be in the resultant
# DataFrame. If a value in `index` is not present in `data`'s index, it will be filled with a
# NaN value.
# 1. The `index` is converted to an Index object so that the index values are in an index column.
index = index if isinstance(index, Index) else Index(index)
# 2. An outer join is performed between `index` and `data` to create a Series object in which
# any values in `index` that are missing from `data`'s index are filled with NaN.
data = Series(
query_compiler=data._query_compiler.create_qc_with_data_and_index_joined_on_index(
index._query_compiler
),
name=0 if data.name is None else data.name,
)
# 3. Perform .loc[] on `data` to select the rows that are in the `index`.
self._query_compiler = data.loc[index]._query_compiler

elif columns is None and index is None:
data._add_sibling(self)

else:
if columns is not None and any(i not in data.columns for i in columns):
ErrorMessage.not_implemented(
"Passing non-existant columns or index values to constructor not"
+ " yet implemented."
) # pragma: no cover
if index is None:
index = slice(None)
# The `columns` parameter is used to select the columns from `data` that will be in the resultant
# DataFrame. If a value in `columns` is not present in `data`'s columns, it will be added as a
# new column filled with NaN values. These columns are tracked by the `extra_columns` variable.
extra_columns = None
if columns is None:
# In case `columns` is not provided, `columns` is set to slice(None) to select all columns.
columns = slice(None)
else:
extra_columns = [col for col in columns if col not in data.columns]

# The `index` parameter is used to select the rows from `data` that will be in the resultant DataFrame.
# If a value in `index` is not present in `data`'s index, it will be filled with a NaN value.
if index is None:
# In case `index` is not provided, `index` is set to slice(None) to select all rows.
index = slice(None)
data = DataFrame(
query_compiler=data._query_compiler.create_qc_with_data_and_index_joined_on_index(
extra_columns=extra_columns
)
)
else:
# The `index` is converted to an Index object so that the index values are in an index column.
index = index if isinstance(index, Index) else Index(index)
# An outer join is performed between `index` and `data` to create a DataFrame object in which any
# values in `index` that are missing from `data`'s index are filled with NaN.
data = DataFrame(
query_compiler=data._query_compiler.create_qc_with_data_and_index_joined_on_index(
index._query_compiler,
extra_columns=extra_columns,
)
)
# Perform .loc[] on `data` to select the rows and columns that are in `index` and `columns`.
self._query_compiler = data.loc[index, columns]._query_compiler

# Check type of data and use appropriate constructor
# Check the type of data and use the appropriate constructor
elif query_compiler is None:
distributed_frame = from_non_pandas(data, index, columns, dtype)
if distributed_frame is not None:
@@ -241,14 +296,25 @@ def __init__(
k: v._to_pandas() if isinstance(v, Series) else v
for k, v in data.items()
}

new_index = index
if isinstance(index, Index):
# Skip turning this into a native pandas object here since this issues an extra query.
# Instead, first get the query compiler from native pandas and then add the index column.
new_index = None
pandas_df = pandas.DataFrame(
data=try_convert_index_to_native(data),
index=try_convert_index_to_native(index),
index=try_convert_index_to_native(new_index),
columns=try_convert_index_to_native(columns),
dtype=dtype,
copy=copy,
)
self._query_compiler = from_pandas(pandas_df)._query_compiler
query_compiler = from_pandas(pandas_df)._query_compiler
if isinstance(index, Index):
query_compiler = query_compiler.create_qc_with_index_data_and_qc_index(
index._query_compiler
)
self._query_compiler = query_compiler
else:
self._query_compiler = query_compiler

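As a rough illustration of the `extra_columns` handling in the `dataframe.py` changes above, the sketch below models the idea in plain Python, with no Snowflake session involved. The `select_columns_with_nan_fill` helper and the dict-of-dicts frame layout are assumptions made for this example; they are not part of Snowpark pandas, and the real constructor works on query compilers rather than in-memory dicts.

```python
import math
from typing import Any, Dict, Hashable, List, Tuple

# Hypothetical toy frame: row label -> {column label -> value}.
Frame = Dict[Hashable, Dict[Hashable, Any]]


def select_columns_with_nan_fill(
    frame: Frame,
    existing_columns: List[Hashable],
    columns: List[Hashable],
) -> Tuple[Frame, List[Hashable]]:
    """Select `columns` from `frame`, adding requested-but-missing columns as NaN."""
    # Columns that were requested but are not part of the data.
    extra_columns = [col for col in columns if col not in existing_columns]
    result: Frame = {}
    for row_label, row in frame.items():
        widened = dict(row)
        # Append each extra column as a NaN value for this row.
        for col in extra_columns:
            widened[col] = math.nan
        # Keep only the requested columns, in the requested order.
        result[row_label] = {col: widened[col] for col in columns}
    return result, extra_columns


frame = {0: {"a": 1, "b": 2}, 1: {"a": 3, "b": 4}}
selected, extra = select_columns_with_nan_fill(frame, ["a", "b"], ["b", "c"])
```

Here `"c"` is not a column of the toy data, so it is tracked as an extra column and filled with NaN, while `"a"` is dropped because it was not requested.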
51 changes: 42 additions & 9 deletions src/snowflake/snowpark/modin/pandas/series.py
@@ -129,19 +129,41 @@ def __init__(

# modified:
# Engine.subscribe(_update_engine)
from snowflake.snowpark.modin.plugin.extensions.index import Index

# Convert lazy index to Series without pulling the data to client.
if isinstance(data, pd.Index):
query_compiler = data.to_series(index=index, name=name)._query_compiler
query_compiler = query_compiler.reset_index(drop=True)
if isinstance(data, Index):
# If the data is an Index object, we need to convert it to a Series to make sure
# that the values are in the correct format -- as a data column, not an index column.
# Additionally, if an index is provided, converting it to an Index object ensures that
# its values are an index column.
query_compiler = (
data.to_series(index=None, name=name)
.reset_index(drop=True)
._query_compiler
)

if index is not None:
index = index if isinstance(index, Index) else Index(index)
query_compiler = query_compiler.create_qc_with_index_data_and_qc_index(
index._query_compiler
)
elif isinstance(data, type(self)):
query_compiler = data._query_compiler.copy()
if index is not None:
if any(i not in data.index for i in index):
ErrorMessage.not_implemented(
"Passing non-existent columns or index values to constructor "
+ "not yet implemented."
) # pragma: no cover
# The `index` parameter is used to select the rows from `data` that will be in the resultant Series.
# If a value in `index` is not present in `data`'s index, it will be filled with a NaN value.
# 1. The `index` is converted to an Index object so that the index values are in an index column.
index = index if isinstance(index, Index) else Index(index)
# 2. An outer join is performed between `index` and `data` to create a Series object in which any
# values in `index` that are missing from `data`'s index are filled with NaN.
data = Series(
query_compiler=data._query_compiler.create_qc_with_data_and_index_joined_on_index(
index._query_compiler
),
name=data.name,
)
# 3. Perform .loc[] on `data` to select the rows that are in `index`.
query_compiler = data.loc[index]._query_compiler
if query_compiler is None:
# Defaulting to pandas
@@ -153,18 +175,29 @@ def __init__(
):
name = data.name

new_index = index
if isinstance(index, Index):
# Skip turning this into a native pandas object here since this issues an extra query.
# Instead, first get the query compiler from native pandas and then add the index column.
new_index = None
query_compiler = from_pandas(
pandas.DataFrame(
pandas.Series(
data=try_convert_index_to_native(data),
index=try_convert_index_to_native(index),
index=try_convert_index_to_native(new_index),
dtype=dtype,
name=name,
copy=copy,
fastpath=fastpath,
)
)
)._query_compiler
if isinstance(index, Index):
query_compiler = (
query_compiler.create_qc_with_data_and_index_joined_on_index(
index._query_compiler
)
)
self._query_compiler = query_compiler.columnarize()
if name is not None:
self.name = name
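When `data` is a lazy `Index` and an `index` is also supplied, the constructors above call `create_qc_with_index_data_and_qc_index`, which joins the two on row position with a left join. The plain-Python sketch below is only a toy model of that pairing; `attach_index_by_position` is a hypothetical name, and lists stand in for the query compilers.

```python
from typing import Any, Hashable, List, Optional, Tuple


def attach_index_by_position(
    values: List[Any], labels: List[Hashable]
) -> List[Tuple[Optional[Hashable], Any]]:
    """Pair each data value with the index label at the same row position.

    Models a left join on row position: every data row is kept, and a row
    with no label at its position gets None as a stand-in for a missing label.
    """
    return [
        (labels[pos] if pos < len(labels) else None, value)
        for pos, value in enumerate(values)
    ]


paired = attach_index_by_position(["x", "y", "z"], [10, 20, 30])
short = attach_index_by_position(["x", "y", "z"], [10, 20])
```

The mismatched-length case only shows what a plain left join on position would produce. Whether the constructors validate lengths before reaching this point is not visible in this diff.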
@@ -17518,3 +17518,118 @@ def tz_convert(self, *args: Any, **kwargs: Any) -> None:

def tz_localize(self, *args: Any, **kwargs: Any) -> None:
ErrorMessage.method_not_implemented_error("tz_localize", "BasePandasDataset")

def create_qc_with_index_data_and_qc_index(
self, index_qc: "SnowflakeQueryCompiler"
) -> "SnowflakeQueryCompiler":
"""
This is a helper function for creating a DataFrame/Series where the data is an Index
and an index is provided.
Before this method is called, the provided index is converted to an Index object;
the query compilers of the data and index are then joined.

Parameters
----------
index_qc : SnowflakeQueryCompiler
The query compiler of the index to be joined with the data.

Returns
-------
SnowflakeQueryCompiler
A new query compiler with the data and index joined.
"""
self_frame = self._modin_frame.ensure_row_position_column()
other_frame = index_qc._modin_frame.ensure_row_position_column()

new_internal_frame, _ = join_utils.join(
self_frame,
other_frame,
how="left",
left_on=[self_frame.row_position_snowflake_quoted_identifier],
right_on=[other_frame.row_position_snowflake_quoted_identifier],
inherit_join_index=InheritJoinIndex.FROM_RIGHT,
)

return SnowflakeQueryCompiler(new_internal_frame)

def create_qc_with_data_and_index_joined_on_index(
self,
index_qc: Optional["SnowflakeQueryCompiler"] = None,
extra_columns: Optional[List[Hashable]] = None,
) -> "SnowflakeQueryCompiler":
"""
This is a helper function for creating a DataFrame/Series where the data is a DataFrame/Series object.
This is a special case: a row takes its values from the given `data` only where its index value appears in both
`data`'s index and the provided `index`. All other rows take on a NaN value.

For instance,

>>> data = pd.Series(["A", "B", "C", "D"], index=[1.1, 2.2, 3, 4], name="index series name")
>>> index = pd.Index([1, 2, 3, 4], name="some name")
>>> df = pd.DataFrame(data=data, index=index)
>>> df # doctest: +SKIP
index series name
some name
1 NaN
2 NaN
3 C
4 D

Notice that only index values 3 and 4 have an actual value, while 1 and 2 have a NaN value.
3 and 4 are present both in the index of `data` and in the provided `index`. 1 and 2 are present only in the provided `index`.

Parameters
----------
index_qc : SnowflakeQueryCompiler, default None
The query compiler of the index to be joined with the data. If no query compiler is provided,
Contributor:
If index_qc isn't supplied, does that mean the index of the new query compiler will be the default positional values?

Contributor Author:
No, it just means that the index will be whatever the index is for self.

Contributor:
It might be helpful to clarify that in the docstring just so readers don't have to think about what "skip this join operation" means.

skip this join operation.
extra_columns : list of hashable, default None
If the DataFrame being created has new columns that are not a part of the data, they can be passed here
and appended as NaN columns.

Returns
-------
SnowflakeQueryCompiler
A new query compiler with the data and index joined.
"""
self_frame = self._modin_frame

if extra_columns:
# Append the new columns to the data's internal frame.
new_snowflake_quoted_identifiers = self._modin_frame.ordered_dataframe.generate_snowflake_quoted_identifiers(
pandas_labels=extra_columns,
excluded=self_frame.data_column_snowflake_quoted_identifiers,
)
new_ordered_frame = append_columns(
self_frame.ordered_dataframe,
new_snowflake_quoted_identifiers,
[pandas_lit(np.nan)] * len(extra_columns),
)
self_frame = InternalFrame.create(
ordered_dataframe=new_ordered_frame,
data_column_pandas_labels=self_frame.data_column_pandas_labels
+ extra_columns,
data_column_snowflake_quoted_identifiers=self_frame.data_column_snowflake_quoted_identifiers
+ new_snowflake_quoted_identifiers,
data_column_pandas_index_names=self_frame.data_column_pandas_index_names,
index_column_pandas_labels=self_frame.index_column_pandas_labels,
index_column_snowflake_quoted_identifiers=self_frame.index_column_snowflake_quoted_identifiers,
data_column_types=None,
index_column_types=None,
)

if index_qc is None:
new_internal_frame = self_frame
else:
# Join the index and data internal frames.
other_frame = index_qc._modin_frame
new_internal_frame, _ = join_utils.join(
other_frame,
self_frame,
how="outer",
left_on=other_frame.index_column_snowflake_quoted_identifiers,
right_on=self_frame.index_column_snowflake_quoted_identifiers,
inherit_join_index=InheritJoinIndex.FROM_LEFT,
)

return SnowflakeQueryCompiler(new_internal_frame)
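
The docstring example for `create_qc_with_data_and_index_joined_on_index` can be traced with a small plain-Python model: an outer join keyed on index labels, followed by a selection standing in for `.loc[index]`. This is a sketch under stated assumptions, not the library's implementation. `outer_join_on_index` and `select_rows` are hypothetical helpers, dicts stand in for internal frames, and duplicate index labels are ignored.

```python
import math
from typing import Any, Dict, Hashable, List, Tuple


def outer_join_on_index(
    index_labels: List[Hashable], data: Dict[Hashable, Any]
) -> Dict[Hashable, Any]:
    """Toy outer join: labels from `index_labels` first, then leftover labels from `data`."""
    # Labels requested by the index take the data value if present, else NaN.
    joined = {label: data.get(label, math.nan) for label in index_labels}
    # Labels that exist only in the data are kept as well (outer join).
    for label, value in data.items():
        joined.setdefault(label, value)
    return joined


def select_rows(
    joined: Dict[Hashable, Any], index_labels: List[Hashable]
) -> List[Tuple[Hashable, Any]]:
    """Toy stand-in for `.loc[index]`: keep only the requested labels, in order."""
    return [(label, joined[label]) for label in index_labels]


data = {1.1: "A", 2.2: "B", 3: "C", 4: "D"}
index = [1, 2, 3, 4]
joined = outer_join_on_index(index, data)
result = select_rows(joined, index)
```

As in the docstring, labels 1 and 2 end up with NaN because they are absent from the data's index, and labels 3 and 4 keep "C" and "D". The rows for 1.1 and 2.2 survive the join but are dropped by the final selection.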