Source code for flask_attachments.extension

import contextlib
import dataclasses as dc
import datetime as dt
import hashlib
import tempfile
from pathlib import Path
from typing import Any
from typing import cast
from typing import TYPE_CHECKING

import structlog
from flask import current_app
from flask import Flask
from sqlalchemy import event
from sqlalchemy import MetaData
from sqlalchemy.engine import create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.engine import make_url
from sqlalchemy.orm import registry as Registry
from werkzeug.local import LocalProxy

from .compression import CompressionAlgorithm

if TYPE_CHECKING:
    from .services import AttachmentCache


log = structlog.get_logger(__name__)

EXTENSION_NAME = "flask-attachments"
EXTENSION_CONFIG_NAMESPACE = "ATTACHMENTS_"


logger = structlog.get_logger(__name__)

__all__ = ["Attachments", "AttachmentSettings", "settings", "AttachmentsConfigurationError"]


@dc.dataclass
class AttachmentSettings:
    engine: Engine

    @property
    def config(self) -> dict[str, Any]:
        return current_app.config.get_namespace(EXTENSION_CONFIG_NAMESPACE, lowercase=False)

    def attach_filepath(self) -> str | None:
        uri = make_url(self.config["DATABASE_URI"])
        if "sqlalchemy" in current_app.extensions:
            db = current_app.extensions["sqlalchemy"].db
            (uri, _options) = db.apply_driver_hacks(current_app, uri, {})
        return uri.database

    def attach_ddl(self) -> str:
        schema = self.config.get("DATABASE_SCHEMA", "attachments")
        return f'ATTACH DATABASE "{self.attach_filepath()}" AS {schema}'  # noqa: B907

    def digest(self) -> str:
        return self.config["DIGEST"]

    def compression(self) -> CompressionAlgorithm:
        compression = self.config["COMPRESSION"]
        return CompressionAlgorithm[compression.upper()]

    def cache_directory(self) -> Path:
        return (Path(current_app.instance_path) / Path(self.config["CACHE_DIRECTORY"])).absolute()

    def cache_age(self) -> dt.timedelta:
        return dt.timedelta(hours=int(self.config["CACHE_AGE_HOURS"]))

    def cache_size(self) -> int:
        return int(self.config["CACHE_SIZE_MAX"])

    def cache(self) -> "AttachmentCache":
        from .services import AttachmentCache

        return AttachmentCache(self)


def get_settings() -> AttachmentSettings:
    try:
        return current_app.extensions[EXTENSION_NAME]
    except KeyError as exc:
        raise RuntimeError("No attachments extension registered on this app, call init_app first") from exc


#: Flask proxy to the current attachment settings
settings = cast(AttachmentSettings, LocalProxy(get_settings))


class AttachmentsConfigurationError(ValueError):
    """Configuration error for the Attachments extension"""

    pass


[docs] class Attachments: def __init__(self, app: Flask | None = None, registry: Registry | None = None) -> None: from .models import Attachment if registry is None: registry = Registry() if not hasattr(Attachment, "__mapper__"): registry.map_declaratively(Attachment) elif Attachment.__table__ not in registry.metadata: # type: ignore raise AttachmentsConfigurationError( "The Attachment model has already been mapped to a different metadata" "\nConsider providing a custom registry to the Attachments extension" "\nor only intializing the extension once" ) self.registry = registry if app is not None: self.init_app(app) def __repr__(self) -> str: return f"<{self.__class__.__name__} at {id(self)!s}>" @property def metadata(self) -> MetaData: return self.registry.metadata
[docs] def init_app(self, app: Flask) -> None: """Initialize the app here""" from .cli import group as command_group if f"{EXTENSION_CONFIG_NAMESPACE}DATABASE_URI" not in app.config: raise AttachmentsConfigurationError( f"Must set {EXTENSION_CONFIG_NAMESPACE}DATABASE_URI for attachments extension" ) app.config.setdefault(f"{EXTENSION_CONFIG_NAMESPACE}DATABASE_SCHEMA", "attachments") directory = app.config.setdefault(f"{EXTENSION_CONFIG_NAMESPACE}CACHE_DIRECTORY", None) if directory is None: app.config[f"{EXTENSION_CONFIG_NAMESPACE}CACHE_DIRECTORY"] = directory = tempfile.mkdtemp() log.warn("Using a temporary directory for attachment caching", directory=directory) if not directory: raise AttachmentsConfigurationError( f"Must set {EXTENSION_CONFIG_NAMESPACE}CACHE_DIRECTORY for attachments extension" ) # Ensure the directory exists directory = Path(directory) try: directory.mkdir(parents=True, exist_ok=True) except OSError as exc: log.exception("Failed to create attachment cache directory", directory=directory) raise AttachmentsConfigurationError(f"Unsupported attachment cache directory: {directory}") from exc cache_size = app.config.setdefault(f"{EXTENSION_CONFIG_NAMESPACE}CACHE_SIZE_MAX", 2 * 10**9) try: int(cache_size) except ValueError as exc: log.exception("Invalid cache size", cache_size=cache_size) raise AttachmentsConfigurationError(f"Invalid cache size: {cache_size}") from exc cache_age = app.config.setdefault(f"{EXTENSION_CONFIG_NAMESPACE}CACHE_AGE_HOURS", 12) try: int(cache_age) except ValueError as exc: log.exception("Invalid cache age", cache_age=cache_age) raise AttachmentsConfigurationError(f"Invalid cache age: {cache_age}") from exc compression = app.config.setdefault(f"{EXTENSION_CONFIG_NAMESPACE}COMPRESSION", "lzma") algorithm = app.config.setdefault(f"{EXTENSION_CONFIG_NAMESPACE}DIGEST", "sha256") try: CompressionAlgorithm[compression.upper()] except KeyError as exc: log.exception("Unsupported compression algorithm", compression=compression) raise AttachmentsConfigurationError(f"Unsupported compression algorithm: {compression}") from exc try: hashlib.new(algorithm) except ValueError as exc: log.exception("Unsupported digest algorithm", algorithm=algorithm) raise AttachmentsConfigurationError(f"Unsupported digest algorithm: {algorithm}") from exc engine = create_engine(app.config[f"{EXTENSION_CONFIG_NAMESPACE}DATABASE_URI"]) app.extensions[EXTENSION_NAME] = AttachmentSettings(engine=engine) app.cli.add_command(command_group) if app.config.setdefault(f"{EXTENSION_CONFIG_NAMESPACE}BLUEPRINT", True): from .views import bp app.register_blueprint(bp)
@event.listens_for(Engine, "connect") def set_sqlite_pragma(dbapi_connection: Any, connection_record: Any) -> None: try: ddl = current_app.extensions[EXTENSION_NAME].attach_ddl() except KeyError: log.debug("No attachments extension registered on this app, skipping") return except RuntimeError: log.debug("No app context, skipping") return log.debug("Attaching database", ddl=ddl) with contextlib.closing(dbapi_connection.cursor()) as cursor: cursor.execute(ddl)