hashes.py 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. # This file is dual licensed under the terms of the Apache License, Version
  2. # 2.0, and the BSD License. See the LICENSE file in the root of this repository
  3. # for complete details.
  4. from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
  5. from cryptography.hazmat.primitives import hashes
  6. class _HashContext(hashes.HashContext):
  7. def __init__(self, backend, algorithm: hashes.HashAlgorithm, ctx=None):
  8. self._algorithm = algorithm
  9. self._backend = backend
  10. if ctx is None:
  11. ctx = self._backend._lib.EVP_MD_CTX_new()
  12. ctx = self._backend._ffi.gc(
  13. ctx, self._backend._lib.EVP_MD_CTX_free
  14. )
  15. evp_md = self._backend._evp_md_from_algorithm(algorithm)
  16. if evp_md == self._backend._ffi.NULL:
  17. raise UnsupportedAlgorithm(
  18. "{} is not a supported hash on this backend.".format(
  19. algorithm.name
  20. ),
  21. _Reasons.UNSUPPORTED_HASH,
  22. )
  23. res = self._backend._lib.EVP_DigestInit_ex(
  24. ctx, evp_md, self._backend._ffi.NULL
  25. )
  26. self._backend.openssl_assert(res != 0)
  27. self._ctx = ctx
  28. @property
  29. def algorithm(self) -> hashes.HashAlgorithm:
  30. return self._algorithm
  31. def copy(self) -> "_HashContext":
  32. copied_ctx = self._backend._lib.EVP_MD_CTX_new()
  33. copied_ctx = self._backend._ffi.gc(
  34. copied_ctx, self._backend._lib.EVP_MD_CTX_free
  35. )
  36. res = self._backend._lib.EVP_MD_CTX_copy_ex(copied_ctx, self._ctx)
  37. self._backend.openssl_assert(res != 0)
  38. return _HashContext(self._backend, self.algorithm, ctx=copied_ctx)
  39. def update(self, data: bytes) -> None:
  40. data_ptr = self._backend._ffi.from_buffer(data)
  41. res = self._backend._lib.EVP_DigestUpdate(
  42. self._ctx, data_ptr, len(data)
  43. )
  44. self._backend.openssl_assert(res != 0)
  45. def finalize(self) -> bytes:
  46. if isinstance(self.algorithm, hashes.ExtendableOutputFunction):
  47. # extendable output functions use a different finalize
  48. return self._finalize_xof()
  49. else:
  50. buf = self._backend._ffi.new(
  51. "unsigned char[]", self._backend._lib.EVP_MAX_MD_SIZE
  52. )
  53. outlen = self._backend._ffi.new("unsigned int *")
  54. res = self._backend._lib.EVP_DigestFinal_ex(self._ctx, buf, outlen)
  55. self._backend.openssl_assert(res != 0)
  56. self._backend.openssl_assert(
  57. outlen[0] == self.algorithm.digest_size
  58. )
  59. return self._backend._ffi.buffer(buf)[: outlen[0]]
  60. def _finalize_xof(self) -> bytes:
  61. buf = self._backend._ffi.new(
  62. "unsigned char[]", self.algorithm.digest_size
  63. )
  64. res = self._backend._lib.EVP_DigestFinalXOF(
  65. self._ctx, buf, self.algorithm.digest_size
  66. )
  67. self._backend.openssl_assert(res != 0)
  68. return self._backend._ffi.buffer(buf)[: self.algorithm.digest_size]