
Complex join fails with memory error #148

Closed
nils-braun opened this issue Mar 13, 2021 · 10 comments

@nils-braun
Collaborator

From @timhdesilva

I have a large dataset (50GB) that needs to be merged with a small dataset that is a Pandas dataframe. Prior to the merge, I need to perform a groupby operation on the large dataset. Using Dask, I have been able to perform the groupby operation on the large dataset (which is a Dask dataframe). When I then merge the two datasets using X.merge(Y), I have no issues. The problem is that I need to perform a merge that is not exact (i.e. one column falling between two others), which is why I'm turning to dask-sql. When I try to do the merge with dask-sql, though, I get a memory error (the number of observations should only be ~10x that of the exact merge, so memory shouldn't be a problem).

Any ideas here? I'm thinking the issue might somehow be that I am performing a groupby operation on the Dask dataframe prior to the dask-sql merge. Is this allowed - i.e. can one do a groupby without executing it before using the dask-sql create_table() command and then performing a dask-sql merge with c.sql?
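For illustration, the workflow looks roughly like this (all table, column, and file names below are placeholders, not the actual data):

```python
import pandas as pd
import dask.dataframe as dd
from dask_sql import Context

c = Context()

# large Dask dataframe: the groupby is defined but not computed yet
large = dd.read_parquet("large_dataset.parquet")  # placeholder path
grouped = large.groupby("id").agg({"value": "sum"}).reset_index()

# small Pandas dataframe holding the lower/upper bounds for the inexact merge
small = pd.DataFrame({"group": ["a", "b"], "lower": [0, 10], "upper": [5, 20]})

# register both tables - the lazy groupby result goes in as-is
c.create_table("big", grouped)
c.create_table("small", small)

# the non-exact ("between") join that X.merge(Y) cannot express
result = c.sql("""
    SELECT *
    FROM big
    JOIN small
      ON big.value BETWEEN small.lower AND small.upper
""")
result.compute()  # computing the join is where the memory error appears
```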

@nils-braun
Collaborator Author

That is a perfectly valid use-case and exactly what I am looking for to test the limits of the current implementation! Thanks for this - that is definitely something we can debug together :-)

Some background: complex joins are definitely hard, as we basically need to join everything with everything to test the join condition (there might be some shortcuts for "<" and ">", but that optimization is not very good yet). Your issue made me realize that I currently solve this by copying all the data onto a single node - which hardly makes sense for big data (so I would like to reimplement that).

Here is the solution I am currently thinking of:
If the first dataset has N partitions and the second one M partitions, we can create M x N partitions and join each partition of one with each partition of the other. While doing so, we can already filter each joined partition separately with the join condition, which should reduce the data by a large amount. The only problem is that this will probably produce many empty partitions (which might be an overhead), so we probably want to re-partition afterwards to end up with max(N, M) partitions. Do you think that makes sense? I will start implementing this in dask-sql - could you maybe give it a test once it is done?
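To make this concrete, here is a rough sketch of the idea (not the actual dask-sql implementation - the column names and the example "between" condition are placeholders):

```python
import dask
import dask.dataframe as dd
import pandas as pd


def join_two_partitions(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    # cross join of the two partitions ...
    crossed = (
        left.assign(_tmp=1)
        .merge(right.assign(_tmp=1), on="_tmp")
        .drop(columns="_tmp")
    )
    # ... filtered immediately with the join condition (placeholder condition),
    # so only a small fraction of the cross product survives
    return crossed[(crossed["value"] >= crossed["lower"]) & (crossed["value"] <= crossed["upper"])]


def complex_join(df1: dd.DataFrame, df2: dd.DataFrame) -> dd.DataFrame:
    # N x M delayed tasks: every partition of df1 against every partition of df2
    pairs = [
        dask.delayed(join_two_partitions)(p1, p2)
        for p1 in df1.to_delayed()
        for p2 in df2.to_delayed()
    ]
    meta = join_two_partitions(df1._meta, df2._meta)
    joined = dd.from_delayed(pairs, meta=meta)
    # many of the N x M partitions will be (nearly) empty,
    # so collapse them back down to max(N, M) partitions
    return joined.repartition(npartitions=max(df1.npartitions, df2.npartitions))
```

The per-partition filter is what keeps the intermediates small: without it, the full M x N cross product would need to be materialized before the condition is applied.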

@quasiben
Contributor

quasiben commented Mar 13, 2021 via email

@timhdesilva

@nils-braun: I think that solution sounds promising - happy to give it a test once it's done!

@nils-braun
Collaborator Author

Thanks, @quasiben, for the hint! Very interesting read. I chose a quite similar technique here (the PR basically also introduces a nested loop over both sets of partitions and combines every partition with every other one). There are unfortunately some differences between the left/right/inner join from the PR and the cross join we need here, so I cannot re-use the exact same code:

  • the PR produces max(N, M) partitions by concatenating. Although I had the same plan (see my last post), I have now realized that this would produce very large partitions.
  • also, the PR uses splitting/hashing on the non-broadcast side - something that is unfortunately not possible here :-(

@timhdesilva, I added a first draft implementation in #150 - would you be able to test it? It might still be a bit rough around the edges, but for my very limited test cases (on my local machine) it worked.

@timhdesilva

@nils-braun, thanks for the response! In order to try your update, what's the best way to update dask-sql? I installed the package using conda, so I'm not sure what the best way is to get the updated version on my local machine. Thanks (sorry, I'm a GitHub beginner)!

@nils-braun
Collaborator Author

nils-braun commented Mar 17, 2021

Great question, @timhdesilva! You can follow the instructions from the documentation if you want. The only things you need to change:

  • After cloning the repository with git (or any GUI tool if you want), check out the correct branch (feature/cross-joins-on-large-data in this case)
  • You do not need to install the pre-commit hook (as you do not want to do commits - or maybe you do want ;-))
  • And you do not need to run the tests (but you can)
  • The instructions tell you to create a new conda environment. You can of course also just re-use your existing one

If that does not help, please contact me again! I am very happy that you are trying it out!

@nils-braun
Collaborator Author

Hi @timhdesilva - did you have a chance to look into the issue? No pressure; just did not want to be the one you are waiting for :-)

@timhdesilva
Copy link

timhdesilva commented Apr 6, 2021 via email

@nils-braun
Collaborator Author

nils-braun commented Apr 6, 2021

Ok, then I will merge the PR with my fix. At least in my tests it worked and also did not run into the memory error.
Feel free to pick it up later if you want!
Thanks for the issue - that was really interesting!

@nils-braun
Collaborator Author

I am closing this issue for now. If the problem comes up again, feel free to ping me or re-open it.
