from __future__ import absolute_import, division, print_function
import datetime
import zipfile
from fsspec import open_files
from fsspec.archive import AbstractArchiveFileSystem
from fsspec.utils import DEFAULT_BLOCK_SIZE
class ZipFileSystem(AbstractArchiveFileSystem):
    """Read contents of ZIP archive as a file-system

    Keeps file object open while instance lives.

    This class is pickleable, but not necessarily thread-safe
    """

    root_marker = ""
    protocol = "zip"
    cachable = False

    def __init__(
        self,
        fo="",
        mode="r",
        target_protocol=None,
        target_options=None,
        block_size=DEFAULT_BLOCK_SIZE,
        **kwargs,
    ):
        """
        Parameters
        ----------
        fo: str or file-like
            Contains ZIP, and must exist. If a str, will fetch file using
            `open_files()`, which must return one file exactly.
        mode: str
            Mode for the archive, ``"r"`` by default. It is used (with
            ``"b"`` appended) to open ``fo`` when that is a string, and is
            passed unvalidated to ``zipfile.ZipFile``.
        target_protocol: str (optional)
            If ``fo`` is a string, this value can be used to override the
            FS protocol inferred from a URL
        target_options: dict (optional)
            Kwargs passed when instantiating the target FS, if ``fo`` is
            a string.
        block_size: int
            Stored on the instance as ``block_size``; not otherwise used
            by the methods of this class.
        **kwargs:
            Forwarded to the parent class constructor.

        Raises
        ------
        ValueError
            If ``fo`` is a string that does not resolve to exactly one file.
        """
        # ``self`` is already bound by super(); do not pass it a second time
        super().__init__(**kwargs)
        if isinstance(fo, str):
            files = open_files(
                fo, mode=mode + "b", protocol=target_protocol, **(target_options or {})
            )
            if len(files) != 1:
                raise ValueError(
                    'Path "{}" did not resolve to exactly '
                    'one file: "{}"'.format(fo, files)
                )
            fo = files[0]
        self.of = fo
        self.fo = fo.__enter__()  # the whole instance is a context
        self.zip = zipfile.ZipFile(self.fo, mode=mode)
        self.block_size = block_size
        self.dir_cache = None

    @classmethod
    def _strip_protocol(cls, path):
        """Return ``path`` without protocol prefix or leading slashes."""
        # zip file paths are always relative to the archive root
        return super()._strip_protocol(path).lstrip("/")

    def __del__(self):
        """Close the underlying ``ZipFile`` when the instance is collected."""
        # ``zip`` does not exist if ``__init__`` raised before creating it;
        # avoid an AttributeError during garbage collection in that case.
        archive = getattr(self, "zip", None)
        if archive is not None:
            archive.close()

    def _get_dirs(self):
        """Populate ``self.dir_cache`` from the archive listing, if unset.

        The cache maps member names to info dicts. Directory entries derived
        via ``_all_dirnames`` are keyed with a trailing ``"/"``; entries
        stored in the archive itself overwrite derived ones of the same name.
        """
        if self.dir_cache is None:
            files = self.zip.infolist()
            self.dir_cache = {
                dirname + "/": {"name": dirname + "/", "size": 0, "type": "directory"}
                for dirname in self._all_dirnames([z.filename for z in files])
            }
            for z in files:
                # a declared slot may be unset on a given ZipInfo instance;
                # fall back to None rather than raising AttributeError
                f = {s: getattr(z, s, None) for s in zipfile.ZipInfo.__slots__}
                f.update(
                    {
                        "name": z.filename,
                        "size": z.file_size,
                        "type": ("directory" if z.is_dir() else "file"),
                    }
                )
                self.dir_cache[f["name"]] = f

    def pipe_file(self, path, value, **kwargs):
        """Write ``value`` as the complete contents of member ``path``.

        Parameters
        ----------
        path: str
            Name of the archive member to create.
        value: bytes
            Data to store in the member.
        **kwargs:
            Accepted for interface compatibility; not used here.
        """
        # override upstream, because we know the exact file size in this case
        # ZipInfo expects a 6-field (Y, M, D, h, m, s) time tuple
        info = zipfile.ZipInfo(path, datetime.datetime.now().timetuple()[:6])
        info.file_size = len(value)
        # pass the prepared ZipInfo (not the bare name) so that the timestamp
        # and the known size are actually used for the new member
        with self.zip.open(info, "w") as f:
            f.write(value)
        # the cached listing no longer reflects the archive contents
        self.dir_cache = None

    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=True,
        cache_options=None,
        **kwargs,
    ):
        """Open archive member ``path`` and return the file-like object.

        ``mode`` has any ``"b"`` characters stripped from its ends before
        being passed to ``ZipFile.open``. When reading, the returned object
        is given ``size`` and ``name`` attributes taken from ``self.info``.
        ``block_size``, ``autocommit``, ``cache_options`` and ``**kwargs``
        are accepted but not used by this implementation.
        """
        path = self._strip_protocol(path)
        out = self.zip.open(path, mode.strip("b"))
        if "r" in mode:
            info = self.info(path)
            out.size = info["size"]
            out.name = info["name"]
        return out