1717)
1818
1919# current library imports
20+ from google .cloud import kms
2021from google .cloud .storage .asyncio .async_grpc_client import AsyncGrpcClient
2122from google .cloud .storage .asyncio .async_multi_range_downloader import (
2223 AsyncMultiRangeDownloader ,
2930
3031
3132# TODO: replace this with a fixture once zonal bucket creation / deletion
32- # is supported in grpc client or json client client .
33+ # is supported in grpc client or json client.
3334_ZONAL_BUCKET = os .getenv ("ZONAL_BUCKET" )
3435_CROSS_REGION_BUCKET = os .getenv ("CROSS_REGION_BUCKET" )
3536_BYTES_TO_UPLOAD = b"dummy_bytes_to_write_read_and_delete_appendable_object"
@@ -40,6 +41,58 @@ async def create_async_grpc_client(attempt_direct_path=True):
4041 return AsyncGrpcClient (attempt_direct_path = attempt_direct_path )
4142
4243
44+ @pytest .fixture (scope = "session" )
45+ def zonal_kms_key (storage_client , kms_client ):
46+ """Provisions a KMS key in the same location as of the zonal bucket."""
47+ # Get the zonal bucket and extract its location
48+ bucket = storage_client .get_bucket (_ZONAL_BUCKET )
49+ location = bucket .location .lower ()
50+
51+ project = storage_client .project
52+ keyring_name = "gcs-test-zonal-ring"
53+ key_name = "gcs-test-zonal-key"
54+
55+ keyring_path = kms_client .key_ring_path (project , location , keyring_name )
56+
57+ # Create the KeyRing if it doesn't exist
58+ try :
59+ kms_client .get_key_ring (name = keyring_path )
60+ except NotFound :
61+ parent = f"projects/{ project } /locations/{ location } "
62+ kms_client .create_key_ring (
63+ request = {"parent" : parent , "key_ring_id" : keyring_name , "key_ring" : {}}
64+ )
65+
66+ # Grant GCS service account permissions to use the key
67+ service_account_email = storage_client .get_service_account_email ()
68+ policy = {
69+ "bindings" : [
70+ {
71+ "role" : "roles/cloudkms.cryptoKeyEncrypterDecrypter" ,
72+ "members" : [f"serviceAccount:{ service_account_email } " ],
73+ }
74+ ]
75+ }
76+ kms_client .set_iam_policy (request = {"resource" : keyring_path , "policy" : policy })
77+
78+ # Create the CryptoKey if it doesn't exist
79+ key_path = kms_client .crypto_key_path (project , location , keyring_name , key_name )
80+ try :
81+ kms_client .get_crypto_key (name = key_path )
82+ except NotFound :
83+ purpose = kms .CryptoKey .CryptoKeyPurpose .ENCRYPT_DECRYPT
84+ key = {"purpose" : purpose }
85+ kms_client .create_crypto_key (
86+ request = {
87+ "parent" : keyring_path ,
88+ "crypto_key_id" : key_name ,
89+ "crypto_key" : key ,
90+ }
91+ )
92+
93+ return key_path
94+
95+
4396@pytest .fixture (scope = "session" )
4497def event_loop ():
4598 """Redefine pytest-asyncio's event_loop fixture to be session-scoped."""
@@ -286,6 +339,83 @@ async def _run():
286339 event_loop .run_until_complete (_run ())
287340
288341
342+ def test_write_from_blob (
343+ storage_client ,
344+ blobs_to_delete ,
345+ event_loop ,
346+ grpc_client ,
347+ ):
348+ object_name = f"test_from_blob-{ str (uuid .uuid4 ())[:4 ]} "
349+ content_type = "text/plain"
350+ metadata = {"environment" : "system-test" }
351+ test_data = b"system-test-data"
352+
353+ async def _run ():
354+ # 1. Create a Blob instance
355+ blob = storage_client .bucket (_ZONAL_BUCKET ).blob (object_name )
356+ blob .content_type = content_type
357+ blob .metadata = metadata
358+
359+ # 2. Use from_blob to create the writer
360+ writer = AsyncAppendableObjectWriter .from_blob (grpc_client , blob )
361+ await writer .open ()
362+ await writer .append (test_data )
363+ await writer .close (finalize_on_close = True )
364+
365+ # 3. Verify the object metadata
366+ obj = await grpc_client .get_object (
367+ bucket_name = _ZONAL_BUCKET ,
368+ object_name = object_name ,
369+ )
370+
371+ assert obj .content_type == content_type
372+ assert obj .metadata ["environment" ] == "system-test"
373+
374+ blobs_to_delete .append (blob )
375+
376+ event_loop .run_until_complete (_run ())
377+
378+
379+ def test_write_from_blob_with_kms_key (
380+ storage_client ,
381+ blobs_to_delete ,
382+ event_loop ,
383+ grpc_client ,
384+ zonal_kms_key ,
385+ ):
386+ """Verifies AsyncAppendableObjectWriter.from_blob correctly applies KMS encryption."""
387+
388+ object_name = f"test_from_blob_kms-{ str (uuid .uuid4 ())[:4 ]} "
389+ test_data = b"kms-protected-data"
390+
391+ async def _run ():
392+ # Create a local Blob instance with the KMS key
393+ blob = storage_client .bucket (_ZONAL_BUCKET ).blob (
394+ object_name , kms_key_name = zonal_kms_key
395+ )
396+
397+ writer = AsyncAppendableObjectWriter .from_blob (grpc_client , blob )
398+
399+ await writer .open ()
400+ await writer .append (test_data )
401+
402+ await writer .close (finalize_on_close = True )
403+
404+ # Verify the encryption metadata
405+ obj = await grpc_client .get_object (
406+ bucket_name = _ZONAL_BUCKET ,
407+ object_name = object_name ,
408+ )
409+
410+ # Assert that the object was encrypted with the correct key
411+ # GCS appends a version suffix, so we use startswith()
412+ assert obj .kms_key .startswith (zonal_kms_key )
413+
414+ blobs_to_delete .append (blob )
415+
416+ event_loop .run_until_complete (_run ())
417+
418+
289419def test_read_unfinalized_appendable_object (
290420 storage_client , blobs_to_delete , event_loop , grpc_client_direct
291421):
0 commit comments