x963kdf.py 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. # This file is dual licensed under the terms of the Apache License, Version
  2. # 2.0, and the BSD License. See the LICENSE file in the root of this repository
  3. # for complete details.
  4. import struct
  5. import typing
  6. from cryptography import utils
  7. from cryptography.exceptions import (
  8. AlreadyFinalized,
  9. InvalidKey,
  10. UnsupportedAlgorithm,
  11. _Reasons,
  12. )
  13. from cryptography.hazmat.backends import _get_backend
  14. from cryptography.hazmat.backends.interfaces import Backend, HashBackend
  15. from cryptography.hazmat.primitives import constant_time, hashes
  16. from cryptography.hazmat.primitives.kdf import KeyDerivationFunction
  17. def _int_to_u32be(n):
  18. return struct.pack(">I", n)
  19. class X963KDF(KeyDerivationFunction):
  20. def __init__(
  21. self,
  22. algorithm: hashes.HashAlgorithm,
  23. length: int,
  24. sharedinfo: typing.Optional[bytes],
  25. backend: typing.Optional[Backend] = None,
  26. ):
  27. backend = _get_backend(backend)
  28. max_len = algorithm.digest_size * (2 ** 32 - 1)
  29. if length > max_len:
  30. raise ValueError(
  31. "Cannot derive keys larger than {} bits.".format(max_len)
  32. )
  33. if sharedinfo is not None:
  34. utils._check_bytes("sharedinfo", sharedinfo)
  35. self._algorithm = algorithm
  36. self._length = length
  37. self._sharedinfo = sharedinfo
  38. if not isinstance(backend, HashBackend):
  39. raise UnsupportedAlgorithm(
  40. "Backend object does not implement HashBackend.",
  41. _Reasons.BACKEND_MISSING_INTERFACE,
  42. )
  43. self._backend = backend
  44. self._used = False
  45. def derive(self, key_material: bytes) -> bytes:
  46. if self._used:
  47. raise AlreadyFinalized
  48. self._used = True
  49. utils._check_byteslike("key_material", key_material)
  50. output = [b""]
  51. outlen = 0
  52. counter = 1
  53. while self._length > outlen:
  54. h = hashes.Hash(self._algorithm, self._backend)
  55. h.update(key_material)
  56. h.update(_int_to_u32be(counter))
  57. if self._sharedinfo is not None:
  58. h.update(self._sharedinfo)
  59. output.append(h.finalize())
  60. outlen += len(output[-1])
  61. counter += 1
  62. return b"".join(output)[: self._length]
  63. def verify(self, key_material: bytes, expected_key: bytes) -> None:
  64. if not constant_time.bytes_eq(self.derive(key_material), expected_key):
  65. raise InvalidKey