sigstore.verify.verifier
Verification API machinery.
1# Copyright 2022 The Sigstore Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15""" 16Verification API machinery. 17""" 18 19from __future__ import annotations 20 21import base64 22import logging 23from datetime import datetime, timezone 24from typing import List, cast 25 26import rekor_types 27from cryptography.exceptions import InvalidSignature 28from cryptography.hazmat.primitives.asymmetric import ec 29from cryptography.x509 import ExtendedKeyUsage, KeyUsage 30from cryptography.x509.oid import ExtendedKeyUsageOID 31from OpenSSL.crypto import ( 32 X509, 33 X509Store, 34 X509StoreContext, 35 X509StoreContextError, 36 X509StoreFlags, 37) 38from pydantic import ValidationError 39 40from sigstore import dsse 41from sigstore._internal.rekor import _hashedrekord_from_parts 42from sigstore._internal.rekor.client import RekorClient 43from sigstore._internal.sct import ( 44 _get_precertificate_signed_certificate_timestamps, 45 verify_sct, 46) 47from sigstore._internal.trust import ClientTrustConfig, KeyringPurpose, TrustedRoot 48from sigstore._utils import base64_encode_pem_cert, sha256_digest 49from sigstore.errors import VerificationError 50from sigstore.hashes import Hashed 51from sigstore.models import Bundle 52from sigstore.verify.policy import VerificationPolicy 53 54_logger = logging.getLogger(__name__) 55 56 57class Verifier: 58 """ 59 The primary API for verification operations. 60 """ 61 62 def __init__(self, *, rekor: RekorClient, trusted_root: TrustedRoot): 63 """ 64 Create a new `Verifier`. 65 66 `rekor` is a `RekorClient` capable of connecting to a Rekor instance 67 containing logs for the file(s) being verified. 68 69 `fulcio_certificate_chain` is a list of PEM-encoded X.509 certificates, 70 establishing the trust chain for the signing certificate and signature. 71 """ 72 self._rekor = rekor 73 self._fulcio_certificate_chain: List[X509] = [ 74 X509.from_cryptography(parent_cert) 75 for parent_cert in trusted_root.get_fulcio_certs() 76 ] 77 self._trusted_root = trusted_root 78 79 @classmethod 80 def production(cls) -> Verifier: 81 """ 82 Return a `Verifier` instance configured against Sigstore's production-level services. 83 """ 84 return cls( 85 rekor=RekorClient.production(), 86 trusted_root=TrustedRoot.production(), 87 ) 88 89 @classmethod 90 def staging(cls) -> Verifier: 91 """ 92 Return a `Verifier` instance configured against Sigstore's staging-level services. 93 """ 94 return cls( 95 rekor=RekorClient.staging(), 96 trusted_root=TrustedRoot.staging(), 97 ) 98 99 @classmethod 100 def _from_trust_config(cls, trust_config: ClientTrustConfig) -> Verifier: 101 """ 102 Create a `Verifier` from the given `ClientTrustConfig`. 103 104 @api private 105 """ 106 return cls( 107 rekor=RekorClient(trust_config._inner.signing_config.tlog_urls[0]), 108 trusted_root=trust_config.trusted_root, 109 ) 110 111 def _verify_common_signing_cert( 112 self, bundle: Bundle, policy: VerificationPolicy 113 ) -> None: 114 """ 115 Performs the signing certificate verification steps that are shared between 116 `verify_dsse` and `verify_artifact`. 117 118 Raises `VerificationError` on all failures. 119 """ 120 121 # In order to verify an artifact, we need to achieve the following: 122 # 123 # 1. Verify that the signing certificate chains to the root of trust 124 # and is valid at the time of signing. 125 # 2. Verify the signing certificate's SCT. 126 # 3. Verify that the signing certificate conforms to the Sigstore 127 # X.509 profile as well as the passed-in `VerificationPolicy`. 128 # 4. Verify the inclusion proof and signed checkpoint for the log 129 # entry. 130 # 5. Verify the inclusion promise for the log entry, if present. 131 # 6. Verify the timely insertion of the log entry against the validity 132 # period for the signing certificate. 133 # 7. Verify the signature and input against the signing certificate's 134 # public key. 135 # 8. Verify the transparency log entry's consistency against the other 136 # materials, to prevent variants of CVE-2022-36056. 137 # 138 # This method performs steps (1) through (6) above. Its caller 139 # MUST perform steps (7) and (8) separately, since they vary based on 140 # the kind of verification being performed (i.e. hashedrekord, DSSE, etc.) 141 142 cert = bundle.signing_certificate 143 144 # NOTE: The `X509Store` object currently cannot have its time reset once the `set_time` 145 # method been called on it. To get around this, we construct a new one for every `verify` 146 # call. 147 store = X509Store() 148 # NOTE: By explicitly setting the flags here, we ensure that OpenSSL's 149 # PARTIAL_CHAIN default does not change on us. Enabling PARTIAL_CHAIN 150 # would be strictly more conformant of OpenSSL, but we currently 151 # *want* the "long" chain behavior of performing path validation 152 # down to a self-signed root. 153 store.set_flags(X509StoreFlags.X509_STRICT) 154 for parent_cert_ossl in self._fulcio_certificate_chain: 155 store.add_cert(parent_cert_ossl) 156 157 # (1): verify that the signing certificate is signed by the root 158 # certificate and that the signing certificate was valid at the 159 # time of signing. 160 sign_date = cert.not_valid_before_utc 161 cert_ossl = X509.from_cryptography(cert) 162 163 store.set_time(sign_date) 164 store_ctx = X509StoreContext(store, cert_ossl) 165 try: 166 # get_verified_chain returns the full chain including the end-entity certificate 167 # and chain should contain only CA certificates 168 chain = store_ctx.get_verified_chain()[1:] 169 except X509StoreContextError as e: 170 raise VerificationError(f"failed to build chain: {e}") 171 172 # (2): verify the signing certificate's SCT. 173 sct = _get_precertificate_signed_certificate_timestamps(cert)[0] 174 try: 175 verify_sct( 176 sct, 177 cert, 178 [parent_cert.to_cryptography() for parent_cert in chain], 179 self._trusted_root.ct_keyring(KeyringPurpose.VERIFY), 180 ) 181 except VerificationError as e: 182 raise VerificationError(f"failed to verify SCT on signing certificate: {e}") 183 184 # (3): verify the signing certificate against the Sigstore 185 # X.509 profile and verify against the given `VerificationPolicy`. 186 usage_ext = cert.extensions.get_extension_for_class(KeyUsage) 187 if not usage_ext.value.digital_signature: 188 raise VerificationError("Key usage is not of type `digital signature`") 189 190 extended_usage_ext = cert.extensions.get_extension_for_class(ExtendedKeyUsage) 191 if ExtendedKeyUsageOID.CODE_SIGNING not in extended_usage_ext.value: 192 raise VerificationError("Extended usage does not contain `code signing`") 193 194 policy.verify(cert) 195 196 _logger.debug("Successfully verified signing certificate validity...") 197 198 # (4): verify the inclusion proof and signed checkpoint for the 199 # log entry. 200 # (5): verify the inclusion promise for the log entry, if present. 201 entry = bundle.log_entry 202 try: 203 entry._verify(self._trusted_root.rekor_keyring(KeyringPurpose.VERIFY)) 204 except VerificationError as exc: 205 raise VerificationError(f"invalid log entry: {exc}") 206 207 # (6): verify that log entry was integrated circa the signing certificate's 208 # validity period. 209 integrated_time = datetime.fromtimestamp(entry.integrated_time, tz=timezone.utc) 210 if not ( 211 bundle.signing_certificate.not_valid_before_utc 212 <= integrated_time 213 <= bundle.signing_certificate.not_valid_after_utc 214 ): 215 raise VerificationError( 216 "invalid signing cert: expired at time of Rekor entry" 217 ) 218 219 def verify_dsse( 220 self, bundle: Bundle, policy: VerificationPolicy 221 ) -> tuple[str, bytes]: 222 """ 223 Verifies an bundle's DSSE envelope, returning the encapsulated payload 224 and its content type. 225 226 This method is only for DSSE-enveloped payloads. To verify 227 an arbitrary input against a bundle, use the `verify_artifact` 228 method. 229 230 `bundle` is the Sigstore `Bundle` to both verify and verify against. 231 232 `policy` is the `VerificationPolicy` to verify against. 233 234 Returns a tuple of `(type, payload)`, where `type` is the payload's 235 type as encoded in the DSSE envelope and `payload` is the raw `bytes` 236 of the payload. No validation of either `type` or `payload` is 237 performed; users of this API **must** assert that `type` is known 238 to them before proceeding to handle `payload` in an application-dependent 239 manner. 240 """ 241 242 # (1) through (6) are performed by `_verify_common_signing_cert`. 243 self._verify_common_signing_cert(bundle, policy) 244 245 # (7): verify the bundle's signature and DSSE envelope against the 246 # signing certificate's public key. 247 envelope = bundle._dsse_envelope 248 if envelope is None: 249 raise VerificationError( 250 "cannot perform DSSE verification on a bundle without a DSSE envelope" 251 ) 252 253 signing_key = bundle.signing_certificate.public_key() 254 signing_key = cast(ec.EllipticCurvePublicKey, signing_key) 255 dsse._verify(signing_key, envelope) 256 257 # (8): verify the consistency of the log entry's body against 258 # the other bundle materials. 259 # NOTE: This is very slightly weaker than the consistency check 260 # for hashedrekord entries, due to how inclusion is recorded for DSSE: 261 # the included entry for DSSE includes an envelope hash that we 262 # *cannot* verify, since the envelope is uncanonicalized JSON. 263 # Instead, we manually pick apart the entry body below and verify 264 # the parts we can (namely the payload hash and signature list). 265 entry = bundle.log_entry 266 try: 267 entry_body = rekor_types.Dsse.model_validate_json( 268 base64.b64decode(entry.body) 269 ) 270 except ValidationError as exc: 271 raise VerificationError(f"invalid DSSE log entry: {exc}") 272 273 payload_hash = sha256_digest(envelope._inner.payload).digest.hex() 274 if ( 275 entry_body.spec.root.payload_hash.algorithm # type: ignore[union-attr] 276 != rekor_types.dsse.Algorithm.SHA256 277 ): 278 raise VerificationError("expected SHA256 payload hash in DSSE log entry") 279 if payload_hash != entry_body.spec.root.payload_hash.value: # type: ignore[union-attr] 280 raise VerificationError("log entry payload hash does not match bundle") 281 282 # NOTE: Like `dsse._verify`: multiple signatures would be frivolous here, 283 # but we handle them just in case the signer has somehow produced multiple 284 # signatures for their envelope with the same signing key. 285 signatures = [ 286 rekor_types.dsse.Signature( 287 signature=base64.b64encode(signature.sig).decode(), 288 verifier=base64_encode_pem_cert(bundle.signing_certificate), 289 ) 290 for signature in envelope._inner.signatures 291 ] 292 if signatures != entry_body.spec.root.signatures: 293 raise VerificationError("log entry signatures do not match bundle") 294 295 return (envelope._inner.payload_type, envelope._inner.payload) 296 297 def verify_artifact( 298 self, 299 input_: bytes | Hashed, 300 bundle: Bundle, 301 policy: VerificationPolicy, 302 ) -> None: 303 """ 304 Public API for verifying. 305 306 `input_` is the input to verify, either as a buffer of contents or as 307 a prehashed `Hashed` object. 308 309 `bundle` is the Sigstore `Bundle` to verify against. 310 311 `policy` is the `VerificationPolicy` to verify against. 312 313 On failure, this method raises `VerificationError`. 314 """ 315 316 # (1) through (6) are performed by `_verify_common_signing_cert`. 317 self._verify_common_signing_cert(bundle, policy) 318 319 hashed_input = sha256_digest(input_) 320 321 # (7): verify that the signature was signed by the public key in the signing certificate. 322 try: 323 signing_key = bundle.signing_certificate.public_key() 324 signing_key = cast(ec.EllipticCurvePublicKey, signing_key) 325 signing_key.verify( 326 bundle._inner.message_signature.signature, 327 hashed_input.digest, 328 ec.ECDSA(hashed_input._as_prehashed()), 329 ) 330 except InvalidSignature: 331 raise VerificationError("Signature is invalid for input") 332 333 _logger.debug("Successfully verified signature...") 334 335 # (8): verify the consistency of the log entry's body against 336 # the other bundle materials (and input being verified). 337 entry = bundle.log_entry 338 339 expected_body = _hashedrekord_from_parts( 340 bundle.signing_certificate, 341 bundle._inner.message_signature.signature, 342 hashed_input, 343 ) 344 actual_body = rekor_types.Hashedrekord.model_validate_json( 345 base64.b64decode(entry.body) 346 ) 347 if expected_body != actual_body: 348 raise VerificationError( 349 "transparency log entry is inconsistent with other materials" 350 )
58class Verifier: 59 """ 60 The primary API for verification operations. 61 """ 62 63 def __init__(self, *, rekor: RekorClient, trusted_root: TrustedRoot): 64 """ 65 Create a new `Verifier`. 66 67 `rekor` is a `RekorClient` capable of connecting to a Rekor instance 68 containing logs for the file(s) being verified. 69 70 `fulcio_certificate_chain` is a list of PEM-encoded X.509 certificates, 71 establishing the trust chain for the signing certificate and signature. 72 """ 73 self._rekor = rekor 74 self._fulcio_certificate_chain: List[X509] = [ 75 X509.from_cryptography(parent_cert) 76 for parent_cert in trusted_root.get_fulcio_certs() 77 ] 78 self._trusted_root = trusted_root 79 80 @classmethod 81 def production(cls) -> Verifier: 82 """ 83 Return a `Verifier` instance configured against Sigstore's production-level services. 84 """ 85 return cls( 86 rekor=RekorClient.production(), 87 trusted_root=TrustedRoot.production(), 88 ) 89 90 @classmethod 91 def staging(cls) -> Verifier: 92 """ 93 Return a `Verifier` instance configured against Sigstore's staging-level services. 94 """ 95 return cls( 96 rekor=RekorClient.staging(), 97 trusted_root=TrustedRoot.staging(), 98 ) 99 100 @classmethod 101 def _from_trust_config(cls, trust_config: ClientTrustConfig) -> Verifier: 102 """ 103 Create a `Verifier` from the given `ClientTrustConfig`. 104 105 @api private 106 """ 107 return cls( 108 rekor=RekorClient(trust_config._inner.signing_config.tlog_urls[0]), 109 trusted_root=trust_config.trusted_root, 110 ) 111 112 def _verify_common_signing_cert( 113 self, bundle: Bundle, policy: VerificationPolicy 114 ) -> None: 115 """ 116 Performs the signing certificate verification steps that are shared between 117 `verify_dsse` and `verify_artifact`. 118 119 Raises `VerificationError` on all failures. 120 """ 121 122 # In order to verify an artifact, we need to achieve the following: 123 # 124 # 1. Verify that the signing certificate chains to the root of trust 125 # and is valid at the time of signing. 126 # 2. Verify the signing certificate's SCT. 127 # 3. Verify that the signing certificate conforms to the Sigstore 128 # X.509 profile as well as the passed-in `VerificationPolicy`. 129 # 4. Verify the inclusion proof and signed checkpoint for the log 130 # entry. 131 # 5. Verify the inclusion promise for the log entry, if present. 132 # 6. Verify the timely insertion of the log entry against the validity 133 # period for the signing certificate. 134 # 7. Verify the signature and input against the signing certificate's 135 # public key. 136 # 8. Verify the transparency log entry's consistency against the other 137 # materials, to prevent variants of CVE-2022-36056. 138 # 139 # This method performs steps (1) through (6) above. Its caller 140 # MUST perform steps (7) and (8) separately, since they vary based on 141 # the kind of verification being performed (i.e. hashedrekord, DSSE, etc.) 142 143 cert = bundle.signing_certificate 144 145 # NOTE: The `X509Store` object currently cannot have its time reset once the `set_time` 146 # method been called on it. To get around this, we construct a new one for every `verify` 147 # call. 148 store = X509Store() 149 # NOTE: By explicitly setting the flags here, we ensure that OpenSSL's 150 # PARTIAL_CHAIN default does not change on us. Enabling PARTIAL_CHAIN 151 # would be strictly more conformant of OpenSSL, but we currently 152 # *want* the "long" chain behavior of performing path validation 153 # down to a self-signed root. 154 store.set_flags(X509StoreFlags.X509_STRICT) 155 for parent_cert_ossl in self._fulcio_certificate_chain: 156 store.add_cert(parent_cert_ossl) 157 158 # (1): verify that the signing certificate is signed by the root 159 # certificate and that the signing certificate was valid at the 160 # time of signing. 161 sign_date = cert.not_valid_before_utc 162 cert_ossl = X509.from_cryptography(cert) 163 164 store.set_time(sign_date) 165 store_ctx = X509StoreContext(store, cert_ossl) 166 try: 167 # get_verified_chain returns the full chain including the end-entity certificate 168 # and chain should contain only CA certificates 169 chain = store_ctx.get_verified_chain()[1:] 170 except X509StoreContextError as e: 171 raise VerificationError(f"failed to build chain: {e}") 172 173 # (2): verify the signing certificate's SCT. 174 sct = _get_precertificate_signed_certificate_timestamps(cert)[0] 175 try: 176 verify_sct( 177 sct, 178 cert, 179 [parent_cert.to_cryptography() for parent_cert in chain], 180 self._trusted_root.ct_keyring(KeyringPurpose.VERIFY), 181 ) 182 except VerificationError as e: 183 raise VerificationError(f"failed to verify SCT on signing certificate: {e}") 184 185 # (3): verify the signing certificate against the Sigstore 186 # X.509 profile and verify against the given `VerificationPolicy`. 187 usage_ext = cert.extensions.get_extension_for_class(KeyUsage) 188 if not usage_ext.value.digital_signature: 189 raise VerificationError("Key usage is not of type `digital signature`") 190 191 extended_usage_ext = cert.extensions.get_extension_for_class(ExtendedKeyUsage) 192 if ExtendedKeyUsageOID.CODE_SIGNING not in extended_usage_ext.value: 193 raise VerificationError("Extended usage does not contain `code signing`") 194 195 policy.verify(cert) 196 197 _logger.debug("Successfully verified signing certificate validity...") 198 199 # (4): verify the inclusion proof and signed checkpoint for the 200 # log entry. 201 # (5): verify the inclusion promise for the log entry, if present. 202 entry = bundle.log_entry 203 try: 204 entry._verify(self._trusted_root.rekor_keyring(KeyringPurpose.VERIFY)) 205 except VerificationError as exc: 206 raise VerificationError(f"invalid log entry: {exc}") 207 208 # (6): verify that log entry was integrated circa the signing certificate's 209 # validity period. 210 integrated_time = datetime.fromtimestamp(entry.integrated_time, tz=timezone.utc) 211 if not ( 212 bundle.signing_certificate.not_valid_before_utc 213 <= integrated_time 214 <= bundle.signing_certificate.not_valid_after_utc 215 ): 216 raise VerificationError( 217 "invalid signing cert: expired at time of Rekor entry" 218 ) 219 220 def verify_dsse( 221 self, bundle: Bundle, policy: VerificationPolicy 222 ) -> tuple[str, bytes]: 223 """ 224 Verifies an bundle's DSSE envelope, returning the encapsulated payload 225 and its content type. 226 227 This method is only for DSSE-enveloped payloads. To verify 228 an arbitrary input against a bundle, use the `verify_artifact` 229 method. 230 231 `bundle` is the Sigstore `Bundle` to both verify and verify against. 232 233 `policy` is the `VerificationPolicy` to verify against. 234 235 Returns a tuple of `(type, payload)`, where `type` is the payload's 236 type as encoded in the DSSE envelope and `payload` is the raw `bytes` 237 of the payload. No validation of either `type` or `payload` is 238 performed; users of this API **must** assert that `type` is known 239 to them before proceeding to handle `payload` in an application-dependent 240 manner. 241 """ 242 243 # (1) through (6) are performed by `_verify_common_signing_cert`. 244 self._verify_common_signing_cert(bundle, policy) 245 246 # (7): verify the bundle's signature and DSSE envelope against the 247 # signing certificate's public key. 248 envelope = bundle._dsse_envelope 249 if envelope is None: 250 raise VerificationError( 251 "cannot perform DSSE verification on a bundle without a DSSE envelope" 252 ) 253 254 signing_key = bundle.signing_certificate.public_key() 255 signing_key = cast(ec.EllipticCurvePublicKey, signing_key) 256 dsse._verify(signing_key, envelope) 257 258 # (8): verify the consistency of the log entry's body against 259 # the other bundle materials. 260 # NOTE: This is very slightly weaker than the consistency check 261 # for hashedrekord entries, due to how inclusion is recorded for DSSE: 262 # the included entry for DSSE includes an envelope hash that we 263 # *cannot* verify, since the envelope is uncanonicalized JSON. 264 # Instead, we manually pick apart the entry body below and verify 265 # the parts we can (namely the payload hash and signature list). 266 entry = bundle.log_entry 267 try: 268 entry_body = rekor_types.Dsse.model_validate_json( 269 base64.b64decode(entry.body) 270 ) 271 except ValidationError as exc: 272 raise VerificationError(f"invalid DSSE log entry: {exc}") 273 274 payload_hash = sha256_digest(envelope._inner.payload).digest.hex() 275 if ( 276 entry_body.spec.root.payload_hash.algorithm # type: ignore[union-attr] 277 != rekor_types.dsse.Algorithm.SHA256 278 ): 279 raise VerificationError("expected SHA256 payload hash in DSSE log entry") 280 if payload_hash != entry_body.spec.root.payload_hash.value: # type: ignore[union-attr] 281 raise VerificationError("log entry payload hash does not match bundle") 282 283 # NOTE: Like `dsse._verify`: multiple signatures would be frivolous here, 284 # but we handle them just in case the signer has somehow produced multiple 285 # signatures for their envelope with the same signing key. 286 signatures = [ 287 rekor_types.dsse.Signature( 288 signature=base64.b64encode(signature.sig).decode(), 289 verifier=base64_encode_pem_cert(bundle.signing_certificate), 290 ) 291 for signature in envelope._inner.signatures 292 ] 293 if signatures != entry_body.spec.root.signatures: 294 raise VerificationError("log entry signatures do not match bundle") 295 296 return (envelope._inner.payload_type, envelope._inner.payload) 297 298 def verify_artifact( 299 self, 300 input_: bytes | Hashed, 301 bundle: Bundle, 302 policy: VerificationPolicy, 303 ) -> None: 304 """ 305 Public API for verifying. 306 307 `input_` is the input to verify, either as a buffer of contents or as 308 a prehashed `Hashed` object. 309 310 `bundle` is the Sigstore `Bundle` to verify against. 311 312 `policy` is the `VerificationPolicy` to verify against. 313 314 On failure, this method raises `VerificationError`. 315 """ 316 317 # (1) through (6) are performed by `_verify_common_signing_cert`. 318 self._verify_common_signing_cert(bundle, policy) 319 320 hashed_input = sha256_digest(input_) 321 322 # (7): verify that the signature was signed by the public key in the signing certificate. 323 try: 324 signing_key = bundle.signing_certificate.public_key() 325 signing_key = cast(ec.EllipticCurvePublicKey, signing_key) 326 signing_key.verify( 327 bundle._inner.message_signature.signature, 328 hashed_input.digest, 329 ec.ECDSA(hashed_input._as_prehashed()), 330 ) 331 except InvalidSignature: 332 raise VerificationError("Signature is invalid for input") 333 334 _logger.debug("Successfully verified signature...") 335 336 # (8): verify the consistency of the log entry's body against 337 # the other bundle materials (and input being verified). 338 entry = bundle.log_entry 339 340 expected_body = _hashedrekord_from_parts( 341 bundle.signing_certificate, 342 bundle._inner.message_signature.signature, 343 hashed_input, 344 ) 345 actual_body = rekor_types.Hashedrekord.model_validate_json( 346 base64.b64decode(entry.body) 347 ) 348 if expected_body != actual_body: 349 raise VerificationError( 350 "transparency log entry is inconsistent with other materials" 351 )
The primary API for verification operations.
63 def __init__(self, *, rekor: RekorClient, trusted_root: TrustedRoot): 64 """ 65 Create a new `Verifier`. 66 67 `rekor` is a `RekorClient` capable of connecting to a Rekor instance 68 containing logs for the file(s) being verified. 69 70 `fulcio_certificate_chain` is a list of PEM-encoded X.509 certificates, 71 establishing the trust chain for the signing certificate and signature. 72 """ 73 self._rekor = rekor 74 self._fulcio_certificate_chain: List[X509] = [ 75 X509.from_cryptography(parent_cert) 76 for parent_cert in trusted_root.get_fulcio_certs() 77 ] 78 self._trusted_root = trusted_root
Create a new Verifier
.
rekor
is a RekorClient
capable of connecting to a Rekor instance
containing logs for the file(s) being verified.
fulcio_certificate_chain
is a list of PEM-encoded X.509 certificates,
establishing the trust chain for the signing certificate and signature.
80 @classmethod 81 def production(cls) -> Verifier: 82 """ 83 Return a `Verifier` instance configured against Sigstore's production-level services. 84 """ 85 return cls( 86 rekor=RekorClient.production(), 87 trusted_root=TrustedRoot.production(), 88 )
Return a Verifier
instance configured against Sigstore's production-level services.
90 @classmethod 91 def staging(cls) -> Verifier: 92 """ 93 Return a `Verifier` instance configured against Sigstore's staging-level services. 94 """ 95 return cls( 96 rekor=RekorClient.staging(), 97 trusted_root=TrustedRoot.staging(), 98 )
Return a Verifier
instance configured against Sigstore's staging-level services.
220 def verify_dsse( 221 self, bundle: Bundle, policy: VerificationPolicy 222 ) -> tuple[str, bytes]: 223 """ 224 Verifies an bundle's DSSE envelope, returning the encapsulated payload 225 and its content type. 226 227 This method is only for DSSE-enveloped payloads. To verify 228 an arbitrary input against a bundle, use the `verify_artifact` 229 method. 230 231 `bundle` is the Sigstore `Bundle` to both verify and verify against. 232 233 `policy` is the `VerificationPolicy` to verify against. 234 235 Returns a tuple of `(type, payload)`, where `type` is the payload's 236 type as encoded in the DSSE envelope and `payload` is the raw `bytes` 237 of the payload. No validation of either `type` or `payload` is 238 performed; users of this API **must** assert that `type` is known 239 to them before proceeding to handle `payload` in an application-dependent 240 manner. 241 """ 242 243 # (1) through (6) are performed by `_verify_common_signing_cert`. 244 self._verify_common_signing_cert(bundle, policy) 245 246 # (7): verify the bundle's signature and DSSE envelope against the 247 # signing certificate's public key. 248 envelope = bundle._dsse_envelope 249 if envelope is None: 250 raise VerificationError( 251 "cannot perform DSSE verification on a bundle without a DSSE envelope" 252 ) 253 254 signing_key = bundle.signing_certificate.public_key() 255 signing_key = cast(ec.EllipticCurvePublicKey, signing_key) 256 dsse._verify(signing_key, envelope) 257 258 # (8): verify the consistency of the log entry's body against 259 # the other bundle materials. 260 # NOTE: This is very slightly weaker than the consistency check 261 # for hashedrekord entries, due to how inclusion is recorded for DSSE: 262 # the included entry for DSSE includes an envelope hash that we 263 # *cannot* verify, since the envelope is uncanonicalized JSON. 264 # Instead, we manually pick apart the entry body below and verify 265 # the parts we can (namely the payload hash and signature list). 266 entry = bundle.log_entry 267 try: 268 entry_body = rekor_types.Dsse.model_validate_json( 269 base64.b64decode(entry.body) 270 ) 271 except ValidationError as exc: 272 raise VerificationError(f"invalid DSSE log entry: {exc}") 273 274 payload_hash = sha256_digest(envelope._inner.payload).digest.hex() 275 if ( 276 entry_body.spec.root.payload_hash.algorithm # type: ignore[union-attr] 277 != rekor_types.dsse.Algorithm.SHA256 278 ): 279 raise VerificationError("expected SHA256 payload hash in DSSE log entry") 280 if payload_hash != entry_body.spec.root.payload_hash.value: # type: ignore[union-attr] 281 raise VerificationError("log entry payload hash does not match bundle") 282 283 # NOTE: Like `dsse._verify`: multiple signatures would be frivolous here, 284 # but we handle them just in case the signer has somehow produced multiple 285 # signatures for their envelope with the same signing key. 286 signatures = [ 287 rekor_types.dsse.Signature( 288 signature=base64.b64encode(signature.sig).decode(), 289 verifier=base64_encode_pem_cert(bundle.signing_certificate), 290 ) 291 for signature in envelope._inner.signatures 292 ] 293 if signatures != entry_body.spec.root.signatures: 294 raise VerificationError("log entry signatures do not match bundle") 295 296 return (envelope._inner.payload_type, envelope._inner.payload)
Verifies an bundle's DSSE envelope, returning the encapsulated payload and its content type.
This method is only for DSSE-enveloped payloads. To verify
an arbitrary input against a bundle, use the verify_artifact
method.
bundle
is the Sigstore Bundle
to both verify and verify against.
policy
is the VerificationPolicy
to verify against.
Returns a tuple of (type, payload)
, where type
is the payload's
type as encoded in the DSSE envelope and payload
is the raw bytes
of the payload. No validation of either type
or payload
is
performed; users of this API must assert that type
is known
to them before proceeding to handle payload
in an application-dependent
manner.
298 def verify_artifact( 299 self, 300 input_: bytes | Hashed, 301 bundle: Bundle, 302 policy: VerificationPolicy, 303 ) -> None: 304 """ 305 Public API for verifying. 306 307 `input_` is the input to verify, either as a buffer of contents or as 308 a prehashed `Hashed` object. 309 310 `bundle` is the Sigstore `Bundle` to verify against. 311 312 `policy` is the `VerificationPolicy` to verify against. 313 314 On failure, this method raises `VerificationError`. 315 """ 316 317 # (1) through (6) are performed by `_verify_common_signing_cert`. 318 self._verify_common_signing_cert(bundle, policy) 319 320 hashed_input = sha256_digest(input_) 321 322 # (7): verify that the signature was signed by the public key in the signing certificate. 323 try: 324 signing_key = bundle.signing_certificate.public_key() 325 signing_key = cast(ec.EllipticCurvePublicKey, signing_key) 326 signing_key.verify( 327 bundle._inner.message_signature.signature, 328 hashed_input.digest, 329 ec.ECDSA(hashed_input._as_prehashed()), 330 ) 331 except InvalidSignature: 332 raise VerificationError("Signature is invalid for input") 333 334 _logger.debug("Successfully verified signature...") 335 336 # (8): verify the consistency of the log entry's body against 337 # the other bundle materials (and input being verified). 338 entry = bundle.log_entry 339 340 expected_body = _hashedrekord_from_parts( 341 bundle.signing_certificate, 342 bundle._inner.message_signature.signature, 343 hashed_input, 344 ) 345 actual_body = rekor_types.Hashedrekord.model_validate_json( 346 base64.b64decode(entry.body) 347 ) 348 if expected_body != actual_body: 349 raise VerificationError( 350 "transparency log entry is inconsistent with other materials" 351 )
Public API for verifying.
input_
is the input to verify, either as a buffer of contents or as
a prehashed Hashed
object.
bundle
is the Sigstore Bundle
to verify against.
policy
is the VerificationPolicy
to verify against.
On failure, this method raises VerificationError
.