Source code for flask_attachments.compression
import bz2
import enum
import gzip
import hashlib
import io
import lzma
from typing import cast
from typing import IO
from typing import Protocol
[docs]
class CompressionAlgorithm(enum.Enum):
NONE = enum.auto()
GZIP = enum.auto()
BZ2 = enum.auto()
LZMA = enum.auto()
def compress(self, data: bytes) -> bytes:
"""Compress in place the given data."""
if self == CompressionAlgorithm.NONE:
return data
if self == CompressionAlgorithm.LZMA:
return lzma.compress(data)
if self == CompressionAlgorithm.BZ2:
return bz2.compress(data)
if self == CompressionAlgorithm.GZIP: # pragma: no branch
return gzip.compress(data)
def open(self, stream: IO[bytes], mode: str) -> IO[bytes]:
"""Open the given stream in the given mode, returning a compressed stream or file-like object."""
if self == CompressionAlgorithm.NONE:
return stream
if "b" not in mode:
mode = mode + "b"
if self == CompressionAlgorithm.LZMA:
return cast(IO[bytes], lzma.open(stream, mode=mode))
if self == CompressionAlgorithm.BZ2:
return cast(IO[bytes], bz2.open(stream, mode=mode))
if self == CompressionAlgorithm.GZIP: # pragma: no branch
return cast(IO[bytes], gzip.open(stream, mode=mode))
def stream(self, digest: str) -> "CompressingStream":
return CompressingStream(self, digest=digest)
def read(self, stream: IO[bytes]) -> "DecompressingStream":
return DecompressingStream(self, stream)
def decompress(self, data: bytes) -> bytes:
"""Decompress in place the given data."""
if self == CompressionAlgorithm.NONE:
return data
if self == CompressionAlgorithm.LZMA:
return lzma.decompress(data)
if self == CompressionAlgorithm.BZ2:
return bz2.decompress(data)
if self == CompressionAlgorithm.GZIP: # pragma: no branch
return gzip.decompress(data)
class Hash(Protocol):
    """Structural type mirroring the hash objects handed out by :mod:`hashlib`.

    Only the two methods listed here are part of the protocol.
    """

    def update(self, data: bytes) -> None:
        """Feed ``data`` into the hash."""

    def hexdigest(self) -> str:
        """Return the digest as a string."""
class CompressingStream(io.BufferedIOBase):
"""A stream which compresses, digests, and tracks the length of the data written to it.
This prevents having to do all of these things independently, and allows us to compress without
holding a full file in memory, even if we do hold the compressed data in memory.
"""
def __init__(self, algorithm: CompressionAlgorithm, digest: str) -> None:
super().__init__()
self.algorithm = algorithm
self.inner = io.BytesIO()
self.stream = self.algorithm.open(self.inner, "wb")
self.length = 0
self.digest = hashlib.new(digest)
def write(self, data: bytes | bytearray) -> int: # type: ignore[override]
self.length += len(data)
self.digest.update(data)
return self.stream.write(data)
def getvalue(self) -> bytes:
return self.inner.getvalue()
def hexdigest(self) -> str:
return self.digest.hexdigest()
def close(self) -> None:
if self.algorithm != CompressionAlgorithm.NONE:
self.stream.close()
class DecompressingStream(io.BufferedIOBase):
def __init__(self, algorithm: CompressionAlgorithm, stream: IO[bytes]) -> None:
super().__init__()
self.algorithm = algorithm
self.stream = self.algorithm.open(stream, "rb")
def read(self, size: int | None = -1) -> bytes:
if size is None: # pragma: no cover
return self.stream.read()
return self.stream.read(size)
def close(self) -> None:
if self.algorithm != CompressionAlgorithm.NONE:
self.stream.close()