serializer.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406
  1. from __future__ import annotations
  2. import collections.abc as cabc
  3. import json
  4. import typing as t
  5. from .encoding import want_bytes
  6. from .exc import BadPayload
  7. from .exc import BadSignature
  8. from .signer import _make_keys_list
  9. from .signer import Signer
  # Type variable for the serialized form handled by a data serializer. It is
  # bound to ``str | bytes`` in both branches; only the type-checking branch
  # declares a default, which requires ``typing_extensions.TypeVar``.
  if t.TYPE_CHECKING:
      import typing_extensions as te

      # This should be either str or bytes. To avoid having to specify the
      # bound type, it falls back to a union if structural matching fails.
      _TSerialized = te.TypeVar(
          "_TSerialized", bound=t.Union[str, bytes], default=t.Union[str, bytes]
      )
  else:
      # Still available at runtime on Python < 3.13, but without the default.
      _TSerialized = t.TypeVar("_TSerialized", bound=t.Union[str, bytes])
  class _PDataSerializer(t.Protocol[_TSerialized]):
      """Structural type for a data serializer.

      Any object exposing ``loads`` and ``dumps`` with the signatures
      below matches, parameterized by the serialized type it works with.
      """

      def loads(self, payload: _TSerialized, /) -> t.Any:
          """Turn a serialized ``payload`` back into an object."""
          ...

      # A signature with additional arguments is not handled correctly by type
      # checkers right now, so an overload is used below for serializers that
      # don't match this strict protocol.
      def dumps(self, obj: t.Any, /) -> _TSerialized:
          """Turn ``obj`` into its serialized form."""
          ...
  26. # Use TypeIs once it's available in typing_extensions or 3.13.
  def is_text_serializer(
      serializer: _PDataSerializer[t.Any],
  ) -> te.TypeGuard[_PDataSerializer[str]]:
      """Report whether a serializer produces text rather than binary.

      The serializer is probed by dumping an empty dict and inspecting
      the type of what comes back.

      :param serializer: Object providing a ``dumps`` method.
      :return: ``True`` if the probe output is a ``str``, otherwise
          ``False``.
      """
      probe = serializer.dumps({})
      return isinstance(probe, str)
  class Serializer(t.Generic[_TSerialized]):
      """Serialize objects and sign the result so tampering is detected.

      A serializer pairs a data serializer (any object offering
      ``dumps`` and ``loads``; :mod:`json` by default) with a
      :class:`~itsdangerous.signer.Signer`. :meth:`dumps` serializes an
      object and signs it, and :meth:`loads` verifies the signature
      before handing the payload back to the data serializer.

      The secret key should be a random string of ``bytes`` and should
      be kept out of code and version control. Use a different salt for
      each context in which data is signed. See :doc:`/concepts` for
      information about the security of the secret key and salt.

      :param secret_key: The secret key to sign and verify with. Can be a
          list of keys, oldest to newest, to support key rotation.
      :param salt: Extra key to combine with ``secret_key`` to distinguish
          signatures in different contexts.
      :param serializer: An object that provides ``dumps`` and ``loads``
          methods for serializing data to a string. Defaults to
          :attr:`default_serializer`, which defaults to :mod:`json`.
      :param serializer_kwargs: Keyword arguments to pass when calling
          ``serializer.dumps``.
      :param signer: A ``Signer`` class to instantiate when signing data.
          Defaults to :attr:`default_signer`, which defaults to
          :class:`~itsdangerous.signer.Signer`.
      :param signer_kwargs: Keyword arguments to pass when instantiating
          the ``Signer`` class.
      :param fallback_signers: List of signer parameters to try when
          unsigning with the default signer fails. Each item can be a dict
          of ``signer_kwargs``, a ``Signer`` class, or a tuple of
          ``(signer, signer_kwargs)``. Defaults to
          :attr:`default_fallback_signers`.

      .. versionchanged:: 2.0
          Added support for key rotation by passing a list to
          ``secret_key``.

      .. versionchanged:: 2.0
          Removed the default SHA-512 fallback signer from
          ``default_fallback_signers``.

      .. versionchanged:: 1.1
          Added support for ``fallback_signers`` and configured a default
          SHA-512 fallback. This fallback is for users who used the yanked
          1.0.0 release which defaulted to SHA-512.

      .. versionchanged:: 0.14
          The ``signer`` and ``signer_kwargs`` parameters were added to
          the constructor.
      """

      #: Data serializer used when none is passed to the constructor.
      #: It is :mod:`json` here, and may be replaced by any object that
      #: provides ``dumps`` and ``loads`` methods.
      default_serializer: _PDataSerializer[t.Any] = json

      #: ``Signer`` class used when none is passed to the constructor.
      #: It is :class:`itsdangerous.signer.Signer` here.
      default_signer: type[Signer] = Signer

      #: Fallback signers used when none are passed to the constructor.
      #: The constructor copies this list rather than sharing it.
      default_fallback_signers: list[
          dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
      ] = []

      # No data serializer given, or one that returns str: Serializer[str].
      @t.overload
      def __init__(
          self: Serializer[str],
          secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes],
          salt: str | bytes | None = b"itsdangerous",
          serializer: None | _PDataSerializer[str] = None,
          serializer_kwargs: dict[str, t.Any] | None = None,
          signer: type[Signer] | None = None,
          signer_kwargs: dict[str, t.Any] | None = None,
          fallback_signers: list[
              dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
          ]
          | None = None,
      ): ...

      # A bytes data serializer passed positionally: Serializer[bytes].
      @t.overload
      def __init__(
          self: Serializer[bytes],
          secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes],
          salt: str | bytes | None,
          serializer: _PDataSerializer[bytes],
          serializer_kwargs: dict[str, t.Any] | None = None,
          signer: type[Signer] | None = None,
          signer_kwargs: dict[str, t.Any] | None = None,
          fallback_signers: list[
              dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
          ]
          | None = None,
      ): ...

      # A bytes data serializer passed by keyword: Serializer[bytes].
      @t.overload
      def __init__(
          self: Serializer[bytes],
          secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes],
          salt: str | bytes | None = b"itsdangerous",
          *,
          serializer: _PDataSerializer[bytes],
          serializer_kwargs: dict[str, t.Any] | None = None,
          signer: type[Signer] | None = None,
          signer_kwargs: dict[str, t.Any] | None = None,
          fallback_signers: list[
              dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
          ]
          | None = None,
      ): ...

      # Positional catch-all. When the serializer does not match the strict
      # _PDataSerializer signature, the type parameter falls back to a union
      # and the user has to specify the type.
      @t.overload
      def __init__(
          self,
          secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes],
          salt: str | bytes | None,
          serializer: t.Any,
          serializer_kwargs: dict[str, t.Any] | None = None,
          signer: type[Signer] | None = None,
          signer_kwargs: dict[str, t.Any] | None = None,
          fallback_signers: list[
              dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
          ]
          | None = None,
      ): ...

      # Keyword catch-all.
      @t.overload
      def __init__(
          self,
          secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes],
          salt: str | bytes | None = b"itsdangerous",
          *,
          serializer: t.Any,
          serializer_kwargs: dict[str, t.Any] | None = None,
          signer: type[Signer] | None = None,
          signer_kwargs: dict[str, t.Any] | None = None,
          fallback_signers: list[
              dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
          ]
          | None = None,
      ): ...

      def __init__(
          self,
          secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes],
          salt: str | bytes | None = b"itsdangerous",
          serializer: t.Any | None = None,
          serializer_kwargs: dict[str, t.Any] | None = None,
          signer: type[Signer] | None = None,
          signer_kwargs: dict[str, t.Any] | None = None,
          fallback_signers: list[
              dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
          ]
          | None = None,
      ):
          #: Secret keys to try when verifying signatures, ordered from
          #: oldest to newest. Signing uses the newest (last) key.
          #:
          #: A key rotation system can keep the allowed keys in this
          #: list and drop the expired ones.
          self.secret_keys: list[bytes] = _make_keys_list(secret_key)

          # A salt of None is stored untouched so that the signer's own
          # default is used.
          self.salt = want_bytes(salt) if salt is not None else salt

          chosen_serializer = (
              self.default_serializer if serializer is None else serializer
          )
          self.serializer: _PDataSerializer[_TSerialized] = chosen_serializer
          self.is_text_serializer: bool = is_text_serializer(chosen_serializer)

          self.signer: type[Signer] = (
              self.default_signer if signer is None else signer
          )
          self.signer_kwargs: dict[str, t.Any] = signer_kwargs or {}

          # Copy the class-level default so instances never share the list.
          self.fallback_signers: list[
              dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
          ] = (
              list(self.default_fallback_signers)
              if fallback_signers is None
              else fallback_signers
          )
          self.serializer_kwargs: dict[str, t.Any] = serializer_kwargs or {}

      @property
      def secret_key(self) -> bytes:
          """The last (newest) item of :attr:`secret_keys`. Kept for
          compatibility with code written before key rotation existed.
          """
          newest_key = self.secret_keys[-1]
          return newest_key

      def load_payload(
          self, payload: bytes, serializer: _PDataSerializer[t.Any] | None = None
      ) -> t.Any:
          """Deserialize an encoded ``payload``, which should always be
          bytes. Pass ``serializer`` to use it in place of the one
          stored on the instance. For a text serializer the payload is
          decoded as UTF-8 before loading.

          :raises BadPayload: If decoding or the serializer's ``loads``
              raises any exception; the original error is attached.
          """
          if serializer is not None:
              active = serializer
              text_mode = is_text_serializer(serializer)
          else:
              active = self.serializer
              text_mode = self.is_text_serializer

          try:
              data = payload.decode("utf-8") if text_mode else payload
              return active.loads(data)  # type: ignore[arg-type]
          except Exception as exc:
              raise BadPayload(
                  "Could not load the payload because an exception"
                  " occurred on unserializing the data.",
                  original_error=exc,
              ) from exc

      def dump_payload(self, obj: t.Any) -> bytes:
          """Serialize ``obj`` with the internal serializer, passing
          :attr:`serializer_kwargs`, and return the result as bytes. The
          serializer's output is run through ``want_bytes``.
          """
          serialized = self.serializer.dumps(obj, **self.serializer_kwargs)
          return want_bytes(serialized)

      def make_signer(self, salt: str | bytes | None = None) -> Signer:
          """Build a signer instance from :attr:`signer`, giving it all
          secret keys and :attr:`signer_kwargs`. A ``salt`` of ``None``
          means the serializer's own salt is used.
          """
          effective_salt = self.salt if salt is None else salt
          return self.signer(
              self.secret_keys, salt=effective_salt, **self.signer_kwargs
          )

      def iter_unsigners(self, salt: str | bytes | None = None) -> cabc.Iterator[Signer]:
          """Yield every signer to try when unsigning. The configured
          signer comes first, followed by one signer per secret key for
          each entry in ``fallback_signers``.
          """
          if salt is None:
              salt = self.salt

          yield self.make_signer(salt)

          for entry in self.fallback_signers:
              if isinstance(entry, dict):
                  # Only kwargs given: reuse the configured signer class.
                  signer_cls, signer_kwargs = self.signer, entry
              elif isinstance(entry, tuple):
                  signer_cls, signer_kwargs = entry
              else:
                  # Only a class given: reuse the configured kwargs.
                  signer_cls, signer_kwargs = entry, self.signer_kwargs

              for key in self.secret_keys:
                  yield signer_cls(key, salt=salt, **signer_kwargs)

      def dumps(self, obj: t.Any, salt: str | bytes | None = None) -> _TSerialized:
          """Serialize ``obj`` with the internal serializer and sign it.
          The result is a ``str`` when the internal serializer produces
          text and ``bytes`` otherwise.
          """
          payload = want_bytes(self.dump_payload(obj))
          signed = self.make_signer(salt).sign(payload)
          return signed.decode("utf-8") if self.is_text_serializer else signed  # type: ignore[return-value]

      def dump(self, obj: t.Any, f: t.IO[t.Any], salt: str | bytes | None = None) -> None:
          """Same as :meth:`dumps`, except the signed value is written to
          the file ``f``. The file handle must accept whatever type the
          internal serializer produces.
          """
          signed = self.dumps(obj, salt)
          f.write(signed)

      def loads(
          self, s: str | bytes, salt: str | bytes | None = None, **kwargs: t.Any
      ) -> t.Any:
          """Inverse of :meth:`dumps`. Each signer from
          :meth:`iter_unsigners` is tried in turn; if all of them raise
          :exc:`.BadSignature`, the last such error is raised.
          """
          signed = want_bytes(s)
          failure = None

          for unsigner in self.iter_unsigners(salt):
              try:
                  verified = unsigner.unsign(signed)
                  return self.load_payload(verified)
              except BadSignature as err:
                  failure = err

          raise t.cast(BadSignature, failure)

      def load(self, f: t.IO[t.Any], salt: str | bytes | None = None) -> t.Any:
          """Same as :meth:`loads`, but reads the signed value from a file."""
          signed = f.read()
          return self.loads(signed, salt)

      def loads_unsafe(
          self, s: str | bytes, salt: str | bytes | None = None
      ) -> tuple[bool, t.Any]:
          """Same as :meth:`loads`, but a bad signature does not raise.
          The return value is ``(signature_valid, payload)``, where the
          first item is a boolean telling whether the signature was
          valid. Depending on how your serializer works this can be
          very dangerous.

          Only use it for debugging, and only if you know your
          serializer module is not exploitable (for example, do not use
          it with a pickle serializer).

          .. versionadded:: 0.15
          """
          return self._loads_unsafe_impl(s, salt)

      def _loads_unsafe_impl(
          self,
          s: str | bytes,
          salt: str | bytes | None,
          load_kwargs: dict[str, t.Any] | None = None,
          load_payload_kwargs: dict[str, t.Any] | None = None,
      ) -> tuple[bool, t.Any]:
          """Low-level helper that serializer subclasses can use to
          implement :meth:`loads_unsafe`. ``load_kwargs`` is forwarded
          to :meth:`loads` and ``load_payload_kwargs`` to
          :meth:`load_payload`.
          """
          loads_kwargs = {} if load_kwargs is None else load_kwargs

          try:
              return True, self.loads(s, salt=salt, **loads_kwargs)
          except BadSignature as failure:
              # Without a payload on the error there is nothing to recover.
              if failure.payload is None:
                  return False, None

              payload_kwargs = (
                  {} if load_payload_kwargs is None else load_payload_kwargs
              )

              try:
                  recovered = self.load_payload(failure.payload, **payload_kwargs)
              except BadPayload:
                  return False, None

              return False, recovered

      def load_unsafe(
          self, f: t.IO[t.Any], salt: str | bytes | None = None
      ) -> tuple[bool, t.Any]:
          """Same as :meth:`loads_unsafe`, but reads the signed value
          from a file.

          .. versionadded:: 0.15
          """
          signed = f.read()
          return self.loads_unsafe(signed, salt=salt)