Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use execute_many from backends directly #468

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion databases/backends/aiopg.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,9 @@ async def execute(self, query: ClauseElement) -> typing.Any:
finally:
cursor.close()

async def execute_many(self, queries: typing.List[ClauseElement]) -> None:
async def execute_many(
self, queries: typing.List[ClauseElement], values: typing.List[dict]
) -> None:
assert self._connection is not None, "Connection is not acquired"
cursor = await self._connection.cursor()
try:
Expand Down
4 changes: 3 additions & 1 deletion databases/backends/asyncmy.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ async def execute(self, query: ClauseElement) -> typing.Any:
finally:
await cursor.close()

async def execute_many(self, queries: typing.List[ClauseElement]) -> None:
async def execute_many(
self, queries: typing.List[ClauseElement], values: typing.List[dict]
) -> None:
assert self._connection is not None, "Connection is not acquired"
async with self._connection.cursor() as cursor:
try:
Expand Down
24 changes: 20 additions & 4 deletions databases/backends/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,14 @@ async def execute(self, query: ClauseElement) -> typing.Any:
finally:
await cursor.close()

async def execute_many(self, queries: typing.List[ClauseElement]) -> None:
async def execute_many(
self, queries: typing.List[ClauseElement], values: typing.List[dict]
) -> None:
assert self._connection is not None, "Connection is not acquired"
cursor = await self._connection.cursor()
query_str, values = self._compile_many(queries, values)
try:
for single_query in queries:
single_query, args, context = self._compile(single_query)
await cursor.execute(single_query, args)
await cursor.executemany(query_str, values)
finally:
await cursor.close()

Expand Down Expand Up @@ -220,6 +221,21 @@ def _compile(
logger.debug("Query: %s Args: %s", query_message, repr(args), extra=LOG_EXTRA)
return compiled.string, args, CompilationContext(execution_context)

def _compile_many(
self, queries: typing.List[ClauseElement], values: typing.List[dict]
) -> typing.Tuple[str, list]:
compiled = queries[0].compile(
dialect=self._dialect, compile_kwargs={"render_postcompile": True}
)
if not isinstance(queries[0], DDLElement):
for args in values:
for key, val in args.items():
if key in compiled._bind_processors:
args[key] = compiled._bind_processors[key](val)
else:
values = []
return compiled.string, values

@property
def raw_connection(self) -> aiomysql.connection.Connection:
assert self._connection is not None, "Connection is not acquired"
Expand Down
36 changes: 29 additions & 7 deletions databases/backends/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,14 +216,12 @@ async def execute(self, query: ClauseElement) -> typing.Any:
query_str, args, result_columns = self._compile(query)
return await self._connection.fetchval(query_str, *args)

async def execute_many(self, queries: typing.List[ClauseElement]) -> None:
async def execute_many(
self, queries: typing.List[ClauseElement], values: typing.List[dict]
) -> None:
Comment on lines +219 to +221
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I presume this it not a breaking change for users because this is an internal method and values is always provided by us in the public method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand what you mean here, but I think users are only supposed to use execute_many from the core file, not from any of the backends

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep that's the question — I want to gauge if changing this function signature is a breaking change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea I don't think it should break anything

assert self._connection is not None, "Connection is not acquired"
# asyncpg uses prepared statements under the hood, so we just
# loop through multiple executes here, which should all end up
# using the same prepared statement.
for single_query in queries:
single_query, args, result_columns = self._compile(single_query)
await self._connection.execute(single_query, *args)
query_str, values = self._compile_many(queries, values)
await self._connection.executemany(query_str, values)

async def iterate(
self, query: ClauseElement
Expand Down Expand Up @@ -268,6 +266,30 @@ def _compile(self, query: ClauseElement) -> typing.Tuple[str, list, tuple]:
)
return compiled_query, args, result_map

def _compile_many(
self, queries: typing.List[ClauseElement], values: typing.List[dict]
) -> typing.Tuple[str, list]:
compiled = queries[0].compile(
dialect=self._dialect, compile_kwargs={"render_postcompile": True}
)
new_values = []
if not isinstance(queries[0], DDLElement):
for args in values:
sorted_args = sorted(args.items())
mapping = {
key: "$" + str(i) for i, (key, _) in enumerate(sorted_args, start=1)
}
compiled_query = compiled.string % mapping
processors = compiled._bind_processors
values = [
processors[key](val) if key in processors else val
for key, val in sorted_args
]
new_values.append(values)
else:
compiled_query = compiled.string
return compiled_query, new_values

@staticmethod
def _create_column_maps(
result_columns: tuple,
Expand Down
29 changes: 26 additions & 3 deletions databases/backends/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,13 @@ async def execute(self, query: ClauseElement) -> typing.Any:
return cursor.rowcount
return cursor.lastrowid

async def execute_many(self, queries: typing.List[ClauseElement]) -> None:
async def execute_many(
self, queries: typing.List[ClauseElement], values: typing.List[dict]
) -> None:
assert self._connection is not None, "Connection is not acquired"
for single_query in queries:
await self.execute(single_query)
query_str, values = self._compile_many(queries, values)
async with self._connection.cursor() as cursor:
await cursor.executemany(query_str, values)

async def iterate(
self, query: ClauseElement
Expand Down Expand Up @@ -194,6 +197,26 @@ def _compile(
)
return compiled.string, args, CompilationContext(execution_context)

def _compile_many(
self, queries: typing.List[ClauseElement], values: typing.List[dict]
) -> typing.Tuple[str, list]:
compiled = queries[0].compile(
dialect=self._dialect, compile_kwargs={"render_postcompile": True}
)
new_values = []
if not isinstance(queries[0], DDLElement):
for args in values:
temp_arr = []
for key in compiled.positiontup:
raw_val = args[key]
if key in compiled._bind_processors:
val = compiled._bind_processors[key](raw_val)
else:
val = raw_val
temp_arr.append(val)
new_values.append(temp_arr)
return compiled.string, new_values

@property
def raw_connection(self) -> aiosqlite.core.Connection:
assert self._connection is not None, "Connection is not acquired"
Expand Down
2 changes: 1 addition & 1 deletion databases/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ async def execute_many(
) -> None:
queries = [self._build_query(query, values_set) for values_set in values]
async with self._query_lock:
await self._connection.execute_many(queries)
await self._connection.execute_many(queries, values)
Comment on lines 279 to +281
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we avoid building the queries here if you're going to compile them later?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, it's been a long time since I wrote this PR, but I think we still need to convert the query to ClauseElement and bind params, but maybe we don't need list comprehension here, so maybe something like that is sufficient

async def execute_many(
        self, query: typing.Union[ClauseElement, str], values: list
    ) -> None:
        query = self._build_query(query,values[0])
        async with self._query_lock:
            await self._connection.execute_many(query, values)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm interesting. I'm not sure myself, just wanted to check if you had considered it.

Thanks for chiming in on this after the delay. If you're interested in working on it again I can review and release it. Did you confirm this works and improves performance as expected?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can work on it tmr for a few.
it should improve the performance but i didn't make any benchmarks tbh


async def iterate(
self, query: typing.Union[ClauseElement, str], values: dict = None
Expand Down
4 changes: 3 additions & 1 deletion databases/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ async def fetch_val(
async def execute(self, query: ClauseElement) -> typing.Any:
raise NotImplementedError() # pragma: no cover

async def execute_many(self, queries: typing.List[ClauseElement]) -> None:
async def execute_many(
self, queries: typing.List[ClauseElement], values: typing.List[dict]
) -> None:
raise NotImplementedError() # pragma: no cover

async def iterate(
Expand Down