sigstore._internal.trust
Client trust configuration and trust root management for sigstore-python.
# Copyright 2023 The Sigstore Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Client trust configuration and trust root management for sigstore-python.
"""

from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import ClassVar, Iterable, List, NewType

import cryptography.hazmat.primitives.asymmetric.padding as padding
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec, rsa
from cryptography.x509 import (
    Certificate,
    load_der_x509_certificate,
)
from sigstore_protobuf_specs.dev.sigstore.common.v1 import PublicKey as _PublicKey
from sigstore_protobuf_specs.dev.sigstore.common.v1 import (
    PublicKeyDetails as _PublicKeyDetails,
)
from sigstore_protobuf_specs.dev.sigstore.common.v1 import TimeRange
from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import (
    CertificateAuthority as _CertificateAuthority,
)
from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import (
    ClientTrustConfig as _ClientTrustConfig,
)
from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import (
    TransparencyLogInstance,
)
from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import (
    TrustedRoot as _TrustedRoot,
)

from sigstore._internal.tuf import DEFAULT_TUF_URL, STAGING_TUF_URL, TrustUpdater
from sigstore._utils import (
    KeyID,
    PublicKey,
    key_id,
    load_der_public_key,
)
from sigstore.errors import Error, MetadataError, VerificationError


def _is_timerange_valid(period: TimeRange | None, *, allow_expired: bool) -> bool:
    """
    Given a `period`, checks that the current time is not before `start`. If
    `allow_expired` is `False`, also checks that the current time is not after
    `end`.

    A missing (falsy) `period` is treated as always valid, and a period whose
    `end` is `None` never expires.
    """
    now = datetime.now(timezone.utc)

    # If there was no validity period specified, the key is always valid.
    if not period:
        return True

    # Active: if the current time is before the starting period, we are not yet
    # valid.
    if now < period.start:
        return False

    # If we want Expired keys, the key is valid at this point. Otherwise, check
    # that we are within range.
    return allow_expired or (period.end is None or now <= period.end)


@dataclass(init=False)
class Key:
    """
    Represents a key in a `Keyring`.
    """

    hash_algorithm: hashes.HashAlgorithm
    key: PublicKey
    key_id: KeyID

    # Key details that are loaded as RSA keys and verified with SHA-256.
    _RSA_SHA_256_DETAILS: ClassVar[set[_PublicKeyDetails]] = {
        _PublicKeyDetails.PKCS1_RSA_PKCS1V5,
        _PublicKeyDetails.PKIX_RSA_PKCS1V15_2048_SHA256,
        _PublicKeyDetails.PKIX_RSA_PKCS1V15_3072_SHA256,
        _PublicKeyDetails.PKIX_RSA_PKCS1V15_4096_SHA256,
    }

    # Key details that are loaded as EC keys, mapped to their hash algorithm.
    _EC_DETAILS_TO_HASH: ClassVar[dict[_PublicKeyDetails, hashes.HashAlgorithm]] = {
        _PublicKeyDetails.PKIX_ECDSA_P256_SHA_256: hashes.SHA256(),
        _PublicKeyDetails.PKIX_ECDSA_P384_SHA_384: hashes.SHA384(),
        _PublicKeyDetails.PKIX_ECDSA_P521_SHA_512: hashes.SHA512(),
    }

    def __init__(self, public_key: _PublicKey) -> None:
        """
        Construct a key from the given Sigstore PublicKey message.

        Raises `VerificationError` if the message carries no key bytes or if
        its `key_details` is not one of the supported RSA/ECDSA variants.
        """

        # NOTE: `raw_bytes` is marked as `optional` in the `PublicKey` message,
        # for unclear reasons.
        if not public_key.raw_bytes:
            raise VerificationError("public key is empty")

        hash_algorithm: hashes.HashAlgorithm
        if public_key.key_details in self._RSA_SHA_256_DETAILS:
            hash_algorithm = hashes.SHA256()
            key = load_der_public_key(public_key.raw_bytes, types=(rsa.RSAPublicKey,))
        elif public_key.key_details in self._EC_DETAILS_TO_HASH:
            hash_algorithm = self._EC_DETAILS_TO_HASH[public_key.key_details]
            key = load_der_public_key(
                public_key.raw_bytes, types=(ec.EllipticCurvePublicKey,)
            )
        else:
            raise VerificationError(f"unsupported key type: {public_key.key_details}")

        self.hash_algorithm = hash_algorithm
        self.key = key
        self.key_id = key_id(key)

    def verify(self, signature: bytes, data: bytes) -> None:
        """
        Verifies the given `data` against `signature` using the current key.

        RSA keys are checked with PKCS#1 v1.5 padding and EC keys with ECDSA,
        both using this key's `hash_algorithm`. Raises `VerificationError` if
        the wrapped key is neither an RSA nor an EC public key.
        """
        if isinstance(self.key, rsa.RSAPublicKey):
            self.key.verify(
                signature=signature,
                data=data,
                # TODO: Parametrize this as well, for PSS.
                padding=padding.PKCS1v15(),
                algorithm=self.hash_algorithm,
            )
        elif isinstance(self.key, ec.EllipticCurvePublicKey):
            self.key.verify(
                signature=signature,
                data=data,
                signature_algorithm=ec.ECDSA(self.hash_algorithm),
            )
        else:
            # Unreachable without API misuse.
            raise VerificationError(f"keyring: unsupported key: {self.key}")


class Keyring:
    """
    Represents a set of keys, each of which is a potentially valid verifier.
    """

    def __init__(self, public_keys: List[_PublicKey] | None = None):
        """
        Create a new `Keyring`, with `public_keys` as the initial set of
        verifying keys.

        Omitting `public_keys` (or passing `None`) produces an empty keyring.
        """
        self._keyring: dict[KeyID, Key] = {}

        # `None` stands in for "no keys" so that the signature does not carry
        # a mutable list object as its default value.
        for public_key in () if public_keys is None else public_keys:
            key = Key(public_key)
            self._keyring[key.key_id] = key

    def verify(self, *, key_id: KeyID, signature: bytes, data: bytes) -> None:
        """
        Verify that `signature` is a valid signature for `data`, using the
        key identified by `key_id`.

        `key_id` is an unauthenticated hint; if no key matches the given key ID,
        all keys in the keyring are tried.

        Raises `VerificationError` if the signature is invalid, i.e. is not
        valid for any of the candidate keys.
        """

        key = self._keyring.get(key_id)
        if key is not None:
            candidates = [key]
        else:
            candidates = list(self._keyring.values())

        # Try to verify each candidate key. In the happy case, this will
        # be exactly one candidate.
        valid = False
        for candidate in candidates:
            try:
                candidate.verify(signature, data)
                valid = True
                break
            except InvalidSignature:
                pass

        if not valid:
            raise VerificationError("keyring: invalid signature")


RekorKeyring = NewType("RekorKeyring", Keyring)
CTKeyring = NewType("CTKeyring", Keyring)


class KeyringPurpose(str, Enum):
    """
    Keyring purpose typing
    """

    SIGN = "sign"
    VERIFY = "verify"

    def __str__(self) -> str:
        """Returns the purpose string value."""
        return self.value


class CertificateAuthority:
    """
    Certificate Authority used in a Trusted Root configuration.
    """

    def __init__(self, inner: _CertificateAuthority):
        """
        Construct a new `CertificateAuthority`.

        @api private
        """
        self._inner = inner
        self._certificates: list[Certificate] = []
        self._verify()

    @classmethod
    def from_json(cls, path: str) -> CertificateAuthority:
        """
        Create a CertificateAuthority directly from JSON.

        `path` is the location of the file whose bytes are parsed as JSON.
        """
        inner = _CertificateAuthority().from_json(Path(path).read_bytes())
        return cls(inner)

    def _verify(self) -> None:
        """
        Verify and load the certificate authority.

        Raises `Error` if the authority's chain contains no certificates.
        """
        self._certificates = [
            load_der_x509_certificate(cert.raw_bytes)
            for cert in self._inner.cert_chain.certificates
        ]

        if not self._certificates:
            raise Error("missing a certificate in Certificate Authority")

    @property
    def validity_period_start(self) -> datetime | None:
        """
        Validity period start.
        """
        return self._inner.valid_for.start

    @property
    def validity_period_end(self) -> datetime | None:
        """
        Validity period end.
        """
        return self._inner.valid_for.end

    def certificates(self, *, allow_expired: bool) -> list[Certificate]:
        """
        Return a list of certificates in the authority chain.

        The certificates are returned in order from leaf to root, with any
        intermediate certificates in between.

        An empty list is returned when the authority's validity period does
        not cover the current time (subject to `allow_expired`).
        """
        if not _is_timerange_valid(self._inner.valid_for, allow_expired=allow_expired):
            return []
        return self._certificates


class TrustedRoot:
    """
    The cryptographic root(s) of trust for a Sigstore instance.
    """

    class TrustedRootType(str, Enum):
        """
        Known Sigstore trusted root media types.
        """

        TRUSTED_ROOT_0_1 = "application/vnd.dev.sigstore.trustedroot+json;version=0.1"

        def __str__(self) -> str:
            """Returns the variant's string value."""
            return self.value

    def __init__(self, inner: _TrustedRoot):
        """
        Construct a new `TrustedRoot`.

        @api private
        """
        self._inner = inner
        self._verify()

    def _verify(self) -> None:
        """
        Performs various feats of heroism to ensure that the trusted root
        is well-formed.

        Raises `Error` if the media type is not a known `TrustedRootType`.
        """

        # The trusted root must have a recognized media type.
        try:
            TrustedRoot.TrustedRootType(self._inner.media_type)
        except ValueError:
            raise Error(f"unsupported trusted root format: {self._inner.media_type}")

    @classmethod
    def from_file(
        cls,
        path: str,
    ) -> TrustedRoot:
        """Create a new trust root from file"""
        inner = _TrustedRoot().from_json(Path(path).read_bytes())
        return cls(inner)

    @classmethod
    def from_tuf(
        cls,
        url: str,
        offline: bool = False,
    ) -> TrustedRoot:
        """Create a new trust root from a TUF repository.

        If `offline`, will use trust root in local TUF cache. Otherwise will
        update the trust root from remote TUF repository.
        """
        path = TrustUpdater(url, offline).get_trusted_root_path()
        return cls.from_file(path)

    @classmethod
    def production(
        cls,
        offline: bool = False,
    ) -> TrustedRoot:
        """Create new trust root from Sigstore production TUF repository.

        If `offline`, will use trust root in local TUF cache. Otherwise will
        update the trust root from remote TUF repository.
        """
        return cls.from_tuf(DEFAULT_TUF_URL, offline)

    @classmethod
    def staging(
        cls,
        offline: bool = False,
    ) -> TrustedRoot:
        """Create new trust root from Sigstore staging TUF repository.

        If `offline`, will use trust root in local TUF cache. Otherwise will
        update the trust root from remote TUF repository.
        """
        return cls.from_tuf(STAGING_TUF_URL, offline)

    def _get_tlog_keys(
        self, tlogs: list[TransparencyLogInstance], purpose: KeyringPurpose
    ) -> Iterable[_PublicKey]:
        """
        Yields an iterator of public keys for transparency log instances that
        are suitable for `purpose`.

        Expired keys are only yielded when `purpose` is `VERIFY`.
        """
        allow_expired = purpose is KeyringPurpose.VERIFY
        for tlog in tlogs:
            if not _is_timerange_valid(
                tlog.public_key.valid_for, allow_expired=allow_expired
            ):
                continue

            yield tlog.public_key

    def rekor_keyring(self, purpose: KeyringPurpose) -> RekorKeyring:
        """Return keyring with keys for Rekor.

        Raises `MetadataError` unless exactly one suitable key is found.
        """

        keys: list[_PublicKey] = list(self._get_tlog_keys(self._inner.tlogs, purpose))
        if len(keys) != 1:
            raise MetadataError("Did not find one Rekor key in trusted root")
        return RekorKeyring(Keyring(keys))

    def ct_keyring(self, purpose: KeyringPurpose) -> CTKeyring:
        """Return keyring with key for CTFE.

        Raises `MetadataError` if no suitable key is found.
        """
        ctfes: list[_PublicKey] = list(self._get_tlog_keys(self._inner.ctlogs, purpose))
        if not ctfes:
            raise MetadataError("CTFE keys not found in trusted root")
        return CTKeyring(Keyring(ctfes))

    def get_fulcio_certs(self) -> list[Certificate]:
        """Return the Fulcio certificates.

        Raises `MetadataError` if no certificates are found.
        """

        certs: list[Certificate] = []

        # Return expired certificates too: they are expired now but may have
        # been active when the certificate was used to sign.
        for authority in self._inner.certificate_authorities:
            certificate_authority = CertificateAuthority(authority)
            certs.extend(certificate_authority.certificates(allow_expired=True))

        if not certs:
            raise MetadataError("Fulcio certificates not found in trusted root")
        return certs

    def get_timestamp_authorities(self) -> list[CertificateAuthority]:
        """
        Return the TSA present in the trusted root.

        This list may be empty and in this case, no timestamp verification can be
        performed.
        """
        certificate_authorities: list[CertificateAuthority] = [
            CertificateAuthority(cert_chain)
            for cert_chain in self._inner.timestamp_authorities
        ]
        return certificate_authorities


class ClientTrustConfig:
    """
    Represents a Sigstore client's trust configuration, including a root of trust.
    """

    class ClientTrustConfigType(str, Enum):
        """
        Known Sigstore client trust config media types.
        """

        CONFIG_0_1 = "application/vnd.dev.sigstore.clienttrustconfig.v0.1+json"

        def __str__(self) -> str:
            """Returns the variant's string value."""
            return self.value

    @classmethod
    def from_json(cls, raw: str) -> ClientTrustConfig:
        """
        Deserialize the given client trust config.
        """
        inner = _ClientTrustConfig().from_json(raw)
        return cls(inner)

    def __init__(self, inner: _ClientTrustConfig) -> None:
        """
        @api private
        """
        self._inner = inner
        self._verify()

    def _verify(self) -> None:
        """
        Performs various feats of heroism to ensure that the client trust config
        is well-formed.

        Raises `Error` if the media type is not a known `ClientTrustConfigType`.
        """

        # The client trust config must have a recognized media type.
        try:
            ClientTrustConfig.ClientTrustConfigType(self._inner.media_type)
        except ValueError:
            raise Error(
                f"unsupported client trust config format: {self._inner.media_type}"
            )

    @property
    def trusted_root(self) -> TrustedRoot:
        """
        Return the interior root of trust, as a `TrustedRoot`.
        """
        return TrustedRoot(self._inner.trusted_root)
@dataclass(init=False)
class Key:
    """
    A single verification key held by a `Keyring`, together with the hash
    algorithm used when checking its signatures and its key ID.
    """

    hash_algorithm: hashes.HashAlgorithm
    key: PublicKey
    key_id: KeyID

    # Key details that are loaded as RSA keys and paired with SHA-256.
    _RSA_SHA_256_DETAILS: ClassVar[set[_PublicKeyDetails]] = {
        _PublicKeyDetails.PKCS1_RSA_PKCS1V5,
        _PublicKeyDetails.PKIX_RSA_PKCS1V15_2048_SHA256,
        _PublicKeyDetails.PKIX_RSA_PKCS1V15_3072_SHA256,
        _PublicKeyDetails.PKIX_RSA_PKCS1V15_4096_SHA256,
    }

    # Key details that are loaded as EC keys, each mapped to its digest.
    _EC_DETAILS_TO_HASH: ClassVar[dict[_PublicKeyDetails, hashes.HashAlgorithm]] = {
        _PublicKeyDetails.PKIX_ECDSA_P256_SHA_256: hashes.SHA256(),
        _PublicKeyDetails.PKIX_ECDSA_P384_SHA_384: hashes.SHA384(),
        _PublicKeyDetails.PKIX_ECDSA_P521_SHA_512: hashes.SHA512(),
    }

    def __init__(self, public_key: _PublicKey) -> None:
        """
        Build a `Key` out of a Sigstore `PublicKey` message.

        Raises `VerificationError` when the message has no key bytes or when
        its key details are not among the supported RSA/ECDSA variants.
        """
        # NOTE: the `PublicKey` message declares `raw_bytes` as `optional`,
        # so an absent/empty value has to be rejected explicitly.
        encoded = public_key.raw_bytes
        if not encoded:
            raise VerificationError("public key is empty")

        details = public_key.key_details
        digest: hashes.HashAlgorithm
        if details in self._RSA_SHA_256_DETAILS:
            digest = hashes.SHA256()
            loaded = load_der_public_key(encoded, types=(rsa.RSAPublicKey,))
        elif details in self._EC_DETAILS_TO_HASH:
            digest = self._EC_DETAILS_TO_HASH[details]
            loaded = load_der_public_key(
                encoded, types=(ec.EllipticCurvePublicKey,)
            )
        else:
            raise VerificationError(f"unsupported key type: {public_key.key_details}")

        # Attributes are only populated once loading has fully succeeded.
        self.hash_algorithm = digest
        self.key = loaded
        self.key_id = key_id(loaded)

    def verify(self, signature: bytes, data: bytes) -> None:
        """
        Check `signature` over `data` with this key.

        RSA keys use PKCS#1 v1.5 padding and EC keys use ECDSA, both with
        this key's `hash_algorithm`. Raises `VerificationError` when the
        wrapped key is neither RSA nor EC.
        """
        verifier = self.key

        if isinstance(verifier, rsa.RSAPublicKey):
            verifier.verify(
                signature=signature,
                data=data,
                # TODO: Parametrize this as well, for PSS.
                padding=padding.PKCS1v15(),
                algorithm=self.hash_algorithm,
            )
            return

        if isinstance(verifier, ec.EllipticCurvePublicKey):
            verifier.verify(
                signature=signature,
                data=data,
                signature_algorithm=ec.ECDSA(self.hash_algorithm),
            )
            return

        # Unreachable without API misuse.
        raise VerificationError(f"keyring: unsupported key: {self.key}")
Represents a key in a `Keyring`.
def __init__(self, public_key: _PublicKey) -> None:
    """
    Build a `Key` out of a Sigstore `PublicKey` message.

    Raises `VerificationError` when the message has no key bytes or when
    its key details are not among the supported RSA/ECDSA variants.
    """
    # NOTE: the `PublicKey` message declares `raw_bytes` as `optional`,
    # so an absent/empty value has to be rejected explicitly.
    encoded = public_key.raw_bytes
    if not encoded:
        raise VerificationError("public key is empty")

    details = public_key.key_details
    digest: hashes.HashAlgorithm
    if details in self._RSA_SHA_256_DETAILS:
        digest = hashes.SHA256()
        loaded = load_der_public_key(encoded, types=(rsa.RSAPublicKey,))
    elif details in self._EC_DETAILS_TO_HASH:
        digest = self._EC_DETAILS_TO_HASH[details]
        loaded = load_der_public_key(
            encoded, types=(ec.EllipticCurvePublicKey,)
        )
    else:
        raise VerificationError(f"unsupported key type: {public_key.key_details}")

    # Attributes are only populated once loading has fully succeeded.
    self.hash_algorithm = digest
    self.key = loaded
    self.key_id = key_id(loaded)
Construct a key from the given Sigstore PublicKey message.
def verify(self, signature: bytes, data: bytes) -> None:
    """
    Check `signature` over `data` with this key.

    RSA keys use PKCS#1 v1.5 padding and EC keys use ECDSA, both with this
    key's `hash_algorithm`. Raises `VerificationError` when the wrapped key
    is neither RSA nor EC.
    """
    verifier = self.key

    if isinstance(verifier, rsa.RSAPublicKey):
        verifier.verify(
            signature=signature,
            data=data,
            # TODO: Parametrize this as well, for PSS.
            padding=padding.PKCS1v15(),
            algorithm=self.hash_algorithm,
        )
        return

    if isinstance(verifier, ec.EllipticCurvePublicKey):
        verifier.verify(
            signature=signature,
            data=data,
            signature_algorithm=ec.ECDSA(self.hash_algorithm),
        )
        return

    # Unreachable without API misuse.
    raise VerificationError(f"keyring: unsupported key: {self.key}")
Verifies the given `data` against `signature` using the current key.
class Keyring:
    """
    Represents a set of keys, each of which is a potentially valid verifier.
    """

    def __init__(self, public_keys: List[_PublicKey] | None = None):
        """
        Create a new `Keyring`, with `public_keys` as the initial set of
        verifying keys.

        Omitting `public_keys` (or passing `None`) produces an empty keyring.
        Keys are indexed by their key ID, so a later key with the same ID
        replaces an earlier one.
        """
        self._keyring: dict[KeyID, Key] = {}

        # `None` stands in for "no keys" so that the signature does not carry
        # a mutable list object as its default value.
        for public_key in () if public_keys is None else public_keys:
            key = Key(public_key)
            self._keyring[key.key_id] = key

    def verify(self, *, key_id: KeyID, signature: bytes, data: bytes) -> None:
        """
        Verify that `signature` is a valid signature for `data`, using the
        key identified by `key_id`.

        `key_id` is an unauthenticated hint; if no key matches the given key ID,
        all keys in the keyring are tried.

        Raises `VerificationError` if the signature is invalid, i.e. is not
        valid for any of the candidate keys (including when there are none).
        """

        key = self._keyring.get(key_id)
        if key is not None:
            candidates = [key]
        else:
            candidates = list(self._keyring.values())

        # Try to verify each candidate key. In the happy case, this will
        # be exactly one candidate.
        valid = False
        for candidate in candidates:
            try:
                candidate.verify(signature, data)
                valid = True
                break
            except InvalidSignature:
                # Only a failed signature check moves on to the next
                # candidate; any other exception propagates to the caller.
                pass

        if not valid:
            raise VerificationError("keyring: invalid signature")
Represents a set of keys, each of which is a potentially valid verifier.
163 def __init__(self, public_keys: List[_PublicKey] = []): 164 """ 165 Create a new `Keyring`, with `keys` as the initial set of verifying keys. 166 """ 167 self._keyring: dict[KeyID, Key] = {} 168 169 for public_key in public_keys: 170 key = Key(public_key) 171 self._keyring[key.key_id] = key
Create a new `Keyring`, with `public_keys` as the initial set of verifying keys.
173 def verify(self, *, key_id: KeyID, signature: bytes, data: bytes) -> None: 174 """ 175 Verify that `signature` is a valid signature for `data`, using the 176 key identified by `key_id`. 177 178 `key_id` is an unauthenticated hint; if no key matches the given key ID, 179 all keys in the keyring are tried. 180 181 Raises if the signature is invalid, i.e. is not valid for any of the 182 keys in the keyring. 183 """ 184 185 key = self._keyring.get(key_id) 186 if key is not None: 187 candidates = [key] 188 else: 189 candidates = list(self._keyring.values()) 190 191 # Try to verify each candidate key. In the happy case, this will 192 # be exactly one candidate. 193 valid = False 194 for candidate in candidates: 195 try: 196 candidate.verify(signature, data) 197 valid = True 198 break 199 except InvalidSignature: 200 pass 201 202 if not valid: 203 raise VerificationError("keyring: invalid signature")
Verify that `signature` is a valid signature for `data`, using the key identified by `key_id`.
`key_id` is an unauthenticated hint; if no key matches the given key ID, all keys in the keyring are tried.
Raises if the signature is invalid, i.e. is not valid for any of the keys in the keyring.
class KeyringPurpose(str, Enum):
    """
    Enumerates the purposes a keyring can be requested for.
    """

    SIGN = "sign"
    VERIFY = "verify"

    def __str__(self) -> str:
        """Render the member as its underlying string value."""
        purpose: str = self.value
        return purpose
Keyring purpose typing
class CertificateAuthority:
    """
    A certificate authority entry of a Trusted Root configuration.
    """

    def __init__(self, inner: _CertificateAuthority):
        """
        Wrap `inner` and eagerly load its certificate chain.

        @api private
        """
        self._inner = inner
        # Filled in by `_verify()` below.
        self._certificates: list[Certificate] = []
        self._verify()

    @classmethod
    def from_json(cls, path: str) -> CertificateAuthority:
        """
        Build a `CertificateAuthority` from the JSON stored in the file at
        `path`.
        """
        message = _CertificateAuthority()
        payload = Path(path).read_bytes()
        return cls(message.from_json(payload))

    def _verify(self) -> None:
        """
        Load every certificate of the chain, raising `Error` when the chain
        turns out to be empty.
        """
        chain: list[Certificate] = []
        for entry in self._inner.cert_chain.certificates:
            chain.append(load_der_x509_certificate(entry.raw_bytes))
        self._certificates = chain

        if not chain:
            raise Error("missing a certificate in Certificate Authority")

    @property
    def validity_period_start(self) -> datetime | None:
        """
        Start of the authority's validity period.
        """
        window = self._inner.valid_for
        return window.start

    @property
    def validity_period_end(self) -> datetime | None:
        """
        End of the authority's validity period.
        """
        window = self._inner.valid_for
        return window.end

    def certificates(self, *, allow_expired: bool) -> list[Certificate]:
        """
        Return the certificates of the authority chain, or an empty list when
        the authority's validity period does not cover the current time.

        The certificates are returned in order from leaf to root, with any
        intermediate certificates in between.
        """
        usable = _is_timerange_valid(
            self._inner.valid_for, allow_expired=allow_expired
        )
        return self._certificates if usable else []
Certificate Authority used in a Trusted Root configuration.
228 def __init__(self, inner: _CertificateAuthority): 229 """ 230 Construct a new `CertificateAuthority`. 231 232 @api private 233 """ 234 self._inner = inner 235 self._certificates: list[Certificate] = [] 236 self._verify()
Construct a new `CertificateAuthority`.
@api private
@classmethod
def from_json(cls, path: str) -> CertificateAuthority:
    """
    Build a `CertificateAuthority` from the JSON stored in the file at
    `path`.
    """
    message = _CertificateAuthority()
    payload = Path(path).read_bytes()
    return cls(message.from_json(payload))
Create a CertificateAuthority directly from JSON.
258 @property 259 def validity_period_start(self) -> datetime | None: 260 """ 261 Validity period start. 262 """ 263 return self._inner.valid_for.start
Validity period start.
265 @property 266 def validity_period_end(self) -> datetime | None: 267 """ 268 Validity period end. 269 """ 270 return self._inner.valid_for.end
Validity period end.
def certificates(self, *, allow_expired: bool) -> list[Certificate]:
    """
    Return the certificates of the authority chain, or an empty list when
    the authority's validity period does not cover the current time.

    The certificates are returned in order from leaf to root, with any
    intermediate certificates in between.
    """
    usable = _is_timerange_valid(
        self._inner.valid_for, allow_expired=allow_expired
    )
    return self._certificates if usable else []
Return a list of certificates in the authority chain.
The certificates are returned in order from leaf to root, with any intermediate certificates in between.
class TrustedRoot:
    """
    The cryptographic root(s) of trust for a Sigstore instance.
    """

    class TrustedRootType(str, Enum):
        """
        Media types of the trusted root formats this class recognizes.
        """

        TRUSTED_ROOT_0_1 = "application/vnd.dev.sigstore.trustedroot+json;version=0.1"

        def __str__(self) -> str:
            """Render the member as its media type string."""
            media_type: str = self.value
            return media_type

    def __init__(self, inner: _TrustedRoot):
        """
        Wrap `inner` after checking that it is well-formed.

        @api private
        """
        self._inner = inner
        self._verify()

    def _verify(self) -> None:
        """
        Check that the trusted root is well-formed, raising `Error` when its
        media type is not a known `TrustedRootType`.
        """
        recognized = TrustedRoot.TrustedRootType

        # Looking up the media type by value raises `ValueError` for an
        # unknown format.
        try:
            recognized(self._inner.media_type)
        except ValueError:
            raise Error(f"unsupported trusted root format: {self._inner.media_type}")

    @classmethod
    def from_file(
        cls,
        path: str,
    ) -> TrustedRoot:
        """Load a trust root from the JSON file at `path`."""
        message = _TrustedRoot()
        payload = Path(path).read_bytes()
        return cls(message.from_json(payload))

    @classmethod
    def from_tuf(
        cls,
        url: str,
        offline: bool = False,
    ) -> TrustedRoot:
        """Load a trust root from the TUF repository at `url`.

        If `offline`, will use trust root in local TUF cache. Otherwise will
        update the trust root from remote TUF repository.
        """
        updater = TrustUpdater(url, offline)
        return cls.from_file(updater.get_trusted_root_path())

    @classmethod
    def production(
        cls,
        offline: bool = False,
    ) -> TrustedRoot:
        """Load the trust root of the Sigstore production TUF repository.

        If `offline`, will use trust root in local TUF cache. Otherwise will
        update the trust root from remote TUF repository.
        """
        production_root = cls.from_tuf(DEFAULT_TUF_URL, offline)
        return production_root

    @classmethod
    def staging(
        cls,
        offline: bool = False,
    ) -> TrustedRoot:
        """Load the trust root of the Sigstore staging TUF repository.

        If `offline`, will use trust root in local TUF cache. Otherwise will
        update the trust root from remote TUF repository.
        """
        staging_root = cls.from_tuf(STAGING_TUF_URL, offline)
        return staging_root

    def _get_tlog_keys(
        self, tlogs: list[TransparencyLogInstance], purpose: KeyringPurpose
    ) -> Iterable[_PublicKey]:
        """
        Yield the public keys of the given transparency log instances that
        are suitable for `purpose`.

        Expired keys are only yielded when `purpose` is `VERIFY`.
        """
        expired_ok = purpose is KeyringPurpose.VERIFY
        for instance in tlogs:
            if _is_timerange_valid(
                instance.public_key.valid_for, allow_expired=expired_ok
            ):
                yield instance.public_key

    def rekor_keyring(self, purpose: KeyringPurpose) -> RekorKeyring:
        """Return a keyring holding the Rekor key suitable for `purpose`.

        Raises `MetadataError` unless exactly one such key is found.
        """
        found: list[_PublicKey] = [*self._get_tlog_keys(self._inner.tlogs, purpose)]
        if len(found) == 1:
            return RekorKeyring(Keyring(found))
        raise MetadataError("Did not find one Rekor key in trusted root")

    def ct_keyring(self, purpose: KeyringPurpose) -> CTKeyring:
        """Return a keyring holding the CTFE keys suitable for `purpose`.

        Raises `MetadataError` when no such key is found.
        """
        found: list[_PublicKey] = [*self._get_tlog_keys(self._inner.ctlogs, purpose)]
        if found:
            return CTKeyring(Keyring(found))
        raise MetadataError("CTFE keys not found in trusted root")

    def get_fulcio_certs(self) -> list[Certificate]:
        """Return the Fulcio certificates of every certificate authority.

        Raises `MetadataError` when no certificate is found.
        """
        # Expired certificates are included too: they are expired now but may
        # have been active when the certificate was used to sign.
        collected: list[Certificate] = [
            certificate
            for authority in self._inner.certificate_authorities
            for certificate in CertificateAuthority(authority).certificates(
                allow_expired=True
            )
        ]

        if collected:
            return collected
        raise MetadataError("Fulcio certificates not found in trusted root")

    def get_timestamp_authorities(self) -> list[CertificateAuthority]:
        """
        Return the timestamp authorities present in the trusted root.

        This list may be empty and in this case, no timestamp verification can be
        performed.
        """
        authorities: list[CertificateAuthority] = []
        for chain in self._inner.timestamp_authorities:
            authorities.append(CertificateAuthority(chain))
        return authorities
The cryptographic root(s) of trust for a Sigstore instance.
300 def __init__(self, inner: _TrustedRoot): 301 """ 302 Construct a new `TrustedRoot`. 303 304 @api private 305 """ 306 self._inner = inner 307 self._verify()
Construct a new `TrustedRoot`.
@api private
@classmethod
def from_file(
    cls,
    path: str,
) -> TrustedRoot:
    """Load a trust root from the JSON file at `path`."""
    message = _TrustedRoot()
    payload = Path(path).read_bytes()
    return cls(message.from_json(payload))
Create a new trust root from file
@classmethod
def from_tuf(
    cls,
    url: str,
    offline: bool = False,
) -> TrustedRoot:
    """Load a trust root from the TUF repository at `url`.

    If `offline`, will use trust root in local TUF cache. Otherwise will
    update the trust root from remote TUF repository.
    """
    updater = TrustUpdater(url, offline)
    return cls.from_file(updater.get_trusted_root_path())
Create a new trust root from a TUF repository.
If `offline`, will use trust root in local TUF cache. Otherwise will update the trust root from remote TUF repository.
@classmethod
def production(
    cls,
    offline: bool = False,
) -> TrustedRoot:
    """Load the trust root of the Sigstore production TUF repository.

    If `offline`, will use trust root in local TUF cache. Otherwise will
    update the trust root from remote TUF repository.
    """
    production_root = cls.from_tuf(DEFAULT_TUF_URL, offline)
    return production_root
Create new trust root from Sigstore production TUF repository.
If `offline`, will use trust root in local TUF cache. Otherwise will update the trust root from remote TUF repository.
@classmethod
def staging(
    cls,
    offline: bool = False,
) -> TrustedRoot:
    """Load the trust root of the Sigstore staging TUF repository.

    If `offline`, will use trust root in local TUF cache. Otherwise will
    update the trust root from remote TUF repository.
    """
    staging_root = cls.from_tuf(STAGING_TUF_URL, offline)
    return staging_root
Create new trust root from Sigstore staging TUF repository.
If `offline`, will use trust root in local TUF cache. Otherwise will update the trust root from remote TUF repository.
def rekor_keyring(self, purpose: KeyringPurpose) -> RekorKeyring:
    """Return a keyring holding the Rekor key suitable for `purpose`.

    Raises `MetadataError` unless exactly one such key is found.
    """
    found: list[_PublicKey] = [*self._get_tlog_keys(self._inner.tlogs, purpose)]
    if len(found) == 1:
        return RekorKeyring(Keyring(found))
    raise MetadataError("Did not find one Rekor key in trusted root")
Return keyring with keys for Rekor.
def ct_keyring(self, purpose: KeyringPurpose) -> CTKeyring:
    """Return a keyring holding the CTFE keys suitable for `purpose`.

    Raises `MetadataError` when no such key is found.
    """
    found: list[_PublicKey] = [*self._get_tlog_keys(self._inner.ctlogs, purpose)]
    if found:
        return CTKeyring(Keyring(found))
    raise MetadataError("CTFE keys not found in trusted root")
Return keyring with key for CTFE.
def get_fulcio_certs(self) -> list[Certificate]:
    """Return the Fulcio certificates of every certificate authority.

    Raises `MetadataError` when no certificate is found.
    """
    # Expired certificates are included too: they are expired now but may
    # have been active when the certificate was used to sign.
    collected: list[Certificate] = [
        certificate
        for authority in self._inner.certificate_authorities
        for certificate in CertificateAuthority(authority).certificates(
            allow_expired=True
        )
    ]

    if collected:
        return collected
    raise MetadataError("Fulcio certificates not found in trusted root")
Return the Fulcio certificates.
class TrustedRootType(str, Enum):
    """
    Media types of the trusted root formats that are recognized.
    """

    TRUSTED_ROOT_0_1 = "application/vnd.dev.sigstore.trustedroot+json;version=0.1"

    def __str__(self) -> str:
        """Render the member as its media type string."""
        media_type: str = self.value
        return media_type
Known Sigstore trusted root media types.
class ClientTrustConfig:
    """
    A Sigstore client's trust configuration, which includes a root of trust.
    """

    class ClientTrustConfigType(str, Enum):
        """
        Media types of the client trust config formats this class recognizes.
        """

        CONFIG_0_1 = "application/vnd.dev.sigstore.clienttrustconfig.v0.1+json"

        def __str__(self) -> str:
            """Render the member as its media type string."""
            media_type: str = self.value
            return media_type

    @classmethod
    def from_json(cls, raw: str) -> ClientTrustConfig:
        """
        Parse `raw` as a JSON-serialized client trust config.
        """
        message = _ClientTrustConfig()
        return cls(message.from_json(raw))

    def __init__(self, inner: _ClientTrustConfig) -> None:
        """
        @api private
        """
        self._inner = inner
        self._verify()

    def _verify(self) -> None:
        """
        Check that the client trust config is well-formed, raising `Error`
        when its media type is not a known `ClientTrustConfigType`.
        """
        recognized = ClientTrustConfig.ClientTrustConfigType

        # Looking up the media type by value raises `ValueError` for an
        # unknown format.
        try:
            recognized(self._inner.media_type)
        except ValueError:
            raise Error(
                f"unsupported client trust config format: {self._inner.media_type}"
            )

    @property
    def trusted_root(self) -> TrustedRoot:
        """
        The root of trust embedded in this config, wrapped as a `TrustedRoot`.
        """
        embedded = self._inner.trusted_root
        return TrustedRoot(embedded)
Represents a Sigstore client's trust configuration, including a root of trust.
452 def __init__(self, inner: _ClientTrustConfig) -> None: 453 """ 454 @api private 455 """ 456 self._inner = inner 457 self._verify()
@api private
@classmethod
def from_json(cls, raw: str) -> ClientTrustConfig:
    """
    Parse `raw` as a JSON-serialized client trust config.
    """
    message = _ClientTrustConfig()
    return cls(message.from_json(raw))
Deserialize the given client trust config.
@property
def trusted_root(self) -> TrustedRoot:
    """
    The root of trust embedded in this config, wrapped as a `TrustedRoot`.
    """
    embedded = self._inner.trusted_root
    return TrustedRoot(embedded)
Return the interior root of trust, as a `TrustedRoot`.
class ClientTrustConfigType(str, Enum):
    """
    Media types of the client trust config formats that are recognized.
    """

    CONFIG_0_1 = "application/vnd.dev.sigstore.clienttrustconfig.v0.1+json"

    def __str__(self) -> str:
        """Render the member as its media type string."""
        media_type: str = self.value
        return media_type
Known Sigstore client trust config media types.