Content-Length: 633448 | pFad | https://github.com/apache/airflow/commit/a7e4266d675d5283cdd34c6451c8ef0f2858a501

Refactor GoogleDriveToGCSOperator to use common methods (#14276) · apache/airflow@a7e4266 · GitHub
Skip to content

Commit a7e4266

Browse files
authored
Refactor GoogleDriveToGCSOperator to use common methods (#14276)
Refactor GoogleDriveToGCSOperator to use common methods implemented in hooks used by this operator.
1 parent 82cb041 commit a7e4266

File tree

3 files changed

+43
-86
lines changed

3 files changed

+43
-86
lines changed

airflow/providers/google/cloud/transfers/gdrive_to_gcs.py

Lines changed: 27 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18-
from io import BytesIO
18+
import warnings
1919
from typing import Optional, Sequence, Union
2020

2121
from airflow.models import BaseOperator
@@ -32,11 +32,15 @@ class GoogleDriveToGCSOperator(BaseOperator):
3232
For more information on how to use this operator, take a look at the guide:
3333
:ref:`howto/operator:GoogleDriveToGCSOperator`
3434
35-
:param destination_bucket: The destination Google cloud storage bucket where the
35+
:param bucket_name: The destination Google cloud storage bucket where the
3636
file should be written to
37-
:type destination_bucket: str
38-
:param destination_object: The Google Cloud Storage object name for the object created by the operator.
37+
:type bucket_name: str
38+
:param object_name: The Google Cloud Storage object name for the object created by the operator.
3939
For example: ``path/to/my/file/file.txt``.
40+
:type object_name: str
41+
:param destination_bucket: Same as bucket_name, but for backward compatibility
42+
:type destination_bucket: str
43+
:param destination_object: Same as object_name, but for backward compatibility
4044
:type destination_object: str
4145
:param folder_id: The folder id of the folder in which the Google Drive file resides
4246
:type folder_id: str
@@ -62,6 +66,8 @@ class GoogleDriveToGCSOperator(BaseOperator):
6266
"""
6367

6468
template_fields = [
69+
"bucket_name",
70+
"object_name",
6571
"destination_bucket",
6672
"destination_object",
6773
"folder_id",
@@ -74,8 +80,10 @@ class GoogleDriveToGCSOperator(BaseOperator):
7480
def __init__(
7581
self,
7682
*,
77-
destination_bucket: str,
78-
destination_object: str,
83+
bucket_name: Optional[str] = None,
84+
object_name: Optional[str] = None,
85+
destination_bucket: Optional[str] = None, # deprecated
86+
destination_object: Optional[str] = None, # deprecated
7987
file_name: str,
8088
folder_id: str,
8189
drive_id: Optional[str] = None,
@@ -85,38 +93,18 @@ def __init__(
8593
**kwargs,
8694
) -> None:
8795
super().__init__(**kwargs)
88-
self.destination_bucket = destination_bucket
89-
self.destination_object = destination_object
96+
self.bucket_name = destination_bucket or bucket_name
97+
if destination_bucket:
98+
warnings.warn("`destination_bucket` is deprecated please use `bucket_name`", DeprecationWarning)
99+
self.object_name = destination_object or object_name
100+
if destination_object:
101+
warnings.warn("`destination_object` is deprecated please use `object_name`", DeprecationWarning)
90102
self.folder_id = folder_id
91103
self.drive_id = drive_id
92104
self.file_name = file_name
93105
self.gcp_conn_id = gcp_conn_id
94106
self.delegate_to = delegate_to
95107
self.impersonation_chain = impersonation_chain
96-
self.file_metadata = None
97-
98-
def _set_file_metadata(self, gdrive_hook):
99-
if not self.file_metadata:
100-
self.file_metadata = gdrive_hook.get_file_id(
101-
folder_id=self.folder_id, file_name=self.file_name, drive_id=self.drive_id
102-
)
103-
return self.file_metadata
104-
105-
def _upload_data(self, gcs_hook: GCSHook, gdrive_hook: GoogleDriveHook) -> str:
106-
file_handle = BytesIO()
107-
self._set_file_metadata(gdrive_hook=gdrive_hook)
108-
file_id = self.file_metadata["id"]
109-
mime_type = self.file_metadata["mime_type"]
110-
request = gdrive_hook.get_media_request(file_id=file_id)
111-
gdrive_hook.download_content_from_request(
112-
file_handle=file_handle, request=request, chunk_size=104857600
113-
)
114-
gcs_hook.upload(
115-
bucket_name=self.destination_bucket,
116-
object_name=self.destination_object,
117-
data=file_handle.getvalue(),
118-
mime_type=mime_type,
119-
)
120108

121109
def execute(self, context):
122110
gdrive_hook = GoogleDriveHook(
@@ -129,4 +117,10 @@ def execute(self, context):
129117
delegate_to=self.delegate_to,
130118
impersonation_chain=self.impersonation_chain,
131119
)
132-
self._upload_data(gdrive_hook=gdrive_hook, gcs_hook=gcs_hook)
120+
file_metadata = gdrive_hook.get_file_id(
121+
folder_id=self.folder_id, file_name=self.file_name, drive_id=self.drive_id
122+
)
123+
with gcs_hook.provide_file_and_upload(
124+
bucket_name=self.bucket_name, object_name=self.object_name
125+
) as file:
126+
gdrive_hook.download_file(file_id=file_metadata["id"], file_handle=file)

tests/providers/google/cloud/transfers/test_gdrive_to_gcs.py

Lines changed: 12 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -30,48 +30,9 @@
3030

3131

3232
class TestGoogleDriveToGCSOperator:
33-
@mock.patch("airflow.providers.google.cloud.transfers.gdrive_to_gcs.BytesIO")
34-
@mock.patch("airflow.providers.google.cloud.transfers.gdrive_to_gcs.GoogleDriveHook")
35-
def test_upload_data(self, mock_gdrive_hook, mock_file_handle):
36-
mock_gdrive_hook.return_value.get_media_request.return_value = mock.MagicMock()
37-
38-
file_id = mock_gdrive_hook.get_file_id.return_value["id"]
39-
mime_type = mock_gdrive_hook.get_file_id.return_value["mime_type"]
40-
41-
mock_gcs_hook = mock.Mock()
42-
43-
op = GoogleDriveToGCSOperator(
44-
task_id="test_task",
45-
folder_id=FOLDER_ID,
46-
file_name=FILE_NAME,
47-
drive_id=DRIVE_ID,
48-
destination_bucket=BUCKET,
49-
destination_object=OBJECT,
50-
)
51-
52-
op._upload_data(
53-
gcs_hook=mock_gcs_hook,
54-
gdrive_hook=mock_gdrive_hook,
55-
)
56-
# Test writing to file
57-
mock_gdrive_hook.get_media_request.assert_called_once_with(file_id=file_id)
58-
mock_gdrive_hook.download_content_from_request.assert_called_once_with(
59-
file_handle=mock_file_handle(),
60-
request=mock_gdrive_hook.get_media_request.return_value,
61-
chunk_size=104857600,
62-
)
63-
64-
# Test upload
65-
mock_gcs_hook.upload.assert_called_once_with(
66-
bucket_name=BUCKET, object_name=OBJECT, data=mock_file_handle().getvalue(), mime_type=mime_type
67-
)
68-
6933
@mock.patch("airflow.providers.google.cloud.transfers.gdrive_to_gcs.GCSHook")
7034
@mock.patch("airflow.providers.google.cloud.transfers.gdrive_to_gcs.GoogleDriveHook")
71-
@mock.patch(
72-
"airflow.providers.google.cloud.transfers.gdrive_to_gcs.GoogleDriveToGCSOperator._upload_data"
73-
)
74-
def test_execute(self, mock_upload_data, mock_gdrive_hook, mock_gcs_hook):
35+
def test_execute(self, mock_gdrive_hook, mock_gcs_hook):
7536
context = {}
7637
op = GoogleDriveToGCSOperator(
7738
task_id="test_task",
@@ -83,15 +44,18 @@ def test_execute(self, mock_upload_data, mock_gdrive_hook, mock_gcs_hook):
8344
gcp_conn_id=GCP_CONN_ID,
8445
impersonation_chain=IMPERSONATION_CHAIN,
8546
)
47+
meta = {"id": "123xyz"}
48+
mock_gdrive_hook.return_value.get_file_id.return_value = meta
49+
8650
op.execute(context)
51+
mock_gdrive_hook.return_value.get_file_id.assert_called_once_with(
52+
folder_id=FOLDER_ID, file_name=FILE_NAME, drive_id=DRIVE_ID
53+
)
8754

88-
mock_gdrive_hook.assert_called_once_with(
89-
gcp_conn_id=GCP_CONN_ID,
90-
delegate_to=None,
91-
impersonation_chain=IMPERSONATION_CHAIN,
55+
mock_gdrive_hook.return_value.download_file.assert_called_once_with(
56+
file_id=meta["id"], file_handle=mock.ANY
9257
)
93-
mock_gcs_hook.assert_called_once_with(
94-
gcp_conn_id=GCP_CONN_ID,
95-
delegate_to=None,
96-
impersonation_chain=IMPERSONATION_CHAIN,
58+
59+
mock_gcs_hook.return_value.provide_file_and_upload.assert_called_once_with(
60+
bucket_name=BUCKET, object_name=OBJECT
9761
)

tests/providers/google/cloud/transfers/test_gdrive_to_local.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,12 @@ def test_execute(self, hook_mock):
3535
file_name=FILE_NAME,
3636
output_file=temp_file.name,
3737
)
38+
meta = {"id": "123xyz"}
39+
hook_mock.return_value.get_file_id.return_value = meta
40+
3841
op.execute(context=None)
3942
hook_mock.assert_called_once_with(delegate_to=None, impersonation_chain=None)
4043

41-
hook_mock.return_value.get_file_id.assert_called_once_with(
42-
folder_id=FOLDER_ID, file_name=FILE_NAME, drive_id=None
43-
)
44-
4544
hook_mock.return_value.download_file.assert_called_once_with(
46-
file_id=mock.ANY, file_handle=mock.ANY
45+
file_id=meta["id"], file_handle=mock.ANY
4746
)

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: https://github.com/apache/airflow/commit/a7e4266d675d5283cdd34c6451c8ef0f2858a501

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy