Skip to content

Commit f8887d3

Browse files
committed
perf: don't fetch rows when waiting for query to finish
When there are large result sets, fetching rows while waiting for the query to finish can cause the API to hang indefinitely. (This may be due to an interaction between connection timeout and API timeout.) This reverts commit 86f6a51 (googleapis#374).
1 parent a1949ae commit f8887d3

6 files changed

Lines changed: 60 additions & 115 deletions

File tree

google/cloud/bigquery/client.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1534,7 +1534,7 @@ def _get_query_results(
15341534
A new ``_QueryResults`` instance.
15351535
"""
15361536

1537-
extra_params = {}
1537+
extra_params = {"maxResults": 0}
15381538

15391539
if project is None:
15401540
project = self.project
@@ -3187,7 +3187,6 @@ def _list_rows_from_query_results(
31873187
page_size=None,
31883188
retry=DEFAULT_RETRY,
31893189
timeout=None,
3190-
first_page_response=None,
31913190
):
31923191
"""List the rows of a completed query.
31933192
See
@@ -3248,7 +3247,6 @@ def _list_rows_from_query_results(
32483247
table=destination,
32493248
extra_params=params,
32503249
total_rows=total_rows,
3251-
first_page_response=first_page_response,
32523250
)
32533251
return row_iterator
32543252

google/cloud/bigquery/job/query.py

Lines changed: 31 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -991,22 +991,48 @@ def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True):
991991
Returns:
992992
bool: True if the job is complete, False otherwise.
993993
"""
994+
is_done = (
995+
# Only consider a QueryJob complete when we know we have the final
996+
# query results available.
997+
self._query_results is not None
998+
and self._query_results.complete
999+
and self.state == _DONE_STATE
1000+
)
9941001
# Do not refresh if the state is already done, as the job will not
9951002
# change once complete.
996-
is_done = self.state == _DONE_STATE
9971003
if not reload or is_done:
9981004
return is_done
9991005

1000-
self._reload_query_results(retry=retry, timeout=timeout)
1006+
# Since the API to getQueryResults can hang up to the timeout value
1007+
# (default of 10 seconds), set the timeout parameter to ensure that
1008+
# the timeout from the futures API is respected. See:
1009+
# https://github.com/GoogleCloudPlatform/google-cloud-python/issues/4135
1010+
timeout_ms = None
1011+
if self._done_timeout is not None:
1012+
# Subtract a buffer for context switching, network latency, etc.
1013+
api_timeout = self._done_timeout - _TIMEOUT_BUFFER_SECS
1014+
api_timeout = max(min(api_timeout, 10), 0)
1015+
self._done_timeout -= api_timeout
1016+
self._done_timeout = max(0, self._done_timeout)
1017+
timeout_ms = int(api_timeout * 1000)
10011018

10021019
# If an explicit timeout is not given, fall back to the transport timeout
10031020
# stored in _blocking_poll() in the process of polling for job completion.
10041021
transport_timeout = timeout if timeout is not None else self._transport_timeout
10051022

1023+
self._query_results = self._client._get_query_results(
1024+
self.job_id,
1025+
retry,
1026+
project=self.project,
1027+
timeout_ms=timeout_ms,
1028+
location=self.location,
1029+
timeout=transport_timeout,
1030+
)
1031+
10061032
# Only reload the job once we know the query is complete.
10071033
# This will ensure that fields such as the destination table are
10081034
# correctly populated.
1009-
if self._query_results.complete:
1035+
if self._query_results.complete and self.state != _DONE_STATE:
10101036
self.reload(retry=retry, timeout=transport_timeout)
10111037

10121038
return self.state == _DONE_STATE
@@ -1073,45 +1099,6 @@ def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None):
10731099
exc.query_job = self
10741100
raise
10751101

1076-
def _reload_query_results(self, retry=DEFAULT_RETRY, timeout=None):
1077-
"""Refresh the cached query results.
1078-
1079-
Args:
1080-
retry (Optional[google.api_core.retry.Retry]):
1081-
How to retry the call that retrieves query results.
1082-
timeout (Optional[float]):
1083-
The number of seconds to wait for the underlying HTTP transport
1084-
before using ``retry``.
1085-
"""
1086-
if self._query_results and self._query_results.complete:
1087-
return
1088-
1089-
# Since the API to getQueryResults can hang up to the timeout value
1090-
# (default of 10 seconds), set the timeout parameter to ensure that
1091-
# the timeout from the futures API is respected. See:
1092-
# https://github.com/GoogleCloudPlatform/google-cloud-python/issues/4135
1093-
timeout_ms = None
1094-
if self._done_timeout is not None:
1095-
# Subtract a buffer for context switching, network latency, etc.
1096-
api_timeout = self._done_timeout - _TIMEOUT_BUFFER_SECS
1097-
api_timeout = max(min(api_timeout, 10), 0)
1098-
self._done_timeout -= api_timeout
1099-
self._done_timeout = max(0, self._done_timeout)
1100-
timeout_ms = int(api_timeout * 1000)
1101-
1102-
# If an explicit timeout is not given, fall back to the transport timeout
1103-
# stored in _blocking_poll() in the process of polling for job completion.
1104-
transport_timeout = timeout if timeout is not None else self._transport_timeout
1105-
1106-
self._query_results = self._client._get_query_results(
1107-
self.job_id,
1108-
retry,
1109-
project=self.project,
1110-
timeout_ms=timeout_ms,
1111-
location=self.location,
1112-
timeout=transport_timeout,
1113-
)
1114-
11151102
def result(
11161103
self,
11171104
page_size=None,
@@ -1158,11 +1145,6 @@ def result(
11581145
"""
11591146
try:
11601147
super(QueryJob, self).result(retry=retry, timeout=timeout)
1161-
1162-
# Since the job could already be "done" (e.g. got a finished job
1163-
# via client.get_job), the superclass call to done() might not
1164-
# set the self._query_results cache.
1165-
self._reload_query_results(retry=retry, timeout=timeout)
11661148
except exceptions.GoogleAPICallError as exc:
11671149
exc.message += self._format_for_exception(self.query, self.job_id)
11681150
exc.query_job = self
@@ -1177,14 +1159,10 @@ def result(
11771159
if self._query_results.total_rows is None:
11781160
return _EmptyRowIterator()
11791161

1180-
first_page_response = None
1181-
if max_results is None and page_size is None and start_index is None:
1182-
first_page_response = self._query_results._properties
1183-
11841162
rows = self._client._list_rows_from_query_results(
1185-
self.job_id,
1163+
self._query_results.job_id,
11861164
self.location,
1187-
self.project,
1165+
self._query_results.project,
11881166
self._query_results.schema,
11891167
total_rows=self._query_results.total_rows,
11901168
destination=self.destination,
@@ -1193,7 +1171,6 @@ def result(
11931171
start_index=start_index,
11941172
retry=retry,
11951173
timeout=timeout,
1196-
first_page_response=first_page_response,
11971174
)
11981175
rows._preserve_order = _contains_order_by(self.query)
11991176
return rows

google/cloud/bigquery/table.py

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1301,9 +1301,7 @@ class RowIterator(HTTPIterator):
13011301
A subset of columns to select from this table.
13021302
total_rows (Optional[int]):
13031303
Total number of rows in the table.
1304-
first_page_response (Optional[dict]):
1305-
API response for the first page of results. These are returned when
1306-
the first page is requested.
1304+
13071305
"""
13081306

13091307
def __init__(
@@ -1319,7 +1317,6 @@ def __init__(
13191317
table=None,
13201318
selected_fields=None,
13211319
total_rows=None,
1322-
first_page_response=None,
13231320
):
13241321
super(RowIterator, self).__init__(
13251322
client,
@@ -1342,7 +1339,6 @@ def __init__(
13421339
self._selected_fields = selected_fields
13431340
self._table = table
13441341
self._total_rows = total_rows
1345-
self._first_page_response = first_page_response
13461342

13471343
def _is_completely_cached(self):
13481344
"""Check if all results are completely cached.
@@ -1386,11 +1382,6 @@ def _get_next_page_response(self):
13861382
Dict[str, object]:
13871383
The parsed JSON response of the next page's contents.
13881384
"""
1389-
if self._first_page_response:
1390-
response = self._first_page_response
1391-
self._first_page_response = None
1392-
return response
1393-
13941385
params = self._get_query_params()
13951386
if self._page_size is not None:
13961387
if self.page_number and "startIndex" in params:

tests/unit/job/test_query.py

Lines changed: 13 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -787,9 +787,7 @@ def test_result(self):
787787
"location": "EU",
788788
},
789789
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
790-
"totalRows": "3",
791-
"rows": [{"f": [{"v": "abc"}]}],
792-
"pageToken": "next-page",
790+
"totalRows": "2",
793791
}
794792
job_resource = self._make_resource(started=True, location="EU")
795793
job_resource_done = self._make_resource(started=True, ended=True, location="EU")
@@ -801,9 +799,9 @@ def test_result(self):
801799
query_page_resource = {
802800
# Explicitly set totalRows to be different from the initial
803801
# response to test update during iteration.
804-
"totalRows": "2",
802+
"totalRows": "1",
805803
"pageToken": None,
806-
"rows": [{"f": [{"v": "def"}]}],
804+
"rows": [{"f": [{"v": "abc"}]}],
807805
}
808806
conn = _make_connection(
809807
query_resource, query_resource_done, job_resource_done, query_page_resource
@@ -814,20 +812,19 @@ def test_result(self):
814812
result = job.result()
815813

816814
self.assertIsInstance(result, RowIterator)
817-
self.assertEqual(result.total_rows, 3)
815+
self.assertEqual(result.total_rows, 2)
818816
rows = list(result)
819-
self.assertEqual(len(rows), 2)
817+
self.assertEqual(len(rows), 1)
820818
self.assertEqual(rows[0].col1, "abc")
821-
self.assertEqual(rows[1].col1, "def")
822819
# Test that the total_rows property has changed during iteration, based
823820
# on the response from tabledata.list.
824-
self.assertEqual(result.total_rows, 2)
821+
self.assertEqual(result.total_rows, 1)
825822

826823
query_results_path = f"/projects/{self.PROJECT}/queries/{self.JOB_ID}"
827824
query_results_call = mock.call(
828825
method="GET",
829826
path=query_results_path,
830-
query_params={"location": "EU"},
827+
query_params={"maxResults": 0, "location": "EU"},
831828
timeout=None,
832829
)
833830
reload_call = mock.call(
@@ -842,7 +839,6 @@ def test_result(self):
842839
query_params={
843840
"fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS,
844841
"location": "EU",
845-
"pageToken": "next-page",
846842
},
847843
timeout=None,
848844
)
@@ -855,9 +851,7 @@ def test_result_with_done_job_calls_get_query_results(self):
855851
"jobComplete": True,
856852
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
857853
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
858-
"totalRows": "2",
859-
"rows": [{"f": [{"v": "abc"}]}],
860-
"pageToken": "next-page",
854+
"totalRows": "1",
861855
}
862856
job_resource = self._make_resource(started=True, ended=True, location="EU")
863857
job_resource["configuration"]["query"]["destinationTable"] = {
@@ -866,9 +860,9 @@ def test_result_with_done_job_calls_get_query_results(self):
866860
"tableId": "dest_table",
867861
}
868862
results_page_resource = {
869-
"totalRows": "2",
863+
"totalRows": "1",
870864
"pageToken": None,
871-
"rows": [{"f": [{"v": "def"}]}],
865+
"rows": [{"f": [{"v": "abc"}]}],
872866
}
873867
conn = _make_connection(query_resource_done, results_page_resource)
874868
client = _make_client(self.PROJECT, connection=conn)
@@ -877,15 +871,14 @@ def test_result_with_done_job_calls_get_query_results(self):
877871
result = job.result()
878872

879873
rows = list(result)
880-
self.assertEqual(len(rows), 2)
874+
self.assertEqual(len(rows), 1)
881875
self.assertEqual(rows[0].col1, "abc")
882-
self.assertEqual(rows[1].col1, "def")
883876

884877
query_results_path = f"/projects/{self.PROJECT}/queries/{self.JOB_ID}"
885878
query_results_call = mock.call(
886879
method="GET",
887880
path=query_results_path,
888-
query_params={"location": "EU"},
881+
query_params={"maxResults": 0, "location": "EU"},
889882
timeout=None,
890883
)
891884
query_results_page_call = mock.call(
@@ -894,7 +887,6 @@ def test_result_with_done_job_calls_get_query_results(self):
894887
query_params={
895888
"fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS,
896889
"location": "EU",
897-
"pageToken": "next-page",
898890
},
899891
timeout=None,
900892
)
@@ -908,12 +900,6 @@ def test_result_with_max_results(self):
908900
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
909901
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
910902
"totalRows": "5",
911-
# These rows are discarded because max_results is set.
912-
"rows": [
913-
{"f": [{"v": "xyz"}]},
914-
{"f": [{"v": "uvw"}]},
915-
{"f": [{"v": "rst"}]},
916-
],
917903
}
918904
query_page_resource = {
919905
"totalRows": "5",
@@ -939,7 +925,6 @@ def test_result_with_max_results(self):
939925
rows = list(result)
940926

941927
self.assertEqual(len(rows), 3)
942-
self.assertEqual(rows[0].col1, "abc")
943928
self.assertEqual(len(connection.api_request.call_args_list), 2)
944929
query_page_request = connection.api_request.call_args_list[1]
945930
self.assertEqual(
@@ -994,7 +979,7 @@ def test_result_w_retry(self):
994979
query_results_call = mock.call(
995980
method="GET",
996981
path=f"/projects/{self.PROJECT}/queries/{self.JOB_ID}",
997-
query_params={"location": "asia-northeast1"},
982+
query_params={"maxResults": 0, "location": "asia-northeast1"},
998983
timeout=None,
999984
)
1000985
reload_call = mock.call(
@@ -1094,12 +1079,6 @@ def test_result_w_page_size(self):
10941079
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
10951080
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
10961081
"totalRows": "4",
1097-
# These rows are discarded because page_size is set.
1098-
"rows": [
1099-
{"f": [{"v": "xyz"}]},
1100-
{"f": [{"v": "uvw"}]},
1101-
{"f": [{"v": "rst"}]},
1102-
],
11031082
}
11041083
job_resource = self._make_resource(started=True, ended=True, location="US")
11051084
q_config = job_resource["configuration"]["query"]
@@ -1130,7 +1109,6 @@ def test_result_w_page_size(self):
11301109
# Assert
11311110
actual_rows = list(result)
11321111
self.assertEqual(len(actual_rows), 4)
1133-
self.assertEqual(actual_rows[0].col1, "row1")
11341112

11351113
query_results_path = f"/projects/{self.PROJECT}/queries/{self.JOB_ID}"
11361114
query_page_1_call = mock.call(
@@ -1164,12 +1142,6 @@ def test_result_with_start_index(self):
11641142
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
11651143
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
11661144
"totalRows": "5",
1167-
# These rows are discarded because start_index is set.
1168-
"rows": [
1169-
{"f": [{"v": "xyz"}]},
1170-
{"f": [{"v": "uvw"}]},
1171-
{"f": [{"v": "rst"}]},
1172-
],
11731145
}
11741146
tabledata_resource = {
11751147
"totalRows": "5",
@@ -1196,7 +1168,6 @@ def test_result_with_start_index(self):
11961168
rows = list(result)
11971169

11981170
self.assertEqual(len(rows), 4)
1199-
self.assertEqual(rows[0].col1, "abc")
12001171
self.assertEqual(len(connection.api_request.call_args_list), 2)
12011172
tabledata_list_request = connection.api_request.call_args_list[1]
12021173
self.assertEqual(

0 commit comments

Comments
 (0)