SNOW-1458135 Implement DataFrame and Series initialization with lazy Index objects #2137
base: main
Conversation
…, add tests for the same
…y-index # Conflicts: # src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py
…data is not a Snowpark pandas object
… the constructor tests, rewrite concat tests
All of the join counts in the tests have increased because DataFrame/Series creation with a non-Snowpark pandas object as the index now performs a join. In some cases the join count is a lot higher in tests, but this is because of the way they are written - some tests call …
Thanks for doing this! It's a lot of work btw.
Please also check:
- if you identify test code that can be improved, please add a TODO and track it with a Jira ticket.
- please run a Jenkins job to see if anything is wrong there before merging.
…y-index # Conflicts: # CHANGELOG.md # src/snowflake/snowpark/modin/plugin/extensions/series_overrides.py
The DataFrame and Series constructors are quite similar - can we reuse the code?
@@ -1073,6 +1073,11 @@ class TestSeriesGroupBy:
    @pytest.mark.parametrize("by", ["string_col_1", ["index", "string_col_1"], "index"])
    def test_dataframe_groupby_getitem(self, by, func, dropna, group_keys, sort):
        """Test apply() on a SeriesGroupBy that we get by DataFrameGroupBy.__getitem__"""
        qc = (
Can we keep using QUERY_COUNT_WITH_TRANSFORM_CHECK and explain why the value changed to 5?
Done! The query count went down because we're no longer converting the index to pandas in the DataFrame constructor.
eval_snowpark_pandas_result(
    *create_test_series({"a": range(len(datecol))}, index=datecol),
    *create_test_series({"2024-01-01": list(range(len(datecol)))}, index=datecol),
I forgot to check this. I'll do it today.
date_index = native_pd.date_range("1/1/2010", periods=6, freq="D")
native_series = native_pd.Series(
    {"prices": [100, 101, np.nan, 100, 89, 88]}, index=date_index
    {"1/1/2023": [100, 101, np.nan, 100, 89, 88]}, index=date_index
So this still can't be fixed?
else:
    # CASE 5: Non-Snowpark pandas data
why not just call from_pandas for all cases?
What is case 5.A?
I believe the only special case is
Special case: data is a dictionary where all the values are Snowpark pandas Series
I refactored it all - the two special cases are dict or list with all snowpark pandas elements
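For context, native pandas aligns a dict of Series by index like this (a minimal native pandas sketch; with Snowpark pandas objects the same alignment would presumably happen lazily through joins, which is why these special cases need their own handling):

```python
import pandas as pd

# Two series with partially overlapping indexes.
s1 = pd.Series([1, 2], index=["x", "y"])
s2 = pd.Series([3, 4], index=["y", "z"])

# DataFrame from a dict of Series outer-joins the indexes;
# positions missing from a series are filled with NaN.
df = pd.DataFrame({"a": s1, "b": s2})
print(df)
#      a    b
# x  1.0  NaN
# y  2.0  3.0
# z  NaN  4.0
```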
…y-index # Conflicts: # src/snowflake/snowpark/modin/pandas/dataframe.py # src/snowflake/snowpark/modin/plugin/extensions/index.py
@@ -1995,3 +1996,68 @@ def create_frame_with_data_columns(
def rindex(lst: list, value: int) -> int:
    """Find the last index in the list of item value."""
    return len(lst) - lst[::-1].index(value) - 1

def convert_index_to_qc(index: Any) -> Any:
I'm not sure how to make the return type "SnowflakeQueryCompiler" without causing circular import issues
using "SnowflakeQueryCompiler" with quotes
That does not work either - it still causes the same issues.
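One common way around this kind of circular import (a hedged sketch, not necessarily how this PR resolves it) is to import the class only for type checkers via `typing.TYPE_CHECKING`, combined with a string annotation:

```python
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    # Only evaluated by static type checkers, so the circular
    # import never happens at runtime.
    from snowflake.snowpark.modin.plugin.compiler.snowflake_query_compiler import (
        SnowflakeQueryCompiler,
    )

def convert_index_to_qc(index: Any) -> "SnowflakeQueryCompiler":
    """Convert an index-like object to a query compiler (body elided here)."""
    ...
```

Caveat: if the string annotation is ever evaluated at runtime (e.g. via `typing.get_type_hints`), the name must be importable at that point, so this only helps when annotations stay unevaluated.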
if isinstance(index, DataFrame):  # pandas raises the same error
    raise ValueError("Index data must be 1-dimensional")

if dtype == "category":
Where does this check come from? I don't see it checked anywhere before.
I added this check because it was not checked before - we do not support the Categorical type yet. If the user passes in dtype="category", this makes the dtype of the Series/DataFrame category.
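For reference, this is what dtype="category" does in native pandas - the behavior a Snowpark pandas constructor would have to emulate, hence the explicit not-supported check:

```python
import pandas as pd

# Plain values become a categorical column purely because of the dtype.
s = pd.Series(["a", "b", "a"], dtype="category")
print(s.dtype)                    # category
print(s.cat.categories.tolist())  # ['a', 'b']
```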
The dtype seems to be used only when data is local. Typically in that case we should already apply the dtype check before uploading the data, so we shouldn't need such a check here. Is it not erroring out today?
No, it's not erroring out here today - the data itself is not categorical, but it should be treated as categorical if dtype is category.
# -----------------------------
if query_compiler is not None:
    # CASE I: query_compiler
    # If a query_compiler is passed in only use the query_compiler field to create a new DataFrame.
In our doc, we actually document it in a way that both can be provided and used, like the following:
Notes
DataFrame can be created either from passed data or query_compiler. If both parameters are provided, the data source will be prioritized in the following order:
1. Modin DataFrame or Series passed with the data parameter.
2. Query compiler from the query_compiler parameter.
3. Various pandas/NumPy/Python data structures passed with the data parameter.
Please make sure the doc and the behavior are consistent.
updated the docs!
@@ -459,104 +464,222 @@ def __init__(
    # TODO: SNOW-1063346: Modin upgrade - modin.pandas.DataFrame functions
    # Siblings are other dataframes that share the same query compiler. We
    # use this list to update inplace when there is a shallow copy.
    from snowflake.snowpark.modin.pandas.utils import try_convert_index_to_native
    from snowflake.snowpark.modin.plugin.extensions.index import Index
Can the import be at the beginning of the file?
done!
elif isinstance(data, Series):
    # CASE III: data is a Snowpark pandas Series
    query_compiler = data._query_compiler.copy()
why are we making a copy of the query compiler here? the query compiler is in general immutable
removed the copy logic
    axis=1, labels=try_convert_index_to_native(columns)
)
# Reduce the dictionary to only the relevant columns as the keys.
data = {key: value for key, value in data.items() if key in columns}
That function is becoming too long - can you see if you can break it down into different functions like from_query_compiler, from_local_data, etc.?
I moved the special cases to their own functions instead of the local data case
@@ -17,7 +17,7 @@
)

@sql_count_checker(query_count=2, join_count=1)
@sql_count_checker(query_count=1, join_count=2)
I think we should keep a couple of tests with native index objects.
@@ -25,56 +25,56 @@

@pytest.fixture(scope="function")
def df1():
    return pd.DataFrame(
    return native_pd.DataFrame(
This change seems a little bit weird to me - the original code returns a Snowpark pandas DataFrame, but now we return a native pandas DataFrame here. Why is that?
I changed this because with a Snowpark pandas DataFrame being returned, the query count is higher and to_pandas() is called a lot. I think it's better to just convert the object to Snowpark pandas whenever needed.
udtf_count=UDTF_COUNT,
join_count=JOIN_COUNT,
Can you update JOIN_COUNT instead of hard-coding the join count here?
In most cases the join count is still 1; I modified it to use JOIN_COUNT + 1 now.
# CASE I: query_compiler
# If a query_compiler is passed in, only use the query_compiler and name fields to create a new Series.
assert (
    data is None
These checks look very similar for Series and DataFrame - can we unify them?
done!
Let's first align with DataFrame init.
@@ -78,7 +78,7 @@ class Series(BasePandasDataset):
    c    3
    dtype: int64

    The keys of the dictionary match with the Index values, hence the Index
    The keys of the dictionary match with the Index values, hence the dictionary
We should keep the original version. Why change this?
Because this description is wrong - only the index values ('x', 'y', 'z') are used and the dict keys ('a', 'b', 'c') are ignored.
This statement is talking about the example above which should be accurate.
fixed!
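The behavior under discussion can be checked directly in native pandas: when both a dict and an explicit index are passed, the result is reindexed by the index labels, and labels missing from the dict become NaN:

```python
import pandas as pd

# Only index label "b" exists as a dict key, so it keeps its value;
# "c" is not a key and becomes NaN. Key "a" is dropped entirely.
s = pd.Series({"a": 1.0, "b": 2.0}, index=["b", "c"])
print(s)
# b    2.0
# c    NaN
# dtype: float64
```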
error_checking_for_init(index, dtype)

# The logic followed here is:
# 1. Create a query_compiler from the provided data. If columns are provided, add/select the columns.
The numbers here are confusing alongside the case numbers. Maybe emphasize that these are Step 1, Step 2, etc.
I renamed the steps, hopefully this is easier to read
if data.name is None:
    # If no name is provided, the default name is 0.
    query_compiler = query_compiler.set_columns(columns or [0])
if columns is not None and data.name not in columns:
should this be an elif?
maybe do this:
if columns is None:
    # handle all cases here
elif data.name in columns:
    # handle the case here
else:
    # handle data.name not in columns

You didn't handle this case here:
pd.DataFrame(pd.Series([1, 2, 3], name="b"), columns=["a", "b"])
I refactored it, it should be handled now and I added a test for it
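For reference, the case the reviewer points out behaves like this in native pandas, which is what the refactored constructor would need to match:

```python
import pandas as pd

s = pd.Series([1, 2, 3], name="b")

# The series name "b" lines up with one of the requested columns, so it
# keeps the data; "a" is added as an all-NaN column.
df = pd.DataFrame(s, columns=["a", "b"])
print(list(df.columns))  # ['a', 'b']
print(df["b"].tolist())  # [1, 2, 3]
```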
elif isinstance(data, Series):
    # CASE III: data is a Snowpark pandas Series
    query_compiler = data._query_compiler.copy()
we don't need to copy qc right?
removed the copy logic
    extra_columns = [col for col in columns if col not in data.columns]
else:
    extra_columns = []
query_compiler = data._query_compiler.create_qc_with_extra_columns(
just use loc like df[extra_columns] = None.
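The suggestion corresponds to this native pandas idiom for adding missing columns (a sketch only; the lazy implementation would express the same thing through the query compiler). Note that multi-column scalar assignment with new keys requires a reasonably recent pandas:

```python
import pandas as pd

df = pd.DataFrame({"a": [1, 2]})
columns = ["a", "b", "c"]

# Add any requested columns that are not already present,
# filled with None/NaN.
extra_columns = [col for col in columns if col not in df.columns]
df[extra_columns] = None
print(list(df.columns))  # ['a', 'b', 'c']
```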
if len(data) and all(
    isinstance(v, (Index, BasePandasDataset)) for v in data
):
    # Special case V.c: data is a list/dict where all the values are Snowpark pandas objects.
cover them in a helper function
Moved special cases to helper functions
# The logic followed here is:
# 1. Create a query_compiler from the provided data. If columns are provided, add/select the columns.
# 2. If an index is provided, set the index through set_index or reindex.
# 3. If the data is a DataFrame, perform loc to select the required index and columns from the DataFrame.
You should move all column operations into Step 1. It is confusing to select columns here again.
done!
columns=try_convert_index_to_native(columns),
dtype=dtype,
copy=copy,
# 3. If data is a DataFrame, filter result
You should move all column operations into Step 1. It is confusing to select columns here again.
done!
… github.com:snowflakedb/snowpark-python into vbudati/SNOW-1458135-df-series-init-with-lazy-index
distributed_frame = from_non_pandas(data, index, columns, dtype)
if distributed_frame is not None:
    self._query_compiler = distributed_frame._query_compiler
new_name = data.name
Are you missing these cases?
pd.DataFrame(pd.Index([1, 2, 3], name='b'), columns=['a'])
pd.DataFrame(pd.Index([1, 2, 3], name='b'), columns=['a', 'b'])
should be taken care of now
# The logic followed here is:
# STEP 1: Create a query_compiler from the provided data. If columns are provided, add/select the columns.
# STEP 2: If an index is provided, set the index through set_index or reindex.
This does not match your implementation - some index handling is in Step 1 now.
Let's implement it following these steps:
- handle data
- handle columns
- handle index
In Step 1, we make sure to update query_compiler for all lazy data. If query_compiler is None, that means the data is local; make sure to convert the data to the right native dataframe in this step.
-- here we can test all cases of pd.DataFrame(data=any, columns=None, index=None)
In Step 2, handle columns based on whether query_compiler is None or not.
-- here we can test all cases of pd.DataFrame(data=any, columns=not none, index=None)
In Step 3, similarly handle the index based on whether query_compiler is None or not.
-- here we can test all cases of pd.DataFrame(data=any, columns=any, index=any)
pytest.param(
    "series",
    marks=pytest.mark.xfail(
        reason="SNOW-1675191 reindex does not work with tuple series"
@sfc-gh-vbudati - @sfc-gh-azhan mentioned that the main purpose of this PR is to remove a to_pandas materialization. Can we just do that in this PR, and move the other refactoring out of the current PR?
name = data.name
from snowflake.snowpark.modin.plugin.extensions.index import Index

# Setting the query compiler
One more general comment about the change: our original code behaves in such a way that if both data and query compiler are provided, the data is used.
However, here it seems we want to change it so that only one of them can be configured. I think that is fine; however, please make sure we update the doc to make this part clear.
Here are a couple of points:
- From the structure point of view, I think we can do the parameter check first, for example, where both query_compiler and a data parameter are provided. Then init the query_compiler like the original code structure, unless there are cases that work very differently.
- The check messages don't seem very clear. For example, instead of "query_compiler and index can not be provided together", it might be better to say "index is not supported when query_compiler is provided", etc.
I can make the error messages clearer like you pointed out in (2.) --> "index is not supported when query_compiler is provided". But the parameters are currently checked before they are used. I don't think there are any cases in the code where both query compiler and data/index/columns are provided (no tests have failed so far with anything related to this). I think it's also simpler behavior to have it this way.
The doc should also be updated with this behavior.
if hasattr(data, "name") and data.name is not None:
    # If data is an object that has a name field, use that as the name of the new Series.
    name = data.name
# If any of the values are Snowpark pandas objects, convert them to native pandas objects.
In this case, shouldn't we try to convert the other values to Snowpark pandas objects instead of pulling them to local? Or maybe we should just error out.
Do you have an example of this case?
One example where it's better to convert it to pandas is this:
data = {"A": pd.Series([1, 2, 3]), "B": pd.Index([4, 5, 6]), "C": 5}
pd.DataFrame(data)
Out[58]:
   A  B  C
0  1  4  5
1  2  5  5
2  3  6  5
5 is put in every single row even though it's a scalar in the dict.
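For reference, the same example runs in native pandas, which is the behavior being matched here (scalar dict values are broadcast to every row while array-likes keep their values):

```python
import pandas as pd

data = {"A": pd.Series([1, 2, 3]), "B": pd.Index([4, 5, 6]), "C": 5}
df = pd.DataFrame(data)
print(df)
#    A  B  C
# 0  1  4  5
# 1  2  5  5
# 2  3  6  5
```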
@sfc-gh-yzou I prefer not making the refactor changes in a new PR since I think this one is very close to merging and it will take a lot more work to separate the index changes from this.
columns = ensure_index(columns)

# The logic followed here is:
# STEP 1: Obtain the query_compiler from the provided data if the data is lazy. If data is local, the query
Suggested change:
- # STEP 1: Obtain the query_compiler from the provided data if the data is lazy. If data is local, the query
+ # STEP 1: Obtain the query_compiler from the provided data if the data is lazy. If data is local, keep the query
# STEP 1: Obtain the query_compiler from the provided data if the data is lazy. If data is local, the query
# compiler is None.
# STEP 2: If columns are provided, set the columns if data is lazy.
# STEP 3: If both the data and index are local (or index is None), create a query compiler from pandas.
If data is local, create a query compiler from it with local index.
# compiler is None.
# STEP 2: If columns are provided, set the columns if data is lazy.
# STEP 3: If both the data and index are local (or index is None), create a query compiler from pandas.
# STEP 4: Otherwise, set the index through set_index or reindex.
For lazy index, set the index through set_index or reindex.
I kind of agree with @sfc-gh-yun that this PR is becoming too big. Can we use this as the PoC draft PR, and review smaller PRs one by one? You can either start with the refactoring pieces first or fix the lazy index first. Try to make sure the refactoring PR only does refactoring, with no test changes.
@sfc-gh-azhan @sfc-gh-yzou I can try to separate this PR into two other PRs - one for the lazy index change and the other for the refactor. It is impossible to avoid test changes in the refactor PR since I introduced functionality to allow passing non-existent columns or index values to the constructor. The constructors should be able to handle any kind of input, and I added tests for this. However, that requires me to make a non-trivial amount of redundant code changes; for example, the same set of tests is changed in both PRs, where the query count will likely differ due to the refactor. I was hoping to work on IR tickets from Monday, so I still prefer merging this PR as is - please let me know if you both feel strongly about this. In the future, I'd really appreciate it if feedback about splitting PRs were brought up earlier.
Which Jira issue is this PR addressing? Make sure that there is an accompanying issue to your PR.
Fixes SNOW-1458135
Fill out the following pre-review checklist:
Please describe how your code solves the related issue.
- The DataFrame and Series constructors can now be called with lazy data, index, and/or columns.
- If data is a Series or DataFrame object, the new Series or DataFrame object is created by filtering the data with the provided index and columns.
- If values in index don't exist in data's index, these values are added as new rows and their corresponding data values are NaN.
- If values in columns don't exist in data's columns, these values are added as new NaN columns.
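The filtering/reindexing behavior described above matches native pandas, which can be checked with a small example:

```python
import pandas as pd

src = pd.DataFrame({"a": [1, 2]}, index=["x", "y"])

# Row "z" and column "b" do not exist in the source, so they are
# added and filled with NaN; row "y" is dropped.
result = pd.DataFrame(src, index=["x", "z"], columns=["a", "b"])
print(result)
#      a   b
# x  1.0 NaN
# z  NaN NaN
```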