ConnectionError in Client.insert_rows_json() #434


Closed
yan-hic opened this issue Dec 10, 2020 · 11 comments · Fixed by #571
Labels
api: bigquery Issues related to the googleapis/python-bigquery API. type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design.

Comments


yan-hic commented Dec 10, 2020

We have an HTTP Cloud Function that does some data processing and then streams to BQ. The function sometimes errors out, either because the BQ client loses its connection or because the insert_rows call itself can't connect.
Below is an example stack trace captured in the GCP logs.

  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
    response = self.full_dispatch_request()
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
    raise value
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
    rv = self.dispatch_request()
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/layers/google.python.functions-framework/functions-framework/lib/python3.8/site-packages/functions_framework/__init__.py", line 66, in view_func
    return function(request._get_current_object())
  File "/workspace/main.py", line 162, in stream_tax
    errors = bq.insert_rows_json(table=dataset_table,
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/google/cloud/bigquery/client.py", line 3013, in insert_rows_json
    response = self._call_api(
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/google/cloud/bigquery/client.py", line 636, in _call_api
    return call()
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/google/api_core/retry.py", line 281, in retry_wrapped_func
    return retry_target(
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/google/api_core/retry.py", line 184, in retry_target
    return target()
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/google/cloud/_http.py", line 427, in api_request
    response = self._make_request(
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/google/cloud/_http.py", line 291, in _make_request
    return self._do_request(
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/google/cloud/_http.py", line 329, in _do_request
    return self.http.request(
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/google/auth/transport/requests.py", line 464, in request
    response = super(AuthorizedSession, self).request(
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/requests/sessions.py", line 542, in request
    resp = self.send(prep, **send_kwargs)
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/requests/sessions.py", line 655, in send
    r = adapter.send(request, **kwargs)
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/requests/adapters.py", line 498, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))

Thoughts?

@product-auto-label product-auto-label bot added the api: bigquery Issues related to the googleapis/python-bigquery API. label Dec 10, 2020
@yan-hic yan-hic changed the title ConnectionError ConnectionError in Client.insert_rows_json() Dec 10, 2020
yan-hic (Author) commented Dec 10, 2020

Actually, Client() does not make any API call, so re-instantiating it won't help.

I have instead wrapped insert_rows_json() in a backoff decorator, like so:

import backoff  # exponential-backoff retry decorator

# bq is an existing google.cloud.bigquery.Client instance
@backoff.on_exception(backoff.expo, ConnectionError)
def insert_into_bq(dataset_table, rows, row_ids):
    errors = bq.insert_rows_json(table=dataset_table,
                                 json_rows=rows,
                                 ignore_unknown_values=True,
                                 row_ids=row_ids)
    return errors

Will see if this makes the errors go away.

yan-hic (Author) commented Dec 11, 2020

Seems to work under a medium workload (500 calls/s), so I wonder whether ConnectionError could be made retriable in the bigquery library.

@yoshi-automation yoshi-automation added the triage me I really want to be triaged. label Dec 11, 2020
@meredithslota meredithslota added type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design. and removed triage me I really want to be triaged. labels Dec 11, 2020
tswast (Contributor) commented Dec 15, 2020

We recently added ConnectionError to the allowed retries in one of our core libraries (googleapis/google-resumable-media-python#186). I think it'd make sense to do the same here.
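
In the meantime, callers can opt in to this behavior themselves, since insert_rows_json() accepts a retry argument. A minimal sketch, passing a google.api_core Retry whose predicate also matches requests.exceptions.ConnectionError (the table ID, row contents, and exact exception list below are illustrative, not the library's defaults):

import requests.exceptions
from google.api_core import exceptions as api_exceptions
from google.api_core.retry import Retry, if_exception_type
from google.cloud import bigquery

client = bigquery.Client()

# Retry the usual transient API errors plus the transport-level reset
# reported in this issue.
retry_with_conn_errors = Retry(
    predicate=if_exception_type(
        api_exceptions.TooManyRequests,
        api_exceptions.InternalServerError,
        api_exceptions.ServiceUnavailable,
        requests.exceptions.ConnectionError,
    )
)

errors = client.insert_rows_json(
    "mydataset.mytable",   # placeholder table ID
    [{"key": "abc"}],      # placeholder row
    retry=retry_with_conn_errors,
)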

@vavdoshka

Is it a correct assumption that when this error (ConnectionReset) occurs, there is no guarantee about whether some records were inserted or not? So, in order to retry reliably, the insert_ids should be provided for de-duplication.

yan-hic (Author) commented Feb 3, 2021

Not sure if this was addressed since @tswast's comment (hence why I won't close), but if it helps, we ended up using the backoff decorator like so:

@backoff.on_exception(backoff.expo, ConnectionError)
def insert_into_bq(dataset_table, rows):
    errors = bq.insert_rows_json(table=dataset_table,
                                 json_rows=rows,
                                 row_ids=[None] * len(rows),
                                 ignore_unknown_values=True)
    return errors

It's easy enough to add another retriable error here if a new one shows up.

@vavdoshka

@yiga2 thanks. I also want to understand whether retrying this error is a safe approach when we have a strict de-duplication requirement and cannot use "row_ids" yet, or whether in that case we might end up with some duplicates in BigQuery.

tswast (Contributor) commented Feb 3, 2021

@vavdoshka If you explicitly set row_ids=[None] * len(rows), you opt out of any de-duplication features. If you do not set these, the client library automatically adds row IDs. The de-duplication feature was designed with transport errors like ConnectionError in mind.
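
For illustration, the two modes look roughly like this (the table ID and rows are placeholders, not from the original report):

from google.cloud import bigquery

client = bigquery.Client()
rows = [{"key": "abc"}, {"key": "def"}]

# Best-effort de-duplication: supply stable insert IDs, or omit row_ids and
# let the client generate one per row automatically.
client.insert_rows_json("mydataset.mytable", rows, row_ids=["id-abc", "id-def"])

# Opt out of de-duplication entirely; a retry after a transport error may
# then insert duplicate rows.
client.insert_rows_json("mydataset.mytable", rows, row_ids=[None] * len(rows))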

yan-hic (Author) commented Feb 3, 2021

The de-duplication on the BQ side is "best effort" (https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataconsistency), so it can't be fully trusted, and inserts are slower since BQ has to check for an existing row ID/key.
We decided to opt out.

Out of scope, but FYI we run the query below to truly dedupe, and only when applicable:

IF (SELECT MAX(count) FROM (SELECT key, COUNT(*) AS count
                            FROM mydataset.mytable
                            GROUP BY 1)
    ) > 1 THEN
    CREATE OR REPLACE TABLE mydataset.mytable CLUSTER BY key AS (
    SELECT * EXCEPT (rn) FROM (
        SELECT *,
            row_number() OVER (PARTITION BY key ORDER BY transaction_date DESC) rn
        FROM mydataset.mytable
    )
    WHERE rn = 1);
END IF;  


vavdoshka commented Feb 3, 2021

@tswast thanks. Yeah, I guess in my case it is better to skip and back up the data somewhere else in case of any transport error, i.e. not use insertId, by explicitly setting it to None, because I want to have the maximum insert quota available instead. Another question on this topic: can the default google.api_core.retry.Retry logic used in insert_rows_json be considered duplication-safe itself? By default it should retry only on transient API errors, and that technically should mean the insert was not in progress on the BigQuery side when the error happened, right?
It will help me understand whether I can keep this behaviour in place or should opt out to avoid possible duplicates.

@camerondavison

I think this error happens everywhere. I saw it while reading results from a BigQuery job.

  File "/usr/local/lib/python3.8/site-packages/google/cloud/bigquery/client.py", line 2931, in query
    query_job._begin(retry=retry, timeout=timeout)
  File "/usr/local/lib/python3.8/site-packages/google/cloud/bigquery/job/query.py", line 1088, in _begin
    super(QueryJob, self)._begin(client=client, retry=retry, timeout=timeout)
  File "/usr/local/lib/python3.8/site-packages/google/cloud/bigquery/job/base.py", line 457, in _begin
    api_response = client._call_api(
  File "/usr/local/lib/python3.8/site-packages/google/cloud/bigquery/client.py", line 640, in _call_api
    return call()
  File "/usr/local/lib/python3.8/site-packages/google/api_core/retry.py", line 281, in retry_wrapped_func
    return retry_target(
  File "/usr/local/lib/python3.8/site-packages/google/api_core/retry.py", line 184, in retry_target
    return target()
  File "/usr/local/lib/python3.8/site-packages/google/cloud/_http.py", line 472, in api_request
    response = self._make_request(
  File "/usr/local/lib/python3.8/site-packages/google/cloud/_http.py", line 336, in _make_request
    return self._do_request(
  File "/usr/local/lib/python3.8/site-packages/google/cloud/_http.py", line 374, in _do_request
    return self.http.request(
  File "/usr/local/lib/python3.8/site-packages/google/auth/transport/requests.py", line 482, in request
    response = super(AuthorizedSession, self).request(
  File "/usr/local/lib/python3.8/site-packages/requests/sessions.py", line 542, in request
    resp = self.send(prep, **send_kwargs)
  File "/usr/local/lib/python3.8/site-packages/elasticapm/instrumentation/packages/base.py", line 210, in call_if_sampling
    return self.call(module, method, wrapped, instance, args, kwargs)
  File "/usr/local/lib/python3.8/site-packages/elasticapm/instrumentation/packages/requests.py", line 59, in call
    response = wrapped(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/requests/sessions.py", line 655, in send
    r = adapter.send(request, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/requests/adapters.py", line 498, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))

Is there a way to at least make ConnectionError retryable for read-only requests?

tswast (Contributor) commented Mar 22, 2021

@camerondavison can you file a separate request? Since insert_rows_json has somewhat complex retry logic due to the presence or absence of row IDs, it's probably worth considering separately from requests like queries.

Edit: On second thought, I think the fix is the same either way: add ConnectionError to the default retry.
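
For reference, a rough sketch of what that could look like, assuming the default predicate is built from an allowlist of structured error reasons plus a tuple of exception types; this paraphrases the idea only and is not the actual patch that closed this issue:

import requests.exceptions
from google.api_core import exceptions, retry

_RETRYABLE_REASONS = frozenset(
    ["rateLimitExceeded", "backendError", "internalError", "badGateway"]
)

# Errors that carry no structured "reason" from the API, retried by type.
_UNSTRUCTURED_RETRYABLE_TYPES = (
    ConnectionError,                      # built-in, e.g. ConnectionResetError
    requests.exceptions.ConnectionError,  # the error in the tracebacks above
    exceptions.TooManyRequests,
    exceptions.InternalServerError,
    exceptions.BadGateway,
)

def _should_retry(exc):
    """Retry structured API errors with a retryable reason, plus the
    unstructured transport errors listed above."""
    if not hasattr(exc, "errors") or len(exc.errors) == 0:
        return isinstance(exc, _UNSTRUCTURED_RETRYABLE_TYPES)
    reason = exc.errors[0]["reason"]
    return reason in _RETRYABLE_REASONS

DEFAULT_RETRY = retry.Retry(predicate=_should_retry)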
