sigstore.verify.verifier

Verification API machinery.

  1# Copyright 2022 The Sigstore Authors
  2#
  3# Licensed under the Apache License, Version 2.0 (the "License");
  4# you may not use this file except in compliance with the License.
  5# You may obtain a copy of the License at
  6#
  7#      http://www.apache.org/licenses/LICENSE-2.0
  8#
  9# Unless required by applicable law or agreed to in writing, software
 10# distributed under the License is distributed on an "AS IS" BASIS,
 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12# See the License for the specific language governing permissions and
 13# limitations under the License.
 14
 15"""
 16Verification API machinery.
 17"""
 18
 19from __future__ import annotations
 20
 21import base64
 22import logging
 23from datetime import datetime, timezone
 24from typing import List, cast
 25
 26import rekor_types
 27from cryptography.exceptions import InvalidSignature
 28from cryptography.hazmat.primitives.asymmetric import ec
 29from cryptography.x509 import ExtendedKeyUsage, KeyUsage
 30from cryptography.x509.oid import ExtendedKeyUsageOID
 31from OpenSSL.crypto import (
 32    X509,
 33    X509Store,
 34    X509StoreContext,
 35    X509StoreContextError,
 36    X509StoreFlags,
 37)
 38from pydantic import ValidationError
 39
 40from sigstore import dsse
 41from sigstore._internal.rekor import _hashedrekord_from_parts
 42from sigstore._internal.rekor.client import RekorClient
 43from sigstore._internal.sct import (
 44    _get_precertificate_signed_certificate_timestamps,
 45    verify_sct,
 46)
 47from sigstore._internal.trust import ClientTrustConfig, KeyringPurpose, TrustedRoot
 48from sigstore._utils import base64_encode_pem_cert, sha256_digest
 49from sigstore.errors import VerificationError
 50from sigstore.hashes import Hashed
 51from sigstore.models import Bundle
 52from sigstore.verify.policy import VerificationPolicy
 53
 54_logger = logging.getLogger(__name__)
 55
 56
 57class Verifier:
 58    """
 59    The primary API for verification operations.
 60    """
 61
 62    def __init__(self, *, rekor: RekorClient, trusted_root: TrustedRoot):
 63        """
 64        Create a new `Verifier`.
 65
 66        `rekor` is a `RekorClient` capable of connecting to a Rekor instance
 67        containing logs for the file(s) being verified.
 68
 69        `fulcio_certificate_chain` is a list of PEM-encoded X.509 certificates,
 70        establishing the trust chain for the signing certificate and signature.
 71        """
 72        self._rekor = rekor
 73        self._fulcio_certificate_chain: List[X509] = [
 74            X509.from_cryptography(parent_cert)
 75            for parent_cert in trusted_root.get_fulcio_certs()
 76        ]
 77        self._trusted_root = trusted_root
 78
 79    @classmethod
 80    def production(cls) -> Verifier:
 81        """
 82        Return a `Verifier` instance configured against Sigstore's production-level services.
 83        """
 84        return cls(
 85            rekor=RekorClient.production(),
 86            trusted_root=TrustedRoot.production(),
 87        )
 88
 89    @classmethod
 90    def staging(cls) -> Verifier:
 91        """
 92        Return a `Verifier` instance configured against Sigstore's staging-level services.
 93        """
 94        return cls(
 95            rekor=RekorClient.staging(),
 96            trusted_root=TrustedRoot.staging(),
 97        )
 98
 99    @classmethod
100    def _from_trust_config(cls, trust_config: ClientTrustConfig) -> Verifier:
101        """
102        Create a `Verifier` from the given `ClientTrustConfig`.
103
104        @api private
105        """
106        return cls(
107            rekor=RekorClient(trust_config._inner.signing_config.tlog_urls[0]),
108            trusted_root=trust_config.trusted_root,
109        )
110
111    def _verify_common_signing_cert(
112        self, bundle: Bundle, policy: VerificationPolicy
113    ) -> None:
114        """
115        Performs the signing certificate verification steps that are shared between
116        `verify_dsse` and `verify_artifact`.
117
118        Raises `VerificationError` on all failures.
119        """
120
121        # In order to verify an artifact, we need to achieve the following:
122        #
123        # 1. Verify that the signing certificate chains to the root of trust
124        #    and is valid at the time of signing.
125        # 2. Verify the signing certificate's SCT.
126        # 3. Verify that the signing certificate conforms to the Sigstore
127        #    X.509 profile as well as the passed-in `VerificationPolicy`.
128        # 4. Verify the inclusion proof and signed checkpoint for the log
129        #    entry.
130        # 5. Verify the inclusion promise for the log entry, if present.
131        # 6. Verify the timely insertion of the log entry against the validity
132        #    period for the signing certificate.
133        # 7. Verify the signature and input against the signing certificate's
134        #    public key.
135        # 8. Verify the transparency log entry's consistency against the other
136        #    materials, to prevent variants of CVE-2022-36056.
137        #
138        # This method performs steps (1) through (6) above. Its caller
139        # MUST perform steps (7) and (8) separately, since they vary based on
140        # the kind of verification being performed (i.e. hashedrekord, DSSE, etc.)
141
142        cert = bundle.signing_certificate
143
144        # NOTE: The `X509Store` object currently cannot have its time reset once the `set_time`
145        # method been called on it. To get around this, we construct a new one for every `verify`
146        # call.
147        store = X509Store()
148        # NOTE: By explicitly setting the flags here, we ensure that OpenSSL's
149        # PARTIAL_CHAIN default does not change on us. Enabling PARTIAL_CHAIN
150        # would be strictly more conformant of OpenSSL, but we currently
151        # *want* the "long" chain behavior of performing path validation
152        # down to a self-signed root.
153        store.set_flags(X509StoreFlags.X509_STRICT)
154        for parent_cert_ossl in self._fulcio_certificate_chain:
155            store.add_cert(parent_cert_ossl)
156
157        # (1): verify that the signing certificate is signed by the root
158        #      certificate and that the signing certificate was valid at the
159        #      time of signing.
160        sign_date = cert.not_valid_before_utc
161        cert_ossl = X509.from_cryptography(cert)
162
163        store.set_time(sign_date)
164        store_ctx = X509StoreContext(store, cert_ossl)
165        try:
166            # get_verified_chain returns the full chain including the end-entity certificate
167            # and chain should contain only CA certificates
168            chain = store_ctx.get_verified_chain()[1:]
169        except X509StoreContextError as e:
170            raise VerificationError(f"failed to build chain: {e}")
171
172        # (2): verify the signing certificate's SCT.
173        sct = _get_precertificate_signed_certificate_timestamps(cert)[0]
174        try:
175            verify_sct(
176                sct,
177                cert,
178                [parent_cert.to_cryptography() for parent_cert in chain],
179                self._trusted_root.ct_keyring(KeyringPurpose.VERIFY),
180            )
181        except VerificationError as e:
182            raise VerificationError(f"failed to verify SCT on signing certificate: {e}")
183
184        # (3): verify the signing certificate against the Sigstore
185        #      X.509 profile and verify against the given `VerificationPolicy`.
186        usage_ext = cert.extensions.get_extension_for_class(KeyUsage)
187        if not usage_ext.value.digital_signature:
188            raise VerificationError("Key usage is not of type `digital signature`")
189
190        extended_usage_ext = cert.extensions.get_extension_for_class(ExtendedKeyUsage)
191        if ExtendedKeyUsageOID.CODE_SIGNING not in extended_usage_ext.value:
192            raise VerificationError("Extended usage does not contain `code signing`")
193
194        policy.verify(cert)
195
196        _logger.debug("Successfully verified signing certificate validity...")
197
198        # (4): verify the inclusion proof and signed checkpoint for the
199        #      log entry.
200        # (5): verify the inclusion promise for the log entry, if present.
201        entry = bundle.log_entry
202        try:
203            entry._verify(self._trusted_root.rekor_keyring(KeyringPurpose.VERIFY))
204        except VerificationError as exc:
205            raise VerificationError(f"invalid log entry: {exc}")
206
207        # (6): verify that log entry was integrated circa the signing certificate's
208        #      validity period.
209        integrated_time = datetime.fromtimestamp(entry.integrated_time, tz=timezone.utc)
210        if not (
211            bundle.signing_certificate.not_valid_before_utc
212            <= integrated_time
213            <= bundle.signing_certificate.not_valid_after_utc
214        ):
215            raise VerificationError(
216                "invalid signing cert: expired at time of Rekor entry"
217            )
218
219    def verify_dsse(
220        self, bundle: Bundle, policy: VerificationPolicy
221    ) -> tuple[str, bytes]:
222        """
223        Verifies an bundle's DSSE envelope, returning the encapsulated payload
224        and its content type.
225
226        This method is only for DSSE-enveloped payloads. To verify
227        an arbitrary input against a bundle, use the `verify_artifact`
228        method.
229
230        `bundle` is the Sigstore `Bundle` to both verify and verify against.
231
232        `policy` is the `VerificationPolicy` to verify against.
233
234        Returns a tuple of `(type, payload)`, where `type` is the payload's
235        type as encoded in the DSSE envelope and `payload` is the raw `bytes`
236        of the payload. No validation of either `type` or `payload` is
237        performed; users of this API **must** assert that `type` is known
238        to them before proceeding to handle `payload` in an application-dependent
239        manner.
240        """
241
242        # (1) through (6) are performed by `_verify_common_signing_cert`.
243        self._verify_common_signing_cert(bundle, policy)
244
245        # (7): verify the bundle's signature and DSSE envelope against the
246        #      signing certificate's public key.
247        envelope = bundle._dsse_envelope
248        if envelope is None:
249            raise VerificationError(
250                "cannot perform DSSE verification on a bundle without a DSSE envelope"
251            )
252
253        signing_key = bundle.signing_certificate.public_key()
254        signing_key = cast(ec.EllipticCurvePublicKey, signing_key)
255        dsse._verify(signing_key, envelope)
256
257        # (8): verify the consistency of the log entry's body against
258        #      the other bundle materials.
259        # NOTE: This is very slightly weaker than the consistency check
260        # for hashedrekord entries, due to how inclusion is recorded for DSSE:
261        # the included entry for DSSE includes an envelope hash that we
262        # *cannot* verify, since the envelope is uncanonicalized JSON.
263        # Instead, we manually pick apart the entry body below and verify
264        # the parts we can (namely the payload hash and signature list).
265        entry = bundle.log_entry
266        try:
267            entry_body = rekor_types.Dsse.model_validate_json(
268                base64.b64decode(entry.body)
269            )
270        except ValidationError as exc:
271            raise VerificationError(f"invalid DSSE log entry: {exc}")
272
273        payload_hash = sha256_digest(envelope._inner.payload).digest.hex()
274        if (
275            entry_body.spec.root.payload_hash.algorithm  # type: ignore[union-attr]
276            != rekor_types.dsse.Algorithm.SHA256
277        ):
278            raise VerificationError("expected SHA256 payload hash in DSSE log entry")
279        if payload_hash != entry_body.spec.root.payload_hash.value:  # type: ignore[union-attr]
280            raise VerificationError("log entry payload hash does not match bundle")
281
282        # NOTE: Like `dsse._verify`: multiple signatures would be frivolous here,
283        # but we handle them just in case the signer has somehow produced multiple
284        # signatures for their envelope with the same signing key.
285        signatures = [
286            rekor_types.dsse.Signature(
287                signature=base64.b64encode(signature.sig).decode(),
288                verifier=base64_encode_pem_cert(bundle.signing_certificate),
289            )
290            for signature in envelope._inner.signatures
291        ]
292        if signatures != entry_body.spec.root.signatures:
293            raise VerificationError("log entry signatures do not match bundle")
294
295        return (envelope._inner.payload_type, envelope._inner.payload)
296
297    def verify_artifact(
298        self,
299        input_: bytes | Hashed,
300        bundle: Bundle,
301        policy: VerificationPolicy,
302    ) -> None:
303        """
304        Public API for verifying.
305
306        `input_` is the input to verify, either as a buffer of contents or as
307        a prehashed `Hashed` object.
308
309        `bundle` is the Sigstore `Bundle` to verify against.
310
311        `policy` is the `VerificationPolicy` to verify against.
312
313        On failure, this method raises `VerificationError`.
314        """
315
316        # (1) through (6) are performed by `_verify_common_signing_cert`.
317        self._verify_common_signing_cert(bundle, policy)
318
319        hashed_input = sha256_digest(input_)
320
321        # (7): verify that the signature was signed by the public key in the signing certificate.
322        try:
323            signing_key = bundle.signing_certificate.public_key()
324            signing_key = cast(ec.EllipticCurvePublicKey, signing_key)
325            signing_key.verify(
326                bundle._inner.message_signature.signature,
327                hashed_input.digest,
328                ec.ECDSA(hashed_input._as_prehashed()),
329            )
330        except InvalidSignature:
331            raise VerificationError("Signature is invalid for input")
332
333        _logger.debug("Successfully verified signature...")
334
335        # (8): verify the consistency of the log entry's body against
336        #      the other bundle materials (and input being verified).
337        entry = bundle.log_entry
338
339        expected_body = _hashedrekord_from_parts(
340            bundle.signing_certificate,
341            bundle._inner.message_signature.signature,
342            hashed_input,
343        )
344        actual_body = rekor_types.Hashedrekord.model_validate_json(
345            base64.b64decode(entry.body)
346        )
347        if expected_body != actual_body:
348            raise VerificationError(
349                "transparency log entry is inconsistent with other materials"
350            )
class Verifier:
 58class Verifier:
 59    """
 60    The primary API for verification operations.
 61    """
 62
 63    def __init__(self, *, rekor: RekorClient, trusted_root: TrustedRoot):
 64        """
 65        Create a new `Verifier`.
 66
 67        `rekor` is a `RekorClient` capable of connecting to a Rekor instance
 68        containing logs for the file(s) being verified.
 69
 70        `fulcio_certificate_chain` is a list of PEM-encoded X.509 certificates,
 71        establishing the trust chain for the signing certificate and signature.
 72        """
 73        self._rekor = rekor
 74        self._fulcio_certificate_chain: List[X509] = [
 75            X509.from_cryptography(parent_cert)
 76            for parent_cert in trusted_root.get_fulcio_certs()
 77        ]
 78        self._trusted_root = trusted_root
 79
 80    @classmethod
 81    def production(cls) -> Verifier:
 82        """
 83        Return a `Verifier` instance configured against Sigstore's production-level services.
 84        """
 85        return cls(
 86            rekor=RekorClient.production(),
 87            trusted_root=TrustedRoot.production(),
 88        )
 89
 90    @classmethod
 91    def staging(cls) -> Verifier:
 92        """
 93        Return a `Verifier` instance configured against Sigstore's staging-level services.
 94        """
 95        return cls(
 96            rekor=RekorClient.staging(),
 97            trusted_root=TrustedRoot.staging(),
 98        )
 99
100    @classmethod
101    def _from_trust_config(cls, trust_config: ClientTrustConfig) -> Verifier:
102        """
103        Create a `Verifier` from the given `ClientTrustConfig`.
104
105        @api private
106        """
107        return cls(
108            rekor=RekorClient(trust_config._inner.signing_config.tlog_urls[0]),
109            trusted_root=trust_config.trusted_root,
110        )
111
112    def _verify_common_signing_cert(
113        self, bundle: Bundle, policy: VerificationPolicy
114    ) -> None:
115        """
116        Performs the signing certificate verification steps that are shared between
117        `verify_dsse` and `verify_artifact`.
118
119        Raises `VerificationError` on all failures.
120        """
121
122        # In order to verify an artifact, we need to achieve the following:
123        #
124        # 1. Verify that the signing certificate chains to the root of trust
125        #    and is valid at the time of signing.
126        # 2. Verify the signing certificate's SCT.
127        # 3. Verify that the signing certificate conforms to the Sigstore
128        #    X.509 profile as well as the passed-in `VerificationPolicy`.
129        # 4. Verify the inclusion proof and signed checkpoint for the log
130        #    entry.
131        # 5. Verify the inclusion promise for the log entry, if present.
132        # 6. Verify the timely insertion of the log entry against the validity
133        #    period for the signing certificate.
134        # 7. Verify the signature and input against the signing certificate's
135        #    public key.
136        # 8. Verify the transparency log entry's consistency against the other
137        #    materials, to prevent variants of CVE-2022-36056.
138        #
139        # This method performs steps (1) through (6) above. Its caller
140        # MUST perform steps (7) and (8) separately, since they vary based on
141        # the kind of verification being performed (i.e. hashedrekord, DSSE, etc.)
142
143        cert = bundle.signing_certificate
144
145        # NOTE: The `X509Store` object currently cannot have its time reset once the `set_time`
146        # method been called on it. To get around this, we construct a new one for every `verify`
147        # call.
148        store = X509Store()
149        # NOTE: By explicitly setting the flags here, we ensure that OpenSSL's
150        # PARTIAL_CHAIN default does not change on us. Enabling PARTIAL_CHAIN
151        # would be strictly more conformant of OpenSSL, but we currently
152        # *want* the "long" chain behavior of performing path validation
153        # down to a self-signed root.
154        store.set_flags(X509StoreFlags.X509_STRICT)
155        for parent_cert_ossl in self._fulcio_certificate_chain:
156            store.add_cert(parent_cert_ossl)
157
158        # (1): verify that the signing certificate is signed by the root
159        #      certificate and that the signing certificate was valid at the
160        #      time of signing.
161        sign_date = cert.not_valid_before_utc
162        cert_ossl = X509.from_cryptography(cert)
163
164        store.set_time(sign_date)
165        store_ctx = X509StoreContext(store, cert_ossl)
166        try:
167            # get_verified_chain returns the full chain including the end-entity certificate
168            # and chain should contain only CA certificates
169            chain = store_ctx.get_verified_chain()[1:]
170        except X509StoreContextError as e:
171            raise VerificationError(f"failed to build chain: {e}")
172
173        # (2): verify the signing certificate's SCT.
174        sct = _get_precertificate_signed_certificate_timestamps(cert)[0]
175        try:
176            verify_sct(
177                sct,
178                cert,
179                [parent_cert.to_cryptography() for parent_cert in chain],
180                self._trusted_root.ct_keyring(KeyringPurpose.VERIFY),
181            )
182        except VerificationError as e:
183            raise VerificationError(f"failed to verify SCT on signing certificate: {e}")
184
185        # (3): verify the signing certificate against the Sigstore
186        #      X.509 profile and verify against the given `VerificationPolicy`.
187        usage_ext = cert.extensions.get_extension_for_class(KeyUsage)
188        if not usage_ext.value.digital_signature:
189            raise VerificationError("Key usage is not of type `digital signature`")
190
191        extended_usage_ext = cert.extensions.get_extension_for_class(ExtendedKeyUsage)
192        if ExtendedKeyUsageOID.CODE_SIGNING not in extended_usage_ext.value:
193            raise VerificationError("Extended usage does not contain `code signing`")
194
195        policy.verify(cert)
196
197        _logger.debug("Successfully verified signing certificate validity...")
198
199        # (4): verify the inclusion proof and signed checkpoint for the
200        #      log entry.
201        # (5): verify the inclusion promise for the log entry, if present.
202        entry = bundle.log_entry
203        try:
204            entry._verify(self._trusted_root.rekor_keyring(KeyringPurpose.VERIFY))
205        except VerificationError as exc:
206            raise VerificationError(f"invalid log entry: {exc}")
207
208        # (6): verify that log entry was integrated circa the signing certificate's
209        #      validity period.
210        integrated_time = datetime.fromtimestamp(entry.integrated_time, tz=timezone.utc)
211        if not (
212            bundle.signing_certificate.not_valid_before_utc
213            <= integrated_time
214            <= bundle.signing_certificate.not_valid_after_utc
215        ):
216            raise VerificationError(
217                "invalid signing cert: expired at time of Rekor entry"
218            )
219
220    def verify_dsse(
221        self, bundle: Bundle, policy: VerificationPolicy
222    ) -> tuple[str, bytes]:
223        """
224        Verifies an bundle's DSSE envelope, returning the encapsulated payload
225        and its content type.
226
227        This method is only for DSSE-enveloped payloads. To verify
228        an arbitrary input against a bundle, use the `verify_artifact`
229        method.
230
231        `bundle` is the Sigstore `Bundle` to both verify and verify against.
232
233        `policy` is the `VerificationPolicy` to verify against.
234
235        Returns a tuple of `(type, payload)`, where `type` is the payload's
236        type as encoded in the DSSE envelope and `payload` is the raw `bytes`
237        of the payload. No validation of either `type` or `payload` is
238        performed; users of this API **must** assert that `type` is known
239        to them before proceeding to handle `payload` in an application-dependent
240        manner.
241        """
242
243        # (1) through (6) are performed by `_verify_common_signing_cert`.
244        self._verify_common_signing_cert(bundle, policy)
245
246        # (7): verify the bundle's signature and DSSE envelope against the
247        #      signing certificate's public key.
248        envelope = bundle._dsse_envelope
249        if envelope is None:
250            raise VerificationError(
251                "cannot perform DSSE verification on a bundle without a DSSE envelope"
252            )
253
254        signing_key = bundle.signing_certificate.public_key()
255        signing_key = cast(ec.EllipticCurvePublicKey, signing_key)
256        dsse._verify(signing_key, envelope)
257
258        # (8): verify the consistency of the log entry's body against
259        #      the other bundle materials.
260        # NOTE: This is very slightly weaker than the consistency check
261        # for hashedrekord entries, due to how inclusion is recorded for DSSE:
262        # the included entry for DSSE includes an envelope hash that we
263        # *cannot* verify, since the envelope is uncanonicalized JSON.
264        # Instead, we manually pick apart the entry body below and verify
265        # the parts we can (namely the payload hash and signature list).
266        entry = bundle.log_entry
267        try:
268            entry_body = rekor_types.Dsse.model_validate_json(
269                base64.b64decode(entry.body)
270            )
271        except ValidationError as exc:
272            raise VerificationError(f"invalid DSSE log entry: {exc}")
273
274        payload_hash = sha256_digest(envelope._inner.payload).digest.hex()
275        if (
276            entry_body.spec.root.payload_hash.algorithm  # type: ignore[union-attr]
277            != rekor_types.dsse.Algorithm.SHA256
278        ):
279            raise VerificationError("expected SHA256 payload hash in DSSE log entry")
280        if payload_hash != entry_body.spec.root.payload_hash.value:  # type: ignore[union-attr]
281            raise VerificationError("log entry payload hash does not match bundle")
282
283        # NOTE: Like `dsse._verify`: multiple signatures would be frivolous here,
284        # but we handle them just in case the signer has somehow produced multiple
285        # signatures for their envelope with the same signing key.
286        signatures = [
287            rekor_types.dsse.Signature(
288                signature=base64.b64encode(signature.sig).decode(),
289                verifier=base64_encode_pem_cert(bundle.signing_certificate),
290            )
291            for signature in envelope._inner.signatures
292        ]
293        if signatures != entry_body.spec.root.signatures:
294            raise VerificationError("log entry signatures do not match bundle")
295
296        return (envelope._inner.payload_type, envelope._inner.payload)
297
298    def verify_artifact(
299        self,
300        input_: bytes | Hashed,
301        bundle: Bundle,
302        policy: VerificationPolicy,
303    ) -> None:
304        """
305        Public API for verifying.
306
307        `input_` is the input to verify, either as a buffer of contents or as
308        a prehashed `Hashed` object.
309
310        `bundle` is the Sigstore `Bundle` to verify against.
311
312        `policy` is the `VerificationPolicy` to verify against.
313
314        On failure, this method raises `VerificationError`.
315        """
316
317        # (1) through (6) are performed by `_verify_common_signing_cert`.
318        self._verify_common_signing_cert(bundle, policy)
319
320        hashed_input = sha256_digest(input_)
321
322        # (7): verify that the signature was signed by the public key in the signing certificate.
323        try:
324            signing_key = bundle.signing_certificate.public_key()
325            signing_key = cast(ec.EllipticCurvePublicKey, signing_key)
326            signing_key.verify(
327                bundle._inner.message_signature.signature,
328                hashed_input.digest,
329                ec.ECDSA(hashed_input._as_prehashed()),
330            )
331        except InvalidSignature:
332            raise VerificationError("Signature is invalid for input")
333
334        _logger.debug("Successfully verified signature...")
335
336        # (8): verify the consistency of the log entry's body against
337        #      the other bundle materials (and input being verified).
338        entry = bundle.log_entry
339
340        expected_body = _hashedrekord_from_parts(
341            bundle.signing_certificate,
342            bundle._inner.message_signature.signature,
343            hashed_input,
344        )
345        actual_body = rekor_types.Hashedrekord.model_validate_json(
346            base64.b64decode(entry.body)
347        )
348        if expected_body != actual_body:
349            raise VerificationError(
350                "transparency log entry is inconsistent with other materials"
351            )

The primary API for verification operations.

Verifier( *, rekor: sigstore._internal.rekor.client.RekorClient, trusted_root: sigstore._internal.trust.TrustedRoot)
63    def __init__(self, *, rekor: RekorClient, trusted_root: TrustedRoot):
64        """
65        Create a new `Verifier`.
66
67        `rekor` is a `RekorClient` capable of connecting to a Rekor instance
68        containing logs for the file(s) being verified.
69
70        `fulcio_certificate_chain` is a list of PEM-encoded X.509 certificates,
71        establishing the trust chain for the signing certificate and signature.
72        """
73        self._rekor = rekor
74        self._fulcio_certificate_chain: List[X509] = [
75            X509.from_cryptography(parent_cert)
76            for parent_cert in trusted_root.get_fulcio_certs()
77        ]
78        self._trusted_root = trusted_root

Create a new Verifier.

rekor is a RekorClient capable of connecting to a Rekor instance containing logs for the file(s) being verified.

trusted_root is the TrustedRoot supplying the Fulcio certificates that establish the trust chain for the signing certificate and signature.

@classmethod
def production(cls) -> Verifier:
80    @classmethod
81    def production(cls) -> Verifier:
82        """
83        Return a `Verifier` instance configured against Sigstore's production-level services.
84        """
85        return cls(
86            rekor=RekorClient.production(),
87            trusted_root=TrustedRoot.production(),
88        )

Return a Verifier instance configured against Sigstore's production-level services.

@classmethod
def staging(cls) -> Verifier:
90    @classmethod
91    def staging(cls) -> Verifier:
92        """
93        Return a `Verifier` instance configured against Sigstore's staging-level services.
94        """
95        return cls(
96            rekor=RekorClient.staging(),
97            trusted_root=TrustedRoot.staging(),
98        )

Return a Verifier instance configured against Sigstore's staging-level services.

def verify_dsse( self, bundle: sigstore.models.Bundle, policy: sigstore.verify.policy.VerificationPolicy) -> tuple[str, bytes]:
220    def verify_dsse(
221        self, bundle: Bundle, policy: VerificationPolicy
222    ) -> tuple[str, bytes]:
223        """
224        Verifies an bundle's DSSE envelope, returning the encapsulated payload
225        and its content type.
226
227        This method is only for DSSE-enveloped payloads. To verify
228        an arbitrary input against a bundle, use the `verify_artifact`
229        method.
230
231        `bundle` is the Sigstore `Bundle` to both verify and verify against.
232
233        `policy` is the `VerificationPolicy` to verify against.
234
235        Returns a tuple of `(type, payload)`, where `type` is the payload's
236        type as encoded in the DSSE envelope and `payload` is the raw `bytes`
237        of the payload. No validation of either `type` or `payload` is
238        performed; users of this API **must** assert that `type` is known
239        to them before proceeding to handle `payload` in an application-dependent
240        manner.
241        """
242
243        # (1) through (6) are performed by `_verify_common_signing_cert`.
244        self._verify_common_signing_cert(bundle, policy)
245
246        # (7): verify the bundle's signature and DSSE envelope against the
247        #      signing certificate's public key.
248        envelope = bundle._dsse_envelope
249        if envelope is None:
250            raise VerificationError(
251                "cannot perform DSSE verification on a bundle without a DSSE envelope"
252            )
253
254        signing_key = bundle.signing_certificate.public_key()
255        signing_key = cast(ec.EllipticCurvePublicKey, signing_key)
256        dsse._verify(signing_key, envelope)
257
258        # (8): verify the consistency of the log entry's body against
259        #      the other bundle materials.
260        # NOTE: This is very slightly weaker than the consistency check
261        # for hashedrekord entries, due to how inclusion is recorded for DSSE:
262        # the included entry for DSSE includes an envelope hash that we
263        # *cannot* verify, since the envelope is uncanonicalized JSON.
264        # Instead, we manually pick apart the entry body below and verify
265        # the parts we can (namely the payload hash and signature list).
266        entry = bundle.log_entry
267        try:
268            entry_body = rekor_types.Dsse.model_validate_json(
269                base64.b64decode(entry.body)
270            )
271        except ValidationError as exc:
272            raise VerificationError(f"invalid DSSE log entry: {exc}")
273
274        payload_hash = sha256_digest(envelope._inner.payload).digest.hex()
275        if (
276            entry_body.spec.root.payload_hash.algorithm  # type: ignore[union-attr]
277            != rekor_types.dsse.Algorithm.SHA256
278        ):
279            raise VerificationError("expected SHA256 payload hash in DSSE log entry")
280        if payload_hash != entry_body.spec.root.payload_hash.value:  # type: ignore[union-attr]
281            raise VerificationError("log entry payload hash does not match bundle")
282
283        # NOTE: Like `dsse._verify`: multiple signatures would be frivolous here,
284        # but we handle them just in case the signer has somehow produced multiple
285        # signatures for their envelope with the same signing key.
286        signatures = [
287            rekor_types.dsse.Signature(
288                signature=base64.b64encode(signature.sig).decode(),
289                verifier=base64_encode_pem_cert(bundle.signing_certificate),
290            )
291            for signature in envelope._inner.signatures
292        ]
293        if signatures != entry_body.spec.root.signatures:
294            raise VerificationError("log entry signatures do not match bundle")
295
296        return (envelope._inner.payload_type, envelope._inner.payload)

Verifies a bundle's DSSE envelope, returning the encapsulated payload and its content type.

This method is only for DSSE-enveloped payloads. To verify an arbitrary input against a bundle, use the verify_artifact method.

bundle is the Sigstore Bundle to both verify and verify against.

policy is the VerificationPolicy to verify against.

Returns a tuple of (type, payload), where type is the payload's type as encoded in the DSSE envelope and payload is the raw bytes of the payload. No validation of either type or payload is performed; users of this API must assert that type is known to them before proceeding to handle payload in an application-dependent manner.

def verify_artifact( self, input_: bytes | sigstore.hashes.Hashed, bundle: sigstore.models.Bundle, policy: sigstore.verify.policy.VerificationPolicy) -> None:
298    def verify_artifact(
299        self,
300        input_: bytes | Hashed,
301        bundle: Bundle,
302        policy: VerificationPolicy,
303    ) -> None:
304        """
305        Public API for verifying.
306
307        `input_` is the input to verify, either as a buffer of contents or as
308        a prehashed `Hashed` object.
309
310        `bundle` is the Sigstore `Bundle` to verify against.
311
312        `policy` is the `VerificationPolicy` to verify against.
313
314        On failure, this method raises `VerificationError`.
315        """
316
317        # (1) through (6) are performed by `_verify_common_signing_cert`.
318        self._verify_common_signing_cert(bundle, policy)
319
320        hashed_input = sha256_digest(input_)
321
322        # (7): verify that the signature was signed by the public key in the signing certificate.
323        try:
324            signing_key = bundle.signing_certificate.public_key()
325            signing_key = cast(ec.EllipticCurvePublicKey, signing_key)
326            signing_key.verify(
327                bundle._inner.message_signature.signature,
328                hashed_input.digest,
329                ec.ECDSA(hashed_input._as_prehashed()),
330            )
331        except InvalidSignature:
332            raise VerificationError("Signature is invalid for input")
333
334        _logger.debug("Successfully verified signature...")
335
336        # (8): verify the consistency of the log entry's body against
337        #      the other bundle materials (and input being verified).
338        entry = bundle.log_entry
339
340        expected_body = _hashedrekord_from_parts(
341            bundle.signing_certificate,
342            bundle._inner.message_signature.signature,
343            hashed_input,
344        )
345        actual_body = rekor_types.Hashedrekord.model_validate_json(
346            base64.b64decode(entry.body)
347        )
348        if expected_body != actual_body:
349            raise VerificationError(
350                "transparency log entry is inconsistent with other materials"
351            )

Public API for verifying an input against a Sigstore bundle.

input_ is the input to verify, either as a buffer of contents or as a prehashed Hashed object.

bundle is the Sigstore Bundle to verify against.

policy is the VerificationPolicy to verify against.

On failure, this method raises VerificationError.