sigstore._internal.trust

Client trust configuration and trust root management for sigstore-python.

  1# Copyright 2023 The Sigstore Authors
  2#
  3# Licensed under the Apache License, Version 2.0 (the "License");
  4# you may not use this file except in compliance with the License.
  5# You may obtain a copy of the License at
  6#
  7#      http://www.apache.org/licenses/LICENSE-2.0
  8#
  9# Unless required by applicable law or agreed to in writing, software
 10# distributed under the License is distributed on an "AS IS" BASIS,
 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12# See the License for the specific language governing permissions and
 13# limitations under the License.
 14
 15"""
 16Client trust configuration and trust root management for sigstore-python.
 17"""
 18
 19from __future__ import annotations
 20
 21from dataclasses import dataclass
 22from datetime import datetime, timezone
 23from enum import Enum
 24from pathlib import Path
 25from typing import ClassVar, Iterable, List, NewType
 26
 27import cryptography.hazmat.primitives.asymmetric.padding as padding
 28from cryptography.exceptions import InvalidSignature
 29from cryptography.hazmat.primitives import hashes
 30from cryptography.hazmat.primitives.asymmetric import ec, rsa
 31from cryptography.x509 import (
 32    Certificate,
 33    load_der_x509_certificate,
 34)
 35from sigstore_protobuf_specs.dev.sigstore.common.v1 import PublicKey as _PublicKey
 36from sigstore_protobuf_specs.dev.sigstore.common.v1 import (
 37    PublicKeyDetails as _PublicKeyDetails,
 38)
 39from sigstore_protobuf_specs.dev.sigstore.common.v1 import TimeRange
 40from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import (
 41    CertificateAuthority as _CertificateAuthority,
 42)
 43from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import (
 44    ClientTrustConfig as _ClientTrustConfig,
 45)
 46from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import (
 47    TransparencyLogInstance,
 48)
 49from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import (
 50    TrustedRoot as _TrustedRoot,
 51)
 52
 53from sigstore._internal.tuf import DEFAULT_TUF_URL, STAGING_TUF_URL, TrustUpdater
 54from sigstore._utils import (
 55    KeyID,
 56    PublicKey,
 57    key_id,
 58    load_der_public_key,
 59)
 60from sigstore.errors import Error, MetadataError, VerificationError
 61
 62
 63def _is_timerange_valid(period: TimeRange | None, *, allow_expired: bool) -> bool:
 64    """
 65    Given a `period`, checks that the the current time is not before `start`. If
 66    `allow_expired` is `False`, also checks that the current time is not after
 67    `end`.
 68    """
 69    now = datetime.now(timezone.utc)
 70
 71    # If there was no validity period specified, the key is always valid.
 72    if not period:
 73        return True
 74
 75    # Active: if the current time is before the starting period, we are not yet
 76    # valid.
 77    if now < period.start:
 78        return False
 79
 80    # If we want Expired keys, the key is valid at this point. Otherwise, check
 81    # that we are within range.
 82    return allow_expired or (period.end is None or now <= period.end)
 83
 84
 85@dataclass(init=False)
 86class Key:
 87    """
 88    Represents a key in a `Keyring`.
 89    """
 90
 91    hash_algorithm: hashes.HashAlgorithm
 92    key: PublicKey
 93    key_id: KeyID
 94
 95    _RSA_SHA_256_DETAILS: ClassVar[set[_PublicKeyDetails]] = {
 96        _PublicKeyDetails.PKCS1_RSA_PKCS1V5,
 97        _PublicKeyDetails.PKIX_RSA_PKCS1V15_2048_SHA256,
 98        _PublicKeyDetails.PKIX_RSA_PKCS1V15_3072_SHA256,
 99        _PublicKeyDetails.PKIX_RSA_PKCS1V15_4096_SHA256,
100    }
101
102    _EC_DETAILS_TO_HASH: ClassVar[dict[_PublicKeyDetails, hashes.HashAlgorithm]] = {
103        _PublicKeyDetails.PKIX_ECDSA_P256_SHA_256: hashes.SHA256(),
104        _PublicKeyDetails.PKIX_ECDSA_P384_SHA_384: hashes.SHA384(),
105        _PublicKeyDetails.PKIX_ECDSA_P521_SHA_512: hashes.SHA512(),
106    }
107
108    def __init__(self, public_key: _PublicKey) -> None:
109        """
110        Construct a key from the given Sigstore PublicKey message.
111        """
112
113        # NOTE: `raw_bytes` is marked as `optional` in the `PublicKey` message,
114        # for unclear reasons.
115        if not public_key.raw_bytes:
116            raise VerificationError("public key is empty")
117
118        hash_algorithm: hashes.HashAlgorithm
119        if public_key.key_details in self._RSA_SHA_256_DETAILS:
120            hash_algorithm = hashes.SHA256()
121            key = load_der_public_key(public_key.raw_bytes, types=(rsa.RSAPublicKey,))
122        elif public_key.key_details in self._EC_DETAILS_TO_HASH:
123            hash_algorithm = self._EC_DETAILS_TO_HASH[public_key.key_details]
124            key = load_der_public_key(
125                public_key.raw_bytes, types=(ec.EllipticCurvePublicKey,)
126            )
127        else:
128            raise VerificationError(f"unsupported key type: {public_key.key_details}")
129
130        self.hash_algorithm = hash_algorithm
131        self.key = key
132        self.key_id = key_id(key)
133
134    def verify(self, signature: bytes, data: bytes) -> None:
135        """
136        Verifies the given `data` against `signature` using the current key.
137        """
138        if isinstance(self.key, rsa.RSAPublicKey):
139            self.key.verify(
140                signature=signature,
141                data=data,
142                # TODO: Parametrize this as well, for PSS.
143                padding=padding.PKCS1v15(),
144                algorithm=self.hash_algorithm,
145            )
146        elif isinstance(self.key, ec.EllipticCurvePublicKey):
147            self.key.verify(
148                signature=signature,
149                data=data,
150                signature_algorithm=ec.ECDSA(self.hash_algorithm),
151            )
152        else:
153            # Unreachable without API misuse.
154            raise VerificationError(f"keyring: unsupported key: {self.key}")
155
156
157class Keyring:
158    """
159    Represents a set of keys, each of which is a potentially valid verifier.
160    """
161
162    def __init__(self, public_keys: List[_PublicKey] = []):
163        """
164        Create a new `Keyring`, with `keys` as the initial set of verifying keys.
165        """
166        self._keyring: dict[KeyID, Key] = {}
167
168        for public_key in public_keys:
169            key = Key(public_key)
170            self._keyring[key.key_id] = key
171
172    def verify(self, *, key_id: KeyID, signature: bytes, data: bytes) -> None:
173        """
174        Verify that `signature` is a valid signature for `data`, using the
175        key identified by `key_id`.
176
177        `key_id` is an unauthenticated hint; if no key matches the given key ID,
178        all keys in the keyring are tried.
179
180        Raises if the signature is invalid, i.e. is not valid for any of the
181        keys in the keyring.
182        """
183
184        key = self._keyring.get(key_id)
185        if key is not None:
186            candidates = [key]
187        else:
188            candidates = list(self._keyring.values())
189
190        # Try to verify each candidate key. In the happy case, this will
191        # be exactly one candidate.
192        valid = False
193        for candidate in candidates:
194            try:
195                candidate.verify(signature, data)
196                valid = True
197                break
198            except InvalidSignature:
199                pass
200
201        if not valid:
202            raise VerificationError("keyring: invalid signature")
203
204
205RekorKeyring = NewType("RekorKeyring", Keyring)
206CTKeyring = NewType("CTKeyring", Keyring)
207
208
209class KeyringPurpose(str, Enum):
210    """
211    Keyring purpose typing
212    """
213
214    SIGN = "sign"
215    VERIFY = "verify"
216
217    def __str__(self) -> str:
218        """Returns the purpose string value."""
219        return self.value
220
221
222class CertificateAuthority:
223    """
224    Certificate Authority used in a Trusted Root configuration.
225    """
226
227    def __init__(self, inner: _CertificateAuthority):
228        """
229        Construct a new `CertificateAuthority`.
230
231        @api private
232        """
233        self._inner = inner
234        self._certificates: list[Certificate] = []
235        self._verify()
236
237    @classmethod
238    def from_json(cls, path: str) -> CertificateAuthority:
239        """
240        Create a CertificateAuthority directly from JSON.
241        """
242        inner = _CertificateAuthority().from_json(Path(path).read_bytes())
243        return cls(inner)
244
245    def _verify(self) -> None:
246        """
247        Verify and load the certificate authority.
248        """
249        self._certificates = [
250            load_der_x509_certificate(cert.raw_bytes)
251            for cert in self._inner.cert_chain.certificates
252        ]
253
254        if not self._certificates:
255            raise Error("missing a certificate in Certificate Authority")
256
257    @property
258    def validity_period_start(self) -> datetime | None:
259        """
260        Validity period start.
261        """
262        return self._inner.valid_for.start
263
264    @property
265    def validity_period_end(self) -> datetime | None:
266        """
267        Validity period end.
268        """
269        return self._inner.valid_for.end
270
271    def certificates(self, *, allow_expired: bool) -> list[Certificate]:
272        """
273        Return a list of certificates in the authority chain.
274
275        The certificates are returned in order from leaf to root, with any
276        intermediate certificates in between.
277        """
278        if not _is_timerange_valid(self._inner.valid_for, allow_expired=allow_expired):
279            return []
280        return self._certificates
281
282
283class TrustedRoot:
284    """
285    The cryptographic root(s) of trust for a Sigstore instance.
286    """
287
288    class TrustedRootType(str, Enum):
289        """
290        Known Sigstore trusted root media types.
291        """
292
293        TRUSTED_ROOT_0_1 = "application/vnd.dev.sigstore.trustedroot+json;version=0.1"
294
295        def __str__(self) -> str:
296            """Returns the variant's string value."""
297            return self.value
298
299    def __init__(self, inner: _TrustedRoot):
300        """
301        Construct a new `TrustedRoot`.
302
303        @api private
304        """
305        self._inner = inner
306        self._verify()
307
308    def _verify(self) -> None:
309        """
310        Performs various feats of heroism to ensure that the trusted root
311        is well-formed.
312        """
313
314        # The trusted root must have a recognized media type.
315        try:
316            TrustedRoot.TrustedRootType(self._inner.media_type)
317        except ValueError:
318            raise Error(f"unsupported trusted root format: {self._inner.media_type}")
319
320    @classmethod
321    def from_file(
322        cls,
323        path: str,
324    ) -> TrustedRoot:
325        """Create a new trust root from file"""
326        inner = _TrustedRoot().from_json(Path(path).read_bytes())
327        return cls(inner)
328
329    @classmethod
330    def from_tuf(
331        cls,
332        url: str,
333        offline: bool = False,
334    ) -> TrustedRoot:
335        """Create a new trust root from a TUF repository.
336
337        If `offline`, will use trust root in local TUF cache. Otherwise will
338        update the trust root from remote TUF repository.
339        """
340        path = TrustUpdater(url, offline).get_trusted_root_path()
341        return cls.from_file(path)
342
343    @classmethod
344    def production(
345        cls,
346        offline: bool = False,
347    ) -> TrustedRoot:
348        """Create new trust root from Sigstore production TUF repository.
349
350        If `offline`, will use trust root in local TUF cache. Otherwise will
351        update the trust root from remote TUF repository.
352        """
353        return cls.from_tuf(DEFAULT_TUF_URL, offline)
354
355    @classmethod
356    def staging(
357        cls,
358        offline: bool = False,
359    ) -> TrustedRoot:
360        """Create new trust root from Sigstore staging TUF repository.
361
362        If `offline`, will use trust root in local TUF cache. Otherwise will
363        update the trust root from remote TUF repository.
364        """
365        return cls.from_tuf(STAGING_TUF_URL, offline)
366
367    def _get_tlog_keys(
368        self, tlogs: list[TransparencyLogInstance], purpose: KeyringPurpose
369    ) -> Iterable[_PublicKey]:
370        """
371        Yields an iterator of public keys for transparency log instances that
372        are suitable for `purpose`.
373        """
374        allow_expired = purpose is KeyringPurpose.VERIFY
375        for tlog in tlogs:
376            if not _is_timerange_valid(
377                tlog.public_key.valid_for, allow_expired=allow_expired
378            ):
379                continue
380
381            yield tlog.public_key
382
383    def rekor_keyring(self, purpose: KeyringPurpose) -> RekorKeyring:
384        """Return keyring with keys for Rekor."""
385
386        keys: list[_PublicKey] = list(self._get_tlog_keys(self._inner.tlogs, purpose))
387        if len(keys) != 1:
388            raise MetadataError("Did not find one Rekor key in trusted root")
389        return RekorKeyring(Keyring(keys))
390
391    def ct_keyring(self, purpose: KeyringPurpose) -> CTKeyring:
392        """Return keyring with key for CTFE."""
393        ctfes: list[_PublicKey] = list(self._get_tlog_keys(self._inner.ctlogs, purpose))
394        if not ctfes:
395            raise MetadataError("CTFE keys not found in trusted root")
396        return CTKeyring(Keyring(ctfes))
397
398    def get_fulcio_certs(self) -> list[Certificate]:
399        """Return the Fulcio certificates."""
400
401        certs: list[Certificate] = []
402
403        # Return expired certificates too: they are expired now but may have
404        # been active when the certificate was used to sign.
405        for authority in self._inner.certificate_authorities:
406            certificate_authority = CertificateAuthority(authority)
407            certs.extend(certificate_authority.certificates(allow_expired=True))
408
409        if not certs:
410            raise MetadataError("Fulcio certificates not found in trusted root")
411        return certs
412
413    def get_timestamp_authorities(self) -> list[CertificateAuthority]:
414        """
415        Return the TSA present in the trusted root.
416
417        This list may be empty and in this case, no timestamp verification can be
418        performed.
419        """
420        certificate_authorities: list[CertificateAuthority] = [
421            CertificateAuthority(cert_chain)
422            for cert_chain in self._inner.timestamp_authorities
423        ]
424        return certificate_authorities
425
426
427class ClientTrustConfig:
428    """
429    Represents a Sigstore client's trust configuration, including a root of trust.
430    """
431
432    class ClientTrustConfigType(str, Enum):
433        """
434        Known Sigstore client trust config media types.
435        """
436
437        CONFIG_0_1 = "application/vnd.dev.sigstore.clienttrustconfig.v0.1+json"
438
439        def __str__(self) -> str:
440            """Returns the variant's string value."""
441            return self.value
442
443    @classmethod
444    def from_json(cls, raw: str) -> ClientTrustConfig:
445        """
446        Deserialize the given client trust config.
447        """
448        inner = _ClientTrustConfig().from_json(raw)
449        return cls(inner)
450
451    def __init__(self, inner: _ClientTrustConfig) -> None:
452        """
453        @api private
454        """
455        self._inner = inner
456        self._verify()
457
458    def _verify(self) -> None:
459        """
460        Performs various feats of heroism to ensure that the client trust config
461        is well-formed.
462        """
463
464        # The client trust config must have a recognized media type.
465        try:
466            ClientTrustConfig.ClientTrustConfigType(self._inner.media_type)
467        except ValueError:
468            raise Error(
469                f"unsupported client trust config format: {self._inner.media_type}"
470            )
471
472    @property
473    def trusted_root(self) -> TrustedRoot:
474        """
475        Return the interior root of trust, as a `TrustedRoot`.
476        """
477        return TrustedRoot(self._inner.trusted_root)
@dataclass(init=False)
class Key:
 86@dataclass(init=False)
 87class Key:
 88    """
 89    Represents a key in a `Keyring`.
 90    """
 91
 92    hash_algorithm: hashes.HashAlgorithm
 93    key: PublicKey
 94    key_id: KeyID
 95
 96    _RSA_SHA_256_DETAILS: ClassVar[set[_PublicKeyDetails]] = {
 97        _PublicKeyDetails.PKCS1_RSA_PKCS1V5,
 98        _PublicKeyDetails.PKIX_RSA_PKCS1V15_2048_SHA256,
 99        _PublicKeyDetails.PKIX_RSA_PKCS1V15_3072_SHA256,
100        _PublicKeyDetails.PKIX_RSA_PKCS1V15_4096_SHA256,
101    }
102
103    _EC_DETAILS_TO_HASH: ClassVar[dict[_PublicKeyDetails, hashes.HashAlgorithm]] = {
104        _PublicKeyDetails.PKIX_ECDSA_P256_SHA_256: hashes.SHA256(),
105        _PublicKeyDetails.PKIX_ECDSA_P384_SHA_384: hashes.SHA384(),
106        _PublicKeyDetails.PKIX_ECDSA_P521_SHA_512: hashes.SHA512(),
107    }
108
109    def __init__(self, public_key: _PublicKey) -> None:
110        """
111        Construct a key from the given Sigstore PublicKey message.
112        """
113
114        # NOTE: `raw_bytes` is marked as `optional` in the `PublicKey` message,
115        # for unclear reasons.
116        if not public_key.raw_bytes:
117            raise VerificationError("public key is empty")
118
119        hash_algorithm: hashes.HashAlgorithm
120        if public_key.key_details in self._RSA_SHA_256_DETAILS:
121            hash_algorithm = hashes.SHA256()
122            key = load_der_public_key(public_key.raw_bytes, types=(rsa.RSAPublicKey,))
123        elif public_key.key_details in self._EC_DETAILS_TO_HASH:
124            hash_algorithm = self._EC_DETAILS_TO_HASH[public_key.key_details]
125            key = load_der_public_key(
126                public_key.raw_bytes, types=(ec.EllipticCurvePublicKey,)
127            )
128        else:
129            raise VerificationError(f"unsupported key type: {public_key.key_details}")
130
131        self.hash_algorithm = hash_algorithm
132        self.key = key
133        self.key_id = key_id(key)
134
135    def verify(self, signature: bytes, data: bytes) -> None:
136        """
137        Verifies the given `data` against `signature` using the current key.
138        """
139        if isinstance(self.key, rsa.RSAPublicKey):
140            self.key.verify(
141                signature=signature,
142                data=data,
143                # TODO: Parametrize this as well, for PSS.
144                padding=padding.PKCS1v15(),
145                algorithm=self.hash_algorithm,
146            )
147        elif isinstance(self.key, ec.EllipticCurvePublicKey):
148            self.key.verify(
149                signature=signature,
150                data=data,
151                signature_algorithm=ec.ECDSA(self.hash_algorithm),
152            )
153        else:
154            # Unreachable without API misuse.
155            raise VerificationError(f"keyring: unsupported key: {self.key}")

Represents a key in a Keyring.

Key(public_key: sigstore_protobuf_specs.dev.sigstore.common.v1.PublicKey)
109    def __init__(self, public_key: _PublicKey) -> None:
110        """
111        Construct a key from the given Sigstore PublicKey message.
112        """
113
114        # NOTE: `raw_bytes` is marked as `optional` in the `PublicKey` message,
115        # for unclear reasons.
116        if not public_key.raw_bytes:
117            raise VerificationError("public key is empty")
118
119        hash_algorithm: hashes.HashAlgorithm
120        if public_key.key_details in self._RSA_SHA_256_DETAILS:
121            hash_algorithm = hashes.SHA256()
122            key = load_der_public_key(public_key.raw_bytes, types=(rsa.RSAPublicKey,))
123        elif public_key.key_details in self._EC_DETAILS_TO_HASH:
124            hash_algorithm = self._EC_DETAILS_TO_HASH[public_key.key_details]
125            key = load_der_public_key(
126                public_key.raw_bytes, types=(ec.EllipticCurvePublicKey,)
127            )
128        else:
129            raise VerificationError(f"unsupported key type: {public_key.key_details}")
130
131        self.hash_algorithm = hash_algorithm
132        self.key = key
133        self.key_id = key_id(key)

Construct a key from the given Sigstore PublicKey message.

hash_algorithm: cryptography.hazmat.primitives.hashes.HashAlgorithm
key: Union[cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey]
def verify(self, signature: bytes, data: bytes) -> None:
135    def verify(self, signature: bytes, data: bytes) -> None:
136        """
137        Verifies the given `data` against `signature` using the current key.
138        """
139        if isinstance(self.key, rsa.RSAPublicKey):
140            self.key.verify(
141                signature=signature,
142                data=data,
143                # TODO: Parametrize this as well, for PSS.
144                padding=padding.PKCS1v15(),
145                algorithm=self.hash_algorithm,
146            )
147        elif isinstance(self.key, ec.EllipticCurvePublicKey):
148            self.key.verify(
149                signature=signature,
150                data=data,
151                signature_algorithm=ec.ECDSA(self.hash_algorithm),
152            )
153        else:
154            # Unreachable without API misuse.
155            raise VerificationError(f"keyring: unsupported key: {self.key}")

Verifies the given data against signature using the current key.

class Keyring:
158class Keyring:
159    """
160    Represents a set of keys, each of which is a potentially valid verifier.
161    """
162
163    def __init__(self, public_keys: List[_PublicKey] = []):
164        """
165        Create a new `Keyring`, with `keys` as the initial set of verifying keys.
166        """
167        self._keyring: dict[KeyID, Key] = {}
168
169        for public_key in public_keys:
170            key = Key(public_key)
171            self._keyring[key.key_id] = key
172
173    def verify(self, *, key_id: KeyID, signature: bytes, data: bytes) -> None:
174        """
175        Verify that `signature` is a valid signature for `data`, using the
176        key identified by `key_id`.
177
178        `key_id` is an unauthenticated hint; if no key matches the given key ID,
179        all keys in the keyring are tried.
180
181        Raises if the signature is invalid, i.e. is not valid for any of the
182        keys in the keyring.
183        """
184
185        key = self._keyring.get(key_id)
186        if key is not None:
187            candidates = [key]
188        else:
189            candidates = list(self._keyring.values())
190
191        # Try to verify each candidate key. In the happy case, this will
192        # be exactly one candidate.
193        valid = False
194        for candidate in candidates:
195            try:
196                candidate.verify(signature, data)
197                valid = True
198                break
199            except InvalidSignature:
200                pass
201
202        if not valid:
203            raise VerificationError("keyring: invalid signature")

Represents a set of keys, each of which is a potentially valid verifier.

Keyring( public_keys: List[sigstore_protobuf_specs.dev.sigstore.common.v1.PublicKey] = [])
163    def __init__(self, public_keys: List[_PublicKey] = []):
164        """
165        Create a new `Keyring`, with `keys` as the initial set of verifying keys.
166        """
167        self._keyring: dict[KeyID, Key] = {}
168
169        for public_key in public_keys:
170            key = Key(public_key)
171            self._keyring[key.key_id] = key

Create a new Keyring, with keys as the initial set of verifying keys.

def verify( self, *, key_id: sigstore._utils.KeyID, signature: bytes, data: bytes) -> None:
173    def verify(self, *, key_id: KeyID, signature: bytes, data: bytes) -> None:
174        """
175        Verify that `signature` is a valid signature for `data`, using the
176        key identified by `key_id`.
177
178        `key_id` is an unauthenticated hint; if no key matches the given key ID,
179        all keys in the keyring are tried.
180
181        Raises if the signature is invalid, i.e. is not valid for any of the
182        keys in the keyring.
183        """
184
185        key = self._keyring.get(key_id)
186        if key is not None:
187            candidates = [key]
188        else:
189            candidates = list(self._keyring.values())
190
191        # Try to verify each candidate key. In the happy case, this will
192        # be exactly one candidate.
193        valid = False
194        for candidate in candidates:
195            try:
196                candidate.verify(signature, data)
197                valid = True
198                break
199            except InvalidSignature:
200                pass
201
202        if not valid:
203            raise VerificationError("keyring: invalid signature")

Verify that signature is a valid signature for data, using the key identified by key_id.

key_id is an unauthenticated hint; if no key matches the given key ID, all keys in the keyring are tried.

Raises if the signature is invalid, i.e. is not valid for any of the keys in the keyring.

RekorKeyring = RekorKeyring
CTKeyring = CTKeyring
class KeyringPurpose(builtins.str, enum.Enum):
210class KeyringPurpose(str, Enum):
211    """
212    Keyring purpose typing
213    """
214
215    SIGN = "sign"
216    VERIFY = "verify"
217
218    def __str__(self) -> str:
219        """Returns the purpose string value."""
220        return self.value

Keyring purpose typing

SIGN = <KeyringPurpose.SIGN: 'sign'>
VERIFY = <KeyringPurpose.VERIFY: 'verify'>
class CertificateAuthority:
223class CertificateAuthority:
224    """
225    Certificate Authority used in a Trusted Root configuration.
226    """
227
228    def __init__(self, inner: _CertificateAuthority):
229        """
230        Construct a new `CertificateAuthority`.
231
232        @api private
233        """
234        self._inner = inner
235        self._certificates: list[Certificate] = []
236        self._verify()
237
238    @classmethod
239    def from_json(cls, path: str) -> CertificateAuthority:
240        """
241        Create a CertificateAuthority directly from JSON.
242        """
243        inner = _CertificateAuthority().from_json(Path(path).read_bytes())
244        return cls(inner)
245
246    def _verify(self) -> None:
247        """
248        Verify and load the certificate authority.
249        """
250        self._certificates = [
251            load_der_x509_certificate(cert.raw_bytes)
252            for cert in self._inner.cert_chain.certificates
253        ]
254
255        if not self._certificates:
256            raise Error("missing a certificate in Certificate Authority")
257
258    @property
259    def validity_period_start(self) -> datetime | None:
260        """
261        Validity period start.
262        """
263        return self._inner.valid_for.start
264
265    @property
266    def validity_period_end(self) -> datetime | None:
267        """
268        Validity period end.
269        """
270        return self._inner.valid_for.end
271
272    def certificates(self, *, allow_expired: bool) -> list[Certificate]:
273        """
274        Return a list of certificates in the authority chain.
275
276        The certificates are returned in order from leaf to root, with any
277        intermediate certificates in between.
278        """
279        if not _is_timerange_valid(self._inner.valid_for, allow_expired=allow_expired):
280            return []
281        return self._certificates

Certificate Authority used in a Trusted Root configuration.

CertificateAuthority( inner: sigstore_protobuf_specs.dev.sigstore.trustroot.v1.CertificateAuthority)
228    def __init__(self, inner: _CertificateAuthority):
229        """
230        Construct a new `CertificateAuthority`.
231
232        @api private
233        """
234        self._inner = inner
235        self._certificates: list[Certificate] = []
236        self._verify()

Construct a new CertificateAuthority.

@api private

@classmethod
def from_json(cls, path: str) -> CertificateAuthority:
238    @classmethod
239    def from_json(cls, path: str) -> CertificateAuthority:
240        """
241        Create a CertificateAuthority directly from JSON.
242        """
243        inner = _CertificateAuthority().from_json(Path(path).read_bytes())
244        return cls(inner)

Create a CertificateAuthority directly from JSON.

validity_period_start: datetime.datetime | None
258    @property
259    def validity_period_start(self) -> datetime | None:
260        """
261        Validity period start.
262        """
263        return self._inner.valid_for.start

Validity period start.

validity_period_end: datetime.datetime | None
265    @property
266    def validity_period_end(self) -> datetime | None:
267        """
268        Validity period end.
269        """
270        return self._inner.valid_for.end

Validity period end.

def certificates(self, *, allow_expired: bool) -> list[cryptography.x509.base.Certificate]:
272    def certificates(self, *, allow_expired: bool) -> list[Certificate]:
273        """
274        Return a list of certificates in the authority chain.
275
276        The certificates are returned in order from leaf to root, with any
277        intermediate certificates in between.
278        """
279        if not _is_timerange_valid(self._inner.valid_for, allow_expired=allow_expired):
280            return []
281        return self._certificates

Return a list of certificates in the authority chain.

The certificates are returned in order from leaf to root, with any intermediate certificates in between.

class TrustedRoot:
284class TrustedRoot:
285    """
286    The cryptographic root(s) of trust for a Sigstore instance.
287    """
288
289    class TrustedRootType(str, Enum):
290        """
291        Known Sigstore trusted root media types.
292        """
293
294        TRUSTED_ROOT_0_1 = "application/vnd.dev.sigstore.trustedroot+json;version=0.1"
295
296        def __str__(self) -> str:
297            """Returns the variant's string value."""
298            return self.value
299
300    def __init__(self, inner: _TrustedRoot):
301        """
302        Construct a new `TrustedRoot`.
303
304        @api private
305        """
306        self._inner = inner
307        self._verify()
308
309    def _verify(self) -> None:
310        """
311        Performs various feats of heroism to ensure that the trusted root
312        is well-formed.
313        """
314
315        # The trusted root must have a recognized media type.
316        try:
317            TrustedRoot.TrustedRootType(self._inner.media_type)
318        except ValueError:
319            raise Error(f"unsupported trusted root format: {self._inner.media_type}")
320
321    @classmethod
322    def from_file(
323        cls,
324        path: str,
325    ) -> TrustedRoot:
326        """Create a new trust root from file"""
327        inner = _TrustedRoot().from_json(Path(path).read_bytes())
328        return cls(inner)
329
330    @classmethod
331    def from_tuf(
332        cls,
333        url: str,
334        offline: bool = False,
335    ) -> TrustedRoot:
336        """Create a new trust root from a TUF repository.
337
338        If `offline`, will use trust root in local TUF cache. Otherwise will
339        update the trust root from remote TUF repository.
340        """
341        path = TrustUpdater(url, offline).get_trusted_root_path()
342        return cls.from_file(path)
343
344    @classmethod
345    def production(
346        cls,
347        offline: bool = False,
348    ) -> TrustedRoot:
349        """Create new trust root from Sigstore production TUF repository.
350
351        If `offline`, will use trust root in local TUF cache. Otherwise will
352        update the trust root from remote TUF repository.
353        """
354        return cls.from_tuf(DEFAULT_TUF_URL, offline)
355
356    @classmethod
357    def staging(
358        cls,
359        offline: bool = False,
360    ) -> TrustedRoot:
361        """Create new trust root from Sigstore staging TUF repository.
362
363        If `offline`, will use trust root in local TUF cache. Otherwise will
364        update the trust root from remote TUF repository.
365        """
366        return cls.from_tuf(STAGING_TUF_URL, offline)
367
368    def _get_tlog_keys(
369        self, tlogs: list[TransparencyLogInstance], purpose: KeyringPurpose
370    ) -> Iterable[_PublicKey]:
371        """
372        Yields an iterator of public keys for transparency log instances that
373        are suitable for `purpose`.
374        """
375        allow_expired = purpose is KeyringPurpose.VERIFY
376        for tlog in tlogs:
377            if not _is_timerange_valid(
378                tlog.public_key.valid_for, allow_expired=allow_expired
379            ):
380                continue
381
382            yield tlog.public_key
383
384    def rekor_keyring(self, purpose: KeyringPurpose) -> RekorKeyring:
385        """Return keyring with keys for Rekor."""
386
387        keys: list[_PublicKey] = list(self._get_tlog_keys(self._inner.tlogs, purpose))
388        if len(keys) != 1:
389            raise MetadataError("Did not find one Rekor key in trusted root")
390        return RekorKeyring(Keyring(keys))
391
392    def ct_keyring(self, purpose: KeyringPurpose) -> CTKeyring:
393        """Return keyring with key for CTFE."""
394        ctfes: list[_PublicKey] = list(self._get_tlog_keys(self._inner.ctlogs, purpose))
395        if not ctfes:
396            raise MetadataError("CTFE keys not found in trusted root")
397        return CTKeyring(Keyring(ctfes))
398
399    def get_fulcio_certs(self) -> list[Certificate]:
400        """Return the Fulcio certificates."""
401
402        certs: list[Certificate] = []
403
404        # Return expired certificates too: they are expired now but may have
405        # been active when the certificate was used to sign.
406        for authority in self._inner.certificate_authorities:
407            certificate_authority = CertificateAuthority(authority)
408            certs.extend(certificate_authority.certificates(allow_expired=True))
409
410        if not certs:
411            raise MetadataError("Fulcio certificates not found in trusted root")
412        return certs
413
414    def get_timestamp_authorities(self) -> list[CertificateAuthority]:
415        """
416        Return the TSA present in the trusted root.
417
418        This list may be empty and in this case, no timestamp verification can be
419        performed.
420        """
421        certificate_authorities: list[CertificateAuthority] = [
422            CertificateAuthority(cert_chain)
423            for cert_chain in self._inner.timestamp_authorities
424        ]
425        return certificate_authorities

The cryptographic root(s) of trust for a Sigstore instance.

TrustedRoot(inner: sigstore_protobuf_specs.dev.sigstore.trustroot.v1.TrustedRoot)
300    def __init__(self, inner: _TrustedRoot):
301        """
302        Construct a new `TrustedRoot`.
303
304        @api private
305        """
306        self._inner = inner
307        self._verify()

Construct a new TrustedRoot.

@api private

@classmethod
def from_file(cls, path: str) -> TrustedRoot:
321    @classmethod
322    def from_file(
323        cls,
324        path: str,
325    ) -> TrustedRoot:
326        """Create a new trust root from file"""
327        inner = _TrustedRoot().from_json(Path(path).read_bytes())
328        return cls(inner)

Create a new trust root from file

@classmethod
def from_tuf( cls, url: str, offline: bool = False) -> TrustedRoot:
330    @classmethod
331    def from_tuf(
332        cls,
333        url: str,
334        offline: bool = False,
335    ) -> TrustedRoot:
336        """Create a new trust root from a TUF repository.
337
338        If `offline`, will use trust root in local TUF cache. Otherwise will
339        update the trust root from remote TUF repository.
340        """
341        path = TrustUpdater(url, offline).get_trusted_root_path()
342        return cls.from_file(path)

Create a new trust root from a TUF repository.

If offline, will use trust root in local TUF cache. Otherwise will update the trust root from remote TUF repository.

@classmethod
def production(cls, offline: bool = False) -> TrustedRoot:
344    @classmethod
345    def production(
346        cls,
347        offline: bool = False,
348    ) -> TrustedRoot:
349        """Create new trust root from Sigstore production TUF repository.
350
351        If `offline`, will use trust root in local TUF cache. Otherwise will
352        update the trust root from remote TUF repository.
353        """
354        return cls.from_tuf(DEFAULT_TUF_URL, offline)

Create new trust root from Sigstore production TUF repository.

If offline, will use trust root in local TUF cache. Otherwise will update the trust root from remote TUF repository.

@classmethod
def staging(cls, offline: bool = False) -> TrustedRoot:
356    @classmethod
357    def staging(
358        cls,
359        offline: bool = False,
360    ) -> TrustedRoot:
361        """Create new trust root from Sigstore staging TUF repository.
362
363        If `offline`, will use trust root in local TUF cache. Otherwise will
364        update the trust root from remote TUF repository.
365        """
366        return cls.from_tuf(STAGING_TUF_URL, offline)

Create new trust root from Sigstore staging TUF repository.

If offline, will use trust root in local TUF cache. Otherwise will update the trust root from remote TUF repository.

def rekor_keyring( self, purpose: KeyringPurpose) -> RekorKeyring:
384    def rekor_keyring(self, purpose: KeyringPurpose) -> RekorKeyring:
385        """Return keyring with keys for Rekor."""
386
387        keys: list[_PublicKey] = list(self._get_tlog_keys(self._inner.tlogs, purpose))
388        if len(keys) != 1:
389            raise MetadataError("Did not find one Rekor key in trusted root")
390        return RekorKeyring(Keyring(keys))

Return keyring with keys for Rekor.

def ct_keyring( self, purpose: KeyringPurpose) -> CTKeyring:
392    def ct_keyring(self, purpose: KeyringPurpose) -> CTKeyring:
393        """Return keyring with key for CTFE."""
394        ctfes: list[_PublicKey] = list(self._get_tlog_keys(self._inner.ctlogs, purpose))
395        if not ctfes:
396            raise MetadataError("CTFE keys not found in trusted root")
397        return CTKeyring(Keyring(ctfes))

Return keyring with key for CTFE.

def get_fulcio_certs(self) -> list[cryptography.x509.base.Certificate]:
399    def get_fulcio_certs(self) -> list[Certificate]:
400        """Return the Fulcio certificates."""
401
402        certs: list[Certificate] = []
403
404        # Return expired certificates too: they are expired now but may have
405        # been active when the certificate was used to sign.
406        for authority in self._inner.certificate_authorities:
407            certificate_authority = CertificateAuthority(authority)
408            certs.extend(certificate_authority.certificates(allow_expired=True))
409
410        if not certs:
411            raise MetadataError("Fulcio certificates not found in trusted root")
412        return certs

Return the Fulcio certificates.

def get_timestamp_authorities(self) -> list[CertificateAuthority]:
414    def get_timestamp_authorities(self) -> list[CertificateAuthority]:
415        """
416        Return the TSA present in the trusted root.
417
418        This list may be empty and in this case, no timestamp verification can be
419        performed.
420        """
421        certificate_authorities: list[CertificateAuthority] = [
422            CertificateAuthority(cert_chain)
423            for cert_chain in self._inner.timestamp_authorities
424        ]
425        return certificate_authorities

Return the TSA present in the trusted root.

This list may be empty and in this case, no timestamp verification can be performed.

class TrustedRoot.TrustedRootType(builtins.str, enum.Enum):
289    class TrustedRootType(str, Enum):
290        """
291        Known Sigstore trusted root media types.
292        """
293
294        TRUSTED_ROOT_0_1 = "application/vnd.dev.sigstore.trustedroot+json;version=0.1"
295
296        def __str__(self) -> str:
297            """Returns the variant's string value."""
298            return self.value

Known Sigstore trusted root media types.

TRUSTED_ROOT_0_1 = <TrustedRootType.TRUSTED_ROOT_0_1: 'application/vnd.dev.sigstore.trustedroot+json;version=0.1'>
class ClientTrustConfig:
428class ClientTrustConfig:
429    """
430    Represents a Sigstore client's trust configuration, including a root of trust.
431    """
432
433    class ClientTrustConfigType(str, Enum):
434        """
435        Known Sigstore client trust config media types.
436        """
437
438        CONFIG_0_1 = "application/vnd.dev.sigstore.clienttrustconfig.v0.1+json"
439
440        def __str__(self) -> str:
441            """Returns the variant's string value."""
442            return self.value
443
444    @classmethod
445    def from_json(cls, raw: str) -> ClientTrustConfig:
446        """
447        Deserialize the given client trust config.
448        """
449        inner = _ClientTrustConfig().from_json(raw)
450        return cls(inner)
451
452    def __init__(self, inner: _ClientTrustConfig) -> None:
453        """
454        @api private
455        """
456        self._inner = inner
457        self._verify()
458
459    def _verify(self) -> None:
460        """
461        Performs various feats of heroism to ensure that the client trust config
462        is well-formed.
463        """
464
465        # The client trust config must have a recognized media type.
466        try:
467            ClientTrustConfig.ClientTrustConfigType(self._inner.media_type)
468        except ValueError:
469            raise Error(
470                f"unsupported client trust config format: {self._inner.media_type}"
471            )
472
473    @property
474    def trusted_root(self) -> TrustedRoot:
475        """
476        Return the interior root of trust, as a `TrustedRoot`.
477        """
478        return TrustedRoot(self._inner.trusted_root)

Represents a Sigstore client's trust configuration, including a root of trust.

ClientTrustConfig( inner: sigstore_protobuf_specs.dev.sigstore.trustroot.v1.ClientTrustConfig)
452    def __init__(self, inner: _ClientTrustConfig) -> None:
453        """
454        @api private
455        """
456        self._inner = inner
457        self._verify()

@api private

@classmethod
def from_json(cls, raw: str) -> ClientTrustConfig:
444    @classmethod
445    def from_json(cls, raw: str) -> ClientTrustConfig:
446        """
447        Deserialize the given client trust config.
448        """
449        inner = _ClientTrustConfig().from_json(raw)
450        return cls(inner)

Deserialize the given client trust config.

trusted_root: TrustedRoot
473    @property
474    def trusted_root(self) -> TrustedRoot:
475        """
476        Return the interior root of trust, as a `TrustedRoot`.
477        """
478        return TrustedRoot(self._inner.trusted_root)

Return the interior root of trust, as a TrustedRoot.

class ClientTrustConfig.ClientTrustConfigType(builtins.str, enum.Enum):
433    class ClientTrustConfigType(str, Enum):
434        """
435        Known Sigstore client trust config media types.
436        """
437
438        CONFIG_0_1 = "application/vnd.dev.sigstore.clienttrustconfig.v0.1+json"
439
440        def __str__(self) -> str:
441            """Returns the variant's string value."""
442            return self.value

Known Sigstore client trust config media types.

CONFIG_0_1 = <ClientTrustConfigType.CONFIG_0_1: 'application/vnd.dev.sigstore.clienttrustconfig.v0.1+json'>