Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1527717: Add profiler snowpark API #2252

Open
wants to merge 35 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
0c1b6d9
add profiler
sfc-gh-yuwang Sep 7, 2024
9d911df
profiler finish
sfc-gh-yuwang Sep 9, 2024
968ac2c
t
sfc-gh-yuwang Sep 9, 2024
71967c6
t
sfc-gh-yuwang Sep 9, 2024
3222459
add test
sfc-gh-yuwang Sep 10, 2024
e689d54
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Sep 10, 2024
83b22a0
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Sep 11, 2024
e1d314a
address comment
sfc-gh-yuwang Sep 11, 2024
bf71cc2
fix get last query id
sfc-gh-yuwang Sep 11, 2024
b80f113
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Sep 16, 2024
3320814
Update session.py
sfc-gh-yuwang Sep 16, 2024
286cc31
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Sep 16, 2024
791cf2a
add docstring
sfc-gh-yuwang Sep 17, 2024
b458a85
add regx for anonymous procedure
sfc-gh-yuwang Sep 17, 2024
ba726ff
add docstring
sfc-gh-yuwang Sep 17, 2024
242e4de
skip in localtesting
sfc-gh-yuwang Sep 17, 2024
bf4d169
coverage test
sfc-gh-yuwang Sep 17, 2024
e788125
fix lint
sfc-gh-yuwang Sep 17, 2024
c45ca1e
lint fix
sfc-gh-yuwang Sep 17, 2024
32a448c
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Sep 17, 2024
72e0162
fix test
sfc-gh-yuwang Sep 18, 2024
1d992f2
fix test
sfc-gh-yuwang Sep 18, 2024
60c4aba
fix test
sfc-gh-yuwang Sep 18, 2024
0657c2e
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Sep 18, 2024
dcc6ad7
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Sep 18, 2024
eda9559
add test
sfc-gh-yuwang Sep 18, 2024
d196ddd
add unit test
sfc-gh-yuwang Sep 18, 2024
6a7046d
fix test
sfc-gh-yuwang Sep 19, 2024
fcac5fd
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Sep 19, 2024
b2c6513
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Sep 19, 2024
a27e64c
make test robust
sfc-gh-yuwang Sep 19, 2024
896c9cd
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Sep 19, 2024
3e6a771
address comments
sfc-gh-yuwang Sep 20, 2024
0d75b84
fix test
sfc-gh-yuwang Sep 20, 2024
f9df3cd
address comments
sfc-gh-yuwang Sep 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ This is a re-release of 1.22.0. Please refer to the 1.22.0 release notes for det
- Added the following new functions in `snowflake.snowpark.functions`:
- `array_remove`
- `ln`
- Added snowpark python API for profiler.

#### Improvements

Expand Down
212 changes: 212 additions & 0 deletions src/snowflake/snowpark/profiler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
#
# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved.
#
import re
from contextlib import contextmanager
from typing import List, Optional

import snowflake.snowpark
from snowflake.snowpark._internal.utils import validate_object_name


class Profiler:
"""
Setup profiler to receive profiles of stored procedures.

Note:
This feature cannot be used in owner's right SP because owner's right SP will not be able to set session-level parameters.
"""

def __init__(
self,
stage: Optional[str] = "",
active_profiler: Optional[str] = "LINE",
session: Optional["snowflake.snowpark.Session"] = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what should customer expect to happen if session is None?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A session is required for the profiler to work.
There are two ways to use the profiler, as described in the design doc.
With the context manager, the session must be provided when the context manager is created.
With the Profiler class, the session is set when the profiler is registered.
This is why I made it optional here.

) -> None:
self.stage = stage
self.active_profiler = active_profiler
self.modules_to_register = []
self._register_modules_sql = ""
self._set_targeted_stage_sql = ""
self._enable_profiler_sql = ""
self._disable_profiler_sql = ""
self._set_active_profiler_sql = ""
self.pattern = r"WITH\s+.*?\s+AS\s+PROCEDURE\s+.*?\s+CALL\s+.*"
self.session = session
self._prepare_sql()
self.query_history = None

def _prepare_sql(self):
    """
    Rebuild the cached ``alter session`` statements from the current settings.

    Reads ``modules_to_register``, ``stage`` and ``active_profiler`` and stores
    the resulting SQL text on the instance; nothing is sent to the server here.
    """
    # Registered modules are passed to the server as one comma separated value.
    module_csv = ",".join(self.modules_to_register)
    self._register_modules_sql = (
        f"alter session set python_profiler_modules='{module_csv}'"
    )

    target_stage = self.stage
    self._set_targeted_stage_sql = (
        f'alter session set PYTHON_PROFILER_TARGET_STAGE ="{target_stage}"'
    )

    # An empty ACTIVE_PYTHON_PROFILER value is what ``disable`` sends.
    self._disable_profiler_sql = "alter session set ACTIVE_PYTHON_PROFILER = ''"

    profiler_kind = self.active_profiler.upper()
    self._set_active_profiler_sql = (
        f"alter session set ACTIVE_PYTHON_PROFILER = '{profiler_kind}'"
    )

def register_profiler_modules(self, stored_procedures: List[str]):
    """
    Register stored procedures to generate profiles for them.

    Note:
        Previously registered modules are overwritten by this function;
        calling it with an empty list removes all registered modules.

    Args:
        stored_procedures: List of names of stored procedures.
    """
    self.modules_to_register = stored_procedures
    self._prepare_sql()

    if self.session is None:
        # No session attached yet: only the cached SQL text is refreshed.
        return
    self._register_modules()

def set_targeted_stage(self, stage: str):
"""
Set targeted stage for profiler output.

Note:
The stage name must be a fully qualified name.

Args:
stage: String of fully qualified name of targeted stage
"""
validate_object_name(stage)
self.stage = stage
self._prepare_sql()
if self.session is not None:
if (
len(self.session.sql(f"show stages like '{self.stage}'").collect()) == 0
and len(
self.session.sql(
f"show stages like '{self.stage.split('.')[-1]}'"
).collect()
)
== 0
):
self.session.sql(
f"create temp stage if not exists {self.stage} FILE_FORMAT = (RECORD_DELIMITER = NONE FIELD_DELIMITER = NONE )"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

temp stage gets dropped at the end of the session which means we might lose all the profiler data.
when are users going to access the file, after the session or during the session?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If users want to access it after the session, they should use a permanent stage that they created.
When a user gives a stage name that does not exist, we assume the user wants a temp stage to be created.

).collect()
self._set_targeted_stage()

def set_active_profiler(self, active_profiler: str):
    """
    Set active profiler.

    Note:
        Only the exact strings 'LINE' and 'MEMORY' are accepted (the check is
        case-sensitive).

    Args:
        active_profiler: Name of the profiler to activate, 'LINE' or 'MEMORY'.

    Raises:
        ValueError: If ``active_profiler`` is neither 'LINE' nor 'MEMORY'.
    """
    accepted = ("LINE", "MEMORY")
    if active_profiler not in accepted:
        raise ValueError(
            f"active_profiler expect 'LINE' or 'MEMORY', got {active_profiler} instead"
        )

    self.active_profiler = active_profiler
    self._prepare_sql()

    if self.session is None:
        # Without a session the choice is only recorded on the instance.
        return
    self._set_active_profiler()

def _register_modules(self):
    """Run the cached ``python_profiler_modules`` statement on the attached session."""
    statement = self._register_modules_sql
    self.session.sql(statement).collect()

def _set_targeted_stage(self):
    """Run the cached ``PYTHON_PROFILER_TARGET_STAGE`` statement on the attached session."""
    statement = self._set_targeted_stage_sql
    self.session.sql(statement).collect()

def _set_active_profiler(self):
    """Run the cached ``ACTIVE_PYTHON_PROFILER`` statement on the attached session."""
    statement = self._set_active_profiler_sql
    self.session.sql(statement).collect()

def enable(self):
    """
    Enable profiler. Profiles will be generated until profiler is disabled.

    An empty ``active_profiler`` is replaced by 'LINE' before the statement
    is rebuilt and sent to the session.
    """
    no_profiler_chosen = self.active_profiler == ""
    if no_profiler_chosen:
        self.active_profiler = "LINE"

    self._prepare_sql()
    self._set_active_profiler()

def disable(self):
    """
    Disable profiler.

    Sends the cached statement that resets ``ACTIVE_PYTHON_PROFILER`` on the
    attached session.
    """
    statement = self._disable_profiler_sql
    self.session.sql(statement).collect()

def _is_sp_call(self, query):
    """
    Tell whether ``query`` is an anonymous stored procedure call.

    The text is matched against ``self.pattern`` (``WITH ... AS PROCEDURE ...
    CALL ...``). SQL keywords are case-insensitive and a statement may start
    with whitespace, so the match ignores case and leading blanks, in line
    with the upper-cased ``CALL`` prefix check done by the caller.

    Args:
        query: SQL text of a recorded query.

    Returns:
        True if the text matches the anonymous procedure pattern.
    """
    flags = re.DOTALL | re.IGNORECASE
    return re.match(self.pattern, query.lstrip(), flags) is not None

def _get_last_query_id(self):
    """
    Find the query id of the most recent stored procedure call.

    Recorded queries are scanned from newest to oldest. A query counts as a
    stored procedure call when its text starts with ``CALL`` (ignoring case
    and leading whitespace) or when ``_is_sp_call`` recognises it as an
    anonymous procedure.

    Returns:
        The query id of the latest matching query, or None when no query
        history is attached yet or no recorded query matches.
    """
    history = self.query_history
    if history is None:
        # The profiler has not been registered to a session, nothing recorded.
        return None

    for query in reversed(history.queries):
        sql_text = query.sql_text
        if sql_text.lstrip().upper().startswith("CALL") or self._is_sp_call(
            sql_text
        ):
            return query.query_id
    return None

def show_profiles(self) -> str:
    """
    Return and show the profiles of last executed stored procedure.

    Note:
        This function must be called right after the execution of stored procedure you want to profile.

    Returns:
        The profiler output of the latest recorded stored procedure call; the
        same text is also printed.

    Raises:
        ValueError: If no stored procedure call is found in the recorded
            query history.
    """
    query_id = self._get_last_query_id()
    if query_id is None:
        # Without this guard the literal string 'None' would be sent as a query id.
        raise ValueError(
            "no stored procedure call found in query history, "
            "call this function right after executing the stored procedure"
        )
    sql = f"select snowflake.core.get_python_profiler_output('{query_id}')"
    res = self.session.sql(sql).collect()
    profile = res[0][0]
    print(profile)  # noqa: T201: we need to print here.
    return profile

def dump_profiles(self, dst_file: str):
    """
    Write the profiles of last executed stored procedure to given file.

    Note:
        This function must be called right after the execution of stored procedure you want to profile.

    Args:
        dst_file: String of file name that you want to store the profiles.

    Raises:
        ValueError: If no stored procedure call is found in the recorded
            query history.
    """
    query_id = self._get_last_query_id()
    if query_id is None:
        # Without this guard the literal string 'None' would be sent as a query id.
        raise ValueError(
            "no stored procedure call found in query history, "
            "call this function right after executing the stored procedure"
        )
    sql = f"select snowflake.core.get_python_profiler_output('{query_id}')"
    res = self.session.sql(sql).collect()
    # Explicit encoding so the written file does not depend on the platform default.
    with open(dst_file, "w", encoding="utf-8") as f:
        f.write(str(res[0][0]))


@contextmanager
def profiler(
    stage: str,
    active_profiler: str,
    session: "snowflake.snowpark.Session",
    modules: Optional[List[str]] = None,
):
    """
    Context manager that enables the profiler for the duration of a ``with`` block.

    A ``Profiler`` is created and attached to ``session`` as ``session.profiler``.
    The target stage is created as a temp stage if ``show stages`` finds it
    neither by full name nor by its last dotted component. On exit the
    registered modules are cleared and the profiler is disabled, whether the
    body finished normally or raised.

    Args:
        stage: Name of the stage the profiler output is written to.
        active_profiler: Name of the profiler to activate.
        session: Session the profiler is attached to.
        modules: Names of stored procedures to register; defaults to none.
    """
    internal_profiler = Profiler(stage, active_profiler, session)
    session.profiler = internal_profiler
    internal_profiler.query_history = session.query_history()
    modules = modules or []
    try:
        # create stage if not exist
        stage_name = internal_profiler.stage
        short_name = stage_name.split(".")[-1]
        if (
            not session.sql(f"show stages like '{stage_name}'").collect()
            and not session.sql(f"show stages like '{short_name}'").collect()
        ):
            session.sql(
                f"create temp stage if not exists {stage_name} FILE_FORMAT = (RECORD_DELIMITER = NONE FIELD_DELIMITER = NONE )"
            ).collect()
        # set up phase
        internal_profiler._set_targeted_stage()
        internal_profiler._set_active_profiler()

        internal_profiler.register_profiler_modules(modules)
        internal_profiler._register_modules()
        internal_profiler.enable()
        # The yield must sit inside ``try`` (not ``finally``): an exception raised
        # by the with-body is thrown in at this point, and only then does the
        # ``finally`` clause below get a chance to reset the session.
        yield
    finally:
        internal_profiler.register_profiler_modules([])
        internal_profiler._register_modules()
        internal_profiler.disable()
76 changes: 76 additions & 0 deletions src/snowflake/snowpark/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@
from snowflake.snowpark.mock._plan_builder import MockSnowflakePlanBuilder
from snowflake.snowpark.mock._stored_procedure import MockStoredProcedureRegistration
from snowflake.snowpark.mock._udf import MockUDFRegistration
from snowflake.snowpark.profiler import Profiler
from snowflake.snowpark.query_history import QueryHistory
from snowflake.snowpark.row import Row
from snowflake.snowpark.stored_procedure import StoredProcedureRegistration
Expand Down Expand Up @@ -603,6 +604,8 @@ def __init__(
self._conf = self.RuntimeConfig(self, options or {})
self._runtime_version_from_requirement: str = None
self._temp_table_auto_cleaner: TempTableAutoCleaner = TempTableAutoCleaner(self)
self.profiler = None

_logger.info("Snowpark Session information: %s", self._session_info)

def __enter__(self):
Expand Down Expand Up @@ -3456,6 +3459,79 @@ def flatten(
set_api_call_source(df, "Session.flatten")
return df

def register_profiler(self, profiler: Profiler):
"""Register a profiler to a session, all action are actually executed during this function"""
if (
profiler.session is not None
and profiler.session._session_id != self._session_id
):
raise ValueError("A profiler can only be registered to one session.")
self.profiler = profiler
self.profiler.session = self
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we pass a profiler that was created in a different session, is the overwriting here on purpose?
Or, to put the question another way: should one profiler be linked to only one session?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good question. I think linking one profiler to only one session is a good idea, since the new session could have different settings.

if (
len(self.sql(f"show stages like '{profiler.stage}'").collect()) == 0
and len(
self.sql(
f"show stages like '{profiler.stage.split('.')[-1]}'"
).collect()
)
== 0
):
self.sql(
f"create temp stage if not exists {profiler.stage} FILE_FORMAT = (RECORD_DELIMITER = NONE FIELD_DELIMITER = NONE )"
).collect()
self.profiler._register_modules()
self.profiler._set_targeted_stage()
self.profiler._set_active_profiler()
self.profiler.query_history = self.query_history()

def show_profiles(self) -> str:
    """
    Return and show the profiles of last executed stored procedure.

    Delegates to the profiler attached to this session.

    Note:
        This function must be called right after the execution of stored procedure you want to profile.

    Raises:
        ValueError: If no ``Profiler`` is attached to this session.
    """
    attached = self.profiler
    if attached is None or not isinstance(attached, Profiler):
        raise ValueError(
            "profiler is not set, use session.register_profiler or profiler context manager"
        )
    return attached.show_profiles()

def dump_profiles(self, dst_file: str):
    """
    Write the profiles of last executed stored procedure to given file.

    Delegates to the profiler attached to this session.

    Note:
        This function must be called right after the execution of stored procedure you want to profile.

    Args:
        dst_file: String of file name that you want to store the profiles.

    Raises:
        ValueError: If no ``Profiler`` is attached to this session.
    """
    attached = self.profiler
    if attached is None or not isinstance(attached, Profiler):
        raise ValueError(
            "profiler is not set, use session.register_profiler or profiler context manager"
        )
    attached.dump_profiles(dst_file=dst_file)

def register_profiler_modules(self, stored_procedures: List[str]):
    """
    Register stored procedures to generate profiles for them.

    When a ``Profiler`` is attached to this session the call is forwarded to
    it; otherwise the ``python_profiler_modules`` session parameter is set
    directly.

    Note:
        Previously registered modules are overwritten by this function;
        calling it with an empty list removes all registered modules.

    Args:
        stored_procedures: List of names of stored procedures.
    """
    attached = self.profiler
    if attached is not None and isinstance(attached, Profiler):
        attached.register_profiler_modules(stored_procedures)
        return

    module_csv = ",".join(stored_procedures)
    sql_statement = f"alter session set python_profiler_modules='{module_csv}'"
    self.sql(sql_statement).collect()

def query_history(self, include_describe: bool = False) -> QueryHistory:
"""Create an instance of :class:`QueryHistory` as a context manager to record queries that are pushed down to the Snowflake database.

Expand Down
33 changes: 33 additions & 0 deletions tests/integ/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,39 @@ def session(
session.close()


@pytest.fixture(scope="function")
def profiler_session(
    db_parameters,
    resources_path,
    sql_simplifier_enabled,
    local_testing_mode,
    cte_optimization_enabled,
):
    """
    Yield a dedicated session for profiler tests and close it afterwards.

    When running on GitHub Actions outside local testing mode, external access
    integration resources with randomised names are set up before the test
    and cleaned up after it.
    """
    # Randomised names, generated in the order the set-up/clean-up helpers take them.
    resource_names = [
        f"{prefix}{Utils.random_alphanumeric_str(10)}"
        for prefix in (
            "rule1",
            "rule2",
            "key1",
            "key2",
            "integration1",
            "integration2",
        )
    ]

    builder = Session.builder.configs(db_parameters)
    session = builder.config("local_testing", local_testing_mode).create()
    session.sql_simplifier_enabled = sql_simplifier_enabled
    session._cte_optimization_enabled = cte_optimization_enabled

    if os.getenv("GITHUB_ACTIONS") == "true" and not local_testing_mode:
        set_up_external_access_integration_resources(session, *resource_names)

    yield session

    if os.getenv("GITHUB_ACTIONS") == "true" and not local_testing_mode:
        clean_up_external_access_integration_resources(session, *resource_names)
    session.close()


@pytest.fixture(scope="function")
def temp_schema(connection, session, local_testing_mode) -> None:
"""Set up and tear down a temp schema for cross-schema test.
Expand Down
Loading
Loading