
task throttling (configurable limited parallelism for certain tasks) #800

Closed

vincentschut opened this issue Jan 9, 2017 · 9 comments

@vincentschut

Many of our pipelines start with downloading data from a public server. These servers usually limit how many parallel connections a single user may open. However, when scaling a pipeline to more workers than the number of allowed connections, there is no way to tell the scheduler to run that first task with only a limited degree of parallelism. Ideally I could attach this restriction when creating a task (e.g. a delayed), as the limit in my case is specific to the task signature and these are pure tasks. It could, however, also follow the 'resources' semantics and define the limit only when calling compute, e.g. "c.compute(final_task, limits={downloader_tasks: 8})". In fact I'm open to any (convenient) way to run certain tasks with a limit on their parallelism.
However, the caveat is that in my case many of these tasks are issued from other tasks using the local_client paradigm, yet I still want the throttling to be strictly global, of course. I don't see a way to do this with the 'resources'-like semantics, as that needs a tuple of delayeds to which the restriction applies, so the limit would only be applied to each local group instead of globally. But maybe I'm missing something here.

NB: my current workaround is to create a separate worker pool, limited to n threads, and restrict those network-critical tasks to run only on that pool. This starts to get inconvenient when more than one type of download task runs on the same cluster, each connecting to a different server and thus able to run independently of the other download tasks.
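For concreteness, the workaround looks roughly like this (a rough sketch only; the worker name, scheduler address, and URLs are placeholders, and download() stands in for the real task):

# A small dedicated pool is started next to the normal workers, e.g.:
#
#   dask-worker scheduler-address:8786 --nthreads 4 --name downloader-pool
#
# and the network-bound tasks are pinned to it by worker name (or address).

from dask.distributed import Client

client = Client("scheduler-address:8786")

def download(url):
    ...  # fetch data from the public server

urls = [f"https://data.example.com/file{i}" for i in range(100)]  # placeholders

futures = client.map(download, urls, workers=["downloader-pool"],
                     allow_other_workers=False)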

@mrocklin
Member

mrocklin commented Jan 9, 2017

You are correct that there is no way to do this currently. Using resources, as you point out, is probably the closest option for now.

You would define your workers with a fixed number of "download-connections" resources on them. As long as no new workers arrive with more "download-connections" resources, you are assured that at most that many downloading tasks run at any particular time.
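A minimal sketch of that setup (the resource name, counts, scheduler address, and URLs are all illustrative):

# Each worker advertises a fixed number of download slots when it starts:
#
#   dask-worker scheduler-address:8786 --resources "download-connections=2"

from dask.distributed import Client

client = Client("scheduler-address:8786")

def download(url):
    ...  # fetch data from the public server

urls = [f"https://data.example.com/file{i}" for i in range(100)]  # placeholders

# Each downloading task claims one unit, so at most
# 2 * (number of workers) downloads run at any one time.
futures = client.map(download, urls, resources={"download-connections": 1})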

@vincentschut
Author

So basically that is the same mechanism as my current workaround: limit the number of connections by limiting the number of workers that are allowed to process download tasks. Good to know I have not overlooked something.

Bold question: is there any chance of a global, scheduler-enforced limit for certain tasks being implemented, either by us (if we can free the necessary resources) or by someone else? Would such an addition be welcomed? And if so, what would be the best way to approach this?

@mrocklin
Member

Using resources would allow the same workers to process the special download tasks as well as normal tasks. This might reduce inter-worker data movement.

> Bold question: is there any chance of a global, scheduler-enforced limit for certain tasks being implemented, either by us (if we can free the necessary resources) or by someone else? Would such an addition be welcomed? And if so, what would be the best way to approach this?

Generally, yes: I expect the scheduler to slowly grow new features over time. This generally happens with care to ensure that the scheduler remains clean (we are somewhat conservative with this code), but the feature you describe seems like it is probably in scope to me.

@jreback
Contributor

jreback commented Jan 10, 2017

@mrocklin
Member

Right, good point @jreback. Or generally "you can do a lot of custom scheduling on the client side".
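As one illustration of that kind of client-side control (a rough sketch, not the Queue-based approach; the scheduler address and URLs are placeholders): keep a bounded number of download futures in flight and top the pool up as results come back.

import itertools

from dask.distributed import Client, as_completed

client = Client("scheduler-address:8786")

def download(url):
    ...  # fetch one file from the public server

url_list = [f"https://data.example.com/file{i}" for i in range(100)]  # placeholders
urls = iter(url_list)
max_in_flight = 8
results = []

# Submit an initial batch, then replace each finished future with a new one,
# so at most max_in_flight downloads are ever running at once.
pool = as_completed(client.submit(download, u)
                    for u in itertools.islice(urls, max_in_flight))
for finished in pool:
    results.append(finished.result())
    try:
        pool.add(client.submit(download, next(urls)))
    except StopIteration:
        pass  # nothing left to submit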

@mrocklin
Member

I think that @jreback's solution of using Queues here may be sufficient. Closing. Please reopen if there are further issues.

@KrishanBhasin
Contributor

KrishanBhasin commented Mar 6, 2020

Sorry to revive an old thread, but #800 refers to Data Streams with Queues, which appears to be no longer supported.

I have an application using Dask which loads data from a couple of tables and then performs many operations on top of them.
I would like to keep the parallelism of the Dask/Pandas operations high without overwhelming the database.
I figured that using the resources option might help here. I'm uncertain, though, because I'd like to use many processes, and as far as I understand, the resource constraints are applied on a per-worker/per-process basis.

I'm working entirely within the Dask Dataframe paradigm here, so the docs don't quite explain how I can do this.

My code is broadly as follows:

import dask.dataframe as dd
from dask.distributed import Client

ddf = dd.read_sql_table(q)  # q: table/connection details elided

ddf['z'] = ddf['a'] / ddf['b']

with Client() as client:
    ddf.compute()

It looks like I can pass through the worker-specific resources by passing worker_kwargs={"DB_CON": 1} into the Client() constructor, but I can't work out how to pass the limitation on read_sql_table() through to the .compute() method.
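For concreteness, what I was hoping to be able to write is roughly the following (an untested sketch: the DB_CON resource name, connection string, and table are made up, and the workers would need to be started with a matching --resources "DB_CON=1"):

import dask
import dask.dataframe as dd
from dask.distributed import Client

client = Client("scheduler-address:8786")  # placeholder address

# Annotate only the layer that talks to the database, so the rest of the
# graph keeps its normal parallelism.
with dask.annotate(resources={"DB_CON": 1}):
    ddf = dd.read_sql_table("my_table", "postgresql://host/db", index_col="id")

ddf["z"] = ddf["a"] / ddf["b"]

# optimize_graph=False so graph optimization does not drop the annotations.
result = ddf.compute(optimize_graph=False)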

If resources indeed do not help me, I would like to work out how @vincentschut's solution of separate worker pools for connectivity and computing could be adapted while only using the Python Client() constructor.

I'd be more than happy to PR an update to the docs once I get my head around this!

@mrocklin
Member

mrocklin commented Mar 8, 2020

This issue is closed, and so people don't actively track it. You might be better served by opening a new issue or, if your question is more about how to use Dask, then perhaps asking a question on Stack Overflow.

@Danferno
Contributor

Danferno commented Oct 21, 2022

Mild necro because this post pops up when you look for task throttling.

You can now do this very elegantly with semaphores. The snippet below, based on the example in the Dask documentation, limits concurrent access to 2 through the max_leases parameter.

from dask.distributed import Client, Semaphore

client = Client()  # or connect to an existing cluster

sem = Semaphore(max_leases=2, name="database")

def access_limited(val, sem):
    # At most max_leases tasks hold a lease (and thus a DB connection) at once
    with sem:
        # Interact with the DB
        return

futures = client.map(access_limited, range(10), sem=sem)
client.gather(futures)
sem.close()
