Source code for azure.keyvault.custom.http_message_security

#---------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
#---------------------------------------------------------------------------------------------

import json
import time
import os
from .internal import _a128cbc_hs256_encrypt, _a128cbc_hs256_decrypt, _JwsHeader, _JwsObject, \
    _JweHeader, _JweObject, _str_to_b64url, _bstr_to_b64url, _b64_to_bstr, _RsaKey


[docs]def generate_pop_key(): """ Generates a key which can be used for Proof Of Possession token authentication. :return: """ return _RsaKey.generate()
class HttpMessageSecurity(object): """ Used for message authorization, encryption and decrtyption. This class is intended for internal use only. Details are subject to non-compatible changes, consumers of the azure-keyvault module should not take dependencies on this class or its current implementation. """ def __init__(self, client_security_token=None, client_signature_key=None, client_encryption_key=None, server_signature_key=None, server_encryption_key=None): self.client_security_token = client_security_token self.client_signature_key = client_signature_key self.client_encryption_key = client_encryption_key self.server_signature_key = server_signature_key self.server_encryption_key = server_encryption_key def protect_request(self, request): """ Adds authorization header, and encrypts and signs the request if supported on the specific request. :param request: unprotected request to apply security protocol :return: protected request with appropriate security protocal applied """ # Setup the auth header on the request # Due to limitations in the service we hard code the auth scheme to 'Bearer' as the service will fail with any # other scheme or a different casing such as 'bearer', once this is fixed the following line should be replaced: # request.headers['Authorization'] = '{} {}'.format(auth[0], auth[1]) request.headers['Authorization'] = '{} {}'.format('Bearer', self.client_security_token) # if the current message security doesn't support message protection, or the body is empty # skip protection and return the original request if not self.supports_protection() or len(request.body) == 0: return request plain_text = request.body # if the client encryption key is specified add it to the body of the request if self.client_encryption_key: # note that this assumes that the body is already json and not simple string content # this is true for all requests which currently support message encryption, but might # need to be revisited when the types of body_dict = json.loads(plain_text) body_dict['rek'] = {'jwk': self.client_encryption_key.to_jwk().serialize()} plain_text = json.dumps(body_dict).encode(encoding='utf8') # build the header for the jws body jws_header = _JwsHeader() jws_header.alg = 'RS256' jws_header.kid = self.client_signature_key.kid jws_header.at = self.client_security_token jws_header.ts = int(time.time()) jws_header.typ = 'PoP' jws = _JwsObject() jws.protected = jws_header.to_compact_header() jws.payload = self._protect_payload(plain_text) data = (jws.protected + '.' + jws.payload).encode('ascii') jws.signature = _bstr_to_b64url(self.client_signature_key.sign(data)) request.headers['Content-Type'] = 'application/jose+json' request.prepare_body(data=jws.to_flattened_jws(), files=None) return request def unprotect_response(self, response, **kwargs): """ Removes protection from the specified response :param request: response from the key vault service :return: unprotected response with any security protocal encryption removed """ body = response.content # if the current message security doesn't support message protection, the body is empty, or the request failed # skip protection and return the original response if not self.supports_protection() or len(response.content) == 0 or response.status_code != 200: return response # ensure the content-type is application/jose+json if 'application/jose+json' not in response.headers.get('content-type', '').lower(): raise ValueError('Invalid protected response') # deserialize the response into a JwsObject, using response.text so requests handles the encoding jws = _JwsObject().deserialize(body) # deserialize the protected header jws_header = _JwsHeader.from_compact_header(jws.protected) # ensure the jws signature kid matches the key from original challenge # and the alg matches expected signature alg if jws_header.kid != self.server_signature_key.kid \ or jws_header.alg != 'RS256': raise ValueError('Invalid protected response') # validate the signature of the jws data = (jws.protected + '.' + jws.payload).encode('ascii') # verify will raise an InvalidSignature exception if the signature doesn't match self.server_signature_key.verify(signature=_b64_to_bstr(jws.signature), data=data) # get the unprotected response body decrypted = self._unprotect_payload(jws.payload) response._content = decrypted response.headers['Content-Type'] = 'application/json' return response def supports_protection(self): """ Determines if the the current HttpMessageSecurity object supports the message protection protocol. :return: True if the current object supports protection, otherwise False """ return self.client_signature_key \ and self.client_encryption_key \ and self.server_signature_key \ and self.server_encryption_key def _protect_payload(self, plaintext): # create the jwe header for the payload kek = self.server_encryption_key jwe_header = _JweHeader() jwe_header.alg = 'RSA-OAEP' jwe_header.kid = kek.kid jwe_header.enc = 'A128CBC-HS256' # create the jwe object jwe = _JweObject() jwe.protected = jwe_header.to_compact_header() # generate the content encryption key and iv cek = os.urandom(32) iv = os.urandom(16) jwe.iv = _bstr_to_b64url(iv) # wrap the cek using the server encryption key wrapped = _bstr_to_b64url(kek.encrypt(cek)) jwe.encrypted_key = wrapped # encrypt the plaintext body with the cek using the protected header # as the authdata to get the ciphertext and the authtag ciphertext, tag = _a128cbc_hs256_encrypt(cek, iv, plaintext, jwe.protected.encode('ascii')) jwe.ciphertext = _bstr_to_b64url(ciphertext) jwe.tag = _bstr_to_b64url(tag) # flatten and encode the jwe for the final jws payload content flat = jwe.to_flattened_jwe() return _str_to_b64url(flat) def _unprotect_payload(self, payload): # deserialize the payload jwe = _JweObject().deserialize_b64(payload) # deserialize the payload header jwe_header = _JweHeader.from_compact_header(jwe.protected) # ensure the kid matches the specified client encryption key # and the key wrap alg and the data encryption enc match the expected if self.client_encryption_key.kid != jwe_header.kid \ or jwe_header.alg != 'RSA-OAEP' \ or jwe_header.enc != 'A128CBC-HS256': raise ValueError('Invalid protected response') # unwrap the cek using the client encryption key cek = self.client_encryption_key.decrypt(_b64_to_bstr(jwe.encrypted_key)) # decrypt the cipher text to get the unprotected body content return _a128cbc_hs256_decrypt(cek, _b64_to_bstr(jwe.iv), _b64_to_bstr(jwe.ciphertext), jwe.protected.encode('ascii'), _b64_to_bstr(jwe.tag))