Skip to content
35 changes: 30 additions & 5 deletions databricks_cli/runs/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
import time
from json import loads as json_loads

import click
from tabulate import tabulate

from databricks_cli.click_types import OutputClickType, JsonClickType, RunIdClickType
from databricks_cli.jobs.cli import check_version
from databricks_cli.utils import eat_exceptions, CONTEXT_SETTINGS, pretty_format, json_cli_base, \
truncate_string
from databricks_cli.utils import eat_exceptions, CONTEXT_SETTINGS, pretty_format, truncate_string, \
error_and_quit
from databricks_cli.configure.config import provide_api_client, profile_option, debug_option, \
api_version_option
from databricks_cli.runs.api import RunsApi
Expand All @@ -39,21 +43,42 @@
help='File containing JSON request to POST to /api/2.*/jobs/runs/submit.')
@click.option('--json', default=None, type=JsonClickType(),
              help=JsonClickType.help('/api/2.*/jobs/runs/submit'))
@click.option('--wait', is_flag=True,
              help='If specified, the CLI will wait for the submitted run to complete.')
@api_version_option
@debug_option
@profile_option
@eat_exceptions
@provide_api_client
def submit_cli(api_client, json_file, json, wait, version):
    """
    Submits a one-time run.

    The specification for the request json can be found
    https://docs.databricks.com/api/latest/jobs.html#runs-submit

    With --wait, polls the run until it reaches a terminal lifecycle state,
    then exits 0 on SUCCESS and 1 otherwise. Progress messages are written to
    stderr so stdout carries only the submit response.
    """
    check_version(api_client, version)
    # Exactly one of --json-file / --json must supply the request payload;
    # without this check json_loads(None) would raise an opaque TypeError.
    if json_file is not None and json is not None:
        error_and_quit('Only one of --json-file or --json should be provided')
    if json_file:
        with open(json_file, 'r') as f:
            json = f.read()
    if json is None:
        error_and_quit('Either --json-file or --json should be provided')
    submit_res = RunsApi(api_client).submit_run(json_loads(json), version=version)
    click.echo(pretty_format(submit_res))
    if wait:
        run_id = submit_res['run_id']
        # Lifecycle states in which the run will make no further progress.
        completed_states = {'TERMINATED', 'SKIPPED', 'INTERNAL_ERROR'}
        poll_interval_seconds = 5
        # No client-side timeout: runs have server-side timeouts in
        # Databricks, and the user can always interrupt with CTRL-C.
        while True:
            run = RunsApi(api_client).get_run(run_id, version=version)
            run_state = run['state']
            if run_state['life_cycle_state'] in completed_states:
                if run_state.get('result_state') == 'SUCCESS':
                    sys.exit(0)
                # result_state/state_message may be absent for some terminal
                # runs (e.g. SKIPPED), so fall back instead of raising KeyError.
                error_and_quit('job failed with state ' +
                               run_state.get('result_state', 'N/A') +
                               ' and state message ' +
                               run_state.get('state_message', 'N/A'))
            # err=True keeps progress chatter off stdout.
            click.echo('Job still running with lifecycle state ' + run_state['life_cycle_state'] +
                       '. URL: ' + run['run_page_url'], err=True)
            time.sleep(poll_interval_seconds)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we're ok with this polling interval to start with, or if we need to add more complex backoff/jitter logic upfront. To start with, I'd prefer to keep this simple and not implement backoff logic but just hardcode a constant polling interval that the JAWS stability reviewer (@shivamdixit) is comfortable with, even if it's >5 seconds (I think e.g. up to 10s would be fine for detecting that run submitted in order to integration test a notebook succeeded/failed)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine. The same is used elsewhere (e.g. dbx and your GH action).

Beware that there are uses for this beyond integration tests.

Comment thread
jerrylian-db marked this conversation as resolved.
Outdated


def _runs_to_table(runs_json):
Expand Down
51 changes: 51 additions & 0 deletions tests/runs/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,25 @@

SUBMIT_RETURN = {'run_id': 5}
SUBMIT_JSON = '{"name": "test_run"}'
# Canned runs/get response: run terminated with a successful result.
RUNS_GET_RETURN_SUCCESS = {
    "state": {
        "life_cycle_state": "TERMINATED",
        "result_state": "SUCCESS",
    },
}
# Canned runs/get response: run reached a terminal state with a failure result
# and a human-readable state message.
RUNS_GET_RETURN_FAILURE = {
    "state": {
        "life_cycle_state": "INTERNAL_ERROR",
        "result_state": "FAILED",
        "state_message": "OH NO!",
    },
}
# Canned runs/get response: run still in progress; includes the run page URL
# that the CLI echoes while polling.
RUNS_GET_RETURN_RUNNING = {
    "state": {
        "life_cycle_state": "RUNNING",
    },
    "run_page_url": "https://www.google.com"
}


@pytest.fixture()
Expand All @@ -55,6 +74,38 @@ def test_submit_cli_json(runs_api_mock):
SUBMIT_JSON)
assert echo_mock.call_args[0][0] == pretty_format(SUBMIT_RETURN)

@provide_conf
def test_submit_wait_success(runs_api_mock):
    """--wait exits 0 after a single poll when the run already succeeded."""
    runs_api_mock.submit_run.return_value = SUBMIT_RETURN
    runs_api_mock.get_run.return_value = RUNS_GET_RETURN_SUCCESS
    invoke_result = CliRunner().invoke(cli.submit_cli, ['--json', SUBMIT_JSON, '--wait'])
    runs_api_mock.get_run.assert_called_once()
    assert invoke_result.exit_code == 0

@provide_conf
def test_submit_wait_failure(runs_api_mock):
    """--wait exits 1 and reports state + message when the run failed."""
    runs_api_mock.submit_run.return_value = SUBMIT_RETURN
    runs_api_mock.get_run.return_value = RUNS_GET_RETURN_FAILURE
    with mock.patch('click.echo') as echo_mock:
        invoke_result = CliRunner().invoke(
            cli.submit_cli, ['--json', SUBMIT_JSON, '--wait'])
        last_message = echo_mock.call_args[0][0]
        assert 'job failed with state FAILED and state message OH NO!' in last_message
        assert invoke_result.exit_code == 1

@provide_conf
def test_submit_wait_eventually_succeeds(runs_api_mock):
    """--wait polls (with a sleep) past a RUNNING state, then exits 0 on success."""
    runs_api_mock.submit_run.return_value = SUBMIT_RETURN
    # First poll sees the run still in progress; second poll sees success.
    runs_api_mock.get_run.side_effect = [
        RUNS_GET_RETURN_RUNNING,
        RUNS_GET_RETURN_SUCCESS,
    ]
    with mock.patch('databricks_cli.runs.cli.click.echo') as echo_mock, \
            mock.patch('time.sleep') as sleep_mock:
        invoke_result = CliRunner().invoke(
            cli.submit_cli, ['--json', SUBMIT_JSON, '--wait'])
        expected = ('Job still running with lifecycle state RUNNING. '
                    'URL: https://www.google.com')
        assert echo_mock.call_args[0][0] == expected
        sleep_mock.assert_called_once()
        assert invoke_result.exit_code == 0


RUN_PAGE_URL = 'https://databricks.com/#job/1/run/1'
LIST_RETURN = {
Expand Down