Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,9 @@ def rank_error(msg):
state_update_callback(response.currentState)
_LOGGER.info('Job %s is in state %s', job_id, response.currentState)
last_job_state = response.currentState
if str(response.currentState) != 'JOB_STATE_RUNNING':
if str(response.currentState) not in ('JOB_STATE_RUNNING',
'JOB_STATE_PAUSED',
'JOB_STATE_PAUSING'):
# Stop checking for new messages on timeout, explanatory
# message received, success, or a terminal job state caused
# by the user that therefore doesn't require explanation.
Expand Down Expand Up @@ -751,6 +753,8 @@ def api_jobstate_to_pipeline_state(api_jobstate):
values_enum.JOB_STATE_CANCELLING: PipelineState.CANCELLING,
values_enum.JOB_STATE_RESOURCE_CLEANING_UP: PipelineState.
RESOURCE_CLEANING_UP,
values_enum.JOB_STATE_PAUSING: PipelineState.PAUSING,
values_enum.JOB_STATE_PAUSED: PipelineState.PAUSED,
})

return (
Expand Down
42 changes: 42 additions & 0 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,22 @@ def get_job_side_effect(*args, **kwargs):
result = duration_timedout_result.wait_until_finish(5000)
self.assertEqual(result, PipelineState.RUNNING)

with mock.patch('time.time', mock.MagicMock(side_effect=[1, 9, 9, 20, 20])):
duration_timedout_runner = MockDataflowRunner(
[values_enum.JOB_STATE_PAUSING])
duration_timedout_result = DataflowPipelineResult(
duration_timedout_runner.job, duration_timedout_runner, options)
result = duration_timedout_result.wait_until_finish(5000)
self.assertEqual(result, PipelineState.PAUSING)

with mock.patch('time.time', mock.MagicMock(side_effect=[1, 9, 9, 20, 20])):
duration_timedout_runner = MockDataflowRunner(
[values_enum.JOB_STATE_PAUSED])
duration_timedout_result = DataflowPipelineResult(
duration_timedout_runner.job, duration_timedout_runner, options)
result = duration_timedout_result.wait_until_finish(5000)
self.assertEqual(result, PipelineState.PAUSED)

with mock.patch('time.time', mock.MagicMock(side_effect=[1, 1, 2, 2, 3])):
with self.assertRaisesRegex(DataflowRuntimeException,
'Dataflow pipeline failed. State: CANCELLED'):
Expand Down Expand Up @@ -239,6 +255,32 @@ def __init__(self, state, cancel_result):
terminal_runner.job, terminal_runner, options)
terminal_result.cancel()

def test_api_jobstate_to_pipeline_state(self):
  """Check the API job state -> PipelineState conversion for every state.

  Each Dataflow API job state listed below is passed through
  DataflowPipelineResult.api_jobstate_to_pipeline_state and the result is
  compared with the PipelineState it is expected to produce.
  """
  api_states = dataflow_api.Job.CurrentStateValueValuesEnum
  # Keyed by API job state; the value is the PipelineState expected back.
  expected_by_api_state = {
      api_states.JOB_STATE_UNKNOWN: PipelineState.UNKNOWN,
      api_states.JOB_STATE_STOPPED: PipelineState.STOPPED,
      api_states.JOB_STATE_RUNNING: PipelineState.RUNNING,
      api_states.JOB_STATE_DONE: PipelineState.DONE,
      api_states.JOB_STATE_FAILED: PipelineState.FAILED,
      api_states.JOB_STATE_CANCELLED: PipelineState.CANCELLED,
      api_states.JOB_STATE_UPDATED: PipelineState.UPDATED,
      api_states.JOB_STATE_DRAINING: PipelineState.DRAINING,
      api_states.JOB_STATE_DRAINED: PipelineState.DRAINED,
      api_states.JOB_STATE_PENDING: PipelineState.PENDING,
      api_states.JOB_STATE_CANCELLING: PipelineState.CANCELLING,
      api_states.JOB_STATE_RESOURCE_CLEANING_UP:
          PipelineState.RESOURCE_CLEANING_UP,
      api_states.JOB_STATE_PAUSING: PipelineState.PAUSING,
      api_states.JOB_STATE_PAUSED: PipelineState.PAUSED,
  }

  convert = DataflowPipelineResult.api_jobstate_to_pipeline_state
  for api_state, expected_state in expected_by_api_state.items():
    self.assertEqual(convert(api_state), expected_state)

def test_create_runner(self):
self.assertTrue(isinstance(create_runner('DataflowRunner'), DataflowRunner))
self.assertTrue(
Expand Down
Loading
Loading