15
15
# specific language governing permissions and limitations
16
16
# under the License.
17
17
18
- from io import BytesIO
18
+ import warnings
19
19
from typing import Optional , Sequence , Union
20
20
21
21
from airflow .models import BaseOperator
@@ -32,11 +32,15 @@ class GoogleDriveToGCSOperator(BaseOperator):
32
32
For more information on how to use this operator, take a look at the guide:
33
33
:ref:`howto/operator:GoogleDriveToGCSOperator`
34
34
35
- :param destination_bucket : The destination Google cloud storage bucket where the
35
+ :param bucket_name : The destination Google cloud storage bucket where the
36
36
file should be written to
37
- :type destination_bucket : str
38
- :param destination_object : The Google Cloud Storage object name for the object created by the operator.
37
+ :type bucket_name : str
38
+ :param object_name : The Google Cloud Storage object name for the object created by the operator.
39
39
For example: ``path/to/my/file/file.txt``.
40
+ :type object_name: str
41
+ :param destination_bucket: Same as bucket_name, kept for backward compatibility
42
+ :type destination_bucket: str
43
+ :param destination_object: Same as object_name, kept for backward compatibility
40
44
:type destination_object: str
41
45
:param folder_id: The folder id of the folder in which the Google Drive file resides
42
46
:type folder_id: str
@@ -62,6 +66,8 @@ class GoogleDriveToGCSOperator(BaseOperator):
62
66
"""
63
67
64
68
template_fields = [
69
+ "bucket_name" ,
70
+ "object_name" ,
65
71
"destination_bucket" ,
66
72
"destination_object" ,
67
73
"folder_id" ,
@@ -74,8 +80,10 @@ class GoogleDriveToGCSOperator(BaseOperator):
74
80
def __init__ (
75
81
self ,
76
82
* ,
77
- destination_bucket : str ,
78
- destination_object : str ,
83
+ bucket_name : Optional [str ] = None ,
84
+ object_name : Optional [str ] = None ,
85
+ destination_bucket : Optional [str ] = None , # deprecated
86
+ destination_object : Optional [str ] = None , # deprecated
79
87
file_name : str ,
80
88
folder_id : str ,
81
89
drive_id : Optional [str ] = None ,
@@ -85,38 +93,18 @@ def __init__(
85
93
** kwargs ,
86
94
) -> None :
87
95
super ().__init__ (** kwargs )
88
- self .destination_bucket = destination_bucket
89
- self .destination_object = destination_object
96
+ self .bucket_name = destination_bucket or bucket_name
97
+ if destination_bucket :
98
+ warnings .warn ("`destination_bucket` is deprecated please use `bucket_name`" , DeprecationWarning )
99
+ self .object_name = destination_object or object_name
100
+ if destination_object :
101
+ warnings .warn ("`destination_object` is deprecated please use `object_name`" , DeprecationWarning )
90
102
self .folder_id = folder_id
91
103
self .drive_id = drive_id
92
104
self .file_name = file_name
93
105
self .gcp_conn_id = gcp_conn_id
94
106
self .delegate_to = delegate_to
95
107
self .impersonation_chain = impersonation_chain
96
- self .file_metadata = None
97
-
98
- def _set_file_metadata (self , gdrive_hook ):
99
- if not self .file_metadata :
100
- self .file_metadata = gdrive_hook .get_file_id (
101
- folder_id = self .folder_id , file_name = self .file_name , drive_id = self .drive_id
102
- )
103
- return self .file_metadata
104
-
105
- def _upload_data (self , gcs_hook : GCSHook , gdrive_hook : GoogleDriveHook ) -> str :
106
- file_handle = BytesIO ()
107
- self ._set_file_metadata (gdrive_hook = gdrive_hook )
108
- file_id = self .file_metadata ["id" ]
109
- mime_type = self .file_metadata ["mime_type" ]
110
- request = gdrive_hook .get_media_request (file_id = file_id )
111
- gdrive_hook .download_content_from_request (
112
- file_handle = file_handle , request = request , chunk_size = 104857600
113
- )
114
- gcs_hook .upload (
115
- bucket_name = self .destination_bucket ,
116
- object_name = self .destination_object ,
117
- data = file_handle .getvalue (),
118
- mime_type = mime_type ,
119
- )
120
108
121
109
def execute (self , context ):
122
110
gdrive_hook = GoogleDriveHook (
@@ -129,4 +117,10 @@ def execute(self, context):
129
117
delegate_to = self .delegate_to ,
130
118
impersonation_chain = self .impersonation_chain ,
131
119
)
132
- self ._upload_data (gdrive_hook = gdrive_hook , gcs_hook = gcs_hook )
120
+ file_metadata = gdrive_hook .get_file_id (
121
+ folder_id = self .folder_id , file_name = self .file_name , drive_id = self .drive_id
122
+ )
123
+ with gcs_hook .provide_file_and_upload (
124
+ bucket_name = self .bucket_name , object_name = self .object_name
125
+ ) as file :
126
+ gdrive_hook .download_file (file_id = file_metadata ["id" ], file_handle = file )
0 commit comments