From ac98314c7d9fa65708f4bc9459b19b22f1d3b497 Mon Sep 17 00:00:00 2001 From: Jamison Date: Thu, 5 Sep 2024 09:20:56 -0700 Subject: [PATCH 1/4] SNOW-1651234: Fix create_dataframe throwing an exepction for structured dtypes --- CHANGELOG.md | 1 + src/snowflake/snowpark/session.py | 23 +++++++-------- tests/integ/scala/test_datatype_suite.py | 36 +++--------------------- 3 files changed, 17 insertions(+), 43 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b680a6ddf..eae2d228b2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,7 @@ - Fixed a bug where calling `DataFrame.to_snowpark_pandas_dataframe` without explicitly initializing the Snowpark pandas plugin caused an error. - Fixed a bug where using the `explode` function in dynamic table creation caused a SQL compilation error due to improper boolean type casting on the `outer` parameter. - Fixed a bug where using `to_pandas_batches` with async jobs caused an error due to improper handling of waiting for asynchronous query completion. +- Fixed a bug in `session.create_dataframe` that caused a sql error when creating an iceberg table with structured datatypes. ### Snowpark Local Testing Updates diff --git a/src/snowflake/snowpark/session.py b/src/snowflake/snowpark/session.py index c6f430cc98..3a6fd7e0bf 100644 --- a/src/snowflake/snowpark/session.py +++ b/src/snowflake/snowpark/session.py @@ -124,12 +124,10 @@ column, lit, parse_json, - to_array, to_date, to_decimal, to_geography, to_geometry, - to_object, to_time, to_timestamp, to_timestamp_ltz, @@ -2763,14 +2761,15 @@ def convert_row_to_list( if isinstance( field.datatype, ( - VariantType, ArrayType, - MapType, - TimeType, DateType, - TimestampType, GeographyType, GeometryType, + MapType, + StructType, + TimeType, + TimestampType, + VariantType, VectorType, ), ) @@ -2808,7 +2807,9 @@ def convert_row_to_list( data_type, ArrayType ): converted_row.append(json.dumps(value, cls=PythonObjJSONEncoder)) - elif isinstance(value, dict) and isinstance(data_type, MapType): + elif isinstance(value, dict) and isinstance( + data_type, (MapType, StructType) + ): converted_row.append(json.dumps(value, cls=PythonObjJSONEncoder)) elif isinstance(data_type, VariantType): converted_row.append(json.dumps(value, cls=PythonObjJSONEncoder)) @@ -2856,10 +2857,10 @@ def convert_row_to_list( project_columns.append(to_geography(column(name)).as_(name)) elif isinstance(field.datatype, GeometryType): project_columns.append(to_geometry(column(name)).as_(name)) - elif isinstance(field.datatype, ArrayType): - project_columns.append(to_array(parse_json(column(name))).as_(name)) - elif isinstance(field.datatype, MapType): - project_columns.append(to_object(parse_json(column(name))).as_(name)) + elif isinstance(field.datatype, (ArrayType, MapType, StructType)): + project_columns.append( + parse_json(column(name)).cast(field.datatype).as_(name) + ) elif isinstance(field.datatype, VectorType): project_columns.append( parse_json(column(name)).cast(field.datatype).as_(name) diff --git a/tests/integ/scala/test_datatype_suite.py b/tests/integ/scala/test_datatype_suite.py index edc97b41a4..671fdfaf98 100644 --- a/tests/integ/scala/test_datatype_suite.py +++ b/tests/integ/scala/test_datatype_suite.py @@ -430,10 +430,6 @@ def test_structured_dtypes_pandas(structured_type_session, structured_type_suppo ) -@pytest.mark.skipif( - "config.getoption('local_testing_mode', default=False)", - reason="strucutred types do not fully support structured types yet.", -) def test_structured_dtypes_iceberg( structured_type_session, local_testing_mode, structured_type_support ): @@ -447,18 +443,8 @@ def test_structured_dtypes_iceberg( table_name = f"snowpark_structured_dtypes_{uuid.uuid4().hex[:5]}" save_table_name = f"snowpark_structured_dtypes_{uuid.uuid4().hex[:5]}" try: - structured_type_session.sql( - f""" - create iceberg table if not exists {table_name} ( - map map(varchar, int), - obj object(A varchar, B float), - arr array(float) - ) - CATALOG = 'SNOWFLAKE' - EXTERNAL_VOLUME = 'python_connector_iceberg_exvol' - BASE_LOCATION = 'python_connector_merge_gate'; - """ - ).collect() + create_df = structured_type_session.create_dataframe([], schema=expected_schema) + create_df.write.save_as_table(table_name, iceberg_config=ICEBERG_CONFIG) structured_type_session.sql( f""" insert into {table_name} @@ -489,10 +475,6 @@ def test_structured_dtypes_iceberg( structured_type_session.sql(f"drop table if exists {save_table_name}") -@pytest.mark.skipif( - "config.getoption('local_testing_mode', default=False)", - reason="strucutred types do not fully support structured types yet.", -) def test_structured_dtypes_iceberg_udf( structured_type_session, local_testing_mode, structured_type_support ): @@ -520,18 +502,8 @@ def nop(x): ) try: - structured_type_session.sql( - f""" - create iceberg table if not exists {table_name} ( - map map(varchar, int), - obj object(A varchar, B float), - arr array(float) - ) - CATALOG = 'SNOWFLAKE' - EXTERNAL_VOLUME = 'python_connector_iceberg_exvol' - BASE_LOCATION = 'python_connector_merge_gate'; - """ - ).collect() + create_df = structured_type_session.create_dataframe([], schema=expected_schema) + create_df.write.save_as_table(table_name, iceberg_config=ICEBERG_CONFIG) structured_type_session.sql( f""" insert into {table_name} From 95497c1b2e80480722b33bf3d4a96cf879b55f42 Mon Sep 17 00:00:00 2001 From: Jamison Date: Thu, 5 Sep 2024 09:36:17 -0700 Subject: [PATCH 2/4] add another test --- tests/integ/scala/test_datatype_suite.py | 38 ++++++++++++++++++------ 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/tests/integ/scala/test_datatype_suite.py b/tests/integ/scala/test_datatype_suite.py index 671fdfaf98..ffca96f481 100644 --- a/tests/integ/scala/test_datatype_suite.py +++ b/tests/integ/scala/test_datatype_suite.py @@ -441,7 +441,6 @@ def test_structured_dtypes_iceberg( query, expected_dtypes, expected_schema = STRUCTURED_TYPES_EXAMPLES[True] table_name = f"snowpark_structured_dtypes_{uuid.uuid4().hex[:5]}" - save_table_name = f"snowpark_structured_dtypes_{uuid.uuid4().hex[:5]}" try: create_df = structured_type_session.create_dataframe([], schema=expected_schema) create_df.write.save_as_table(table_name, iceberg_config=ICEBERG_CONFIG) @@ -455,16 +454,11 @@ def test_structured_dtypes_iceberg( assert df.schema == expected_schema assert df.dtypes == expected_dtypes - # Try to save_as_table - structured_type_session.table(table_name).write.save_as_table( - save_table_name, iceberg_config=ICEBERG_CONFIG - ) - save_ddl = structured_type_session._run_query( - f"select get_ddl('table', '{save_table_name}')" + f"select get_ddl('table', '{table_name}')" ) assert save_ddl[0][0] == ( - f"create or replace ICEBERG TABLE {save_table_name.upper()} (\n\t" + f"create or replace ICEBERG TABLE {table_name.upper()} (\n\t" "MAP MAP(STRING, LONG),\n\tOBJ OBJECT(A STRING, B DOUBLE),\n\tARR ARRAY(DOUBLE)\n)\n " "EXTERNAL_VOLUME = 'PYTHON_CONNECTOR_ICEBERG_EXVOL'\n CATALOG = 'SNOWFLAKE'\n " "BASE_LOCATION = 'python_connector_merge_gate/';" @@ -472,7 +466,33 @@ def test_structured_dtypes_iceberg( finally: structured_type_session.sql(f"drop table if exists {table_name}") - structured_type_session.sql(f"drop table if exists {save_table_name}") + + +def test_structured_dtypes_iceberg_create_from_values( + structured_type_session, local_testing_mode, structured_type_support +): + if not ( + structured_type_support + and iceberg_supported(structured_type_session, local_testing_mode) + ): + pytest.skip("Test requires iceberg support and structured type support.") + + _, __, expected_schema = STRUCTURED_TYPES_EXAMPLES[True] + table_name = f"snowpark_structured_dtypes_{uuid.uuid4().hex[:5]}" + data = [ + ({"x": 1}, {"A": "a", "B": 1}, [1, 1, 1]), + ({"x": 2}, {"A": "b", "B": 2}, [2, 2, 2]), + ] + try: + create_df = structured_type_session.create_dataframe( + data, schema=expected_schema + ) + create_df.write.save_as_table(table_name, iceberg_config=ICEBERG_CONFIG) + assert structured_type_session.table(table_name).order_by( + col("ARR"), ascending=True + ).collect() == [Row(*d) for d in data] + finally: + structured_type_session.sql(f"drop table if exists {table_name}") def test_structured_dtypes_iceberg_udf( From 4db607e8bf1ff092eb1ec75166fceb7d3495eba2 Mon Sep 17 00:00:00 2001 From: Jamison Date: Thu, 5 Sep 2024 09:42:50 -0700 Subject: [PATCH 3/4] add local testing skips back --- tests/integ/scala/test_datatype_suite.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/integ/scala/test_datatype_suite.py b/tests/integ/scala/test_datatype_suite.py index ffca96f481..adb9fb6a08 100644 --- a/tests/integ/scala/test_datatype_suite.py +++ b/tests/integ/scala/test_datatype_suite.py @@ -468,6 +468,10 @@ def test_structured_dtypes_iceberg( structured_type_session.sql(f"drop table if exists {table_name}") +@pytest.mark.skipif( + "config.getoption('local_testing_mode', default=False)", + reason="local testing does not fully support structured types yet.", +) def test_structured_dtypes_iceberg_create_from_values( structured_type_session, local_testing_mode, structured_type_support ): @@ -495,6 +499,10 @@ def test_structured_dtypes_iceberg_create_from_values( structured_type_session.sql(f"drop table if exists {table_name}") +@pytest.mark.skipif( + "config.getoption('local_testing_mode', default=False)", + reason="local testing does not fully support structured types yet.", +) def test_structured_dtypes_iceberg_udf( structured_type_session, local_testing_mode, structured_type_support ): From 47d72a7d4c01c39f20299f8f2d755d21bf712971 Mon Sep 17 00:00:00 2001 From: Jamison Date: Thu, 5 Sep 2024 09:44:04 -0700 Subject: [PATCH 4/4] another skip --- tests/integ/scala/test_datatype_suite.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/integ/scala/test_datatype_suite.py b/tests/integ/scala/test_datatype_suite.py index adb9fb6a08..88b38ee137 100644 --- a/tests/integ/scala/test_datatype_suite.py +++ b/tests/integ/scala/test_datatype_suite.py @@ -430,6 +430,10 @@ def test_structured_dtypes_pandas(structured_type_session, structured_type_suppo ) +@pytest.mark.skipif( + "config.getoption('local_testing_mode', default=False)", + reason="local testing does not fully support structured types yet.", +) def test_structured_dtypes_iceberg( structured_type_session, local_testing_mode, structured_type_support ):