sigstore.verify.verifier

Verification API machinery.

  1# Copyright 2022 The Sigstore Authors
  2#
  3# Licensed under the Apache License, Version 2.0 (the "License");
  4# you may not use this file except in compliance with the License.
  5# You may obtain a copy of the License at
  6#
  7#      http://www.apache.org/licenses/LICENSE-2.0
  8#
  9# Unless required by applicable law or agreed to in writing, software
 10# distributed under the License is distributed on an "AS IS" BASIS,
 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12# See the License for the specific language governing permissions and
 13# limitations under the License.
 14
 15"""
 16Verification API machinery.
 17"""
 18
 19from __future__ import annotations
 20
 21import base64
 22import logging
 23from datetime import datetime, timezone
 24from typing import List, cast
 25
 26import rekor_types
 27from cryptography.exceptions import InvalidSignature
 28from cryptography.hazmat.primitives.asymmetric import ec
 29from cryptography.x509 import ExtendedKeyUsage, KeyUsage
 30from cryptography.x509.oid import ExtendedKeyUsageOID
 31from OpenSSL.crypto import (
 32    X509,
 33    X509Store,
 34    X509StoreContext,
 35    X509StoreContextError,
 36    X509StoreFlags,
 37)
 38from pydantic import ValidationError
 39from rfc3161_client import TimeStampResponse, VerifierBuilder
 40from rfc3161_client import VerificationError as Rfc3161VerificationError
 41
 42from sigstore import dsse
 43from sigstore._internal.rekor import _hashedrekord_from_parts
 44from sigstore._internal.rekor.client import RekorClient
 45from sigstore._internal.sct import (
 46    _get_precertificate_signed_certificate_timestamps,
 47    verify_sct,
 48)
 49from sigstore._internal.timestamp import TimestampSource, TimestampVerificationResult
 50from sigstore._internal.trust import ClientTrustConfig, KeyringPurpose, TrustedRoot
 51from sigstore._utils import base64_encode_pem_cert, sha256_digest
 52from sigstore.errors import VerificationError
 53from sigstore.hashes import Hashed
 54from sigstore.models import Bundle
 55from sigstore.verify.policy import VerificationPolicy
 56
 57_logger = logging.getLogger(__name__)
 58
 59# Limit the number of timestamps to prevent DoS
 60# From https://github.com/sigstore/sigstore-go/blob/e92142f0734064ebf6001f188b7330a1212245fe/pkg/verify/tsa.go#L29
 61MAX_ALLOWED_TIMESTAMP: int = 32
 62
 63# When verifying a timestamp, this threshold represents the minimum number of required
 64# timestamps to consider a signature valid.
 65VERIFY_TIMESTAMP_THRESHOLD: int = 1
 66
 67
 68class Verifier:
 69    """
 70    The primary API for verification operations.
 71    """
 72
 73    def __init__(self, *, rekor: RekorClient, trusted_root: TrustedRoot):
 74        """
 75        Create a new `Verifier`.
 76
 77        `rekor` is a `RekorClient` capable of connecting to a Rekor instance
 78        containing logs for the file(s) being verified.
 79
 80        `fulcio_certificate_chain` is a list of PEM-encoded X.509 certificates,
 81        establishing the trust chain for the signing certificate and signature.
 82        """
 83        self._rekor = rekor
 84        self._fulcio_certificate_chain: List[X509] = [
 85            X509.from_cryptography(parent_cert)
 86            for parent_cert in trusted_root.get_fulcio_certs()
 87        ]
 88        self._trusted_root = trusted_root
 89
 90    @classmethod
 91    def production(cls, *, offline: bool = False) -> Verifier:
 92        """
 93        Return a `Verifier` instance configured against Sigstore's production-level services.
 94        """
 95        return cls(
 96            rekor=RekorClient.production(),
 97            trusted_root=TrustedRoot.production(offline=offline),
 98        )
 99
100    @classmethod
101    def staging(cls, *, offline: bool = False) -> Verifier:
102        """
103        Return a `Verifier` instance configured against Sigstore's staging-level services.
104        """
105        return cls(
106            rekor=RekorClient.staging(),
107            trusted_root=TrustedRoot.staging(offline=offline),
108        )
109
110    @classmethod
111    def _from_trust_config(cls, trust_config: ClientTrustConfig) -> Verifier:
112        """
113        Create a `Verifier` from the given `ClientTrustConfig`.
114
115        @api private
116        """
117        return cls(
118            rekor=RekorClient(trust_config._inner.signing_config.tlog_urls[0]),
119            trusted_root=trust_config.trusted_root,
120        )
121
122    def _verify_signed_timestamp(
123        self, timestamp_response: TimeStampResponse, signature: bytes
124    ) -> TimestampVerificationResult | None:
125        """
126        Verify a Signed Timestamp using the TSA provided by the Trusted Root.
127        """
128        cert_authorities = self._trusted_root.get_timestamp_authorities()
129        for certificate_authority in cert_authorities:
130            certificates = certificate_authority.certificates(allow_expired=True)
131
132            builder = VerifierBuilder()
133            for certificate in certificates:
134                builder.add_root_certificate(certificate)
135
136            verifier = builder.build()
137            try:
138                verifier.verify(timestamp_response, signature)
139            except Rfc3161VerificationError as e:
140                _logger.debug("Unable to verify Timestamp with CA.")
141                _logger.exception(e)
142                continue
143
144            if (
145                certificate_authority.validity_period_start
146                and certificate_authority.validity_period_end
147            ):
148                if (
149                    certificate_authority.validity_period_start
150                    <= timestamp_response.tst_info.gen_time
151                    < certificate_authority.validity_period_end
152                ):
153                    return TimestampVerificationResult(
154                        source=TimestampSource.TIMESTAMP_AUTHORITY,
155                        time=timestamp_response.tst_info.gen_time,
156                    )
157
158                _logger.debug(
159                    "Unable to verify Timestamp because not in CA time range."
160                )
161            else:
162                _logger.debug(
163                    "Unable to verify Timestamp because no validity provided."
164                )
165
166        return None
167
168    def _verify_timestamp_authority(
169        self, bundle: Bundle
170    ) -> List[TimestampVerificationResult]:
171        """
172        Verify that the given bundle has been timestamped by a trusted timestamp authority
173        and that the timestamp is valid.
174
175        Returns the number of valid signed timestamp in the bundle.
176        """
177        timestamp_responses = (
178            bundle.verification_material.timestamp_verification_data.rfc3161_timestamps
179        )
180        if len(timestamp_responses) > MAX_ALLOWED_TIMESTAMP:
181            msg = f"Too many signed timestamp: {len(timestamp_responses)} > {MAX_ALLOWED_TIMESTAMP}"
182            raise VerificationError(msg)
183
184        if len(set(timestamp_responses)) != len(timestamp_responses):
185            msg = "Duplicate timestamp found"
186            raise VerificationError(msg)
187
188        # The Signer sends a hash of the signature as the messageImprint in a TimeStampReq
189        # to the Timestamping Service
190        signature_hash = sha256_digest(bundle.signature).digest
191        verified_timestamps = []
192        for tsr in timestamp_responses:
193            if verified_timestamp := self._verify_signed_timestamp(tsr, signature_hash):
194                verified_timestamps.append(verified_timestamp)
195
196        return verified_timestamps
197
198    def _establish_time(self, bundle: Bundle) -> List[TimestampVerificationResult]:
199        """
200        Establish the time for bundle verification.
201
202        This method uses timestamps from two possible sources:
203        1. RFC3161 signed timestamps from a Timestamping Authority (TSA)
204        2. Transparency Log timestamps
205        """
206        verified_timestamps = []
207
208        # If a timestamp from the timestamping service is available, the Verifier MUST
209        # perform path validation using the timestamp from the Timestamping Service.
210        if bundle.verification_material.timestamp_verification_data.rfc3161_timestamps:
211            if not self._trusted_root.get_timestamp_authorities():
212                msg = (
213                    "no Timestamp Authorities have been provided to validate this "
214                    "bundle but it contains a signed timestamp"
215                )
216                raise VerificationError(msg)
217
218            timestamp_from_tsa = self._verify_timestamp_authority(bundle)
219            if len(timestamp_from_tsa) < VERIFY_TIMESTAMP_THRESHOLD:
220                msg = (
221                    f"not enough timestamps validated to meet the validation "
222                    f"threshold ({len(timestamp_from_tsa)}/{VERIFY_TIMESTAMP_THRESHOLD})"
223                )
224                raise VerificationError(msg)
225
226            verified_timestamps.extend(timestamp_from_tsa)
227
228        # If a timestamp from the Transparency Service is available, the Verifier MUST
229        # perform path validation using the timestamp from the Transparency Service.
230        if timestamp := bundle.log_entry.integrated_time:
231            verified_timestamps.append(
232                TimestampVerificationResult(
233                    source=TimestampSource.TRANSPARENCY_SERVICE,
234                    time=datetime.fromtimestamp(timestamp, tz=timezone.utc),
235                )
236            )
237        return verified_timestamps
238
239    def _verify_chain_at_time(
240        self, certificate: X509, timestamp_result: TimestampVerificationResult
241    ) -> List[X509]:
242        """
243        Verify the validity of the certificate chain at the given time.
244
245        Raises a VerificationError if the chain can't be built or be verified.
246        """
247        # NOTE: The `X509Store` object cannot have its time reset once the `set_time`
248        # method been called on it. To get around this, we construct a new one in each
249        # call.
250        store = X509Store()
251        # NOTE: By explicitly setting the flags here, we ensure that OpenSSL's
252        # PARTIAL_CHAIN default does not change on us. Enabling PARTIAL_CHAIN
253        # would be strictly more conformant of OpenSSL, but we currently
254        # *want* the "long" chain behavior of performing path validation
255        # down to a self-signed root.
256        store.set_flags(X509StoreFlags.X509_STRICT)
257        for parent_cert_ossl in self._fulcio_certificate_chain:
258            store.add_cert(parent_cert_ossl)
259
260        store.set_time(timestamp_result.time)
261
262        store_ctx = X509StoreContext(store, certificate)
263
264        try:
265            # get_verified_chain returns the full chain including the end-entity certificate
266            # and chain should contain only CA certificates
267            return store_ctx.get_verified_chain()[1:]
268        except X509StoreContextError as e:
269            raise VerificationError(f"failed to build chain: {e}")
270
271    def _verify_common_signing_cert(
272        self, bundle: Bundle, policy: VerificationPolicy
273    ) -> None:
274        """
275        Performs the signing certificate verification steps that are shared between
276        `verify_dsse` and `verify_artifact`.
277
278        Raises `VerificationError` on all failures.
279        """
280
281        # In order to verify an artifact, we need to achieve the following:
282        #
283        # 0. Establish a time for the signature.
284        # 1. Verify that the signing certificate chains to the root of trust
285        #    and is valid at the time of signing.
286        # 2. Verify the signing certificate's SCT.
287        # 3. Verify that the signing certificate conforms to the Sigstore
288        #    X.509 profile as well as the passed-in `VerificationPolicy`.
289        # 4. Verify the inclusion proof and signed checkpoint for the log
290        #    entry.
291        # 5. Verify the inclusion promise for the log entry, if present.
292        # 6. Verify the timely insertion of the log entry against the validity
293        #    period for the signing certificate.
294        # 7. Verify the signature and input against the signing certificate's
295        #    public key.
296        # 8. Verify the transparency log entry's consistency against the other
297        #    materials, to prevent variants of CVE-2022-36056.
298        #
299        # This method performs steps (0) through (6) above. Its caller
300        # MUST perform steps (7) and (8) separately, since they vary based on
301        # the kind of verification being performed (i.e. hashedrekord, DSSE, etc.)
302
303        cert = bundle.signing_certificate
304
305        # NOTE: The `X509Store` object currently cannot have its time reset once the `set_time`
306        # method been called on it. To get around this, we construct a new one for every `verify`
307        # call.
308        store = X509Store()
309        # NOTE: By explicitly setting the flags here, we ensure that OpenSSL's
310        # PARTIAL_CHAIN default does not change on us. Enabling PARTIAL_CHAIN
311        # would be strictly more conformant of OpenSSL, but we currently
312        # *want* the "long" chain behavior of performing path validation
313        # down to a self-signed root.
314        store.set_flags(X509StoreFlags.X509_STRICT)
315        for parent_cert_ossl in self._fulcio_certificate_chain:
316            store.add_cert(parent_cert_ossl)
317
318        # (0): Establishing a Time for the Signature
319        # First, establish a time for the signature. This timestamp is required to
320        # validate the certificate chain, so this step comes first.
321        # While this step is optional and only performed if timestamp data has been
322        # provided within the bundle, providing a signed timestamp without a TSA to
323        # verify it result in a VerificationError.
324        verified_timestamps = self._establish_time(bundle)
325        if not verified_timestamps:
326            raise VerificationError("not enough sources of verified time")
327
328        # (1): verify that the signing certificate is signed by the root
329        #      certificate and that the signing certificate was valid at the
330        #      time of signing.
331        cert_ossl = X509.from_cryptography(cert)
332        chain: list[X509] = []
333        for vts in verified_timestamps:
334            chain = self._verify_chain_at_time(cert_ossl, vts)
335
336        # (2): verify the signing certificate's SCT.
337        sct = _get_precertificate_signed_certificate_timestamps(cert)[0]
338        try:
339            verify_sct(
340                sct,
341                cert,
342                [parent_cert.to_cryptography() for parent_cert in chain],
343                self._trusted_root.ct_keyring(KeyringPurpose.VERIFY),
344            )
345        except VerificationError as e:
346            raise VerificationError(f"failed to verify SCT on signing certificate: {e}")
347
348        # (3): verify the signing certificate against the Sigstore
349        #      X.509 profile and verify against the given `VerificationPolicy`.
350        usage_ext = cert.extensions.get_extension_for_class(KeyUsage)
351        if not usage_ext.value.digital_signature:
352            raise VerificationError("Key usage is not of type `digital signature`")
353
354        extended_usage_ext = cert.extensions.get_extension_for_class(ExtendedKeyUsage)
355        if ExtendedKeyUsageOID.CODE_SIGNING not in extended_usage_ext.value:
356            raise VerificationError("Extended usage does not contain `code signing`")
357
358        policy.verify(cert)
359
360        _logger.debug("Successfully verified signing certificate validity...")
361
362        # (4): verify the inclusion proof and signed checkpoint for the
363        #      log entry.
364        # (5): verify the inclusion promise for the log entry, if present.
365        entry = bundle.log_entry
366        try:
367            entry._verify(self._trusted_root.rekor_keyring(KeyringPurpose.VERIFY))
368        except VerificationError as exc:
369            raise VerificationError(f"invalid log entry: {exc}")
370
371        # (6): verify that log entry was integrated circa the signing certificate's
372        #      validity period.
373        integrated_time = datetime.fromtimestamp(entry.integrated_time, tz=timezone.utc)
374        if not (
375            bundle.signing_certificate.not_valid_before_utc
376            <= integrated_time
377            <= bundle.signing_certificate.not_valid_after_utc
378        ):
379            raise VerificationError(
380                "invalid signing cert: expired at time of Rekor entry"
381            )
382
383    def verify_dsse(
384        self, bundle: Bundle, policy: VerificationPolicy
385    ) -> tuple[str, bytes]:
386        """
387        Verifies an bundle's DSSE envelope, returning the encapsulated payload
388        and its content type.
389
390        This method is only for DSSE-enveloped payloads. To verify
391        an arbitrary input against a bundle, use the `verify_artifact`
392        method.
393
394        `bundle` is the Sigstore `Bundle` to both verify and verify against.
395
396        `policy` is the `VerificationPolicy` to verify against.
397
398        Returns a tuple of `(type, payload)`, where `type` is the payload's
399        type as encoded in the DSSE envelope and `payload` is the raw `bytes`
400        of the payload. No validation of either `type` or `payload` is
401        performed; users of this API **must** assert that `type` is known
402        to them before proceeding to handle `payload` in an application-dependent
403        manner.
404        """
405
406        # (1) through (6) are performed by `_verify_common_signing_cert`.
407        self._verify_common_signing_cert(bundle, policy)
408
409        # (7): verify the bundle's signature and DSSE envelope against the
410        #      signing certificate's public key.
411        envelope = bundle._dsse_envelope
412        if envelope is None:
413            raise VerificationError(
414                "cannot perform DSSE verification on a bundle without a DSSE envelope"
415            )
416
417        signing_key = bundle.signing_certificate.public_key()
418        signing_key = cast(ec.EllipticCurvePublicKey, signing_key)
419        dsse._verify(signing_key, envelope)
420
421        # (8): verify the consistency of the log entry's body against
422        #      the other bundle materials.
423        # NOTE: This is very slightly weaker than the consistency check
424        # for hashedrekord entries, due to how inclusion is recorded for DSSE:
425        # the included entry for DSSE includes an envelope hash that we
426        # *cannot* verify, since the envelope is uncanonicalized JSON.
427        # Instead, we manually pick apart the entry body below and verify
428        # the parts we can (namely the payload hash and signature list).
429        entry = bundle.log_entry
430        try:
431            entry_body = rekor_types.Dsse.model_validate_json(
432                base64.b64decode(entry.body)
433            )
434        except ValidationError as exc:
435            raise VerificationError(f"invalid DSSE log entry: {exc}")
436
437        payload_hash = sha256_digest(envelope._inner.payload).digest.hex()
438        if (
439            entry_body.spec.root.payload_hash.algorithm  # type: ignore[union-attr]
440            != rekor_types.dsse.Algorithm.SHA256
441        ):
442            raise VerificationError("expected SHA256 payload hash in DSSE log entry")
443        if payload_hash != entry_body.spec.root.payload_hash.value:  # type: ignore[union-attr]
444            raise VerificationError("log entry payload hash does not match bundle")
445
446        # NOTE: Like `dsse._verify`: multiple signatures would be frivolous here,
447        # but we handle them just in case the signer has somehow produced multiple
448        # signatures for their envelope with the same signing key.
449        signatures = [
450            rekor_types.dsse.Signature(
451                signature=base64.b64encode(signature.sig).decode(),
452                verifier=base64_encode_pem_cert(bundle.signing_certificate),
453            )
454            for signature in envelope._inner.signatures
455        ]
456        if signatures != entry_body.spec.root.signatures:
457            raise VerificationError("log entry signatures do not match bundle")
458
459        return (envelope._inner.payload_type, envelope._inner.payload)
460
461    def verify_artifact(
462        self,
463        input_: bytes | Hashed,
464        bundle: Bundle,
465        policy: VerificationPolicy,
466    ) -> None:
467        """
468        Public API for verifying.
469
470        `input_` is the input to verify, either as a buffer of contents or as
471        a prehashed `Hashed` object.
472
473        `bundle` is the Sigstore `Bundle` to verify against.
474
475        `policy` is the `VerificationPolicy` to verify against.
476
477        On failure, this method raises `VerificationError`.
478        """
479
480        # (1) through (6) are performed by `_verify_common_signing_cert`.
481        self._verify_common_signing_cert(bundle, policy)
482
483        hashed_input = sha256_digest(input_)
484
485        # (7): verify that the signature was signed by the public key in the signing certificate.
486        try:
487            signing_key = bundle.signing_certificate.public_key()
488            signing_key = cast(ec.EllipticCurvePublicKey, signing_key)
489            signing_key.verify(
490                bundle._inner.message_signature.signature,
491                hashed_input.digest,
492                ec.ECDSA(hashed_input._as_prehashed()),
493            )
494        except InvalidSignature:
495            raise VerificationError("Signature is invalid for input")
496
497        _logger.debug("Successfully verified signature...")
498
499        # (8): verify the consistency of the log entry's body against
500        #      the other bundle materials (and input being verified).
501        entry = bundle.log_entry
502
503        expected_body = _hashedrekord_from_parts(
504            bundle.signing_certificate,
505            bundle._inner.message_signature.signature,
506            hashed_input,
507        )
508        actual_body = rekor_types.Hashedrekord.model_validate_json(
509            base64.b64decode(entry.body)
510        )
511        if expected_body != actual_body:
512            raise VerificationError(
513                "transparency log entry is inconsistent with other materials"
514            )
MAX_ALLOWED_TIMESTAMP: int = 32
VERIFY_TIMESTAMP_THRESHOLD: int = 1
class Verifier:
 69class Verifier:
 70    """
 71    The primary API for verification operations.
 72    """
 73
 74    def __init__(self, *, rekor: RekorClient, trusted_root: TrustedRoot):
 75        """
 76        Create a new `Verifier`.
 77
 78        `rekor` is a `RekorClient` capable of connecting to a Rekor instance
 79        containing logs for the file(s) being verified.
 80
 81        `fulcio_certificate_chain` is a list of PEM-encoded X.509 certificates,
 82        establishing the trust chain for the signing certificate and signature.
 83        """
 84        self._rekor = rekor
 85        self._fulcio_certificate_chain: List[X509] = [
 86            X509.from_cryptography(parent_cert)
 87            for parent_cert in trusted_root.get_fulcio_certs()
 88        ]
 89        self._trusted_root = trusted_root
 90
 91    @classmethod
 92    def production(cls, *, offline: bool = False) -> Verifier:
 93        """
 94        Return a `Verifier` instance configured against Sigstore's production-level services.
 95        """
 96        return cls(
 97            rekor=RekorClient.production(),
 98            trusted_root=TrustedRoot.production(offline=offline),
 99        )
100
101    @classmethod
102    def staging(cls, *, offline: bool = False) -> Verifier:
103        """
104        Return a `Verifier` instance configured against Sigstore's staging-level services.
105        """
106        return cls(
107            rekor=RekorClient.staging(),
108            trusted_root=TrustedRoot.staging(offline=offline),
109        )
110
111    @classmethod
112    def _from_trust_config(cls, trust_config: ClientTrustConfig) -> Verifier:
113        """
114        Create a `Verifier` from the given `ClientTrustConfig`.
115
116        @api private
117        """
118        return cls(
119            rekor=RekorClient(trust_config._inner.signing_config.tlog_urls[0]),
120            trusted_root=trust_config.trusted_root,
121        )
122
123    def _verify_signed_timestamp(
124        self, timestamp_response: TimeStampResponse, signature: bytes
125    ) -> TimestampVerificationResult | None:
126        """
127        Verify a Signed Timestamp using the TSA provided by the Trusted Root.
128        """
129        cert_authorities = self._trusted_root.get_timestamp_authorities()
130        for certificate_authority in cert_authorities:
131            certificates = certificate_authority.certificates(allow_expired=True)
132
133            builder = VerifierBuilder()
134            for certificate in certificates:
135                builder.add_root_certificate(certificate)
136
137            verifier = builder.build()
138            try:
139                verifier.verify(timestamp_response, signature)
140            except Rfc3161VerificationError as e:
141                _logger.debug("Unable to verify Timestamp with CA.")
142                _logger.exception(e)
143                continue
144
145            if (
146                certificate_authority.validity_period_start
147                and certificate_authority.validity_period_end
148            ):
149                if (
150                    certificate_authority.validity_period_start
151                    <= timestamp_response.tst_info.gen_time
152                    < certificate_authority.validity_period_end
153                ):
154                    return TimestampVerificationResult(
155                        source=TimestampSource.TIMESTAMP_AUTHORITY,
156                        time=timestamp_response.tst_info.gen_time,
157                    )
158
159                _logger.debug(
160                    "Unable to verify Timestamp because not in CA time range."
161                )
162            else:
163                _logger.debug(
164                    "Unable to verify Timestamp because no validity provided."
165                )
166
167        return None
168
169    def _verify_timestamp_authority(
170        self, bundle: Bundle
171    ) -> List[TimestampVerificationResult]:
172        """
173        Verify that the given bundle has been timestamped by a trusted timestamp authority
174        and that the timestamp is valid.
175
176        Returns the number of valid signed timestamp in the bundle.
177        """
178        timestamp_responses = (
179            bundle.verification_material.timestamp_verification_data.rfc3161_timestamps
180        )
181        if len(timestamp_responses) > MAX_ALLOWED_TIMESTAMP:
182            msg = f"Too many signed timestamp: {len(timestamp_responses)} > {MAX_ALLOWED_TIMESTAMP}"
183            raise VerificationError(msg)
184
185        if len(set(timestamp_responses)) != len(timestamp_responses):
186            msg = "Duplicate timestamp found"
187            raise VerificationError(msg)
188
189        # The Signer sends a hash of the signature as the messageImprint in a TimeStampReq
190        # to the Timestamping Service
191        signature_hash = sha256_digest(bundle.signature).digest
192        verified_timestamps = []
193        for tsr in timestamp_responses:
194            if verified_timestamp := self._verify_signed_timestamp(tsr, signature_hash):
195                verified_timestamps.append(verified_timestamp)
196
197        return verified_timestamps
198
199    def _establish_time(self, bundle: Bundle) -> List[TimestampVerificationResult]:
200        """
201        Establish the time for bundle verification.
202
203        This method uses timestamps from two possible sources:
204        1. RFC3161 signed timestamps from a Timestamping Authority (TSA)
205        2. Transparency Log timestamps
206        """
207        verified_timestamps = []
208
209        # If a timestamp from the timestamping service is available, the Verifier MUST
210        # perform path validation using the timestamp from the Timestamping Service.
211        if bundle.verification_material.timestamp_verification_data.rfc3161_timestamps:
212            if not self._trusted_root.get_timestamp_authorities():
213                msg = (
214                    "no Timestamp Authorities have been provided to validate this "
215                    "bundle but it contains a signed timestamp"
216                )
217                raise VerificationError(msg)
218
219            timestamp_from_tsa = self._verify_timestamp_authority(bundle)
220            if len(timestamp_from_tsa) < VERIFY_TIMESTAMP_THRESHOLD:
221                msg = (
222                    f"not enough timestamps validated to meet the validation "
223                    f"threshold ({len(timestamp_from_tsa)}/{VERIFY_TIMESTAMP_THRESHOLD})"
224                )
225                raise VerificationError(msg)
226
227            verified_timestamps.extend(timestamp_from_tsa)
228
229        # If a timestamp from the Transparency Service is available, the Verifier MUST
230        # perform path validation using the timestamp from the Transparency Service.
231        if timestamp := bundle.log_entry.integrated_time:
232            verified_timestamps.append(
233                TimestampVerificationResult(
234                    source=TimestampSource.TRANSPARENCY_SERVICE,
235                    time=datetime.fromtimestamp(timestamp, tz=timezone.utc),
236                )
237            )
238        return verified_timestamps
239
240    def _verify_chain_at_time(
241        self, certificate: X509, timestamp_result: TimestampVerificationResult
242    ) -> List[X509]:
243        """
244        Verify the validity of the certificate chain at the given time.
245
246        Raises a VerificationError if the chain can't be built or be verified.
247        """
248        # NOTE: The `X509Store` object cannot have its time reset once the `set_time`
249        # method been called on it. To get around this, we construct a new one in each
250        # call.
251        store = X509Store()
252        # NOTE: By explicitly setting the flags here, we ensure that OpenSSL's
253        # PARTIAL_CHAIN default does not change on us. Enabling PARTIAL_CHAIN
254        # would be strictly more conformant of OpenSSL, but we currently
255        # *want* the "long" chain behavior of performing path validation
256        # down to a self-signed root.
257        store.set_flags(X509StoreFlags.X509_STRICT)
258        for parent_cert_ossl in self._fulcio_certificate_chain:
259            store.add_cert(parent_cert_ossl)
260
261        store.set_time(timestamp_result.time)
262
263        store_ctx = X509StoreContext(store, certificate)
264
265        try:
266            # get_verified_chain returns the full chain including the end-entity certificate
267            # and chain should contain only CA certificates
268            return store_ctx.get_verified_chain()[1:]
269        except X509StoreContextError as e:
270            raise VerificationError(f"failed to build chain: {e}")
271
272    def _verify_common_signing_cert(
273        self, bundle: Bundle, policy: VerificationPolicy
274    ) -> None:
275        """
276        Performs the signing certificate verification steps that are shared between
277        `verify_dsse` and `verify_artifact`.
278
279        Raises `VerificationError` on all failures.
280        """
281
282        # In order to verify an artifact, we need to achieve the following:
283        #
284        # 0. Establish a time for the signature.
285        # 1. Verify that the signing certificate chains to the root of trust
286        #    and is valid at the time of signing.
287        # 2. Verify the signing certificate's SCT.
288        # 3. Verify that the signing certificate conforms to the Sigstore
289        #    X.509 profile as well as the passed-in `VerificationPolicy`.
290        # 4. Verify the inclusion proof and signed checkpoint for the log
291        #    entry.
292        # 5. Verify the inclusion promise for the log entry, if present.
293        # 6. Verify the timely insertion of the log entry against the validity
294        #    period for the signing certificate.
295        # 7. Verify the signature and input against the signing certificate's
296        #    public key.
297        # 8. Verify the transparency log entry's consistency against the other
298        #    materials, to prevent variants of CVE-2022-36056.
299        #
300        # This method performs steps (0) through (6) above. Its caller
301        # MUST perform steps (7) and (8) separately, since they vary based on
302        # the kind of verification being performed (i.e. hashedrekord, DSSE, etc.)
303
304        cert = bundle.signing_certificate
305
306        # NOTE: The `X509Store` object currently cannot have its time reset once the `set_time`
307        # method been called on it. To get around this, we construct a new one for every `verify`
308        # call.
309        store = X509Store()
310        # NOTE: By explicitly setting the flags here, we ensure that OpenSSL's
311        # PARTIAL_CHAIN default does not change on us. Enabling PARTIAL_CHAIN
312        # would be strictly more conformant of OpenSSL, but we currently
313        # *want* the "long" chain behavior of performing path validation
314        # down to a self-signed root.
315        store.set_flags(X509StoreFlags.X509_STRICT)
316        for parent_cert_ossl in self._fulcio_certificate_chain:
317            store.add_cert(parent_cert_ossl)
318
319        # (0): Establishing a Time for the Signature
320        # First, establish a time for the signature. This timestamp is required to
321        # validate the certificate chain, so this step comes first.
322        # While this step is optional and only performed if timestamp data has been
323        # provided within the bundle, providing a signed timestamp without a TSA to
324        # verify it result in a VerificationError.
325        verified_timestamps = self._establish_time(bundle)
326        if not verified_timestamps:
327            raise VerificationError("not enough sources of verified time")
328
329        # (1): verify that the signing certificate is signed by the root
330        #      certificate and that the signing certificate was valid at the
331        #      time of signing.
332        cert_ossl = X509.from_cryptography(cert)
333        chain: list[X509] = []
334        for vts in verified_timestamps:
335            chain = self._verify_chain_at_time(cert_ossl, vts)
336
337        # (2): verify the signing certificate's SCT.
338        sct = _get_precertificate_signed_certificate_timestamps(cert)[0]
339        try:
340            verify_sct(
341                sct,
342                cert,
343                [parent_cert.to_cryptography() for parent_cert in chain],
344                self._trusted_root.ct_keyring(KeyringPurpose.VERIFY),
345            )
346        except VerificationError as e:
347            raise VerificationError(f"failed to verify SCT on signing certificate: {e}")
348
349        # (3): verify the signing certificate against the Sigstore
350        #      X.509 profile and verify against the given `VerificationPolicy`.
351        usage_ext = cert.extensions.get_extension_for_class(KeyUsage)
352        if not usage_ext.value.digital_signature:
353            raise VerificationError("Key usage is not of type `digital signature`")
354
355        extended_usage_ext = cert.extensions.get_extension_for_class(ExtendedKeyUsage)
356        if ExtendedKeyUsageOID.CODE_SIGNING not in extended_usage_ext.value:
357            raise VerificationError("Extended usage does not contain `code signing`")
358
359        policy.verify(cert)
360
361        _logger.debug("Successfully verified signing certificate validity...")
362
363        # (4): verify the inclusion proof and signed checkpoint for the
364        #      log entry.
365        # (5): verify the inclusion promise for the log entry, if present.
366        entry = bundle.log_entry
367        try:
368            entry._verify(self._trusted_root.rekor_keyring(KeyringPurpose.VERIFY))
369        except VerificationError as exc:
370            raise VerificationError(f"invalid log entry: {exc}")
371
372        # (6): verify that log entry was integrated circa the signing certificate's
373        #      validity period.
374        integrated_time = datetime.fromtimestamp(entry.integrated_time, tz=timezone.utc)
375        if not (
376            bundle.signing_certificate.not_valid_before_utc
377            <= integrated_time
378            <= bundle.signing_certificate.not_valid_after_utc
379        ):
380            raise VerificationError(
381                "invalid signing cert: expired at time of Rekor entry"
382            )
383
384    def verify_dsse(
385        self, bundle: Bundle, policy: VerificationPolicy
386    ) -> tuple[str, bytes]:
387        """
388        Verifies an bundle's DSSE envelope, returning the encapsulated payload
389        and its content type.
390
391        This method is only for DSSE-enveloped payloads. To verify
392        an arbitrary input against a bundle, use the `verify_artifact`
393        method.
394
395        `bundle` is the Sigstore `Bundle` to both verify and verify against.
396
397        `policy` is the `VerificationPolicy` to verify against.
398
399        Returns a tuple of `(type, payload)`, where `type` is the payload's
400        type as encoded in the DSSE envelope and `payload` is the raw `bytes`
401        of the payload. No validation of either `type` or `payload` is
402        performed; users of this API **must** assert that `type` is known
403        to them before proceeding to handle `payload` in an application-dependent
404        manner.
405        """
406
407        # (1) through (6) are performed by `_verify_common_signing_cert`.
408        self._verify_common_signing_cert(bundle, policy)
409
410        # (7): verify the bundle's signature and DSSE envelope against the
411        #      signing certificate's public key.
412        envelope = bundle._dsse_envelope
413        if envelope is None:
414            raise VerificationError(
415                "cannot perform DSSE verification on a bundle without a DSSE envelope"
416            )
417
418        signing_key = bundle.signing_certificate.public_key()
419        signing_key = cast(ec.EllipticCurvePublicKey, signing_key)
420        dsse._verify(signing_key, envelope)
421
422        # (8): verify the consistency of the log entry's body against
423        #      the other bundle materials.
424        # NOTE: This is very slightly weaker than the consistency check
425        # for hashedrekord entries, due to how inclusion is recorded for DSSE:
426        # the included entry for DSSE includes an envelope hash that we
427        # *cannot* verify, since the envelope is uncanonicalized JSON.
428        # Instead, we manually pick apart the entry body below and verify
429        # the parts we can (namely the payload hash and signature list).
430        entry = bundle.log_entry
431        try:
432            entry_body = rekor_types.Dsse.model_validate_json(
433                base64.b64decode(entry.body)
434            )
435        except ValidationError as exc:
436            raise VerificationError(f"invalid DSSE log entry: {exc}")
437
438        payload_hash = sha256_digest(envelope._inner.payload).digest.hex()
439        if (
440            entry_body.spec.root.payload_hash.algorithm  # type: ignore[union-attr]
441            != rekor_types.dsse.Algorithm.SHA256
442        ):
443            raise VerificationError("expected SHA256 payload hash in DSSE log entry")
444        if payload_hash != entry_body.spec.root.payload_hash.value:  # type: ignore[union-attr]
445            raise VerificationError("log entry payload hash does not match bundle")
446
447        # NOTE: Like `dsse._verify`: multiple signatures would be frivolous here,
448        # but we handle them just in case the signer has somehow produced multiple
449        # signatures for their envelope with the same signing key.
450        signatures = [
451            rekor_types.dsse.Signature(
452                signature=base64.b64encode(signature.sig).decode(),
453                verifier=base64_encode_pem_cert(bundle.signing_certificate),
454            )
455            for signature in envelope._inner.signatures
456        ]
457        if signatures != entry_body.spec.root.signatures:
458            raise VerificationError("log entry signatures do not match bundle")
459
460        return (envelope._inner.payload_type, envelope._inner.payload)
461
462    def verify_artifact(
463        self,
464        input_: bytes | Hashed,
465        bundle: Bundle,
466        policy: VerificationPolicy,
467    ) -> None:
468        """
469        Public API for verifying.
470
471        `input_` is the input to verify, either as a buffer of contents or as
472        a prehashed `Hashed` object.
473
474        `bundle` is the Sigstore `Bundle` to verify against.
475
476        `policy` is the `VerificationPolicy` to verify against.
477
478        On failure, this method raises `VerificationError`.
479        """
480
481        # (1) through (6) are performed by `_verify_common_signing_cert`.
482        self._verify_common_signing_cert(bundle, policy)
483
484        hashed_input = sha256_digest(input_)
485
486        # (7): verify that the signature was signed by the public key in the signing certificate.
487        try:
488            signing_key = bundle.signing_certificate.public_key()
489            signing_key = cast(ec.EllipticCurvePublicKey, signing_key)
490            signing_key.verify(
491                bundle._inner.message_signature.signature,
492                hashed_input.digest,
493                ec.ECDSA(hashed_input._as_prehashed()),
494            )
495        except InvalidSignature:
496            raise VerificationError("Signature is invalid for input")
497
498        _logger.debug("Successfully verified signature...")
499
500        # (8): verify the consistency of the log entry's body against
501        #      the other bundle materials (and input being verified).
502        entry = bundle.log_entry
503
504        expected_body = _hashedrekord_from_parts(
505            bundle.signing_certificate,
506            bundle._inner.message_signature.signature,
507            hashed_input,
508        )
509        actual_body = rekor_types.Hashedrekord.model_validate_json(
510            base64.b64decode(entry.body)
511        )
512        if expected_body != actual_body:
513            raise VerificationError(
514                "transparency log entry is inconsistent with other materials"
515            )

The primary API for verification operations.

Verifier( *, rekor: sigstore._internal.rekor.client.RekorClient, trusted_root: sigstore._internal.trust.TrustedRoot)
74    def __init__(self, *, rekor: RekorClient, trusted_root: TrustedRoot):
75        """
76        Create a new `Verifier`.
77
78        `rekor` is a `RekorClient` capable of connecting to a Rekor instance
79        containing logs for the file(s) being verified.
80
81        `fulcio_certificate_chain` is a list of PEM-encoded X.509 certificates,
82        establishing the trust chain for the signing certificate and signature.
83        """
84        self._rekor = rekor
85        self._fulcio_certificate_chain: List[X509] = [
86            X509.from_cryptography(parent_cert)
87            for parent_cert in trusted_root.get_fulcio_certs()
88        ]
89        self._trusted_root = trusted_root

Create a new Verifier.

rekor is a RekorClient capable of connecting to a Rekor instance containing logs for the file(s) being verified.

fulcio_certificate_chain is a list of PEM-encoded X.509 certificates, establishing the trust chain for the signing certificate and signature.

@classmethod
def production(cls, *, offline: bool = False) -> Verifier:
91    @classmethod
92    def production(cls, *, offline: bool = False) -> Verifier:
93        """
94        Return a `Verifier` instance configured against Sigstore's production-level services.
95        """
96        return cls(
97            rekor=RekorClient.production(),
98            trusted_root=TrustedRoot.production(offline=offline),
99        )

Return a Verifier instance configured against Sigstore's production-level services.

@classmethod
def staging(cls, *, offline: bool = False) -> Verifier:
101    @classmethod
102    def staging(cls, *, offline: bool = False) -> Verifier:
103        """
104        Return a `Verifier` instance configured against Sigstore's staging-level services.
105        """
106        return cls(
107            rekor=RekorClient.staging(),
108            trusted_root=TrustedRoot.staging(offline=offline),
109        )

Return a Verifier instance configured against Sigstore's staging-level services.

def verify_dsse( self, bundle: sigstore.models.Bundle, policy: sigstore.verify.policy.VerificationPolicy) -> tuple[str, bytes]:
384    def verify_dsse(
385        self, bundle: Bundle, policy: VerificationPolicy
386    ) -> tuple[str, bytes]:
387        """
388        Verifies an bundle's DSSE envelope, returning the encapsulated payload
389        and its content type.
390
391        This method is only for DSSE-enveloped payloads. To verify
392        an arbitrary input against a bundle, use the `verify_artifact`
393        method.
394
395        `bundle` is the Sigstore `Bundle` to both verify and verify against.
396
397        `policy` is the `VerificationPolicy` to verify against.
398
399        Returns a tuple of `(type, payload)`, where `type` is the payload's
400        type as encoded in the DSSE envelope and `payload` is the raw `bytes`
401        of the payload. No validation of either `type` or `payload` is
402        performed; users of this API **must** assert that `type` is known
403        to them before proceeding to handle `payload` in an application-dependent
404        manner.
405        """
406
407        # (1) through (6) are performed by `_verify_common_signing_cert`.
408        self._verify_common_signing_cert(bundle, policy)
409
410        # (7): verify the bundle's signature and DSSE envelope against the
411        #      signing certificate's public key.
412        envelope = bundle._dsse_envelope
413        if envelope is None:
414            raise VerificationError(
415                "cannot perform DSSE verification on a bundle without a DSSE envelope"
416            )
417
418        signing_key = bundle.signing_certificate.public_key()
419        signing_key = cast(ec.EllipticCurvePublicKey, signing_key)
420        dsse._verify(signing_key, envelope)
421
422        # (8): verify the consistency of the log entry's body against
423        #      the other bundle materials.
424        # NOTE: This is very slightly weaker than the consistency check
425        # for hashedrekord entries, due to how inclusion is recorded for DSSE:
426        # the included entry for DSSE includes an envelope hash that we
427        # *cannot* verify, since the envelope is uncanonicalized JSON.
428        # Instead, we manually pick apart the entry body below and verify
429        # the parts we can (namely the payload hash and signature list).
430        entry = bundle.log_entry
431        try:
432            entry_body = rekor_types.Dsse.model_validate_json(
433                base64.b64decode(entry.body)
434            )
435        except ValidationError as exc:
436            raise VerificationError(f"invalid DSSE log entry: {exc}")
437
438        payload_hash = sha256_digest(envelope._inner.payload).digest.hex()
439        if (
440            entry_body.spec.root.payload_hash.algorithm  # type: ignore[union-attr]
441            != rekor_types.dsse.Algorithm.SHA256
442        ):
443            raise VerificationError("expected SHA256 payload hash in DSSE log entry")
444        if payload_hash != entry_body.spec.root.payload_hash.value:  # type: ignore[union-attr]
445            raise VerificationError("log entry payload hash does not match bundle")
446
447        # NOTE: Like `dsse._verify`: multiple signatures would be frivolous here,
448        # but we handle them just in case the signer has somehow produced multiple
449        # signatures for their envelope with the same signing key.
450        signatures = [
451            rekor_types.dsse.Signature(
452                signature=base64.b64encode(signature.sig).decode(),
453                verifier=base64_encode_pem_cert(bundle.signing_certificate),
454            )
455            for signature in envelope._inner.signatures
456        ]
457        if signatures != entry_body.spec.root.signatures:
458            raise VerificationError("log entry signatures do not match bundle")
459
460        return (envelope._inner.payload_type, envelope._inner.payload)

Verifies an bundle's DSSE envelope, returning the encapsulated payload and its content type.

This method is only for DSSE-enveloped payloads. To verify an arbitrary input against a bundle, use the verify_artifact method.

bundle is the Sigstore Bundle to both verify and verify against.

policy is the VerificationPolicy to verify against.

Returns a tuple of (type, payload), where type is the payload's type as encoded in the DSSE envelope and payload is the raw bytes of the payload. No validation of either type or payload is performed; users of this API must assert that type is known to them before proceeding to handle payload in an application-dependent manner.

def verify_artifact( self, input_: bytes | sigstore.hashes.Hashed, bundle: sigstore.models.Bundle, policy: sigstore.verify.policy.VerificationPolicy) -> None:
462    def verify_artifact(
463        self,
464        input_: bytes | Hashed,
465        bundle: Bundle,
466        policy: VerificationPolicy,
467    ) -> None:
468        """
469        Public API for verifying.
470
471        `input_` is the input to verify, either as a buffer of contents or as
472        a prehashed `Hashed` object.
473
474        `bundle` is the Sigstore `Bundle` to verify against.
475
476        `policy` is the `VerificationPolicy` to verify against.
477
478        On failure, this method raises `VerificationError`.
479        """
480
481        # (1) through (6) are performed by `_verify_common_signing_cert`.
482        self._verify_common_signing_cert(bundle, policy)
483
484        hashed_input = sha256_digest(input_)
485
486        # (7): verify that the signature was signed by the public key in the signing certificate.
487        try:
488            signing_key = bundle.signing_certificate.public_key()
489            signing_key = cast(ec.EllipticCurvePublicKey, signing_key)
490            signing_key.verify(
491                bundle._inner.message_signature.signature,
492                hashed_input.digest,
493                ec.ECDSA(hashed_input._as_prehashed()),
494            )
495        except InvalidSignature:
496            raise VerificationError("Signature is invalid for input")
497
498        _logger.debug("Successfully verified signature...")
499
500        # (8): verify the consistency of the log entry's body against
501        #      the other bundle materials (and input being verified).
502        entry = bundle.log_entry
503
504        expected_body = _hashedrekord_from_parts(
505            bundle.signing_certificate,
506            bundle._inner.message_signature.signature,
507            hashed_input,
508        )
509        actual_body = rekor_types.Hashedrekord.model_validate_json(
510            base64.b64decode(entry.body)
511        )
512        if expected_body != actual_body:
513            raise VerificationError(
514                "transparency log entry is inconsistent with other materials"
515            )

Public API for verifying.

input_ is the input to verify, either as a buffer of contents or as a prehashed Hashed object.

bundle is the Sigstore Bundle to verify against.

policy is the VerificationPolicy to verify against.

On failure, this method raises VerificationError.