dh.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. # This file is dual licensed under the terms of the Apache License, Version
  2. # 2.0, and the BSD License. See the LICENSE file in the root of this repository
  3. # for complete details.
  4. from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
  5. from cryptography.hazmat.primitives import serialization
  6. from cryptography.hazmat.primitives.asymmetric import dh
  7. def _dh_params_dup(dh_cdata, backend):
  8. lib = backend._lib
  9. ffi = backend._ffi
  10. param_cdata = lib.DHparams_dup(dh_cdata)
  11. backend.openssl_assert(param_cdata != ffi.NULL)
  12. param_cdata = ffi.gc(param_cdata, lib.DH_free)
  13. if lib.CRYPTOGRAPHY_IS_LIBRESSL:
  14. # In libressl DHparams_dup don't copy q
  15. q = ffi.new("BIGNUM **")
  16. lib.DH_get0_pqg(dh_cdata, ffi.NULL, q, ffi.NULL)
  17. q_dup = lib.BN_dup(q[0])
  18. res = lib.DH_set0_pqg(param_cdata, ffi.NULL, q_dup, ffi.NULL)
  19. backend.openssl_assert(res == 1)
  20. return param_cdata
  21. def _dh_cdata_to_parameters(dh_cdata, backend):
  22. param_cdata = _dh_params_dup(dh_cdata, backend)
  23. return _DHParameters(backend, param_cdata)
  24. class _DHParameters(dh.DHParameters):
  25. def __init__(self, backend, dh_cdata):
  26. self._backend = backend
  27. self._dh_cdata = dh_cdata
  28. def parameter_numbers(self) -> dh.DHParameterNumbers:
  29. p = self._backend._ffi.new("BIGNUM **")
  30. g = self._backend._ffi.new("BIGNUM **")
  31. q = self._backend._ffi.new("BIGNUM **")
  32. self._backend._lib.DH_get0_pqg(self._dh_cdata, p, q, g)
  33. self._backend.openssl_assert(p[0] != self._backend._ffi.NULL)
  34. self._backend.openssl_assert(g[0] != self._backend._ffi.NULL)
  35. if q[0] == self._backend._ffi.NULL:
  36. q_val = None
  37. else:
  38. q_val = self._backend._bn_to_int(q[0])
  39. return dh.DHParameterNumbers(
  40. p=self._backend._bn_to_int(p[0]),
  41. g=self._backend._bn_to_int(g[0]),
  42. q=q_val,
  43. )
  44. def generate_private_key(self) -> dh.DHPrivateKey:
  45. return self._backend.generate_dh_private_key(self)
  46. def parameter_bytes(
  47. self,
  48. encoding: serialization.Encoding,
  49. format: serialization.ParameterFormat,
  50. ) -> bytes:
  51. if format is not serialization.ParameterFormat.PKCS3:
  52. raise ValueError("Only PKCS3 serialization is supported")
  53. if not self._backend._lib.Cryptography_HAS_EVP_PKEY_DHX:
  54. q = self._backend._ffi.new("BIGNUM **")
  55. self._backend._lib.DH_get0_pqg(
  56. self._dh_cdata,
  57. self._backend._ffi.NULL,
  58. q,
  59. self._backend._ffi.NULL,
  60. )
  61. if q[0] != self._backend._ffi.NULL:
  62. raise UnsupportedAlgorithm(
  63. "DH X9.42 serialization is not supported",
  64. _Reasons.UNSUPPORTED_SERIALIZATION,
  65. )
  66. return self._backend._parameter_bytes(encoding, format, self._dh_cdata)
  67. def _get_dh_num_bits(backend, dh_cdata) -> int:
  68. p = backend._ffi.new("BIGNUM **")
  69. backend._lib.DH_get0_pqg(dh_cdata, p, backend._ffi.NULL, backend._ffi.NULL)
  70. backend.openssl_assert(p[0] != backend._ffi.NULL)
  71. return backend._lib.BN_num_bits(p[0])
  72. class _DHPrivateKey(dh.DHPrivateKey):
  73. def __init__(self, backend, dh_cdata, evp_pkey):
  74. self._backend = backend
  75. self._dh_cdata = dh_cdata
  76. self._evp_pkey = evp_pkey
  77. self._key_size_bytes = self._backend._lib.DH_size(dh_cdata)
  78. @property
  79. def key_size(self) -> int:
  80. return _get_dh_num_bits(self._backend, self._dh_cdata)
  81. def private_numbers(self) -> dh.DHPrivateNumbers:
  82. p = self._backend._ffi.new("BIGNUM **")
  83. g = self._backend._ffi.new("BIGNUM **")
  84. q = self._backend._ffi.new("BIGNUM **")
  85. self._backend._lib.DH_get0_pqg(self._dh_cdata, p, q, g)
  86. self._backend.openssl_assert(p[0] != self._backend._ffi.NULL)
  87. self._backend.openssl_assert(g[0] != self._backend._ffi.NULL)
  88. if q[0] == self._backend._ffi.NULL:
  89. q_val = None
  90. else:
  91. q_val = self._backend._bn_to_int(q[0])
  92. pub_key = self._backend._ffi.new("BIGNUM **")
  93. priv_key = self._backend._ffi.new("BIGNUM **")
  94. self._backend._lib.DH_get0_key(self._dh_cdata, pub_key, priv_key)
  95. self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL)
  96. self._backend.openssl_assert(priv_key[0] != self._backend._ffi.NULL)
  97. return dh.DHPrivateNumbers(
  98. public_numbers=dh.DHPublicNumbers(
  99. parameter_numbers=dh.DHParameterNumbers(
  100. p=self._backend._bn_to_int(p[0]),
  101. g=self._backend._bn_to_int(g[0]),
  102. q=q_val,
  103. ),
  104. y=self._backend._bn_to_int(pub_key[0]),
  105. ),
  106. x=self._backend._bn_to_int(priv_key[0]),
  107. )
  108. def exchange(self, peer_public_key: dh.DHPublicKey) -> bytes:
  109. if not isinstance(peer_public_key, _DHPublicKey):
  110. raise TypeError("peer_public_key must be a DHPublicKey")
  111. ctx = self._backend._lib.EVP_PKEY_CTX_new(
  112. self._evp_pkey, self._backend._ffi.NULL
  113. )
  114. self._backend.openssl_assert(ctx != self._backend._ffi.NULL)
  115. ctx = self._backend._ffi.gc(ctx, self._backend._lib.EVP_PKEY_CTX_free)
  116. res = self._backend._lib.EVP_PKEY_derive_init(ctx)
  117. self._backend.openssl_assert(res == 1)
  118. res = self._backend._lib.EVP_PKEY_derive_set_peer(
  119. ctx, peer_public_key._evp_pkey
  120. )
  121. # Invalid kex errors here in OpenSSL 3.0 because checks were moved
  122. # to EVP_PKEY_derive_set_peer
  123. self._exchange_assert(res == 1)
  124. keylen = self._backend._ffi.new("size_t *")
  125. res = self._backend._lib.EVP_PKEY_derive(
  126. ctx, self._backend._ffi.NULL, keylen
  127. )
  128. # Invalid kex errors here in OpenSSL < 3
  129. self._exchange_assert(res == 1)
  130. self._backend.openssl_assert(keylen[0] > 0)
  131. buf = self._backend._ffi.new("unsigned char[]", keylen[0])
  132. res = self._backend._lib.EVP_PKEY_derive(ctx, buf, keylen)
  133. self._backend.openssl_assert(res == 1)
  134. key = self._backend._ffi.buffer(buf, keylen[0])[:]
  135. pad = self._key_size_bytes - len(key)
  136. if pad > 0:
  137. key = (b"\x00" * pad) + key
  138. return key
  139. def _exchange_assert(self, ok):
  140. if not ok:
  141. errors_with_text = self._backend._consume_errors_with_text()
  142. raise ValueError(
  143. "Error computing shared key.",
  144. errors_with_text,
  145. )
  146. def public_key(self) -> dh.DHPublicKey:
  147. dh_cdata = _dh_params_dup(self._dh_cdata, self._backend)
  148. pub_key = self._backend._ffi.new("BIGNUM **")
  149. self._backend._lib.DH_get0_key(
  150. self._dh_cdata, pub_key, self._backend._ffi.NULL
  151. )
  152. self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL)
  153. pub_key_dup = self._backend._lib.BN_dup(pub_key[0])
  154. self._backend.openssl_assert(pub_key_dup != self._backend._ffi.NULL)
  155. res = self._backend._lib.DH_set0_key(
  156. dh_cdata, pub_key_dup, self._backend._ffi.NULL
  157. )
  158. self._backend.openssl_assert(res == 1)
  159. evp_pkey = self._backend._dh_cdata_to_evp_pkey(dh_cdata)
  160. return _DHPublicKey(self._backend, dh_cdata, evp_pkey)
  161. def parameters(self) -> dh.DHParameters:
  162. return _dh_cdata_to_parameters(self._dh_cdata, self._backend)
  163. def private_bytes(
  164. self,
  165. encoding: serialization.Encoding,
  166. format: serialization.PrivateFormat,
  167. encryption_algorithm: serialization.KeySerializationEncryption,
  168. ) -> bytes:
  169. if format is not serialization.PrivateFormat.PKCS8:
  170. raise ValueError(
  171. "DH private keys support only PKCS8 serialization"
  172. )
  173. if not self._backend._lib.Cryptography_HAS_EVP_PKEY_DHX:
  174. q = self._backend._ffi.new("BIGNUM **")
  175. self._backend._lib.DH_get0_pqg(
  176. self._dh_cdata,
  177. self._backend._ffi.NULL,
  178. q,
  179. self._backend._ffi.NULL,
  180. )
  181. if q[0] != self._backend._ffi.NULL:
  182. raise UnsupportedAlgorithm(
  183. "DH X9.42 serialization is not supported",
  184. _Reasons.UNSUPPORTED_SERIALIZATION,
  185. )
  186. return self._backend._private_key_bytes(
  187. encoding,
  188. format,
  189. encryption_algorithm,
  190. self,
  191. self._evp_pkey,
  192. self._dh_cdata,
  193. )
  194. class _DHPublicKey(dh.DHPublicKey):
  195. def __init__(self, backend, dh_cdata, evp_pkey):
  196. self._backend = backend
  197. self._dh_cdata = dh_cdata
  198. self._evp_pkey = evp_pkey
  199. self._key_size_bits = _get_dh_num_bits(self._backend, self._dh_cdata)
  200. @property
  201. def key_size(self) -> int:
  202. return self._key_size_bits
  203. def public_numbers(self) -> dh.DHPublicNumbers:
  204. p = self._backend._ffi.new("BIGNUM **")
  205. g = self._backend._ffi.new("BIGNUM **")
  206. q = self._backend._ffi.new("BIGNUM **")
  207. self._backend._lib.DH_get0_pqg(self._dh_cdata, p, q, g)
  208. self._backend.openssl_assert(p[0] != self._backend._ffi.NULL)
  209. self._backend.openssl_assert(g[0] != self._backend._ffi.NULL)
  210. if q[0] == self._backend._ffi.NULL:
  211. q_val = None
  212. else:
  213. q_val = self._backend._bn_to_int(q[0])
  214. pub_key = self._backend._ffi.new("BIGNUM **")
  215. self._backend._lib.DH_get0_key(
  216. self._dh_cdata, pub_key, self._backend._ffi.NULL
  217. )
  218. self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL)
  219. return dh.DHPublicNumbers(
  220. parameter_numbers=dh.DHParameterNumbers(
  221. p=self._backend._bn_to_int(p[0]),
  222. g=self._backend._bn_to_int(g[0]),
  223. q=q_val,
  224. ),
  225. y=self._backend._bn_to_int(pub_key[0]),
  226. )
  227. def parameters(self) -> dh.DHParameters:
  228. return _dh_cdata_to_parameters(self._dh_cdata, self._backend)
  229. def public_bytes(
  230. self,
  231. encoding: serialization.Encoding,
  232. format: serialization.PublicFormat,
  233. ) -> bytes:
  234. if format is not serialization.PublicFormat.SubjectPublicKeyInfo:
  235. raise ValueError(
  236. "DH public keys support only "
  237. "SubjectPublicKeyInfo serialization"
  238. )
  239. if not self._backend._lib.Cryptography_HAS_EVP_PKEY_DHX:
  240. q = self._backend._ffi.new("BIGNUM **")
  241. self._backend._lib.DH_get0_pqg(
  242. self._dh_cdata,
  243. self._backend._ffi.NULL,
  244. q,
  245. self._backend._ffi.NULL,
  246. )
  247. if q[0] != self._backend._ffi.NULL:
  248. raise UnsupportedAlgorithm(
  249. "DH X9.42 serialization is not supported",
  250. _Reasons.UNSUPPORTED_SERIALIZATION,
  251. )
  252. return self._backend._public_key_bytes(
  253. encoding, format, self, self._evp_pkey, None
  254. )