Content-Length: 589064 | pFad | https://github.com/apache/airflow/commit/2372e21d9dd44a9cb1f7cd20bbee7f1c37936faf

39 add service_file support to GKEPodAsyncHook (#37081) · apache/airflow@2372e21 · GitHub
Skip to content

Commit 2372e21

Browse files
authored
add service_file support to GKEPodAsyncHook (#37081)
Currently, GKEPodAsyncHook does not support service_file. Thus, passing credentials through " Keyfile Path " or " Keyfile JSON " will be ignored. This PR intends to fix this issue. As the default value of service_file in Token is None (https://github.com/talkiq/gcloud-aio/blob/8c8e1b39ec2e40b42212c270acb98c039267fbc5/auth/gcloud/aio/auth/token.py#L157) and the return value of service_file_as_context when both key file path and key file json are not provided is None. This change won't affect existing behavior
1 parent 2e95a2a commit 2372e21

File tree

2 files changed

+76
-43
lines changed

2 files changed

+76
-43
lines changed

airflow/providers/google/cloud/hooks/kubernetes_engine.py

Lines changed: 43 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -508,33 +508,37 @@ async def get_pod(self, name: str, namespace: str) -> V1Pod:
508508
:param name: Name of the pod.
509509
:param namespace: Name of the pod's namespace.
510510
"""
511-
async with Token(scopes=self.scopes) as token:
512-
async with self.get_conn(token) as connection:
513-
v1_api = async_client.CoreV1Api(connection)
514-
pod: V1Pod = await v1_api.read_namespaced_pod(
515-
name=name,
516-
namespace=namespace,
517-
)
518-
return pod
511+
async with self.service_file_as_context() as service_file: # type: ignore[attr-defined]
512+
async with Token(scopes=self.scopes, service_file=service_file) as token:
513+
async with self.get_conn(token) as connection:
514+
v1_api = async_client.CoreV1Api(connection)
515+
pod: V1Pod = await v1_api.read_namespaced_pod(
516+
name=name,
517+
namespace=namespace,
518+
)
519+
return pod
519520

520521
async def delete_pod(self, name: str, namespace: str):
521522
"""Delete a pod.
522523
523524
:param name: Name of the pod.
524525
:param namespace: Name of the pod's namespace.
525526
"""
526-
async with Token(scopes=self.scopes) as token, self.get_conn(token) as connection:
527-
try:
528-
v1_api = async_client.CoreV1Api(connection)
529-
await v1_api.delete_namespaced_pod(
530-
name=name,
531-
namespace=namespace,
532-
body=client.V1DeleteOptions(),
533-
)
534-
except async_client.ApiException as e:
535-
# If the pod is already deleted
536-
if e.status != 404:
537-
raise
527+
async with self.service_file_as_context() as service_file: # type: ignore[attr-defined]
528+
async with Token(scopes=self.scopes, service_file=service_file) as token, self.get_conn(
529+
token
530+
) as connection:
531+
try:
532+
v1_api = async_client.CoreV1Api(connection)
533+
await v1_api.delete_namespaced_pod(
534+
name=name,
535+
namespace=namespace,
536+
body=client.V1DeleteOptions(),
537+
)
538+
except async_client.ApiException as e:
539+
# If the pod is already deleted
540+
if e.status != 404:
541+
raise
538542

539543
async def read_logs(self, name: str, namespace: str):
540544
"""Read logs inside the pod while starting containers inside.
@@ -547,19 +551,22 @@ async def read_logs(self, name: str, namespace: str):
547551
:param name: Name of the pod.
548552
:param namespace: Name of the pod's namespace.
549553
"""
550-
async with Token(scopes=self.scopes) as token, self.get_conn(token) as connection:
551-
try:
552-
v1_api = async_client.CoreV1Api(connection)
553-
logs = await v1_api.read_namespaced_pod_log(
554-
name=name,
555-
namespace=namespace,
556-
follow=False,
557-
timestamps=True,
558-
)
559-
logs = logs.splitlines()
560-
for line in logs:
561-
self.log.info("Container logs from %s", line)
562-
return logs
563-
except HTTPError:
564-
self.log.exception("There was an error reading the kubernetes API.")
565-
raise
554+
async with self.service_file_as_context() as service_file: # type: ignore[attr-defined]
555+
async with Token(scopes=self.scopes, service_file=service_file) as token, self.get_conn(
556+
token
557+
) as connection:
558+
try:
559+
v1_api = async_client.CoreV1Api(connection)
560+
logs = await v1_api.read_namespaced_pod_log(
561+
name=name,
562+
namespace=namespace,
563+
follow=False,
564+
timestamps=True,
565+
)
566+
logs = logs.splitlines()
567+
for line in logs:
568+
self.log.info("Container logs from %s", line)
569+
return logs
570+
except HTTPError:
571+
self.log.exception("There was an error reading the kubernetes API.")
572+
raise

tests/providers/google/cloud/hooks/test_kubernetes_engine.py

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -317,29 +317,46 @@ def async_hook(self):
317317
)
318318

319319
@pytest.mark.asyncio
320-
@mock.patch(GKE_STRING.format("Token"), mock.MagicMock())
320+
@pytest.mark.parametrize("mock_service_file", ("/tmp/service_file.json", None))
321+
@mock.patch(GKE_STRING.format("Token"))
321322
@mock.patch(GKE_STRING.format("GKEPodAsyncHook.get_conn"))
322323
@mock.patch(GKE_STRING.format("async_client.CoreV1Api.read_namespaced_pod"))
323-
async def test_get_pod(self, read_namespace_pod_mock, get_conn_mock, async_hook):
324+
async def test_get_pod(
325+
self, read_namespace_pod_mock, get_conn_mock, mock_token, async_hook, mock_service_file
326+
):
327+
async_hook.service_file_as_context = mock.MagicMock()
328+
async_hook.service_file_as_context.return_value.__aenter__.return_value = mock_service_file
329+
324330
self.make_mock_awaitable(read_namespace_pod_mock)
325331

326332
await async_hook.get_pod(name=POD_NAME, namespace=POD_NAMESPACE)
327-
333+
mock_token.assert_called_with(
334+
scopes=["https://www.googleapis.com/auth/cloud-platform"], service_file=mock_service_file
335+
)
328336
get_conn_mock.assert_called_once()
329337
read_namespace_pod_mock.assert_called_with(
330338
name=POD_NAME,
331339
namespace=POD_NAMESPACE,
332340
)
333341

334342
@pytest.mark.asyncio
335-
@mock.patch(GKE_STRING.format("Token"), mock.MagicMock())
343+
@pytest.mark.parametrize("mock_service_file", ("/tmp/service_file.json", None))
344+
@mock.patch(GKE_STRING.format("Token"))
336345
@mock.patch(GKE_STRING.format("GKEPodAsyncHook.get_conn"))
337346
@mock.patch(GKE_STRING.format("async_client.CoreV1Api.delete_namespaced_pod"))
338-
async def test_delete_pod(self, delete_namespaced_pod, get_conn_mock, async_hook):
347+
async def test_delete_pod(
348+
self, delete_namespaced_pod, get_conn_mock, mock_token, async_hook, mock_service_file
349+
):
350+
async_hook.service_file_as_context = mock.MagicMock()
351+
async_hook.service_file_as_context.return_value.__aenter__.return_value = mock_service_file
352+
339353
self.make_mock_awaitable(delete_namespaced_pod)
340354

341355
await async_hook.delete_pod(name=POD_NAME, namespace=POD_NAMESPACE)
342356

357+
mock_token.assert_called_with(
358+
scopes=["https://www.googleapis.com/auth/cloud-platform"], service_file=mock_service_file
359+
)
343360
get_conn_mock.assert_called_once()
344361
delete_namespaced_pod.assert_called_with(
345362
name=POD_NAME,
@@ -348,14 +365,23 @@ async def test_delete_pod(self, delete_namespaced_pod, get_conn_mock, async_hook
348365
)
349366

350367
@pytest.mark.asyncio
351-
@mock.patch(GKE_STRING.format("Token"), mock.MagicMock())
368+
@pytest.mark.parametrize("mock_service_file", ("/tmp/service_file.json", None))
369+
@mock.patch(GKE_STRING.format("Token"))
352370
@mock.patch(GKE_STRING.format("GKEPodAsyncHook.get_conn"))
353371
@mock.patch(GKE_STRING.format("async_client.CoreV1Api.read_namespaced_pod_log"))
354-
async def test_read_logs(self, read_namespaced_pod_log, get_conn_mock, async_hook, caplog):
372+
async def test_read_logs(
373+
self, read_namespaced_pod_log, get_conn_mock, mock_token, async_hook, mock_service_file, caplog
374+
):
375+
async_hook.service_file_as_context = mock.MagicMock()
376+
async_hook.service_file_as_context.return_value.__aenter__.return_value = mock_service_file
377+
355378
self.make_mock_awaitable(read_namespaced_pod_log, result="Test string #1\nTest string #2\n")
356379

357380
await async_hook.read_logs(name=POD_NAME, namespace=POD_NAMESPACE)
358381

382+
mock_token.assert_called_with(
383+
scopes=["https://www.googleapis.com/auth/cloud-platform"], service_file=mock_service_file
384+
)
359385
get_conn_mock.assert_called_once()
360386
read_namespaced_pod_log.assert_called_with(
361387
name=POD_NAME,

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: https://github.com/apache/airflow/commit/2372e21d9dd44a9cb1f7cd20bbee7f1c37936faf

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy