ocsp.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489
  1. # This file is dual licensed under the terms of the Apache License, Version
  2. # 2.0, and the BSD License. See the LICENSE file in the root of this repository
  3. # for complete details.
  4. import abc
  5. import datetime
  6. import typing
  7. from cryptography import utils
  8. from cryptography import x509
  9. from cryptography.hazmat.bindings._rust import ocsp
  10. from cryptography.hazmat.primitives import hashes, serialization
  11. from cryptography.x509.base import (
  12. PRIVATE_KEY_TYPES,
  13. _EARLIEST_UTC_TIME,
  14. _convert_to_naive_utc_time,
  15. _reject_duplicate_extension,
  16. )
  class OCSPResponderEncoding(utils.Enum):
      """Encodings accepted by ``OCSPResponseBuilder.responder_id``.

      The member values are human-readable labels.
      """

      HASH = "By Hash"
      NAME = "By Name"
  class OCSPResponseStatus(utils.Enum):
      """Overall status of an OCSP response.

      ``OCSPResponseBuilder.sign`` always uses ``SUCCESSFUL``; every other
      member may be handed to ``OCSPResponseBuilder.build_unsuccessful``.
      """

      SUCCESSFUL = 0
      MALFORMED_REQUEST = 1
      INTERNAL_ERROR = 2
      TRY_LATER = 3
      # Note: no member carries the value 4.
      SIG_REQUIRED = 5
      UNAUTHORIZED = 6
  # Hash classes accepted by _verify_algorithm(). Kept as a tuple so it can be
  # handed straight to isinstance().
  _ALLOWED_HASHES = (
      hashes.SHA1,
      hashes.SHA224,
      hashes.SHA256,
      hashes.SHA384,
      hashes.SHA512,
  )
  def _verify_algorithm(algorithm):
      """Check that *algorithm* is an instance of an allowed hash class.

      The accepted classes are the ones listed in ``_ALLOWED_HASHES``.

      Raises:
          ValueError: if *algorithm* is not an instance of any of them.
      """
      if isinstance(algorithm, _ALLOWED_HASHES):
          return
      raise ValueError(
          "Algorithm must be SHA1, SHA224, SHA256, SHA384, or SHA512"
      )
  class OCSPCertStatus(utils.Enum):
      """Status reported for a single certificate.

      ``_SingleResponse`` only accepts a revocation time or reason when the
      status is ``REVOKED``.
      """

      GOOD = 0
      REVOKED = 1
      UNKNOWN = 2
  class _SingleResponse:
      """Validated holder for the fields of one certificate status entry.

      The constructor checks every argument and stores the results on the
      private attributes ``_cert``, ``_issuer``, ``_algorithm``,
      ``_this_update``, ``_next_update``, ``_cert_status``,
      ``_revocation_time`` and ``_revocation_reason``.
      """

      def __init__(
          self,
          cert,
          issuer,
          algorithm,
          cert_status,
          this_update,
          next_update,
          revocation_time,
          revocation_reason,
      ):
          """Validate the arguments and keep them on the instance.

          Raises:
              TypeError: if ``cert``/``issuer`` are not ``x509.Certificate``
                  objects, if ``this_update`` is not a ``datetime``, if
                  ``next_update`` is neither a ``datetime`` nor ``None``, if
                  ``cert_status`` is not an ``OCSPCertStatus`` member, or --
                  for a revoked certificate -- if ``revocation_time`` is not
                  a ``datetime`` or ``revocation_reason`` is neither an
                  ``x509.ReasonFlags`` member nor ``None``.
              ValueError: if ``algorithm`` is rejected by
                  ``_verify_algorithm``, if revocation details are supplied
                  for a certificate that is not revoked, or if the converted
                  ``revocation_time`` is earlier than ``_EARLIEST_UTC_TIME``.
          """
          certs_ok = isinstance(cert, x509.Certificate) and isinstance(
              issuer, x509.Certificate
          )
          if not certs_ok:
              raise TypeError("cert and issuer must be a Certificate")

          _verify_algorithm(algorithm)

          if not isinstance(this_update, datetime.datetime):
              raise TypeError("this_update must be a datetime object")
          next_update_ok = next_update is None or isinstance(
              next_update, datetime.datetime
          )
          if not next_update_ok:
              raise TypeError("next_update must be a datetime object or None")

          self._cert = cert
          self._issuer = issuer
          self._algorithm = algorithm
          self._this_update = this_update
          self._next_update = next_update

          if not isinstance(cert_status, OCSPCertStatus):
              raise TypeError(
                  "cert_status must be an item from the OCSPCertStatus enum"
              )

          if cert_status is OCSPCertStatus.REVOKED:
              # A revoked certificate needs a revocation time; the reason is
              # optional.
              if not isinstance(revocation_time, datetime.datetime):
                  raise TypeError("revocation_time must be a datetime object")

              # The stored value is the result of the conversion helper, not
              # the object the caller passed in.
              revocation_time = _convert_to_naive_utc_time(revocation_time)
              if revocation_time < _EARLIEST_UTC_TIME:
                  raise ValueError(
                      "The revocation_time must be on or after"
                      " 1950 January 1."
                  )

              reason_ok = revocation_reason is None or isinstance(
                  revocation_reason, x509.ReasonFlags
              )
              if not reason_ok:
                  raise TypeError(
                      "revocation_reason must be an item from the ReasonFlags "
                      "enum or None"
                  )
          elif revocation_time is not None:
              raise ValueError(
                  "revocation_time can only be provided if the certificate "
                  "is revoked"
              )
          elif revocation_reason is not None:
              raise ValueError(
                  "revocation_reason can only be provided if the certificate"
                  " is revoked"
              )

          self._cert_status = cert_status
          self._revocation_time = revocation_time
          self._revocation_reason = revocation_reason
  class OCSPRequest(metaclass=abc.ABCMeta):
      """Abstract interface of an OCSP request.

      Objects with this interface are what ``load_der_ocsp_request`` and
      ``OCSPRequestBuilder.build`` are annotated to return.

      Abstract properties are declared by stacking ``@property`` on
      ``@abc.abstractmethod``; ``abc.abstractproperty`` has been deprecated
      since Python 3.3.
      """

      @property
      @abc.abstractmethod
      def issuer_key_hash(self) -> bytes:
          """
          The hash of the issuer public key
          """

      @property
      @abc.abstractmethod
      def issuer_name_hash(self) -> bytes:
          """
          The hash of the issuer name
          """

      @property
      @abc.abstractmethod
      def hash_algorithm(self) -> hashes.HashAlgorithm:
          """
          The hash algorithm used in the issuer name and key hashes
          """

      @property
      @abc.abstractmethod
      def serial_number(self) -> int:
          """
          The serial number of the cert whose status is being checked
          """

      @abc.abstractmethod
      def public_bytes(self, encoding: serialization.Encoding) -> bytes:
          """
          Serializes the request to DER
          """

      @property
      @abc.abstractmethod
      def extensions(self) -> x509.Extensions:
          """
          The list of request extensions. Not single request extensions.
          """
  class OCSPResponse(metaclass=abc.ABCMeta):
      """Abstract interface of an OCSP response.

      Objects with this interface are what ``load_der_ocsp_response``,
      ``OCSPResponseBuilder.sign`` and
      ``OCSPResponseBuilder.build_unsuccessful`` are annotated to return.

      Abstract properties are declared by stacking ``@property`` on
      ``@abc.abstractmethod``; ``abc.abstractproperty`` has been deprecated
      since Python 3.3.
      """

      @property
      @abc.abstractmethod
      def response_status(self) -> OCSPResponseStatus:
          """
          The status of the response. This is a value from the
          OCSPResponseStatus enumeration
          """

      @property
      @abc.abstractmethod
      def signature_algorithm_oid(self) -> x509.ObjectIdentifier:
          """
          The ObjectIdentifier of the signature algorithm
          """

      @property
      @abc.abstractmethod
      def signature_hash_algorithm(
          self,
      ) -> typing.Optional[hashes.HashAlgorithm]:
          """
          Returns a HashAlgorithm corresponding to the type of the digest
          signed
          """

      @property
      @abc.abstractmethod
      def signature(self) -> bytes:
          """
          The signature bytes
          """

      @property
      @abc.abstractmethod
      def tbs_response_bytes(self) -> bytes:
          """
          The tbsResponseData bytes
          """

      @property
      @abc.abstractmethod
      def certificates(self) -> typing.List[x509.Certificate]:
          """
          A list of certificates used to help build a chain to verify the
          OCSP response. This situation occurs when the OCSP responder uses a
          delegate certificate.
          """

      @property
      @abc.abstractmethod
      def responder_key_hash(self) -> typing.Optional[bytes]:
          """
          The responder's key hash or None
          """

      @property
      @abc.abstractmethod
      def responder_name(self) -> typing.Optional[x509.Name]:
          """
          The responder's Name or None
          """

      @property
      @abc.abstractmethod
      def produced_at(self) -> datetime.datetime:
          """
          The time the response was produced
          """

      @property
      @abc.abstractmethod
      def certificate_status(self) -> OCSPCertStatus:
          """
          The status of the certificate (an element from the OCSPCertStatus
          enum)
          """

      @property
      @abc.abstractmethod
      def revocation_time(self) -> typing.Optional[datetime.datetime]:
          """
          The date of when the certificate was revoked or None if not
          revoked.
          """

      @property
      @abc.abstractmethod
      def revocation_reason(self) -> typing.Optional[x509.ReasonFlags]:
          """
          The reason the certificate was revoked or None if not specified or
          not revoked.
          """

      @property
      @abc.abstractmethod
      def this_update(self) -> datetime.datetime:
          """
          The most recent time at which the status being indicated is known
          by the responder to have been correct
          """

      @property
      @abc.abstractmethod
      def next_update(self) -> typing.Optional[datetime.datetime]:
          """
          The time when newer information will be available
          """

      @property
      @abc.abstractmethod
      def issuer_key_hash(self) -> bytes:
          """
          The hash of the issuer public key
          """

      @property
      @abc.abstractmethod
      def issuer_name_hash(self) -> bytes:
          """
          The hash of the issuer name
          """

      @property
      @abc.abstractmethod
      def hash_algorithm(self) -> hashes.HashAlgorithm:
          """
          The hash algorithm used in the issuer name and key hashes
          """

      @property
      @abc.abstractmethod
      def serial_number(self) -> int:
          """
          The serial number of the cert whose status is being checked
          """

      @property
      @abc.abstractmethod
      def extensions(self) -> x509.Extensions:
          """
          The list of response extensions. Not single response extensions.
          """

      @property
      @abc.abstractmethod
      def single_extensions(self) -> x509.Extensions:
          """
          The list of single response extensions. Not response extensions.
          """

      @abc.abstractmethod
      def public_bytes(self, encoding: serialization.Encoding) -> bytes:
          """
          Serializes the response to DER
          """
  class OCSPRequestBuilder:
      """Builder for an OCSP request covering a single certificate.

      Every configuration method validates its input and returns a *new*
      builder; the builder it is called on is not modified.
      """

      def __init__(
          self,
          request: typing.Optional[
              typing.Tuple[
                  x509.Certificate, x509.Certificate, hashes.HashAlgorithm
              ]
          ] = None,
          extensions: typing.Optional[
              typing.List[x509.Extension[x509.ExtensionType]]
          ] = None,
      ) -> None:
          """Create a builder.

          Args:
              request: ``(cert, issuer, algorithm)`` tuple, or ``None`` when
                  no certificate has been added yet.
              extensions: extensions collected so far; ``None`` (the default)
                  means "no extensions".
          """
          self._request = request
          # Use a fresh list per builder rather than a mutable default
          # argument, which would be one list object shared by every
          # default-constructed builder.
          self._extensions = [] if extensions is None else extensions

      def add_certificate(
          self,
          cert: x509.Certificate,
          issuer: x509.Certificate,
          algorithm: hashes.HashAlgorithm,
      ) -> "OCSPRequestBuilder":
          """Return a new builder that asks about *cert* issued by *issuer*.

          Raises:
              ValueError: if a certificate was already added, or if
                  *algorithm* is rejected by ``_verify_algorithm``.
              TypeError: if *cert* or *issuer* is not an ``x509.Certificate``.
          """
          if self._request is not None:
              raise ValueError("Only one certificate can be added to a request")

          _verify_algorithm(algorithm)
          if not isinstance(cert, x509.Certificate) or not isinstance(
              issuer, x509.Certificate
          ):
              raise TypeError("cert and issuer must be a Certificate")

          return OCSPRequestBuilder((cert, issuer, algorithm), self._extensions)

      def add_extension(
          self, extval: x509.ExtensionType, critical: bool
      ) -> "OCSPRequestBuilder":
          """Return a new builder with *extval* appended to the extensions.

          Raises:
              TypeError: if *extval* is not an ``x509.ExtensionType``.

          ``_reject_duplicate_extension`` is also called with the new
          extension and the ones already present.
          """
          if not isinstance(extval, x509.ExtensionType):
              raise TypeError("extension must be an ExtensionType")

          extension = x509.Extension(extval.oid, critical, extval)
          _reject_duplicate_extension(extension, self._extensions)

          # Concatenation builds a new list, so this builder's list is left
          # untouched.
          return OCSPRequestBuilder(
              self._request, self._extensions + [extension]
          )

      def build(self) -> OCSPRequest:
          """Create the request by delegating to the OpenSSL backend.

          Raises:
              ValueError: if no certificate has been added.
          """
          from cryptography.hazmat.backends.openssl.backend import backend

          if self._request is None:
              raise ValueError("You must add a certificate before building")

          return backend.create_ocsp_request(self)
  class OCSPResponseBuilder:
      """Builder that collects the parts of an OCSP response and signs it.

      Every configuration method validates its input and returns a *new*
      builder; the builder it is called on is not modified.
      """

      def __init__(
          self,
          response: typing.Optional[_SingleResponse] = None,
          responder_id: typing.Optional[
              typing.Tuple[x509.Certificate, OCSPResponderEncoding]
          ] = None,
          certs: typing.Optional[typing.List[x509.Certificate]] = None,
          extensions: typing.Optional[
              typing.List[x509.Extension[x509.ExtensionType]]
          ] = None,
      ) -> None:
          """Create a builder.

          Args:
              response: the single response added so far, or ``None``.
              responder_id: ``(responder_cert, encoding)`` tuple, or ``None``.
              certs: certificates to attach, or ``None`` when not set.
              extensions: extensions collected so far; ``None`` (the default)
                  means "no extensions".
          """
          self._response = response
          self._responder_id = responder_id
          self._certs = certs
          # Use a fresh list per builder rather than a mutable default
          # argument, which would be one list object shared by every
          # default-constructed builder.
          self._extensions = [] if extensions is None else extensions

      def add_response(
          self,
          cert: x509.Certificate,
          issuer: x509.Certificate,
          algorithm: hashes.HashAlgorithm,
          cert_status: OCSPCertStatus,
          this_update: datetime.datetime,
          next_update: typing.Optional[datetime.datetime],
          revocation_time: typing.Optional[datetime.datetime],
          revocation_reason: typing.Optional[x509.ReasonFlags],
      ) -> "OCSPResponseBuilder":
          """Return a new builder holding a single response for *cert*.

          The arguments are validated by ``_SingleResponse``; whatever
          ``TypeError`` or ``ValueError`` it raises propagates to the caller.

          Raises:
              ValueError: if a response was already added.
          """
          if self._response is not None:
              raise ValueError("Only one response per OCSPResponse.")

          singleresp = _SingleResponse(
              cert,
              issuer,
              algorithm,
              cert_status,
              this_update,
              next_update,
              revocation_time,
              revocation_reason,
          )
          return OCSPResponseBuilder(
              singleresp,
              self._responder_id,
              self._certs,
              self._extensions,
          )

      def responder_id(
          self, encoding: OCSPResponderEncoding, responder_cert: x509.Certificate
      ) -> "OCSPResponseBuilder":
          """Return a new builder with the responder identity set.

          Raises:
              ValueError: if the responder id was already set.
              TypeError: if *responder_cert* is not an ``x509.Certificate`` or
                  *encoding* is not an ``OCSPResponderEncoding`` member.
          """
          if self._responder_id is not None:
              raise ValueError("responder_id can only be set once")
          if not isinstance(responder_cert, x509.Certificate):
              raise TypeError("responder_cert must be a Certificate")
          if not isinstance(encoding, OCSPResponderEncoding):
              raise TypeError(
                  "encoding must be an element from OCSPResponderEncoding"
              )

          # Stored as (certificate, encoding): the reverse of the parameter
          # order.
          return OCSPResponseBuilder(
              self._response,
              (responder_cert, encoding),
              self._certs,
              self._extensions,
          )

      def certificates(
          self, certs: typing.Iterable[x509.Certificate]
      ) -> "OCSPResponseBuilder":
          """Return a new builder carrying *certs*.

          Raises:
              ValueError: if certificates were already set or *certs* is
                  empty.
              TypeError: if any element is not an ``x509.Certificate``.
          """
          if self._certs is not None:
              raise ValueError("certificates may only be set once")

          # Materialise once so any iterable can be checked and then stored.
          certs = list(certs)
          if not certs:
              raise ValueError("certs must not be an empty list")
          if not all(isinstance(x, x509.Certificate) for x in certs):
              raise TypeError("certs must be a list of Certificates")

          return OCSPResponseBuilder(
              self._response,
              self._responder_id,
              certs,
              self._extensions,
          )

      def add_extension(
          self, extval: x509.ExtensionType, critical: bool
      ) -> "OCSPResponseBuilder":
          """Return a new builder with *extval* appended to the extensions.

          Raises:
              TypeError: if *extval* is not an ``x509.ExtensionType``.

          ``_reject_duplicate_extension`` is also called with the new
          extension and the ones already present.
          """
          if not isinstance(extval, x509.ExtensionType):
              raise TypeError("extension must be an ExtensionType")

          extension = x509.Extension(extval.oid, critical, extval)
          _reject_duplicate_extension(extension, self._extensions)

          # Concatenation builds a new list, so this builder's list is left
          # untouched.
          return OCSPResponseBuilder(
              self._response,
              self._responder_id,
              self._certs,
              self._extensions + [extension],
          )

      def sign(
          self,
          private_key: PRIVATE_KEY_TYPES,
          algorithm: typing.Optional[hashes.HashAlgorithm],
      ) -> OCSPResponse:
          """Create a ``SUCCESSFUL`` response via the OpenSSL backend.

          Raises:
              ValueError: if no response or no responder id has been added.
          """
          from cryptography.hazmat.backends.openssl.backend import backend

          if self._response is None:
              raise ValueError("You must add a response before signing")
          if self._responder_id is None:
              raise ValueError("You must add a responder_id before signing")

          return backend.create_ocsp_response(
              OCSPResponseStatus.SUCCESSFUL, self, private_key, algorithm
          )

      @classmethod
      def build_unsuccessful(
          cls, response_status: OCSPResponseStatus
      ) -> OCSPResponse:
          """Create a response that carries only a non-successful status.

          The backend is called with no builder, key or algorithm.

          Raises:
              TypeError: if *response_status* is not an ``OCSPResponseStatus``
                  member.
              ValueError: if *response_status* is ``SUCCESSFUL``.
          """
          from cryptography.hazmat.backends.openssl.backend import backend

          if not isinstance(response_status, OCSPResponseStatus):
              raise TypeError(
                  "response_status must be an item from OCSPResponseStatus"
              )
          if response_status is OCSPResponseStatus.SUCCESSFUL:
              raise ValueError("response_status cannot be SUCCESSFUL")

          return backend.create_ocsp_response(response_status, None, None, None)
  def load_der_ocsp_request(data: bytes) -> OCSPRequest:
      """Load an OCSP request from *data*.

      Parsing is delegated to ``load_der_ocsp_request`` in the imported
      ``ocsp`` binding module; its result is returned unchanged.
      """
      request = ocsp.load_der_ocsp_request(data)
      return request
  def load_der_ocsp_response(data: bytes) -> OCSPResponse:
      """Load an OCSP response from *data*.

      Parsing is delegated to ``load_der_ocsp_response`` in the imported
      ``ocsp`` binding module; its result is returned unchanged.
      """
      response = ocsp.load_der_ocsp_response(data)
      return response