sigstore.models
Common models shared between signing and verification.
# Copyright 2022 The Sigstore Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Common models shared between signing and verification.
"""

from __future__ import annotations

import base64
import logging
import typing
from enum import Enum
from textwrap import dedent
from typing import Any, List, Optional

import rfc8785
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.x509 import (
    Certificate,
    load_der_x509_certificate,
)
from pydantic import (
    BaseModel,
    ConfigDict,
    Field,
    StrictInt,
    StrictStr,
    TypeAdapter,
    ValidationInfo,
    field_validator,
)
from pydantic.dataclasses import dataclass
from rekor_types import Dsse, Hashedrekord, ProposedEntry
from sigstore_protobuf_specs.dev.sigstore.bundle import v1 as bundle_v1
from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import (
    Bundle as _Bundle,
)
from sigstore_protobuf_specs.dev.sigstore.common import v1 as common_v1
from sigstore_protobuf_specs.dev.sigstore.rekor import v1 as rekor_v1
from sigstore_protobuf_specs.dev.sigstore.rekor.v1 import (
    InclusionProof,
)

from sigstore import dsse
from sigstore._internal.merkle import verify_merkle_inclusion
from sigstore._internal.rekor.checkpoint import verify_checkpoint
from sigstore._utils import (
    B64Str,
    KeyID,
    cert_is_leaf,
    cert_is_root_ca,
)
from sigstore.errors import Error, VerificationError

if typing.TYPE_CHECKING:
    from sigstore._internal.trust import RekorKeyring


_logger = logging.getLogger(__name__)


class LogInclusionProof(BaseModel):
    """
    Represents an inclusion proof for a transparency log entry.

    Fields may be populated either by their Python names or by their
    camelCase aliases (`logIndex`, `rootHash`, `treeSize`).
    """

    model_config = ConfigDict(populate_by_name=True)

    checkpoint: StrictStr = Field(..., alias="checkpoint")
    hashes: List[StrictStr] = Field(..., alias="hashes")
    log_index: StrictInt = Field(..., alias="logIndex")
    root_hash: StrictStr = Field(..., alias="rootHash")
    tree_size: StrictInt = Field(..., alias="treeSize")

    @field_validator("log_index")
    def _log_index_positive(cls, v: int) -> int:
        """Reject negative log indices (zero is accepted)."""
        if v < 0:
            raise ValueError(f"Inclusion proof has invalid log index: {v} < 0")
        return v

    @field_validator("tree_size")
    def _tree_size_positive(cls, v: int) -> int:
        """Reject negative tree sizes."""
        if v < 0:
            raise ValueError(f"Inclusion proof has invalid tree size: {v} < 0")
        return v

    @field_validator("tree_size")
    def _log_index_within_tree_size(
        cls, v: int, info: ValidationInfo, **kwargs: Any
    ) -> int:
        """
        Require the tree size to be strictly greater than the log index.

        The comparison is skipped when `log_index` is not available in
        `info.data`.
        """
        if "log_index" in info.data and v <= info.data["log_index"]:
            raise ValueError(
                "Inclusion proof has log index greater than or equal to tree size: "
                f"{v} <= {info.data['log_index']}"
            )
        return v


@dataclass(frozen=True)
class LogEntry:
    """
    Represents a transparency log entry.

    Log entries are retrieved from the transparency log after signing or verification events,
    or loaded from "Sigstore" bundles provided by the user.

    An inclusion proof is always required by this representation; the
    inclusion promise is optional.
    """

    uuid: Optional[str]
    """
    This entry's unique ID in the log instance it was retrieved from.

    For sharded log deployments, IDs are unique per-shard.

    Not present for `LogEntry` instances loaded from Sigstore bundles.
    """

    body: B64Str
    """
    The base64-encoded body of the transparency log entry.
    """

    integrated_time: int
    """
    The UNIX time at which this entry was integrated into the transparency log.
    """

    log_id: str
    """
    The log's ID (as the SHA256 hash of the DER-encoded public key for the log
    at the time of entry inclusion).
    """

    log_index: int
    """
    The index of this entry within the log.
    """

    inclusion_proof: LogInclusionProof
    """
    An inclusion proof for this log entry.
    """

    inclusion_promise: Optional[B64Str]
    """
    An inclusion promise for this log entry, if present.

    Internally, this is a base64-encoded Signed Entry Timestamp (SET) for this
    log entry.
    """

    @classmethod
    def _from_response(cls, dict_: dict[str, Any]) -> LogEntry:
        """
        Create a new `LogEntry` from the given API response.

        `dict_` must map exactly one entry UUID to its entry payload;
        any other number of items raises `ValueError`.
        """

        # Assumes we only get one entry back
        entries = list(dict_.items())
        if len(entries) != 1:
            raise ValueError("Received multiple entries in response")

        uuid, entry = entries[0]
        return LogEntry(
            uuid=uuid,
            body=entry["body"],
            integrated_time=entry["integratedTime"],
            log_id=entry["logID"],
            log_index=entry["logIndex"],
            inclusion_proof=LogInclusionProof.model_validate(
                entry["verification"]["inclusionProof"]
            ),
            inclusion_promise=entry["verification"]["signedEntryTimestamp"],
        )

    @classmethod
    def _from_dict_rekor(cls, dict_: dict[str, Any]) -> LogEntry:
        """
        Create a new `LogEntry` from the given Rekor TransparencyLogEntry.

        Raises `InvalidBundle` if the entry has no inclusion proof or the
        proof has no checkpoint envelope.
        """
        tlog_entry = rekor_v1.TransparencyLogEntry()
        tlog_entry.from_dict(dict_)

        inclusion_proof: InclusionProof | None = tlog_entry.inclusion_proof
        # This check is required by us as the client, not the
        # protobuf-specs themselves.
        if not inclusion_proof or not inclusion_proof.checkpoint.envelope:
            raise InvalidBundle("entry must contain inclusion proof, with checkpoint")

        # Hashes are raw bytes at the protobuf level; the model stores hex.
        parsed_inclusion_proof = LogInclusionProof(
            checkpoint=inclusion_proof.checkpoint.envelope,
            hashes=[h.hex() for h in inclusion_proof.hashes],
            log_index=inclusion_proof.log_index,
            root_hash=inclusion_proof.root_hash.hex(),
            tree_size=inclusion_proof.tree_size,
        )

        return LogEntry(
            uuid=None,
            body=B64Str(base64.b64encode(tlog_entry.canonicalized_body).decode()),
            integrated_time=tlog_entry.integrated_time,
            log_id=tlog_entry.log_id.key_id.hex(),
            log_index=tlog_entry.log_index,
            inclusion_proof=parsed_inclusion_proof,
            # NOTE(review): this is built unconditionally, so an entry without
            # a promise presumably yields an empty string rather than None --
            # confirm against the protobuf library's handling of unset fields.
            inclusion_promise=B64Str(
                base64.b64encode(
                    tlog_entry.inclusion_promise.signed_entry_timestamp
                ).decode()
            ),
        )

    def _to_rekor(self) -> rekor_v1.TransparencyLogEntry:
        """
        Create a new protobuf-level `TransparencyLogEntry` from this `LogEntry`.

        Raises `InvalidBundle` if the entry body does not parse as a
        `Hashedrekord` or `Dsse` entry.

        @private
        """
        inclusion_promise: rekor_v1.InclusionPromise | None = None
        if self.inclusion_promise:
            inclusion_promise = rekor_v1.InclusionPromise(
                signed_entry_timestamp=base64.b64decode(self.inclusion_promise)
            )

        inclusion_proof = rekor_v1.InclusionProof(
            log_index=self.inclusion_proof.log_index,
            root_hash=bytes.fromhex(self.inclusion_proof.root_hash),
            tree_size=self.inclusion_proof.tree_size,
            hashes=[bytes.fromhex(hash_) for hash_ in self.inclusion_proof.hashes],
            checkpoint=rekor_v1.Checkpoint(envelope=self.inclusion_proof.checkpoint),
        )

        tlog_entry = rekor_v1.TransparencyLogEntry(
            log_index=self.log_index,
            log_id=common_v1.LogId(key_id=bytes.fromhex(self.log_id)),
            integrated_time=self.integrated_time,
            inclusion_promise=inclusion_promise,  # type: ignore[arg-type]
            inclusion_proof=inclusion_proof,
            canonicalized_body=base64.b64decode(self.body),
        )

        # Fill in the appropriate kind
        body_entry: ProposedEntry = TypeAdapter(ProposedEntry).validate_json(
            tlog_entry.canonicalized_body
        )
        if not isinstance(body_entry, (Hashedrekord, Dsse)):
            raise InvalidBundle("log entry is not of expected type")

        tlog_entry.kind_version = rekor_v1.KindVersion(
            kind=body_entry.kind, version=body_entry.api_version
        )

        return tlog_entry

    def encode_canonical(self) -> bytes:
        """
        Returns a canonicalized JSON (RFC 8785) representation of the transparency log entry.

        This encoded representation is suitable for verification against
        the Signed Entry Timestamp.
        """
        payload: dict[str, int | str] = {
            "body": self.body,
            "integratedTime": self.integrated_time,
            "logID": self.log_id,
            "logIndex": self.log_index,
        }

        return rfc8785.dumps(payload)

    def _verify_set(self, keyring: RekorKeyring) -> None:
        """
        Verify the inclusion promise (Signed Entry Timestamp) for this
        log entry using the given `keyring`.

        Fails if this log entry does not contain an inclusion promise.
        """

        if self.inclusion_promise is None:
            raise VerificationError("SET: invalid inclusion promise: missing")

        signed_entry_ts = base64.b64decode(self.inclusion_promise)

        try:
            keyring.verify(
                key_id=KeyID(bytes.fromhex(self.log_id)),
                signature=signed_entry_ts,
                data=self.encode_canonical(),
            )
        except VerificationError as exc:
            raise VerificationError(f"SET: invalid inclusion promise: {exc}")

    def _verify(self, keyring: RekorKeyring) -> None:
        """
        Verifies this log entry.

        This method:

        * Verifies the Merkle inclusion proof and its signed checkpoint;
        * Verifies the inclusion promise, if present.
        """

        verify_merkle_inclusion(self)
        verify_checkpoint(keyring, self)

        _logger.debug(f"successfully verified inclusion proof: index={self.log_index}")

        if self.inclusion_promise:
            self._verify_set(keyring)
            _logger.debug(
                f"successfully verified inclusion promise: index={self.log_index}"
            )


class InvalidBundle(Error):
    """
    Raised when the associated `Bundle` is invalid in some way.
    """

    def diagnostics(self) -> str:
        """Returns diagnostics for the error."""

        return dedent(
            f"""\
        An issue occurred while parsing the Sigstore bundle.

        The provided bundle is malformed and may have been modified maliciously.

        Additional context:

        {self}
        """
        )


class Bundle:
    """
    Represents a Sigstore bundle.
    """

    class BundleType(str, Enum):
        """
        Known Sigstore bundle media types.
        """

        BUNDLE_0_1 = "application/vnd.dev.sigstore.bundle+json;version=0.1"
        BUNDLE_0_2 = "application/vnd.dev.sigstore.bundle+json;version=0.2"
        BUNDLE_0_3_ALT = "application/vnd.dev.sigstore.bundle+json;version=0.3"
        BUNDLE_0_3 = "application/vnd.dev.sigstore.bundle.v0.3+json"

        def __str__(self) -> str:
            """Returns the variant's string value."""
            return self.value

    def __init__(self, inner: _Bundle) -> None:
        """
        Creates a new bundle. This is not a public API; use
        `from_json` instead.

        @private
        """
        self._inner = inner
        self._verify()

    def _verify(self) -> None:
        """
        Performs various feats of heroism to ensure the bundle is well-formed
        and upholds invariants, including:

        * The "leaf" (signing) certificate is present;
        * There is an inclusion proof present, even if the Bundle's version
           predates a mandatory inclusion proof.

        On success, sets `self._signing_certificate` and `self._log_entry`;
        raises `InvalidBundle` when an invariant is violated.
        """

        # The bundle must have a recognized media type.
        try:
            media_type = Bundle.BundleType(self._inner.media_type)
        except ValueError:
            raise InvalidBundle(f"unsupported bundle format: {self._inner.media_type}")

        # Extract the signing certificate.
        if media_type in (
            Bundle.BundleType.BUNDLE_0_3,
            Bundle.BundleType.BUNDLE_0_3_ALT,
        ):
            # For "v3" bundles, the signing certificate is the only one present.
            leaf_cert = load_der_x509_certificate(
                self._inner.verification_material.certificate.raw_bytes
            )
        else:
            # In older bundles, there is an entire pool (misleadingly called
            # a chain) of certificates, the first of which is the signing
            # certificate.
            certs = (
                self._inner.verification_material.x509_certificate_chain.certificates
            )

            if len(certs) == 0:
                raise InvalidBundle("expected non-empty certificate chain in bundle")

            # Per client policy in protobuf-specs: the first entry in the chain
            # MUST be a leaf certificate, and the rest of the chain MUST NOT
            # include a root CA or any intermediate CAs that appear in an
            # independent root of trust.
            #
            # We expect some old bundles to violate the rules around root
            # and intermediate CAs, so we issue warnings and not hard errors
            # in those cases.
            leaf_cert, *chain_certs = [
                load_der_x509_certificate(cert.raw_bytes) for cert in certs
            ]
            if not cert_is_leaf(leaf_cert):
                raise InvalidBundle(
                    "bundle contains an invalid leaf or non-leaf certificate in the leaf position"
                )

            for chain_cert in chain_certs:
                # TODO: We should also retrieve the root of trust here and
                # cross-check against it.
                if cert_is_root_ca(chain_cert):
                    _logger.warning(
                        "this bundle contains a root CA, making it subject to misuse"
                    )

        self._signing_certificate = leaf_cert

        # Extract the log entry. For the time being, we expect
        # bundles to only contain a single log entry.
        tlog_entries = self._inner.verification_material.tlog_entries
        if len(tlog_entries) != 1:
            raise InvalidBundle("expected exactly one log entry in bundle")
        tlog_entry = tlog_entries[0]

        # Handling of inclusion promises and proofs varies between bundle
        # format versions:
        #
        # * For 0.1, an inclusion promise is required; the client
        #   MUST verify the inclusion promise.
        #   The inclusion proof is NOT required. If provided, it might NOT
        #   contain a checkpoint; in this case, we ignore it (since it's
        #   useless without one).
        #
        # * For 0.2+, an inclusion proof is required; the client MUST
        #   verify the inclusion proof. The inclusion proof MUST contain
        #   a checkpoint.
        #   The inclusion promise is NOT required; if present, the client
        #   SHOULD verify it.
        #
        # Before all of this, we require that the inclusion proof be present
        # (when constructing the LogEntry).
        log_entry = LogEntry._from_dict_rekor(tlog_entry.to_dict())

        if media_type == Bundle.BundleType.BUNDLE_0_1:
            if not log_entry.inclusion_promise:
                raise InvalidBundle("bundle must contain an inclusion promise")
            if not log_entry.inclusion_proof.checkpoint:
                _logger.debug(
                    "0.1 bundle contains inclusion proof without checkpoint; ignoring"
                )
        else:
            if not log_entry.inclusion_proof.checkpoint:
                raise InvalidBundle("expected checkpoint in inclusion proof")

        self._log_entry = log_entry

    @property
    def signing_certificate(self) -> Certificate:
        """Returns the bundle's contained signing (i.e. leaf) certificate."""
        return self._signing_certificate

    @property
    def log_entry(self) -> LogEntry:
        """
        Returns the bundle's log entry, containing an inclusion proof
        (with checkpoint) and an inclusion promise (if the latter is present).
        """
        return self._log_entry

    @property
    def _dsse_envelope(self) -> dsse.Envelope | None:
        """
        Returns the DSSE envelope within this Bundle as a `dsse.Envelope`,
        or `None` if the bundle does not carry one.

        @private
        """
        if self._inner.dsse_envelope:
            return dsse.Envelope(self._inner.dsse_envelope)
        return None

    @classmethod
    def from_json(cls, raw: bytes | str) -> Bundle:
        """
        Deserialize the given Sigstore bundle.
        """
        inner = _Bundle().from_json(raw)
        return cls(inner)

    def to_json(self) -> str:
        """
        Return a JSON encoding of this bundle.
        """
        return self._inner.to_json()

    def _to_parts(
        self,
    ) -> tuple[Certificate, common_v1.MessageSignature | dsse.Envelope, LogEntry]:
        """
        Decompose the `Bundle` into its core constituent parts.

        The middle element is the DSSE envelope when the bundle has one,
        and the message signature otherwise.

        @private
        """

        content: common_v1.MessageSignature | dsse.Envelope
        if self._dsse_envelope:
            content = self._dsse_envelope
        else:
            content = self._inner.message_signature

        return (self.signing_certificate, content, self.log_entry)

    @classmethod
    def from_parts(cls, cert: Certificate, sig: bytes, log_entry: LogEntry) -> Bundle:
        """
        Construct a Sigstore bundle (of `hashedrekord` type) from its
        constituent parts.
        """

        return cls._from_parts(
            cert, common_v1.MessageSignature(signature=sig), log_entry
        )

    @classmethod
    def _from_parts(
        cls,
        cert: Certificate,
        content: common_v1.MessageSignature | dsse.Envelope,
        log_entry: LogEntry,
    ) -> Bundle:
        """
        Construct a `BUNDLE_0_3` bundle from a certificate, a message
        signature or DSSE envelope, and a log entry.

        @private
        """

        inner = _Bundle(
            media_type=Bundle.BundleType.BUNDLE_0_3.value,
            verification_material=bundle_v1.VerificationMaterial(
                certificate=common_v1.X509Certificate(cert.public_bytes(Encoding.DER)),
            ),
        )

        # Fill in the appropriate variants.
        if isinstance(content, common_v1.MessageSignature):
            inner.message_signature = content
        else:
            inner.dsse_envelope = content._inner

        tlog_entry = log_entry._to_rekor()
        inner.verification_material.tlog_entries = [tlog_entry]

        return cls(inner)
class LogInclusionProof(BaseModel):
    """
    Model of a Merkle inclusion proof attached to a transparency log entry.

    Every field is mandatory. Fields can be supplied under their Python
    names or their camelCase aliases (`logIndex`, `rootHash`, `treeSize`).
    """

    model_config = ConfigDict(populate_by_name=True)

    checkpoint: StrictStr = Field(..., alias="checkpoint")
    hashes: List[StrictStr] = Field(..., alias="hashes")
    log_index: StrictInt = Field(..., alias="logIndex")
    root_hash: StrictStr = Field(..., alias="rootHash")
    tree_size: StrictInt = Field(..., alias="treeSize")

    @field_validator("log_index")
    def _log_index_positive(cls, value: int) -> int:
        """Accept only log indices that are zero or greater."""
        if value >= 0:
            return value
        raise ValueError(f"Inclusion proof has invalid log index: {value} < 0")

    @field_validator("tree_size")
    def _tree_size_positive(cls, value: int) -> int:
        """Accept only tree sizes that are zero or greater."""
        if value >= 0:
            return value
        raise ValueError(f"Inclusion proof has invalid tree size: {value} < 0")

    @field_validator("tree_size")
    def _log_index_within_tree_size(
        cls, value: int, info: ValidationInfo, **kwargs: Any
    ) -> int:
        """
        Ensure the tree size strictly exceeds the log index.

        Nothing is compared when `log_index` is absent from `info.data`.
        """
        if "log_index" not in info.data:
            return value

        log_index = info.data["log_index"]
        if value <= log_index:
            raise ValueError(
                "Inclusion proof has log index greater than or equal to tree size: "
                f"{value} <= {log_index}"
            )
        return value
Represents an inclusion proof for a transparency log entry.
Configuration for the model, should be a dictionary conforming to `ConfigDict` (`pydantic.config.ConfigDict`).
Metadata about the fields defined on the model: a mapping of field names to `FieldInfo` (`pydantic.fields.FieldInfo`) objects.
This replaces `Model.__fields__` from Pydantic V1.
A dictionary of computed field names and their corresponding ComputedFieldInfo
objects.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
@dataclass(frozen=True)
class LogEntry:
    """
    Represents a transparency log entry.

    Log entries are retrieved from the transparency log after signing or verification events,
    or loaded from "Sigstore" bundles provided by the user.

    An inclusion proof is always required by this representation; the
    inclusion promise is optional and is `None` when absent.
    """

    uuid: Optional[str]
    """
    This entry's unique ID in the log instance it was retrieved from.

    For sharded log deployments, IDs are unique per-shard.

    Not present for `LogEntry` instances loaded from Sigstore bundles.
    """

    body: B64Str
    """
    The base64-encoded body of the transparency log entry.
    """

    integrated_time: int
    """
    The UNIX time at which this entry was integrated into the transparency log.
    """

    log_id: str
    """
    The log's ID (as the SHA256 hash of the DER-encoded public key for the log
    at the time of entry inclusion).
    """

    log_index: int
    """
    The index of this entry within the log.
    """

    inclusion_proof: LogInclusionProof
    """
    An inclusion proof for this log entry.
    """

    inclusion_promise: Optional[B64Str]
    """
    An inclusion promise for this log entry, if present.

    Internally, this is a base64-encoded Signed Entry Timestamp (SET) for this
    log entry.
    """

    @classmethod
    def _from_response(cls, dict_: dict[str, Any]) -> LogEntry:
        """
        Create a new `LogEntry` from the given API response.

        `dict_` must map exactly one entry UUID to its entry payload;
        any other number of items raises `ValueError`.
        """

        # Assumes we only get one entry back
        entries = list(dict_.items())
        if len(entries) != 1:
            raise ValueError("Received multiple entries in response")

        uuid, entry = entries[0]
        verification = entry["verification"]
        return cls(
            uuid=uuid,
            body=entry["body"],
            integrated_time=entry["integratedTime"],
            log_id=entry["logID"],
            log_index=entry["logIndex"],
            inclusion_proof=LogInclusionProof.model_validate(
                verification["inclusionProof"]
            ),
            inclusion_promise=verification["signedEntryTimestamp"],
        )

    @classmethod
    def _from_dict_rekor(cls, dict_: dict[str, Any]) -> LogEntry:
        """
        Create a new `LogEntry` from the given Rekor TransparencyLogEntry.

        Raises `InvalidBundle` if the entry has no inclusion proof or the
        proof has no checkpoint envelope. A missing or empty inclusion
        promise results in `inclusion_promise=None`.
        """
        tlog_entry = rekor_v1.TransparencyLogEntry()
        tlog_entry.from_dict(dict_)

        inclusion_proof: InclusionProof | None = tlog_entry.inclusion_proof
        # This check is required by us as the client, not the
        # protobuf-specs themselves.
        if not inclusion_proof or not inclusion_proof.checkpoint.envelope:
            raise InvalidBundle("entry must contain inclusion proof, with checkpoint")

        # Hashes are raw bytes at the protobuf level; the model stores hex.
        parsed_inclusion_proof = LogInclusionProof(
            checkpoint=inclusion_proof.checkpoint.envelope,
            hashes=[h.hex() for h in inclusion_proof.hashes],
            log_index=inclusion_proof.log_index,
            root_hash=inclusion_proof.root_hash.hex(),
            tree_size=inclusion_proof.tree_size,
        )

        # The promise is optional: only encode it when one is actually
        # present, so that "no promise" is represented as None rather than
        # as an empty base64 string (which `_verify_set` would not recognize
        # as missing).
        inclusion_promise: Optional[B64Str] = None
        promise = tlog_entry.inclusion_promise
        if promise and promise.signed_entry_timestamp:
            inclusion_promise = B64Str(
                base64.b64encode(promise.signed_entry_timestamp).decode()
            )

        return cls(
            uuid=None,
            body=B64Str(base64.b64encode(tlog_entry.canonicalized_body).decode()),
            integrated_time=tlog_entry.integrated_time,
            log_id=tlog_entry.log_id.key_id.hex(),
            log_index=tlog_entry.log_index,
            inclusion_proof=parsed_inclusion_proof,
            inclusion_promise=inclusion_promise,
        )

    def _to_rekor(self) -> rekor_v1.TransparencyLogEntry:
        """
        Create a new protobuf-level `TransparencyLogEntry` from this `LogEntry`.

        Raises `InvalidBundle` if the entry body does not parse as a
        `Hashedrekord` or `Dsse` entry.

        @private
        """
        inclusion_promise: rekor_v1.InclusionPromise | None = None
        if self.inclusion_promise:
            inclusion_promise = rekor_v1.InclusionPromise(
                signed_entry_timestamp=base64.b64decode(self.inclusion_promise)
            )

        inclusion_proof = rekor_v1.InclusionProof(
            log_index=self.inclusion_proof.log_index,
            root_hash=bytes.fromhex(self.inclusion_proof.root_hash),
            tree_size=self.inclusion_proof.tree_size,
            hashes=[bytes.fromhex(hash_) for hash_ in self.inclusion_proof.hashes],
            checkpoint=rekor_v1.Checkpoint(envelope=self.inclusion_proof.checkpoint),
        )

        tlog_entry = rekor_v1.TransparencyLogEntry(
            log_index=self.log_index,
            log_id=common_v1.LogId(key_id=bytes.fromhex(self.log_id)),
            integrated_time=self.integrated_time,
            inclusion_promise=inclusion_promise,  # type: ignore[arg-type]
            inclusion_proof=inclusion_proof,
            canonicalized_body=base64.b64decode(self.body),
        )

        # Fill in the appropriate kind
        body_entry: ProposedEntry = TypeAdapter(ProposedEntry).validate_json(
            tlog_entry.canonicalized_body
        )
        if not isinstance(body_entry, (Hashedrekord, Dsse)):
            raise InvalidBundle("log entry is not of expected type")

        tlog_entry.kind_version = rekor_v1.KindVersion(
            kind=body_entry.kind, version=body_entry.api_version
        )

        return tlog_entry

    def encode_canonical(self) -> bytes:
        """
        Returns a canonicalized JSON (RFC 8785) representation of the transparency log entry.

        This encoded representation is suitable for verification against
        the Signed Entry Timestamp.
        """
        payload: dict[str, int | str] = {
            "body": self.body,
            "integratedTime": self.integrated_time,
            "logID": self.log_id,
            "logIndex": self.log_index,
        }

        return rfc8785.dumps(payload)

    def _verify_set(self, keyring: RekorKeyring) -> None:
        """
        Verify the inclusion promise (Signed Entry Timestamp) for this
        log entry using the given `keyring`.

        Raises `VerificationError` if this log entry does not contain an
        inclusion promise, or if the keyring rejects the signature.
        """

        if self.inclusion_promise is None:
            raise VerificationError("SET: invalid inclusion promise: missing")

        signed_entry_ts = base64.b64decode(self.inclusion_promise)

        try:
            keyring.verify(
                key_id=KeyID(bytes.fromhex(self.log_id)),
                signature=signed_entry_ts,
                data=self.encode_canonical(),
            )
        except VerificationError as exc:
            # Chain the original failure so the root cause stays visible
            # in tracebacks.
            raise VerificationError(f"SET: invalid inclusion promise: {exc}") from exc

    def _verify(self, keyring: RekorKeyring) -> None:
        """
        Verifies this log entry.

        This method:

        * Verifies the Merkle inclusion proof and its signed checkpoint;
        * Verifies the inclusion promise, if present.
        """

        verify_merkle_inclusion(self)
        verify_checkpoint(keyring, self)

        _logger.debug(f"successfully verified inclusion proof: index={self.log_index}")

        if self.inclusion_promise:
            self._verify_set(keyring)
            _logger.debug(
                f"successfully verified inclusion promise: index={self.log_index}"
            )
Represents a transparency log entry.
Log entries are retrieved from the transparency log after signing or verification events, or loaded from "Sigstore" bundles provided by the user.
This representation always requires an inclusion proof; the inclusion promise is optional. Attempting to construct a `LogEntry` without an inclusion proof will fail.
This entry's unique ID in the log instance it was retrieved from.
For sharded log deployments, IDs are unique per-shard.
Not present for `LogEntry` instances loaded from Sigstore bundles.
The log's ID (as the SHA256 hash of the DER-encoded public key for the log at the time of entry inclusion).
An inclusion promise for this log entry, if present.
Internally, this is a base64-encoded Signed Entry Timestamp (SET) for this log entry.
def encode_canonical(self) -> bytes:
    """
    Serialize this log entry as canonical JSON (RFC 8785).

    The result covers the entry's body, integration time, log ID and
    log index, and is the byte string the Signed Entry Timestamp is
    verified against.
    """
    return rfc8785.dumps(
        {
            "body": self.body,
            "integratedTime": self.integrated_time,
            "logID": self.log_id,
            "logIndex": self.log_index,
        }
    )
Returns a canonicalized JSON (RFC 8785) representation of the transparency log entry.
This encoded representation is suitable for verification against the Signed Entry Timestamp.
class InvalidBundle(Error):
    """
    Error signalling that a `Bundle` is malformed or otherwise invalid.
    """

    def diagnostics(self) -> str:
        """Build the human-readable diagnostic text for this error."""

        # The error's own message is interpolated before dedenting.
        template = f"""\
        An issue occurred while parsing the Sigstore bundle.

        The provided bundle is malformed and may have been modified maliciously.

        Additional context:

        {self}
        """
        return dedent(template)
Raised when the associated `Bundle` is invalid in some way.
def diagnostics(self) -> str:
    """Build the human-readable diagnostic text for this error."""

    # The error's own message is interpolated before dedenting.
    report = f"""\
    An issue occurred while parsing the Sigstore bundle.

    The provided bundle is malformed and may have been modified maliciously.

    Additional context:

    {self}
    """
    return dedent(report)
Returns diagnostics for the error.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- add_note
- args
class Bundle:
    """
    Represents a Sigstore bundle.

    A `Bundle` wraps the protobuf-level bundle message and validates it on
    construction, exposing the signing certificate and the single
    transparency log entry it carries.
    """

    class BundleType(str, Enum):
        """
        Known Sigstore bundle media types.
        """

        BUNDLE_0_1 = "application/vnd.dev.sigstore.bundle+json;version=0.1"
        BUNDLE_0_2 = "application/vnd.dev.sigstore.bundle+json;version=0.2"
        BUNDLE_0_3_ALT = "application/vnd.dev.sigstore.bundle+json;version=0.3"
        BUNDLE_0_3 = "application/vnd.dev.sigstore.bundle.v0.3+json"

        def __str__(self) -> str:
            """Returns the variant's string value."""
            return self.value

    def __init__(self, inner: _Bundle) -> None:
        """
        Creates a new bundle. This is not a public API; use
        `from_json` instead.

        @private
        """
        self._inner = inner
        self._verify()

    def _verify(self) -> None:
        """
        Performs various feats of heroism to ensure the bundle is well-formed
        and upholds invariants, including:

        * The "leaf" (signing) certificate is present;
        * There is an inclusion proof present, even if the Bundle's version
           predates a mandatory inclusion proof.

        On success, sets `self._signing_certificate` and `self._log_entry`;
        raises `InvalidBundle` when an invariant is violated.
        """

        # The bundle must have a recognized media type.
        try:
            media_type = Bundle.BundleType(self._inner.media_type)
        except ValueError as exc:
            raise InvalidBundle(
                f"unsupported bundle format: {self._inner.media_type}"
            ) from exc

        # Extract the signing certificate.
        if media_type in (
            Bundle.BundleType.BUNDLE_0_3,
            Bundle.BundleType.BUNDLE_0_3_ALT,
        ):
            # For "v3" bundles, the signing certificate is the only one present.
            # Reject a missing certificate up front, so that it is reported
            # as an invalid bundle rather than as a DER parsing failure on
            # empty input.
            cert_msg = self._inner.verification_material.certificate
            if not cert_msg or not cert_msg.raw_bytes:
                raise InvalidBundle("expected certificate in bundle")

            leaf_cert = load_der_x509_certificate(cert_msg.raw_bytes)
        else:
            # In older bundles, there is an entire pool (misleadingly called
            # a chain) of certificates, the first of which is the signing
            # certificate.
            certs = (
                self._inner.verification_material.x509_certificate_chain.certificates
            )

            if not certs:
                raise InvalidBundle("expected non-empty certificate chain in bundle")

            # Per client policy in protobuf-specs: the first entry in the chain
            # MUST be a leaf certificate, and the rest of the chain MUST NOT
            # include a root CA or any intermediate CAs that appear in an
            # independent root of trust.
            #
            # We expect some old bundles to violate the rules around root
            # and intermediate CAs, so we issue warnings and not hard errors
            # in those cases.
            leaf_cert, *chain_certs = [
                load_der_x509_certificate(cert.raw_bytes) for cert in certs
            ]
            if not cert_is_leaf(leaf_cert):
                raise InvalidBundle(
                    "bundle contains an invalid leaf or non-leaf certificate in the leaf position"
                )

            for chain_cert in chain_certs:
                # TODO: We should also retrieve the root of trust here and
                # cross-check against it.
                if cert_is_root_ca(chain_cert):
                    _logger.warning(
                        "this bundle contains a root CA, making it subject to misuse"
                    )

        self._signing_certificate = leaf_cert

        # Extract the log entry. For the time being, we expect
        # bundles to only contain a single log entry.
        tlog_entries = self._inner.verification_material.tlog_entries
        if len(tlog_entries) != 1:
            raise InvalidBundle("expected exactly one log entry in bundle")
        tlog_entry = tlog_entries[0]

        # Handling of inclusion promises and proofs varies between bundle
        # format versions:
        #
        # * For 0.1, an inclusion promise is required; the client
        #   MUST verify the inclusion promise.
        #   The inclusion proof is NOT required. If provided, it might NOT
        #   contain a checkpoint; in this case, we ignore it (since it's
        #   useless without one).
        #
        # * For 0.2+, an inclusion proof is required; the client MUST
        #   verify the inclusion proof. The inclusion proof MUST contain
        #   a checkpoint.
        #   The inclusion promise is NOT required; if present, the client
        #   SHOULD verify it.
        #
        # Before all of this, we require that the inclusion proof be present
        # (when constructing the LogEntry).
        log_entry = LogEntry._from_dict_rekor(tlog_entry.to_dict())

        if media_type == Bundle.BundleType.BUNDLE_0_1:
            if not log_entry.inclusion_promise:
                raise InvalidBundle("bundle must contain an inclusion promise")
            if not log_entry.inclusion_proof.checkpoint:
                _logger.debug(
                    "0.1 bundle contains inclusion proof without checkpoint; ignoring"
                )
        else:
            if not log_entry.inclusion_proof.checkpoint:
                raise InvalidBundle("expected checkpoint in inclusion proof")

        self._log_entry = log_entry

    @property
    def signing_certificate(self) -> Certificate:
        """Returns the bundle's contained signing (i.e. leaf) certificate."""
        return self._signing_certificate

    @property
    def log_entry(self) -> LogEntry:
        """
        Returns the bundle's log entry, containing an inclusion proof
        (with checkpoint) and an inclusion promise (if the latter is present).
        """
        return self._log_entry

    @property
    def _dsse_envelope(self) -> dsse.Envelope | None:
        """
        Returns the DSSE envelope within this Bundle as a `dsse.Envelope`,
        or `None` if the bundle does not carry one.

        @private
        """
        if self._inner.dsse_envelope:
            return dsse.Envelope(self._inner.dsse_envelope)
        return None

    @classmethod
    def from_json(cls, raw: bytes | str) -> Bundle:
        """
        Deserialize the given Sigstore bundle.
        """
        inner = _Bundle().from_json(raw)
        return cls(inner)

    def to_json(self) -> str:
        """
        Return a JSON encoding of this bundle.
        """
        return self._inner.to_json()

    def _to_parts(
        self,
    ) -> tuple[Certificate, common_v1.MessageSignature | dsse.Envelope, LogEntry]:
        """
        Decompose the `Bundle` into its core constituent parts.

        The middle element is the DSSE envelope when the bundle has one,
        and the message signature otherwise.

        @private
        """

        content: common_v1.MessageSignature | dsse.Envelope
        # Evaluate the property once; each access builds a new wrapper.
        envelope = self._dsse_envelope
        if envelope:
            content = envelope
        else:
            content = self._inner.message_signature

        return (self.signing_certificate, content, self.log_entry)

    @classmethod
    def from_parts(cls, cert: Certificate, sig: bytes, log_entry: LogEntry) -> Bundle:
        """
        Construct a Sigstore bundle (of `hashedrekord` type) from its
        constituent parts.
        """

        return cls._from_parts(
            cert, common_v1.MessageSignature(signature=sig), log_entry
        )

    @classmethod
    def _from_parts(
        cls,
        cert: Certificate,
        content: common_v1.MessageSignature | dsse.Envelope,
        log_entry: LogEntry,
    ) -> Bundle:
        """
        Construct a `BUNDLE_0_3` bundle from a certificate, a message
        signature or DSSE envelope, and a log entry.

        @private
        """

        inner = _Bundle(
            media_type=Bundle.BundleType.BUNDLE_0_3.value,
            verification_material=bundle_v1.VerificationMaterial(
                certificate=common_v1.X509Certificate(cert.public_bytes(Encoding.DER)),
            ),
        )

        # Fill in the appropriate variants.
        if isinstance(content, common_v1.MessageSignature):
            inner.message_signature = content
        else:
            inner.dsse_envelope = content._inner

        tlog_entry = log_entry._to_rekor()
        inner.verification_material.tlog_entries = [tlog_entry]

        return cls(inner)
Represents a Sigstore bundle.
@property
def signing_certificate(self) -> Certificate:
    """
    Returns the signing (i.e. leaf) certificate contained in the bundle.
    """
    leaf = self._signing_certificate
    return leaf
Returns the bundle's contained signing (i.e. leaf) certificate.
@property
def log_entry(self) -> LogEntry:
    """
    Returns the log entry held by the bundle. The entry contains an
    inclusion proof (with checkpoint) and, if one is present, an
    inclusion promise.
    """
    entry = self._log_entry
    return entry
Returns the bundle's log entry, containing an inclusion proof (with checkpoint) and an inclusion promise (if the latter is present).
@classmethod
def from_json(cls, raw: bytes | str) -> Bundle:
    """
    Deserialize a Sigstore bundle from its JSON encoding.

    The decoded message is handed to the class constructor.
    """
    return cls(_Bundle().from_json(raw))
Deserialize the given Sigstore bundle.
def to_json(self) -> str:
    """
    Return this bundle encoded as JSON, as produced by the inner message.
    """
    encoded = self._inner.to_json()
    return encoded
Return a JSON encoding of this bundle.
@classmethod
def from_parts(cls, cert: Certificate, sig: bytes, log_entry: LogEntry) -> Bundle:
    """
    Construct a Sigstore bundle (of `hashedrekord` type) from a
    certificate, a raw signature, and a log entry.
    """
    # Wrap the raw signature bytes before delegating to the private builder.
    message_signature = common_v1.MessageSignature(signature=sig)
    return cls._from_parts(cert, message_signature, log_entry)
Construct a Sigstore bundle (of `hashedrekord` type) from its constituent parts.
class BundleType(str, Enum):
    """
    The Sigstore bundle media types that are recognized.

    Each member's value is the media type string itself.
    """

    # Version-parameter style media types.
    BUNDLE_0_1 = "application/vnd.dev.sigstore.bundle+json;version=0.1"
    BUNDLE_0_2 = "application/vnd.dev.sigstore.bundle+json;version=0.2"
    BUNDLE_0_3_ALT = "application/vnd.dev.sigstore.bundle+json;version=0.3"
    # Version-in-subtype style media type.
    BUNDLE_0_3 = "application/vnd.dev.sigstore.bundle.v0.3+json"

    def __str__(self) -> str:
        """Returns the media type string of this variant."""
        return self.value
Known Sigstore bundle media types.
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans