sigstore._internal.sct
Utilities for verifying signed certificate timestamps.
# Copyright 2022 The Sigstore Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Utilities for verifying signed certificate timestamps.
"""

import logging
import struct
from datetime import datetime, timedelta, timezone
from typing import List, Optional

from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, rsa
from cryptography.x509 import (
    Certificate,
    ExtendedKeyUsage,
    ExtensionNotFound,
    PrecertificateSignedCertificateTimestamps,
)
from cryptography.x509.certificate_transparency import (
    LogEntryType,
    SignedCertificateTimestamp,
)
from cryptography.x509.oid import ExtendedKeyUsageOID

from sigstore._internal.trust import CTKeyring
from sigstore._utils import (
    KeyID,
    cert_is_ca,
    key_id,
)
from sigstore.errors import VerificationError

_logger = logging.getLogger(__name__)

# Reference point for converting an SCT timestamp into integer milliseconds.
_UNIX_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def _pack_signed_entry(
    sct: SignedCertificateTimestamp, cert: Certificate, issuer_key_id: Optional[bytes]
) -> bytes:
    """
    Packs the `signed_entry` portion of an SCT's signed payload.

    For an X.509 entry this is the DER encoding of `cert` behind a 3-byte
    big-endian length. For a precertificate entry it is the 32-byte
    `issuer_key_id`, followed by `cert.tbs_precertificate_bytes` behind a
    3-byte big-endian length.

    Raises `VerificationError` if the entry type is not one of those two, if
    a precertificate entry is given an `issuer_key_id` that is missing or not
    exactly 32 bytes long, or if the encoded certificate does not fit in a
    24-bit length.
    """
    fields = []
    if sct.entry_type == LogEntryType.X509_CERTIFICATE:
        # When dealing with a "normal" certificate, our signed entry looks like this:
        #
        # [0]: opaque ASN.1Cert<1..2^24-1>
        pack_format = "!BBB{cert_der_len}s"
        cert_der = cert.public_bytes(encoding=serialization.Encoding.DER)
    elif sct.entry_type == LogEntryType.PRE_CERTIFICATE:
        if not issuer_key_id or len(issuer_key_id) != 32:
            raise VerificationError("API misuse: issuer key ID missing")

        # When dealing with a precertificate, our signed entry looks like this:
        #
        # [0]: issuer_key_id[32]
        # [1]: opaque TBSCertificate<1..2^24-1>
        pack_format = "!32sBBB{cert_der_len}s"

        # Precertificates must have their SCT list extension filtered out.
        cert_der = cert.tbs_precertificate_bytes
        fields.append(issuer_key_id)
    else:
        raise VerificationError(f"unknown SCT log entry type: {sct.entry_type!r}")

    # The `opaque` length is a u24, which isn't directly supported by `struct`.
    # So we have to decompose it into 3 bytes.
    unused, len1, len2, len3 = struct.unpack(
        "!4B",
        struct.pack("!I", len(cert_der)),
    )
    if unused:
        # A non-zero top byte means the length needs more than 24 bits.
        raise VerificationError(
            f"Unexpectedly large certificate length: {len(cert_der)}"
        )

    pack_format = pack_format.format(cert_der_len=len(cert_der))
    fields.extend((len1, len2, len3, cert_der))

    return struct.pack(pack_format, *fields)


def _pack_digitally_signed(
    sct: SignedCertificateTimestamp,
    cert: Certificate,
    issuer_key_id: Optional[KeyID],
) -> bytes:
    """
    Packs the contents of `cert` (and some pieces of `sct`) into a structured
    blob, one that forms the signature body of the "digitally-signed" struct
    for an SCT.

    The format of the digitally signed data is described in IETF's RFC 6962.

    Raises `VerificationError` if the SCT carries any extension bytes, or if
    the signed entry cannot be packed (see `_pack_signed_entry`).
    """

    # No extensions are currently specified, so we treat the presence
    # of any extension bytes as suspicious.
    if len(sct.extension_bytes) != 0:
        raise VerificationError("Unexpected trailing extension bytes")

    # This constructs the "core" `signed_entry` field, which is either
    # the public bytes of the cert *or* the TBSPrecertificate (with some
    # filtering), depending on whether our SCT is for a precertificate.
    signed_entry = _pack_signed_entry(sct, cert, issuer_key_id)

    # The timestamp is interpreted as UTC. The millisecond count is derived
    # with integer arithmetic: going through `datetime.timestamp()` and
    # multiplying the float by 1000 can land just below the intended integer
    # and then truncate to the wrong millisecond, which would change the
    # signed bytes.
    timestamp = sct.timestamp.replace(tzinfo=timezone.utc)
    timestamp_ms = (timestamp - _UNIX_EPOCH) // timedelta(milliseconds=1)

    # Assemble a format string with the certificate length baked in and then pack the digitally
    # signed data
    # fmt: off
    pattern = "!BBQH%dsH" % len(signed_entry)
    data = struct.pack(
        pattern,
        sct.version.value,          # sct_version
        0,                          # signature_type (certificate_timestamp(0))
        timestamp_ms,               # timestamp (milliseconds)
        sct.entry_type.value,       # entry_type (x509_entry(0) | precert_entry(1))
        signed_entry,               # select(entry_type) -> signed_entry (see above)
        len(sct.extension_bytes),   # extensions (opaque CtExtensions<0..2^16-1>)
    )
    # fmt: on

    return data


def _is_preissuer(issuer: Certificate) -> bool:
    """
    Returns whether `issuer` lists the Certificate Transparency OID in its
    Extended Key Usage extension.
    """
    try:
        ext_key_usage = issuer.extensions.get_extension_for_class(ExtendedKeyUsage)
    # If we do not have any EKU, we certainly do not have CT Ext
    except ExtensionNotFound:
        return False

    return ExtendedKeyUsageOID.CERTIFICATE_TRANSPARENCY in ext_key_usage.value


def _get_issuer_cert(chain: List[Certificate]) -> Certificate:
    """
    Returns the issuing certificate from `chain`.

    This is the first certificate in the chain, unless that certificate is a
    CT preissuer (see `_is_preissuer`), in which case the second one is used.

    Raises `VerificationError` if the chain is too short to contain the
    issuer, rather than leaking an `IndexError` to the caller.
    """
    if not chain:
        raise VerificationError("SCT verify: certificate chain is empty")

    issuer = chain[0]
    if _is_preissuer(issuer):
        if len(chain) < 2:
            raise VerificationError(
                "SCT verify: certificate chain ends at a CT preissuer"
            )
        issuer = chain[1]
    return issuer


class UnexpectedSctCountException(Exception):
    """
    The number of embedded precertificate SCTs is not the expected one.
    """


def _get_precertificate_signed_certificate_timestamps(
    certificate: Certificate,
) -> PrecertificateSignedCertificateTimestamps:
    """
    Returns the embedded precertificate SCT list extension of `certificate`.

    Raises `ValueError` if the certificate has no such extension, and
    `UnexpectedSctCountException` if the extension does not contain exactly
    one SCT.
    """
    # Try to retrieve the embedded SCTs within the cert.
    try:
        precert_scts_extension = certificate.extensions.get_extension_for_class(
            PrecertificateSignedCertificateTimestamps
        ).value
    except ExtensionNotFound as exc:
        raise ValueError(
            "No PrecertificateSignedCertificateTimestamps found for the certificate"
        ) from exc

    if len(precert_scts_extension) != 1:
        raise UnexpectedSctCountException(
            f"Unexpected embedded SCT count in response: {len(precert_scts_extension)} != 1"
        )
    return precert_scts_extension


def _cert_is_ca(cert: Certificate) -> bool:
    """
    Returns whether `cert` passes the `cert_is_ca` check.

    A `VerificationError` raised by `cert_is_ca` is logged and reported as
    `False` instead of being propagated.
    """
    _logger.debug(f"Found {cert.subject} as issuer, verifying if it is a ca")
    try:
        is_ca = cert_is_ca(cert)
    except VerificationError as e:
        _logger.debug(f"Invalid {cert.subject}: failed to validate as a CA: {e}")
        return False

    # NOTE(review): this result used to be discarded, so only an exception
    # could produce `False`. `cert_is_ca` is presumably a predicate that
    # returns `False` for a well-formed non-CA certificate -- confirm against
    # `sigstore._utils`. Comparing against `False` explicitly rejects that
    # case while leaving behaviour unchanged if the helper only ever raises.
    return is_ca is not False


def verify_sct(
    sct: SignedCertificateTimestamp,
    cert: Certificate,
    chain: List[Certificate],
    ct_keyring: CTKeyring,
) -> None:
    """
    Verify a signed certificate timestamp.

    An SCT is verified by reconstructing its "digitally-signed" payload
    and verifying that the signature provided in the SCT is valid against
    one of the keys present in the CT keyring (i.e., the keys used by the CT
    log to sign SCTs).

    Raises `VerificationError` if the issuer is unusable, if the payload
    cannot be reconstructed, if the SCT does not use SHA256, or if the
    keyring rejects the signature.
    """

    issuer_key_id = None
    if sct.entry_type == LogEntryType.PRE_CERTIFICATE:
        # If we're verifying an SCT for a precertificate, we need to
        # find its issuer in the chain and calculate a hash over
        # its public key information, as part of the "binding" proof
        # that ties the issuer to the final certificate.
        issuer_cert = _get_issuer_cert(chain)
        issuer_pubkey = issuer_cert.public_key()

        if not _cert_is_ca(issuer_cert):
            raise VerificationError(
                f"SCT verify: Invalid issuer pubkey basicConstraint (not a CA): {issuer_pubkey}"
            )

        if not isinstance(issuer_pubkey, (rsa.RSAPublicKey, ec.EllipticCurvePublicKey)):
            raise VerificationError(
                f"SCT verify: invalid issuer pubkey format (not ECDSA or RSA): {issuer_pubkey}"
            )

        issuer_key_id = key_id(issuer_pubkey)

    digitally_signed = _pack_digitally_signed(sct, cert, issuer_key_id)

    if not isinstance(sct.signature_hash_algorithm, hashes.SHA256):
        raise VerificationError(
            "Found unexpected hash algorithm in SCT: only SHA256 is supported "
            f"(expected {hashes.SHA256}, got {sct.signature_hash_algorithm})"
        )

    try:
        _logger.debug(f"attempting to verify SCT with key ID {sct.log_id.hex()}")
        # NOTE(ww): In terms of the DER structure, the SCT's `LogID` contains a
        # singular `opaque key_id[32]`. Cryptography's APIs don't bother
        # to expose this trivial single member, so we use the `log_id`
        # attribute directly.
        ct_keyring.verify(
            key_id=KeyID(sct.log_id), signature=sct.signature, data=digitally_signed
        )
    except VerificationError as exc:
        # Chain explicitly so the keyring's original failure stays attached.
        raise VerificationError(f"SCT verify failed: {exc}") from exc
class
UnexpectedSctCountException(builtins.Exception):
class UnexpectedSctCountException(Exception):
    """
    The number of embedded precertificate SCTs is not the expected one.
    """
The number of precertificate SCTs is wrong.
def
verify_sct( sct: cryptography.x509.certificate_transparency.SignedCertificateTimestamp, cert: cryptography.x509.base.Certificate, chain: List[cryptography.x509.base.Certificate], ct_keyring: sigstore._internal.trust.CTKeyring) -> None:
def verify_sct(
    sct: SignedCertificateTimestamp,
    cert: Certificate,
    chain: List[Certificate],
    ct_keyring: CTKeyring,
) -> None:
    """
    Verify a signed certificate timestamp.

    An SCT is verified by reconstructing its "digitally-signed" payload
    and verifying that the signature provided in the SCT is valid against
    one of the keys present in the CT keyring (i.e., the keys used by the CT
    log to sign SCTs).

    Raises `VerificationError` if the issuer found in `chain` fails the CA
    check or has a key that is neither RSA nor ECDSA, if the SCT does not
    use SHA256, or if the keyring rejects the signature.
    """

    issuer_key_id = None
    if sct.entry_type == LogEntryType.PRE_CERTIFICATE:
        # If we're verifying an SCT for a precertificate, we need to
        # find its issuer in the chain and calculate a hash over
        # its public key information, as part of the "binding" proof
        # that ties the issuer to the final certificate.
        issuer_cert = _get_issuer_cert(chain)
        issuer_pubkey = issuer_cert.public_key()

        if not _cert_is_ca(issuer_cert):
            raise VerificationError(
                f"SCT verify: Invalid issuer pubkey basicConstraint (not a CA): {issuer_pubkey}"
            )

        if not isinstance(issuer_pubkey, (rsa.RSAPublicKey, ec.EllipticCurvePublicKey)):
            raise VerificationError(
                f"SCT verify: invalid issuer pubkey format (not ECDSA or RSA): {issuer_pubkey}"
            )

        issuer_key_id = key_id(issuer_pubkey)

    digitally_signed = _pack_digitally_signed(sct, cert, issuer_key_id)

    if not isinstance(sct.signature_hash_algorithm, hashes.SHA256):
        raise VerificationError(
            "Found unexpected hash algorithm in SCT: only SHA256 is supported "
            f"(expected {hashes.SHA256}, got {sct.signature_hash_algorithm})"
        )

    try:
        _logger.debug(f"attempting to verify SCT with key ID {sct.log_id.hex()}")
        # NOTE(ww): In terms of the DER structure, the SCT's `LogID` contains a
        # singular `opaque key_id[32]`. Cryptography's APIs don't bother
        # to expose this trivial single member, so we use the `log_id`
        # attribute directly.
        ct_keyring.verify(
            key_id=KeyID(sct.log_id), signature=sct.signature, data=digitally_signed
        )
    except VerificationError as exc:
        # Chain explicitly so the keyring's original failure stays attached.
        raise VerificationError(f"SCT verify failed: {exc}") from exc
Verify a signed certificate timestamp.
An SCT is verified by reconstructing its "digitally-signed" payload and verifying that the signature provided in the SCT is valid against one of the keys present in the CT keyring (i.e., the keys used by the CT log to sign SCTs).