123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408 |
- """The optional bytecode cache system. This is useful if you have very
- complex template situations and the compilation of all those templates
- slows down your application too much.
- Situations where this is useful are often forking web applications that
- are initialized on the first request.
- """
- import errno
- import fnmatch
- import marshal
- import os
- import pickle
- import stat
- import sys
- import tempfile
- import typing as t
- from hashlib import sha1
- from io import BytesIO
- from types import CodeType
- if t.TYPE_CHECKING:
- import typing_extensions as te
- from .environment import Environment
- class _MemcachedClient(te.Protocol):
- def get(self, key: str) -> bytes: ...
- def set(
- self, key: str, value: bytes, timeout: t.Optional[int] = None
- ) -> None: ...
- bc_version = 5
- # Magic bytes to identify Jinja bytecode cache files. Contains the
- # Python major and minor version to avoid loading incompatible bytecode
- # if a project upgrades its Python version.
- bc_magic = (
- b"j2"
- + pickle.dumps(bc_version, 2)
- + pickle.dumps((sys.version_info[0] << 24) | sys.version_info[1], 2)
- )
- class Bucket:
- """Buckets are used to store the bytecode for one template. It's created
- and initialized by the bytecode cache and passed to the loading functions.
- The buckets get an internal checksum from the cache assigned and use this
- to automatically reject outdated cache material. Individual bytecode
- cache subclasses don't have to care about cache invalidation.
- """
- def __init__(self, environment: "Environment", key: str, checksum: str) -> None:
- self.environment = environment
- self.key = key
- self.checksum = checksum
- self.reset()
- def reset(self) -> None:
- """Resets the bucket (unloads the bytecode)."""
- self.code: t.Optional[CodeType] = None
- def load_bytecode(self, f: t.BinaryIO) -> None:
- """Loads bytecode from a file or file like object."""
- # make sure the magic header is correct
- magic = f.read(len(bc_magic))
- if magic != bc_magic:
- self.reset()
- return
- # the source code of the file changed, we need to reload
- checksum = pickle.load(f)
- if self.checksum != checksum:
- self.reset()
- return
- # if marshal_load fails then we need to reload
- try:
- self.code = marshal.load(f)
- except (EOFError, ValueError, TypeError):
- self.reset()
- return
- def write_bytecode(self, f: t.IO[bytes]) -> None:
- """Dump the bytecode into the file or file like object passed."""
- if self.code is None:
- raise TypeError("can't write empty bucket")
- f.write(bc_magic)
- pickle.dump(self.checksum, f, 2)
- marshal.dump(self.code, f)
- def bytecode_from_string(self, string: bytes) -> None:
- """Load bytecode from bytes."""
- self.load_bytecode(BytesIO(string))
- def bytecode_to_string(self) -> bytes:
- """Return the bytecode as bytes."""
- out = BytesIO()
- self.write_bytecode(out)
- return out.getvalue()
- class BytecodeCache:
- """To implement your own bytecode cache you have to subclass this class
- and override :meth:`load_bytecode` and :meth:`dump_bytecode`. Both of
- these methods are passed a :class:`~jinja2.bccache.Bucket`.
- A very basic bytecode cache that saves the bytecode on the file system::
- from os import path
- class MyCache(BytecodeCache):
- def __init__(self, directory):
- self.directory = directory
- def load_bytecode(self, bucket):
- filename = path.join(self.directory, bucket.key)
- if path.exists(filename):
- with open(filename, 'rb') as f:
- bucket.load_bytecode(f)
- def dump_bytecode(self, bucket):
- filename = path.join(self.directory, bucket.key)
- with open(filename, 'wb') as f:
- bucket.write_bytecode(f)
- A more advanced version of a filesystem based bytecode cache is part of
- Jinja.
- """
- def load_bytecode(self, bucket: Bucket) -> None:
- """Subclasses have to override this method to load bytecode into a
- bucket. If they are not able to find code in the cache for the
- bucket, it must not do anything.
- """
- raise NotImplementedError()
- def dump_bytecode(self, bucket: Bucket) -> None:
- """Subclasses have to override this method to write the bytecode
- from a bucket back to the cache. If it unable to do so it must not
- fail silently but raise an exception.
- """
- raise NotImplementedError()
- def clear(self) -> None:
- """Clears the cache. This method is not used by Jinja but should be
- implemented to allow applications to clear the bytecode cache used
- by a particular environment.
- """
- def get_cache_key(
- self, name: str, filename: t.Optional[t.Union[str]] = None
- ) -> str:
- """Returns the unique hash key for this template name."""
- hash = sha1(name.encode("utf-8"))
- if filename is not None:
- hash.update(f"|{filename}".encode())
- return hash.hexdigest()
- def get_source_checksum(self, source: str) -> str:
- """Returns a checksum for the source."""
- return sha1(source.encode("utf-8")).hexdigest()
- def get_bucket(
- self,
- environment: "Environment",
- name: str,
- filename: t.Optional[str],
- source: str,
- ) -> Bucket:
- """Return a cache bucket for the given template. All arguments are
- mandatory but filename may be `None`.
- """
- key = self.get_cache_key(name, filename)
- checksum = self.get_source_checksum(source)
- bucket = Bucket(environment, key, checksum)
- self.load_bytecode(bucket)
- return bucket
- def set_bucket(self, bucket: Bucket) -> None:
- """Put the bucket into the cache."""
- self.dump_bytecode(bucket)
- class FileSystemBytecodeCache(BytecodeCache):
- """A bytecode cache that stores bytecode on the filesystem. It accepts
- two arguments: The directory where the cache items are stored and a
- pattern string that is used to build the filename.
- If no directory is specified a default cache directory is selected. On
- Windows the user's temp directory is used, on UNIX systems a directory
- is created for the user in the system temp directory.
- The pattern can be used to have multiple separate caches operate on the
- same directory. The default pattern is ``'__jinja2_%s.cache'``. ``%s``
- is replaced with the cache key.
- >>> bcc = FileSystemBytecodeCache('/tmp/jinja_cache', '%s.cache')
- This bytecode cache supports clearing of the cache using the clear method.
- """
- def __init__(
- self, directory: t.Optional[str] = None, pattern: str = "__jinja2_%s.cache"
- ) -> None:
- if directory is None:
- directory = self._get_default_cache_dir()
- self.directory = directory
- self.pattern = pattern
- def _get_default_cache_dir(self) -> str:
- def _unsafe_dir() -> "te.NoReturn":
- raise RuntimeError(
- "Cannot determine safe temp directory. You "
- "need to explicitly provide one."
- )
- tmpdir = tempfile.gettempdir()
- # On windows the temporary directory is used specific unless
- # explicitly forced otherwise. We can just use that.
- if os.name == "nt":
- return tmpdir
- if not hasattr(os, "getuid"):
- _unsafe_dir()
- dirname = f"_jinja2-cache-{os.getuid()}"
- actual_dir = os.path.join(tmpdir, dirname)
- try:
- os.mkdir(actual_dir, stat.S_IRWXU)
- except OSError as e:
- if e.errno != errno.EEXIST:
- raise
- try:
- os.chmod(actual_dir, stat.S_IRWXU)
- actual_dir_stat = os.lstat(actual_dir)
- if (
- actual_dir_stat.st_uid != os.getuid()
- or not stat.S_ISDIR(actual_dir_stat.st_mode)
- or stat.S_IMODE(actual_dir_stat.st_mode) != stat.S_IRWXU
- ):
- _unsafe_dir()
- except OSError as e:
- if e.errno != errno.EEXIST:
- raise
- actual_dir_stat = os.lstat(actual_dir)
- if (
- actual_dir_stat.st_uid != os.getuid()
- or not stat.S_ISDIR(actual_dir_stat.st_mode)
- or stat.S_IMODE(actual_dir_stat.st_mode) != stat.S_IRWXU
- ):
- _unsafe_dir()
- return actual_dir
- def _get_cache_filename(self, bucket: Bucket) -> str:
- return os.path.join(self.directory, self.pattern % (bucket.key,))
- def load_bytecode(self, bucket: Bucket) -> None:
- filename = self._get_cache_filename(bucket)
- # Don't test for existence before opening the file, since the
- # file could disappear after the test before the open.
- try:
- f = open(filename, "rb")
- except (FileNotFoundError, IsADirectoryError, PermissionError):
- # PermissionError can occur on Windows when an operation is
- # in progress, such as calling clear().
- return
- with f:
- bucket.load_bytecode(f)
- def dump_bytecode(self, bucket: Bucket) -> None:
- # Write to a temporary file, then rename to the real name after
- # writing. This avoids another process reading the file before
- # it is fully written.
- name = self._get_cache_filename(bucket)
- f = tempfile.NamedTemporaryFile(
- mode="wb",
- dir=os.path.dirname(name),
- prefix=os.path.basename(name),
- suffix=".tmp",
- delete=False,
- )
- def remove_silent() -> None:
- try:
- os.remove(f.name)
- except OSError:
- # Another process may have called clear(). On Windows,
- # another program may be holding the file open.
- pass
- try:
- with f:
- bucket.write_bytecode(f)
- except BaseException:
- remove_silent()
- raise
- try:
- os.replace(f.name, name)
- except OSError:
- # Another process may have called clear(). On Windows,
- # another program may be holding the file open.
- remove_silent()
- except BaseException:
- remove_silent()
- raise
- def clear(self) -> None:
- # imported lazily here because google app-engine doesn't support
- # write access on the file system and the function does not exist
- # normally.
- from os import remove
- files = fnmatch.filter(os.listdir(self.directory), self.pattern % ("*",))
- for filename in files:
- try:
- remove(os.path.join(self.directory, filename))
- except OSError:
- pass
- class MemcachedBytecodeCache(BytecodeCache):
- """This class implements a bytecode cache that uses a memcache cache for
- storing the information. It does not enforce a specific memcache library
- (tummy's memcache or cmemcache) but will accept any class that provides
- the minimal interface required.
- Libraries compatible with this class:
- - `cachelib <https://github.com/pallets/cachelib>`_
- - `python-memcached <https://pypi.org/project/python-memcached/>`_
- (Unfortunately the django cache interface is not compatible because it
- does not support storing binary data, only text. You can however pass
- the underlying cache client to the bytecode cache which is available
- as `django.core.cache.cache._client`.)
- The minimal interface for the client passed to the constructor is this:
- .. class:: MinimalClientInterface
- .. method:: set(key, value[, timeout])
- Stores the bytecode in the cache. `value` is a string and
- `timeout` the timeout of the key. If timeout is not provided
- a default timeout or no timeout should be assumed, if it's
- provided it's an integer with the number of seconds the cache
- item should exist.
- .. method:: get(key)
- Returns the value for the cache key. If the item does not
- exist in the cache the return value must be `None`.
- The other arguments to the constructor are the prefix for all keys that
- is added before the actual cache key and the timeout for the bytecode in
- the cache system. We recommend a high (or no) timeout.
- This bytecode cache does not support clearing of used items in the cache.
- The clear method is a no-operation function.
- .. versionadded:: 2.7
- Added support for ignoring memcache errors through the
- `ignore_memcache_errors` parameter.
- """
- def __init__(
- self,
- client: "_MemcachedClient",
- prefix: str = "jinja2/bytecode/",
- timeout: t.Optional[int] = None,
- ignore_memcache_errors: bool = True,
- ):
- self.client = client
- self.prefix = prefix
- self.timeout = timeout
- self.ignore_memcache_errors = ignore_memcache_errors
- def load_bytecode(self, bucket: Bucket) -> None:
- try:
- code = self.client.get(self.prefix + bucket.key)
- except Exception:
- if not self.ignore_memcache_errors:
- raise
- else:
- bucket.bytecode_from_string(code)
- def dump_bytecode(self, bucket: Bucket) -> None:
- key = self.prefix + bucket.key
- value = bucket.bytecode_to_string()
- try:
- if self.timeout is not None:
- self.client.set(key, value, self.timeout)
- else:
- self.client.set(key, value)
- except Exception:
- if not self.ignore_memcache_errors:
- raise
|