sigstore.verify.verifier
Verification API machinery.
# Copyright 2022 The Sigstore Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Verification API machinery.
"""

from __future__ import annotations

import base64
import logging
from datetime import datetime, timezone
from typing import List, cast

import rekor_types
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.x509 import ExtendedKeyUsage, KeyUsage
from cryptography.x509.oid import ExtendedKeyUsageOID
from OpenSSL.crypto import (
    X509,
    X509Store,
    X509StoreContext,
    X509StoreContextError,
    X509StoreFlags,
)
from pydantic import ValidationError
from rfc3161_client import TimeStampResponse, VerifierBuilder
from rfc3161_client import VerificationError as Rfc3161VerificationError

from sigstore import dsse
from sigstore._internal.rekor import _hashedrekord_from_parts
from sigstore._internal.rekor.client import RekorClient
from sigstore._internal.sct import (
    _get_precertificate_signed_certificate_timestamps,
    verify_sct,
)
from sigstore._internal.timestamp import TimestampSource, TimestampVerificationResult
from sigstore._internal.trust import ClientTrustConfig, KeyringPurpose, TrustedRoot
from sigstore._utils import base64_encode_pem_cert, sha256_digest
from sigstore.errors import VerificationError
from sigstore.hashes import Hashed
from sigstore.models import Bundle
from sigstore.verify.policy import VerificationPolicy

_logger = logging.getLogger(__name__)

# Limit the number of timestamps to prevent DoS
# From https://github.com/sigstore/sigstore-go/blob/e92142f0734064ebf6001f188b7330a1212245fe/pkg/verify/tsa.go#L29
MAX_ALLOWED_TIMESTAMP: int = 32

# When verifying a timestamp, this threshold represents the minimum number of required
# timestamps to consider a signature valid.
VERIFY_TIMESTAMP_THRESHOLD: int = 1


class Verifier:
    """
    The primary API for verification operations.
    """

    def __init__(self, *, rekor: RekorClient, trusted_root: TrustedRoot):
        """
        Create a new `Verifier`.

        `rekor` is a `RekorClient` capable of connecting to a Rekor instance
        containing logs for the file(s) being verified.

        `trusted_root` is the `TrustedRoot` supplying the trust material used
        during verification: the Fulcio certificates that anchor the signing
        certificate's chain, the CT and Rekor keyrings, and the timestamp
        authorities.
        """
        self._rekor = rekor
        # Converted once up front so every chain-building call can reuse the
        # pyOpenSSL representations.
        self._fulcio_certificate_chain: List[X509] = [
            X509.from_cryptography(parent_cert)
            for parent_cert in trusted_root.get_fulcio_certs()
        ]
        self._trusted_root = trusted_root

    @classmethod
    def production(cls, *, offline: bool = False) -> Verifier:
        """
        Return a `Verifier` instance configured against Sigstore's production-level services.

        `offline` is forwarded to `TrustedRoot.production`.
        """
        return cls(
            rekor=RekorClient.production(),
            trusted_root=TrustedRoot.production(offline=offline),
        )

    @classmethod
    def staging(cls, *, offline: bool = False) -> Verifier:
        """
        Return a `Verifier` instance configured against Sigstore's staging-level services.

        `offline` is forwarded to `TrustedRoot.staging`.
        """
        return cls(
            rekor=RekorClient.staging(),
            trusted_root=TrustedRoot.staging(offline=offline),
        )

    @classmethod
    def _from_trust_config(cls, trust_config: ClientTrustConfig) -> Verifier:
        """
        Create a `Verifier` from the given `ClientTrustConfig`.

        The Rekor client is built from the first transparency log URL listed
        in the trust config's signing config.

        @api private
        """
        return cls(
            rekor=RekorClient(trust_config._inner.signing_config.tlog_urls[0]),
            trusted_root=trust_config.trusted_root,
        )

    def _verify_signed_timestamp(
        self, timestamp_response: TimeStampResponse, signature: bytes
    ) -> TimestampVerificationResult | None:
        """
        Verify a Signed Timestamp using the TSA provided by the Trusted Root.

        Each timestamp authority in the trusted root is tried in turn. A
        result is returned for the first authority that both verifies the
        response against `signature` and whose validity period
        (`start <= gen_time < end`) contains the timestamp's generation time.

        Returns `None` if no authority satisfies both conditions.
        """
        cert_authorities = self._trusted_root.get_timestamp_authorities()
        for certificate_authority in cert_authorities:
            certificates = certificate_authority.certificates(allow_expired=True)

            builder = VerifierBuilder()
            for certificate in certificates:
                builder.add_root_certificate(certificate)

            verifier = builder.build()
            try:
                verifier.verify(timestamp_response, signature)
            except Rfc3161VerificationError:
                # A mismatch against one authority is expected whenever several
                # are configured, so record it (with traceback) at DEBUG level
                # rather than as an error, and try the next authority.
                _logger.debug("Unable to verify Timestamp with CA.", exc_info=True)
                continue

            if (
                certificate_authority.validity_period_start
                and certificate_authority.validity_period_end
            ):
                if (
                    certificate_authority.validity_period_start
                    <= timestamp_response.tst_info.gen_time
                    < certificate_authority.validity_period_end
                ):
                    return TimestampVerificationResult(
                        source=TimestampSource.TIMESTAMP_AUTHORITY,
                        time=timestamp_response.tst_info.gen_time,
                    )

                _logger.debug(
                    "Unable to verify Timestamp because not in CA time range."
                )
            else:
                _logger.debug(
                    "Unable to verify Timestamp because no validity provided."
                )

        return None

    def _verify_timestamp_authority(
        self, bundle: Bundle
    ) -> List[TimestampVerificationResult]:
        """
        Verify that the given bundle has been timestamped by a trusted timestamp authority
        and that the timestamp is valid.

        Returns the list of successfully verified signed timestamps in the
        bundle; timestamps that fail verification are omitted.

        Raises `VerificationError` if the bundle carries more than
        `MAX_ALLOWED_TIMESTAMP` timestamps or contains duplicates.
        """
        timestamp_responses = (
            bundle.verification_material.timestamp_verification_data.rfc3161_timestamps
        )
        if len(timestamp_responses) > MAX_ALLOWED_TIMESTAMP:
            msg = f"Too many signed timestamp: {len(timestamp_responses)} > {MAX_ALLOWED_TIMESTAMP}"
            raise VerificationError(msg)

        if len(set(timestamp_responses)) != len(timestamp_responses):
            msg = "Duplicate timestamp found"
            raise VerificationError(msg)

        # The Signer sends a hash of the signature as the messageImprint in a TimeStampReq
        # to the Timestamping Service
        signature_hash = sha256_digest(bundle.signature).digest
        verified_timestamps = []
        for tsr in timestamp_responses:
            if verified_timestamp := self._verify_signed_timestamp(tsr, signature_hash):
                verified_timestamps.append(verified_timestamp)

        return verified_timestamps

    def _establish_time(self, bundle: Bundle) -> List[TimestampVerificationResult]:
        """
        Establish the time for bundle verification.

        This method uses timestamps from two possible sources:
        1. RFC3161 signed timestamps from a Timestamping Authority (TSA)
        2. Transparency Log timestamps

        Returns every verified timestamp found; the list is empty when the
        bundle carries neither source.

        Raises `VerificationError` if the bundle contains signed timestamps
        but the trusted root has no timestamp authorities, or if fewer than
        `VERIFY_TIMESTAMP_THRESHOLD` signed timestamps verify.
        """
        verified_timestamps = []

        # If a timestamp from the timestamping service is available, the Verifier MUST
        # perform path validation using the timestamp from the Timestamping Service.
        if bundle.verification_material.timestamp_verification_data.rfc3161_timestamps:
            if not self._trusted_root.get_timestamp_authorities():
                msg = (
                    "no Timestamp Authorities have been provided to validate this "
                    "bundle but it contains a signed timestamp"
                )
                raise VerificationError(msg)

            timestamp_from_tsa = self._verify_timestamp_authority(bundle)
            if len(timestamp_from_tsa) < VERIFY_TIMESTAMP_THRESHOLD:
                msg = (
                    f"not enough timestamps validated to meet the validation "
                    f"threshold ({len(timestamp_from_tsa)}/{VERIFY_TIMESTAMP_THRESHOLD})"
                )
                raise VerificationError(msg)

            verified_timestamps.extend(timestamp_from_tsa)

        # If a timestamp from the Transparency Service is available, the Verifier MUST
        # perform path validation using the timestamp from the Transparency Service.
        if timestamp := bundle.log_entry.integrated_time:
            verified_timestamps.append(
                TimestampVerificationResult(
                    source=TimestampSource.TRANSPARENCY_SERVICE,
                    time=datetime.fromtimestamp(timestamp, tz=timezone.utc),
                )
            )
        return verified_timestamps

    def _verify_chain_at_time(
        self, certificate: X509, timestamp_result: TimestampVerificationResult
    ) -> List[X509]:
        """
        Verify the validity of the certificate chain at the given time.

        Returns the verified chain without the end-entity certificate, i.e.
        only the CA certificates.

        Raises a VerificationError if the chain can't be built or be verified.
        """
        # NOTE: The `X509Store` object cannot have its time reset once the `set_time`
        # method been called on it. To get around this, we construct a new one in each
        # call.
        store = X509Store()
        # NOTE: By explicitly setting the flags here, we ensure that OpenSSL's
        # PARTIAL_CHAIN default does not change on us. Enabling PARTIAL_CHAIN
        # would be strictly more conformant of OpenSSL, but we currently
        # *want* the "long" chain behavior of performing path validation
        # down to a self-signed root.
        store.set_flags(X509StoreFlags.X509_STRICT)
        for parent_cert_ossl in self._fulcio_certificate_chain:
            store.add_cert(parent_cert_ossl)

        store.set_time(timestamp_result.time)

        store_ctx = X509StoreContext(store, certificate)

        try:
            # get_verified_chain returns the full chain including the end-entity certificate
            # and chain should contain only CA certificates
            return store_ctx.get_verified_chain()[1:]
        except X509StoreContextError as e:
            raise VerificationError(f"failed to build chain: {e}") from e

    def _verify_common_signing_cert(
        self, bundle: Bundle, policy: VerificationPolicy
    ) -> None:
        """
        Performs the signing certificate verification steps that are shared between
        `verify_dsse` and `verify_artifact`.

        Raises `VerificationError` on all failures.
        """

        # In order to verify an artifact, we need to achieve the following:
        #
        # 0. Establish a time for the signature.
        # 1. Verify that the signing certificate chains to the root of trust
        #    and is valid at the time of signing.
        # 2. Verify the signing certificate's SCT.
        # 3. Verify that the signing certificate conforms to the Sigstore
        #    X.509 profile as well as the passed-in `VerificationPolicy`.
        # 4. Verify the inclusion proof and signed checkpoint for the log
        #    entry.
        # 5. Verify the inclusion promise for the log entry, if present.
        # 6. Verify the timely insertion of the log entry against the validity
        #    period for the signing certificate.
        # 7. Verify the signature and input against the signing certificate's
        #    public key.
        # 8. Verify the transparency log entry's consistency against the other
        #    materials, to prevent variants of CVE-2022-36056.
        #
        # This method performs steps (0) through (6) above. Its caller
        # MUST perform steps (7) and (8) separately, since they vary based on
        # the kind of verification being performed (i.e. hashedrekord, DSSE, etc.)

        cert = bundle.signing_certificate

        # (0): Establishing a Time for the Signature
        # First, establish a time for the signature. This timestamp is required to
        # validate the certificate chain, so this step comes first.
        # While this step is optional and only performed if timestamp data has been
        # provided within the bundle, providing a signed timestamp without a TSA to
        # verify it results in a VerificationError.
        verified_timestamps = self._establish_time(bundle)
        if not verified_timestamps:
            raise VerificationError("not enough sources of verified time")

        # (1): verify that the signing certificate is signed by the root
        #      certificate and that the signing certificate was valid at the
        #      time of signing.
        # The chain must validate at *every* established time; a fresh store is
        # built per timestamp inside `_verify_chain_at_time`. The chain from the
        # last successful validation is the one used for SCT verification below.
        cert_ossl = X509.from_cryptography(cert)
        chain: list[X509] = []
        for vts in verified_timestamps:
            chain = self._verify_chain_at_time(cert_ossl, vts)

        # (2): verify the signing certificate's SCT.
        sct = _get_precertificate_signed_certificate_timestamps(cert)[0]
        try:
            verify_sct(
                sct,
                cert,
                [parent_cert.to_cryptography() for parent_cert in chain],
                self._trusted_root.ct_keyring(KeyringPurpose.VERIFY),
            )
        except VerificationError as e:
            raise VerificationError(
                f"failed to verify SCT on signing certificate: {e}"
            ) from e

        # (3): verify the signing certificate against the Sigstore
        #      X.509 profile and verify against the given `VerificationPolicy`.
        usage_ext = cert.extensions.get_extension_for_class(KeyUsage)
        if not usage_ext.value.digital_signature:
            raise VerificationError("Key usage is not of type `digital signature`")

        extended_usage_ext = cert.extensions.get_extension_for_class(ExtendedKeyUsage)
        if ExtendedKeyUsageOID.CODE_SIGNING not in extended_usage_ext.value:
            raise VerificationError("Extended usage does not contain `code signing`")

        policy.verify(cert)

        _logger.debug("Successfully verified signing certificate validity...")

        # (4): verify the inclusion proof and signed checkpoint for the
        #      log entry.
        # (5): verify the inclusion promise for the log entry, if present.
        entry = bundle.log_entry
        try:
            entry._verify(self._trusted_root.rekor_keyring(KeyringPurpose.VERIFY))
        except VerificationError as exc:
            raise VerificationError(f"invalid log entry: {exc}") from exc

        # (6): verify that log entry was integrated circa the signing certificate's
        #      validity period.
        integrated_time = datetime.fromtimestamp(entry.integrated_time, tz=timezone.utc)
        if not (
            bundle.signing_certificate.not_valid_before_utc
            <= integrated_time
            <= bundle.signing_certificate.not_valid_after_utc
        ):
            raise VerificationError(
                "invalid signing cert: expired at time of Rekor entry"
            )

    def verify_dsse(
        self, bundle: Bundle, policy: VerificationPolicy
    ) -> tuple[str, bytes]:
        """
        Verifies a bundle's DSSE envelope, returning the encapsulated payload
        and its content type.

        This method is only for DSSE-enveloped payloads. To verify
        an arbitrary input against a bundle, use the `verify_artifact`
        method.

        `bundle` is the Sigstore `Bundle` to both verify and verify against.

        `policy` is the `VerificationPolicy` to verify against.

        Returns a tuple of `(type, payload)`, where `type` is the payload's
        type as encoded in the DSSE envelope and `payload` is the raw `bytes`
        of the payload. No validation of either `type` or `payload` is
        performed; users of this API **must** assert that `type` is known
        to them before proceeding to handle `payload` in an application-dependent
        manner.

        On failure, this method raises `VerificationError`.
        """

        # (0) through (6) are performed by `_verify_common_signing_cert`.
        self._verify_common_signing_cert(bundle, policy)

        # (7): verify the bundle's signature and DSSE envelope against the
        #      signing certificate's public key.
        envelope = bundle._dsse_envelope
        if envelope is None:
            raise VerificationError(
                "cannot perform DSSE verification on a bundle without a DSSE envelope"
            )

        signing_key = bundle.signing_certificate.public_key()
        signing_key = cast(ec.EllipticCurvePublicKey, signing_key)
        dsse._verify(signing_key, envelope)

        # (8): verify the consistency of the log entry's body against
        #      the other bundle materials.
        # NOTE: This is very slightly weaker than the consistency check
        # for hashedrekord entries, due to how inclusion is recorded for DSSE:
        # the included entry for DSSE includes an envelope hash that we
        # *cannot* verify, since the envelope is uncanonicalized JSON.
        # Instead, we manually pick apart the entry body below and verify
        # the parts we can (namely the payload hash and signature list).
        entry = bundle.log_entry
        try:
            entry_body = rekor_types.Dsse.model_validate_json(
                base64.b64decode(entry.body)
            )
        except ValidationError as exc:
            raise VerificationError(f"invalid DSSE log entry: {exc}") from exc

        payload_hash = sha256_digest(envelope._inner.payload).digest.hex()
        if (
            entry_body.spec.root.payload_hash.algorithm  # type: ignore[union-attr]
            != rekor_types.dsse.Algorithm.SHA256
        ):
            raise VerificationError("expected SHA256 payload hash in DSSE log entry")
        if payload_hash != entry_body.spec.root.payload_hash.value:  # type: ignore[union-attr]
            raise VerificationError("log entry payload hash does not match bundle")

        # NOTE: Like `dsse._verify`: multiple signatures would be frivolous here,
        # but we handle them just in case the signer has somehow produced multiple
        # signatures for their envelope with the same signing key.
        signatures = [
            rekor_types.dsse.Signature(
                signature=base64.b64encode(signature.sig).decode(),
                verifier=base64_encode_pem_cert(bundle.signing_certificate),
            )
            for signature in envelope._inner.signatures
        ]
        if signatures != entry_body.spec.root.signatures:
            raise VerificationError("log entry signatures do not match bundle")

        return (envelope._inner.payload_type, envelope._inner.payload)

    def verify_artifact(
        self,
        input_: bytes | Hashed,
        bundle: Bundle,
        policy: VerificationPolicy,
    ) -> None:
        """
        Public API for verifying.

        `input_` is the input to verify, either as a buffer of contents or as
        a prehashed `Hashed` object.

        `bundle` is the Sigstore `Bundle` to verify against.

        `policy` is the `VerificationPolicy` to verify against.

        On failure, this method raises `VerificationError`.
        """

        # (0) through (6) are performed by `_verify_common_signing_cert`.
        self._verify_common_signing_cert(bundle, policy)

        hashed_input = sha256_digest(input_)

        # (7): verify that the signature was signed by the public key in the signing certificate.
        try:
            signing_key = bundle.signing_certificate.public_key()
            signing_key = cast(ec.EllipticCurvePublicKey, signing_key)
            signing_key.verify(
                bundle._inner.message_signature.signature,
                hashed_input.digest,
                ec.ECDSA(hashed_input._as_prehashed()),
            )
        except InvalidSignature as exc:
            raise VerificationError("Signature is invalid for input") from exc

        _logger.debug("Successfully verified signature...")

        # (8): verify the consistency of the log entry's body against
        #      the other bundle materials (and input being verified).
        entry = bundle.log_entry

        expected_body = _hashedrekord_from_parts(
            bundle.signing_certificate,
            bundle._inner.message_signature.signature,
            hashed_input,
        )
        # A log entry body that does not parse as a hashedrekord is a
        # verification failure, not a caller error: surface it as
        # `VerificationError`, as `verify_dsse` does for DSSE entries.
        try:
            actual_body = rekor_types.Hashedrekord.model_validate_json(
                base64.b64decode(entry.body)
            )
        except ValidationError as exc:
            raise VerificationError(f"invalid hashedrekord log entry: {exc}") from exc

        if expected_body != actual_body:
            raise VerificationError(
                "transparency log entry is inconsistent with other materials"
            )
class Verifier:
    """
    The primary API for verification operations.
    """

    def __init__(self, *, rekor: RekorClient, trusted_root: TrustedRoot):
        """
        Create a new `Verifier`.

        `rekor` is a `RekorClient` capable of connecting to a Rekor instance
        containing logs for the file(s) being verified.

        `trusted_root` is the `TrustedRoot` supplying the trust material used
        during verification: the Fulcio certificates that anchor the signing
        certificate's chain, the CT and Rekor keyrings, and the timestamp
        authorities.
        """
        self._rekor = rekor
        # Converted once up front so every chain-building call can reuse the
        # pyOpenSSL representations.
        self._fulcio_certificate_chain: List[X509] = [
            X509.from_cryptography(parent_cert)
            for parent_cert in trusted_root.get_fulcio_certs()
        ]
        self._trusted_root = trusted_root

    @classmethod
    def production(cls, *, offline: bool = False) -> Verifier:
        """
        Return a `Verifier` instance configured against Sigstore's production-level services.

        `offline` is forwarded to `TrustedRoot.production`.
        """
        return cls(
            rekor=RekorClient.production(),
            trusted_root=TrustedRoot.production(offline=offline),
        )

    @classmethod
    def staging(cls, *, offline: bool = False) -> Verifier:
        """
        Return a `Verifier` instance configured against Sigstore's staging-level services.

        `offline` is forwarded to `TrustedRoot.staging`.
        """
        return cls(
            rekor=RekorClient.staging(),
            trusted_root=TrustedRoot.staging(offline=offline),
        )

    @classmethod
    def _from_trust_config(cls, trust_config: ClientTrustConfig) -> Verifier:
        """
        Create a `Verifier` from the given `ClientTrustConfig`.

        The Rekor client is built from the first transparency log URL listed
        in the trust config's signing config.

        @api private
        """
        return cls(
            rekor=RekorClient(trust_config._inner.signing_config.tlog_urls[0]),
            trusted_root=trust_config.trusted_root,
        )

    def _verify_signed_timestamp(
        self, timestamp_response: TimeStampResponse, signature: bytes
    ) -> TimestampVerificationResult | None:
        """
        Verify a Signed Timestamp using the TSA provided by the Trusted Root.

        Each timestamp authority in the trusted root is tried in turn. A
        result is returned for the first authority that both verifies the
        response against `signature` and whose validity period
        (`start <= gen_time < end`) contains the timestamp's generation time.

        Returns `None` if no authority satisfies both conditions.
        """
        cert_authorities = self._trusted_root.get_timestamp_authorities()
        for certificate_authority in cert_authorities:
            certificates = certificate_authority.certificates(allow_expired=True)

            builder = VerifierBuilder()
            for certificate in certificates:
                builder.add_root_certificate(certificate)

            verifier = builder.build()
            try:
                verifier.verify(timestamp_response, signature)
            except Rfc3161VerificationError:
                # A mismatch against one authority is expected whenever several
                # are configured, so record it (with traceback) at DEBUG level
                # rather than as an error, and try the next authority.
                _logger.debug("Unable to verify Timestamp with CA.", exc_info=True)
                continue

            if (
                certificate_authority.validity_period_start
                and certificate_authority.validity_period_end
            ):
                if (
                    certificate_authority.validity_period_start
                    <= timestamp_response.tst_info.gen_time
                    < certificate_authority.validity_period_end
                ):
                    return TimestampVerificationResult(
                        source=TimestampSource.TIMESTAMP_AUTHORITY,
                        time=timestamp_response.tst_info.gen_time,
                    )

                _logger.debug(
                    "Unable to verify Timestamp because not in CA time range."
                )
            else:
                _logger.debug(
                    "Unable to verify Timestamp because no validity provided."
                )

        return None

    def _verify_timestamp_authority(
        self, bundle: Bundle
    ) -> List[TimestampVerificationResult]:
        """
        Verify that the given bundle has been timestamped by a trusted timestamp authority
        and that the timestamp is valid.

        Returns the list of successfully verified signed timestamps in the
        bundle; timestamps that fail verification are omitted.

        Raises `VerificationError` if the bundle carries more than
        `MAX_ALLOWED_TIMESTAMP` timestamps or contains duplicates.
        """
        timestamp_responses = (
            bundle.verification_material.timestamp_verification_data.rfc3161_timestamps
        )
        if len(timestamp_responses) > MAX_ALLOWED_TIMESTAMP:
            msg = f"Too many signed timestamp: {len(timestamp_responses)} > {MAX_ALLOWED_TIMESTAMP}"
            raise VerificationError(msg)

        if len(set(timestamp_responses)) != len(timestamp_responses):
            msg = "Duplicate timestamp found"
            raise VerificationError(msg)

        # The Signer sends a hash of the signature as the messageImprint in a TimeStampReq
        # to the Timestamping Service
        signature_hash = sha256_digest(bundle.signature).digest
        verified_timestamps = []
        for tsr in timestamp_responses:
            if verified_timestamp := self._verify_signed_timestamp(tsr, signature_hash):
                verified_timestamps.append(verified_timestamp)

        return verified_timestamps

    def _establish_time(self, bundle: Bundle) -> List[TimestampVerificationResult]:
        """
        Establish the time for bundle verification.

        This method uses timestamps from two possible sources:
        1. RFC3161 signed timestamps from a Timestamping Authority (TSA)
        2. Transparency Log timestamps

        Returns every verified timestamp found; the list is empty when the
        bundle carries neither source.

        Raises `VerificationError` if the bundle contains signed timestamps
        but the trusted root has no timestamp authorities, or if fewer than
        `VERIFY_TIMESTAMP_THRESHOLD` signed timestamps verify.
        """
        verified_timestamps = []

        # If a timestamp from the timestamping service is available, the Verifier MUST
        # perform path validation using the timestamp from the Timestamping Service.
        if bundle.verification_material.timestamp_verification_data.rfc3161_timestamps:
            if not self._trusted_root.get_timestamp_authorities():
                msg = (
                    "no Timestamp Authorities have been provided to validate this "
                    "bundle but it contains a signed timestamp"
                )
                raise VerificationError(msg)

            timestamp_from_tsa = self._verify_timestamp_authority(bundle)
            if len(timestamp_from_tsa) < VERIFY_TIMESTAMP_THRESHOLD:
                msg = (
                    f"not enough timestamps validated to meet the validation "
                    f"threshold ({len(timestamp_from_tsa)}/{VERIFY_TIMESTAMP_THRESHOLD})"
                )
                raise VerificationError(msg)

            verified_timestamps.extend(timestamp_from_tsa)

        # If a timestamp from the Transparency Service is available, the Verifier MUST
        # perform path validation using the timestamp from the Transparency Service.
        if timestamp := bundle.log_entry.integrated_time:
            verified_timestamps.append(
                TimestampVerificationResult(
                    source=TimestampSource.TRANSPARENCY_SERVICE,
                    time=datetime.fromtimestamp(timestamp, tz=timezone.utc),
                )
            )
        return verified_timestamps

    def _verify_chain_at_time(
        self, certificate: X509, timestamp_result: TimestampVerificationResult
    ) -> List[X509]:
        """
        Verify the validity of the certificate chain at the given time.

        Returns the verified chain without the end-entity certificate, i.e.
        only the CA certificates.

        Raises a VerificationError if the chain can't be built or be verified.
        """
        # NOTE: The `X509Store` object cannot have its time reset once the `set_time`
        # method been called on it. To get around this, we construct a new one in each
        # call.
        store = X509Store()
        # NOTE: By explicitly setting the flags here, we ensure that OpenSSL's
        # PARTIAL_CHAIN default does not change on us. Enabling PARTIAL_CHAIN
        # would be strictly more conformant of OpenSSL, but we currently
        # *want* the "long" chain behavior of performing path validation
        # down to a self-signed root.
        store.set_flags(X509StoreFlags.X509_STRICT)
        for parent_cert_ossl in self._fulcio_certificate_chain:
            store.add_cert(parent_cert_ossl)

        store.set_time(timestamp_result.time)

        store_ctx = X509StoreContext(store, certificate)

        try:
            # get_verified_chain returns the full chain including the end-entity certificate
            # and chain should contain only CA certificates
            return store_ctx.get_verified_chain()[1:]
        except X509StoreContextError as e:
            raise VerificationError(f"failed to build chain: {e}") from e

    def _verify_common_signing_cert(
        self, bundle: Bundle, policy: VerificationPolicy
    ) -> None:
        """
        Performs the signing certificate verification steps that are shared between
        `verify_dsse` and `verify_artifact`.

        Raises `VerificationError` on all failures.
        """

        # In order to verify an artifact, we need to achieve the following:
        #
        # 0. Establish a time for the signature.
        # 1. Verify that the signing certificate chains to the root of trust
        #    and is valid at the time of signing.
        # 2. Verify the signing certificate's SCT.
        # 3. Verify that the signing certificate conforms to the Sigstore
        #    X.509 profile as well as the passed-in `VerificationPolicy`.
        # 4. Verify the inclusion proof and signed checkpoint for the log
        #    entry.
        # 5. Verify the inclusion promise for the log entry, if present.
        # 6. Verify the timely insertion of the log entry against the validity
        #    period for the signing certificate.
        # 7. Verify the signature and input against the signing certificate's
        #    public key.
        # 8. Verify the transparency log entry's consistency against the other
        #    materials, to prevent variants of CVE-2022-36056.
        #
        # This method performs steps (0) through (6) above. Its caller
        # MUST perform steps (7) and (8) separately, since they vary based on
        # the kind of verification being performed (i.e. hashedrekord, DSSE, etc.)

        cert = bundle.signing_certificate

        # (0): Establishing a Time for the Signature
        # First, establish a time for the signature. This timestamp is required to
        # validate the certificate chain, so this step comes first.
        # While this step is optional and only performed if timestamp data has been
        # provided within the bundle, providing a signed timestamp without a TSA to
        # verify it results in a VerificationError.
        verified_timestamps = self._establish_time(bundle)
        if not verified_timestamps:
            raise VerificationError("not enough sources of verified time")

        # (1): verify that the signing certificate is signed by the root
        #      certificate and that the signing certificate was valid at the
        #      time of signing.
        # The chain must validate at *every* established time; a fresh store is
        # built per timestamp inside `_verify_chain_at_time`. The chain from the
        # last successful validation is the one used for SCT verification below.
        cert_ossl = X509.from_cryptography(cert)
        chain: list[X509] = []
        for vts in verified_timestamps:
            chain = self._verify_chain_at_time(cert_ossl, vts)

        # (2): verify the signing certificate's SCT.
        sct = _get_precertificate_signed_certificate_timestamps(cert)[0]
        try:
            verify_sct(
                sct,
                cert,
                [parent_cert.to_cryptography() for parent_cert in chain],
                self._trusted_root.ct_keyring(KeyringPurpose.VERIFY),
            )
        except VerificationError as e:
            raise VerificationError(
                f"failed to verify SCT on signing certificate: {e}"
            ) from e

        # (3): verify the signing certificate against the Sigstore
        #      X.509 profile and verify against the given `VerificationPolicy`.
        usage_ext = cert.extensions.get_extension_for_class(KeyUsage)
        if not usage_ext.value.digital_signature:
            raise VerificationError("Key usage is not of type `digital signature`")

        extended_usage_ext = cert.extensions.get_extension_for_class(ExtendedKeyUsage)
        if ExtendedKeyUsageOID.CODE_SIGNING not in extended_usage_ext.value:
            raise VerificationError("Extended usage does not contain `code signing`")

        policy.verify(cert)

        _logger.debug("Successfully verified signing certificate validity...")

        # (4): verify the inclusion proof and signed checkpoint for the
        #      log entry.
        # (5): verify the inclusion promise for the log entry, if present.
        entry = bundle.log_entry
        try:
            entry._verify(self._trusted_root.rekor_keyring(KeyringPurpose.VERIFY))
        except VerificationError as exc:
            raise VerificationError(f"invalid log entry: {exc}") from exc

        # (6): verify that log entry was integrated circa the signing certificate's
        #      validity period.
        integrated_time = datetime.fromtimestamp(entry.integrated_time, tz=timezone.utc)
        if not (
            bundle.signing_certificate.not_valid_before_utc
            <= integrated_time
            <= bundle.signing_certificate.not_valid_after_utc
        ):
            raise VerificationError(
                "invalid signing cert: expired at time of Rekor entry"
            )

    def verify_dsse(
        self, bundle: Bundle, policy: VerificationPolicy
    ) -> tuple[str, bytes]:
        """
        Verifies a bundle's DSSE envelope, returning the encapsulated payload
        and its content type.

        This method is only for DSSE-enveloped payloads. To verify
        an arbitrary input against a bundle, use the `verify_artifact`
        method.

        `bundle` is the Sigstore `Bundle` to both verify and verify against.

        `policy` is the `VerificationPolicy` to verify against.

        Returns a tuple of `(type, payload)`, where `type` is the payload's
        type as encoded in the DSSE envelope and `payload` is the raw `bytes`
        of the payload. No validation of either `type` or `payload` is
        performed; users of this API **must** assert that `type` is known
        to them before proceeding to handle `payload` in an application-dependent
        manner.

        On failure, this method raises `VerificationError`.
        """

        # (0) through (6) are performed by `_verify_common_signing_cert`.
        self._verify_common_signing_cert(bundle, policy)

        # (7): verify the bundle's signature and DSSE envelope against the
        #      signing certificate's public key.
        envelope = bundle._dsse_envelope
        if envelope is None:
            raise VerificationError(
                "cannot perform DSSE verification on a bundle without a DSSE envelope"
            )

        signing_key = bundle.signing_certificate.public_key()
        signing_key = cast(ec.EllipticCurvePublicKey, signing_key)
        dsse._verify(signing_key, envelope)

        # (8): verify the consistency of the log entry's body against
        #      the other bundle materials.
        # NOTE: This is very slightly weaker than the consistency check
        # for hashedrekord entries, due to how inclusion is recorded for DSSE:
        # the included entry for DSSE includes an envelope hash that we
        # *cannot* verify, since the envelope is uncanonicalized JSON.
        # Instead, we manually pick apart the entry body below and verify
        # the parts we can (namely the payload hash and signature list).
        entry = bundle.log_entry
        try:
            entry_body = rekor_types.Dsse.model_validate_json(
                base64.b64decode(entry.body)
            )
        except ValidationError as exc:
            raise VerificationError(f"invalid DSSE log entry: {exc}") from exc

        payload_hash = sha256_digest(envelope._inner.payload).digest.hex()
        if (
            entry_body.spec.root.payload_hash.algorithm  # type: ignore[union-attr]
            != rekor_types.dsse.Algorithm.SHA256
        ):
            raise VerificationError("expected SHA256 payload hash in DSSE log entry")
        if payload_hash != entry_body.spec.root.payload_hash.value:  # type: ignore[union-attr]
            raise VerificationError("log entry payload hash does not match bundle")

        # NOTE: Like `dsse._verify`: multiple signatures would be frivolous here,
        # but we handle them just in case the signer has somehow produced multiple
        # signatures for their envelope with the same signing key.
        signatures = [
            rekor_types.dsse.Signature(
                signature=base64.b64encode(signature.sig).decode(),
                verifier=base64_encode_pem_cert(bundle.signing_certificate),
            )
            for signature in envelope._inner.signatures
        ]
        if signatures != entry_body.spec.root.signatures:
            raise VerificationError("log entry signatures do not match bundle")

        return (envelope._inner.payload_type, envelope._inner.payload)

    def verify_artifact(
        self,
        input_: bytes | Hashed,
        bundle: Bundle,
        policy: VerificationPolicy,
    ) -> None:
        """
        Public API for verifying.

        `input_` is the input to verify, either as a buffer of contents or as
        a prehashed `Hashed` object.

        `bundle` is the Sigstore `Bundle` to verify against.

        `policy` is the `VerificationPolicy` to verify against.

        On failure, this method raises `VerificationError`.
        """

        # (0) through (6) are performed by `_verify_common_signing_cert`.
        self._verify_common_signing_cert(bundle, policy)

        hashed_input = sha256_digest(input_)

        # (7): verify that the signature was signed by the public key in the signing certificate.
        try:
            signing_key = bundle.signing_certificate.public_key()
            signing_key = cast(ec.EllipticCurvePublicKey, signing_key)
            signing_key.verify(
                bundle._inner.message_signature.signature,
                hashed_input.digest,
                ec.ECDSA(hashed_input._as_prehashed()),
            )
        except InvalidSignature as exc:
            raise VerificationError("Signature is invalid for input") from exc

        _logger.debug("Successfully verified signature...")

        # (8): verify the consistency of the log entry's body against
        #      the other bundle materials (and input being verified).
        entry = bundle.log_entry

        expected_body = _hashedrekord_from_parts(
            bundle.signing_certificate,
            bundle._inner.message_signature.signature,
            hashed_input,
        )
        # A log entry body that does not parse as a hashedrekord is a
        # verification failure, not a caller error: surface it as
        # `VerificationError`, as `verify_dsse` does for DSSE entries.
        try:
            actual_body = rekor_types.Hashedrekord.model_validate_json(
                base64.b64decode(entry.body)
            )
        except ValidationError as exc:
            raise VerificationError(f"invalid hashedrekord log entry: {exc}") from exc

        if expected_body != actual_body:
            raise VerificationError(
                "transparency log entry is inconsistent with other materials"
            )
The primary API for verification operations.
def __init__(self, *, rekor: RekorClient, trusted_root: TrustedRoot):
    """
    Initialize a `Verifier` from a Rekor client and a trusted root.

    `rekor` is a `RekorClient` capable of connecting to a Rekor instance
    containing logs for the file(s) being verified.

    `trusted_root` is the `TrustedRoot` consulted for verification
    materials. Its Fulcio certificates are converted to pyOpenSSL `X509`
    objects once, at construction time, and kept on the instance.
    """
    self._rekor = rekor

    # Convert each Fulcio certificate from its `cryptography` form into
    # the pyOpenSSL representation up front.
    fulcio_chain: List[X509] = []
    for fulcio_cert in trusted_root.get_fulcio_certs():
        fulcio_chain.append(X509.from_cryptography(fulcio_cert))
    self._fulcio_certificate_chain = fulcio_chain

    self._trusted_root = trusted_root
Create a new Verifier
.
rekor
is a RekorClient
capable of connecting to a Rekor instance
containing logs for the file(s) being verified.
trusted_root
is the TrustedRoot whose Fulcio certificates
establish the trust chain for the signing certificate and signature.
@classmethod
def production(cls, *, offline: bool = False) -> Verifier:
    """
    Construct a `Verifier` wired to Sigstore's production-level services.

    `offline` is forwarded unchanged to `TrustedRoot.production`.
    """
    rekor_client = RekorClient.production()
    root = TrustedRoot.production(offline=offline)
    return cls(rekor=rekor_client, trusted_root=root)
Return a Verifier
instance configured against Sigstore's production-level services.
@classmethod
def staging(cls, *, offline: bool = False) -> Verifier:
    """
    Construct a `Verifier` wired to Sigstore's staging-level services.

    `offline` is forwarded unchanged to `TrustedRoot.staging`.
    """
    rekor_client = RekorClient.staging()
    root = TrustedRoot.staging(offline=offline)
    return cls(rekor=rekor_client, trusted_root=root)
Return a Verifier
instance configured against Sigstore's staging-level services.
def verify_dsse(
    self, bundle: Bundle, policy: VerificationPolicy
) -> tuple[str, bytes]:
    """
    Verify the DSSE envelope carried by a bundle, returning the
    encapsulated payload and its content type.

    Only DSSE-enveloped payloads are handled here. To verify an
    arbitrary input against a bundle, use the `verify_artifact` method.

    `bundle` is the Sigstore `Bundle` to both verify and verify against.

    `policy` is the `VerificationPolicy` to verify against.

    Returns a tuple of `(type, payload)`, where `type` is the payload's
    type as encoded in the DSSE envelope and `payload` is the raw `bytes`
    of the payload. Neither `type` nor `payload` is validated; users of
    this API **must** assert that `type` is known to them before handling
    `payload` in an application-dependent manner.

    Raises `VerificationError` if the bundle has no DSSE envelope, or if
    the log entry body is invalid or does not match the envelope.
    """

    # Steps (1) through (6) live in `_verify_common_signing_cert`.
    self._verify_common_signing_cert(bundle, policy)

    # (7): verify the bundle's signature and DSSE envelope against the
    #      signing certificate's public key.
    dsse_envelope = bundle._dsse_envelope
    if dsse_envelope is None:
        raise VerificationError(
            "cannot perform DSSE verification on a bundle without a DSSE envelope"
        )

    public_key = cast(
        ec.EllipticCurvePublicKey, bundle.signing_certificate.public_key()
    )
    dsse._verify(public_key, dsse_envelope)

    # (8): verify the consistency of the log entry's body against
    #      the other bundle materials.
    # NOTE: This is very slightly weaker than the consistency check
    # for hashedrekord entries, due to how inclusion is recorded for DSSE:
    # the included entry for DSSE includes an envelope hash that we
    # *cannot* verify, since the envelope is uncanonicalized JSON.
    # Instead, the entry body is picked apart below and only the parts
    # that can be checked (the payload hash and signature list) are.
    log_entry = bundle.log_entry
    try:
        raw_body = base64.b64decode(log_entry.body)
        logged = rekor_types.Dsse.model_validate_json(raw_body)
    except ValidationError as exc:
        raise VerificationError(f"invalid DSSE log entry: {exc}")

    inner = dsse_envelope._inner
    computed_hash = sha256_digest(inner.payload).digest.hex()

    logged_spec = logged.spec.root
    recorded_hash = logged_spec.payload_hash
    if (
        recorded_hash.algorithm  # type: ignore[union-attr]
        != rekor_types.dsse.Algorithm.SHA256
    ):
        raise VerificationError("expected SHA256 payload hash in DSSE log entry")
    if computed_hash != recorded_hash.value:  # type: ignore[union-attr]
        raise VerificationError("log entry payload hash does not match bundle")

    # NOTE: Like `dsse._verify`: multiple signatures would be frivolous here,
    # but we handle them just in case the signer has somehow produced multiple
    # signatures for their envelope with the same signing key.
    expected_signatures: list[rekor_types.dsse.Signature] = []
    for envelope_sig in inner.signatures:
        expected_signatures.append(
            rekor_types.dsse.Signature(
                signature=base64.b64encode(envelope_sig.sig).decode(),
                verifier=base64_encode_pem_cert(bundle.signing_certificate),
            )
        )
    if expected_signatures != logged_spec.signatures:
        raise VerificationError("log entry signatures do not match bundle")

    return (inner.payload_type, inner.payload)
Verifies a bundle's DSSE envelope, returning the encapsulated payload and its content type.
This method is only for DSSE-enveloped payloads. To verify
an arbitrary input against a bundle, use the verify_artifact
method.
bundle
is the Sigstore Bundle
to both verify and verify against.
policy
is the VerificationPolicy
to verify against.
Returns a tuple of (type, payload)
, where type
is the payload's
type as encoded in the DSSE envelope and payload
is the raw bytes
of the payload. No validation of either type
or payload
is
performed; users of this API must assert that type
is known
to them before proceeding to handle payload
in an application-dependent
manner.
def verify_artifact(
    self,
    input_: bytes | Hashed,
    bundle: Bundle,
    policy: VerificationPolicy,
) -> None:
    """
    Verify an input against a Sigstore bundle and a verification policy.

    `input_` is the input to verify, either as a buffer of contents or as
    a prehashed `Hashed` object.

    `bundle` is the Sigstore `Bundle` to verify against.

    `policy` is the `VerificationPolicy` to verify against.

    Returns `None` on success. On failure, this method raises
    `VerificationError`; this includes the case where the log entry's
    body cannot be parsed as a `hashedrekord` entry.
    """

    # (1) through (6) are performed by `_verify_common_signing_cert`.
    self._verify_common_signing_cert(bundle, policy)

    hashed_input = sha256_digest(input_)

    # (7): verify that the signature was signed by the public key in the
    #      signing certificate.
    signing_key = cast(
        ec.EllipticCurvePublicKey, bundle.signing_certificate.public_key()
    )
    # Only the signature check itself is expected to raise
    # `InvalidSignature`, so the `try` body is limited to that call.
    try:
        signing_key.verify(
            bundle._inner.message_signature.signature,
            hashed_input.digest,
            ec.ECDSA(hashed_input._as_prehashed()),
        )
    except InvalidSignature:
        raise VerificationError("Signature is invalid for input")

    _logger.debug("Successfully verified signature...")

    # (8): verify the consistency of the log entry's body against
    #      the other bundle materials (and input being verified).
    entry = bundle.log_entry

    expected_body = _hashedrekord_from_parts(
        bundle.signing_certificate,
        bundle._inner.message_signature.signature,
        hashed_input,
    )
    # A body that does not parse as a hashedrekord entry is a verification
    # failure, not an internal error: surface it as `VerificationError`,
    # mirroring how `verify_dsse` treats an invalid DSSE entry body.
    try:
        actual_body = rekor_types.Hashedrekord.model_validate_json(
            base64.b64decode(entry.body)
        )
    except ValidationError as exc:
        raise VerificationError(f"invalid hashedrekord log entry: {exc}") from exc

    if expected_body != actual_body:
        raise VerificationError(
            "transparency log entry is inconsistent with other materials"
        )
Public API for verifying an input against a bundle.
input_
is the input to verify, either as a buffer of contents or as
a prehashed Hashed
object.
bundle
is the Sigstore Bundle
to verify against.
policy
is the VerificationPolicy
to verify against.
On failure, this method raises VerificationError
.