Skip to content

Commit d80fef4

Browse files
jroachgolf84mangal-vairalkar
authored andcommitted
Add create_intermediate_dirs option to GCSToSFTPOperator (apache#54916)
* Adding create_intermediate_dirs to determine if intemediate directories should be created * Isolating tests for create_intermediate_dirs
1 parent 3e47eae commit d80fef4

2 files changed

Lines changed: 54 additions & 2 deletions

File tree

providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_sftp.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,9 @@ class GCSToSFTPOperator(BaseOperator):
8080
:param destination_path: The sftp remote path. This is the specified directory path for
8181
uploading to the SFTP server.
8282
:param keep_directory_structure: (Optional) When set to False the path of the file
83-
on the bucket is recreated within path passed in destination_path.
83+
on the bucket is recreated within path passed in destination_path.
84+
:param create_intermediate_dirs: (Optional) When set to True the intermediate directories
85+
in the specified file path will be created.
8486
:param move_object: When move object is True, the object is moved instead
8587
of copied to the new location. This is the equivalent of a mv command
8688
as opposed to a cp command.
@@ -112,6 +114,7 @@ def __init__(
112114
source_object: str,
113115
destination_path: str,
114116
keep_directory_structure: bool = True,
117+
create_intermediate_dirs: bool = True,
115118
move_object: bool = False,
116119
gcp_conn_id: str = "google_cloud_default",
117120
sftp_conn_id: str = "ssh_default",
@@ -124,6 +127,7 @@ def __init__(
124127
self.source_object = source_object
125128
self.destination_path = destination_path
126129
self.keep_directory_structure = keep_directory_structure
130+
self.create_intermediate_dirs = create_intermediate_dirs
127131
self.move_object = move_object
128132
self.gcp_conn_id = gcp_conn_id
129133
self.sftp_conn_id = sftp_conn_id
@@ -190,7 +194,9 @@ def _copy_single_object(
190194
)
191195

192196
dir_path = os.path.dirname(destination_path)
193-
sftp_hook.create_directory(dir_path)
197+
198+
if self.create_intermediate_dirs:
199+
sftp_hook.create_directory(dir_path)
194200

195201
with NamedTemporaryFile("w") as tmp:
196202
gcs_hook.download(

providers/google/tests/unit/google/cloud/transfers/test_gcs_to_sftp.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,3 +400,49 @@ def test_get_openlineage_facets(
400400
assert result.inputs[0].name == expected_source
401401
assert result.outputs[0].namespace == "file://11.222.33.44:22"
402402
assert result.outputs[0].name == expected_destination
403+
404+
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_sftp.GCSHook")
405+
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_sftp.SFTPHook")
406+
def test_create_intermediate_dirs_true(self, sftp_hook_mock, gcp_hook_mock):
407+
task = GCSToSFTPOperator(
408+
task_id=TASK_ID,
409+
source_bucket=TEST_BUCKET,
410+
source_object="folder/test_object.txt", # Hard-coding
411+
destination_path=DESTINATION_SFTP,
412+
keep_directory_structure=True, # Hard-coding
413+
create_intermediate_dirs=True,
414+
move_object=False,
415+
gcp_conn_id=GCP_CONN_ID,
416+
sftp_conn_id=SFTP_CONN_ID,
417+
impersonation_chain=IMPERSONATION_CHAIN,
418+
)
419+
420+
assert task.create_intermediate_dirs
421+
422+
task.execute(None)
423+
424+
sftp_hook_mock.return_value.create_directory.assert_called_once_with(
425+
os.path.join(DESTINATION_SFTP, "folder")
426+
)
427+
428+
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_sftp.GCSHook")
429+
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_sftp.SFTPHook")
430+
def test_create_intermediate_dirs_false(self, sftp_hook_mock, gcp_hook_mock):
431+
task = GCSToSFTPOperator(
432+
task_id=TASK_ID,
433+
source_bucket=TEST_BUCKET,
434+
source_object="folder/test_object.txt", # Hard-coding
435+
destination_path=DESTINATION_SFTP,
436+
keep_directory_structure=True, # Hard-coding
437+
create_intermediate_dirs=False,
438+
move_object=False,
439+
gcp_conn_id=GCP_CONN_ID,
440+
sftp_conn_id=SFTP_CONN_ID,
441+
impersonation_chain=IMPERSONATION_CHAIN,
442+
)
443+
444+
assert not task.create_intermediate_dirs
445+
446+
task.execute(None)
447+
448+
sftp_hook_mock.return_value.create_directory.assert_not_called()

0 commit comments

Comments
 (0)