fernet.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. # This file is dual licensed under the terms of the Apache License, Version
  2. # 2.0, and the BSD License. See the LICENSE file in the root of this repository
  3. # for complete details.
  4. import base64
  5. import binascii
  6. import os
  7. import struct
  8. import time
  9. import typing
  10. from cryptography import utils
  11. from cryptography.exceptions import InvalidSignature
  12. from cryptography.hazmat.backends import _get_backend
  13. from cryptography.hazmat.backends.interfaces import Backend
  14. from cryptography.hazmat.primitives import hashes, padding
  15. from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
  16. from cryptography.hazmat.primitives.hmac import HMAC
  17. class InvalidToken(Exception):
  18. pass
  19. _MAX_CLOCK_SKEW = 60
  20. class Fernet(object):
  21. def __init__(
  22. self,
  23. key: typing.Union[bytes, str],
  24. backend: typing.Optional[Backend] = None,
  25. ):
  26. backend = _get_backend(backend)
  27. key = base64.urlsafe_b64decode(key)
  28. if len(key) != 32:
  29. raise ValueError(
  30. "Fernet key must be 32 url-safe base64-encoded bytes."
  31. )
  32. self._signing_key = key[:16]
  33. self._encryption_key = key[16:]
  34. self._backend = backend
  35. @classmethod
  36. def generate_key(cls) -> bytes:
  37. return base64.urlsafe_b64encode(os.urandom(32))
  38. def encrypt(self, data: bytes) -> bytes:
  39. return self.encrypt_at_time(data, int(time.time()))
  40. def encrypt_at_time(self, data: bytes, current_time: int) -> bytes:
  41. iv = os.urandom(16)
  42. return self._encrypt_from_parts(data, current_time, iv)
  43. def _encrypt_from_parts(
  44. self, data: bytes, current_time: int, iv: bytes
  45. ) -> bytes:
  46. utils._check_bytes("data", data)
  47. padder = padding.PKCS7(algorithms.AES.block_size).padder()
  48. padded_data = padder.update(data) + padder.finalize()
  49. encryptor = Cipher(
  50. algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
  51. ).encryptor()
  52. ciphertext = encryptor.update(padded_data) + encryptor.finalize()
  53. basic_parts = (
  54. b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
  55. )
  56. h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
  57. h.update(basic_parts)
  58. hmac = h.finalize()
  59. return base64.urlsafe_b64encode(basic_parts + hmac)
  60. def decrypt(self, token: bytes, ttl: typing.Optional[int] = None) -> bytes:
  61. timestamp, data = Fernet._get_unverified_token_data(token)
  62. if ttl is None:
  63. time_info = None
  64. else:
  65. time_info = (ttl, int(time.time()))
  66. return self._decrypt_data(data, timestamp, time_info)
  67. def decrypt_at_time(
  68. self, token: bytes, ttl: int, current_time: int
  69. ) -> bytes:
  70. if ttl is None:
  71. raise ValueError(
  72. "decrypt_at_time() can only be used with a non-None ttl"
  73. )
  74. timestamp, data = Fernet._get_unverified_token_data(token)
  75. return self._decrypt_data(data, timestamp, (ttl, current_time))
  76. def extract_timestamp(self, token: bytes) -> int:
  77. timestamp, data = Fernet._get_unverified_token_data(token)
  78. # Verify the token was not tampered with.
  79. self._verify_signature(data)
  80. return timestamp
  81. @staticmethod
  82. def _get_unverified_token_data(token: bytes) -> typing.Tuple[int, bytes]:
  83. utils._check_bytes("token", token)
  84. try:
  85. data = base64.urlsafe_b64decode(token)
  86. except (TypeError, binascii.Error):
  87. raise InvalidToken
  88. if not data or data[0] != 0x80:
  89. raise InvalidToken
  90. try:
  91. (timestamp,) = struct.unpack(">Q", data[1:9])
  92. except struct.error:
  93. raise InvalidToken
  94. return timestamp, data
  95. def _verify_signature(self, data: bytes) -> None:
  96. h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
  97. h.update(data[:-32])
  98. try:
  99. h.verify(data[-32:])
  100. except InvalidSignature:
  101. raise InvalidToken
  102. def _decrypt_data(
  103. self,
  104. data: bytes,
  105. timestamp: int,
  106. time_info: typing.Optional[typing.Tuple[int, int]],
  107. ) -> bytes:
  108. if time_info is not None:
  109. ttl, current_time = time_info
  110. if timestamp + ttl < current_time:
  111. raise InvalidToken
  112. if current_time + _MAX_CLOCK_SKEW < timestamp:
  113. raise InvalidToken
  114. self._verify_signature(data)
  115. iv = data[9:25]
  116. ciphertext = data[25:-32]
  117. decryptor = Cipher(
  118. algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
  119. ).decryptor()
  120. plaintext_padded = decryptor.update(ciphertext)
  121. try:
  122. plaintext_padded += decryptor.finalize()
  123. except ValueError:
  124. raise InvalidToken
  125. unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()
  126. unpadded = unpadder.update(plaintext_padded)
  127. try:
  128. unpadded += unpadder.finalize()
  129. except ValueError:
  130. raise InvalidToken
  131. return unpadded
  132. class MultiFernet(object):
  133. def __init__(self, fernets: typing.Iterable[Fernet]):
  134. fernets = list(fernets)
  135. if not fernets:
  136. raise ValueError(
  137. "MultiFernet requires at least one Fernet instance"
  138. )
  139. self._fernets = fernets
  140. def encrypt(self, msg: bytes) -> bytes:
  141. return self.encrypt_at_time(msg, int(time.time()))
  142. def encrypt_at_time(self, msg: bytes, current_time: int) -> bytes:
  143. return self._fernets[0].encrypt_at_time(msg, current_time)
  144. def rotate(self, msg: bytes) -> bytes:
  145. timestamp, data = Fernet._get_unverified_token_data(msg)
  146. for f in self._fernets:
  147. try:
  148. p = f._decrypt_data(data, timestamp, None)
  149. break
  150. except InvalidToken:
  151. pass
  152. else:
  153. raise InvalidToken
  154. iv = os.urandom(16)
  155. return self._fernets[0]._encrypt_from_parts(p, timestamp, iv)
  156. def decrypt(self, msg: bytes, ttl: typing.Optional[int] = None) -> bytes:
  157. for f in self._fernets:
  158. try:
  159. return f.decrypt(msg, ttl)
  160. except InvalidToken:
  161. pass
  162. raise InvalidToken
  163. def decrypt_at_time(
  164. self, msg: bytes, ttl: int, current_time: int
  165. ) -> bytes:
  166. for f in self._fernets:
  167. try:
  168. return f.decrypt_at_time(msg, ttl, current_time)
  169. except InvalidToken:
  170. pass
  171. raise InvalidToken