sigstore.models
Common models shared between signing and verification.
1# Copyright 2022 The Sigstore Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15""" 16Common models shared between signing and verification. 17""" 18 19from __future__ import annotations 20 21import base64 22import logging 23import typing 24from enum import Enum 25from textwrap import dedent 26from typing import Any, List, Optional 27 28import rfc8785 29from cryptography.hazmat.primitives.serialization import Encoding 30from cryptography.x509 import ( 31 Certificate, 32 load_der_x509_certificate, 33) 34from pydantic import ( 35 BaseModel, 36 ConfigDict, 37 Field, 38 StrictInt, 39 StrictStr, 40 TypeAdapter, 41 ValidationInfo, 42 field_validator, 43) 44from pydantic.dataclasses import dataclass 45from rekor_types import Dsse, Hashedrekord, ProposedEntry 46from rfc3161_client import TimeStampResponse, decode_timestamp_response 47from sigstore_protobuf_specs.dev.sigstore.bundle import v1 as bundle_v1 48from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import ( 49 Bundle as _Bundle, 50) 51from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import ( 52 TimestampVerificationData as _TimestampVerificationData, 53) 54from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import ( 55 VerificationMaterial as _VerificationMaterial, 56) 57from sigstore_protobuf_specs.dev.sigstore.common import v1 as common_v1 58from sigstore_protobuf_specs.dev.sigstore.rekor import v1 as rekor_v1 59from sigstore_protobuf_specs.dev.sigstore.rekor.v1 import ( 60 InclusionProof, 61) 62 
63from sigstore import dsse 64from sigstore._internal.merkle import verify_merkle_inclusion 65from sigstore._internal.rekor.checkpoint import verify_checkpoint 66from sigstore._utils import ( 67 B64Str, 68 KeyID, 69 cert_is_leaf, 70 cert_is_root_ca, 71) 72from sigstore.errors import Error, VerificationError 73 74if typing.TYPE_CHECKING: 75 from sigstore._internal.trust import RekorKeyring 76 77 78_logger = logging.getLogger(__name__) 79 80 81class LogInclusionProof(BaseModel): 82 """ 83 Represents an inclusion proof for a transparency log entry. 84 """ 85 86 model_config = ConfigDict(populate_by_name=True) 87 88 checkpoint: StrictStr = Field(..., alias="checkpoint") 89 hashes: List[StrictStr] = Field(..., alias="hashes") 90 log_index: StrictInt = Field(..., alias="logIndex") 91 root_hash: StrictStr = Field(..., alias="rootHash") 92 tree_size: StrictInt = Field(..., alias="treeSize") 93 94 @field_validator("log_index") 95 def _log_index_positive(cls, v: int) -> int: 96 if v < 0: 97 raise ValueError(f"Inclusion proof has invalid log index: {v} < 0") 98 return v 99 100 @field_validator("tree_size") 101 def _tree_size_positive(cls, v: int) -> int: 102 if v < 0: 103 raise ValueError(f"Inclusion proof has invalid tree size: {v} < 0") 104 return v 105 106 @field_validator("tree_size") 107 def _log_index_within_tree_size( 108 cls, v: int, info: ValidationInfo, **kwargs: Any 109 ) -> int: 110 if "log_index" in info.data and v <= info.data["log_index"]: 111 raise ValueError( 112 "Inclusion proof has log index greater than or equal to tree size: " 113 f"{v} <= {info.data['log_index']}" 114 ) 115 return v 116 117 118@dataclass(frozen=True) 119class LogEntry: 120 """ 121 Represents a transparency log entry. 122 123 Log entries are retrieved from the transparency log after signing or verification events, 124 or loaded from "Sigstore" bundles provided by the user. 
125 126 This representation allows for either a missing inclusion promise or a missing 127 inclusion proof, but not both: attempting to construct a `LogEntry` without 128 at least one will fail. 129 """ 130 131 uuid: Optional[str] 132 """ 133 This entry's unique ID in the log instance it was retrieved from. 134 135 For sharded log deployments, IDs are unique per-shard. 136 137 Not present for `LogEntry` instances loaded from Sigstore bundles. 138 """ 139 140 body: B64Str 141 """ 142 The base64-encoded body of the transparency log entry. 143 """ 144 145 integrated_time: int 146 """ 147 The UNIX time at which this entry was integrated into the transparency log. 148 """ 149 150 log_id: str 151 """ 152 The log's ID (as the SHA256 hash of the DER-encoded public key for the log 153 at the time of entry inclusion). 154 """ 155 156 log_index: int 157 """ 158 The index of this entry within the log. 159 """ 160 161 inclusion_proof: LogInclusionProof 162 """ 163 An inclusion proof for this log entry. 164 """ 165 166 inclusion_promise: Optional[B64Str] 167 """ 168 An inclusion promise for this log entry, if present. 169 170 Internally, this is a base64-encoded Signed Entry Timestamp (SET) for this 171 log entry. 172 """ 173 174 @classmethod 175 def _from_response(cls, dict_: dict[str, Any]) -> LogEntry: 176 """ 177 Create a new `LogEntry` from the given API response. 
178 """ 179 180 # Assumes we only get one entry back 181 entries = list(dict_.items()) 182 if len(entries) != 1: 183 raise ValueError("Received multiple entries in response") 184 185 uuid, entry = entries[0] 186 return LogEntry( 187 uuid=uuid, 188 body=entry["body"], 189 integrated_time=entry["integratedTime"], 190 log_id=entry["logID"], 191 log_index=entry["logIndex"], 192 inclusion_proof=LogInclusionProof.model_validate( 193 entry["verification"]["inclusionProof"] 194 ), 195 inclusion_promise=entry["verification"]["signedEntryTimestamp"], 196 ) 197 198 @classmethod 199 def _from_dict_rekor(cls, dict_: dict[str, Any]) -> LogEntry: 200 """ 201 Create a new `LogEntry` from the given Rekor TransparencyLogEntry. 202 """ 203 tlog_entry = rekor_v1.TransparencyLogEntry() 204 tlog_entry.from_dict(dict_) 205 206 inclusion_proof: InclusionProof | None = tlog_entry.inclusion_proof 207 # This check is required by us as the client, not the 208 # protobuf-specs themselves. 209 if not inclusion_proof or not inclusion_proof.checkpoint.envelope: 210 raise InvalidBundle("entry must contain inclusion proof, with checkpoint") 211 212 parsed_inclusion_proof = LogInclusionProof( 213 checkpoint=inclusion_proof.checkpoint.envelope, 214 hashes=[h.hex() for h in inclusion_proof.hashes], 215 log_index=inclusion_proof.log_index, 216 root_hash=inclusion_proof.root_hash.hex(), 217 tree_size=inclusion_proof.tree_size, 218 ) 219 220 return LogEntry( 221 uuid=None, 222 body=B64Str(base64.b64encode(tlog_entry.canonicalized_body).decode()), 223 integrated_time=tlog_entry.integrated_time, 224 log_id=tlog_entry.log_id.key_id.hex(), 225 log_index=tlog_entry.log_index, 226 inclusion_proof=parsed_inclusion_proof, 227 inclusion_promise=B64Str( 228 base64.b64encode( 229 tlog_entry.inclusion_promise.signed_entry_timestamp 230 ).decode() 231 ), 232 ) 233 234 def _to_rekor(self) -> rekor_v1.TransparencyLogEntry: 235 """ 236 Create a new protobuf-level `TransparencyLogEntry` from this `LogEntry`. 
237 238 @private 239 """ 240 inclusion_promise: rekor_v1.InclusionPromise | None = None 241 if self.inclusion_promise: 242 inclusion_promise = rekor_v1.InclusionPromise( 243 signed_entry_timestamp=base64.b64decode(self.inclusion_promise) 244 ) 245 246 inclusion_proof = rekor_v1.InclusionProof( 247 log_index=self.inclusion_proof.log_index, 248 root_hash=bytes.fromhex(self.inclusion_proof.root_hash), 249 tree_size=self.inclusion_proof.tree_size, 250 hashes=[bytes.fromhex(hash_) for hash_ in self.inclusion_proof.hashes], 251 checkpoint=rekor_v1.Checkpoint(envelope=self.inclusion_proof.checkpoint), 252 ) 253 254 tlog_entry = rekor_v1.TransparencyLogEntry( 255 log_index=self.log_index, 256 log_id=common_v1.LogId(key_id=bytes.fromhex(self.log_id)), 257 integrated_time=self.integrated_time, 258 inclusion_promise=inclusion_promise, # type: ignore[arg-type] 259 inclusion_proof=inclusion_proof, 260 canonicalized_body=base64.b64decode(self.body), 261 ) 262 263 # Fill in the appropriate kind 264 body_entry: ProposedEntry = TypeAdapter(ProposedEntry).validate_json( 265 tlog_entry.canonicalized_body 266 ) 267 if not isinstance(body_entry, (Hashedrekord, Dsse)): 268 raise InvalidBundle("log entry is not of expected type") 269 270 tlog_entry.kind_version = rekor_v1.KindVersion( 271 kind=body_entry.kind, version=body_entry.api_version 272 ) 273 274 return tlog_entry 275 276 def encode_canonical(self) -> bytes: 277 """ 278 Returns a canonicalized JSON (RFC 8785) representation of the transparency log entry. 279 280 This encoded representation is suitable for verification against 281 the Signed Entry Timestamp. 
282 """ 283 payload: dict[str, int | str] = { 284 "body": self.body, 285 "integratedTime": self.integrated_time, 286 "logID": self.log_id, 287 "logIndex": self.log_index, 288 } 289 290 return rfc8785.dumps(payload) 291 292 def _verify_set(self, keyring: RekorKeyring) -> None: 293 """ 294 Verify the inclusion promise (Signed Entry Timestamp) for a given transparency log 295 `entry` using the given `keyring`. 296 297 Fails if the given log entry does not contain an inclusion promise. 298 """ 299 300 if self.inclusion_promise is None: 301 raise VerificationError("SET: invalid inclusion promise: missing") 302 303 signed_entry_ts = base64.b64decode(self.inclusion_promise) 304 305 try: 306 keyring.verify( 307 key_id=KeyID(bytes.fromhex(self.log_id)), 308 signature=signed_entry_ts, 309 data=self.encode_canonical(), 310 ) 311 except VerificationError as exc: 312 raise VerificationError(f"SET: invalid inclusion promise: {exc}") 313 314 def _verify(self, keyring: RekorKeyring) -> None: 315 """ 316 Verifies this log entry. 317 318 This method performs steps (5), (6), and optionally (7) in 319 the top-level verify API: 320 321 * Verifies the consistency of the entry with the given bundle; 322 * Verifies the Merkle inclusion proof and its signed checkpoint; 323 * Verifies the inclusion promise, if present. 324 """ 325 326 verify_merkle_inclusion(self) 327 verify_checkpoint(keyring, self) 328 329 _logger.debug(f"successfully verified inclusion proof: index={self.log_index}") 330 331 if self.inclusion_promise: 332 self._verify_set(keyring) 333 _logger.debug( 334 f"successfully verified inclusion promise: index={self.log_index}" 335 ) 336 337 338class TimestampVerificationData: 339 """ 340 Represents a TimestampVerificationData structure. 
341 342 @private 343 """ 344 345 def __init__(self, inner: _TimestampVerificationData) -> None: 346 """Init method.""" 347 self._inner = inner 348 self._verify() 349 350 def _verify(self) -> None: 351 """ 352 Verifies the TimestampVerificationData. 353 354 It verifies that TimeStamp Responses embedded in the bundle are correctly 355 formed. 356 """ 357 try: 358 self._signed_ts = [ 359 decode_timestamp_response(ts.signed_timestamp) 360 for ts in self._inner.rfc3161_timestamps 361 ] 362 except ValueError: 363 raise VerificationError("Invalid Timestamp Response") 364 365 @property 366 def rfc3161_timestamps(self) -> list[TimeStampResponse]: 367 """Returns a list of signed timestamp.""" 368 return self._signed_ts 369 370 @classmethod 371 def from_json(cls, raw: str | bytes) -> TimestampVerificationData: 372 """ 373 Deserialize the given timestamp verification data. 374 """ 375 inner = _TimestampVerificationData().from_json(raw) 376 return cls(inner) 377 378 379class VerificationMaterial: 380 """ 381 Represents a VerificationMaterial structure. 382 """ 383 384 def __init__(self, inner: _VerificationMaterial) -> None: 385 """Init method.""" 386 self._inner = inner 387 388 @property 389 def timestamp_verification_data(self) -> TimestampVerificationData: 390 """ 391 Returns the Timestamp Verification Data. 392 """ 393 return TimestampVerificationData(self._inner.timestamp_verification_data) 394 395 396class InvalidBundle(Error): 397 """ 398 Raised when the associated `Bundle` is invalid in some way. 399 """ 400 401 def diagnostics(self) -> str: 402 """Returns diagnostics for the error.""" 403 404 return dedent( 405 f"""\ 406 An issue occurred while parsing the Sigstore bundle. 407 408 The provided bundle is malformed and may have been modified maliciously. 409 410 Additional context: 411 412 {self} 413 """ 414 ) 415 416 417class Bundle: 418 """ 419 Represents a Sigstore bundle. 420 """ 421 422 class BundleType(str, Enum): 423 """ 424 Known Sigstore bundle media types. 
425 """ 426 427 BUNDLE_0_1 = "application/vnd.dev.sigstore.bundle+json;version=0.1" 428 BUNDLE_0_2 = "application/vnd.dev.sigstore.bundle+json;version=0.2" 429 BUNDLE_0_3_ALT = "application/vnd.dev.sigstore.bundle+json;version=0.3" 430 BUNDLE_0_3 = "application/vnd.dev.sigstore.bundle.v0.3+json" 431 432 def __str__(self) -> str: 433 """Returns the variant's string value.""" 434 return self.value 435 436 def __init__(self, inner: _Bundle) -> None: 437 """ 438 Creates a new bundle. This is not a public API; use 439 `from_json` instead. 440 441 @private 442 """ 443 self._inner = inner 444 self._verify() 445 446 def _verify(self) -> None: 447 """ 448 Performs various feats of heroism to ensure the bundle is well-formed 449 and upholds invariants, including: 450 451 * The "leaf" (signing) certificate is present; 452 * There is a inclusion proof present, even if the Bundle's version 453 predates a mandatory inclusion proof. 454 """ 455 456 # The bundle must have a recognized media type. 457 try: 458 media_type = Bundle.BundleType(self._inner.media_type) 459 except ValueError: 460 raise InvalidBundle(f"unsupported bundle format: {self._inner.media_type}") 461 462 # Extract the signing certificate. 463 if media_type in ( 464 Bundle.BundleType.BUNDLE_0_3, 465 Bundle.BundleType.BUNDLE_0_3_ALT, 466 ): 467 # For "v3" bundles, the signing certificate is the only one present. 468 leaf_cert = load_der_x509_certificate( 469 self._inner.verification_material.certificate.raw_bytes 470 ) 471 else: 472 # In older bundles, there is an entire pool (misleadingly called 473 # a chain) of certificates, the first of which is the signing 474 # certificate. 
475 certs = ( 476 self._inner.verification_material.x509_certificate_chain.certificates 477 ) 478 479 if len(certs) == 0: 480 raise InvalidBundle("expected non-empty certificate chain in bundle") 481 482 # Per client policy in protobuf-specs: the first entry in the chain 483 # MUST be a leaf certificate, and the rest of the chain MUST NOT 484 # include a root CA or any intermediate CAs that appear in an 485 # independent root of trust. 486 # 487 # We expect some old bundles to violate the rules around root 488 # and intermediate CAs, so we issue warnings and not hard errors 489 # in those cases. 490 leaf_cert, *chain_certs = [ 491 load_der_x509_certificate(cert.raw_bytes) for cert in certs 492 ] 493 if not cert_is_leaf(leaf_cert): 494 raise InvalidBundle( 495 "bundle contains an invalid leaf or non-leaf certificate in the leaf position" 496 ) 497 498 for chain_cert in chain_certs: 499 # TODO: We should also retrieve the root of trust here and 500 # cross-check against it. 501 if cert_is_root_ca(chain_cert): 502 _logger.warning( 503 "this bundle contains a root CA, making it subject to misuse" 504 ) 505 506 self._signing_certificate = leaf_cert 507 508 # Extract the log entry. For the time being, we expect 509 # bundles to only contain a single log entry. 510 tlog_entries = self._inner.verification_material.tlog_entries 511 if len(tlog_entries) != 1: 512 raise InvalidBundle("expected exactly one log entry in bundle") 513 tlog_entry = tlog_entries[0] 514 515 # Handling of inclusion promises and proofs varies between bundle 516 # format versions: 517 # 518 # * For 0.1, an inclusion promise is required; the client 519 # MUST verify the inclusion promise. 520 # The inclusion proof is NOT required. If provided, it might NOT 521 # contain a checkpoint; in this case, we ignore it (since it's 522 # useless without one). 523 # 524 # * For 0.2+, an inclusion proof is required; the client MUST 525 # verify the inclusion proof. 
The inclusion prof MUST contain 526 # a checkpoint. 527 # The inclusion promise is NOT required; if present, the client 528 # SHOULD verify it. 529 # 530 # Before all of this, we require that the inclusion proof be present 531 # (when constructing the LogEntry). 532 log_entry = LogEntry._from_dict_rekor(tlog_entry.to_dict()) 533 534 if media_type == Bundle.BundleType.BUNDLE_0_1: 535 if not log_entry.inclusion_promise: 536 raise InvalidBundle("bundle must contain an inclusion promise") 537 if not log_entry.inclusion_proof.checkpoint: 538 _logger.debug( 539 "0.1 bundle contains inclusion proof without checkpoint; ignoring" 540 ) 541 else: 542 if not log_entry.inclusion_proof.checkpoint: 543 raise InvalidBundle("expected checkpoint in inclusion proof") 544 545 self._log_entry = log_entry 546 547 @property 548 def signing_certificate(self) -> Certificate: 549 """Returns the bundle's contained signing (i.e. leaf) certificate.""" 550 return self._signing_certificate 551 552 @property 553 def log_entry(self) -> LogEntry: 554 """ 555 Returns the bundle's log entry, containing an inclusion proof 556 (with checkpoint) and an inclusion promise (if the latter is present). 557 """ 558 return self._log_entry 559 560 @property 561 def _dsse_envelope(self) -> dsse.Envelope | None: 562 """ 563 Returns the DSSE envelope within this Bundle as a `dsse.Envelope`. 564 565 @private 566 """ 567 if self._inner.dsse_envelope: 568 return dsse.Envelope(self._inner.dsse_envelope) 569 return None 570 571 @property 572 def signature(self) -> bytes: 573 """ 574 Returns the signature bytes of this bundle. 575 Either from the DSSE Envelope or from the message itself. 576 """ 577 return ( 578 self._dsse_envelope.signature 579 if self._dsse_envelope 580 else self._inner.message_signature.signature 581 ) 582 583 @property 584 def verification_material(self) -> VerificationMaterial: 585 """ 586 Returns the bundle's verification material. 
587 """ 588 return VerificationMaterial(self._inner.verification_material) 589 590 @classmethod 591 def from_json(cls, raw: bytes | str) -> Bundle: 592 """ 593 Deserialize the given Sigstore bundle. 594 """ 595 inner = _Bundle().from_json(raw) 596 return cls(inner) 597 598 def to_json(self) -> str: 599 """ 600 Return a JSON encoding of this bundle. 601 """ 602 return self._inner.to_json() 603 604 def _to_parts( 605 self, 606 ) -> tuple[Certificate, common_v1.MessageSignature | dsse.Envelope, LogEntry]: 607 """ 608 Decompose the `Bundle` into its core constituent parts. 609 610 @private 611 """ 612 613 content: common_v1.MessageSignature | dsse.Envelope 614 if self._dsse_envelope: 615 content = self._dsse_envelope 616 else: 617 content = self._inner.message_signature 618 619 return (self.signing_certificate, content, self.log_entry) 620 621 @classmethod 622 def from_parts(cls, cert: Certificate, sig: bytes, log_entry: LogEntry) -> Bundle: 623 """ 624 Construct a Sigstore bundle (of `hashedrekord` type) from its 625 constituent parts. 626 """ 627 628 return cls._from_parts( 629 cert, common_v1.MessageSignature(signature=sig), log_entry 630 ) 631 632 @classmethod 633 def _from_parts( 634 cls, 635 cert: Certificate, 636 content: common_v1.MessageSignature | dsse.Envelope, 637 log_entry: LogEntry, 638 ) -> Bundle: 639 """ 640 @private 641 """ 642 643 inner = _Bundle( 644 media_type=Bundle.BundleType.BUNDLE_0_3.value, 645 verification_material=bundle_v1.VerificationMaterial( 646 certificate=common_v1.X509Certificate(cert.public_bytes(Encoding.DER)), 647 ), 648 ) 649 650 # Fill in the appropriate variants. 651 if isinstance(content, common_v1.MessageSignature): 652 inner.message_signature = content 653 else: 654 inner.dsse_envelope = content._inner 655 656 tlog_entry = log_entry._to_rekor() 657 inner.verification_material.tlog_entries = [tlog_entry] 658 659 return cls(inner)
class LogInclusionProof(BaseModel):
    """
    Represents an inclusion proof for a transparency log entry.

    Fields may be populated either by their Python names or by the aliases
    declared below (`populate_by_name=True`).
    """

    model_config = ConfigDict(populate_by_name=True)

    # NOTE: `log_index` is declared before `tree_size` on purpose; the
    # tree-size validator below reads the already-validated `log_index`
    # out of `info.data`.
    checkpoint: StrictStr = Field(..., alias="checkpoint")
    hashes: List[StrictStr] = Field(..., alias="hashes")
    log_index: StrictInt = Field(..., alias="logIndex")
    root_hash: StrictStr = Field(..., alias="rootHash")
    tree_size: StrictInt = Field(..., alias="treeSize")

    @field_validator("log_index")
    @classmethod
    def _log_index_positive(cls, v: int) -> int:
        """Reject a negative log index (zero is accepted)."""
        if v < 0:
            raise ValueError(f"Inclusion proof has invalid log index: {v} < 0")
        return v

    @field_validator("tree_size")
    @classmethod
    def _tree_size_positive(cls, v: int) -> int:
        """Reject a negative tree size (zero is accepted by this check)."""
        if v < 0:
            raise ValueError(f"Inclusion proof has invalid tree size: {v} < 0")
        return v

    @field_validator("tree_size")
    @classmethod
    def _log_index_within_tree_size(
        cls, v: int, info: ValidationInfo, **kwargs: Any
    ) -> int:
        """
        Require the tree size to be strictly greater than the log index.

        The comparison is skipped when `log_index` is absent from
        `info.data` (i.e. it did not validate).
        """
        if "log_index" in info.data and v <= info.data["log_index"]:
            raise ValueError(
                "Inclusion proof has log index greater than or equal to tree size: "
                f"{v} <= {info.data['log_index']}"
            )
        return v
Represents an inclusion proof for a transparency log entry.
Configuration for the model; should be a dictionary conforming to [`ConfigDict`][pydantic.config.ConfigDict].
Metadata about the fields defined on the model: a mapping of field names to [`FieldInfo`][pydantic.fields.FieldInfo] objects.
This replaces `Model.__fields__` from Pydantic V1.
@dataclass(frozen=True)
class LogEntry:
    """
    Represents a transparency log entry.

    Log entries are retrieved from the transparency log after signing or verification events,
    or loaded from "Sigstore" bundles provided by the user.

    An inclusion proof is always required. The inclusion promise is optional
    and is `None` when the entry does not carry one.
    """

    uuid: Optional[str]
    """
    This entry's unique ID in the log instance it was retrieved from.

    For sharded log deployments, IDs are unique per-shard.

    Not present for `LogEntry` instances loaded from Sigstore bundles.
    """

    body: B64Str
    """
    The base64-encoded body of the transparency log entry.
    """

    integrated_time: int
    """
    The UNIX time at which this entry was integrated into the transparency log.
    """

    log_id: str
    """
    The log's ID (as the SHA256 hash of the DER-encoded public key for the log
    at the time of entry inclusion).
    """

    log_index: int
    """
    The index of this entry within the log.
    """

    inclusion_proof: LogInclusionProof
    """
    An inclusion proof for this log entry.
    """

    inclusion_promise: Optional[B64Str]
    """
    An inclusion promise for this log entry, if present.

    Internally, this is a base64-encoded Signed Entry Timestamp (SET) for this
    log entry.
    """

    @classmethod
    def _from_response(cls, dict_: dict[str, Any]) -> LogEntry:
        """
        Create a new `LogEntry` from the given API response.

        `dict_` must map exactly one entry UUID to that entry's payload;
        a `ValueError` is raised for an empty or multi-entry response.
        """

        entries = list(dict_.items())
        if not entries:
            # Previously reported as "multiple entries", which was misleading.
            raise ValueError("Received no entries in response")
        if len(entries) > 1:
            raise ValueError("Received multiple entries in response")

        uuid, entry = entries[0]
        verification = entry["verification"]
        return cls(
            uuid=uuid,
            body=entry["body"],
            integrated_time=entry["integratedTime"],
            log_id=entry["logID"],
            log_index=entry["logIndex"],
            inclusion_proof=LogInclusionProof.model_validate(
                verification["inclusionProof"]
            ),
            inclusion_promise=verification["signedEntryTimestamp"],
        )

    @classmethod
    def _from_dict_rekor(cls, dict_: dict[str, Any]) -> LogEntry:
        """
        Create a new `LogEntry` from the given Rekor TransparencyLogEntry.

        Raises `InvalidBundle` if the entry lacks an inclusion proof or the
        proof lacks a checkpoint. A missing (or empty) inclusion promise is
        represented as `None`.
        """
        tlog_entry = rekor_v1.TransparencyLogEntry()
        tlog_entry.from_dict(dict_)

        inclusion_proof: InclusionProof | None = tlog_entry.inclusion_proof
        # This check is required by us as the client, not the
        # protobuf-specs themselves.
        if not inclusion_proof or not inclusion_proof.checkpoint.envelope:
            raise InvalidBundle("entry must contain inclusion proof, with checkpoint")

        parsed_inclusion_proof = LogInclusionProof(
            checkpoint=inclusion_proof.checkpoint.envelope,
            hashes=[h.hex() for h in inclusion_proof.hashes],
            log_index=inclusion_proof.log_index,
            root_hash=inclusion_proof.root_hash.hex(),
            tree_size=inclusion_proof.tree_size,
        )

        # Only materialize a promise when the entry actually carries one;
        # encoding an absent promise would yield an empty string rather than
        # `None`, which `_verify_set`'s `is None` check cannot detect.
        promise = tlog_entry.inclusion_promise
        inclusion_promise: Optional[B64Str] = None
        if promise and promise.signed_entry_timestamp:
            inclusion_promise = B64Str(
                base64.b64encode(promise.signed_entry_timestamp).decode()
            )

        return cls(
            uuid=None,
            body=B64Str(base64.b64encode(tlog_entry.canonicalized_body).decode()),
            integrated_time=tlog_entry.integrated_time,
            log_id=tlog_entry.log_id.key_id.hex(),
            log_index=tlog_entry.log_index,
            inclusion_proof=parsed_inclusion_proof,
            inclusion_promise=inclusion_promise,
        )

    def _to_rekor(self) -> rekor_v1.TransparencyLogEntry:
        """
        Create a new protobuf-level `TransparencyLogEntry` from this `LogEntry`.

        Raises `InvalidBundle` if the entry body does not parse as a
        `Hashedrekord` or `Dsse` entry.

        @private
        """
        inclusion_promise: rekor_v1.InclusionPromise | None = None
        if self.inclusion_promise:
            inclusion_promise = rekor_v1.InclusionPromise(
                signed_entry_timestamp=base64.b64decode(self.inclusion_promise)
            )

        inclusion_proof = rekor_v1.InclusionProof(
            log_index=self.inclusion_proof.log_index,
            root_hash=bytes.fromhex(self.inclusion_proof.root_hash),
            tree_size=self.inclusion_proof.tree_size,
            hashes=[bytes.fromhex(hash_) for hash_ in self.inclusion_proof.hashes],
            checkpoint=rekor_v1.Checkpoint(envelope=self.inclusion_proof.checkpoint),
        )

        tlog_entry = rekor_v1.TransparencyLogEntry(
            log_index=self.log_index,
            log_id=common_v1.LogId(key_id=bytes.fromhex(self.log_id)),
            integrated_time=self.integrated_time,
            inclusion_promise=inclusion_promise,  # type: ignore[arg-type]
            inclusion_proof=inclusion_proof,
            canonicalized_body=base64.b64decode(self.body),
        )

        # Fill in the appropriate kind
        body_entry: ProposedEntry = TypeAdapter(ProposedEntry).validate_json(
            tlog_entry.canonicalized_body
        )
        if not isinstance(body_entry, (Hashedrekord, Dsse)):
            raise InvalidBundle("log entry is not of expected type")

        tlog_entry.kind_version = rekor_v1.KindVersion(
            kind=body_entry.kind, version=body_entry.api_version
        )

        return tlog_entry

    def encode_canonical(self) -> bytes:
        """
        Returns a canonicalized JSON (RFC 8785) representation of the transparency log entry.

        This encoded representation is suitable for verification against
        the Signed Entry Timestamp.
        """
        payload: dict[str, int | str] = {
            "body": self.body,
            "integratedTime": self.integrated_time,
            "logID": self.log_id,
            "logIndex": self.log_index,
        }

        return rfc8785.dumps(payload)

    def _verify_set(self, keyring: RekorKeyring) -> None:
        """
        Verify the inclusion promise (Signed Entry Timestamp) for this
        transparency log entry using the given `keyring`.

        Raises `VerificationError` if this entry does not contain an
        inclusion promise, or if the keyring rejects the signature.
        """

        if self.inclusion_promise is None:
            raise VerificationError("SET: invalid inclusion promise: missing")

        signed_entry_ts = base64.b64decode(self.inclusion_promise)

        try:
            keyring.verify(
                key_id=KeyID(bytes.fromhex(self.log_id)),
                signature=signed_entry_ts,
                data=self.encode_canonical(),
            )
        except VerificationError as exc:
            # Chain explicitly so the underlying cause stays attached.
            raise VerificationError(f"SET: invalid inclusion promise: {exc}") from exc

    def _verify(self, keyring: RekorKeyring) -> None:
        """
        Verifies this log entry.

        This method:

        * Verifies the Merkle inclusion proof and its signed checkpoint;
        * Verifies the inclusion promise, if present.
        """

        verify_merkle_inclusion(self)
        verify_checkpoint(keyring, self)

        _logger.debug(f"successfully verified inclusion proof: index={self.log_index}")

        if self.inclusion_promise:
            self._verify_set(keyring)
            _logger.debug(
                f"successfully verified inclusion promise: index={self.log_index}"
            )
Represents a transparency log entry.
Log entries are retrieved from the transparency log after signing or verification events, or loaded from "Sigstore" bundles provided by the user.
This representation allows for either a missing inclusion promise or a missing inclusion proof, but not both: attempting to construct a `LogEntry` without at least one will fail.
This entry's unique ID in the log instance it was retrieved from.
For sharded log deployments, IDs are unique per-shard.
Not present for `LogEntry` instances loaded from Sigstore bundles.
The log's ID (as the SHA256 hash of the DER-encoded public key for the log at the time of entry inclusion).
An inclusion promise for this log entry, if present.
Internally, this is a base64-encoded Signed Entry Timestamp (SET) for this log entry.
def encode_canonical(self) -> bytes:
    """
    Serialize this entry as canonical JSON (RFC 8785).

    The output covers the body, integration time, log ID and log index,
    and is the byte string checked against the Signed Entry Timestamp.
    """
    return rfc8785.dumps(
        {
            "body": self.body,
            "integratedTime": self.integrated_time,
            "logID": self.log_id,
            "logIndex": self.log_index,
        }
    )
Returns a canonicalized JSON (RFC 8785) representation of the transparency log entry.
This encoded representation is suitable for verification against the Signed Entry Timestamp.
class VerificationMaterial:
    """
    Wrapper around a protobuf-level `VerificationMaterial` message.
    """

    def __init__(self, inner: _VerificationMaterial) -> None:
        """Keep a reference to the wrapped message as `self._inner`."""
        self._inner = inner

    @property
    def timestamp_verification_data(self) -> TimestampVerificationData:
        """
        The wrapped message's timestamp verification data.

        A new `TimestampVerificationData` is constructed on every access.
        """
        raw_timestamp_data = self._inner.timestamp_verification_data
        return TimestampVerificationData(raw_timestamp_data)
Represents a VerificationMaterial structure.
385 def __init__(self, inner: _VerificationMaterial) -> None: 386 """Init method.""" 387 self._inner = inner
Init method.
@property
def timestamp_verification_data(self) -> TimestampVerificationData:
    """
    The wrapped message's timestamp verification data.

    A new `TimestampVerificationData` is constructed on every access.
    """
    raw_timestamp_data = self._inner.timestamp_verification_data
    return TimestampVerificationData(raw_timestamp_data)
Returns the Timestamp Verification Data.
class InvalidBundle(Error):
    """
    Raised when the associated `Bundle` is invalid in some way.
    """

    def diagnostics(self) -> str:
        """
        Returns diagnostics for the error.

        The fixed explanatory text is followed by this exception's own
        message (`str(self)`) under "Additional context".
        """

        # Dedent the template *before* interpolating the message: a
        # multi-line message would otherwise defeat `dedent`'s common-margin
        # detection and leave the whole text indented.
        template = dedent(
            """\
            An issue occurred while parsing the Sigstore bundle.

            The provided bundle is malformed and may have been modified maliciously.

            Additional context:

            {context}
            """
        )
        return template.format(context=self)
Raised when the associated `Bundle` is invalid in some way.
def diagnostics(self) -> str:
    """
    Returns diagnostics for the error.

    The fixed explanatory text is followed by this exception's own
    message (`str(self)`) under "Additional context".
    """

    # Dedent the template *before* interpolating the message: a multi-line
    # message would otherwise defeat `dedent`'s common-margin detection and
    # leave the whole text indented.
    template = dedent(
        """\
        An issue occurred while parsing the Sigstore bundle.

        The provided bundle is malformed and may have been modified maliciously.

        Additional context:

        {context}
        """
    )
    return template.format(context=self)
Returns diagnostics for the error.
Inherited Members
class Bundle:
    """
    Represents a Sigstore bundle.

    A `Bundle` wraps the underlying protobuf bundle message and validates
    it on construction (see `_verify`), caching the signing certificate and
    the transparency log entry that were extracted during validation.
    """

    class BundleType(str, Enum):
        """
        Known Sigstore bundle media types.
        """

        BUNDLE_0_1 = "application/vnd.dev.sigstore.bundle+json;version=0.1"
        BUNDLE_0_2 = "application/vnd.dev.sigstore.bundle+json;version=0.2"
        BUNDLE_0_3_ALT = "application/vnd.dev.sigstore.bundle+json;version=0.3"
        BUNDLE_0_3 = "application/vnd.dev.sigstore.bundle.v0.3+json"

        def __str__(self) -> str:
            """Returns the variant's string value."""
            return self.value

    def __init__(self, inner: _Bundle) -> None:
        """
        Creates a new bundle. This is not a public API; use
        `from_json` instead.

        Raises `InvalidBundle` if `inner` fails validation.

        @private
        """
        self._inner = inner
        self._verify()

    def _verify(self) -> None:
        """
        Checks that the bundle is well-formed and upholds invariants,
        including:

        * The media type is one of the known `BundleType` values;
        * The "leaf" (signing) certificate is present;
        * Exactly one transparency log entry is present;
        * There is an inclusion proof present, even if the Bundle's version
          predates a mandatory inclusion proof.

        On success, the signing certificate and log entry are cached on
        the instance. Raises `InvalidBundle` when a check fails.
        """

        # The bundle must have a recognized media type.
        try:
            media_type = Bundle.BundleType(self._inner.media_type)
        except ValueError as exc:
            raise InvalidBundle(
                f"unsupported bundle format: {self._inner.media_type}"
            ) from exc

        # Extract the signing certificate.
        if media_type in (
            Bundle.BundleType.BUNDLE_0_3,
            Bundle.BundleType.BUNDLE_0_3_ALT,
        ):
            # For "v3" bundles, the signing certificate is the only one present.
            # Reject a missing/empty certificate up front so the caller sees
            # an `InvalidBundle` rather than an opaque failure from the
            # DER loader.
            certificate = self._inner.verification_material.certificate
            if not certificate:
                raise InvalidBundle("expected certificate in bundle")

            leaf_cert = load_der_x509_certificate(certificate.raw_bytes)
        else:
            # In older bundles, there is an entire pool (misleadingly called
            # a chain) of certificates, the first of which is the signing
            # certificate.
            certs = (
                self._inner.verification_material.x509_certificate_chain.certificates
            )

            if not certs:
                raise InvalidBundle("expected non-empty certificate chain in bundle")

            # Per client policy in protobuf-specs: the first entry in the chain
            # MUST be a leaf certificate, and the rest of the chain MUST NOT
            # include a root CA or any intermediate CAs that appear in an
            # independent root of trust.
            #
            # We expect some old bundles to violate the rules around root
            # and intermediate CAs, so we issue warnings and not hard errors
            # in those cases.
            leaf_cert, *chain_certs = [
                load_der_x509_certificate(cert.raw_bytes) for cert in certs
            ]
            if not cert_is_leaf(leaf_cert):
                raise InvalidBundle(
                    "bundle contains an invalid leaf or non-leaf certificate in the leaf position"
                )

            for chain_cert in chain_certs:
                # TODO: We should also retrieve the root of trust here and
                # cross-check against it.
                if cert_is_root_ca(chain_cert):
                    _logger.warning(
                        "this bundle contains a root CA, making it subject to misuse"
                    )

        self._signing_certificate = leaf_cert

        # Extract the log entry. For the time being, we expect
        # bundles to only contain a single log entry.
        tlog_entries = self._inner.verification_material.tlog_entries
        if len(tlog_entries) != 1:
            raise InvalidBundle("expected exactly one log entry in bundle")
        tlog_entry = tlog_entries[0]

        # Handling of inclusion promises and proofs varies between bundle
        # format versions:
        #
        # * For 0.1, an inclusion promise is required; the client
        #   MUST verify the inclusion promise.
        #   The inclusion proof is NOT required. If provided, it might NOT
        #   contain a checkpoint; in this case, we ignore it (since it's
        #   useless without one).
        #
        # * For 0.2+, an inclusion proof is required; the client MUST
        #   verify the inclusion proof. The inclusion proof MUST contain
        #   a checkpoint.
        #   The inclusion promise is NOT required; if present, the client
        #   SHOULD verify it.
        #
        # Before all of this, we require that the inclusion proof be present
        # (when constructing the LogEntry).
        log_entry = LogEntry._from_dict_rekor(tlog_entry.to_dict())

        if media_type == Bundle.BundleType.BUNDLE_0_1:
            if not log_entry.inclusion_promise:
                raise InvalidBundle("bundle must contain an inclusion promise")
            if not log_entry.inclusion_proof.checkpoint:
                _logger.debug(
                    "0.1 bundle contains inclusion proof without checkpoint; ignoring"
                )
        else:
            if not log_entry.inclusion_proof.checkpoint:
                raise InvalidBundle("expected checkpoint in inclusion proof")

        self._log_entry = log_entry

    @property
    def signing_certificate(self) -> Certificate:
        """Returns the bundle's contained signing (i.e. leaf) certificate."""
        return self._signing_certificate

    @property
    def log_entry(self) -> LogEntry:
        """
        Returns the bundle's log entry, containing an inclusion proof
        and an inclusion promise (if the latter is present).

        For every bundle version other than 0.1 the inclusion proof is
        guaranteed to carry a checkpoint; 0.1 bundles may lack one.
        """
        return self._log_entry

    @property
    def _dsse_envelope(self) -> dsse.Envelope | None:
        """
        Returns the DSSE envelope within this Bundle as a `dsse.Envelope`,
        or `None` if the bundle does not carry one.

        A new wrapper is constructed on every access.

        @private
        """
        if self._inner.dsse_envelope:
            return dsse.Envelope(self._inner.dsse_envelope)
        return None

    @property
    def signature(self) -> bytes:
        """
        Returns the signature bytes of this bundle, taken from the DSSE
        envelope when one is present and from the message signature
        otherwise.
        """
        # Evaluate the property once: each access builds a fresh wrapper.
        envelope = self._dsse_envelope
        if envelope:
            return envelope.signature
        return self._inner.message_signature.signature

    @property
    def verification_material(self) -> VerificationMaterial:
        """
        Returns the bundle's verification material.
        """
        return VerificationMaterial(self._inner.verification_material)

    @classmethod
    def from_json(cls, raw: bytes | str) -> Bundle:
        """
        Deserialize the given Sigstore bundle.

        Raises `InvalidBundle` if the decoded bundle fails validation.
        """
        inner = _Bundle().from_json(raw)
        return cls(inner)

    def to_json(self) -> str:
        """
        Return a JSON encoding of this bundle.
        """
        return self._inner.to_json()

    def _to_parts(
        self,
    ) -> tuple[Certificate, common_v1.MessageSignature | dsse.Envelope, LogEntry]:
        """
        Decompose the `Bundle` into its core constituent parts: the signing
        certificate, the signed content (DSSE envelope if present, otherwise
        the message signature) and the log entry.

        @private
        """

        content: common_v1.MessageSignature | dsse.Envelope
        # Evaluate the property once: each access builds a fresh wrapper.
        envelope = self._dsse_envelope
        if envelope:
            content = envelope
        else:
            content = self._inner.message_signature

        return (self.signing_certificate, content, self.log_entry)

    @classmethod
    def from_parts(cls, cert: Certificate, sig: bytes, log_entry: LogEntry) -> Bundle:
        """
        Construct a Sigstore bundle (of `hashedrekord` type) from its
        constituent parts.
        """

        return cls._from_parts(
            cert, common_v1.MessageSignature(signature=sig), log_entry
        )

    @classmethod
    def _from_parts(
        cls,
        cert: Certificate,
        content: common_v1.MessageSignature | dsse.Envelope,
        log_entry: LogEntry,
    ) -> Bundle:
        """
        Construct a bundle with the `BUNDLE_0_3` media type from a signing
        certificate, the signed content (either a message signature or a
        DSSE envelope) and a log entry.

        @private
        """

        inner = _Bundle(
            media_type=Bundle.BundleType.BUNDLE_0_3.value,
            verification_material=bundle_v1.VerificationMaterial(
                certificate=common_v1.X509Certificate(cert.public_bytes(Encoding.DER)),
            ),
        )

        # Fill in the appropriate variants.
        if isinstance(content, common_v1.MessageSignature):
            inner.message_signature = content
        else:
            inner.dsse_envelope = content._inner

        tlog_entry = log_entry._to_rekor()
        inner.verification_material.tlog_entries = [tlog_entry]

        return cls(inner)
Represents a Sigstore bundle.
@property
def signing_certificate(self) -> Certificate:
    """
    The signing (i.e. leaf) certificate held by this bundle.
    """
    leaf = self._signing_certificate
    return leaf
Returns the bundle's contained signing (i.e. leaf) certificate.
@property
def log_entry(self) -> LogEntry:
    """
    The transparency log entry held by this bundle, containing an
    inclusion proof (with checkpoint) and, when one was supplied, an
    inclusion promise.
    """
    entry = self._log_entry
    return entry
Returns the bundle's log entry, containing an inclusion proof (with checkpoint) and an inclusion promise (if the latter is present).
@property
def signature(self) -> bytes:
    """
    Returns the signature bytes of this bundle, taken from the DSSE
    envelope when one is present and from the message signature otherwise.
    """
    # Read `_dsse_envelope` exactly once rather than once for the test and
    # again for the value, so both steps operate on the same object.
    envelope = self._dsse_envelope
    if envelope:
        return envelope.signature
    return self._inner.message_signature.signature
Returns the signature bytes of this bundle, taken either from the DSSE envelope or from the message signature itself.
@property
def verification_material(self) -> VerificationMaterial:
    """
    The bundle's verification material, wrapped in a
    `VerificationMaterial` built from the inner bundle's
    `verification_material` field.
    """
    raw_material = self._inner.verification_material
    return VerificationMaterial(raw_material)
Returns the bundle's verification material.
@classmethod
def from_json(cls, raw: bytes | str) -> Bundle:
    """
    Build a bundle from its serialized JSON form.

    `raw` is decoded into a fresh inner bundle message, which is then
    passed to the class constructor.
    """
    return cls(_Bundle().from_json(raw))
Deserialize the given Sigstore bundle.
def to_json(self) -> str:
    """
    Serialize this bundle to JSON by delegating to the inner bundle
    message's own `to_json`.
    """
    encoded = self._inner.to_json()
    return encoded
Return a JSON encoding of this bundle.
@classmethod
def from_parts(cls, cert: Certificate, sig: bytes, log_entry: LogEntry) -> Bundle:
    """
    Assemble a Sigstore bundle (of `hashedrekord` type) from a signing
    certificate, raw signature bytes and a log entry.

    The signature bytes are wrapped in a `MessageSignature` before being
    handed to `_from_parts`.
    """
    message_signature = common_v1.MessageSignature(signature=sig)
    return cls._from_parts(cert, message_signature, log_entry)
Construct a Sigstore bundle (of `hashedrekord` type) from its constituent parts.
class BundleType(str, Enum):
    """
    The Sigstore bundle media types this module recognizes.

    Each member is also a `str`, and its value is the media type string.
    """

    # Parameterized ("version=...") media types.
    BUNDLE_0_1 = "application/vnd.dev.sigstore.bundle+json;version=0.1"
    BUNDLE_0_2 = "application/vnd.dev.sigstore.bundle+json;version=0.2"
    BUNDLE_0_3_ALT = "application/vnd.dev.sigstore.bundle+json;version=0.3"
    # Media type with the version embedded in the subtype.
    BUNDLE_0_3 = "application/vnd.dev.sigstore.bundle.v0.3+json"

    def __str__(self) -> str:
        """Render the member as its media type string."""
        media_type = self.value
        return media_type
Known Sigstore bundle media types.