#!/usr/bin/env python
#
# Copyright (C) 2019 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
"""
Iterable ZIP archive generator.
Derived directly from zipfile.py and the zipstream project
https://github.com/allanlei/python-zipstream
"""
import os
import sys
import stat
import struct
import time
import zipfile
import asyncio
import aiofiles
from concurrent import futures
from zipfile import (
structCentralDir,
structEndArchive64,
structEndArchive,
structEndArchive64Locator,
stringCentralDir,
stringEndArchive64,
stringEndArchive,
stringEndArchive64Locator,
)
stringDataDescriptor = b"PK\x07\x08" # magic number for data descriptor
def _get_compressor(compress_type):
"""
Return the compressor.
"""
if compress_type == zipfile.ZIP_DEFLATED:
from zipfile import zlib
return zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
elif compress_type == zipfile.ZIP_BZIP2:
from zipfile import bz2
return bz2.BZ2Compressor()
elif compress_type == zipfile.ZIP_LZMA:
from zipfile import LZMACompressor
return LZMACompressor()
else:
return None
class PointerIO:
def __init__(self, mode="wb"):
if mode not in ("wb",):
raise RuntimeError('zipstream.ZipFile() requires mode "wb"')
self.data_pointer = 0
self.__mode = mode
self.__closed = False
@property
def mode(self):
return self.__mode
@property
def closed(self):
return self.__closed
def close(self):
self.__closed = True
def flush(self):
pass
def next(self):
raise NotImplementedError()
def tell(self):
return self.data_pointer
def truncate(size=None):
raise NotImplementedError()
def write(self, data):
if self.closed:
raise ValueError("I/O operation on closed file")
if isinstance(data, str):
data = data.encode("utf-8")
if not isinstance(data, bytes):
raise TypeError("expected bytes")
self.data_pointer += len(data)
return data
class ZipInfo(zipfile.ZipInfo):
def __init__(self, *args, **kwargs):
zipfile.ZipInfo.__init__(self, *args, **kwargs)
def DataDescriptor(self):
"""
crc-32 4 bytes
compressed size 4 bytes
uncompressed size 4 bytes
"""
if self.compress_size > zipfile.ZIP64_LIMIT or self.file_size > zipfile.ZIP64_LIMIT:
fmt = b"<4sLQQ"
else:
fmt = b"<4sLLL"
return struct.pack(fmt, stringDataDescriptor, self.CRC, self.compress_size, self.file_size)
class ZipFile(zipfile.ZipFile):
def __init__(self, fileobj=None, mode="w", compression=zipfile.ZIP_STORED, allowZip64=True, chunksize=32768):
"""Open the ZIP file with mode write "w"."""
if mode not in ("w",):
raise RuntimeError('aiozipstream.ZipFile() requires mode "w"')
if fileobj is None:
fileobj = PointerIO()
self._comment = b""
zipfile.ZipFile.__init__(self, fileobj, mode=mode, compression=compression, allowZip64=allowZip64)
self._chunksize = chunksize
self.paths_to_write = []
def __aiter__(self):
return self._stream()
@property
def comment(self):
"""
The comment text associated with the ZIP file.
"""
return self._comment
@comment.setter
def comment(self, comment):
"""
Add a comment text associated with the ZIP file.
"""
if not isinstance(comment, bytes):
raise TypeError("comment: expected bytes, got %s" % type(comment))
# check for valid comment length
if len(comment) >= zipfile.ZIP_MAX_COMMENT:
if self.debug:
print("Archive comment is too long; truncating to %d bytes" % zipfile.ZIP_MAX_COMMENT)
comment = comment[: zipfile.ZIP_MAX_COMMENT]
self._comment = comment
self._didModify = True
async def data_generator(self, path):
async with aiofiles.open(path, "rb") as f:
while True:
part = await f.read(self._chunksize)
if not part:
break
yield part
return
async def _run_in_executor(self, task, *args, **kwargs):
"""
Run synchronous task in separate thread and await for result.
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(futures.ThreadPoolExecutor(max_workers=1), task, *args, **kwargs)
async def _stream(self):
for kwargs in self.paths_to_write:
async for chunk in self._write(**kwargs):
yield chunk
for chunk in self._close():
yield chunk
def write(self, filename, arcname=None, compress_type=None):
"""
Write a file to the archive under the name `arcname`.
"""
kwargs = {"filename": filename, "arcname": arcname, "compress_type": compress_type}
self.paths_to_write.append(kwargs)
def write_iter(self, arcname, iterable, compress_type=None):
"""
Write the bytes iterable `iterable` to the archive under the name `arcname`.
"""
kwargs = {"arcname": arcname, "iterable": iterable, "compress_type": compress_type}
self.paths_to_write.append(kwargs)
def writestr(self, arcname, data, compress_type=None):
"""
Writes a str into ZipFile by wrapping data as a generator
"""
def _iterable():
yield data
return self.write_iter(arcname, _iterable(), compress_type=compress_type)
async def _write(self, filename=None, iterable=None, arcname=None, compress_type=None):
"""
Put the bytes from filename into the archive under the name `arcname`.
"""
if not self.fp:
raise RuntimeError("Attempt to write to ZIP archive that was already closed")
if (filename is None and iterable is None) or (filename is not None and iterable is not None):
raise ValueError("either (exclusively) filename or iterable shall be not None")
if filename:
st = os.stat(filename)
isdir = stat.S_ISDIR(st.st_mode)
mtime = time.localtime(st.st_mtime)
date_time = mtime[0:6]
else:
st, isdir, date_time = None, False, time.localtime()[0:6]
# Create ZipInfo instance to store file information
if arcname is None:
arcname = filename
arcname = os.path.normpath(os.path.splitdrive(arcname)[1])
while arcname[0] in (os.sep, os.altsep):
arcname = arcname[1:]
if isdir:
arcname += "/"
zinfo = ZipInfo(arcname, date_time)
if st:
zinfo.external_attr = (st[0] & 0xFFFF) << 16 # Unix attributes
else:
zinfo.external_attr = 0o600 << 16 # ?rw-------
if compress_type is None:
zinfo.compress_type = self.compression
else:
zinfo.compress_type = compress_type
if st:
zinfo.file_size = st[6]
else:
zinfo.file_size = 0
zinfo.flag_bits = 0x00
zinfo.flag_bits |= 0x08 # ZIP flag bits, bit 3 indicates presence of data descriptor
zinfo.header_offset = self.fp.tell() # Start of header bytes
if zinfo.compress_type == zipfile.ZIP_LZMA:
# Compressed data includes an end-of-stream (EOS) marker
zinfo.flag_bits |= 0x02
self._writecheck(zinfo)
self._didModify = True
if isdir:
zinfo.file_size = 0
zinfo.compress_size = 0
zinfo.CRC = 0
self.filelist.append(zinfo)
self.NameToInfo[zinfo.filename] = zinfo
yield self.fp.write(zinfo.FileHeader(False))
return
cmpr = _get_compressor(zinfo.compress_type)
# Must overwrite CRC and sizes with correct data later
zinfo.CRC = CRC = 0
zinfo.compress_size = compress_size = 0
# Compressed size can be larger than uncompressed size
zip64 = self._allowZip64 and zinfo.file_size * 1.05 > zipfile.ZIP64_LIMIT
yield self.fp.write(zinfo.FileHeader(zip64))
file_size = 0
if filename:
async for buf in self.data_generator(filename):
file_size = file_size + len(buf)
CRC = zipfile.crc32(buf, CRC) & 0xFFFFFFFF
if cmpr:
buf = await self._run_in_executor(cmpr.compress, buf)
compress_size = compress_size + len(buf)
yield self.fp.write(buf)
else: # we have an iterable
for buf in iterable:
file_size = file_size + len(buf)
CRC = zipfile.crc32(buf, CRC) & 0xFFFFFFFF
if cmpr:
buf = await self._run_in_executor(cmpr.compress, buf)
compress_size = compress_size + len(buf)
yield self.fp.write(buf)
if cmpr:
buf = cmpr.flush()
compress_size = compress_size + len(buf)
yield self.fp.write(buf)
zinfo.compress_size = compress_size
else:
zinfo.compress_size = file_size
zinfo.CRC = CRC
zinfo.file_size = file_size
if not zip64 and self._allowZip64:
if file_size > zipfile.ZIP64_LIMIT:
raise RuntimeError("File size has increased during compressing")
if compress_size > zipfile.ZIP64_LIMIT:
raise RuntimeError("Compressed size larger than uncompressed size")
yield self.fp.write(zinfo.DataDescriptor())
self.filelist.append(zinfo)
self.NameToInfo[zinfo.filename] = zinfo
def _close(self):
"""
Close the file, and for mode "w" write the ending records.
"""
if self.fp is None:
return
try:
if self.mode in ("w", "a") and self._didModify: # write ending records
count = 0
pos1 = self.fp.tell()
for zinfo in self.filelist: # write central directory
count = count + 1
dt = zinfo.date_time
dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2]
dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2)
extra = []
if zinfo.file_size > zipfile.ZIP64_LIMIT or zinfo.compress_size > zipfile.ZIP64_LIMIT:
extra.append(zinfo.file_size)
extra.append(zinfo.compress_size)
file_size = 0xFFFFFFFF
compress_size = 0xFFFFFFFF
else:
file_size = zinfo.file_size
compress_size = zinfo.compress_size
if zinfo.header_offset > zipfile.ZIP64_LIMIT:
extra.append(zinfo.header_offset)
header_offset = 0xFFFFFFFF
else:
header_offset = zinfo.header_offset
extra_data = zinfo.extra
min_version = 0
if extra:
# Append a ZIP64 field to the extra's
extra_data = struct.pack(b"= zipfile.ZIP_FILECOUNT_LIMIT
or centDirOffset > zipfile.ZIP64_LIMIT
or centDirSize > zipfile.ZIP64_LIMIT
):
# Need to write the ZIP64 end-of-archive records
zip64endrec = struct.pack(
structEndArchive64,
stringEndArchive64,
44,
45,
45,
0,
0,
centDirCount,
centDirCount,
centDirSize,
centDirOffset,
)
yield self.fp.write(zip64endrec)
zip64locrec = struct.pack(structEndArchive64Locator, stringEndArchive64Locator, 0, pos2, 1)
yield self.fp.write(zip64locrec)
centDirCount = min(centDirCount, 0xFFFF)
centDirSize = min(centDirSize, 0xFFFFFFFF)
centDirOffset = min(centDirOffset, 0xFFFFFFFF)
endrec = struct.pack(
structEndArchive,
stringEndArchive,
0,
0,
centDirCount,
centDirCount,
centDirSize,
centDirOffset,
len(self._comment),
)
yield self.fp.write(endrec)
yield self.fp.write(self._comment)
self.fp.flush()
finally:
fp = self.fp
self.fp = None
if not self._filePassed:
fp.close()