
Expr instances are serialized as part of the graph #323

Closed
fjetter opened this issue Oct 9, 2023 · 14 comments · Fixed by #331

Comments

@fjetter
Member

fjetter commented Oct 9, 2023

Running some profiling on the TPC-H benchmarks, I noticed that a significant portion of the main thread's time is spent blocked on calculating _meta while deserializing Expr objects.

[screenshot of the profile]

This screenshot indicates that the expr instance that is being (de-)serialized is the Merge expression.

Looking through the traces a bit more, I also found references to a parquet layer that almost looks as if it fetches some data (I hope this is not the case). It is definitely performing some pyarrow table conversion.

[screenshot of the profile]

speedscope file (a Worker)
tls-10_0_32_152-45157.json.zip

Just skimming the code, I saw stuff like this

https://github.com/dask-contrib/dask-expr/blob/52346744cf530ca86108553fb0427f552d91854e/dask_expr/_concat.py#L183

where a method of an expression is used to define a dask task. This inevitably requires us to serialize the instance itself, which is something we should avoid. Ideally, the graph only includes global functions so that we can pickle by reference rather than by value.
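As a general illustration of the by-reference vs. by-value distinction (independent of whether the linked line is actually affected), pickling a bound method also pickles the instance it is bound to, whereas a module-level function is pickled only by its qualified name. The Expr class below is a hypothetical stand-in, not the dask-expr class.

```python
import pickle


class Expr:
    """Hypothetical stand-in for an expression that carries a lot of state."""

    def __init__(self, payload):
        self.payload = payload

    def chunk(self, x):
        return x


def chunk(x):
    """Module-level function: pickled by reference (module + qualified name)."""
    return x


expr = Expr(payload=list(range(100_000)))

# A task referencing a bound method drags the whole instance (and its payload) along.
by_value = pickle.dumps((expr.chunk, "x"))

# A task referencing a global function only stores the function's name.
by_ref = pickle.dumps((chunk, "x"))

print(len(by_value), len(by_ref))
```

The same reasoning is why task graphs built only from global functions stay cheap to serialize, regardless of how heavy the expression objects that produced them are.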

I don't know if there are other places where this pattern comes up.

related #284

@rjzamora
Member

rjzamora commented Oct 9, 2023

Just skimming the code, I saw stuff like this ... [[self._meta, key], 0, self.join, False, True] ... where a method of an expression is used to define a dask task

In this case, self.join is a parameter (not a method). So, we definitely shouldn't need to serialize the expression.

@fjetter
Copy link
Member Author

fjetter commented Oct 9, 2023

In this case, self.join is a parameter (not a method). So, we definitely shouldn't need to serialize the expression.

Well, my mistake. Sorry. I haven't really tried to find out where this is happening, yet. I mixed it up with DataFrame.join which is a method

What's interesting is that whichever objects trigger this path have empty caches, even though, according to my tests, cached_property values are preserved in pickle roundtrips.
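One possible explanation for the empty caches (an assumption, not something confirmed in this thread) is that the objects are rebuilt from their constructor arguments when unpickled. With default pickling, functools.cached_property values live in the instance __dict__ and survive a roundtrip; a custom __reduce__ that only passes constructor arguments drops them, so _meta is recomputed on first access after deserialization. Both classes below are hypothetical.

```python
import pickle
from functools import cached_property


class DefaultPickling:
    """Uses default pickling, which copies the instance __dict__."""

    def __init__(self, x):
        self.x = x

    @cached_property
    def _meta(self):
        # cached_property stores the result in the instance __dict__ under "_meta"
        return self.x * 2


class RebuildFromArgs(DefaultPickling):
    """Hypothetical: reconstructs itself from constructor arguments only."""

    def __reduce__(self):
        # Only self.x is shipped; the cached "_meta" entry in __dict__ is not.
        return type(self), (self.x,)


a = DefaultPickling(3)
a._meta  # populate the cache
a2 = pickle.loads(pickle.dumps(a))

b = RebuildFromArgs(3)
b._meta  # populate the cache
b2 = pickle.loads(pickle.dumps(b))

print("_meta" in a2.__dict__)  # cache survived the roundtrip
print("_meta" in b2.__dict__)  # cache is empty; _meta is recomputed on next access
```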

@rjzamora
Member

rjzamora commented Oct 9, 2023

Well, my mistake. Sorry. I haven't really tried to find out where this is happening, yet. I mixed it up with DataFrame.join which is a method

No problem - I just wanted to put your mind at ease about that particular detail. I'm glad to see you investigating serialization behavior in general.

What's interesting is that whichever objects trigger this path have empty caches, even though, according to my tests, cached_property values are preserved in pickle roundtrips.

I suppose I don't really understand why expressions are being pickled at all yet. Are you using a version of dask/distributed where expressions are shipped to the scheduler?

@fjetter
Member Author

fjetter commented Oct 9, 2023

This is just using standard main, nothing sophisticated. This is also a worker profile, so even if we did send expressions to the scheduler, the worker should never see this. The only way this makes sense is if we accidentally pickle something by value.

I'll spin up a debugger. This should be easy to track down.

@rjzamora
Member

rjzamora commented Oct 9, 2023

The only way this makes sense is if we accidentally pickle something by value.

Okay, I see. Thanks for clarifying! Then I agree that we shouldn't see any expression objects show up at all here (and should fix the fact that we do).

@fjetter
Member Author

fjetter commented Oct 9, 2023

So, the expression that ends up on the worker is a RenameFrame expression that includes a Merge expression as one of its dependencies, and the Merge is slow enough to show up in the profile.

This object is deeply buried in a fused task.

@mrocklin
Member

mrocklin commented Oct 9, 2023

Agreed that serializing task graphs shouldn't carry along expressions. Probably some method is getting carried along somewhere. If we had an assert_eq function of our own (maybe it's time for this), we could include a check like this in it: serialize any __dask_graph__ that is present and ensure that the result doesn't include the bytes for an Expr class (if that's easy).
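A minimal sketch of such a check, assuming a plain pickle.Pickler rather than whatever serializer dask/distributed actually uses. Instead of searching the pickled bytes for the class name, this variant hooks the standard-library reducer_override method (Python 3.8+) and records every Expr instance the pickler encounters. Expr and exprs_in_graph are hypothetical names for illustration.

```python
import io
import pickle


class Expr:
    """Hypothetical stand-in for the expression base class."""

    def __init__(self, *operands):
        self.operands = list(operands)

    def chunk(self, x):
        return x


class ExprDetector(pickle.Pickler):
    """Pickler that records every Expr instance it is asked to serialize."""

    def __init__(self, file):
        super().__init__(file)
        self.found = []

    def reducer_override(self, obj):
        if isinstance(obj, Expr):
            self.found.append(type(obj).__name__)
        # NotImplemented falls back to the normal pickling machinery.
        return NotImplemented


def exprs_in_graph(graph):
    """Return the class names of Expr instances reachable from a pickled graph."""
    pickler = ExprDetector(io.BytesIO())
    pickler.dump(graph)
    return pickler.found


def inc(x):
    return x + 1


clean = {("x", 0): 1, ("y", 0): (inc, ("x", 0))}
leaky = {("x", 0): 1, ("y", 0): (Expr(1).chunk, ("x", 0))}

print(exprs_in_graph(clean), exprs_in_graph(leaky))
```

In a test suite, an assert_eq-style helper could assert that this list is empty for any __dask_graph__ it is given.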

If we do serialize then I'd expect it to be cheap, see

https://github.com/dask-contrib/dask-expr/blob/52346744cf530ca86108553fb0427f552d91854e/dask_expr/_expr.py#L138-L139

and

https://github.com/dask-contrib/dask-expr/blob/52346744cf530ca86108553fb0427f552d91854e/dask_expr/tests/test_collection.py#L677-L693

It looks like the errant piece of code is coming out of the Parquet stuff, which does more interesting caching than is common in other Expr subclasses. It might make sense to review that in particular.

@fjetter
Member Author

fjetter commented Oct 9, 2023

A simple, albeit less elegant, way to ensure this isn't serialized is to increment a counter whenever an object is serialized and assert that it is zero in subprocesses. The counter does not increase when roundtripping a materialized graph.
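A minimal single-process sketch of the counter idea, using a hypothetical Expr stand-in whose __reduce__ increments a class-level counter. Checking the counter inside worker subprocesses, as suggested above, would need additional plumbing that is not shown here.

```python
import pickle


class Expr:
    """Hypothetical stand-in; counts how often any instance is serialized."""

    n_serialized = 0

    def __init__(self, *operands):
        self.operands = list(operands)

    def __reduce__(self):
        # Called by pickle whenever an instance is serialized.
        Expr.n_serialized += 1
        return type(self), tuple(self.operands)


def add(a, b):
    return a + b


# A materialized graph built only from global functions and plain data.
graph = {"x": 1, "y": (add, "x", 10)}
pickle.loads(pickle.dumps(graph))
assert Expr.n_serialized == 0

# A graph that accidentally embeds an expression instance.
leaky = {"x": 1, "y": (add, Expr("x"), 10)}
pickle.loads(pickle.dumps(leaky))
assert Expr.n_serialized == 1
```

Unpickling does not call __reduce__, so the counter only reflects serialization.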

@fjetter
Member Author

fjetter commented Oct 9, 2023

Yes, I can also see a ParquetReader expression...

Digging in here a bit points to https://github.com/dask-contrib/dask-expr/blob/52346744cf530ca86108553fb0427f552d91854e/dask_expr/_groupby.py#L43

I'll have to run this again since I also had a ddf.partitions[:2].sample(...) in here, but that one is definitely a problem. IIUC, Expr.operation must not be a method; it has to be a property, since otherwise things break like this. If that's true, there are a couple of other offenders as well.
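A minimal sketch of why operation being a method is a problem, using hypothetical simplified classes (not the actual dask-expr GroupByChunk): a task that references a bound method pulls the whole expression into the pickle, while a property that returns a module-level function does not.

```python
import pickle

SERIALIZED = []  # class names of expression instances that were pickled


class Expr:
    """Hypothetical stand-in; records every time an instance is pickled."""

    def __init__(self, *operands):
        self.operands = list(operands)

    def __reduce__(self):
        SERIALIZED.append(type(self).__name__)
        return type(self), tuple(self.operands)


def chunk(df):
    """Module-level chunk function, pickled by reference."""
    return len(df)


class OperationAsMethod(Expr):
    # self.operation is a bound method, so a task using it carries `self`.
    def operation(self, *args):
        return self.operands[0](*args)


class OperationAsProperty(Expr):
    # self.operation returns the global function itself, not a bound method.
    @property
    def operation(self):
        return self.operands[0]


pickle.dumps((OperationAsProperty(chunk).operation, [1, 2, 3]))
print(SERIALIZED)  # []

pickle.dumps((OperationAsMethod(chunk).operation, [1, 2, 3]))
print(SERIALIZED)  # ['OperationAsMethod']
```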

@rjzamora
Member

rjzamora commented Oct 9, 2023

Thanks for investigating that @fjetter - I do see how GroupByChunk can be a problem. I'll see if we can redesign a bit to avoid passing chunk as a parameter (and thus avoid the need for operation to be a method).

[EDIT: I suppose the easiest fix is to define operation as a property (as you already suggested above)]

@fjetter
Member Author

fjetter commented Oct 12, 2023

I think the case we've seen has been addressed, but I don't think we have tests for this yet. We should have tests, so I suggest keeping this open until we have them.

@rjzamora
Member

Yeah, I agree that we should keep this open until we have test coverage. Still not sure about the best way to do this.

@mrocklin
Member

mrocklin commented Oct 12, 2023 via email

@rjzamora
Member

I submitted a rough solution in #331 - Suggestions are very welcome.


3 participants