commit c67067a2a4
Author: Iliyan Angelov
Date: 2025-09-14 23:24:25 +03:00

71311 changed files with 6800714 additions and 0 deletions


@@ -0,0 +1,5 @@
from __future__ import annotations
from whitenoise.base import WhiteNoise
__all__ = ["WhiteNoise"]


@@ -0,0 +1,265 @@
from __future__ import annotations
import os
import re
import warnings
from posixpath import normpath
from typing import Callable
from wsgiref.headers import Headers
from wsgiref.util import FileWrapper
from whitenoise.media_types import MediaTypes
from whitenoise.responders import IsDirectoryError
from whitenoise.responders import MissingFileError
from whitenoise.responders import Redirect
from whitenoise.responders import StaticFile
from whitenoise.string_utils import decode_path_info
from whitenoise.string_utils import ensure_leading_trailing_slash
class WhiteNoise:
    # Ten years is what nginx sets as the max age when you use 'expires max;',
    # so we'll follow its lead
FOREVER = 10 * 365 * 24 * 60 * 60
def __init__(
self,
application,
root=None,
prefix=None,
*,
# Re-check the filesystem on every request so that any changes are
# automatically picked up. NOTE: For use in development only, not supported
# in production
autorefresh: bool = False,
max_age: int | None = 60, # seconds
# Set 'Access-Control-Allow-Origin: *' header on all files.
        # As these are all public static files this is safe (see
        # https://www.w3.org/TR/cors/#security) and ensures that things (e.g.
# webfonts in Firefox) still work as expected when your static files are
# served from a CDN, rather than your primary domain.
allow_all_origins: bool = True,
charset: str = "utf-8",
mimetypes: dict[str, str] | None = None,
add_headers_function: Callable[[Headers, str, str], None] | None = None,
index_file: str | bool | None = None,
immutable_file_test: Callable | str | None = None,
):
self.autorefresh = autorefresh
self.max_age = max_age
self.allow_all_origins = allow_all_origins
self.charset = charset
self.add_headers_function = add_headers_function
if index_file is True:
self.index_file: str | None = "index.html"
elif isinstance(index_file, str):
self.index_file = index_file
else:
self.index_file = None
if immutable_file_test is not None:
if not callable(immutable_file_test):
regex = re.compile(immutable_file_test)
self.immutable_file_test = lambda path, url: bool(regex.search(url))
else:
self.immutable_file_test = immutable_file_test
self.media_types = MediaTypes(extra_types=mimetypes)
self.application = application
self.files = {}
self.directories = []
if root is not None:
self.add_files(root, prefix)
def __call__(self, environ, start_response):
path = decode_path_info(environ.get("PATH_INFO", ""))
if self.autorefresh:
static_file = self.find_file(path)
else:
static_file = self.files.get(path)
if static_file is None:
return self.application(environ, start_response)
else:
return self.serve(static_file, environ, start_response)
@staticmethod
def serve(static_file, environ, start_response):
response = static_file.get_response(environ["REQUEST_METHOD"], environ)
status_line = f"{response.status} {response.status.phrase}"
start_response(status_line, list(response.headers))
if response.file is not None:
file_wrapper = environ.get("wsgi.file_wrapper", FileWrapper)
return file_wrapper(response.file)
else:
return []
def add_files(self, root, prefix=None):
root = os.path.abspath(root)
root = root.rstrip(os.path.sep) + os.path.sep
prefix = ensure_leading_trailing_slash(prefix)
if self.autorefresh:
# Later calls to `add_files` overwrite earlier ones, hence we need
# to store the list of directories in reverse order so later ones
# match first when they're checked in "autorefresh" mode
self.directories.insert(0, (root, prefix))
else:
if os.path.isdir(root):
self.update_files_dictionary(root, prefix)
else:
warnings.warn(f"No directory at: {root}", stacklevel=3)
def update_files_dictionary(self, root, prefix):
# Build a mapping from paths to the results of `os.stat` calls
# so we only have to touch the filesystem once
stat_cache = dict(scantree(root))
for path in stat_cache.keys():
relative_path = path[len(root) :]
relative_url = relative_path.replace("\\", "/")
url = prefix + relative_url
self.add_file_to_dictionary(url, path, stat_cache=stat_cache)
def add_file_to_dictionary(self, url, path, stat_cache=None):
if self.is_compressed_variant(path, stat_cache=stat_cache):
return
if self.index_file is not None and url.endswith("/" + self.index_file):
index_url = url[: -len(self.index_file)]
index_no_slash = index_url.rstrip("/")
self.files[url] = self.redirect(url, index_url)
self.files[index_no_slash] = self.redirect(index_no_slash, index_url)
url = index_url
static_file = self.get_static_file(path, url, stat_cache=stat_cache)
self.files[url] = static_file
def find_file(self, url):
# Optimization: bail early if the URL can never match a file
if self.index_file is None and url.endswith("/"):
return
if not self.url_is_canonical(url):
return
for path in self.candidate_paths_for_url(url):
try:
return self.find_file_at_path(path, url)
except MissingFileError:
pass
def candidate_paths_for_url(self, url):
for root, prefix in self.directories:
if url.startswith(prefix):
path = os.path.join(root, url[len(prefix) :])
if os.path.commonprefix((root, path)) == root:
yield path
def find_file_at_path(self, path, url):
if self.is_compressed_variant(path):
raise MissingFileError(path)
if self.index_file is not None:
if url.endswith("/"):
path = os.path.join(path, self.index_file)
return self.get_static_file(path, url)
elif url.endswith("/" + self.index_file):
if os.path.isfile(path):
return self.redirect(url, url[: -len(self.index_file)])
else:
try:
return self.get_static_file(path, url)
except IsDirectoryError:
if os.path.isfile(os.path.join(path, self.index_file)):
return self.redirect(url, url + "/")
raise MissingFileError(path)
return self.get_static_file(path, url)
@staticmethod
def url_is_canonical(url):
"""
        Check that the URL path is in canonical format, i.e. has normalised
slashes and no path traversal elements
"""
if "\\" in url:
return False
normalised = normpath(url)
if url.endswith("/") and url != "/":
normalised += "/"
return normalised == url
@staticmethod
def is_compressed_variant(path, stat_cache=None):
if path[-3:] in (".gz", ".br"):
uncompressed_path = path[:-3]
if stat_cache is None:
return os.path.isfile(uncompressed_path)
else:
return uncompressed_path in stat_cache
return False
def get_static_file(self, path, url, stat_cache=None):
# Optimization: bail early if file does not exist
if stat_cache is None and not os.path.exists(path):
raise MissingFileError(path)
headers = Headers([])
self.add_mime_headers(headers, path, url)
self.add_cache_headers(headers, path, url)
if self.allow_all_origins:
headers["Access-Control-Allow-Origin"] = "*"
if self.add_headers_function is not None:
self.add_headers_function(headers, path, url)
return StaticFile(
path,
headers.items(),
stat_cache=stat_cache,
encodings={"gzip": path + ".gz", "br": path + ".br"},
)
def add_mime_headers(self, headers, path, url):
media_type = self.media_types.get_type(path)
if media_type.startswith("text/"):
params = {"charset": str(self.charset)}
else:
params = {}
headers.add_header("Content-Type", str(media_type), **params)
def add_cache_headers(self, headers, path, url):
if self.immutable_file_test(path, url):
headers["Cache-Control"] = "max-age={}, public, immutable".format(
self.FOREVER
)
elif self.max_age is not None:
headers["Cache-Control"] = f"max-age={self.max_age}, public"
def immutable_file_test(self, path, url):
"""
This should be implemented by sub-classes (see e.g. WhiteNoiseMiddleware)
or by setting the `immutable_file_test` config option
"""
return False
def redirect(self, from_url, to_url):
"""
Return a relative 302 redirect
We use relative redirects as we don't know the absolute URL the app is
being hosted under
"""
if to_url == from_url + "/":
relative_url = from_url.split("/")[-1] + "/"
elif from_url == to_url + self.index_file:
relative_url = "./"
else:
raise ValueError(f"Cannot handle redirect: {from_url} > {to_url}")
if self.max_age is not None:
headers = {"Cache-Control": f"max-age={self.max_age}, public"}
else:
headers = {}
return Redirect(relative_url, headers=headers)
def scantree(root):
"""
Recurse the given directory yielding (pathname, os.stat(pathname)) pairs
"""
for entry in os.scandir(root):
if entry.is_dir():
yield from scantree(entry.path)
else:
yield entry.path, entry.stat()
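
The class above is the framework-agnostic core: files are scanned up front (or per-request in autorefresh mode) and matched by URL before the wrapped application is ever called. A configuration sketch follows; the directory, prefix and hash-style regex are illustrative assumptions.

# Sketch: configuring the WSGI wrapper defined above; the directory,
# prefix and regex are illustrative.
from whitenoise import WhiteNoise

def fallback_app(environ, start_response):
    start_response("404 Not Found", [("Content-Type", "text/plain")])
    return [b"Not Found"]

wn = WhiteNoise(
    fallback_app,
    root="assets",           # scanned once at startup (autorefresh=False)
    prefix="/static/",       # files appear under /static/... URLs
    index_file=True,         # serve /foo/ from /foo/index.html
    # A string is compiled to a regex and matched against the URL;
    # matches get the ten-year FOREVER Cache-Control header.
    immutable_file_test=r"\.[0-9a-f]{8,}\.",
)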


@@ -0,0 +1,189 @@
from __future__ import annotations
import argparse
import gzip
import os
import re
from io import BytesIO
try:
import brotli
brotli_installed = True
except ImportError: # pragma: no cover
brotli_installed = False
class Compressor:
# Extensions that it's not worth trying to compress
SKIP_COMPRESS_EXTENSIONS = (
# Images
"jpg",
"jpeg",
"png",
"gif",
"webp",
# Compressed files
"zip",
"gz",
"tgz",
"bz2",
"tbz",
"xz",
"br",
# Flash
"swf",
"flv",
# Fonts
"woff",
"woff2",
# Video
"3gp",
"3gpp",
"asf",
"avi",
"m4v",
"mov",
"mp4",
"mpeg",
"mpg",
"webm",
"wmv",
)
def __init__(
self, extensions=None, use_gzip=True, use_brotli=True, log=print, quiet=False
):
if extensions is None:
extensions = self.SKIP_COMPRESS_EXTENSIONS
self.extension_re = self.get_extension_re(extensions)
self.use_gzip = use_gzip
self.use_brotli = use_brotli and brotli_installed
if not quiet:
self.log = log
@staticmethod
def get_extension_re(extensions):
if not extensions:
return re.compile("^$")
else:
return re.compile(
r"\.({})$".format("|".join(map(re.escape, extensions))), re.IGNORECASE
)
def should_compress(self, filename):
return not self.extension_re.search(filename)
def log(self, message):
pass
def compress(self, path):
with open(path, "rb") as f:
stat_result = os.fstat(f.fileno())
data = f.read()
size = len(data)
if self.use_brotli:
compressed = self.compress_brotli(data)
if self.is_compressed_effectively("Brotli", path, size, compressed):
yield self.write_data(path, compressed, ".br", stat_result)
else:
                # If Brotli compression wasn't effective, gzip won't be either
return
if self.use_gzip:
compressed = self.compress_gzip(data)
if self.is_compressed_effectively("Gzip", path, size, compressed):
yield self.write_data(path, compressed, ".gz", stat_result)
@staticmethod
def compress_gzip(data):
output = BytesIO()
# Explicitly set mtime to 0 so gzip content is fully determined
# by file content (0 = "no timestamp" according to gzip spec)
with gzip.GzipFile(
filename="", mode="wb", fileobj=output, compresslevel=9, mtime=0
) as gz_file:
gz_file.write(data)
return output.getvalue()
@staticmethod
def compress_brotli(data):
return brotli.compress(data)
def is_compressed_effectively(self, encoding_name, path, orig_size, data):
compressed_size = len(data)
if orig_size == 0:
is_effective = False
else:
ratio = compressed_size / orig_size
is_effective = ratio <= 0.95
if is_effective:
self.log(
"{} compressed {} ({}K -> {}K)".format(
encoding_name, path, orig_size // 1024, compressed_size // 1024
)
)
else:
self.log(f"Skipping {path} ({encoding_name} compression not effective)")
return is_effective
def write_data(self, path, data, suffix, stat_result):
filename = path + suffix
with open(filename, "wb") as f:
f.write(data)
os.utime(filename, (stat_result.st_atime, stat_result.st_mtime))
return filename
def main(argv=None):
parser = argparse.ArgumentParser(
description="Search for all files inside <root> *not* matching "
"<extensions> and produce compressed versions with "
"'.gz' and '.br' suffixes (as long as this results in a "
"smaller file)"
)
parser.add_argument(
"-q", "--quiet", help="Don't produce log output", action="store_true"
)
parser.add_argument(
"--no-gzip",
help="Don't produce gzip '.gz' files",
action="store_false",
dest="use_gzip",
)
parser.add_argument(
"--no-brotli",
help="Don't produce brotli '.br' files",
action="store_false",
dest="use_brotli",
)
parser.add_argument("root", help="Path root from which to search for files")
default_exclude = ", ".join(Compressor.SKIP_COMPRESS_EXTENSIONS)
parser.add_argument(
"extensions",
nargs="*",
help=(
"File extensions to exclude from compression "
+ f"(default: {default_exclude})"
),
default=Compressor.SKIP_COMPRESS_EXTENSIONS,
)
args = parser.parse_args(argv)
compressor = Compressor(
extensions=args.extensions,
use_gzip=args.use_gzip,
use_brotli=args.use_brotli,
quiet=args.quiet,
)
for dirpath, _dirs, files in os.walk(args.root):
for filename in files:
if compressor.should_compress(filename):
path = os.path.join(dirpath, filename)
for _compressed in compressor.compress(path):
pass
return 0
if __name__ == "__main__": # pragma: no cover
raise SystemExit(main())
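
Because of the __main__ guard, whitenoise.compress doubles as a CLI (python -m whitenoise.compress <root>). A programmatic sketch of the same walk-and-compress loop, with an assumed ./staticfiles directory:

# Sketch: driving the Compressor directly; the directory is illustrative.
# CLI equivalent: python -m whitenoise.compress ./staticfiles
import os

from whitenoise.compress import Compressor

compressor = Compressor(quiet=False)  # logs via print by default
for dirpath, _dirs, files in os.walk("./staticfiles"):
    for filename in files:
        if compressor.should_compress(filename):
            path = os.path.join(dirpath, filename)
            # Writes path + ".br" (if brotli is installed) and ".gz",
            # skipping any that don't shrink the file by at least 5%.
            for compressed_path in compressor.compress(path):
                print("wrote", compressed_path)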


@@ -0,0 +1,137 @@
from __future__ import annotations
import os
class MediaTypes:
__slots__ = ("types_map",)
def __init__(self, *, extra_types: dict[str, str] | None = None) -> None:
self.types_map = default_types()
if extra_types is not None:
self.types_map.update(extra_types)
def get_type(self, path: str) -> str:
name = os.path.basename(path).lower()
media_type = self.types_map.get(name)
if media_type is not None:
return media_type
extension = os.path.splitext(name)[1]
return self.types_map.get(extension, "application/octet-stream")
def default_types() -> dict[str, str]:
"""
We use our own set of default media types rather than the system-supplied
ones. This ensures consistent media type behaviour across varied
environments. The defaults are based on those shipped with nginx, with
some custom additions.
(Auto-generated by scripts/generate_default_media_types.py)
"""
return {
".3gp": "video/3gpp",
".3gpp": "video/3gpp",
".7z": "application/x-7z-compressed",
".ai": "application/postscript",
".asf": "video/x-ms-asf",
".asx": "video/x-ms-asf",
".atom": "application/atom+xml",
".avi": "video/x-msvideo",
".avif": "image/avif",
".bmp": "image/x-ms-bmp",
".cco": "application/x-cocoa",
".crt": "application/x-x509-ca-cert",
".css": "text/css",
".der": "application/x-x509-ca-cert",
".doc": "application/msword",
".docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
".ear": "application/java-archive",
".eot": "application/vnd.ms-fontobject",
".eps": "application/postscript",
".flv": "video/x-flv",
".gif": "image/gif",
".hqx": "application/mac-binhex40",
".htc": "text/x-component",
".htm": "text/html",
".html": "text/html",
".ico": "image/x-icon",
".jad": "text/vnd.sun.j2me.app-descriptor",
".jar": "application/java-archive",
".jardiff": "application/x-java-archive-diff",
".jng": "image/x-jng",
".jnlp": "application/x-java-jnlp-file",
".jpeg": "image/jpeg",
".jpg": "image/jpeg",
".js": "text/javascript",
".json": "application/json",
".kar": "audio/midi",
".kml": "application/vnd.google-earth.kml+xml",
".kmz": "application/vnd.google-earth.kmz",
".m3u8": "application/vnd.apple.mpegurl",
".m4a": "audio/x-m4a",
".m4v": "video/x-m4v",
".md": "text/markdown",
".mid": "audio/midi",
".midi": "audio/midi",
".mjs": "text/javascript",
".mml": "text/mathml",
".mng": "video/x-mng",
".mov": "video/quicktime",
".mp3": "audio/mpeg",
".mp4": "video/mp4",
".mpeg": "video/mpeg",
".mpg": "video/mpeg",
".odg": "application/vnd.oasis.opendocument.graphics",
".odp": "application/vnd.oasis.opendocument.presentation",
".ods": "application/vnd.oasis.opendocument.spreadsheet",
".odt": "application/vnd.oasis.opendocument.text",
".ogg": "audio/ogg",
".pdb": "application/x-pilot",
".pdf": "application/pdf",
".pem": "application/x-x509-ca-cert",
".pl": "application/x-perl",
".pm": "application/x-perl",
".png": "image/png",
".ppt": "application/vnd.ms-powerpoint",
".pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
".prc": "application/x-pilot",
".ps": "application/postscript",
".ra": "audio/x-realaudio",
".rar": "application/x-rar-compressed",
".rpm": "application/x-redhat-package-manager",
".rss": "application/rss+xml",
".rtf": "application/rtf",
".run": "application/x-makeself",
".sea": "application/x-sea",
".shtml": "text/html",
".sit": "application/x-stuffit",
".svg": "image/svg+xml",
".svgz": "image/svg+xml",
".swf": "application/x-shockwave-flash",
".tcl": "application/x-tcl",
".tif": "image/tiff",
".tiff": "image/tiff",
".tk": "application/x-tcl",
".ts": "video/mp2t",
".txt": "text/plain",
".war": "application/java-archive",
".wasm": "application/wasm",
".wbmp": "image/vnd.wap.wbmp",
".webm": "video/webm",
".webp": "image/webp",
".wml": "text/vnd.wap.wml",
".wmlc": "application/vnd.wap.wmlc",
".wmv": "video/x-ms-wmv",
".woff": "application/font-woff",
".woff2": "font/woff2",
".xhtml": "application/xhtml+xml",
".xls": "application/vnd.ms-excel",
".xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
".xml": "text/xml",
".xpi": "application/x-xpinstall",
".xspf": "application/xspf+xml",
".zip": "application/zip",
"apple-app-site-association": "application/pkc7-mime",
"crossdomain.xml": "text/x-cross-domain-policy",
}
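
Lookups go by exact filename first, then by extension, then fall back to application/octet-stream. A short sketch; the .custom mapping is an invented example:

# Sketch: how MediaTypes resolves a content type.
from whitenoise.media_types import MediaTypes

media_types = MediaTypes(extra_types={".custom": "application/x-custom"})
print(media_types.get_type("app/static/logo.png"))  # image/png
print(media_types.get_type("crossdomain.xml"))      # text/x-cross-domain-policy
print(media_types.get_type("data.custom"))          # application/x-custom
print(media_types.get_type("README"))               # application/octet-stream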


@@ -0,0 +1,202 @@
from __future__ import annotations
import os
from posixpath import basename
from urllib.parse import urlparse
from django.conf import settings
from django.contrib.staticfiles import finders
from django.contrib.staticfiles.storage import staticfiles_storage
from django.http import FileResponse
from django.urls import get_script_prefix
from whitenoise.base import WhiteNoise
from whitenoise.string_utils import ensure_leading_trailing_slash
__all__ = ["WhiteNoiseMiddleware"]
class WhiteNoiseFileResponse(FileResponse):
"""
Wrap Django's FileResponse to prevent setting any default headers. For the
most part these just duplicate work already done by WhiteNoise but in some
cases (e.g. the content-disposition header introduced in Django 3.0) they
are actively harmful.
"""
def set_headers(self, *args, **kwargs):
pass
class WhiteNoiseMiddleware(WhiteNoise):
"""
Wrap WhiteNoise to allow it to function as Django middleware, rather
than WSGI middleware.
"""
def __init__(self, get_response=None, settings=settings):
self.get_response = get_response
try:
autorefresh: bool = settings.WHITENOISE_AUTOREFRESH
except AttributeError:
autorefresh = settings.DEBUG
try:
max_age = settings.WHITENOISE_MAX_AGE
except AttributeError:
if settings.DEBUG:
max_age = 0
else:
max_age = 60
try:
allow_all_origins = settings.WHITENOISE_ALLOW_ALL_ORIGINS
except AttributeError:
allow_all_origins = True
try:
charset = settings.WHITENOISE_CHARSET
except AttributeError:
charset = "utf-8"
try:
mimetypes = settings.WHITENOISE_MIMETYPES
except AttributeError:
mimetypes = None
try:
add_headers_function = settings.WHITENOISE_ADD_HEADERS_FUNCTION
except AttributeError:
add_headers_function = None
try:
index_file = settings.WHITENOISE_INDEX_FILE
except AttributeError:
index_file = None
try:
immutable_file_test = settings.WHITENOISE_IMMUTABLE_FILE_TEST
except AttributeError:
immutable_file_test = None
super().__init__(
application=None,
autorefresh=autorefresh,
max_age=max_age,
allow_all_origins=allow_all_origins,
charset=charset,
mimetypes=mimetypes,
add_headers_function=add_headers_function,
index_file=index_file,
immutable_file_test=immutable_file_test,
)
try:
self.use_finders = settings.WHITENOISE_USE_FINDERS
except AttributeError:
self.use_finders = settings.DEBUG
try:
self.static_prefix = settings.WHITENOISE_STATIC_PREFIX
except AttributeError:
self.static_prefix = urlparse(settings.STATIC_URL or "").path
script_prefix = get_script_prefix().rstrip("/")
if script_prefix:
if self.static_prefix.startswith(script_prefix):
self.static_prefix = self.static_prefix[len(script_prefix) :]
self.static_prefix = ensure_leading_trailing_slash(self.static_prefix)
self.static_root = settings.STATIC_ROOT
if self.static_root:
self.add_files(self.static_root, prefix=self.static_prefix)
try:
root = settings.WHITENOISE_ROOT
except AttributeError:
root = None
if root:
self.add_files(root)
if self.use_finders and not self.autorefresh:
self.add_files_from_finders()
def __call__(self, request):
if self.autorefresh:
static_file = self.find_file(request.path_info)
else:
static_file = self.files.get(request.path_info)
if static_file is not None:
return self.serve(static_file, request)
return self.get_response(request)
@staticmethod
def serve(static_file, request):
response = static_file.get_response(request.method, request.META)
status = int(response.status)
http_response = WhiteNoiseFileResponse(response.file or (), status=status)
# Remove default content-type
del http_response["content-type"]
for key, value in response.headers:
http_response[key] = value
return http_response
def add_files_from_finders(self):
files = {}
for finder in finders.get_finders():
for path, storage in finder.list(None):
prefix = (getattr(storage, "prefix", None) or "").strip("/")
url = "".join(
(
self.static_prefix,
prefix,
"/" if prefix else "",
path.replace("\\", "/"),
)
)
                # Use setdefault as only the first matching file should be used
files.setdefault(url, storage.path(path))
stat_cache = {path: os.stat(path) for path in files.values()}
for url, path in files.items():
self.add_file_to_dictionary(url, path, stat_cache=stat_cache)
def candidate_paths_for_url(self, url):
if self.use_finders and url.startswith(self.static_prefix):
path = finders.find(url[len(self.static_prefix) :])
if path:
yield path
paths = super().candidate_paths_for_url(url)
for path in paths:
yield path
def immutable_file_test(self, path, url):
"""
        Determine whether the given URL represents an immutable file (i.e. a
file with a hash of its contents as part of its name) which can
therefore be cached forever
"""
if not url.startswith(self.static_prefix):
return False
name = url[len(self.static_prefix) :]
name_without_hash = self.get_name_without_hash(name)
if name == name_without_hash:
return False
static_url = self.get_static_url(name_without_hash)
# If the static_url function maps the name without hash
# back to the original name, then we know we've got a
# versioned filename
if static_url and basename(static_url) == basename(url):
return True
return False
def get_name_without_hash(self, filename):
"""
        Removes the version hash from a filename, e.g. transforms
'css/application.f3ea4bcc2.css' into 'css/application.css'
Note: this is specific to the naming scheme used by Django's
CachedStaticFilesStorage. You may have to override this if
you are using a different static files versioning system
"""
name_with_hash, ext = os.path.splitext(filename)
name = os.path.splitext(name_with_hash)[0]
return name + ext
def get_static_url(self, name):
try:
return staticfiles_storage.url(name)
except ValueError:
return None
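
The middleware reads its configuration from Django settings rather than constructor arguments. A sketch of the relevant settings with illustrative values; each WHITENOISE_* name corresponds to a branch in __init__ above:

# settings.py sketch; values are illustrative.
from pathlib import Path

BASE_DIR = Path(__file__).resolve().parent

MIDDLEWARE = [
    "django.middleware.security.SecurityMiddleware",
    # WhiteNoise sits directly after SecurityMiddleware so static
    # requests are answered before the rest of the stack runs.
    "whitenoise.middleware.WhiteNoiseMiddleware",
    # ... remaining middleware ...
]

STATIC_URL = "/static/"                 # becomes self.static_prefix
STATIC_ROOT = BASE_DIR / "staticfiles"  # scanned by add_files()

WHITENOISE_MAX_AGE = 300                # overrides the 60s (or DEBUG=0s) default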


@@ -0,0 +1,287 @@
from __future__ import annotations
import errno
import os
import re
import stat
from email.utils import formatdate
from email.utils import parsedate
from http import HTTPStatus
from io import BufferedIOBase
from time import mktime
from urllib.parse import quote
from wsgiref.headers import Headers
class Response:
__slots__ = ("status", "headers", "file")
def __init__(self, status, headers, file):
self.status = status
self.headers = headers
self.file = file
NOT_ALLOWED_RESPONSE = Response(
status=HTTPStatus.METHOD_NOT_ALLOWED,
headers=[("Allow", "GET, HEAD")],
file=None,
)
# Headers which should be returned with a 304 Not Modified response as
# specified here: https://tools.ietf.org/html/rfc7232#section-4.1
NOT_MODIFIED_HEADERS = (
"Cache-Control",
"Content-Location",
"Date",
"ETag",
"Expires",
"Vary",
)
class SlicedFile(BufferedIOBase):
"""
    A file-like wrapper to handle seeking to the start byte of a range request
and to return no further output once the end byte of a range request has
been reached.
"""
def __init__(self, fileobj, start, end):
fileobj.seek(start)
self.fileobj = fileobj
self.remaining = end - start + 1
def read(self, size=-1):
if self.remaining <= 0:
return b""
if size < 0:
size = self.remaining
else:
size = min(size, self.remaining)
data = self.fileobj.read(size)
self.remaining -= len(data)
return data
def close(self):
self.fileobj.close()
class StaticFile:
def __init__(self, path, headers, encodings=None, stat_cache=None):
files = self.get_file_stats(path, encodings, stat_cache)
headers = self.get_headers(headers, files)
self.last_modified = parsedate(headers["Last-Modified"])
self.etag = headers["ETag"]
self.not_modified_response = self.get_not_modified_response(headers)
self.alternatives = self.get_alternatives(headers, files)
def get_response(self, method, request_headers):
if method not in ("GET", "HEAD"):
return NOT_ALLOWED_RESPONSE
if self.is_not_modified(request_headers):
return self.not_modified_response
path, headers = self.get_path_and_headers(request_headers)
if method != "HEAD":
file_handle = open(path, "rb")
else:
file_handle = None
range_header = request_headers.get("HTTP_RANGE")
if range_header:
try:
return self.get_range_response(range_header, headers, file_handle)
except ValueError:
# If we can't interpret the Range request for any reason then
# just ignore it and return the standard response (this
# behaviour is allowed by the spec)
pass
return Response(HTTPStatus.OK, headers, file_handle)
def get_range_response(self, range_header, base_headers, file_handle):
headers = []
for item in base_headers:
if item[0] == "Content-Length":
size = int(item[1])
else:
headers.append(item)
start, end = self.get_byte_range(range_header, size)
if start >= end:
return self.get_range_not_satisfiable_response(file_handle, size)
if file_handle is not None:
file_handle = SlicedFile(file_handle, start, end)
headers.append(("Content-Range", f"bytes {start}-{end}/{size}"))
headers.append(("Content-Length", str(end - start + 1)))
return Response(HTTPStatus.PARTIAL_CONTENT, headers, file_handle)
def get_byte_range(self, range_header, size):
start, end = self.parse_byte_range(range_header)
if start < 0:
start = max(start + size, 0)
if end is None:
end = size - 1
else:
end = min(end, size - 1)
return start, end
@staticmethod
def parse_byte_range(range_header):
units, _, range_spec = range_header.strip().partition("=")
if units != "bytes":
raise ValueError()
# Only handle a single range spec. Multiple ranges will trigger a
# ValueError below which will result in the Range header being ignored
start_str, sep, end_str = range_spec.strip().partition("-")
if not sep:
raise ValueError()
if not start_str:
start = -int(end_str)
end = None
else:
start = int(start_str)
end = int(end_str) if end_str else None
return start, end
@staticmethod
def get_range_not_satisfiable_response(file_handle, size):
if file_handle is not None:
file_handle.close()
return Response(
HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE,
[("Content-Range", f"bytes */{size}")],
None,
)
@staticmethod
def get_file_stats(path, encodings, stat_cache):
# Primary file has an encoding of None
files = {None: FileEntry(path, stat_cache)}
if encodings:
for encoding, alt_path in encodings.items():
try:
files[encoding] = FileEntry(alt_path, stat_cache)
except MissingFileError:
continue
return files
def get_headers(self, headers_list, files):
headers = Headers(headers_list)
main_file = files[None]
if len(files) > 1:
headers["Vary"] = "Accept-Encoding"
if "Last-Modified" not in headers:
mtime = main_file.mtime
# Not all filesystems report mtimes, and sometimes they report an
# mtime of 0 which we know is incorrect
if mtime:
headers["Last-Modified"] = formatdate(mtime, usegmt=True)
if "ETag" not in headers:
last_modified = parsedate(headers["Last-Modified"])
if last_modified:
timestamp = int(mktime(last_modified))
headers["ETag"] = f'"{timestamp:x}-{main_file.size:x}"'
return headers
@staticmethod
def get_not_modified_response(headers):
not_modified_headers = []
for key in NOT_MODIFIED_HEADERS:
if key in headers:
not_modified_headers.append((key, headers[key]))
return Response(
status=HTTPStatus.NOT_MODIFIED, headers=not_modified_headers, file=None
)
@staticmethod
def get_alternatives(base_headers, files):
# Sort by size so that the smallest compressed alternative matches first
alternatives = []
files_by_size = sorted(files.items(), key=lambda i: i[1].size)
for encoding, file_entry in files_by_size:
headers = Headers(base_headers.items())
headers["Content-Length"] = str(file_entry.size)
if encoding:
headers["Content-Encoding"] = encoding
encoding_re = re.compile(r"\b%s\b" % encoding)
else:
encoding_re = re.compile("")
alternatives.append((encoding_re, file_entry.path, headers.items()))
return alternatives
def is_not_modified(self, request_headers):
previous_etag = request_headers.get("HTTP_IF_NONE_MATCH")
if previous_etag is not None:
return previous_etag == self.etag
if self.last_modified is None:
return False
try:
last_requested = request_headers["HTTP_IF_MODIFIED_SINCE"]
except KeyError:
return False
last_requested_ts = parsedate(last_requested)
if last_requested_ts is not None:
return last_requested_ts >= self.last_modified
return False
def get_path_and_headers(self, request_headers):
accept_encoding = request_headers.get("HTTP_ACCEPT_ENCODING", "")
if accept_encoding == "*":
accept_encoding = ""
        # These are sorted by size so the first match is the best
for encoding_re, path, headers in self.alternatives:
if encoding_re.search(accept_encoding):
return path, headers
class Redirect:
def __init__(self, location, headers=None):
headers = list(headers.items()) if headers else []
headers.append(("Location", quote(location.encode("utf8"))))
self.response = Response(HTTPStatus.FOUND, headers, None)
def get_response(self, method, request_headers):
return self.response
class NotARegularFileError(Exception):
pass
class MissingFileError(NotARegularFileError):
pass
class IsDirectoryError(MissingFileError):
pass
class FileEntry:
__slots__ = ("path", "size", "mtime")
def __init__(self, path, stat_cache=None):
self.path = path
stat_function = os.stat if stat_cache is None else stat_cache.__getitem__
stat = self.stat_regular_file(path, stat_function)
self.size = stat.st_size
self.mtime = stat.st_mtime
@staticmethod
def stat_regular_file(path, stat_function):
"""
Wrap `stat_function` to raise appropriate errors if `path` is not a
regular file
"""
try:
stat_result = stat_function(path)
except KeyError:
raise MissingFileError(path)
except OSError as e:
if e.errno in (errno.ENOENT, errno.ENAMETOOLONG):
raise MissingFileError(path)
else:
raise
if not stat.S_ISREG(stat_result.st_mode):
if stat.S_ISDIR(stat_result.st_mode):
raise IsDirectoryError(f"Path is a directory: {path}")
else:
raise NotARegularFileError(f"Not a regular file: {path}")
return stat_result
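
Most of the subtlety above is in range handling. A small sketch of the parsing rules, calling the staticmethod directly; the header values are illustrative, and negative starts are suffix ranges per RFC 7233:

# Sketch: how parse_byte_range interprets Range headers.
from whitenoise.responders import StaticFile

print(StaticFile.parse_byte_range("bytes=0-499"))  # (0, 499)
print(StaticFile.parse_byte_range("bytes=500-"))   # (500, None): to EOF
print(StaticFile.parse_byte_range("bytes=-200"))   # (-200, None): last 200 bytes
# Anything else ("bytes=0-1,5-9", "items=3-4", ...) raises ValueError,
# which get_response treats as "ignore the Range header".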


@@ -0,0 +1,53 @@
"""
Subclass the existing 'runserver' command and change the default options
to disable static file serving, allowing WhiteNoise to handle static files.
There is some unpleasant hackery here because we don't know which command class
to subclass until runtime, since that depends on INSTALLED_APPS, so we have to
determine it dynamically.
"""
from __future__ import annotations
from importlib import import_module
from django.apps import apps
def get_next_runserver_command():
"""
Return the next highest priority "runserver" command class
"""
for app_name in get_lower_priority_apps():
module_path = "%s.management.commands.runserver" % app_name
try:
return import_module(module_path).Command
except (ImportError, AttributeError):
pass
def get_lower_priority_apps():
"""
Yield all app module names below the current app in the INSTALLED_APPS list
"""
self_app_name = ".".join(__name__.split(".")[:-3])
reached_self = False
for app_config in apps.get_app_configs():
if app_config.name == self_app_name:
reached_self = True
elif reached_self:
yield app_config.name
yield "django.core"
RunserverCommand = get_next_runserver_command()
class Command(RunserverCommand):
def add_arguments(self, parser):
super().add_arguments(parser)
if parser.get_default("use_static_handler") is True:
parser.set_defaults(use_static_handler=False)
parser.description += (
"\n(Wrapped by 'whitenoise.runserver_nostatic' to always"
" enable '--nostatic')"
)
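
To take effect, the app containing this command must be listed before django.contrib.staticfiles so Django picks up this runserver first. A sketch of the INSTALLED_APPS ordering; the placeholder apps are illustrative:

INSTALLED_APPS = [
    "whitenoise.runserver_nostatic",  # must come before staticfiles
    "django.contrib.staticfiles",
    # ... project apps ...
]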


@@ -0,0 +1,178 @@
from __future__ import annotations
import errno
import os
import re
import textwrap
from typing import Any
from typing import Iterator
from typing import Tuple
from typing import Union
from django.conf import settings
from django.contrib.staticfiles.storage import ManifestStaticFilesStorage
from django.contrib.staticfiles.storage import StaticFilesStorage
from whitenoise.compress import Compressor
_PostProcessT = Iterator[Union[Tuple[str, str, bool], Tuple[str, None, RuntimeError]]]
class CompressedStaticFilesStorage(StaticFilesStorage):
"""
StaticFilesStorage subclass that compresses output files.
"""
def post_process(
self, paths: dict[str, Any], dry_run: bool = False, **options: Any
) -> _PostProcessT:
if dry_run:
return
extensions = getattr(settings, "WHITENOISE_SKIP_COMPRESS_EXTENSIONS", None)
compressor = self.create_compressor(extensions=extensions, quiet=True)
for path in paths:
if compressor.should_compress(path):
full_path = self.path(path)
prefix_len = len(full_path) - len(path)
for compressed_path in compressor.compress(full_path):
compressed_name = compressed_path[prefix_len:]
yield path, compressed_name, True
def create_compressor(self, **kwargs: Any) -> Compressor:
return Compressor(**kwargs)
class MissingFileError(ValueError):
pass
class CompressedManifestStaticFilesStorage(ManifestStaticFilesStorage):
"""
    Extends ManifestStaticFilesStorage to create compressed versions
of its output files and, optionally, to delete the non-hashed files (i.e.
those without the hash in their name)
"""
_new_files = None
def __init__(self, *args, **kwargs):
manifest_strict = getattr(settings, "WHITENOISE_MANIFEST_STRICT", None)
if manifest_strict is not None:
self.manifest_strict = manifest_strict
super().__init__(*args, **kwargs)
def post_process(self, *args, **kwargs):
files = super().post_process(*args, **kwargs)
if not kwargs.get("dry_run"):
files = self.post_process_with_compression(files)
# Make exception messages helpful
for name, hashed_name, processed in files:
if isinstance(processed, Exception):
processed = self.make_helpful_exception(processed, name)
yield name, hashed_name, processed
def post_process_with_compression(self, files):
        # Files may get hashed multiple times, so we want to keep track of all
        # the intermediate files generated during the process and which of
        # these are the final names used for each file. As not every
        # intermediate file is yielded, we have to hook into the `hashed_name`
        # method to keep track of them all.
hashed_names = {}
new_files = set()
self.start_tracking_new_files(new_files)
for name, hashed_name, processed in files:
if hashed_name and not isinstance(processed, Exception):
hashed_names[self.clean_name(name)] = hashed_name
yield name, hashed_name, processed
self.stop_tracking_new_files()
original_files = set(hashed_names.keys())
hashed_files = set(hashed_names.values())
if self.keep_only_hashed_files:
files_to_delete = (original_files | new_files) - hashed_files
files_to_compress = hashed_files
else:
files_to_delete = set()
files_to_compress = original_files | hashed_files
self.delete_files(files_to_delete)
for name, compressed_name in self.compress_files(files_to_compress):
yield name, compressed_name, True
def hashed_name(self, *args, **kwargs):
name = super().hashed_name(*args, **kwargs)
if self._new_files is not None:
self._new_files.add(self.clean_name(name))
return name
def start_tracking_new_files(self, new_files):
self._new_files = new_files
def stop_tracking_new_files(self):
self._new_files = None
@property
def keep_only_hashed_files(self):
return getattr(settings, "WHITENOISE_KEEP_ONLY_HASHED_FILES", False)
def delete_files(self, files_to_delete):
for name in files_to_delete:
try:
os.unlink(self.path(name))
except OSError as e:
if e.errno != errno.ENOENT:
raise
def create_compressor(self, **kwargs):
return Compressor(**kwargs)
def compress_files(self, names):
extensions = getattr(settings, "WHITENOISE_SKIP_COMPRESS_EXTENSIONS", None)
compressor = self.create_compressor(extensions=extensions, quiet=True)
for name in names:
if compressor.should_compress(name):
path = self.path(name)
prefix_len = len(path) - len(name)
for compressed_path in compressor.compress(path):
compressed_name = compressed_path[prefix_len:]
yield name, compressed_name
def make_helpful_exception(self, exception, name):
"""
        If a CSS file contains references to images, fonts, etc. that can't be found
then Django's `post_process` blows up with a not particularly helpful
ValueError that leads people to think WhiteNoise is broken.
Here we attempt to intercept such errors and reformat them to be more
helpful in revealing the source of the problem.
"""
if isinstance(exception, ValueError):
message = exception.args[0] if len(exception.args) else ""
# Stringly typed exceptions. Yay!
match = self._error_msg_re.search(message)
if match:
extension = os.path.splitext(name)[1].lstrip(".").upper()
message = self._error_msg.format(
orig_message=message,
filename=name,
missing=match.group(1),
ext=extension,
)
exception = MissingFileError(message)
return exception
_error_msg_re = re.compile(r"^The file '(.+)' could not be found")
_error_msg = textwrap.dedent(
"""\
{orig_message}
The {ext} file '{filename}' references a file which could not be found:
{missing}
Please check the URL references in this {ext} file, particularly any
relative paths which might be pointing to the wrong location.
"""
)
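
These backends are selected through Django's storage settings. A sketch with illustrative values; the STORAGES form assumes Django 4.2+, while the commented STATICFILES_STORAGE string is the older equivalent:

# settings.py sketch; values are illustrative.
STORAGES = {
    "default": {"BACKEND": "django.core.files.storage.FileSystemStorage"},
    "staticfiles": {
        "BACKEND": "whitenoise.storage.CompressedManifestStaticFilesStorage",
    },
}
# Pre-4.2 equivalent:
# STATICFILES_STORAGE = "whitenoise.storage.CompressedManifestStaticFilesStorage"

# Optional knobs read by the classes above:
WHITENOISE_KEEP_ONLY_HASHED_FILES = True  # drop un-hashed originals
WHITENOISE_SKIP_COMPRESS_EXTENSIONS = ("jpg", "png", "zip")  # replaces defaults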


@@ -0,0 +1,13 @@
from __future__ import annotations
# Follow Django in treating URLs as UTF-8 encoded (which requires undoing the
# implicit ISO-8859-1 decoding applied in Python 3). Strictly speaking, URLs
# should only be ASCII anyway, but UTF-8 can be found in the wild.
def decode_path_info(path_info):
return path_info.encode("iso-8859-1", "replace").decode("utf-8", "replace")
def ensure_leading_trailing_slash(path):
path = (path or "").strip("/")
return f"/{path}/" if path else "/"
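
A quick sketch of the round-trip these helpers perform; the sample path is illustrative:

from whitenoise.string_utils import decode_path_info, ensure_leading_trailing_slash

# WSGI servers hand PATH_INFO over as implicitly latin-1-decoded text;
# re-encode and re-decode to recover the intended UTF-8.
raw = "/caf\u00e9".encode("utf-8").decode("iso-8859-1")
print(decode_path_info(raw))                    # /café
print(ensure_leading_trailing_slash("static"))  # /static/
print(ensure_leading_trailing_slash(None))      # /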