base.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998
  1. # This file is dual licensed under the terms of the Apache License, Version
  2. # 2.0, and the BSD License. See the LICENSE file in the root of this repository
  3. # for complete details.
  4. import abc
  5. import datetime
  6. import os
  7. import typing
  8. from cryptography import utils
  9. from cryptography.hazmat.backends import _get_backend
  10. from cryptography.hazmat.backends.interfaces import Backend
  11. from cryptography.hazmat.bindings._rust import x509 as rust_x509
  12. from cryptography.hazmat.primitives import hashes, serialization
  13. from cryptography.hazmat.primitives.asymmetric import (
  14. dsa,
  15. ec,
  16. ed25519,
  17. ed448,
  18. rsa,
  19. )
  20. from cryptography.hazmat.primitives.asymmetric.types import (
  21. PRIVATE_KEY_TYPES as PRIVATE_KEY_TYPES,
  22. PUBLIC_KEY_TYPES as PUBLIC_KEY_TYPES,
  23. )
  24. from cryptography.x509.extensions import Extension, ExtensionType, Extensions
  25. from cryptography.x509.name import Name
  26. from cryptography.x509.oid import ObjectIdentifier
  27. _EARLIEST_UTC_TIME = datetime.datetime(1950, 1, 1)
  28. class AttributeNotFound(Exception):
  29. def __init__(self, msg: str, oid: ObjectIdentifier) -> None:
  30. super(AttributeNotFound, self).__init__(msg)
  31. self.oid = oid
  32. def _reject_duplicate_extension(
  33. extension: Extension[ExtensionType],
  34. extensions: typing.List[Extension[ExtensionType]],
  35. ) -> None:
  36. # This is quadratic in the number of extensions
  37. for e in extensions:
  38. if e.oid == extension.oid:
  39. raise ValueError("This extension has already been set.")
  40. def _reject_duplicate_attribute(
  41. oid: ObjectIdentifier,
  42. attributes: typing.List[typing.Tuple[ObjectIdentifier, bytes]],
  43. ) -> None:
  44. # This is quadratic in the number of attributes
  45. for attr_oid, _ in attributes:
  46. if attr_oid == oid:
  47. raise ValueError("This attribute has already been set.")
  48. def _convert_to_naive_utc_time(time: datetime.datetime) -> datetime.datetime:
  49. """Normalizes a datetime to a naive datetime in UTC.
  50. time -- datetime to normalize. Assumed to be in UTC if not timezone
  51. aware.
  52. """
  53. if time.tzinfo is not None:
  54. offset = time.utcoffset()
  55. offset = offset if offset else datetime.timedelta()
  56. return time.replace(tzinfo=None) - offset
  57. else:
  58. return time
  59. class Version(utils.Enum):
  60. v1 = 0
  61. v3 = 2
  62. class InvalidVersion(Exception):
  63. def __init__(self, msg: str, parsed_version: int) -> None:
  64. super(InvalidVersion, self).__init__(msg)
  65. self.parsed_version = parsed_version
  66. class Certificate(metaclass=abc.ABCMeta):
  67. @abc.abstractmethod
  68. def fingerprint(self, algorithm: hashes.HashAlgorithm) -> bytes:
  69. """
  70. Returns bytes using digest passed.
  71. """
  72. @abc.abstractproperty
  73. def serial_number(self) -> int:
  74. """
  75. Returns certificate serial number
  76. """
  77. @abc.abstractproperty
  78. def version(self) -> Version:
  79. """
  80. Returns the certificate version
  81. """
  82. @abc.abstractmethod
  83. def public_key(self) -> PUBLIC_KEY_TYPES:
  84. """
  85. Returns the public key
  86. """
  87. @abc.abstractproperty
  88. def not_valid_before(self) -> datetime.datetime:
  89. """
  90. Not before time (represented as UTC datetime)
  91. """
  92. @abc.abstractproperty
  93. def not_valid_after(self) -> datetime.datetime:
  94. """
  95. Not after time (represented as UTC datetime)
  96. """
  97. @abc.abstractproperty
  98. def issuer(self) -> Name:
  99. """
  100. Returns the issuer name object.
  101. """
  102. @abc.abstractproperty
  103. def subject(self) -> Name:
  104. """
  105. Returns the subject name object.
  106. """
  107. @abc.abstractproperty
  108. def signature_hash_algorithm(
  109. self,
  110. ) -> typing.Optional[hashes.HashAlgorithm]:
  111. """
  112. Returns a HashAlgorithm corresponding to the type of the digest signed
  113. in the certificate.
  114. """
  115. @abc.abstractproperty
  116. def signature_algorithm_oid(self) -> ObjectIdentifier:
  117. """
  118. Returns the ObjectIdentifier of the signature algorithm.
  119. """
  120. @abc.abstractproperty
  121. def extensions(self) -> Extensions:
  122. """
  123. Returns an Extensions object.
  124. """
  125. @abc.abstractproperty
  126. def signature(self) -> bytes:
  127. """
  128. Returns the signature bytes.
  129. """
  130. @abc.abstractproperty
  131. def tbs_certificate_bytes(self) -> bytes:
  132. """
  133. Returns the tbsCertificate payload bytes as defined in RFC 5280.
  134. """
  135. @abc.abstractmethod
  136. def __eq__(self, other: object) -> bool:
  137. """
  138. Checks equality.
  139. """
  140. @abc.abstractmethod
  141. def __ne__(self, other: object) -> bool:
  142. """
  143. Checks not equal.
  144. """
  145. @abc.abstractmethod
  146. def __hash__(self) -> int:
  147. """
  148. Computes a hash.
  149. """
  150. @abc.abstractmethod
  151. def public_bytes(self, encoding: serialization.Encoding) -> bytes:
  152. """
  153. Serializes the certificate to PEM or DER format.
  154. """
  155. # Runtime isinstance checks need this since the rust class is not a subclass.
  156. Certificate.register(rust_x509.Certificate)
  157. class RevokedCertificate(metaclass=abc.ABCMeta):
  158. @abc.abstractproperty
  159. def serial_number(self) -> int:
  160. """
  161. Returns the serial number of the revoked certificate.
  162. """
  163. @abc.abstractproperty
  164. def revocation_date(self) -> datetime.datetime:
  165. """
  166. Returns the date of when this certificate was revoked.
  167. """
  168. @abc.abstractproperty
  169. def extensions(self) -> Extensions:
  170. """
  171. Returns an Extensions object containing a list of Revoked extensions.
  172. """
  173. # Runtime isinstance checks need this since the rust class is not a subclass.
  174. RevokedCertificate.register(rust_x509.RevokedCertificate)
  175. class CertificateRevocationList(metaclass=abc.ABCMeta):
  176. @abc.abstractmethod
  177. def public_bytes(self, encoding: serialization.Encoding) -> bytes:
  178. """
  179. Serializes the CRL to PEM or DER format.
  180. """
  181. @abc.abstractmethod
  182. def fingerprint(self, algorithm: hashes.HashAlgorithm) -> bytes:
  183. """
  184. Returns bytes using digest passed.
  185. """
  186. @abc.abstractmethod
  187. def get_revoked_certificate_by_serial_number(
  188. self, serial_number: int
  189. ) -> typing.Optional[RevokedCertificate]:
  190. """
  191. Returns an instance of RevokedCertificate or None if the serial_number
  192. is not in the CRL.
  193. """
  194. @abc.abstractproperty
  195. def signature_hash_algorithm(
  196. self,
  197. ) -> typing.Optional[hashes.HashAlgorithm]:
  198. """
  199. Returns a HashAlgorithm corresponding to the type of the digest signed
  200. in the certificate.
  201. """
  202. @abc.abstractproperty
  203. def signature_algorithm_oid(self) -> ObjectIdentifier:
  204. """
  205. Returns the ObjectIdentifier of the signature algorithm.
  206. """
  207. @abc.abstractproperty
  208. def issuer(self) -> Name:
  209. """
  210. Returns the X509Name with the issuer of this CRL.
  211. """
  212. @abc.abstractproperty
  213. def next_update(self) -> typing.Optional[datetime.datetime]:
  214. """
  215. Returns the date of next update for this CRL.
  216. """
  217. @abc.abstractproperty
  218. def last_update(self) -> datetime.datetime:
  219. """
  220. Returns the date of last update for this CRL.
  221. """
  222. @abc.abstractproperty
  223. def extensions(self) -> Extensions:
  224. """
  225. Returns an Extensions object containing a list of CRL extensions.
  226. """
  227. @abc.abstractproperty
  228. def signature(self) -> bytes:
  229. """
  230. Returns the signature bytes.
  231. """
  232. @abc.abstractproperty
  233. def tbs_certlist_bytes(self) -> bytes:
  234. """
  235. Returns the tbsCertList payload bytes as defined in RFC 5280.
  236. """
  237. @abc.abstractmethod
  238. def __eq__(self, other: object) -> bool:
  239. """
  240. Checks equality.
  241. """
  242. @abc.abstractmethod
  243. def __ne__(self, other: object) -> bool:
  244. """
  245. Checks not equal.
  246. """
  247. @abc.abstractmethod
  248. def __len__(self) -> int:
  249. """
  250. Number of revoked certificates in the CRL.
  251. """
  252. @typing.overload
  253. def __getitem__(self, idx: int) -> RevokedCertificate:
  254. ...
  255. @typing.overload
  256. def __getitem__(self, idx: slice) -> typing.List[RevokedCertificate]:
  257. ...
  258. @abc.abstractmethod
  259. def __getitem__(
  260. self, idx: typing.Union[int, slice]
  261. ) -> typing.Union[RevokedCertificate, typing.List[RevokedCertificate]]:
  262. """
  263. Returns a revoked certificate (or slice of revoked certificates).
  264. """
  265. @abc.abstractmethod
  266. def __iter__(self) -> typing.Iterator[RevokedCertificate]:
  267. """
  268. Iterator over the revoked certificates
  269. """
  270. @abc.abstractmethod
  271. def is_signature_valid(self, public_key: PUBLIC_KEY_TYPES) -> bool:
  272. """
  273. Verifies signature of revocation list against given public key.
  274. """
  275. CertificateRevocationList.register(rust_x509.CertificateRevocationList)
  276. class CertificateSigningRequest(metaclass=abc.ABCMeta):
  277. @abc.abstractmethod
  278. def __eq__(self, other: object) -> bool:
  279. """
  280. Checks equality.
  281. """
  282. @abc.abstractmethod
  283. def __ne__(self, other: object) -> bool:
  284. """
  285. Checks not equal.
  286. """
  287. @abc.abstractmethod
  288. def __hash__(self) -> int:
  289. """
  290. Computes a hash.
  291. """
  292. @abc.abstractmethod
  293. def public_key(self) -> PUBLIC_KEY_TYPES:
  294. """
  295. Returns the public key
  296. """
  297. @abc.abstractproperty
  298. def subject(self) -> Name:
  299. """
  300. Returns the subject name object.
  301. """
  302. @abc.abstractproperty
  303. def signature_hash_algorithm(
  304. self,
  305. ) -> typing.Optional[hashes.HashAlgorithm]:
  306. """
  307. Returns a HashAlgorithm corresponding to the type of the digest signed
  308. in the certificate.
  309. """
  310. @abc.abstractproperty
  311. def signature_algorithm_oid(self) -> ObjectIdentifier:
  312. """
  313. Returns the ObjectIdentifier of the signature algorithm.
  314. """
  315. @abc.abstractproperty
  316. def extensions(self) -> Extensions:
  317. """
  318. Returns the extensions in the signing request.
  319. """
  320. @abc.abstractmethod
  321. def public_bytes(self, encoding: serialization.Encoding) -> bytes:
  322. """
  323. Encodes the request to PEM or DER format.
  324. """
  325. @abc.abstractproperty
  326. def signature(self) -> bytes:
  327. """
  328. Returns the signature bytes.
  329. """
  330. @abc.abstractproperty
  331. def tbs_certrequest_bytes(self) -> bytes:
  332. """
  333. Returns the PKCS#10 CertificationRequestInfo bytes as defined in RFC
  334. 2986.
  335. """
  336. @abc.abstractproperty
  337. def is_signature_valid(self) -> bool:
  338. """
  339. Verifies signature of signing request.
  340. """
  341. @abc.abstractmethod
  342. def get_attribute_for_oid(self, oid: ObjectIdentifier) -> bytes:
  343. """
  344. Get the attribute value for a given OID.
  345. """
  346. # Runtime isinstance checks need this since the rust class is not a subclass.
  347. CertificateSigningRequest.register(rust_x509.CertificateSigningRequest)
  348. # Backend argument preserved for API compatibility, but ignored.
  349. def load_pem_x509_certificate(
  350. data: bytes, backend: typing.Any = None
  351. ) -> Certificate:
  352. return rust_x509.load_pem_x509_certificate(data)
  353. # Backend argument preserved for API compatibility, but ignored.
  354. def load_der_x509_certificate(
  355. data: bytes, backend: typing.Any = None
  356. ) -> Certificate:
  357. return rust_x509.load_der_x509_certificate(data)
  358. # Backend argument preserved for API compatibility, but ignored.
  359. def load_pem_x509_csr(
  360. data: bytes, backend: typing.Optional[Backend] = None
  361. ) -> CertificateSigningRequest:
  362. return rust_x509.load_pem_x509_csr(data)
  363. # Backend argument preserved for API compatibility, but ignored.
  364. def load_der_x509_csr(
  365. data: bytes, backend: typing.Optional[Backend] = None
  366. ) -> CertificateSigningRequest:
  367. return rust_x509.load_der_x509_csr(data)
  368. # Backend argument preserved for API compatibility, but ignored.
  369. def load_pem_x509_crl(
  370. data: bytes, backend: typing.Optional[Backend] = None
  371. ) -> CertificateRevocationList:
  372. return rust_x509.load_pem_x509_crl(data)
  373. # Backend argument preserved for API compatibility, but ignored.
  374. def load_der_x509_crl(
  375. data: bytes, backend: typing.Optional[Backend] = None
  376. ) -> CertificateRevocationList:
  377. return rust_x509.load_der_x509_crl(data)
  378. class CertificateSigningRequestBuilder(object):
  379. def __init__(
  380. self,
  381. subject_name: typing.Optional[Name] = None,
  382. extensions: typing.List[Extension[ExtensionType]] = [],
  383. attributes: typing.List[typing.Tuple[ObjectIdentifier, bytes]] = [],
  384. ):
  385. """
  386. Creates an empty X.509 certificate request (v1).
  387. """
  388. self._subject_name = subject_name
  389. self._extensions = extensions
  390. self._attributes = attributes
  391. def subject_name(self, name: Name) -> "CertificateSigningRequestBuilder":
  392. """
  393. Sets the certificate requestor's distinguished name.
  394. """
  395. if not isinstance(name, Name):
  396. raise TypeError("Expecting x509.Name object.")
  397. if self._subject_name is not None:
  398. raise ValueError("The subject name may only be set once.")
  399. return CertificateSigningRequestBuilder(
  400. name, self._extensions, self._attributes
  401. )
  402. def add_extension(
  403. self, extval: ExtensionType, critical: bool
  404. ) -> "CertificateSigningRequestBuilder":
  405. """
  406. Adds an X.509 extension to the certificate request.
  407. """
  408. if not isinstance(extval, ExtensionType):
  409. raise TypeError("extension must be an ExtensionType")
  410. extension = Extension(extval.oid, critical, extval)
  411. _reject_duplicate_extension(extension, self._extensions)
  412. return CertificateSigningRequestBuilder(
  413. self._subject_name,
  414. self._extensions + [extension],
  415. self._attributes,
  416. )
  417. def add_attribute(
  418. self, oid: ObjectIdentifier, value: bytes
  419. ) -> "CertificateSigningRequestBuilder":
  420. """
  421. Adds an X.509 attribute with an OID and associated value.
  422. """
  423. if not isinstance(oid, ObjectIdentifier):
  424. raise TypeError("oid must be an ObjectIdentifier")
  425. if not isinstance(value, bytes):
  426. raise TypeError("value must be bytes")
  427. _reject_duplicate_attribute(oid, self._attributes)
  428. return CertificateSigningRequestBuilder(
  429. self._subject_name,
  430. self._extensions,
  431. self._attributes + [(oid, value)],
  432. )
  433. def sign(
  434. self,
  435. private_key: PRIVATE_KEY_TYPES,
  436. algorithm: typing.Optional[hashes.HashAlgorithm],
  437. backend: typing.Optional[Backend] = None,
  438. ) -> CertificateSigningRequest:
  439. """
  440. Signs the request using the requestor's private key.
  441. """
  442. backend = _get_backend(backend)
  443. if self._subject_name is None:
  444. raise ValueError("A CertificateSigningRequest must have a subject")
  445. return backend.create_x509_csr(self, private_key, algorithm)
  446. class CertificateBuilder(object):
  447. _extensions: typing.List[Extension[ExtensionType]]
  448. def __init__(
  449. self,
  450. issuer_name: typing.Optional[Name] = None,
  451. subject_name: typing.Optional[Name] = None,
  452. public_key: typing.Optional[PUBLIC_KEY_TYPES] = None,
  453. serial_number: typing.Optional[int] = None,
  454. not_valid_before: typing.Optional[datetime.datetime] = None,
  455. not_valid_after: typing.Optional[datetime.datetime] = None,
  456. extensions: typing.List[Extension[ExtensionType]] = [],
  457. ) -> None:
  458. self._version = Version.v3
  459. self._issuer_name = issuer_name
  460. self._subject_name = subject_name
  461. self._public_key = public_key
  462. self._serial_number = serial_number
  463. self._not_valid_before = not_valid_before
  464. self._not_valid_after = not_valid_after
  465. self._extensions = extensions
  466. def issuer_name(self, name: Name) -> "CertificateBuilder":
  467. """
  468. Sets the CA's distinguished name.
  469. """
  470. if not isinstance(name, Name):
  471. raise TypeError("Expecting x509.Name object.")
  472. if self._issuer_name is not None:
  473. raise ValueError("The issuer name may only be set once.")
  474. return CertificateBuilder(
  475. name,
  476. self._subject_name,
  477. self._public_key,
  478. self._serial_number,
  479. self._not_valid_before,
  480. self._not_valid_after,
  481. self._extensions,
  482. )
  483. def subject_name(self, name: Name) -> "CertificateBuilder":
  484. """
  485. Sets the requestor's distinguished name.
  486. """
  487. if not isinstance(name, Name):
  488. raise TypeError("Expecting x509.Name object.")
  489. if self._subject_name is not None:
  490. raise ValueError("The subject name may only be set once.")
  491. return CertificateBuilder(
  492. self._issuer_name,
  493. name,
  494. self._public_key,
  495. self._serial_number,
  496. self._not_valid_before,
  497. self._not_valid_after,
  498. self._extensions,
  499. )
  500. def public_key(
  501. self,
  502. key: PUBLIC_KEY_TYPES,
  503. ) -> "CertificateBuilder":
  504. """
  505. Sets the requestor's public key (as found in the signing request).
  506. """
  507. if not isinstance(
  508. key,
  509. (
  510. dsa.DSAPublicKey,
  511. rsa.RSAPublicKey,
  512. ec.EllipticCurvePublicKey,
  513. ed25519.Ed25519PublicKey,
  514. ed448.Ed448PublicKey,
  515. ),
  516. ):
  517. raise TypeError(
  518. "Expecting one of DSAPublicKey, RSAPublicKey,"
  519. " EllipticCurvePublicKey, Ed25519PublicKey or"
  520. " Ed448PublicKey."
  521. )
  522. if self._public_key is not None:
  523. raise ValueError("The public key may only be set once.")
  524. return CertificateBuilder(
  525. self._issuer_name,
  526. self._subject_name,
  527. key,
  528. self._serial_number,
  529. self._not_valid_before,
  530. self._not_valid_after,
  531. self._extensions,
  532. )
  533. def serial_number(self, number: int) -> "CertificateBuilder":
  534. """
  535. Sets the certificate serial number.
  536. """
  537. if not isinstance(number, int):
  538. raise TypeError("Serial number must be of integral type.")
  539. if self._serial_number is not None:
  540. raise ValueError("The serial number may only be set once.")
  541. if number <= 0:
  542. raise ValueError("The serial number should be positive.")
  543. # ASN.1 integers are always signed, so most significant bit must be
  544. # zero.
  545. if number.bit_length() >= 160: # As defined in RFC 5280
  546. raise ValueError(
  547. "The serial number should not be more than 159 " "bits."
  548. )
  549. return CertificateBuilder(
  550. self._issuer_name,
  551. self._subject_name,
  552. self._public_key,
  553. number,
  554. self._not_valid_before,
  555. self._not_valid_after,
  556. self._extensions,
  557. )
  558. def not_valid_before(
  559. self, time: datetime.datetime
  560. ) -> "CertificateBuilder":
  561. """
  562. Sets the certificate activation time.
  563. """
  564. if not isinstance(time, datetime.datetime):
  565. raise TypeError("Expecting datetime object.")
  566. if self._not_valid_before is not None:
  567. raise ValueError("The not valid before may only be set once.")
  568. time = _convert_to_naive_utc_time(time)
  569. if time < _EARLIEST_UTC_TIME:
  570. raise ValueError(
  571. "The not valid before date must be on or after"
  572. " 1950 January 1)."
  573. )
  574. if self._not_valid_after is not None and time > self._not_valid_after:
  575. raise ValueError(
  576. "The not valid before date must be before the not valid after "
  577. "date."
  578. )
  579. return CertificateBuilder(
  580. self._issuer_name,
  581. self._subject_name,
  582. self._public_key,
  583. self._serial_number,
  584. time,
  585. self._not_valid_after,
  586. self._extensions,
  587. )
  588. def not_valid_after(self, time: datetime.datetime) -> "CertificateBuilder":
  589. """
  590. Sets the certificate expiration time.
  591. """
  592. if not isinstance(time, datetime.datetime):
  593. raise TypeError("Expecting datetime object.")
  594. if self._not_valid_after is not None:
  595. raise ValueError("The not valid after may only be set once.")
  596. time = _convert_to_naive_utc_time(time)
  597. if time < _EARLIEST_UTC_TIME:
  598. raise ValueError(
  599. "The not valid after date must be on or after"
  600. " 1950 January 1."
  601. )
  602. if (
  603. self._not_valid_before is not None
  604. and time < self._not_valid_before
  605. ):
  606. raise ValueError(
  607. "The not valid after date must be after the not valid before "
  608. "date."
  609. )
  610. return CertificateBuilder(
  611. self._issuer_name,
  612. self._subject_name,
  613. self._public_key,
  614. self._serial_number,
  615. self._not_valid_before,
  616. time,
  617. self._extensions,
  618. )
  619. def add_extension(
  620. self, extval: ExtensionType, critical: bool
  621. ) -> "CertificateBuilder":
  622. """
  623. Adds an X.509 extension to the certificate.
  624. """
  625. if not isinstance(extval, ExtensionType):
  626. raise TypeError("extension must be an ExtensionType")
  627. extension = Extension(extval.oid, critical, extval)
  628. _reject_duplicate_extension(extension, self._extensions)
  629. return CertificateBuilder(
  630. self._issuer_name,
  631. self._subject_name,
  632. self._public_key,
  633. self._serial_number,
  634. self._not_valid_before,
  635. self._not_valid_after,
  636. self._extensions + [extension],
  637. )
  638. def sign(
  639. self,
  640. private_key: PRIVATE_KEY_TYPES,
  641. algorithm: typing.Optional[hashes.HashAlgorithm],
  642. backend: typing.Optional[Backend] = None,
  643. ) -> Certificate:
  644. """
  645. Signs the certificate using the CA's private key.
  646. """
  647. backend = _get_backend(backend)
  648. if self._subject_name is None:
  649. raise ValueError("A certificate must have a subject name")
  650. if self._issuer_name is None:
  651. raise ValueError("A certificate must have an issuer name")
  652. if self._serial_number is None:
  653. raise ValueError("A certificate must have a serial number")
  654. if self._not_valid_before is None:
  655. raise ValueError("A certificate must have a not valid before time")
  656. if self._not_valid_after is None:
  657. raise ValueError("A certificate must have a not valid after time")
  658. if self._public_key is None:
  659. raise ValueError("A certificate must have a public key")
  660. return backend.create_x509_certificate(self, private_key, algorithm)
  661. class CertificateRevocationListBuilder(object):
  662. _extensions: typing.List[Extension[ExtensionType]]
  663. _revoked_certificates: typing.List[RevokedCertificate]
  664. def __init__(
  665. self,
  666. issuer_name: typing.Optional[Name] = None,
  667. last_update: typing.Optional[datetime.datetime] = None,
  668. next_update: typing.Optional[datetime.datetime] = None,
  669. extensions: typing.List[Extension[ExtensionType]] = [],
  670. revoked_certificates: typing.List[RevokedCertificate] = [],
  671. ):
  672. self._issuer_name = issuer_name
  673. self._last_update = last_update
  674. self._next_update = next_update
  675. self._extensions = extensions
  676. self._revoked_certificates = revoked_certificates
  677. def issuer_name(
  678. self, issuer_name: Name
  679. ) -> "CertificateRevocationListBuilder":
  680. if not isinstance(issuer_name, Name):
  681. raise TypeError("Expecting x509.Name object.")
  682. if self._issuer_name is not None:
  683. raise ValueError("The issuer name may only be set once.")
  684. return CertificateRevocationListBuilder(
  685. issuer_name,
  686. self._last_update,
  687. self._next_update,
  688. self._extensions,
  689. self._revoked_certificates,
  690. )
  691. def last_update(
  692. self, last_update: datetime.datetime
  693. ) -> "CertificateRevocationListBuilder":
  694. if not isinstance(last_update, datetime.datetime):
  695. raise TypeError("Expecting datetime object.")
  696. if self._last_update is not None:
  697. raise ValueError("Last update may only be set once.")
  698. last_update = _convert_to_naive_utc_time(last_update)
  699. if last_update < _EARLIEST_UTC_TIME:
  700. raise ValueError(
  701. "The last update date must be on or after" " 1950 January 1."
  702. )
  703. if self._next_update is not None and last_update > self._next_update:
  704. raise ValueError(
  705. "The last update date must be before the next update date."
  706. )
  707. return CertificateRevocationListBuilder(
  708. self._issuer_name,
  709. last_update,
  710. self._next_update,
  711. self._extensions,
  712. self._revoked_certificates,
  713. )
  714. def next_update(
  715. self, next_update: datetime.datetime
  716. ) -> "CertificateRevocationListBuilder":
  717. if not isinstance(next_update, datetime.datetime):
  718. raise TypeError("Expecting datetime object.")
  719. if self._next_update is not None:
  720. raise ValueError("Last update may only be set once.")
  721. next_update = _convert_to_naive_utc_time(next_update)
  722. if next_update < _EARLIEST_UTC_TIME:
  723. raise ValueError(
  724. "The last update date must be on or after" " 1950 January 1."
  725. )
  726. if self._last_update is not None and next_update < self._last_update:
  727. raise ValueError(
  728. "The next update date must be after the last update date."
  729. )
  730. return CertificateRevocationListBuilder(
  731. self._issuer_name,
  732. self._last_update,
  733. next_update,
  734. self._extensions,
  735. self._revoked_certificates,
  736. )
  737. def add_extension(
  738. self, extval: ExtensionType, critical: bool
  739. ) -> "CertificateRevocationListBuilder":
  740. """
  741. Adds an X.509 extension to the certificate revocation list.
  742. """
  743. if not isinstance(extval, ExtensionType):
  744. raise TypeError("extension must be an ExtensionType")
  745. extension = Extension(extval.oid, critical, extval)
  746. _reject_duplicate_extension(extension, self._extensions)
  747. return CertificateRevocationListBuilder(
  748. self._issuer_name,
  749. self._last_update,
  750. self._next_update,
  751. self._extensions + [extension],
  752. self._revoked_certificates,
  753. )
  754. def add_revoked_certificate(
  755. self, revoked_certificate: RevokedCertificate
  756. ) -> "CertificateRevocationListBuilder":
  757. """
  758. Adds a revoked certificate to the CRL.
  759. """
  760. if not isinstance(revoked_certificate, RevokedCertificate):
  761. raise TypeError("Must be an instance of RevokedCertificate")
  762. return CertificateRevocationListBuilder(
  763. self._issuer_name,
  764. self._last_update,
  765. self._next_update,
  766. self._extensions,
  767. self._revoked_certificates + [revoked_certificate],
  768. )
  769. def sign(
  770. self,
  771. private_key: PRIVATE_KEY_TYPES,
  772. algorithm: typing.Optional[hashes.HashAlgorithm],
  773. backend: typing.Optional[Backend] = None,
  774. ) -> CertificateRevocationList:
  775. backend = _get_backend(backend)
  776. if self._issuer_name is None:
  777. raise ValueError("A CRL must have an issuer name")
  778. if self._last_update is None:
  779. raise ValueError("A CRL must have a last update time")
  780. if self._next_update is None:
  781. raise ValueError("A CRL must have a next update time")
  782. return backend.create_x509_crl(self, private_key, algorithm)
  783. class RevokedCertificateBuilder(object):
  784. def __init__(
  785. self,
  786. serial_number: typing.Optional[int] = None,
  787. revocation_date: typing.Optional[datetime.datetime] = None,
  788. extensions: typing.List[Extension[ExtensionType]] = [],
  789. ):
  790. self._serial_number = serial_number
  791. self._revocation_date = revocation_date
  792. self._extensions = extensions
  793. def serial_number(self, number: int) -> "RevokedCertificateBuilder":
  794. if not isinstance(number, int):
  795. raise TypeError("Serial number must be of integral type.")
  796. if self._serial_number is not None:
  797. raise ValueError("The serial number may only be set once.")
  798. if number <= 0:
  799. raise ValueError("The serial number should be positive")
  800. # ASN.1 integers are always signed, so most significant bit must be
  801. # zero.
  802. if number.bit_length() >= 160: # As defined in RFC 5280
  803. raise ValueError(
  804. "The serial number should not be more than 159 " "bits."
  805. )
  806. return RevokedCertificateBuilder(
  807. number, self._revocation_date, self._extensions
  808. )
  809. def revocation_date(
  810. self, time: datetime.datetime
  811. ) -> "RevokedCertificateBuilder":
  812. if not isinstance(time, datetime.datetime):
  813. raise TypeError("Expecting datetime object.")
  814. if self._revocation_date is not None:
  815. raise ValueError("The revocation date may only be set once.")
  816. time = _convert_to_naive_utc_time(time)
  817. if time < _EARLIEST_UTC_TIME:
  818. raise ValueError(
  819. "The revocation date must be on or after" " 1950 January 1."
  820. )
  821. return RevokedCertificateBuilder(
  822. self._serial_number, time, self._extensions
  823. )
  824. def add_extension(
  825. self, extval: ExtensionType, critical: bool
  826. ) -> "RevokedCertificateBuilder":
  827. if not isinstance(extval, ExtensionType):
  828. raise TypeError("extension must be an ExtensionType")
  829. extension = Extension(extval.oid, critical, extval)
  830. _reject_duplicate_extension(extension, self._extensions)
  831. return RevokedCertificateBuilder(
  832. self._serial_number,
  833. self._revocation_date,
  834. self._extensions + [extension],
  835. )
  836. def build(
  837. self, backend: typing.Optional[Backend] = None
  838. ) -> RevokedCertificate:
  839. backend = _get_backend(backend)
  840. if self._serial_number is None:
  841. raise ValueError("A revoked certificate must have a serial number")
  842. if self._revocation_date is None:
  843. raise ValueError(
  844. "A revoked certificate must have a revocation date"
  845. )
  846. return backend.create_x509_revoked_certificate(self)
  847. def random_serial_number() -> int:
  848. return int.from_bytes(os.urandom(20), "big") >> 1