base.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. # This file is dual licensed under the terms of the Apache License, Version
  2. # 2.0, and the BSD License. See the LICENSE file in the root of this repository
  3. # for complete details.
  4. import abc
  5. import typing
  6. from cryptography import utils
  7. from cryptography.exceptions import (
  8. AlreadyFinalized,
  9. AlreadyUpdated,
  10. NotYetFinalized,
  11. UnsupportedAlgorithm,
  12. _Reasons,
  13. )
  14. from cryptography.hazmat.backends import _get_backend
  15. from cryptography.hazmat.backends.interfaces import Backend, CipherBackend
  16. from cryptography.hazmat.primitives._cipheralgorithm import CipherAlgorithm
  17. from cryptography.hazmat.primitives.ciphers import modes
  18. class CipherContext(metaclass=abc.ABCMeta):
  19. @abc.abstractmethod
  20. def update(self, data: bytes) -> bytes:
  21. """
  22. Processes the provided bytes through the cipher and returns the results
  23. as bytes.
  24. """
  25. @abc.abstractmethod
  26. def update_into(self, data: bytes, buf) -> int:
  27. """
  28. Processes the provided bytes and writes the resulting data into the
  29. provided buffer. Returns the number of bytes written.
  30. """
  31. @abc.abstractmethod
  32. def finalize(self) -> bytes:
  33. """
  34. Returns the results of processing the final block as bytes.
  35. """
  36. class AEADCipherContext(metaclass=abc.ABCMeta):
  37. @abc.abstractmethod
  38. def authenticate_additional_data(self, data: bytes) -> None:
  39. """
  40. Authenticates the provided bytes.
  41. """
  42. class AEADDecryptionContext(metaclass=abc.ABCMeta):
  43. @abc.abstractmethod
  44. def finalize_with_tag(self, tag: bytes) -> bytes:
  45. """
  46. Returns the results of processing the final block as bytes and allows
  47. delayed passing of the authentication tag.
  48. """
  49. class AEADEncryptionContext(metaclass=abc.ABCMeta):
  50. @abc.abstractproperty
  51. def tag(self) -> bytes:
  52. """
  53. Returns tag bytes. This is only available after encryption is
  54. finalized.
  55. """
  56. class Cipher(object):
  57. def __init__(
  58. self,
  59. algorithm: CipherAlgorithm,
  60. mode: typing.Optional[modes.Mode],
  61. backend: typing.Optional[Backend] = None,
  62. ):
  63. backend = _get_backend(backend)
  64. if not isinstance(backend, CipherBackend):
  65. raise UnsupportedAlgorithm(
  66. "Backend object does not implement CipherBackend.",
  67. _Reasons.BACKEND_MISSING_INTERFACE,
  68. )
  69. if not isinstance(algorithm, CipherAlgorithm):
  70. raise TypeError("Expected interface of CipherAlgorithm.")
  71. if mode is not None:
  72. mode.validate_for_algorithm(algorithm)
  73. self.algorithm = algorithm
  74. self.mode = mode
  75. self._backend = backend
  76. def encryptor(self):
  77. if isinstance(self.mode, modes.ModeWithAuthenticationTag):
  78. if self.mode.tag is not None:
  79. raise ValueError(
  80. "Authentication tag must be None when encrypting."
  81. )
  82. ctx = self._backend.create_symmetric_encryption_ctx(
  83. self.algorithm, self.mode
  84. )
  85. return self._wrap_ctx(ctx, encrypt=True)
  86. def decryptor(self):
  87. ctx = self._backend.create_symmetric_decryption_ctx(
  88. self.algorithm, self.mode
  89. )
  90. return self._wrap_ctx(ctx, encrypt=False)
  91. def _wrap_ctx(self, ctx, encrypt):
  92. if isinstance(self.mode, modes.ModeWithAuthenticationTag):
  93. if encrypt:
  94. return _AEADEncryptionContext(ctx)
  95. else:
  96. return _AEADCipherContext(ctx)
  97. else:
  98. return _CipherContext(ctx)
  99. @utils.register_interface(CipherContext)
  100. class _CipherContext(object):
  101. def __init__(self, ctx):
  102. self._ctx = ctx
  103. def update(self, data: bytes) -> bytes:
  104. if self._ctx is None:
  105. raise AlreadyFinalized("Context was already finalized.")
  106. return self._ctx.update(data)
  107. def update_into(self, data: bytes, buf) -> int:
  108. if self._ctx is None:
  109. raise AlreadyFinalized("Context was already finalized.")
  110. return self._ctx.update_into(data, buf)
  111. def finalize(self) -> bytes:
  112. if self._ctx is None:
  113. raise AlreadyFinalized("Context was already finalized.")
  114. data = self._ctx.finalize()
  115. self._ctx = None
  116. return data
  117. @utils.register_interface(AEADCipherContext)
  118. @utils.register_interface(CipherContext)
  119. @utils.register_interface(AEADDecryptionContext)
  120. class _AEADCipherContext(object):
  121. def __init__(self, ctx):
  122. self._ctx = ctx
  123. self._bytes_processed = 0
  124. self._aad_bytes_processed = 0
  125. self._tag = None
  126. self._updated = False
  127. def _check_limit(self, data_size: int) -> None:
  128. if self._ctx is None:
  129. raise AlreadyFinalized("Context was already finalized.")
  130. self._updated = True
  131. self._bytes_processed += data_size
  132. if self._bytes_processed > self._ctx._mode._MAX_ENCRYPTED_BYTES:
  133. raise ValueError(
  134. "{} has a maximum encrypted byte limit of {}".format(
  135. self._ctx._mode.name, self._ctx._mode._MAX_ENCRYPTED_BYTES
  136. )
  137. )
  138. def update(self, data: bytes) -> bytes:
  139. self._check_limit(len(data))
  140. return self._ctx.update(data)
  141. def update_into(self, data: bytes, buf) -> int:
  142. self._check_limit(len(data))
  143. return self._ctx.update_into(data, buf)
  144. def finalize(self) -> bytes:
  145. if self._ctx is None:
  146. raise AlreadyFinalized("Context was already finalized.")
  147. data = self._ctx.finalize()
  148. self._tag = self._ctx.tag
  149. self._ctx = None
  150. return data
  151. def finalize_with_tag(self, tag: bytes) -> bytes:
  152. if self._ctx is None:
  153. raise AlreadyFinalized("Context was already finalized.")
  154. data = self._ctx.finalize_with_tag(tag)
  155. self._tag = self._ctx.tag
  156. self._ctx = None
  157. return data
  158. def authenticate_additional_data(self, data: bytes) -> None:
  159. if self._ctx is None:
  160. raise AlreadyFinalized("Context was already finalized.")
  161. if self._updated:
  162. raise AlreadyUpdated("Update has been called on this context.")
  163. self._aad_bytes_processed += len(data)
  164. if self._aad_bytes_processed > self._ctx._mode._MAX_AAD_BYTES:
  165. raise ValueError(
  166. "{} has a maximum AAD byte limit of {}".format(
  167. self._ctx._mode.name, self._ctx._mode._MAX_AAD_BYTES
  168. )
  169. )
  170. self._ctx.authenticate_additional_data(data)
  171. @utils.register_interface(AEADEncryptionContext)
  172. class _AEADEncryptionContext(_AEADCipherContext):
  173. @property
  174. def tag(self) -> bytes:
  175. if self._ctx is not None:
  176. raise NotYetFinalized(
  177. "You must finalize encryption before " "getting the tag."
  178. )
  179. assert self._tag is not None
  180. return self._tag