sigstore._utils
Shared utilities.
1# Copyright 2022 The Sigstore Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15""" 16Shared utilities. 17""" 18 19from __future__ import annotations 20 21import base64 22import hashlib 23import sys 24from typing import IO, NewType, Type, Union 25 26from cryptography.hazmat.primitives import serialization 27from cryptography.hazmat.primitives.asymmetric import ec, rsa 28from cryptography.x509 import ( 29 Certificate, 30 ExtensionNotFound, 31 Version, 32 load_der_x509_certificate, 33) 34from cryptography.x509.oid import ExtendedKeyUsageOID, ExtensionOID 35from sigstore_protobuf_specs.dev.sigstore.common.v1 import HashAlgorithm 36 37from sigstore import hashes as sigstore_hashes 38from sigstore.errors import VerificationError 39 40if sys.version_info < (3, 11): 41 import importlib_resources as resources 42else: 43 from importlib import resources 44 45 46PublicKey = Union[rsa.RSAPublicKey, ec.EllipticCurvePublicKey] 47 48PublicKeyTypes = Union[Type[rsa.RSAPublicKey], Type[ec.EllipticCurvePublicKey]] 49 50HexStr = NewType("HexStr", str) 51""" 52A newtype for `str` objects that contain hexadecimal strings (e.g. `ffabcd00ff`). 53""" 54B64Str = NewType("B64Str", str) 55""" 56A newtype for `str` objects that contain base64 encoded strings. 57""" 58KeyID = NewType("KeyID", bytes) 59""" 60A newtype for `bytes` objects that contain a key id. 61""" 62 63 64def load_pem_public_key( 65 key_pem: bytes, 66 *, 67 types: tuple[PublicKeyTypes, ...] 
= (rsa.RSAPublicKey, ec.EllipticCurvePublicKey), 68) -> PublicKey: 69 """ 70 A specialization of `cryptography`'s `serialization.load_pem_public_key` 71 with a uniform exception type (`VerificationError`) and filtering on valid key types 72 for Sigstore purposes. 73 """ 74 75 try: 76 key = serialization.load_pem_public_key(key_pem) 77 except Exception as exc: 78 raise VerificationError("could not load PEM-formatted public key") from exc 79 80 if not isinstance(key, types): 81 raise VerificationError(f"invalid key format: not one of {types}") 82 83 return key # type: ignore[return-value] 84 85 86def load_der_public_key( 87 key_der: bytes, 88 *, 89 types: tuple[PublicKeyTypes, ...] = (rsa.RSAPublicKey, ec.EllipticCurvePublicKey), 90) -> PublicKey: 91 """ 92 The `load_pem_public_key` specialization, but DER. 93 """ 94 95 try: 96 key = serialization.load_der_public_key(key_der) 97 except Exception as exc: 98 raise VerificationError("could not load DER-formatted public key") from exc 99 100 if not isinstance(key, types): 101 raise VerificationError(f"invalid key format: not one of {types}") 102 103 return key # type: ignore[return-value] 104 105 106def base64_encode_pem_cert(cert: Certificate) -> B64Str: 107 """ 108 Returns a string containing a base64-encoded PEM-encoded X.509 certificate. 109 """ 110 111 return B64Str( 112 base64.b64encode(cert.public_bytes(serialization.Encoding.PEM)).decode() 113 ) 114 115 116def cert_der_to_pem(der: bytes) -> str: 117 """ 118 Converts a DER-encoded X.509 certificate into its PEM encoding. 119 120 Returns a string containing a PEM-encoded X.509 certificate. 121 """ 122 123 # NOTE: Technically we don't have to round-trip like this, since 124 # the DER-to-PEM transformation is entirely mechanical. 125 cert = load_der_x509_certificate(der) 126 return cert.public_bytes(serialization.Encoding.PEM).decode() 127 128 129def key_id(key: PublicKey) -> KeyID: 130 """ 131 Returns an RFC 6962-style "key ID" for the given public key. 
132 133 See: <https://www.rfc-editor.org/rfc/rfc6962#section-3.2> 134 """ 135 public_bytes = key.public_bytes( 136 encoding=serialization.Encoding.DER, 137 format=serialization.PublicFormat.SubjectPublicKeyInfo, 138 ) 139 140 return KeyID(hashlib.sha256(public_bytes).digest()) 141 142 143def sha256_digest( 144 input_: bytes | IO[bytes] | sigstore_hashes.Hashed, 145) -> sigstore_hashes.Hashed: 146 """ 147 Compute the SHA256 digest of an input stream or buffer or, 148 if given a `Hashed`, return it directly. 149 """ 150 if isinstance(input_, sigstore_hashes.Hashed): 151 return input_ 152 153 # If the input is already buffered into memory, there's no point in 154 # going back through an I/O abstraction. 155 if isinstance(input_, bytes): 156 return sigstore_hashes.Hashed( 157 digest=hashlib.sha256(input_).digest(), algorithm=HashAlgorithm.SHA2_256 158 ) 159 160 return sigstore_hashes.Hashed( 161 digest=_sha256_streaming(input_), algorithm=HashAlgorithm.SHA2_256 162 ) 163 164 165def _sha256_streaming(io: IO[bytes]) -> bytes: 166 """ 167 Compute the SHA256 of a stream. 168 169 This function does its own internal buffering, so an unbuffered stream 170 should be supplied for optimal performance. 171 """ 172 173 # NOTE: This function performs a SHA256 digest over a stream. 174 # The stream's size is not checked, meaning that the stream's source 175 # is implicitly trusted: if an attacker is able to truncate the stream's 176 # source prematurely, then they could conceivably produce a digest 177 # for a partial stream. This in turn could conceivably result 178 # in a valid signature for an unintended (truncated) input. 179 # 180 # This is currently outside of sigstore-python's threat model: we 181 # assume that the stream is trusted. 
182 # 183 # See: https://github.com/sigstore/sigstore-python/pull/329#discussion_r1041215972 184 185 sha256 = hashlib.sha256() 186 # Per coreutils' ioblksize.h: 128KB performs optimally across a range 187 # of systems in terms of minimizing syscall overhead. 188 view = memoryview(bytearray(128 * 1024)) 189 190 nbytes = io.readinto(view) # type: ignore 191 while nbytes: 192 sha256.update(view[:nbytes]) 193 nbytes = io.readinto(view) # type: ignore 194 195 return sha256.digest() 196 197 198def read_embedded(name: str, prefix: str) -> bytes: 199 """ 200 Read a resource embedded in this distribution of sigstore-python, 201 returning its contents as bytes. 202 """ 203 b: bytes = resources.files("sigstore._store").joinpath(prefix, name).read_bytes() 204 return b 205 206 207def cert_is_ca(cert: Certificate) -> bool: 208 """ 209 Returns `True` if and only if the given `Certificate` 210 is a CA certificate. 211 212 This function doesn't indicate the trustworthiness of the given 213 `Certificate`, only whether it has the appropriate interior state. 214 215 This function is **not** naively invertible: users **must** use the 216 dedicated `cert_is_leaf` utility function to determine whether a particular 217 leaf upholds Sigstore's invariants. 218 """ 219 220 # Only v3 certificates should appear in the context of Sigstore; 221 # earlier versions of X.509 lack extensions and have ambiguous CA 222 # behavior. 223 if cert.version != Version.v3: 224 raise VerificationError(f"invalid X.509 version: {cert.version}") 225 226 # Valid CA certificates must have the following set: 227 # 228 # * `BasicKeyUsage.keyCertSign` 229 # * `BasicConstraints.ca` 230 # 231 # Any other combination of states is inconsistent and invalid, meaning 232 # that we won't consider the certificate a valid non-CA leaf. 233 234 try: 235 basic_constraints = cert.extensions.get_extension_for_oid( 236 ExtensionOID.BASIC_CONSTRAINTS 237 ) 238 239 # BasicConstraints must be marked as critical, per RFC 5280 4.2.1.9. 
240 if not basic_constraints.critical: 241 raise VerificationError( 242 "invalid X.509 certificate: non-critical BasicConstraints in CA" 243 ) 244 245 ca = basic_constraints.value.ca # type: ignore 246 except ExtensionNotFound: 247 # No BasicConstrains means that this can't possibly be a CA. 248 return False 249 250 key_cert_sign = False 251 try: 252 key_usage = cert.extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE) 253 key_cert_sign = key_usage.value.key_cert_sign # type: ignore 254 except ExtensionNotFound: 255 raise VerificationError("invalid X.509 certificate: missing KeyUsage") 256 257 # If both states are set, this is a CA. 258 if ca and key_cert_sign: 259 return True 260 261 if not (ca or key_cert_sign): 262 return False 263 264 # Anything else is an invalid state that should never occur. 265 raise VerificationError( 266 f"invalid X.509 certificate states: KeyUsage.keyCertSign={key_cert_sign}" 267 f", BasicConstraints.ca={ca}" 268 ) 269 270 271def cert_is_root_ca(cert: Certificate) -> bool: 272 """ 273 Returns `True` if and only if the given `Certificate` indicates 274 that it's a root CA. 275 276 This is **not** a verification function, and it does not establish 277 the trustworthiness of the given certificate. 278 """ 279 280 # NOTE(ww): This function is obnoxiously long to make the different 281 # states explicit. 282 283 # Only v3 certificates should appear in the context of Sigstore; 284 # earlier versions of X.509 lack extensions and have ambiguous CA 285 # behavior. 286 if cert.version != Version.v3: 287 raise VerificationError(f"invalid X.509 version: {cert.version}") 288 289 # Non-CAs can't possibly be root CAs. 290 if not cert_is_ca(cert): 291 return False 292 293 # A certificate that is its own issuer and signer is considered a root CA. 
294 try: 295 cert.verify_directly_issued_by(cert) 296 return True 297 except Exception: 298 return False 299 300 301def cert_is_leaf(cert: Certificate) -> bool: 302 """ 303 Returns `True` if and only if the given `Certificate` is a valid 304 leaf certificate for Sigstore purposes. This means that: 305 306 * It is not a root or intermediate CA; 307 * It has `KeyUsage.digitalSignature`; 308 * It has `CODE_SIGNING` as an `ExtendedKeyUsage`. 309 310 This is **not** a verification function, and it does not establish 311 the trustworthiness of the given certificate. 312 """ 313 314 # Only v3 certificates should appear in the context of Sigstore; 315 # earlier versions of X.509 lack extensions and have ambiguous CA 316 # behavior. 317 if cert.version != Version.v3: 318 raise VerificationError(f"invalid X.509 version: {cert.version}") 319 320 # CAs are not leaves. 321 if cert_is_ca(cert): 322 return False 323 324 key_usage = cert.extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE) 325 digital_signature = key_usage.value.digital_signature # type: ignore 326 327 if not digital_signature: 328 raise VerificationError( 329 "invalid certificate for Sigstore purposes: missing digital signature usage" 330 ) 331 332 # Finally, we check to make sure the leaf has an `ExtendedKeyUsages` 333 # extension that includes a codesigning entitlement. Sigstore should 334 # never issue a leaf that doesn't have this extended usage. 335 try: 336 extended_key_usage = cert.extensions.get_extension_for_oid( 337 ExtensionOID.EXTENDED_KEY_USAGE 338 ) 339 340 return ExtendedKeyUsageOID.CODE_SIGNING in extended_key_usage.value # type: ignore 341 except ExtensionNotFound: 342 raise VerificationError("invalid X.509 certificate: missing ExtendedKeyUsage")
A newtype for `str` objects that contain hexadecimal strings (e.g. `ffabcd00ff`).
A newtype for `str` objects that contain base64 encoded strings.
A newtype for `bytes` objects that contain a key id.
def load_pem_public_key(
    key_pem: bytes,
    *,
    types: tuple[PublicKeyTypes, ...] = (rsa.RSAPublicKey, ec.EllipticCurvePublicKey),
) -> PublicKey:
    """
    Load a PEM-encoded public key for Sigstore purposes.

    This wraps `cryptography`'s `serialization.load_pem_public_key` so that
    every failure is reported as a `VerificationError`, and so that only
    keys that are instances of one of `types` are accepted.
    """

    try:
        loaded = serialization.load_pem_public_key(key_pem)
    except Exception as exc:
        # Any loader failure is folded into the one uniform error type.
        raise VerificationError("could not load PEM-formatted public key") from exc

    if isinstance(loaded, types):
        return loaded  # type: ignore[return-value]

    raise VerificationError(f"invalid key format: not one of {types}")
A specialization of `cryptography`'s `serialization.load_pem_public_key` with a uniform exception type (`VerificationError`) and filtering on valid key types for Sigstore purposes.
def load_der_public_key(
    key_der: bytes,
    *,
    types: tuple[PublicKeyTypes, ...] = (rsa.RSAPublicKey, ec.EllipticCurvePublicKey),
) -> PublicKey:
    """
    Load a DER-encoded public key for Sigstore purposes.

    This is the DER counterpart of `load_pem_public_key`: every failure is
    reported as a `VerificationError`, and only keys that are instances of
    one of `types` are accepted.
    """

    try:
        loaded = serialization.load_der_public_key(key_der)
    except Exception as exc:
        # Any loader failure is folded into the one uniform error type.
        raise VerificationError("could not load DER-formatted public key") from exc

    if isinstance(loaded, types):
        return loaded  # type: ignore[return-value]

    raise VerificationError(f"invalid key format: not one of {types}")
The `load_pem_public_key` specialization, but DER.
def base64_encode_pem_cert(cert: Certificate) -> B64Str:
    """
    Serialize `cert` to PEM and return the base64 encoding of that PEM
    text as a `B64Str`.
    """

    pem = cert.public_bytes(serialization.Encoding.PEM)
    encoded = base64.b64encode(pem)
    return B64Str(encoded.decode())
Returns a string containing a base64-encoded PEM-encoded X.509 certificate.
def cert_der_to_pem(der: bytes) -> str:
    """
    Re-encode a DER-encoded X.509 certificate as PEM.

    Returns the PEM encoding of the certificate as a string.
    """

    # NOTE: The certificate is parsed and re-serialized rather than
    # transformed mechanically; a purely textual DER-to-PEM conversion
    # would technically be enough.
    pem_bytes = load_der_x509_certificate(der).public_bytes(
        serialization.Encoding.PEM
    )
    return pem_bytes.decode()
Converts a DER-encoded X.509 certificate into its PEM encoding.
Returns a string containing a PEM-encoded X.509 certificate.
def key_id(key: PublicKey) -> KeyID:
    """
    Compute the RFC 6962-style "key ID" of a public key: the SHA-256
    digest of its DER-encoded SubjectPublicKeyInfo.

    See: <https://www.rfc-editor.org/rfc/rfc6962#section-3.2>
    """
    spki_der = key.public_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    digest = hashlib.sha256(spki_der).digest()

    return KeyID(digest)
Returns an RFC 6962-style "key ID" for the given public key.
def sha256_digest(
    input_: bytes | IO[bytes] | sigstore_hashes.Hashed,
) -> sigstore_hashes.Hashed:
    """
    Return the SHA256 digest of `input_` as a `Hashed`.

    A `Hashed` input is returned unchanged; a `bytes` buffer is hashed in
    one shot; anything else is treated as a binary stream and hashed
    incrementally.
    """
    if isinstance(input_, sigstore_hashes.Hashed):
        return input_

    if isinstance(input_, bytes):
        # Already in memory: no need to go through an I/O abstraction.
        raw_digest = hashlib.sha256(input_).digest()
    else:
        raw_digest = _sha256_streaming(input_)

    return sigstore_hashes.Hashed(
        digest=raw_digest, algorithm=HashAlgorithm.SHA2_256
    )
Compute the SHA256 digest of an input stream or buffer or, if given a `Hashed`, return it directly.
def read_embedded(name: str, prefix: str) -> bytes:
    """
    Return the raw bytes of the resource `<prefix>/<name>` embedded in the
    `sigstore._store` package of this sigstore-python distribution.
    """
    resource = resources.files("sigstore._store").joinpath(prefix, name)
    contents: bytes = resource.read_bytes()
    return contents
Read a resource embedded in this distribution of sigstore-python, returning its contents as bytes.
def cert_is_ca(cert: Certificate) -> bool:
    """
    Returns `True` if and only if the given `Certificate`
    is a CA certificate.

    Only the certificate's interior state is inspected; nothing here says
    whether the certificate should be trusted.

    A `False` result does **not** make the certificate a valid leaf:
    callers **must** use `cert_is_leaf` to check that a leaf upholds
    Sigstore's invariants.

    Raises `VerificationError` if the certificate is not X.509v3, has a
    non-critical BasicConstraints extension, has BasicConstraints but no
    KeyUsage extension, or has inconsistent `BasicConstraints.ca` and
    `KeyUsage.keyCertSign` states.
    """

    # Sigstore only deals in v3 certificates: earlier X.509 versions have
    # no extensions, which leaves their CA status ambiguous.
    if cert.version != Version.v3:
        raise VerificationError(f"invalid X.509 version: {cert.version}")

    # A well-formed CA certificate sets both of:
    #
    #  * `KeyUsage.keyCertSign`
    #  * `BasicConstraints.ca`
    #
    # Having exactly one of the two is inconsistent, and such a certificate
    # is rejected outright rather than being treated as a non-CA.

    try:
        basic_constraints = cert.extensions.get_extension_for_oid(
            ExtensionOID.BASIC_CONSTRAINTS
        )
    except ExtensionNotFound:
        # Without BasicConstraints this can't possibly be a CA.
        return False

    # RFC 5280 4.2.1.9: BasicConstraints must be marked as critical.
    if not basic_constraints.critical:
        raise VerificationError(
            "invalid X.509 certificate: non-critical BasicConstraints in CA"
        )

    is_ca = basic_constraints.value.ca  # type: ignore

    try:
        key_usage = cert.extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE)
    except ExtensionNotFound:
        raise VerificationError("invalid X.509 certificate: missing KeyUsage")

    can_sign_certs = key_usage.value.key_cert_sign  # type: ignore

    # Both set: a CA.
    if is_ca and can_sign_certs:
        return True

    # Exactly one set: an invalid state that should never occur.
    if is_ca or can_sign_certs:
        raise VerificationError(
            f"invalid X.509 certificate states: KeyUsage.keyCertSign={can_sign_certs}"
            f", BasicConstraints.ca={is_ca}"
        )

    # Neither set: not a CA.
    return False
Returns `True` if and only if the given `Certificate` is a CA certificate.

This function doesn't indicate the trustworthiness of the given `Certificate`, only whether it has the appropriate interior state.

This function is **not** naively invertible: users **must** use the dedicated `cert_is_leaf` utility function to determine whether a particular leaf upholds Sigstore's invariants.
def cert_is_root_ca(cert: Certificate) -> bool:
    """
    Returns `True` if and only if the given `Certificate` indicates
    that it's a root CA, i.e. it is a CA certificate that is directly
    issued by itself.

    This is **not** a verification function, and it does not establish
    the trustworthiness of the given certificate.

    Raises `VerificationError` if the certificate is not X.509v3, or if
    `cert_is_ca` rejects it as malformed.
    """

    # Sigstore only deals in v3 certificates: earlier X.509 versions have
    # no extensions, which leaves their CA status ambiguous.
    if cert.version != Version.v3:
        raise VerificationError(f"invalid X.509 version: {cert.version}")

    # Only a CA can be a root CA.
    if not cert_is_ca(cert):
        return False

    # A root CA is its own issuer and signer. Any failure of that check
    # means "not a root" rather than an error.
    try:
        cert.verify_directly_issued_by(cert)
    except Exception:
        self_issued = False
    else:
        self_issued = True

    return self_issued
Returns `True` if and only if the given `Certificate` indicates that it's a root CA.
This is not a verification function, and it does not establish the trustworthiness of the given certificate.
def cert_is_leaf(cert: Certificate) -> bool:
    """
    Returns `True` if and only if the given `Certificate` is a valid
    leaf certificate for Sigstore purposes. This means that:

    * It is not a root or intermediate CA;
    * It has `KeyUsage.digitalSignature`;
    * It has `CODE_SIGNING` as an `ExtendedKeyUsage`.

    This is **not** a verification function, and it does not establish
    the trustworthiness of the given certificate.

    Returns `False` for CA certificates and for certificates whose
    `ExtendedKeyUsage` does not include `CODE_SIGNING`.

    Raises `VerificationError` if the certificate is not X.509v3, if
    `cert_is_ca` rejects it as malformed, or if it lacks the KeyUsage
    extension, the `digitalSignature` usage, or the ExtendedKeyUsage
    extension.
    """

    # Only v3 certificates should appear in the context of Sigstore;
    # earlier versions of X.509 lack extensions and have ambiguous CA
    # behavior.
    if cert.version != Version.v3:
        raise VerificationError(f"invalid X.509 version: {cert.version}")

    # CAs are not leaves.
    if cert_is_ca(cert):
        return False

    # `cert_is_ca` returns `False` *before* looking at KeyUsage when the
    # certificate has no BasicConstraints, so KeyUsage may still be absent
    # here. Report that with the module's uniform error type instead of
    # leaking `ExtensionNotFound` to the caller.
    try:
        key_usage = cert.extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE)
    except ExtensionNotFound as exc:
        raise VerificationError("invalid X.509 certificate: missing KeyUsage") from exc

    if not key_usage.value.digital_signature:  # type: ignore
        raise VerificationError(
            "invalid certificate for Sigstore purposes: missing digital signature usage"
        )

    # Finally, we check to make sure the leaf has an `ExtendedKeyUsages`
    # extension that includes a codesigning entitlement. Sigstore should
    # never issue a leaf that doesn't have this extended usage.
    try:
        extended_key_usage = cert.extensions.get_extension_for_oid(
            ExtensionOID.EXTENDED_KEY_USAGE
        )
    except ExtensionNotFound as exc:
        raise VerificationError(
            "invalid X.509 certificate: missing ExtendedKeyUsage"
        ) from exc

    return ExtendedKeyUsageOID.CODE_SIGNING in extended_key_usage.value  # type: ignore
Returns `True` if and only if the given `Certificate` is a valid leaf certificate for Sigstore purposes. This means that:

- It is not a root or intermediate CA;
- It has `KeyUsage.digitalSignature`;
- It has `CODE_SIGNING` as an `ExtendedKeyUsage`.
This is not a verification function, and it does not establish the trustworthiness of the given certificate.