1
1
import asyncio
2
2
import io
3
+ import os
4
+ from concurrent .futures .thread import ThreadPoolExecutor
3
5
from datetime import tzinfo
4
6
from typing import Optional , Union , Dict , Any , Sequence , Iterable , Generator , BinaryIO
5
7
@@ -20,10 +22,13 @@ class AsyncClient:
20
22
Internally, each of the methods that uses IO is wrapped in a call to EventLoop.run_in_executor.
21
23
"""
22
24
23
- def __init__ (self , * , client : Client ):
25
+ def __init__ (self , * , client : Client , executor_threads : int = 0 ):
24
26
if isinstance (client , HttpClient ):
25
27
client .headers ['User-Agent' ] = client .headers ['User-Agent' ].replace ('mode:sync;' , 'mode:async;' )
26
28
self .client = client
29
+ if executor_threads == 0 :
30
+ executor_threads = min (32 , (os .cpu_count () or 1 ) + 4 ) # Mimic the default behavior
31
+ self .executor = ThreadPoolExecutor (max_workers = executor_threads )
27
32
28
33
29
34
def set_client_setting (self , key , value ):
@@ -88,7 +93,7 @@ def _query():
88
93
external_data = external_data )
89
94
90
95
loop = asyncio .get_running_loop ()
91
- result = await loop .run_in_executor (None , _query )
96
+ result = await loop .run_in_executor (self . executor , _query )
92
97
return result
93
98
94
99
async def query_column_block_stream (self ,
@@ -117,7 +122,7 @@ def _query_column_block_stream():
117
122
external_data = external_data )
118
123
119
124
loop = asyncio .get_running_loop ()
120
- result = await loop .run_in_executor (None , _query_column_block_stream )
125
+ result = await loop .run_in_executor (self . executor , _query_column_block_stream )
121
126
return result
122
127
123
128
async def query_row_block_stream (self ,
@@ -146,7 +151,7 @@ def _query_row_block_stream():
146
151
external_data = external_data )
147
152
148
153
loop = asyncio .get_running_loop ()
149
- result = await loop .run_in_executor (None , _query_row_block_stream )
154
+ result = await loop .run_in_executor (self . executor , _query_row_block_stream )
150
155
return result
151
156
152
157
async def query_rows_stream (self ,
@@ -175,7 +180,7 @@ def _query_rows_stream():
175
180
external_data = external_data )
176
181
177
182
loop = asyncio .get_running_loop ()
178
- result = await loop .run_in_executor (None , _query_rows_stream )
183
+ result = await loop .run_in_executor (self . executor , _query_rows_stream )
179
184
return result
180
185
181
186
async def raw_query (self ,
@@ -202,7 +207,7 @@ def _raw_query():
202
207
use_database = use_database , external_data = external_data )
203
208
204
209
loop = asyncio .get_running_loop ()
205
- result = await loop .run_in_executor (None , _raw_query )
210
+ result = await loop .run_in_executor (self . executor , _raw_query )
206
211
return result
207
212
208
213
async def raw_stream (self , query : str ,
@@ -228,7 +233,7 @@ def _raw_stream():
228
233
use_database = use_database , external_data = external_data )
229
234
230
235
loop = asyncio .get_running_loop ()
231
- result = await loop .run_in_executor (None , _raw_stream )
236
+ result = await loop .run_in_executor (self . executor , _raw_stream )
232
237
return result
233
238
234
239
async def query_np (self ,
@@ -255,7 +260,7 @@ def _query_np():
255
260
external_data = external_data )
256
261
257
262
loop = asyncio .get_running_loop ()
258
- result = await loop .run_in_executor (None , _query_np )
263
+ result = await loop .run_in_executor (self . executor , _query_np )
259
264
return result
260
265
261
266
async def query_np_stream (self ,
@@ -282,7 +287,7 @@ def _query_np_stream():
282
287
context = context , external_data = external_data )
283
288
284
289
loop = asyncio .get_running_loop ()
285
- result = await loop .run_in_executor (None , _query_np_stream )
290
+ result = await loop .run_in_executor (self . executor , _query_np_stream )
286
291
return result
287
292
288
293
async def query_df (self ,
@@ -314,7 +319,7 @@ def _query_df():
314
319
external_data = external_data , use_extended_dtypes = use_extended_dtypes )
315
320
316
321
loop = asyncio .get_running_loop ()
317
- result = await loop .run_in_executor (None , _query_df )
322
+ result = await loop .run_in_executor (self . executor , _query_df )
318
323
return result
319
324
320
325
async def query_df_stream (self ,
@@ -347,7 +352,7 @@ def _query_df_stream():
347
352
external_data = external_data , use_extended_dtypes = use_extended_dtypes )
348
353
349
354
loop = asyncio .get_running_loop ()
350
- result = await loop .run_in_executor (None , _query_df_stream )
355
+ result = await loop .run_in_executor (self . executor , _query_df_stream )
351
356
return result
352
357
353
358
def create_query_context (self ,
@@ -433,7 +438,7 @@ def _query_arrow():
433
438
use_strings = use_strings , external_data = external_data )
434
439
435
440
loop = asyncio .get_running_loop ()
436
- result = await loop .run_in_executor (None , _query_arrow )
441
+ result = await loop .run_in_executor (self . executor , _query_arrow )
437
442
return result
438
443
439
444
async def query_arrow_stream (self ,
@@ -457,7 +462,7 @@ def _query_arrow_stream():
457
462
use_strings = use_strings , external_data = external_data )
458
463
459
464
loop = asyncio .get_running_loop ()
460
- result = await loop .run_in_executor (None , _query_arrow_stream )
465
+ result = await loop .run_in_executor (self . executor , _query_arrow_stream )
461
466
return result
462
467
463
468
async def command (self ,
@@ -486,7 +491,7 @@ def _command():
486
491
use_database = use_database , external_data = external_data )
487
492
488
493
loop = asyncio .get_running_loop ()
489
- result = await loop .run_in_executor (None , _command )
494
+ result = await loop .run_in_executor (self . executor , _command )
490
495
return result
491
496
492
497
async def ping (self ) -> bool :
@@ -499,7 +504,7 @@ def _ping():
499
504
return self .client .ping ()
500
505
501
506
loop = asyncio .get_running_loop ()
502
- result = await loop .run_in_executor (None , _ping )
507
+ result = await loop .run_in_executor (self . executor , _ping )
503
508
return result
504
509
505
510
async def insert (self ,
@@ -537,7 +542,7 @@ def _insert():
537
542
column_oriented = column_oriented , settings = settings , context = context )
538
543
539
544
loop = asyncio .get_running_loop ()
540
- result = await loop .run_in_executor (None , _insert )
545
+ result = await loop .run_in_executor (self . executor , _insert )
541
546
return result
542
547
543
548
async def insert_df (self , table : str = None ,
@@ -572,7 +577,7 @@ def _insert_df():
572
577
context = context )
573
578
574
579
loop = asyncio .get_running_loop ()
575
- result = await loop .run_in_executor (None , _insert_df )
580
+ result = await loop .run_in_executor (self . executor , _insert_df )
576
581
return result
577
582
578
583
async def insert_arrow (self , table : str ,
@@ -591,7 +596,7 @@ def _insert_arrow():
591
596
return self .client .insert_arrow (table = table , arrow_table = arrow_table , database = database , settings = settings )
592
597
593
598
loop = asyncio .get_running_loop ()
594
- result = await loop .run_in_executor (None , _insert_arrow )
599
+ result = await loop .run_in_executor (self . executor , _insert_arrow )
595
600
return result
596
601
597
602
async def create_insert_context (self ,
@@ -625,7 +630,7 @@ def _create_insert_context():
625
630
column_oriented = column_oriented , settings = settings , data = data )
626
631
627
632
loop = asyncio .get_running_loop ()
628
- result = await loop .run_in_executor (None , _create_insert_context )
633
+ result = await loop .run_in_executor (self . executor , _create_insert_context )
629
634
return result
630
635
631
636
async def data_insert (self , context : InsertContext ) -> QuerySummary :
@@ -639,7 +644,7 @@ def _data_insert():
639
644
return self .client .data_insert (context = context )
640
645
641
646
loop = asyncio .get_running_loop ()
642
- result = await loop .run_in_executor (None , _data_insert )
647
+ result = await loop .run_in_executor (self . executor , _data_insert )
643
648
return result
644
649
645
650
async def raw_insert (self , table : str ,
@@ -663,5 +668,5 @@ def _raw_insert():
663
668
settings = settings , fmt = fmt , compression = compression )
664
669
665
670
loop = asyncio .get_running_loop ()
666
- result = await loop .run_in_executor (None , _raw_insert )
671
+ result = await loop .run_in_executor (self . executor , _raw_insert )
667
672
return result
0 commit comments