Content-Length: 439490 | pFad | https://github.com/apache/airflow/commit/17ce4ac219846138d1005d20cffa72bea291b25f

Update GCS hook to get crc32c hash for CMEK-protected objects (#38191) · apache/airflow@17ce4ac · GitHub
Skip to content

Commit 17ce4ac

Browse files
shahar1 and dmedora authored
Update GCS hook to get crc32c hash for CMEK-protected objects (#38191)
Co-authored-by: Daryus Medora <21352998+dmedora@users.noreply.github.com>
1 parent 2e3b175 commit 17ce4ac

File tree

2 files changed

+61
-2
lines changed
  • airflow/providers/google/cloud/hooks
  • tests/providers/google/cloud/hooks

2 files changed

+61
-2
lines changed

airflow/providers/google/cloud/hooks/gcs.py

Lines changed: 9 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1338,7 +1338,15 @@ def _prepare_sync_plan(
13381338
for current_name in names_to_check:
13391339
source_blob = source_names_index[current_name]
13401340
destination_blob = destination_names_index[current_name]
1341-
# If the objects are different, save it
1341+
# If either object is CMEK-protected, use the Cloud Storage Objects Get API to retrieve them
1342+
# so that the crc32c is included
1343+
if source_blob.kms_key_name:
1344+
source_blob = source_bucket.get_blob(source_blob.name, generation=source_blob.generation)
1345+
if destination_blob.kms_key_name:
1346+
destination_blob = destination_bucket.get_blob(
1347+
destination_blob.name, generation=destination_blob.generation
1348+
)
1349+
# if the objects are different, save it
13421350
if source_blob.crc32c != destination_blob.crc32c:
13431351
to_rewrite_blobs.add(source_blob)
13441352

tests/providers/google/cloud/hooks/test_gcs.py

Lines changed: 52 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,7 @@
2525
from datetime import datetime, timedelta
2626
from io import BytesIO
2727
from unittest import mock
28+
from unittest.mock import MagicMock
2829

2930
import dateutil
3031
import pytest
@@ -1279,6 +1280,47 @@ def test_should_overwrite_files(self, mock_get_conn, mock_delete, mock_rewrite,
12791280
)
12801281
mock_copy.assert_not_called()
12811282

1283+
@mock.patch(GCS_STRING.format("GCSHook.copy"))
1284+
@mock.patch(GCS_STRING.format("GCSHook.rewrite"))
1285+
@mock.patch(GCS_STRING.format("GCSHook.delete"))
1286+
@mock.patch(GCS_STRING.format("GCSHook.get_conn"))
1287+
def test_should_overwrite_cmek_files(self, mock_get_conn, mock_delete, mock_rewrite, mock_copy):
1288+
source_bucket = self._create_bucket(name="SOURCE_BUCKET")
1289+
source_bucket.list_blobs.return_value = [
1290+
self._create_blob("FILE_A", "C1", kms_key_name="KMS_KEY_1", generation=1),
1291+
self._create_blob("FILE_B", "C1"),
1292+
]
1293+
destination_bucket = self._create_bucket(name="DEST_BUCKET")
1294+
destination_bucket.list_blobs.return_value = [
1295+
self._create_blob("FILE_A", "C2", kms_key_name="KMS_KEY_2", generation=2),
1296+
self._create_blob("FILE_B", "C2"),
1297+
]
1298+
mock_get_conn.return_value.bucket.side_effect = [source_bucket, destination_bucket]
1299+
self.gcs_hook.sync(
1300+
source_bucket="SOURCE_BUCKET", destination_bucket="DEST_BUCKET", allow_overwrite=True
1301+
)
1302+
mock_delete.assert_not_called()
1303+
source_bucket.get_blob.assert_called_once_with("FILE_A", generation=1)
1304+
destination_bucket.get_blob.assert_called_once_with("FILE_A", generation=2)
1305+
mock_rewrite.assert_has_calls(
1306+
[
1307+
mock.call(
1308+
source_bucket="SOURCE_BUCKET",
1309+
source_object="FILE_B",
1310+
destination_bucket="DEST_BUCKET",
1311+
destination_object="FILE_B",
1312+
),
1313+
mock.call(
1314+
source_bucket="SOURCE_BUCKET",
1315+
source_object=source_bucket.get_blob.return_value.name,
1316+
destination_bucket="DEST_BUCKET",
1317+
destination_object=source_bucket.get_blob.return_value.name.__getitem__.return_value,
1318+
),
1319+
],
1320+
any_order=True,
1321+
)
1322+
mock_copy.assert_not_called()
1323+
12821324
@mock.patch(GCS_STRING.format("GCSHook.copy"))
12831325
@mock.patch(GCS_STRING.format("GCSHook.rewrite"))
12841326
@mock.patch(GCS_STRING.format("GCSHook.delete"))
@@ -1440,11 +1482,20 @@ def test_should_not_overwrite_when_overwrite_is_disabled(
14401482
mock_rewrite.assert_not_called()
14411483
mock_copy.assert_not_called()
14421484

1443-
def _create_blob(self, name: str, crc32: str, bucket=None):
1485+
def _create_blob(
1486+
self,
1487+
name: str,
1488+
crc32: str,
1489+
bucket: MagicMock | None = None,
1490+
kms_key_name: str | None = None,
1491+
generation: int = 0,
1492+
):
14441493
blob = mock.MagicMock(name=f"BLOB:{name}")
14451494
blob.name = name
14461495
blob.crc32 = crc32
14471496
blob.bucket = bucket
1497+
blob.kms_key_name = kms_key_name
1498+
blob.generation = generation
14481499
return blob
14491500

14501501
def _create_bucket(self, name: str):

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: https://github.com/apache/airflow/commit/17ce4ac219846138d1005d20cffa72bea291b25f

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy