123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272 |
- # This file is dual licensed under the terms of the Apache License, Version
- # 2.0, and the BSD License. See the LICENSE file in the root of this repository
- # for complete details.
- from cryptography import utils
- from cryptography.exceptions import InvalidTag, UnsupportedAlgorithm, _Reasons
- from cryptography.hazmat.primitives import ciphers
- from cryptography.hazmat.primitives.ciphers import modes
@utils.register_interface(ciphers.CipherContext)
@utils.register_interface(ciphers.AEADCipherContext)
@utils.register_interface(ciphers.AEADEncryptionContext)
@utils.register_interface(ciphers.AEADDecryptionContext)
class _CipherContext(object):
    """Incremental cipher context driven through the backend's EVP bindings.

    One instance wraps a single ``EVP_CIPHER_CTX`` and is configured for
    either encryption or decryption at construction time. Data is fed with
    ``update`` / ``update_into`` (and ``authenticate_additional_data``), and
    the operation is completed with ``finalize`` or ``finalize_with_tag``.
    """

    # Values passed as the ``enc`` argument of EVP_CipherInit_ex and compared
    # against ``self._operation``.
    _ENCRYPT = 1
    _DECRYPT = 0
    # Largest number of input bytes handed to a single EVP_CipherUpdate call
    # by ``update_into``; larger payloads are processed in several calls.
    _MAX_CHUNK_SIZE = 2 ** 30 - 1

    def __init__(self, backend, cipher, mode, operation):
        """Create and fully initialize the underlying EVP cipher context.

        :param backend: Backend object exposing ``_lib``, ``_ffi``,
            ``_cipher_registry``, ``openssl_assert``, ``_consume_errors`` and
            ``openssl_version_text``.
        :param cipher: Cipher algorithm instance; must provide ``name`` and
            ``key``.
        :param mode: Mode instance, or ``None`` for ciphers used without a
            mode.
        :param operation: ``_ENCRYPT`` or ``_DECRYPT``.
        :raises UnsupportedAlgorithm: If the (cipher, mode) pair is not in
            the backend registry, or the registered adapter returns NULL.
        :raises ValueError: If OpenSSL rejects the key because both XTS
            halves are identical.
        """
        self._backend = backend
        self._cipher = cipher
        self._mode = mode
        self._operation = operation
        self._tag = None

        # Non-block ciphers are treated as having a one byte block so the
        # output buffer arithmetic in update()/finalize() still works.
        if isinstance(self._cipher, ciphers.BlockCipherAlgorithm):
            self._block_size_bytes = self._cipher.block_size // 8
        else:
            self._block_size_bytes = 1

        # Tie the context's lifetime to the Python object via ffi.gc.
        ctx = self._backend._lib.EVP_CIPHER_CTX_new()
        ctx = self._backend._ffi.gc(
            ctx, self._backend._lib.EVP_CIPHER_CTX_free
        )

        registry = self._backend._cipher_registry
        try:
            adapter = registry[type(cipher), type(mode)]
        except KeyError:
            raise UnsupportedAlgorithm(
                "cipher {} in {} mode is not supported "
                "by this backend.".format(
                    cipher.name, mode.name if mode else mode
                ),
                _Reasons.UNSUPPORTED_CIPHER,
            )

        evp_cipher = adapter(self._backend, cipher, mode)
        if evp_cipher == self._backend._ffi.NULL:
            msg = "cipher {0.name} ".format(cipher)
            if mode is not None:
                msg += "in {0.name} mode ".format(mode)
            msg += (
                "is not supported by this backend (Your version of OpenSSL "
                "may be too old. Current version: {}.)"
            ).format(self._backend.openssl_version_text())
            raise UnsupportedAlgorithm(msg, _Reasons.UNSUPPORTED_CIPHER)

        # Pick the IV / tweak / nonce from whichever object carries it; the
        # last branch covers ciphers that themselves carry a nonce.
        if isinstance(mode, modes.ModeWithInitializationVector):
            iv_nonce = self._backend._ffi.from_buffer(
                mode.initialization_vector
            )
        elif isinstance(mode, modes.ModeWithTweak):
            iv_nonce = self._backend._ffi.from_buffer(mode.tweak)
        elif isinstance(mode, modes.ModeWithNonce):
            iv_nonce = self._backend._ffi.from_buffer(mode.nonce)
        elif isinstance(cipher, modes.ModeWithNonce):
            iv_nonce = self._backend._ffi.from_buffer(cipher.nonce)
        else:
            iv_nonce = self._backend._ffi.NULL

        # begin init with cipher and operation type
        res = self._backend._lib.EVP_CipherInit_ex(
            ctx,
            evp_cipher,
            self._backend._ffi.NULL,
            self._backend._ffi.NULL,
            self._backend._ffi.NULL,
            operation,
        )
        self._backend.openssl_assert(res != 0)

        # set the key length to handle variable key ciphers
        res = self._backend._lib.EVP_CIPHER_CTX_set_key_length(
            ctx, len(cipher.key)
        )
        self._backend.openssl_assert(res != 0)

        if isinstance(mode, modes.GCM):
            # The IV length (and any caller supplied tag) is configured
            # before the key/IV are passed in the second init call below.
            res = self._backend._lib.EVP_CIPHER_CTX_ctrl(
                ctx,
                self._backend._lib.EVP_CTRL_AEAD_SET_IVLEN,
                len(iv_nonce),
                self._backend._ffi.NULL,
            )
            self._backend.openssl_assert(res != 0)
            if mode.tag is not None:
                res = self._backend._lib.EVP_CIPHER_CTX_ctrl(
                    ctx,
                    self._backend._lib.EVP_CTRL_AEAD_SET_TAG,
                    len(mode.tag),
                    mode.tag,
                )
                self._backend.openssl_assert(res != 0)
                self._tag = mode.tag

        # pass key/iv
        res = self._backend._lib.EVP_CipherInit_ex(
            ctx,
            self._backend._ffi.NULL,
            self._backend._ffi.NULL,
            self._backend._ffi.from_buffer(cipher.key),
            iv_nonce,
            operation,
        )

        # Check for XTS mode duplicate keys error
        errors = self._backend._consume_errors()
        lib = self._backend._lib
        # Only look at errors[0] when something was actually queued; a
        # failure with an empty queue falls through to openssl_assert below
        # instead of raising IndexError.
        if (
            res == 0
            and errors
            and (
                (
                    lib.CRYPTOGRAPHY_OPENSSL_111D_OR_GREATER
                    and errors[0]._lib_reason_match(
                        lib.ERR_LIB_EVP, lib.EVP_R_XTS_DUPLICATED_KEYS
                    )
                )
                or (
                    lib.Cryptography_HAS_PROVIDERS
                    and errors[0]._lib_reason_match(
                        lib.ERR_LIB_PROV, lib.PROV_R_XTS_DUPLICATED_KEYS
                    )
                )
            )
        ):
            raise ValueError("In XTS mode duplicated keys are not allowed")

        self._backend.openssl_assert(res != 0, errors=errors)

        # We purposely disable padding here as it's handled higher up in the
        # API.
        self._backend._lib.EVP_CIPHER_CTX_set_padding(ctx, 0)
        self._ctx = ctx

    def update(self, data: bytes) -> bytes:
        """Process ``data`` and return the output produced so far.

        Allocates a scratch buffer of ``len(data) + block_size - 1`` bytes,
        delegates to ``update_into`` and returns only the bytes written.
        """
        buf = bytearray(len(data) + self._block_size_bytes - 1)
        n = self.update_into(data, buf)
        return bytes(buf[:n])

    def update_into(self, data: bytes, buf) -> int:
        """Process ``data``, writing the output into ``buf``.

        :param data: Input bytes to encrypt or decrypt.
        :param buf: Writable buffer of at least
            ``len(data) + block_size - 1`` bytes.
        :returns: Number of bytes written to ``buf``.
        :raises ValueError: If ``buf`` is too small, or if an XTS update
            call fails.
        """
        total_data_len = len(data)
        if len(buf) < (total_data_len + self._block_size_bytes - 1):
            raise ValueError(
                "buffer must be at least {} bytes for this "
                "payload".format(len(data) + self._block_size_bytes - 1)
            )

        data_processed = 0
        total_out = 0
        outlen = self._backend._ffi.new("int *")
        baseoutbuf = self._backend._ffi.from_buffer(buf)
        baseinbuf = self._backend._ffi.from_buffer(data)

        # Feed the input in slices of at most _MAX_CHUNK_SIZE bytes,
        # advancing both pointers by what was consumed / produced.
        while data_processed != total_data_len:
            outbuf = baseoutbuf + total_out
            inbuf = baseinbuf + data_processed
            inlen = min(self._MAX_CHUNK_SIZE, total_data_len - data_processed)

            res = self._backend._lib.EVP_CipherUpdate(
                self._ctx, outbuf, outlen, inbuf, inlen
            )
            if res == 0 and isinstance(self._mode, modes.XTS):
                # Clear the error queue before surfacing a friendlier error.
                self._backend._consume_errors()
                raise ValueError(
                    "In XTS mode you must supply at least a full block in the "
                    "first update call. For AES this is 16 bytes."
                )
            else:
                self._backend.openssl_assert(res != 0)
            data_processed += inlen
            total_out += outlen[0]

        return total_out

    def finalize(self) -> bytes:
        """Finish the operation and return any remaining output bytes.

        For GCM encryption the authentication tag is read back and stored so
        it is available through ``tag``. The EVP context is reset afterwards.

        :raises ValueError: When decrypting with a tag-bearing mode and no
            tag has been set, or when the total input length is not a
            multiple of the block length.
        :raises InvalidTag: When finalization fails in GCM mode without any
            queued OpenSSL error.
        """
        if (
            self._operation == self._DECRYPT
            and isinstance(self._mode, modes.ModeWithAuthenticationTag)
            and self.tag is None
        ):
            raise ValueError(
                "Authentication tag must be provided when decrypting."
            )

        buf = self._backend._ffi.new("unsigned char[]", self._block_size_bytes)
        outlen = self._backend._ffi.new("int *")
        res = self._backend._lib.EVP_CipherFinal_ex(self._ctx, buf, outlen)
        if res == 0:
            errors = self._backend._consume_errors()

            # A GCM failure that queued no error is a tag mismatch.
            if not errors and isinstance(self._mode, modes.GCM):
                raise InvalidTag

            lib = self._backend._lib
            # Guard errors[0]: with an empty queue the assertion fails
            # through openssl_assert rather than raising IndexError.
            self._backend.openssl_assert(
                bool(errors)
                and (
                    errors[0]._lib_reason_match(
                        lib.ERR_LIB_EVP,
                        lib.EVP_R_DATA_NOT_MULTIPLE_OF_BLOCK_LENGTH,
                    )
                    or (
                        lib.Cryptography_HAS_PROVIDERS
                        and errors[0]._lib_reason_match(
                            lib.ERR_LIB_PROV,
                            lib.PROV_R_WRONG_FINAL_BLOCK_LENGTH,
                        )
                    )
                ),
                errors=errors,
            )
            raise ValueError(
                "The length of the provided data is not a multiple of "
                "the block length."
            )

        if (
            isinstance(self._mode, modes.GCM)
            and self._operation == self._ENCRYPT
        ):
            # Retrieve a tag of block_size bytes produced by the encryption.
            tag_buf = self._backend._ffi.new(
                "unsigned char[]", self._block_size_bytes
            )
            res = self._backend._lib.EVP_CIPHER_CTX_ctrl(
                self._ctx,
                self._backend._lib.EVP_CTRL_AEAD_GET_TAG,
                self._block_size_bytes,
                tag_buf,
            )
            self._backend.openssl_assert(res != 0)
            self._tag = self._backend._ffi.buffer(tag_buf)[:]

        res = self._backend._lib.EVP_CIPHER_CTX_reset(self._ctx)
        self._backend.openssl_assert(res == 1)
        return self._backend._ffi.buffer(buf)[: outlen[0]]

    def finalize_with_tag(self, tag: bytes) -> bytes:
        """Set the authentication tag, then finalize.

        :param tag: Authentication tag; its length must be between the
            mode's ``_min_tag_length`` and the block size in bytes.
        :returns: Whatever ``finalize`` returns.
        :raises ValueError: If the tag length is out of range.
        """
        tag_len = len(tag)
        if tag_len < self._mode._min_tag_length:
            raise ValueError(
                "Authentication tag must be {} bytes or longer.".format(
                    self._mode._min_tag_length
                )
            )
        elif tag_len > self._block_size_bytes:
            raise ValueError(
                "Authentication tag cannot be more than {} bytes.".format(
                    self._block_size_bytes
                )
            )
        res = self._backend._lib.EVP_CIPHER_CTX_ctrl(
            self._ctx, self._backend._lib.EVP_CTRL_AEAD_SET_TAG, len(tag), tag
        )
        self._backend.openssl_assert(res != 0)
        self._tag = tag
        return self.finalize()

    def authenticate_additional_data(self, data: bytes) -> None:
        """Feed ``data`` to the context as additional authenticated data.

        The data is passed to EVP_CipherUpdate with a NULL output buffer, so
        no output bytes are produced.
        """
        outlen = self._backend._ffi.new("int *")
        res = self._backend._lib.EVP_CipherUpdate(
            self._ctx,
            self._backend._ffi.NULL,
            outlen,
            self._backend._ffi.from_buffer(data),
            len(data),
        )
        self._backend.openssl_assert(res != 0)

    # Read-only view of the tag supplied by the caller or produced by
    # finalize().
    tag = utils.read_only_property("_tag")
|