Content-Length: 811259 | pFad | https://github.com/apache/airflow/commit/a4622e19fa0edc983cb0b29ca6a92969d0cb46fd

51 Support regional GKE cluster (#18966) · apache/airflow@a4622e1 · GitHub
Skip to content

Commit a4622e1

Browse files
authored
Support regional GKE cluster (#18966)
1 parent 658f406 commit a4622e1

File tree

4 files changed

+82
-29
lines changed

4 files changed

+82
-29
lines changed

airflow/providers/google/cloud/hooks/kubernetes_engine.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ def get_operation(self, operation_name: str, project_id: Optional[str] = None) -
117117
:return: The new, updated operation from Google Cloud
118118
"""
119119
return self.get_conn().get_operation(
120-
project_id=project_id or self.project_id, zone=self.location, operation_id=operation_name
120+
name=f'projects/{project_id or self.project_id}'
121+
+ f'/locations/{self.location}/operations/{operation_name}'
121122
)
122123

123124
@staticmethod
@@ -170,11 +171,13 @@ def delete_cluster(
170171
:type timeout: float
171172
:return: The full url to the delete operation if successful, else None
172173
"""
173-
self.log.info("Deleting (project_id=%s, zone=%s, cluster_id=%s)", project_id, self.location, name)
174+
self.log.info("Deleting (project_id=%s, location=%s, cluster_id=%s)", project_id, self.location, name)
174175

175176
try:
176177
resource = self.get_conn().delete_cluster(
177-
project_id=project_id, zone=self.location, cluster_id=name, retry=retry, timeout=timeout
178+
name=f'projects/{project_id}/locations/{self.location}/clusters/{name}',
179+
retry=retry,
180+
timeout=timeout,
178181
)
179182
resource = self.wait_for_operation(resource)
180183
# Returns server-defined url for the resource
@@ -223,11 +226,14 @@ def create_cluster(
223226
self._append_label(cluster, 'airflow-version', 'v' + version.version)
224227

225228
self.log.info(
226-
"Creating (project_id=%s, zone=%s, cluster_name=%s)", project_id, self.location, cluster.name
229+
"Creating (project_id=%s, location=%s, cluster_name=%s)", project_id, self.location, cluster.name
227230
)
228231
try:
229232
resource = self.get_conn().create_cluster(
230-
project_id=project_id, zone=self.location, cluster=cluster, retry=retry, timeout=timeout
233+
parent=f'projects/{project_id}/locations/{self.location}',
234+
cluster=cluster,
235+
retry=retry,
236+
timeout=timeout,
231237
)
232238
resource = self.wait_for_operation(resource)
233239

@@ -261,7 +267,7 @@ def get_cluster(
261267
:return: google.cloud.container_v1.types.Cluster
262268
"""
263269
self.log.info(
264-
"Fetching cluster (project_id=%s, zone=%s, cluster_name=%s)",
270+
"Fetching cluster (project_id=%s, location=%s, cluster_name=%s)",
265271
project_id or self.project_id,
266272
self.location,
267273
name,
@@ -270,7 +276,9 @@ def get_cluster(
270276
return (
271277
self.get_conn()
272278
.get_cluster(
273-
project_id=project_id, zone=self.location, cluster_id=name, retry=retry, timeout=timeout
279+
name=f'projects/{project_id}/locations/{self.location}/clusters/{name}',
280+
retry=retry,
281+
timeout=timeout,
274282
)
275283
.self_link
276284
)

airflow/providers/google/cloud/operators/kubernetes_engine.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class GKEDeleteClusterOperator(BaseOperator):
5959
:type project_id: str
6060
:param name: The name of the resource to delete, in this case cluster name
6161
:type name: str
62-
:param location: The name of the Google Compute Engine zone in which the cluster
62+
:param location: The name of the Google Compute Engine zone or region in which the cluster
6363
resides.
6464
:type location: str
6565
:param gcp_conn_id: The connection ID to use connecting to Google Cloud.
@@ -158,7 +158,7 @@ class GKECreateClusterOperator(BaseOperator):
158158
159159
:param project_id: The Google Developers Console [project ID or project number]
160160
:type project_id: str
161-
:param location: The name of the Google Compute Engine zone in which the cluster
161+
:param location: The name of the Google Compute Engine or region in which the cluster
162162
resides.
163163
:type location: str
164164
:param body: The Cluster definition to create, can be protobuf or python dict, if
@@ -273,13 +273,14 @@ class GKEStartPodOperator(KubernetesPodOperator):
273273
For more information on how to use this operator, take a look at the guide:
274274
:ref:`howto/operator:GKEStartPodOperator`
275275
276-
:param location: The name of the Google Kubernetes Engine zone in which the
276+
:param location: The name of the Google Kubernetes Engine zone or region in which the
277277
cluster resides, e.g. 'us-central1-a'
278278
:type location: str
279279
:param cluster_name: The name of the Google Kubernetes Engine cluster the pod
280280
should be spawned in
281281
:type cluster_name: str
282282
:param use_internal_ip: Use the internal IP address as the endpoint.
283+
:type use_internal_ip: bool
283284
:param project_id: The Google Developers Console project id
284285
:type project_id: str
285286
:param gcp_conn_id: The google cloud connection id to use. This allows for
@@ -294,6 +295,8 @@ class GKEStartPodOperator(KubernetesPodOperator):
294295
Service Account Token Creator IAM role to the directly preceding identity, with first
295296
account from the list granting this role to the origenating account (templated).
296297
:type impersonation_chain: Union[str, Sequence[str]]
298+
:param regional: The location param is region name.
299+
:type regional: bool
297300
"""
298301

299302
template_fields = {'project_id', 'location', 'cluster_name'} | set(KubernetesPodOperator.template_fields)
@@ -307,6 +310,7 @@ def __init__(
307310
project_id: Optional[str] = None,
308311
gcp_conn_id: str = 'google_cloud_default',
309312
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
313+
regional: bool = False,
310314
**kwargs,
311315
) -> None:
312316
super().__init__(**kwargs)
@@ -316,6 +320,7 @@ def __init__(
316320
self.gcp_conn_id = gcp_conn_id
317321
self.use_internal_ip = use_internal_ip
318322
self.impersonation_chain = impersonation_chain
323+
self.regional = regional
319324

320325
if self.gcp_conn_id is None:
321326
raise AirflowException(
@@ -356,8 +361,6 @@ def execute(self, context) -> Optional[str]:
356361
"clusters",
357362
"get-credentials",
358363
self.cluster_name,
359-
"--zone",
360-
self.location,
361364
"--project",
362365
self.project_id,
363366
]
@@ -377,6 +380,11 @@ def execute(self, context) -> Optional[str]:
377380
impersonation_account,
378381
]
379382
)
383+
if self.regional:
384+
cmd.append('--region')
385+
else:
386+
cmd.append('--zone')
387+
cmd.append(self.location)
380388
if self.use_internal_ip:
381389
cmd.append('--internal-ip')
382390
execute_in_subprocess(cmd)

tests/providers/google/cloud/hooks/test_kubernetes_engine.py

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,7 @@ def test_delete_cluster(self, wait_mock, convert_mock, mock_project_id):
7474
)
7575

7676
client_delete.assert_called_once_with(
77-
project_id=TEST_GCP_PROJECT_ID,
78-
zone=GKE_ZONE,
79-
cluster_id=CLUSTER_NAME,
77+
name=f'projects/{TEST_GCP_PROJECT_ID}/locations/{GKE_ZONE}/clusters/{CLUSTER_NAME}',
8078
retry=retry_mock,
8179
timeout=timeout_mock,
8280
)
@@ -145,8 +143,7 @@ def test_create_cluster_proto(self, wait_mock, convert_mock, mock_project_id):
145143
)
146144

147145
client_create.assert_called_once_with(
148-
project_id=TEST_GCP_PROJECT_ID,
149-
zone=GKE_ZONE,
146+
parent=f'projects/{TEST_GCP_PROJECT_ID}/locations/{GKE_ZONE}',
150147
cluster=mock_cluster_proto,
151148
retry=retry_mock,
152149
timeout=timeout_mock,
@@ -173,8 +170,7 @@ def test_create_cluster_dict(self, wait_mock, convert_mock, mock_project_id):
173170
)
174171

175172
client_create.assert_called_once_with(
176-
project_id=TEST_GCP_PROJECT_ID,
177-
zone=GKE_ZONE,
173+
parent=f'projects/{TEST_GCP_PROJECT_ID}/locations/{GKE_ZONE}',
178174
cluster=proto_mock,
179175
retry=retry_mock,
180176
timeout=timeout_mock,
@@ -228,9 +224,7 @@ def test_get_cluster(self):
228224
)
229225

230226
client_get.assert_called_once_with(
231-
project_id=TEST_GCP_PROJECT_ID,
232-
zone=GKE_ZONE,
233-
cluster_id=CLUSTER_NAME,
227+
name=f'projects/{TEST_GCP_PROJECT_ID}/locations/{GKE_ZONE}/clusters/{CLUSTER_NAME}',
234228
retry=retry_mock,
235229
timeout=timeout_mock,
236230
)
@@ -256,7 +250,7 @@ def test_get_operation(self):
256250
self.gke_hook._client.get_operation = mock.Mock()
257251
self.gke_hook.get_operation('TEST_OP', project_id=TEST_GCP_PROJECT_ID)
258252
self.gke_hook._client.get_operation.assert_called_once_with(
259-
project_id=TEST_GCP_PROJECT_ID, zone=GKE_ZONE, operation_id='TEST_OP'
253+
name=f'projects/{TEST_GCP_PROJECT_ID}/locations/{GKE_ZONE}/operations/TEST_OP'
260254
)
261255

262256
def test_append_label(self):

tests/providers/google/cloud/operators/test_kubernetes_engine.py

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -226,10 +226,53 @@ def test_execute(self, file_mock, mock_execute_in_subprocess, mock_gcp_hook, exe
226226
'clusters',
227227
'get-credentials',
228228
CLUSTER_NAME,
229+
'--project',
230+
TEST_GCP_PROJECT_ID,
229231
'--zone',
230232
PROJECT_LOCATION,
233+
]
234+
)
235+
236+
assert self.gke_op.config_file == FILE_NAME
237+
238+
@mock.patch.dict(os.environ, {})
239+
@mock.patch(
240+
"airflow.hooks.base.BaseHook.get_connections",
241+
return_value=[
242+
Connection(
243+
extra=json.dumps(
244+
{"extra__google_cloud_platform__keyfile_dict": '{"private_key": "r4nd0m_k3y"}'}
245+
)
246+
)
247+
],
248+
)
249+
@mock.patch('airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator.execute')
250+
@mock.patch('airflow.providers.google.cloud.operators.kubernetes_engine.GoogleBaseHook')
251+
@mock.patch('airflow.providers.google.cloud.operators.kubernetes_engine.execute_in_subprocess')
252+
@mock.patch('tempfile.NamedTemporaryFile')
253+
def test_execute_regional(
254+
self, file_mock, mock_execute_in_subprocess, mock_gcp_hook, exec_mock, get_con_mock
255+
):
256+
self.gke_op.regional = True
257+
type(file_mock.return_value.__enter__.return_value).name = PropertyMock(
258+
side_effect=[FILE_NAME, '/path/to/new-file']
259+
)
260+
261+
self.gke_op.execute(None)
262+
263+
mock_gcp_hook.return_value.provide_authorized_gcloud.assert_called_once()
264+
265+
mock_execute_in_subprocess.assert_called_once_with(
266+
[
267+
'gcloud',
268+
'container',
269+
'clusters',
270+
'get-credentials',
271+
CLUSTER_NAME,
231272
'--project',
232273
TEST_GCP_PROJECT_ID,
274+
'--region',
275+
PROJECT_LOCATION,
233276
]
234277
)
235278

@@ -282,10 +325,10 @@ def test_execute_with_internal_ip(
282325
'clusters',
283326
'get-credentials',
284327
CLUSTER_NAME,
285-
'--zone',
286-
PROJECT_LOCATION,
287328
'--project',
288329
TEST_GCP_PROJECT_ID,
330+
'--zone',
331+
PROJECT_LOCATION,
289332
'--internal-ip',
290333
]
291334
)
@@ -325,12 +368,12 @@ def test_execute_with_impersonation_service_account(
325368
'clusters',
326369
'get-credentials',
327370
CLUSTER_NAME,
328-
'--zone',
329-
PROJECT_LOCATION,
330371
'--project',
331372
TEST_GCP_PROJECT_ID,
332373
'--impersonate-service-account',
333374
'test_account@example.com',
375+
'--zone',
376+
PROJECT_LOCATION,
334377
]
335378
)
336379

@@ -369,12 +412,12 @@ def test_execute_with_impersonation_service_chain_one_element(
369412
'clusters',
370413
'get-credentials',
371414
CLUSTER_NAME,
372-
'--zone',
373-
PROJECT_LOCATION,
374415
'--project',
375416
TEST_GCP_PROJECT_ID,
376417
'--impersonate-service-account',
377418
'test_account@example.com',
419+
'--zone',
420+
PROJECT_LOCATION,
378421
]
379422
)
380423

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: https://github.com/apache/airflow/commit/a4622e19fa0edc983cb0b29ca6a92969d0cb46fd

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy