sigstore.models

Common models shared between signing and verification.

  1# Copyright 2022 The Sigstore Authors
  2#
  3# Licensed under the Apache License, Version 2.0 (the "License");
  4# you may not use this file except in compliance with the License.
  5# You may obtain a copy of the License at
  6#
  7#      http://www.apache.org/licenses/LICENSE-2.0
  8#
  9# Unless required by applicable law or agreed to in writing, software
 10# distributed under the License is distributed on an "AS IS" BASIS,
 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12# See the License for the specific language governing permissions and
 13# limitations under the License.
 14
 15"""
 16Common models shared between signing and verification.
 17"""
 18
 19from __future__ import annotations
 20
 21import base64
 22import logging
 23import typing
 24from enum import Enum
 25from textwrap import dedent
 26from typing import Any, List, Optional
 27
 28import rfc8785
 29from cryptography.hazmat.primitives.serialization import Encoding
 30from cryptography.x509 import (
 31    Certificate,
 32    load_der_x509_certificate,
 33)
 34from pydantic import (
 35    BaseModel,
 36    ConfigDict,
 37    Field,
 38    StrictInt,
 39    StrictStr,
 40    TypeAdapter,
 41    ValidationInfo,
 42    field_validator,
 43)
 44from pydantic.dataclasses import dataclass
 45from rekor_types import Dsse, Hashedrekord, ProposedEntry
 46from rfc3161_client import TimeStampResponse, decode_timestamp_response
 47from sigstore_protobuf_specs.dev.sigstore.bundle import v1 as bundle_v1
 48from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import (
 49    Bundle as _Bundle,
 50)
 51from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import (
 52    TimestampVerificationData as _TimestampVerificationData,
 53)
 54from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import (
 55    VerificationMaterial as _VerificationMaterial,
 56)
 57from sigstore_protobuf_specs.dev.sigstore.common import v1 as common_v1
 58from sigstore_protobuf_specs.dev.sigstore.rekor import v1 as rekor_v1
 59from sigstore_protobuf_specs.dev.sigstore.rekor.v1 import (
 60    InclusionProof,
 61)
 62
 63from sigstore import dsse
 64from sigstore._internal.merkle import verify_merkle_inclusion
 65from sigstore._internal.rekor.checkpoint import verify_checkpoint
 66from sigstore._utils import (
 67    B64Str,
 68    KeyID,
 69    cert_is_leaf,
 70    cert_is_root_ca,
 71)
 72from sigstore.errors import Error, VerificationError
 73
 74if typing.TYPE_CHECKING:
 75    from sigstore._internal.trust import RekorKeyring
 76
 77
 78_logger = logging.getLogger(__name__)
 79
 80
 81class LogInclusionProof(BaseModel):
 82    """
 83    Represents an inclusion proof for a transparency log entry.
 84    """
 85
 86    model_config = ConfigDict(populate_by_name=True)
 87
 88    checkpoint: StrictStr = Field(..., alias="checkpoint")
 89    hashes: List[StrictStr] = Field(..., alias="hashes")
 90    log_index: StrictInt = Field(..., alias="logIndex")
 91    root_hash: StrictStr = Field(..., alias="rootHash")
 92    tree_size: StrictInt = Field(..., alias="treeSize")
 93
 94    @field_validator("log_index")
 95    def _log_index_positive(cls, v: int) -> int:
 96        if v < 0:
 97            raise ValueError(f"Inclusion proof has invalid log index: {v} < 0")
 98        return v
 99
100    @field_validator("tree_size")
101    def _tree_size_positive(cls, v: int) -> int:
102        if v < 0:
103            raise ValueError(f"Inclusion proof has invalid tree size: {v} < 0")
104        return v
105
106    @field_validator("tree_size")
107    def _log_index_within_tree_size(
108        cls, v: int, info: ValidationInfo, **kwargs: Any
109    ) -> int:
110        if "log_index" in info.data and v <= info.data["log_index"]:
111            raise ValueError(
112                "Inclusion proof has log index greater than or equal to tree size: "
113                f"{v} <= {info.data['log_index']}"
114            )
115        return v
116
117
118@dataclass(frozen=True)
119class LogEntry:
120    """
121    Represents a transparency log entry.
122
123    Log entries are retrieved from the transparency log after signing or verification events,
124    or loaded from "Sigstore" bundles provided by the user.
125
126    This representation allows for either a missing inclusion promise or a missing
127    inclusion proof, but not both: attempting to construct a `LogEntry` without
128    at least one will fail.
129    """
130
131    uuid: Optional[str]
132    """
133    This entry's unique ID in the log instance it was retrieved from.
134
135    For sharded log deployments, IDs are unique per-shard.
136
137    Not present for `LogEntry` instances loaded from Sigstore bundles.
138    """
139
140    body: B64Str
141    """
142    The base64-encoded body of the transparency log entry.
143    """
144
145    integrated_time: int
146    """
147    The UNIX time at which this entry was integrated into the transparency log.
148    """
149
150    log_id: str
151    """
152    The log's ID (as the SHA256 hash of the DER-encoded public key for the log
153    at the time of entry inclusion).
154    """
155
156    log_index: int
157    """
158    The index of this entry within the log.
159    """
160
161    inclusion_proof: LogInclusionProof
162    """
163    An inclusion proof for this log entry.
164    """
165
166    inclusion_promise: Optional[B64Str]
167    """
168    An inclusion promise for this log entry, if present.
169
170    Internally, this is a base64-encoded Signed Entry Timestamp (SET) for this
171    log entry.
172    """
173
174    @classmethod
175    def _from_response(cls, dict_: dict[str, Any]) -> LogEntry:
176        """
177        Create a new `LogEntry` from the given API response.
178        """
179
180        # Assumes we only get one entry back
181        entries = list(dict_.items())
182        if len(entries) != 1:
183            raise ValueError("Received multiple entries in response")
184
185        uuid, entry = entries[0]
186        return LogEntry(
187            uuid=uuid,
188            body=entry["body"],
189            integrated_time=entry["integratedTime"],
190            log_id=entry["logID"],
191            log_index=entry["logIndex"],
192            inclusion_proof=LogInclusionProof.model_validate(
193                entry["verification"]["inclusionProof"]
194            ),
195            inclusion_promise=entry["verification"]["signedEntryTimestamp"],
196        )
197
198    @classmethod
199    def _from_dict_rekor(cls, dict_: dict[str, Any]) -> LogEntry:
200        """
201        Create a new `LogEntry` from the given Rekor TransparencyLogEntry.
202        """
203        tlog_entry = rekor_v1.TransparencyLogEntry()
204        tlog_entry.from_dict(dict_)
205
206        inclusion_proof: InclusionProof | None = tlog_entry.inclusion_proof
207        # This check is required by us as the client, not the
208        # protobuf-specs themselves.
209        if not inclusion_proof or not inclusion_proof.checkpoint.envelope:
210            raise InvalidBundle("entry must contain inclusion proof, with checkpoint")
211
212        parsed_inclusion_proof = LogInclusionProof(
213            checkpoint=inclusion_proof.checkpoint.envelope,
214            hashes=[h.hex() for h in inclusion_proof.hashes],
215            log_index=inclusion_proof.log_index,
216            root_hash=inclusion_proof.root_hash.hex(),
217            tree_size=inclusion_proof.tree_size,
218        )
219
220        return LogEntry(
221            uuid=None,
222            body=B64Str(base64.b64encode(tlog_entry.canonicalized_body).decode()),
223            integrated_time=tlog_entry.integrated_time,
224            log_id=tlog_entry.log_id.key_id.hex(),
225            log_index=tlog_entry.log_index,
226            inclusion_proof=parsed_inclusion_proof,
227            inclusion_promise=B64Str(
228                base64.b64encode(
229                    tlog_entry.inclusion_promise.signed_entry_timestamp
230                ).decode()
231            ),
232        )
233
234    def _to_rekor(self) -> rekor_v1.TransparencyLogEntry:
235        """
236        Create a new protobuf-level `TransparencyLogEntry` from this `LogEntry`.
237
238        @private
239        """
240        inclusion_promise: rekor_v1.InclusionPromise | None = None
241        if self.inclusion_promise:
242            inclusion_promise = rekor_v1.InclusionPromise(
243                signed_entry_timestamp=base64.b64decode(self.inclusion_promise)
244            )
245
246        inclusion_proof = rekor_v1.InclusionProof(
247            log_index=self.inclusion_proof.log_index,
248            root_hash=bytes.fromhex(self.inclusion_proof.root_hash),
249            tree_size=self.inclusion_proof.tree_size,
250            hashes=[bytes.fromhex(hash_) for hash_ in self.inclusion_proof.hashes],
251            checkpoint=rekor_v1.Checkpoint(envelope=self.inclusion_proof.checkpoint),
252        )
253
254        tlog_entry = rekor_v1.TransparencyLogEntry(
255            log_index=self.log_index,
256            log_id=common_v1.LogId(key_id=bytes.fromhex(self.log_id)),
257            integrated_time=self.integrated_time,
258            inclusion_promise=inclusion_promise,  # type: ignore[arg-type]
259            inclusion_proof=inclusion_proof,
260            canonicalized_body=base64.b64decode(self.body),
261        )
262
263        # Fill in the appropriate kind
264        body_entry: ProposedEntry = TypeAdapter(ProposedEntry).validate_json(
265            tlog_entry.canonicalized_body
266        )
267        if not isinstance(body_entry, (Hashedrekord, Dsse)):
268            raise InvalidBundle("log entry is not of expected type")
269
270        tlog_entry.kind_version = rekor_v1.KindVersion(
271            kind=body_entry.kind, version=body_entry.api_version
272        )
273
274        return tlog_entry
275
276    def encode_canonical(self) -> bytes:
277        """
278        Returns a canonicalized JSON (RFC 8785) representation of the transparency log entry.
279
280        This encoded representation is suitable for verification against
281        the Signed Entry Timestamp.
282        """
283        payload: dict[str, int | str] = {
284            "body": self.body,
285            "integratedTime": self.integrated_time,
286            "logID": self.log_id,
287            "logIndex": self.log_index,
288        }
289
290        return rfc8785.dumps(payload)
291
292    def _verify_set(self, keyring: RekorKeyring) -> None:
293        """
294        Verify the inclusion promise (Signed Entry Timestamp) for a given transparency log
295        `entry` using the given `keyring`.
296
297        Fails if the given log entry does not contain an inclusion promise.
298        """
299
300        if self.inclusion_promise is None:
301            raise VerificationError("SET: invalid inclusion promise: missing")
302
303        signed_entry_ts = base64.b64decode(self.inclusion_promise)
304
305        try:
306            keyring.verify(
307                key_id=KeyID(bytes.fromhex(self.log_id)),
308                signature=signed_entry_ts,
309                data=self.encode_canonical(),
310            )
311        except VerificationError as exc:
312            raise VerificationError(f"SET: invalid inclusion promise: {exc}")
313
314    def _verify(self, keyring: RekorKeyring) -> None:
315        """
316        Verifies this log entry.
317
318        This method performs steps (5), (6), and optionally (7) in
319        the top-level verify API:
320
321        * Verifies the consistency of the entry with the given bundle;
322        * Verifies the Merkle inclusion proof and its signed checkpoint;
323        * Verifies the inclusion promise, if present.
324        """
325
326        verify_merkle_inclusion(self)
327        verify_checkpoint(keyring, self)
328
329        _logger.debug(f"successfully verified inclusion proof: index={self.log_index}")
330
331        if self.inclusion_promise:
332            self._verify_set(keyring)
333            _logger.debug(
334                f"successfully verified inclusion promise: index={self.log_index}"
335            )
336
337
338class TimestampVerificationData:
339    """
340    Represents a TimestampVerificationData structure.
341
342    @private
343    """
344
345    def __init__(self, inner: _TimestampVerificationData) -> None:
346        """Init method."""
347        self._inner = inner
348        self._verify()
349
350    def _verify(self) -> None:
351        """
352        Verifies the TimestampVerificationData.
353
354        It verifies that TimeStamp Responses embedded in the bundle are correctly
355        formed.
356        """
357        try:
358            self._signed_ts = [
359                decode_timestamp_response(ts.signed_timestamp)
360                for ts in self._inner.rfc3161_timestamps
361            ]
362        except ValueError:
363            raise VerificationError("Invalid Timestamp Response")
364
365    @property
366    def rfc3161_timestamps(self) -> list[TimeStampResponse]:
367        """Returns a list of signed timestamp."""
368        return self._signed_ts
369
370    @classmethod
371    def from_json(cls, raw: str | bytes) -> TimestampVerificationData:
372        """
373        Deserialize the given timestamp verification data.
374        """
375        inner = _TimestampVerificationData().from_json(raw)
376        return cls(inner)
377
378
379class VerificationMaterial:
380    """
381    Represents a VerificationMaterial structure.
382    """
383
384    def __init__(self, inner: _VerificationMaterial) -> None:
385        """Init method."""
386        self._inner = inner
387
388    @property
389    def timestamp_verification_data(self) -> TimestampVerificationData:
390        """
391        Returns the Timestamp Verification Data.
392        """
393        return TimestampVerificationData(self._inner.timestamp_verification_data)
394
395
396class InvalidBundle(Error):
397    """
398    Raised when the associated `Bundle` is invalid in some way.
399    """
400
401    def diagnostics(self) -> str:
402        """Returns diagnostics for the error."""
403
404        return dedent(
405            f"""\
406        An issue occurred while parsing the Sigstore bundle.
407
408        The provided bundle is malformed and may have been modified maliciously.
409
410        Additional context:
411
412        {self}
413        """
414        )
415
416
417class Bundle:
418    """
419    Represents a Sigstore bundle.
420    """
421
422    class BundleType(str, Enum):
423        """
424        Known Sigstore bundle media types.
425        """
426
427        BUNDLE_0_1 = "application/vnd.dev.sigstore.bundle+json;version=0.1"
428        BUNDLE_0_2 = "application/vnd.dev.sigstore.bundle+json;version=0.2"
429        BUNDLE_0_3_ALT = "application/vnd.dev.sigstore.bundle+json;version=0.3"
430        BUNDLE_0_3 = "application/vnd.dev.sigstore.bundle.v0.3+json"
431
432        def __str__(self) -> str:
433            """Returns the variant's string value."""
434            return self.value
435
436    def __init__(self, inner: _Bundle) -> None:
437        """
438        Creates a new bundle. This is not a public API; use
439        `from_json` instead.
440
441        @private
442        """
443        self._inner = inner
444        self._verify()
445
446    def _verify(self) -> None:
447        """
448        Performs various feats of heroism to ensure the bundle is well-formed
449        and upholds invariants, including:
450
451        * The "leaf" (signing) certificate is present;
452        * There is a inclusion proof present, even if the Bundle's version
453           predates a mandatory inclusion proof.
454        """
455
456        # The bundle must have a recognized media type.
457        try:
458            media_type = Bundle.BundleType(self._inner.media_type)
459        except ValueError:
460            raise InvalidBundle(f"unsupported bundle format: {self._inner.media_type}")
461
462        # Extract the signing certificate.
463        if media_type in (
464            Bundle.BundleType.BUNDLE_0_3,
465            Bundle.BundleType.BUNDLE_0_3_ALT,
466        ):
467            # For "v3" bundles, the signing certificate is the only one present.
468            leaf_cert = load_der_x509_certificate(
469                self._inner.verification_material.certificate.raw_bytes
470            )
471        else:
472            # In older bundles, there is an entire pool (misleadingly called
473            # a chain) of certificates, the first of which is the signing
474            # certificate.
475            certs = (
476                self._inner.verification_material.x509_certificate_chain.certificates
477            )
478
479            if len(certs) == 0:
480                raise InvalidBundle("expected non-empty certificate chain in bundle")
481
482            # Per client policy in protobuf-specs: the first entry in the chain
483            # MUST be a leaf certificate, and the rest of the chain MUST NOT
484            # include a root CA or any intermediate CAs that appear in an
485            # independent root of trust.
486            #
487            # We expect some old bundles to violate the rules around root
488            # and intermediate CAs, so we issue warnings and not hard errors
489            # in those cases.
490            leaf_cert, *chain_certs = [
491                load_der_x509_certificate(cert.raw_bytes) for cert in certs
492            ]
493            if not cert_is_leaf(leaf_cert):
494                raise InvalidBundle(
495                    "bundle contains an invalid leaf or non-leaf certificate in the leaf position"
496                )
497
498            for chain_cert in chain_certs:
499                # TODO: We should also retrieve the root of trust here and
500                # cross-check against it.
501                if cert_is_root_ca(chain_cert):
502                    _logger.warning(
503                        "this bundle contains a root CA, making it subject to misuse"
504                    )
505
506        self._signing_certificate = leaf_cert
507
508        # Extract the log entry. For the time being, we expect
509        # bundles to only contain a single log entry.
510        tlog_entries = self._inner.verification_material.tlog_entries
511        if len(tlog_entries) != 1:
512            raise InvalidBundle("expected exactly one log entry in bundle")
513        tlog_entry = tlog_entries[0]
514
515        # Handling of inclusion promises and proofs varies between bundle
516        # format versions:
517        #
518        # * For 0.1, an inclusion promise is required; the client
519        #   MUST verify the inclusion promise.
520        #   The inclusion proof is NOT required. If provided, it might NOT
521        #   contain a checkpoint; in this case, we ignore it (since it's
522        #   useless without one).
523        #
524        # * For 0.2+, an inclusion proof is required; the client MUST
525        #   verify the inclusion proof. The inclusion prof MUST contain
526        #   a checkpoint.
527        #   The inclusion promise is NOT required; if present, the client
528        #   SHOULD verify it.
529        #
530        # Before all of this, we require that the inclusion proof be present
531        # (when constructing the LogEntry).
532        log_entry = LogEntry._from_dict_rekor(tlog_entry.to_dict())
533
534        if media_type == Bundle.BundleType.BUNDLE_0_1:
535            if not log_entry.inclusion_promise:
536                raise InvalidBundle("bundle must contain an inclusion promise")
537            if not log_entry.inclusion_proof.checkpoint:
538                _logger.debug(
539                    "0.1 bundle contains inclusion proof without checkpoint; ignoring"
540                )
541        else:
542            if not log_entry.inclusion_proof.checkpoint:
543                raise InvalidBundle("expected checkpoint in inclusion proof")
544
545        self._log_entry = log_entry
546
547    @property
548    def signing_certificate(self) -> Certificate:
549        """Returns the bundle's contained signing (i.e. leaf) certificate."""
550        return self._signing_certificate
551
552    @property
553    def log_entry(self) -> LogEntry:
554        """
555        Returns the bundle's log entry, containing an inclusion proof
556        (with checkpoint) and an inclusion promise (if the latter is present).
557        """
558        return self._log_entry
559
560    @property
561    def _dsse_envelope(self) -> dsse.Envelope | None:
562        """
563        Returns the DSSE envelope within this Bundle as a `dsse.Envelope`.
564
565        @private
566        """
567        if self._inner.dsse_envelope:
568            return dsse.Envelope(self._inner.dsse_envelope)
569        return None
570
571    @property
572    def signature(self) -> bytes:
573        """
574        Returns the signature bytes of this bundle.
575        Either from the DSSE Envelope or from the message itself.
576        """
577        return (
578            self._dsse_envelope.signature
579            if self._dsse_envelope
580            else self._inner.message_signature.signature
581        )
582
583    @property
584    def verification_material(self) -> VerificationMaterial:
585        """
586        Returns the bundle's verification material.
587        """
588        return VerificationMaterial(self._inner.verification_material)
589
590    @classmethod
591    def from_json(cls, raw: bytes | str) -> Bundle:
592        """
593        Deserialize the given Sigstore bundle.
594        """
595        inner = _Bundle().from_json(raw)
596        return cls(inner)
597
598    def to_json(self) -> str:
599        """
600        Return a JSON encoding of this bundle.
601        """
602        return self._inner.to_json()
603
604    def _to_parts(
605        self,
606    ) -> tuple[Certificate, common_v1.MessageSignature | dsse.Envelope, LogEntry]:
607        """
608        Decompose the `Bundle` into its core constituent parts.
609
610        @private
611        """
612
613        content: common_v1.MessageSignature | dsse.Envelope
614        if self._dsse_envelope:
615            content = self._dsse_envelope
616        else:
617            content = self._inner.message_signature
618
619        return (self.signing_certificate, content, self.log_entry)
620
621    @classmethod
622    def from_parts(cls, cert: Certificate, sig: bytes, log_entry: LogEntry) -> Bundle:
623        """
624        Construct a Sigstore bundle (of `hashedrekord` type) from its
625        constituent parts.
626        """
627
628        return cls._from_parts(
629            cert, common_v1.MessageSignature(signature=sig), log_entry
630        )
631
632    @classmethod
633    def _from_parts(
634        cls,
635        cert: Certificate,
636        content: common_v1.MessageSignature | dsse.Envelope,
637        log_entry: LogEntry,
638    ) -> Bundle:
639        """
640        @private
641        """
642
643        inner = _Bundle(
644            media_type=Bundle.BundleType.BUNDLE_0_3.value,
645            verification_material=bundle_v1.VerificationMaterial(
646                certificate=common_v1.X509Certificate(cert.public_bytes(Encoding.DER)),
647            ),
648        )
649
650        # Fill in the appropriate variants.
651        if isinstance(content, common_v1.MessageSignature):
652            inner.message_signature = content
653        else:
654            inner.dsse_envelope = content._inner
655
656        tlog_entry = log_entry._to_rekor()
657        inner.verification_material.tlog_entries = [tlog_entry]
658
659        return cls(inner)
class LogInclusionProof(pydantic.main.BaseModel):
 82class LogInclusionProof(BaseModel):
 83    """
 84    Represents an inclusion proof for a transparency log entry.
 85    """
 86
 87    model_config = ConfigDict(populate_by_name=True)
 88
 89    checkpoint: StrictStr = Field(..., alias="checkpoint")
 90    hashes: List[StrictStr] = Field(..., alias="hashes")
 91    log_index: StrictInt = Field(..., alias="logIndex")
 92    root_hash: StrictStr = Field(..., alias="rootHash")
 93    tree_size: StrictInt = Field(..., alias="treeSize")
 94
 95    @field_validator("log_index")
 96    def _log_index_positive(cls, v: int) -> int:
 97        if v < 0:
 98            raise ValueError(f"Inclusion proof has invalid log index: {v} < 0")
 99        return v
100
101    @field_validator("tree_size")
102    def _tree_size_positive(cls, v: int) -> int:
103        if v < 0:
104            raise ValueError(f"Inclusion proof has invalid tree size: {v} < 0")
105        return v
106
107    @field_validator("tree_size")
108    def _log_index_within_tree_size(
109        cls, v: int, info: ValidationInfo, **kwargs: Any
110    ) -> int:
111        if "log_index" in info.data and v <= info.data["log_index"]:
112            raise ValueError(
113                "Inclusion proof has log index greater than or equal to tree size: "
114                f"{v} <= {info.data['log_index']}"
115            )
116        return v

Represents an inclusion proof for a transparency log entry.

model_config = {'populate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

checkpoint: Annotated[str, Strict(strict=True)]
hashes: List[Annotated[str, Strict(strict=True)]]
log_index: Annotated[int, Strict(strict=True)]
root_hash: Annotated[str, Strict(strict=True)]
tree_size: Annotated[int, Strict(strict=True)]
model_fields: ClassVar[Dict[str, pydantic.fields.FieldInfo]] = {'checkpoint': FieldInfo(annotation=str, required=True, alias='checkpoint', alias_priority=2, metadata=[Strict(strict=True)]), 'hashes': FieldInfo(annotation=List[Annotated[str, Strict(strict=True)]], required=True, alias='hashes', alias_priority=2), 'log_index': FieldInfo(annotation=int, required=True, alias='logIndex', alias_priority=2, metadata=[Strict(strict=True)]), 'root_hash': FieldInfo(annotation=str, required=True, alias='rootHash', alias_priority=2, metadata=[Strict(strict=True)]), 'tree_size': FieldInfo(annotation=int, required=True, alias='treeSize', alias_priority=2, metadata=[Strict(strict=True)])}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

model_computed_fields: ClassVar[Dict[str, pydantic.fields.ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

@dataclass(frozen=True)
class LogEntry:
119@dataclass(frozen=True)
120class LogEntry:
121    """
122    Represents a transparency log entry.
123
124    Log entries are retrieved from the transparency log after signing or verification events,
125    or loaded from "Sigstore" bundles provided by the user.
126
127    This representation allows for either a missing inclusion promise or a missing
128    inclusion proof, but not both: attempting to construct a `LogEntry` without
129    at least one will fail.
130    """
131
132    uuid: Optional[str]
133    """
134    This entry's unique ID in the log instance it was retrieved from.
135
136    For sharded log deployments, IDs are unique per-shard.
137
138    Not present for `LogEntry` instances loaded from Sigstore bundles.
139    """
140
141    body: B64Str
142    """
143    The base64-encoded body of the transparency log entry.
144    """
145
146    integrated_time: int
147    """
148    The UNIX time at which this entry was integrated into the transparency log.
149    """
150
151    log_id: str
152    """
153    The log's ID (as the SHA256 hash of the DER-encoded public key for the log
154    at the time of entry inclusion).
155    """
156
157    log_index: int
158    """
159    The index of this entry within the log.
160    """
161
162    inclusion_proof: LogInclusionProof
163    """
164    An inclusion proof for this log entry.
165    """
166
167    inclusion_promise: Optional[B64Str]
168    """
169    An inclusion promise for this log entry, if present.
170
171    Internally, this is a base64-encoded Signed Entry Timestamp (SET) for this
172    log entry.
173    """
174
175    @classmethod
176    def _from_response(cls, dict_: dict[str, Any]) -> LogEntry:
177        """
178        Create a new `LogEntry` from the given API response.
179        """
180
181        # Assumes we only get one entry back
182        entries = list(dict_.items())
183        if len(entries) != 1:
184            raise ValueError("Received multiple entries in response")
185
186        uuid, entry = entries[0]
187        return LogEntry(
188            uuid=uuid,
189            body=entry["body"],
190            integrated_time=entry["integratedTime"],
191            log_id=entry["logID"],
192            log_index=entry["logIndex"],
193            inclusion_proof=LogInclusionProof.model_validate(
194                entry["verification"]["inclusionProof"]
195            ),
196            inclusion_promise=entry["verification"]["signedEntryTimestamp"],
197        )
198
199    @classmethod
200    def _from_dict_rekor(cls, dict_: dict[str, Any]) -> LogEntry:
201        """
202        Create a new `LogEntry` from the given Rekor TransparencyLogEntry.
203        """
204        tlog_entry = rekor_v1.TransparencyLogEntry()
205        tlog_entry.from_dict(dict_)
206
207        inclusion_proof: InclusionProof | None = tlog_entry.inclusion_proof
208        # This check is required by us as the client, not the
209        # protobuf-specs themselves.
210        if not inclusion_proof or not inclusion_proof.checkpoint.envelope:
211            raise InvalidBundle("entry must contain inclusion proof, with checkpoint")
212
213        parsed_inclusion_proof = LogInclusionProof(
214            checkpoint=inclusion_proof.checkpoint.envelope,
215            hashes=[h.hex() for h in inclusion_proof.hashes],
216            log_index=inclusion_proof.log_index,
217            root_hash=inclusion_proof.root_hash.hex(),
218            tree_size=inclusion_proof.tree_size,
219        )
220
221        return LogEntry(
222            uuid=None,
223            body=B64Str(base64.b64encode(tlog_entry.canonicalized_body).decode()),
224            integrated_time=tlog_entry.integrated_time,
225            log_id=tlog_entry.log_id.key_id.hex(),
226            log_index=tlog_entry.log_index,
227            inclusion_proof=parsed_inclusion_proof,
228            inclusion_promise=B64Str(
229                base64.b64encode(
230                    tlog_entry.inclusion_promise.signed_entry_timestamp
231                ).decode()
232            ),
233        )
234
235    def _to_rekor(self) -> rekor_v1.TransparencyLogEntry:
236        """
237        Create a new protobuf-level `TransparencyLogEntry` from this `LogEntry`.
238
239        @private
240        """
241        inclusion_promise: rekor_v1.InclusionPromise | None = None
242        if self.inclusion_promise:
243            inclusion_promise = rekor_v1.InclusionPromise(
244                signed_entry_timestamp=base64.b64decode(self.inclusion_promise)
245            )
246
247        inclusion_proof = rekor_v1.InclusionProof(
248            log_index=self.inclusion_proof.log_index,
249            root_hash=bytes.fromhex(self.inclusion_proof.root_hash),
250            tree_size=self.inclusion_proof.tree_size,
251            hashes=[bytes.fromhex(hash_) for hash_ in self.inclusion_proof.hashes],
252            checkpoint=rekor_v1.Checkpoint(envelope=self.inclusion_proof.checkpoint),
253        )
254
255        tlog_entry = rekor_v1.TransparencyLogEntry(
256            log_index=self.log_index,
257            log_id=common_v1.LogId(key_id=bytes.fromhex(self.log_id)),
258            integrated_time=self.integrated_time,
259            inclusion_promise=inclusion_promise,  # type: ignore[arg-type]
260            inclusion_proof=inclusion_proof,
261            canonicalized_body=base64.b64decode(self.body),
262        )
263
264        # Fill in the appropriate kind
265        body_entry: ProposedEntry = TypeAdapter(ProposedEntry).validate_json(
266            tlog_entry.canonicalized_body
267        )
268        if not isinstance(body_entry, (Hashedrekord, Dsse)):
269            raise InvalidBundle("log entry is not of expected type")
270
271        tlog_entry.kind_version = rekor_v1.KindVersion(
272            kind=body_entry.kind, version=body_entry.api_version
273        )
274
275        return tlog_entry
276
277    def encode_canonical(self) -> bytes:
278        """
279        Returns a canonicalized JSON (RFC 8785) representation of the transparency log entry.
280
281        This encoded representation is suitable for verification against
282        the Signed Entry Timestamp.
283        """
284        payload: dict[str, int | str] = {
285            "body": self.body,
286            "integratedTime": self.integrated_time,
287            "logID": self.log_id,
288            "logIndex": self.log_index,
289        }
290
291        return rfc8785.dumps(payload)
292
293    def _verify_set(self, keyring: RekorKeyring) -> None:
294        """
295        Verify the inclusion promise (Signed Entry Timestamp) for a given transparency log
296        `entry` using the given `keyring`.
297
298        Fails if the given log entry does not contain an inclusion promise.
299        """
300
301        if self.inclusion_promise is None:
302            raise VerificationError("SET: invalid inclusion promise: missing")
303
304        signed_entry_ts = base64.b64decode(self.inclusion_promise)
305
306        try:
307            keyring.verify(
308                key_id=KeyID(bytes.fromhex(self.log_id)),
309                signature=signed_entry_ts,
310                data=self.encode_canonical(),
311            )
312        except VerificationError as exc:
313            raise VerificationError(f"SET: invalid inclusion promise: {exc}")
314
315    def _verify(self, keyring: RekorKeyring) -> None:
316        """
317        Verifies this log entry.
318
319        This method performs steps (5), (6), and optionally (7) in
320        the top-level verify API:
321
322        * Verifies the consistency of the entry with the given bundle;
323        * Verifies the Merkle inclusion proof and its signed checkpoint;
324        * Verifies the inclusion promise, if present.
325        """
326
327        verify_merkle_inclusion(self)
328        verify_checkpoint(keyring, self)
329
330        _logger.debug(f"successfully verified inclusion proof: index={self.log_index}")
331
332        if self.inclusion_promise:
333            self._verify_set(keyring)
334            _logger.debug(
335                f"successfully verified inclusion promise: index={self.log_index}"
336            )

Represents a transparency log entry.

Log entries are retrieved from the transparency log after signing or verification events, or loaded from "Sigstore" bundles provided by the user.

This representation allows for either a missing inclusion promise or a missing inclusion proof, but not both: attempting to construct a LogEntry without at least one will fail.

LogEntry(*args: Any, **kwargs: Any)
139    def __init__(__dataclass_self__: PydanticDataclass, *args: Any, **kwargs: Any) -> None:
140        __tracebackhide__ = True
141        s = __dataclass_self__
142        s.__pydantic_validator__.validate_python(ArgsKwargs(args, kwargs), self_instance=s)
uuid: Optional[str]

This entry's unique ID in the log instance it was retrieved from.

For sharded log deployments, IDs are unique per-shard.

Not present for LogEntry instances loaded from Sigstore bundles.

The base64-encoded body of the transparency log entry.

integrated_time: int

The UNIX time at which this entry was integrated into the transparency log.

log_id: str

The log's ID (as the SHA256 hash of the DER-encoded public key for the log at the time of entry inclusion).

log_index: int

The index of this entry within the log.

inclusion_proof: LogInclusionProof

An inclusion proof for this log entry.

inclusion_promise: Optional[sigstore._utils.B64Str]

An inclusion promise for this log entry, if present.

Internally, this is a base64-encoded Signed Entry Timestamp (SET) for this log entry.

def encode_canonical(self) -> bytes:
277    def encode_canonical(self) -> bytes:
278        """
279        Returns a canonicalized JSON (RFC 8785) representation of the transparency log entry.
280
281        This encoded representation is suitable for verification against
282        the Signed Entry Timestamp.
283        """
284        payload: dict[str, int | str] = {
285            "body": self.body,
286            "integratedTime": self.integrated_time,
287            "logID": self.log_id,
288            "logIndex": self.log_index,
289        }
290
291        return rfc8785.dumps(payload)

Returns a canonicalized JSON (RFC 8785) representation of the transparency log entry.

This encoded representation is suitable for verification against the Signed Entry Timestamp.

class VerificationMaterial:
380class VerificationMaterial:
381    """
382    Represents a VerificationMaterial structure.
383    """
384
385    def __init__(self, inner: _VerificationMaterial) -> None:
386        """Init method."""
387        self._inner = inner
388
389    @property
390    def timestamp_verification_data(self) -> TimestampVerificationData:
391        """
392        Returns the Timestamp Verification Data.
393        """
394        return TimestampVerificationData(self._inner.timestamp_verification_data)

Represents a VerificationMaterial structure.

VerificationMaterial( inner: sigstore_protobuf_specs.dev.sigstore.bundle.v1.VerificationMaterial)
385    def __init__(self, inner: _VerificationMaterial) -> None:
386        """Init method."""
387        self._inner = inner

Init method.

timestamp_verification_data: sigstore.models.TimestampVerificationData
389    @property
390    def timestamp_verification_data(self) -> TimestampVerificationData:
391        """
392        Returns the Timestamp Verification Data.
393        """
394        return TimestampVerificationData(self._inner.timestamp_verification_data)

Returns the Timestamp Verification Data.

class InvalidBundle(sigstore.errors.Error):
397class InvalidBundle(Error):
398    """
399    Raised when the associated `Bundle` is invalid in some way.
400    """
401
402    def diagnostics(self) -> str:
403        """Returns diagnostics for the error."""
404
405        return dedent(
406            f"""\
407        An issue occurred while parsing the Sigstore bundle.
408
409        The provided bundle is malformed and may have been modified maliciously.
410
411        Additional context:
412
413        {self}
414        """
415        )

Raised when the associated Bundle is invalid in some way.

def diagnostics(self) -> str:
402    def diagnostics(self) -> str:
403        """Returns diagnostics for the error."""
404
405        return dedent(
406            f"""\
407        An issue occurred while parsing the Sigstore bundle.
408
409        The provided bundle is malformed and may have been modified maliciously.
410
411        Additional context:
412
413        {self}
414        """
415        )

Returns diagnostics for the error.

class Bundle:
418class Bundle:
419    """
420    Represents a Sigstore bundle.
421    """
422
423    class BundleType(str, Enum):
424        """
425        Known Sigstore bundle media types.
426        """
427
428        BUNDLE_0_1 = "application/vnd.dev.sigstore.bundle+json;version=0.1"
429        BUNDLE_0_2 = "application/vnd.dev.sigstore.bundle+json;version=0.2"
430        BUNDLE_0_3_ALT = "application/vnd.dev.sigstore.bundle+json;version=0.3"
431        BUNDLE_0_3 = "application/vnd.dev.sigstore.bundle.v0.3+json"
432
433        def __str__(self) -> str:
434            """Returns the variant's string value."""
435            return self.value
436
437    def __init__(self, inner: _Bundle) -> None:
438        """
439        Creates a new bundle. This is not a public API; use
440        `from_json` instead.
441
442        @private
443        """
444        self._inner = inner
445        self._verify()
446
447    def _verify(self) -> None:
448        """
449        Performs various feats of heroism to ensure the bundle is well-formed
450        and upholds invariants, including:
451
452        * The "leaf" (signing) certificate is present;
453        * There is a inclusion proof present, even if the Bundle's version
454           predates a mandatory inclusion proof.
455        """
456
457        # The bundle must have a recognized media type.
458        try:
459            media_type = Bundle.BundleType(self._inner.media_type)
460        except ValueError:
461            raise InvalidBundle(f"unsupported bundle format: {self._inner.media_type}")
462
463        # Extract the signing certificate.
464        if media_type in (
465            Bundle.BundleType.BUNDLE_0_3,
466            Bundle.BundleType.BUNDLE_0_3_ALT,
467        ):
468            # For "v3" bundles, the signing certificate is the only one present.
469            leaf_cert = load_der_x509_certificate(
470                self._inner.verification_material.certificate.raw_bytes
471            )
472        else:
473            # In older bundles, there is an entire pool (misleadingly called
474            # a chain) of certificates, the first of which is the signing
475            # certificate.
476            certs = (
477                self._inner.verification_material.x509_certificate_chain.certificates
478            )
479
480            if len(certs) == 0:
481                raise InvalidBundle("expected non-empty certificate chain in bundle")
482
483            # Per client policy in protobuf-specs: the first entry in the chain
484            # MUST be a leaf certificate, and the rest of the chain MUST NOT
485            # include a root CA or any intermediate CAs that appear in an
486            # independent root of trust.
487            #
488            # We expect some old bundles to violate the rules around root
489            # and intermediate CAs, so we issue warnings and not hard errors
490            # in those cases.
491            leaf_cert, *chain_certs = [
492                load_der_x509_certificate(cert.raw_bytes) for cert in certs
493            ]
494            if not cert_is_leaf(leaf_cert):
495                raise InvalidBundle(
496                    "bundle contains an invalid leaf or non-leaf certificate in the leaf position"
497                )
498
499            for chain_cert in chain_certs:
500                # TODO: We should also retrieve the root of trust here and
501                # cross-check against it.
502                if cert_is_root_ca(chain_cert):
503                    _logger.warning(
504                        "this bundle contains a root CA, making it subject to misuse"
505                    )
506
507        self._signing_certificate = leaf_cert
508
509        # Extract the log entry. For the time being, we expect
510        # bundles to only contain a single log entry.
511        tlog_entries = self._inner.verification_material.tlog_entries
512        if len(tlog_entries) != 1:
513            raise InvalidBundle("expected exactly one log entry in bundle")
514        tlog_entry = tlog_entries[0]
515
516        # Handling of inclusion promises and proofs varies between bundle
517        # format versions:
518        #
519        # * For 0.1, an inclusion promise is required; the client
520        #   MUST verify the inclusion promise.
521        #   The inclusion proof is NOT required. If provided, it might NOT
522        #   contain a checkpoint; in this case, we ignore it (since it's
523        #   useless without one).
524        #
525        # * For 0.2+, an inclusion proof is required; the client MUST
526        #   verify the inclusion proof. The inclusion prof MUST contain
527        #   a checkpoint.
528        #   The inclusion promise is NOT required; if present, the client
529        #   SHOULD verify it.
530        #
531        # Before all of this, we require that the inclusion proof be present
532        # (when constructing the LogEntry).
533        log_entry = LogEntry._from_dict_rekor(tlog_entry.to_dict())
534
535        if media_type == Bundle.BundleType.BUNDLE_0_1:
536            if not log_entry.inclusion_promise:
537                raise InvalidBundle("bundle must contain an inclusion promise")
538            if not log_entry.inclusion_proof.checkpoint:
539                _logger.debug(
540                    "0.1 bundle contains inclusion proof without checkpoint; ignoring"
541                )
542        else:
543            if not log_entry.inclusion_proof.checkpoint:
544                raise InvalidBundle("expected checkpoint in inclusion proof")
545
546        self._log_entry = log_entry
547
548    @property
549    def signing_certificate(self) -> Certificate:
550        """Returns the bundle's contained signing (i.e. leaf) certificate."""
551        return self._signing_certificate
552
553    @property
554    def log_entry(self) -> LogEntry:
555        """
556        Returns the bundle's log entry, containing an inclusion proof
557        (with checkpoint) and an inclusion promise (if the latter is present).
558        """
559        return self._log_entry
560
561    @property
562    def _dsse_envelope(self) -> dsse.Envelope | None:
563        """
564        Returns the DSSE envelope within this Bundle as a `dsse.Envelope`.
565
566        @private
567        """
568        if self._inner.dsse_envelope:
569            return dsse.Envelope(self._inner.dsse_envelope)
570        return None
571
572    @property
573    def signature(self) -> bytes:
574        """
575        Returns the signature bytes of this bundle.
576        Either from the DSSE Envelope or from the message itself.
577        """
578        return (
579            self._dsse_envelope.signature
580            if self._dsse_envelope
581            else self._inner.message_signature.signature
582        )
583
584    @property
585    def verification_material(self) -> VerificationMaterial:
586        """
587        Returns the bundle's verification material.
588        """
589        return VerificationMaterial(self._inner.verification_material)
590
591    @classmethod
592    def from_json(cls, raw: bytes | str) -> Bundle:
593        """
594        Deserialize the given Sigstore bundle.
595        """
596        inner = _Bundle().from_json(raw)
597        return cls(inner)
598
599    def to_json(self) -> str:
600        """
601        Return a JSON encoding of this bundle.
602        """
603        return self._inner.to_json()
604
605    def _to_parts(
606        self,
607    ) -> tuple[Certificate, common_v1.MessageSignature | dsse.Envelope, LogEntry]:
608        """
609        Decompose the `Bundle` into its core constituent parts.
610
611        @private
612        """
613
614        content: common_v1.MessageSignature | dsse.Envelope
615        if self._dsse_envelope:
616            content = self._dsse_envelope
617        else:
618            content = self._inner.message_signature
619
620        return (self.signing_certificate, content, self.log_entry)
621
622    @classmethod
623    def from_parts(cls, cert: Certificate, sig: bytes, log_entry: LogEntry) -> Bundle:
624        """
625        Construct a Sigstore bundle (of `hashedrekord` type) from its
626        constituent parts.
627        """
628
629        return cls._from_parts(
630            cert, common_v1.MessageSignature(signature=sig), log_entry
631        )
632
633    @classmethod
634    def _from_parts(
635        cls,
636        cert: Certificate,
637        content: common_v1.MessageSignature | dsse.Envelope,
638        log_entry: LogEntry,
639    ) -> Bundle:
640        """
641        @private
642        """
643
644        inner = _Bundle(
645            media_type=Bundle.BundleType.BUNDLE_0_3.value,
646            verification_material=bundle_v1.VerificationMaterial(
647                certificate=common_v1.X509Certificate(cert.public_bytes(Encoding.DER)),
648            ),
649        )
650
651        # Fill in the appropriate variants.
652        if isinstance(content, common_v1.MessageSignature):
653            inner.message_signature = content
654        else:
655            inner.dsse_envelope = content._inner
656
657        tlog_entry = log_entry._to_rekor()
658        inner.verification_material.tlog_entries = [tlog_entry]
659
660        return cls(inner)

Represents a Sigstore bundle.

signing_certificate: cryptography.x509.base.Certificate
548    @property
549    def signing_certificate(self) -> Certificate:
550        """Returns the bundle's contained signing (i.e. leaf) certificate."""
551        return self._signing_certificate

Returns the bundle's contained signing (i.e. leaf) certificate.

log_entry: LogEntry
553    @property
554    def log_entry(self) -> LogEntry:
555        """
556        Returns the bundle's log entry, containing an inclusion proof
557        (with checkpoint) and an inclusion promise (if the latter is present).
558        """
559        return self._log_entry

Returns the bundle's log entry, containing an inclusion proof (with checkpoint) and an inclusion promise (if the latter is present).

signature: bytes
572    @property
573    def signature(self) -> bytes:
574        """
575        Returns the signature bytes of this bundle.
576        Either from the DSSE Envelope or from the message itself.
577        """
578        return (
579            self._dsse_envelope.signature
580            if self._dsse_envelope
581            else self._inner.message_signature.signature
582        )

Returns the signature bytes of this bundle. Either from the DSSE Envelope or from the message itself.

verification_material: VerificationMaterial
584    @property
585    def verification_material(self) -> VerificationMaterial:
586        """
587        Returns the bundle's verification material.
588        """
589        return VerificationMaterial(self._inner.verification_material)

Returns the bundle's verification material.

@classmethod
def from_json(cls, raw: bytes | str) -> Bundle:
591    @classmethod
592    def from_json(cls, raw: bytes | str) -> Bundle:
593        """
594        Deserialize the given Sigstore bundle.
595        """
596        inner = _Bundle().from_json(raw)
597        return cls(inner)

Deserialize the given Sigstore bundle.

def to_json(self) -> str:
599    def to_json(self) -> str:
600        """
601        Return a JSON encoding of this bundle.
602        """
603        return self._inner.to_json()

Return a JSON encoding of this bundle.

@classmethod
def from_parts( cls, cert: cryptography.x509.base.Certificate, sig: bytes, log_entry: LogEntry) -> Bundle:
622    @classmethod
623    def from_parts(cls, cert: Certificate, sig: bytes, log_entry: LogEntry) -> Bundle:
624        """
625        Construct a Sigstore bundle (of `hashedrekord` type) from its
626        constituent parts.
627        """
628
629        return cls._from_parts(
630            cert, common_v1.MessageSignature(signature=sig), log_entry
631        )

Construct a Sigstore bundle (of hashedrekord type) from its constituent parts.

class Bundle.BundleType(builtins.str, enum.Enum):
423    class BundleType(str, Enum):
424        """
425        Known Sigstore bundle media types.
426        """
427
428        BUNDLE_0_1 = "application/vnd.dev.sigstore.bundle+json;version=0.1"
429        BUNDLE_0_2 = "application/vnd.dev.sigstore.bundle+json;version=0.2"
430        BUNDLE_0_3_ALT = "application/vnd.dev.sigstore.bundle+json;version=0.3"
431        BUNDLE_0_3 = "application/vnd.dev.sigstore.bundle.v0.3+json"
432
433        def __str__(self) -> str:
434            """Returns the variant's string value."""
435            return self.value

Known Sigstore bundle media types.

BUNDLE_0_1 = <BundleType.BUNDLE_0_1: 'application/vnd.dev.sigstore.bundle+json;version=0.1'>
BUNDLE_0_2 = <BundleType.BUNDLE_0_2: 'application/vnd.dev.sigstore.bundle+json;version=0.2'>
BUNDLE_0_3_ALT = <BundleType.BUNDLE_0_3_ALT: 'application/vnd.dev.sigstore.bundle+json;version=0.3'>
BUNDLE_0_3 = <BundleType.BUNDLE_0_3: 'application/vnd.dev.sigstore.bundle.v0.3+json'>