Skip to content

Commit dae9d9a

Browse files
authored
Add missing fields to JobsService (#614)
This change adds missing fields to JobsService — `sql_task`, `webhook_notifications`, and `continuous` on job create; `sql_task` and `webhook_notifications` on run submit; `include_trigger_history` on job get; `rerun_all_failed_tasks` and `rerun_dependent_tasks` on run repair; `page_token` on run list; and `all_queued_runs` on cancel-all — and renames the argument `name_filter` to `name` to match the API.
1 parent d4909e0 commit dae9d9a

File tree

5 files changed

+65
-16
lines changed

5 files changed

+65
-16
lines changed

databricks_cli/jobs/api.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@ def create_job(self, json, headers=None, version=None):
3232
version=version)
3333

3434
def list_jobs(self, job_type=None, expand_tasks=None, offset=None, limit=None, headers=None,
35-
version=None, name_filter=None):
35+
version=None, name=None):
3636
resp = self.client.list_jobs(job_type=job_type, expand_tasks=expand_tasks, offset=offset,
3737
limit=limit, headers=headers, version=version,
38-
name_filter=name_filter)
38+
name=name)
3939
if 'jobs' not in resp:
4040
resp['jobs'] = []
4141
return resp
@@ -57,6 +57,6 @@ def run_now(self, job_id, jar_params, notebook_params, python_params, spark_subm
5757
idempotency_token, headers=headers, version=version)
5858

5959
def _list_jobs_by_name(self, name, headers=None):
60-
jobs = self.list_jobs(headers=headers, name_filter=name)['jobs']
60+
jobs = self.list_jobs(headers=headers, name=name)['jobs']
6161
result = list(filter(lambda job: job['settings']['name'] == name, jobs))
6262
return result

databricks_cli/jobs/cli.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,15 +130,15 @@ def _jobs_to_table(jobs_json):
130130
@click.option('--all', '_all', is_flag=True,
131131
help='Lists all jobs by executing sequential calls to the API ' +
132132
'(only available in API 2.1).')
133-
@click.option('--name', 'name_filter', default=None, type=str,
133+
@click.option('--name', 'name', default=None, type=str,
134134
help='If provided, only returns jobs that match the supplied ' +
135135
'name (only available in API 2.1).')
136136
@api_version_option
137137
@debug_option
138138
@profile_option
139139
@eat_exceptions
140140
@provide_api_client
141-
def list_cli(api_client, output, job_type, version, expand_tasks, offset, limit, _all, name_filter):
141+
def list_cli(api_client, output, job_type, version, expand_tasks, offset, limit, _all, name):
142142
"""
143143
Lists the jobs in the Databricks Job Service.
144144
@@ -154,7 +154,7 @@ def list_cli(api_client, output, job_type, version, expand_tasks, offset, limit,
154154
"""
155155
check_version(api_client, version)
156156
api_version = version or api_client.jobs_api_version
157-
using_features_only_in_21 = expand_tasks or offset or limit or _all or name_filter
157+
using_features_only_in_21 = expand_tasks or offset or limit or _all or name
158158
if api_version != '2.1' and using_features_only_in_21:
159159
click.echo(click.style('ERROR', fg='red') + ': the options --expand-tasks, ' +
160160
'--offset, --limit, --all, and --name are only available in API 2.1', err=True)
@@ -168,7 +168,7 @@ def list_cli(api_client, output, job_type, version, expand_tasks, offset, limit,
168168
while has_more:
169169
jobs_json = jobs_api.list_jobs(job_type=job_type, expand_tasks=expand_tasks,
170170
offset=offset, limit=limit, version=version,
171-
name_filter=name_filter)
171+
name=name)
172172
jobs += jobs_json['jobs'] if 'jobs' in jobs_json else []
173173
has_more = jobs_json.get('has_more', False) and _all
174174
if has_more:

databricks_cli/sdk/service.py

Lines changed: 56 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ def create_job(
6161
access_control_list=None,
6262
pipeline_task=None,
6363
python_wheel_task=None,
64+
sql_task=None,
65+
webhook_notifications=None,
66+
continuous=None,
6467
):
6568
_data = {}
6669
if name is not None:
@@ -147,6 +150,22 @@ def create_job(
147150
raise TypeError(
148151
'Expected databricks.PythonWheelTask() or dict for field python_wheel_task'
149152
)
153+
if sql_task is not None:
154+
_data['sql_task'] = sql_task
155+
if not isinstance(sql_task, dict):
156+
raise TypeError('Expected databricks.SqlTask() or dict for field sql_task')
157+
if webhook_notifications is not None:
158+
_data['webhook_notifications'] = webhook_notifications
159+
if not isinstance(webhook_notifications, dict):
160+
raise TypeError(
161+
'Expected databricks.WebhookNotifications() or dict for field webhook_notifications'
162+
)
163+
if continuous is not None:
164+
_data['continuous'] = continuous
165+
if not isinstance(continuous, dict):
166+
raise TypeError(
167+
'Expected databricks.ContinuousSettings() or dict for field continuous'
168+
)
150169
return self.client.perform_query(
151170
'POST', '/jobs/create', data=_data, headers=headers, version=version
152171
)
@@ -172,6 +191,8 @@ def submit_run(
172191
access_control_list=None,
173192
pipeline_task=None,
174193
python_wheel_task=None,
194+
sql_task=None,
195+
webhook_notifications=None,
175196
):
176197
_data = {}
177198
if run_name is not None:
@@ -238,6 +259,16 @@ def submit_run(
238259
raise TypeError(
239260
'Expected databricks.PythonWheelTask() or dict for field python_wheel_task'
240261
)
262+
if sql_task is not None:
263+
_data['sql_task'] = sql_task
264+
if not isinstance(sql_task, dict):
265+
raise TypeError('Expected databricks.SqlTask() or dict for field sql_task')
266+
if webhook_notifications is not None:
267+
_data['webhook_notifications'] = webhook_notifications
268+
if not isinstance(webhook_notifications, dict):
269+
raise TypeError(
270+
'Expected databricks.WebhookNotifications() or dict for field webhook_notifications'
271+
)
241272
return self.client.perform_query(
242273
'POST', '/jobs/runs/submit', data=_data, headers=headers, version=version
243274
)
@@ -278,28 +309,35 @@ def delete_job(self, job_id, headers=None, version=None):
278309
'POST', '/jobs/delete', data=_data, headers=headers, version=version
279310
)
280311

281-
def get_job(self, job_id, headers=None, version=None):
312+
def get_job(self, job_id, headers=None, version=None, include_trigger_history=None):
282313
_data = {}
283314
if job_id is not None:
284315
_data['job_id'] = job_id
316+
if include_trigger_history is not None:
317+
_data['include_trigger_history'] = include_trigger_history
285318
return self.client.perform_query(
286319
'GET', '/jobs/get', data=_data, headers=headers, version=version
287320
)
288321

289322
def list_jobs(
290-
self, job_type=None, expand_tasks=None, limit=None, offset=None, headers=None, version=None, name_filter=None
323+
self,
324+
job_type=None,
325+
expand_tasks=None,
326+
limit=None,
327+
offset=None,
328+
headers=None,
329+
version=None,
330+
name=None,
291331
):
292332
_data = {}
293-
if job_type is not None:
294-
_data['job_type'] = job_type
295333
if expand_tasks is not None:
296334
_data['expand_tasks'] = expand_tasks
297335
if limit is not None:
298336
_data['limit'] = limit
299337
if offset is not None:
300338
_data['offset'] = offset
301-
if name_filter is not None:
302-
_data['name'] = name_filter
339+
if name is not None:
340+
_data['name'] = name
303341
return self.client.perform_query(
304342
'GET', '/jobs/list', data=_data, headers=headers, version=version
305343
)
@@ -359,6 +397,8 @@ def repair(
359397
version=None,
360398
dbt_commands=None,
361399
pipeline_params=None,
400+
rerun_all_failed_tasks=None,
401+
rerun_dependent_tasks=None,
362402
):
363403
_data = {}
364404
if run_id is not None:
@@ -385,6 +425,10 @@ def repair(
385425
raise TypeError(
386426
'Expected databricks.PipelineParameters() or dict for field pipeline_params'
387427
)
428+
if rerun_all_failed_tasks is not None:
429+
_data['rerun_all_failed_tasks'] = rerun_all_failed_tasks
430+
if rerun_dependent_tasks is not None:
431+
_data['rerun_dependent_tasks'] = rerun_dependent_tasks
388432
return self.client.perform_query(
389433
'POST', '/jobs/runs/repair', data=_data, headers=headers, version=version
390434
)
@@ -402,6 +446,7 @@ def list_runs(
402446
expand_tasks=None,
403447
start_time_from=None,
404448
start_time_to=None,
449+
page_token=None,
405450
):
406451
_data = {}
407452
if job_id is not None:
@@ -422,6 +467,8 @@ def list_runs(
422467
_data['start_time_from'] = start_time_from
423468
if start_time_to is not None:
424469
_data['start_time_to'] = start_time_to
470+
if page_token is not None:
471+
_data['page_token'] = page_token
425472
return self.client.perform_query(
426473
'GET', '/jobs/runs/list', data=_data, headers=headers, version=version
427474
)
@@ -452,10 +499,12 @@ def cancel_run(self, run_id, headers=None, version=None):
452499
'POST', '/jobs/runs/cancel', data=_data, headers=headers, version=version
453500
)
454501

455-
def cancel_all_runs(self, job_id, headers=None, version=None):
502+
def cancel_all_runs(self, job_id=None, headers=None, version=None, all_queued_runs=None):
456503
_data = {}
457504
if job_id is not None:
458505
_data['job_id'] = job_id
506+
if all_queued_runs is not None:
507+
_data['all_queued_runs'] = all_queued_runs
459508
return self.client.perform_query(
460509
'POST', '/jobs/runs/cancel-all', data=_data, headers=headers, version=version
461510
)

tests/jobs/test_api.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ def test_list_jobs():
126126
'GET', '/jobs/list', data={}, headers=None, version='3.0'
127127
)
128128

129-
api.list_jobs(version='2.1', name_filter='foo')
129+
api.list_jobs(version='2.1', name='foo')
130130
api_client_mock.perform_query.assert_called_with(
131131
'GET', '/jobs/list', data={'name':'foo'}, headers=None, version='2.1'
132132
)

tests/jobs/test_cli.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ def test_list_name(jobs_api_mock):
319319
runner = CliRunner()
320320
result = runner.invoke(cli.list_cli, ['--version=2.1', '--name', 'foo'])
321321
assert result.exit_code == 0
322-
assert jobs_api_mock.list_jobs.call_args[1]['name_filter'] == 'foo'
322+
assert jobs_api_mock.list_jobs.call_args[1]['name'] == 'foo'
323323
assert jobs_api_mock.list_jobs.call_args[1]['version'] == '2.1'
324324

325325
@provide_conf

0 commit comments

Comments (0)