Skip to content

Commit 34c5939

Browse files
committed
Merge pull request #2029 from dwmclary/add_udf_support.
Add UDFResource support for QueryJob / Query.
2 parents 565c992 + 2a53ae1 commit 34c5939

File tree

5 files changed

+270
-5
lines changed

5 files changed

+270
-5
lines changed

gcloud/bigquery/job.py

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,58 @@
2727
from gcloud.bigquery._helpers import _TypedProperty
2828

2929

30+
class UDFResource(object):
31+
"""Describe a single user-defined function (UDF) resource.
32+
:type udf_type: str
33+
:param udf_type: the type of the resource ('inlineCode' or 'resourceUri')
34+
35+
:type value: str
36+
:param value: the inline code or resource URI
37+
38+
See
39+
https://cloud.google.com/bigquery/user-defined-functions#api
40+
"""
41+
def __init__(self, udf_type, value):
42+
self.udf_type = udf_type
43+
self.value = value
44+
45+
def __eq__(self, other):
46+
return(
47+
self.udf_type == other.udf_type and
48+
self.value == other.value)
49+
50+
51+
def _build_udf_resources(resources):
52+
"""
53+
:type resources: sequence of :class:`UDFResource`
54+
:param resources: fields to be appended
55+
56+
:rtype: mapping
57+
:returns: a mapping describing userDefinedFunctionResources for the query.
58+
"""
59+
udfs = []
60+
for resource in resources:
61+
udf = {resource.udf_type: resource.value}
62+
udfs.append(udf)
63+
return udfs
64+
65+
66+
class UDFResourcesProperty(object):
67+
"""Custom property type for :class:`QueryJob` / :class:`.query.Query`."""
68+
69+
def __get__(self, instance, owner):
70+
"""Descriptor protocal: accesstor"""
71+
if instance is None:
72+
return self
73+
return list(instance._udf_resources)
74+
75+
def __set__(self, instance, value):
76+
"""Descriptor protocal: mutator"""
77+
if not all(isinstance(u, UDFResource) for u in value):
78+
raise ValueError("udf items must be UDFResource")
79+
instance._udf_resources = tuple(value)
80+
81+
3082
class Compression(_EnumProperty):
3183
"""Pseudo-enum for ``compression`` properties."""
3284
GZIP = 'GZIP'
@@ -888,12 +940,19 @@ class QueryJob(_AsyncJob):
888940
:type client: :class:`gcloud.bigquery.client.Client`
889941
:param client: A client which holds credentials and project configuration
890942
for the dataset (which requires a project).
943+
944+
:type udf_resources: tuple
945+
:param udf_resources: An iterable of
946+
:class:`gcloud.bigquery.job.UDFResource`
947+
(empty by default)
891948
"""
892949
_JOB_TYPE = 'query'
950+
_UDF_KEY = 'userDefinedFunctionResources'
893951

894-
def __init__(self, name, query, client):
952+
def __init__(self, name, query, client, udf_resources=()):
895953
super(QueryJob, self).__init__(name, client)
896954
self.query = query
955+
self.udf_resources = udf_resources
897956
self._configuration = _AsyncQueryConfiguration()
898957

899958
allow_large_results = _TypedProperty('allow_large_results', bool)
@@ -926,6 +985,8 @@ def __init__(self, name, query, client):
926985
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.priority
927986
"""
928987

988+
udf_resources = UDFResourcesProperty()
989+
929990
use_query_cache = _TypedProperty('use_query_cache', bool)
930991
"""See:
931992
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.useQueryCache
@@ -979,6 +1040,9 @@ def _populate_config_resource(self, configuration):
9791040
configuration['useLegacySql'] = self.use_legacy_sql
9801041
if self.write_disposition is not None:
9811042
configuration['writeDisposition'] = self.write_disposition
1043+
if len(self._udf_resources) > 0:
1044+
configuration[self._UDF_KEY] = _build_udf_resources(
1045+
self._udf_resources)
9821046

9831047
def _build_resource(self):
9841048
"""Generate a resource for :meth:`begin`."""

gcloud/bigquery/query.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
from gcloud.bigquery._helpers import _rows_from_json
2121
from gcloud.bigquery.dataset import Dataset
2222
from gcloud.bigquery.job import QueryJob
23+
from gcloud.bigquery.job import UDFResourcesProperty
24+
from gcloud.bigquery.job import _build_udf_resources
2325
from gcloud.bigquery.table import _parse_schema_resource
2426

2527

@@ -46,12 +48,21 @@ class QueryResults(object):
4648
:type client: :class:`gcloud.bigquery.client.Client`
4749
:param client: A client which holds credentials and project configuration
4850
for the dataset (which requires a project).
51+
52+
:type udf_resources: tuple
53+
:param udf_resources: An iterable of
54+
:class:`gcloud.bigquery.job.UDFResource`
55+
(empty by default)
4956
"""
50-
def __init__(self, query, client):
57+
58+
_UDF_KEY = 'userDefinedFunctionResources'
59+
60+
def __init__(self, query, client, udf_resources=()):
5161
self._client = client
5262
self._properties = {}
5363
self.query = query
5464
self._configuration = _SyncQueryConfiguration()
65+
self.udf_resources = udf_resources
5566
self._job = None
5667

5768
@property
@@ -229,6 +240,8 @@ def schema(self):
229240
https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#timeoutMs
230241
"""
231242

243+
udf_resources = UDFResourcesProperty()
244+
232245
use_query_cache = _TypedProperty('use_query_cache', bool)
233246
"""See:
234247
https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#useQueryCache
@@ -277,6 +290,9 @@ def _build_resource(self):
277290
if self.dry_run is not None:
278291
resource['dryRun'] = self.dry_run
279292

293+
if len(self._udf_resources) > 0:
294+
resource[self._UDF_KEY] = _build_udf_resources(self._udf_resources)
295+
280296
return resource
281297

282298
def run(self, client=None):

gcloud/bigquery/test_job.py

Lines changed: 113 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,76 @@
1515
import unittest2
1616

1717

18+
class Test_UDFResourcesProperty(unittest2.TestCase):
19+
20+
def _getTargetClass(self):
21+
from gcloud.bigquery.job import UDFResourcesProperty
22+
return UDFResourcesProperty
23+
24+
def _makeOne(self, *args, **kw):
25+
return self._getTargetClass()(*args, **kw)
26+
27+
def _descriptor_and_klass(self):
28+
descriptor = self._makeOne()
29+
30+
class _Test(object):
31+
_udf_resources = ()
32+
udf_resources = descriptor
33+
34+
return descriptor, _Test
35+
36+
def test_class_getter(self):
37+
descriptor, klass = self._descriptor_and_klass()
38+
self.assertTrue(klass.udf_resources is descriptor)
39+
40+
def test_instance_getter_empty(self):
41+
_, klass = self._descriptor_and_klass()
42+
instance = klass()
43+
self.assertEqual(instance.udf_resources, [])
44+
45+
def test_instance_getter_w_non_empty_list(self):
46+
from gcloud.bigquery.job import UDFResource
47+
RESOURCE_URI = 'gs://some-bucket/js/lib.js'
48+
udf_resources = [UDFResource("resourceUri", RESOURCE_URI)]
49+
_, klass = self._descriptor_and_klass()
50+
instance = klass()
51+
instance._udf_resources = tuple(udf_resources)
52+
53+
self.assertEqual(instance.udf_resources, udf_resources)
54+
55+
def test_instance_setter_w_empty_list(self):
56+
from gcloud.bigquery.job import UDFResource
57+
RESOURCE_URI = 'gs://some-bucket/js/lib.js'
58+
udf_resources = [UDFResource("resourceUri", RESOURCE_URI)]
59+
_, klass = self._descriptor_and_klass()
60+
instance = klass()
61+
instance._udf_resources = udf_resources
62+
63+
instance.udf_resources = []
64+
65+
self.assertEqual(instance.udf_resources, [])
66+
67+
def test_instance_setter_w_valid_udf(self):
68+
from gcloud.bigquery.job import UDFResource
69+
RESOURCE_URI = 'gs://some-bucket/js/lib.js'
70+
udf_resources = [UDFResource("resourceUri", RESOURCE_URI)]
71+
_, klass = self._descriptor_and_klass()
72+
instance = klass()
73+
74+
instance.udf_resources = udf_resources
75+
76+
self.assertEqual(instance.udf_resources, udf_resources)
77+
78+
def test_instance_setter_w_bad_udfs(self):
79+
_, klass = self._descriptor_and_klass()
80+
instance = klass()
81+
82+
with self.assertRaises(ValueError):
83+
instance.udf_resources = ["foo"]
84+
85+
self.assertEqual(instance.udf_resources, [])
86+
87+
1888
class _Base(object):
1989
PROJECT = 'project'
2090
SOURCE1 = 'http://example.com/source1.csv'
@@ -1384,7 +1454,7 @@ def test_begin_w_bound_client(self):
13841454
job = self._makeOne(self.JOB_NAME, self.QUERY, client)
13851455

13861456
job.begin()
1387-
1457+
self.assertEqual(job.udf_resources, [])
13881458
self.assertEqual(len(conn._requested), 1)
13891459
req = conn._requested[0]
13901460
self.assertEqual(req['method'], 'POST')
@@ -1396,7 +1466,7 @@ def test_begin_w_bound_client(self):
13961466
},
13971467
'configuration': {
13981468
'query': {
1399-
'query': self.QUERY,
1469+
'query': self.QUERY
14001470
},
14011471
},
14021472
}
@@ -1468,6 +1538,47 @@ def test_begin_w_alternate_client(self):
14681538
self.assertEqual(req['data'], SENT)
14691539
self._verifyResourceProperties(job, RESOURCE)
14701540

1541+
def test_begin_w_bound_client_and_udf(self):
1542+
from gcloud.bigquery.job import UDFResource
1543+
RESOURCE_URI = 'gs://some-bucket/js/lib.js'
1544+
PATH = 'projects/%s/jobs' % self.PROJECT
1545+
RESOURCE = self._makeResource()
1546+
# Ensure None for missing server-set props
1547+
del RESOURCE['statistics']['creationTime']
1548+
del RESOURCE['etag']
1549+
del RESOURCE['selfLink']
1550+
del RESOURCE['user_email']
1551+
conn = _Connection(RESOURCE)
1552+
client = _Client(project=self.PROJECT, connection=conn)
1553+
job = self._makeOne(self.JOB_NAME, self.QUERY, client,
1554+
udf_resources=[
1555+
UDFResource("resourceUri", RESOURCE_URI)
1556+
])
1557+
1558+
job.begin()
1559+
1560+
self.assertEqual(len(conn._requested), 1)
1561+
req = conn._requested[0]
1562+
self.assertEqual(req['method'], 'POST')
1563+
self.assertEqual(req['path'], '/%s' % PATH)
1564+
self.assertEqual(job.udf_resources,
1565+
[UDFResource("resourceUri", RESOURCE_URI)])
1566+
SENT = {
1567+
'jobReference': {
1568+
'projectId': self.PROJECT,
1569+
'jobId': self.JOB_NAME,
1570+
},
1571+
'configuration': {
1572+
'query': {
1573+
'query': self.QUERY,
1574+
'userDefinedFunctionResources':
1575+
[{'resourceUri': RESOURCE_URI}]
1576+
},
1577+
},
1578+
}
1579+
self.assertEqual(req['data'], SENT)
1580+
self._verifyResourceProperties(job, RESOURCE)
1581+
14711582
def test_exists_miss_w_bound_client(self):
14721583
PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME)
14731584
conn = _Connection()

gcloud/bigquery/test_query.py

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ def test_run_w_bound_client(self):
181181
conn = _Connection(RESOURCE)
182182
client = _Client(project=self.PROJECT, connection=conn)
183183
query = self._makeOne(self.QUERY, client)
184-
184+
self.assertEqual(query.udf_resources, [])
185185
query.run()
186186

187187
self.assertEqual(len(conn._requested), 1)
@@ -233,6 +233,78 @@ def test_run_w_alternate_client(self):
233233
self.assertEqual(req['data'], SENT)
234234
self._verifyResourceProperties(query, RESOURCE)
235235

236+
def test_run_w_inline_udf(self):
237+
from gcloud.bigquery.query import UDFResource
238+
INLINE_UDF_CODE = 'var someCode = "here";'
239+
PATH = 'projects/%s/queries' % self.PROJECT
240+
RESOURCE = self._makeResource(complete=False)
241+
conn = _Connection(RESOURCE)
242+
client = _Client(project=self.PROJECT, connection=conn)
243+
query = self._makeOne(self.QUERY, client)
244+
query.udf_resources = [UDFResource("inlineCode", INLINE_UDF_CODE)]
245+
246+
query.run()
247+
248+
self.assertEqual(len(conn._requested), 1)
249+
req = conn._requested[0]
250+
self.assertEqual(req['method'], 'POST')
251+
self.assertEqual(req['path'], '/%s' % PATH)
252+
SENT = {'query': self.QUERY,
253+
'userDefinedFunctionResources':
254+
[{'inlineCode': INLINE_UDF_CODE}]}
255+
self.assertEqual(req['data'], SENT)
256+
self._verifyResourceProperties(query, RESOURCE)
257+
258+
def test_run_w_udf_resource_uri(self):
259+
from gcloud.bigquery.job import UDFResource
260+
RESOURCE_URI = 'gs://some-bucket/js/lib.js'
261+
PATH = 'projects/%s/queries' % self.PROJECT
262+
RESOURCE = self._makeResource(complete=False)
263+
conn = _Connection(RESOURCE)
264+
client = _Client(project=self.PROJECT, connection=conn)
265+
query = self._makeOne(self.QUERY, client)
266+
query.udf_resources = [UDFResource("resourceUri", RESOURCE_URI)]
267+
268+
query.run()
269+
270+
self.assertEqual(len(conn._requested), 1)
271+
req = conn._requested[0]
272+
self.assertEqual(req['method'], 'POST')
273+
self.assertEqual(req['path'], '/%s' % PATH)
274+
SENT = {'query': self.QUERY,
275+
'userDefinedFunctionResources':
276+
[{'resourceUri': RESOURCE_URI}]}
277+
self.assertEqual(req['data'], SENT)
278+
self._verifyResourceProperties(query, RESOURCE)
279+
280+
def test_run_w_mixed_udfs(self):
281+
from gcloud.bigquery.job import UDFResource
282+
RESOURCE_URI = 'gs://some-bucket/js/lib.js'
283+
INLINE_UDF_CODE = 'var someCode = "here";'
284+
PATH = 'projects/%s/queries' % self.PROJECT
285+
RESOURCE = self._makeResource(complete=False)
286+
conn = _Connection(RESOURCE)
287+
client = _Client(project=self.PROJECT, connection=conn)
288+
query = self._makeOne(self.QUERY, client)
289+
query.udf_resources = [UDFResource("resourceUri", RESOURCE_URI),
290+
UDFResource("inlineCode", INLINE_UDF_CODE)]
291+
292+
query.run()
293+
294+
self.assertEqual(len(conn._requested), 1)
295+
req = conn._requested[0]
296+
self.assertEqual(req['method'], 'POST')
297+
self.assertEqual(req['path'], '/%s' % PATH)
298+
self.assertEqual(query.udf_resources,
299+
[UDFResource("resourceUri", RESOURCE_URI),
300+
UDFResource("inlineCode", INLINE_UDF_CODE)])
301+
SENT = {'query': self.QUERY,
302+
'userDefinedFunctionResources': [
303+
{'resourceUri': RESOURCE_URI},
304+
{"inlineCode": INLINE_UDF_CODE}]}
305+
self.assertEqual(req['data'], SENT)
306+
self._verifyResourceProperties(query, RESOURCE)
307+
236308
def test_fetch_data_query_not_yet_run(self):
237309
conn = _Connection()
238310
client = _Client(project=self.PROJECT, connection=conn)

tox.ini

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ basepython =
4444
python2.6
4545
deps =
4646
{[testing]deps}
47+
# ordereddict needed for google.protobuf, which doesn't declare it.
48+
ordereddict
4749
setenv =
4850
PYTHONPATH = {toxinidir}/_testing
4951

0 commit comments

Comments
 (0)