timed.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. from __future__ import annotations
  2. import collections.abc as cabc
  3. import time
  4. import typing as t
  5. from datetime import datetime
  6. from datetime import timezone
  7. from .encoding import base64_decode
  8. from .encoding import base64_encode
  9. from .encoding import bytes_to_int
  10. from .encoding import int_to_bytes
  11. from .encoding import want_bytes
  12. from .exc import BadSignature
  13. from .exc import BadTimeSignature
  14. from .exc import SignatureExpired
  15. from .serializer import _TSerialized
  16. from .serializer import Serializer
  17. from .signer import Signer
  18. class TimestampSigner(Signer):
  19. """Works like the regular :class:`.Signer` but also records the time
  20. of the signing and can be used to expire signatures. The
  21. :meth:`unsign` method can raise :exc:`.SignatureExpired` if the
  22. unsigning failed because the signature is expired.
  23. """
  24. def get_timestamp(self) -> int:
  25. """Returns the current timestamp. The function must return an
  26. integer.
  27. """
  28. return int(time.time())
  29. def timestamp_to_datetime(self, ts: int) -> datetime:
  30. """Convert the timestamp from :meth:`get_timestamp` into an
  31. aware :class`datetime.datetime` in UTC.
  32. .. versionchanged:: 2.0
  33. The timestamp is returned as a timezone-aware ``datetime``
  34. in UTC rather than a naive ``datetime`` assumed to be UTC.
  35. """
  36. return datetime.fromtimestamp(ts, tz=timezone.utc)
  37. def sign(self, value: str | bytes) -> bytes:
  38. """Signs the given string and also attaches time information."""
  39. value = want_bytes(value)
  40. timestamp = base64_encode(int_to_bytes(self.get_timestamp()))
  41. sep = want_bytes(self.sep)
  42. value = value + sep + timestamp
  43. return value + sep + self.get_signature(value)
  44. # Ignore overlapping signatures check, return_timestamp is the only
  45. # parameter that affects the return type.
  46. @t.overload
  47. def unsign( # type: ignore[overload-overlap]
  48. self,
  49. signed_value: str | bytes,
  50. max_age: int | None = None,
  51. return_timestamp: t.Literal[False] = False,
  52. ) -> bytes: ...
  53. @t.overload
  54. def unsign(
  55. self,
  56. signed_value: str | bytes,
  57. max_age: int | None = None,
  58. return_timestamp: t.Literal[True] = True,
  59. ) -> tuple[bytes, datetime]: ...
  60. def unsign(
  61. self,
  62. signed_value: str | bytes,
  63. max_age: int | None = None,
  64. return_timestamp: bool = False,
  65. ) -> tuple[bytes, datetime] | bytes:
  66. """Works like the regular :meth:`.Signer.unsign` but can also
  67. validate the time. See the base docstring of the class for
  68. the general behavior. If ``return_timestamp`` is ``True`` the
  69. timestamp of the signature will be returned as an aware
  70. :class:`datetime.datetime` object in UTC.
  71. .. versionchanged:: 2.0
  72. The timestamp is returned as a timezone-aware ``datetime``
  73. in UTC rather than a naive ``datetime`` assumed to be UTC.
  74. """
  75. try:
  76. result = super().unsign(signed_value)
  77. sig_error = None
  78. except BadSignature as e:
  79. sig_error = e
  80. result = e.payload or b""
  81. sep = want_bytes(self.sep)
  82. # If there is no timestamp in the result there is something
  83. # seriously wrong. In case there was a signature error, we raise
  84. # that one directly, otherwise we have a weird situation in
  85. # which we shouldn't have come except someone uses a time-based
  86. # serializer on non-timestamp data, so catch that.
  87. if sep not in result:
  88. if sig_error:
  89. raise sig_error
  90. raise BadTimeSignature("timestamp missing", payload=result)
  91. value, ts_bytes = result.rsplit(sep, 1)
  92. ts_int: int | None = None
  93. ts_dt: datetime | None = None
  94. try:
  95. ts_int = bytes_to_int(base64_decode(ts_bytes))
  96. except Exception:
  97. pass
  98. # Signature is *not* okay. Raise a proper error now that we have
  99. # split the value and the timestamp.
  100. if sig_error is not None:
  101. if ts_int is not None:
  102. try:
  103. ts_dt = self.timestamp_to_datetime(ts_int)
  104. except (ValueError, OSError, OverflowError) as exc:
  105. # Windows raises OSError
  106. # 32-bit raises OverflowError
  107. raise BadTimeSignature(
  108. "Malformed timestamp", payload=value
  109. ) from exc
  110. raise BadTimeSignature(str(sig_error), payload=value, date_signed=ts_dt)
  111. # Signature was okay but the timestamp is actually not there or
  112. # malformed. Should not happen, but we handle it anyway.
  113. if ts_int is None:
  114. raise BadTimeSignature("Malformed timestamp", payload=value)
  115. # Check timestamp is not older than max_age
  116. if max_age is not None:
  117. age = self.get_timestamp() - ts_int
  118. if age > max_age:
  119. raise SignatureExpired(
  120. f"Signature age {age} > {max_age} seconds",
  121. payload=value,
  122. date_signed=self.timestamp_to_datetime(ts_int),
  123. )
  124. if age < 0:
  125. raise SignatureExpired(
  126. f"Signature age {age} < 0 seconds",
  127. payload=value,
  128. date_signed=self.timestamp_to_datetime(ts_int),
  129. )
  130. if return_timestamp:
  131. return value, self.timestamp_to_datetime(ts_int)
  132. return value
  133. def validate(self, signed_value: str | bytes, max_age: int | None = None) -> bool:
  134. """Only validates the given signed value. Returns ``True`` if
  135. the signature exists and is valid."""
  136. try:
  137. self.unsign(signed_value, max_age=max_age)
  138. return True
  139. except BadSignature:
  140. return False
  141. class TimedSerializer(Serializer[_TSerialized]):
  142. """Uses :class:`TimestampSigner` instead of the default
  143. :class:`.Signer`.
  144. """
  145. default_signer: type[TimestampSigner] = TimestampSigner
  146. def iter_unsigners(
  147. self, salt: str | bytes | None = None
  148. ) -> cabc.Iterator[TimestampSigner]:
  149. return t.cast("cabc.Iterator[TimestampSigner]", super().iter_unsigners(salt))
  150. # TODO: Signature is incompatible because parameters were added
  151. # before salt.
  152. def loads( # type: ignore[override]
  153. self,
  154. s: str | bytes,
  155. max_age: int | None = None,
  156. return_timestamp: bool = False,
  157. salt: str | bytes | None = None,
  158. ) -> t.Any:
  159. """Reverse of :meth:`dumps`, raises :exc:`.BadSignature` if the
  160. signature validation fails. If a ``max_age`` is provided it will
  161. ensure the signature is not older than that time in seconds. In
  162. case the signature is outdated, :exc:`.SignatureExpired` is
  163. raised. All arguments are forwarded to the signer's
  164. :meth:`~TimestampSigner.unsign` method.
  165. """
  166. s = want_bytes(s)
  167. last_exception = None
  168. for signer in self.iter_unsigners(salt):
  169. try:
  170. base64d, timestamp = signer.unsign(
  171. s, max_age=max_age, return_timestamp=True
  172. )
  173. payload = self.load_payload(base64d)
  174. if return_timestamp:
  175. return payload, timestamp
  176. return payload
  177. except SignatureExpired:
  178. # The signature was unsigned successfully but was
  179. # expired. Do not try the next signer.
  180. raise
  181. except BadSignature as err:
  182. last_exception = err
  183. raise t.cast(BadSignature, last_exception)
  184. def loads_unsafe( # type: ignore[override]
  185. self,
  186. s: str | bytes,
  187. max_age: int | None = None,
  188. salt: str | bytes | None = None,
  189. ) -> tuple[bool, t.Any]:
  190. return self._loads_unsafe_impl(s, salt, load_kwargs={"max_age": max_age})