Implement ZipStore delete via archive rewrite (fixes #828) by Akash-t25 · Pull Request #4085 · zarr-developers/zarr-python · GitHub
Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/828.feature.md
63 changes: 54 additions & 9 deletions src/zarr/storage/_zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import os
import shutil
import tempfile
import threading
import time
import zipfile
Expand All @@ -18,7 +19,7 @@
from zarr.core.buffer import Buffer, BufferPrototype

if TYPE_CHECKING:
from collections.abc import AsyncIterator, Iterable
from collections.abc import AsyncIterator, Callable, Iterable

ZipStoreAccessModeLiteral = Literal["r", "w", "a"]

Expand Down Expand Up @@ -55,7 +56,7 @@ class ZipStore(Store):
"""

supports_writes: bool = True
supports_deletes: bool = False
supports_deletes: bool = True
supports_listing: bool = True

path: Path
Expand Down Expand Up @@ -229,21 +230,65 @@ async def set_if_not_exists(self, key: str, value: Buffer) -> None:
if key not in members:
self._set(key, value)

def _rewrite_without(self, should_delete: Callable[[str], bool]) -> None:
# Rewrite the archive, dropping every member for which ``should_delete``
# returns True. ZIP files do not support in-place deletion, so the only
# way to remove an entry is to copy the surviving entries into a fresh
# archive (see issue #828). Duplicate members (created when a chunk is
# overwritten via ``writestr``) are compacted to their most recent value
# as a side effect, since that is what reads already return.
#
# This must be called while holding ``self._lock``.
members: dict[str, zipfile.ZipInfo] = {}
for info in self._zf.infolist():
members[info.filename] = info # keep the last entry for each name

to_delete = [name for name in members if should_delete(name)]
if not to_delete:
# nothing matched; leave the archive untouched
return

fd, tmp_path = tempfile.mkstemp(dir=self.path.parent)
os.close(fd)
try:
with zipfile.ZipFile(
tmp_path, mode="w", compression=self.compression, allowZip64=self.allowZip64
) as new_zf:
for name, info in members.items():
if should_delete(name):
continue
new_zf.writestr(info, self._zf.read(name))
self._zf.close()
os.replace(tmp_path, self.path)
except BaseException:
if os.path.exists(tmp_path):
os.remove(tmp_path)
raise

# Reopen in append mode so subsequent writes preserve the archive
# (the original mode may be "w"/"x", which would truncate or fail).
self._zf = zipfile.ZipFile(
self.path, mode="a", compression=self.compression, allowZip64=self.allowZip64
)

async def delete_dir(self, prefix: str) -> None:
# only raise NotImplementedError if any keys are found
# docstring inherited
self._check_writable()
if not self._is_open:
self._sync_open()
if prefix != "" and not prefix.endswith("/"):
prefix += "/"
async for _ in self.list_prefix(prefix):
raise NotImplementedError
with self._lock:
self._rewrite_without(lambda name: name.startswith(prefix))

async def delete(self, key: str) -> None:
# docstring inherited
# we choose to only raise NotImplementedError here if the key exists
# this allows the array/group APIs to avoid the overhead of existence checks
# deleting a missing key is a no-op, matching the other stores
self._check_writable()
if await self.exists(key):
raise NotImplementedError
if not self._is_open:
self._sync_open()
with self._lock:
self._rewrite_without(lambda name: name == key)

async def exists(self, key: str) -> bool:
# docstring inherited
Expand Down
3 changes: 3 additions & 0 deletions tests/test_codecs/test_sharding.py
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,9 @@ def test_write_partial_sharded_chunks(store: Store) -> None:
assert np.array_equal(a[0:16, 0:16], data)


# ZipStore overwrites shards by appending duplicate archive members (reads return
# the most recent), which zipfile reports via a "Duplicate name" UserWarning.
@pytest.mark.filterwarnings("ignore:Duplicate name:UserWarning")
@pytest.mark.parametrize("store", ["local", "memory", "zip"], indirect=["store"])
async def test_delete_empty_shards(store: Store) -> None:
if not store.supports_deletes:
Expand Down
124 changes: 117 additions & 7 deletions tests/test_store/test_zip.py
Loading