sigstore.models

Common models shared between signing and verification.

  1# Copyright 2022 The Sigstore Authors
  2#
  3# Licensed under the Apache License, Version 2.0 (the "License");
  4# you may not use this file except in compliance with the License.
  5# You may obtain a copy of the License at
  6#
  7#      http://www.apache.org/licenses/LICENSE-2.0
  8#
  9# Unless required by applicable law or agreed to in writing, software
 10# distributed under the License is distributed on an "AS IS" BASIS,
 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12# See the License for the specific language governing permissions and
 13# limitations under the License.
 14
 15"""
 16Common models shared between signing and verification.
 17"""
 18
 19from __future__ import annotations
 20
 21import base64
 22import logging
 23import typing
 24from enum import Enum
 25from textwrap import dedent
 26from typing import Any, List, Optional
 27
 28import rfc8785
 29from cryptography.hazmat.primitives.serialization import Encoding
 30from cryptography.x509 import (
 31    Certificate,
 32    load_der_x509_certificate,
 33)
 34from pydantic import (
 35    BaseModel,
 36    ConfigDict,
 37    Field,
 38    StrictInt,
 39    StrictStr,
 40    TypeAdapter,
 41    ValidationInfo,
 42    field_validator,
 43)
 44from pydantic.dataclasses import dataclass
 45from rekor_types import Dsse, Hashedrekord, ProposedEntry
 46from sigstore_protobuf_specs.dev.sigstore.bundle import v1 as bundle_v1
 47from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import (
 48    Bundle as _Bundle,
 49)
 50from sigstore_protobuf_specs.dev.sigstore.common import v1 as common_v1
 51from sigstore_protobuf_specs.dev.sigstore.rekor import v1 as rekor_v1
 52from sigstore_protobuf_specs.dev.sigstore.rekor.v1 import (
 53    InclusionProof,
 54)
 55
 56from sigstore import dsse
 57from sigstore._internal.merkle import verify_merkle_inclusion
 58from sigstore._internal.rekor.checkpoint import verify_checkpoint
 59from sigstore._utils import (
 60    B64Str,
 61    KeyID,
 62    cert_is_leaf,
 63    cert_is_root_ca,
 64)
 65from sigstore.errors import Error, VerificationError
 66
 67if typing.TYPE_CHECKING:
 68    from sigstore._internal.trust import RekorKeyring
 69
 70
 71_logger = logging.getLogger(__name__)
 72
 73
 74class LogInclusionProof(BaseModel):
 75    """
 76    Represents an inclusion proof for a transparency log entry.
 77    """
 78
 79    model_config = ConfigDict(populate_by_name=True)
 80
 81    checkpoint: StrictStr = Field(..., alias="checkpoint")
 82    hashes: List[StrictStr] = Field(..., alias="hashes")
 83    log_index: StrictInt = Field(..., alias="logIndex")
 84    root_hash: StrictStr = Field(..., alias="rootHash")
 85    tree_size: StrictInt = Field(..., alias="treeSize")
 86
 87    @field_validator("log_index")
 88    def _log_index_positive(cls, v: int) -> int:
 89        if v < 0:
 90            raise ValueError(f"Inclusion proof has invalid log index: {v} < 0")
 91        return v
 92
 93    @field_validator("tree_size")
 94    def _tree_size_positive(cls, v: int) -> int:
 95        if v < 0:
 96            raise ValueError(f"Inclusion proof has invalid tree size: {v} < 0")
 97        return v
 98
 99    @field_validator("tree_size")
100    def _log_index_within_tree_size(
101        cls, v: int, info: ValidationInfo, **kwargs: Any
102    ) -> int:
103        if "log_index" in info.data and v <= info.data["log_index"]:
104            raise ValueError(
105                "Inclusion proof has log index greater than or equal to tree size: "
106                f"{v} <= {info.data['log_index']}"
107            )
108        return v
109
110
111@dataclass(frozen=True)
112class LogEntry:
113    """
114    Represents a transparency log entry.
115
116    Log entries are retrieved from the transparency log after signing or verification events,
117    or loaded from "Sigstore" bundles provided by the user.
118
119    This representation allows for either a missing inclusion promise or a missing
120    inclusion proof, but not both: attempting to construct a `LogEntry` without
121    at least one will fail.
122    """
123
124    uuid: Optional[str]
125    """
126    This entry's unique ID in the log instance it was retrieved from.
127
128    For sharded log deployments, IDs are unique per-shard.
129
130    Not present for `LogEntry` instances loaded from Sigstore bundles.
131    """
132
133    body: B64Str
134    """
135    The base64-encoded body of the transparency log entry.
136    """
137
138    integrated_time: int
139    """
140    The UNIX time at which this entry was integrated into the transparency log.
141    """
142
143    log_id: str
144    """
145    The log's ID (as the SHA256 hash of the DER-encoded public key for the log
146    at the time of entry inclusion).
147    """
148
149    log_index: int
150    """
151    The index of this entry within the log.
152    """
153
154    inclusion_proof: LogInclusionProof
155    """
156    An inclusion proof for this log entry.
157    """
158
159    inclusion_promise: Optional[B64Str]
160    """
161    An inclusion promise for this log entry, if present.
162
163    Internally, this is a base64-encoded Signed Entry Timestamp (SET) for this
164    log entry.
165    """
166
167    @classmethod
168    def _from_response(cls, dict_: dict[str, Any]) -> LogEntry:
169        """
170        Create a new `LogEntry` from the given API response.
171        """
172
173        # Assumes we only get one entry back
174        entries = list(dict_.items())
175        if len(entries) != 1:
176            raise ValueError("Received multiple entries in response")
177
178        uuid, entry = entries[0]
179        return LogEntry(
180            uuid=uuid,
181            body=entry["body"],
182            integrated_time=entry["integratedTime"],
183            log_id=entry["logID"],
184            log_index=entry["logIndex"],
185            inclusion_proof=LogInclusionProof.model_validate(
186                entry["verification"]["inclusionProof"]
187            ),
188            inclusion_promise=entry["verification"]["signedEntryTimestamp"],
189        )
190
191    @classmethod
192    def _from_dict_rekor(cls, dict_: dict[str, Any]) -> LogEntry:
193        """
194        Create a new `LogEntry` from the given Rekor TransparencyLogEntry.
195        """
196        tlog_entry = rekor_v1.TransparencyLogEntry()
197        tlog_entry.from_dict(dict_)
198
199        inclusion_proof: InclusionProof | None = tlog_entry.inclusion_proof
200        # This check is required by us as the client, not the
201        # protobuf-specs themselves.
202        if not inclusion_proof or not inclusion_proof.checkpoint.envelope:
203            raise InvalidBundle("entry must contain inclusion proof, with checkpoint")
204
205        parsed_inclusion_proof = LogInclusionProof(
206            checkpoint=inclusion_proof.checkpoint.envelope,
207            hashes=[h.hex() for h in inclusion_proof.hashes],
208            log_index=inclusion_proof.log_index,
209            root_hash=inclusion_proof.root_hash.hex(),
210            tree_size=inclusion_proof.tree_size,
211        )
212
213        return LogEntry(
214            uuid=None,
215            body=B64Str(base64.b64encode(tlog_entry.canonicalized_body).decode()),
216            integrated_time=tlog_entry.integrated_time,
217            log_id=tlog_entry.log_id.key_id.hex(),
218            log_index=tlog_entry.log_index,
219            inclusion_proof=parsed_inclusion_proof,
220            inclusion_promise=B64Str(
221                base64.b64encode(
222                    tlog_entry.inclusion_promise.signed_entry_timestamp
223                ).decode()
224            ),
225        )
226
227    def _to_rekor(self) -> rekor_v1.TransparencyLogEntry:
228        """
229        Create a new protobuf-level `TransparencyLogEntry` from this `LogEntry`.
230
231        @private
232        """
233        inclusion_promise: rekor_v1.InclusionPromise | None = None
234        if self.inclusion_promise:
235            inclusion_promise = rekor_v1.InclusionPromise(
236                signed_entry_timestamp=base64.b64decode(self.inclusion_promise)
237            )
238
239        inclusion_proof = rekor_v1.InclusionProof(
240            log_index=self.inclusion_proof.log_index,
241            root_hash=bytes.fromhex(self.inclusion_proof.root_hash),
242            tree_size=self.inclusion_proof.tree_size,
243            hashes=[bytes.fromhex(hash_) for hash_ in self.inclusion_proof.hashes],
244            checkpoint=rekor_v1.Checkpoint(envelope=self.inclusion_proof.checkpoint),
245        )
246
247        tlog_entry = rekor_v1.TransparencyLogEntry(
248            log_index=self.log_index,
249            log_id=common_v1.LogId(key_id=bytes.fromhex(self.log_id)),
250            integrated_time=self.integrated_time,
251            inclusion_promise=inclusion_promise,  # type: ignore[arg-type]
252            inclusion_proof=inclusion_proof,
253            canonicalized_body=base64.b64decode(self.body),
254        )
255
256        # Fill in the appropriate kind
257        body_entry: ProposedEntry = TypeAdapter(ProposedEntry).validate_json(
258            tlog_entry.canonicalized_body
259        )
260        if not isinstance(body_entry, (Hashedrekord, Dsse)):
261            raise InvalidBundle("log entry is not of expected type")
262
263        tlog_entry.kind_version = rekor_v1.KindVersion(
264            kind=body_entry.kind, version=body_entry.api_version
265        )
266
267        return tlog_entry
268
269    def encode_canonical(self) -> bytes:
270        """
271        Returns a canonicalized JSON (RFC 8785) representation of the transparency log entry.
272
273        This encoded representation is suitable for verification against
274        the Signed Entry Timestamp.
275        """
276        payload: dict[str, int | str] = {
277            "body": self.body,
278            "integratedTime": self.integrated_time,
279            "logID": self.log_id,
280            "logIndex": self.log_index,
281        }
282
283        return rfc8785.dumps(payload)
284
285    def _verify_set(self, keyring: RekorKeyring) -> None:
286        """
287        Verify the inclusion promise (Signed Entry Timestamp) for a given transparency log
288        `entry` using the given `keyring`.
289
290        Fails if the given log entry does not contain an inclusion promise.
291        """
292
293        if self.inclusion_promise is None:
294            raise VerificationError("SET: invalid inclusion promise: missing")
295
296        signed_entry_ts = base64.b64decode(self.inclusion_promise)
297
298        try:
299            keyring.verify(
300                key_id=KeyID(bytes.fromhex(self.log_id)),
301                signature=signed_entry_ts,
302                data=self.encode_canonical(),
303            )
304        except VerificationError as exc:
305            raise VerificationError(f"SET: invalid inclusion promise: {exc}")
306
307    def _verify(self, keyring: RekorKeyring) -> None:
308        """
309        Verifies this log entry.
310
311        This method performs steps (5), (6), and optionally (7) in
312        the top-level verify API:
313
314        * Verifies the consistency of the entry with the given bundle;
315        * Verifies the Merkle inclusion proof and its signed checkpoint;
316        * Verifies the inclusion promise, if present.
317        """
318
319        verify_merkle_inclusion(self)
320        verify_checkpoint(keyring, self)
321
322        _logger.debug(f"successfully verified inclusion proof: index={self.log_index}")
323
324        if self.inclusion_promise:
325            self._verify_set(keyring)
326            _logger.debug(
327                f"successfully verified inclusion promise: index={self.log_index}"
328            )
329
330
331class InvalidBundle(Error):
332    """
333    Raised when the associated `Bundle` is invalid in some way.
334    """
335
336    def diagnostics(self) -> str:
337        """Returns diagnostics for the error."""
338
339        return dedent(
340            f"""\
341        An issue occurred while parsing the Sigstore bundle.
342
343        The provided bundle is malformed and may have been modified maliciously.
344
345        Additional context:
346
347        {self}
348        """
349        )
350
351
352class Bundle:
353    """
354    Represents a Sigstore bundle.
355    """
356
357    class BundleType(str, Enum):
358        """
359        Known Sigstore bundle media types.
360        """
361
362        BUNDLE_0_1 = "application/vnd.dev.sigstore.bundle+json;version=0.1"
363        BUNDLE_0_2 = "application/vnd.dev.sigstore.bundle+json;version=0.2"
364        BUNDLE_0_3_ALT = "application/vnd.dev.sigstore.bundle+json;version=0.3"
365        BUNDLE_0_3 = "application/vnd.dev.sigstore.bundle.v0.3+json"
366
367        def __str__(self) -> str:
368            """Returns the variant's string value."""
369            return self.value
370
371    def __init__(self, inner: _Bundle) -> None:
372        """
373        Creates a new bundle. This is not a public API; use
374        `from_json` instead.
375
376        @private
377        """
378        self._inner = inner
379        self._verify()
380
381    def _verify(self) -> None:
382        """
383        Performs various feats of heroism to ensure the bundle is well-formed
384        and upholds invariants, including:
385
386        * The "leaf" (signing) certificate is present;
387        * There is a inclusion proof present, even if the Bundle's version
388           predates a mandatory inclusion proof.
389        """
390
391        # The bundle must have a recognized media type.
392        try:
393            media_type = Bundle.BundleType(self._inner.media_type)
394        except ValueError:
395            raise InvalidBundle(f"unsupported bundle format: {self._inner.media_type}")
396
397        # Extract the signing certificate.
398        if media_type in (
399            Bundle.BundleType.BUNDLE_0_3,
400            Bundle.BundleType.BUNDLE_0_3_ALT,
401        ):
402            # For "v3" bundles, the signing certificate is the only one present.
403            leaf_cert = load_der_x509_certificate(
404                self._inner.verification_material.certificate.raw_bytes
405            )
406        else:
407            # In older bundles, there is an entire pool (misleadingly called
408            # a chain) of certificates, the first of which is the signing
409            # certificate.
410            certs = (
411                self._inner.verification_material.x509_certificate_chain.certificates
412            )
413
414            if len(certs) == 0:
415                raise InvalidBundle("expected non-empty certificate chain in bundle")
416
417            # Per client policy in protobuf-specs: the first entry in the chain
418            # MUST be a leaf certificate, and the rest of the chain MUST NOT
419            # include a root CA or any intermediate CAs that appear in an
420            # independent root of trust.
421            #
422            # We expect some old bundles to violate the rules around root
423            # and intermediate CAs, so we issue warnings and not hard errors
424            # in those cases.
425            leaf_cert, *chain_certs = [
426                load_der_x509_certificate(cert.raw_bytes) for cert in certs
427            ]
428            if not cert_is_leaf(leaf_cert):
429                raise InvalidBundle(
430                    "bundle contains an invalid leaf or non-leaf certificate in the leaf position"
431                )
432
433            for chain_cert in chain_certs:
434                # TODO: We should also retrieve the root of trust here and
435                # cross-check against it.
436                if cert_is_root_ca(chain_cert):
437                    _logger.warning(
438                        "this bundle contains a root CA, making it subject to misuse"
439                    )
440
441        self._signing_certificate = leaf_cert
442
443        # Extract the log entry. For the time being, we expect
444        # bundles to only contain a single log entry.
445        tlog_entries = self._inner.verification_material.tlog_entries
446        if len(tlog_entries) != 1:
447            raise InvalidBundle("expected exactly one log entry in bundle")
448        tlog_entry = tlog_entries[0]
449
450        # Handling of inclusion promises and proofs varies between bundle
451        # format versions:
452        #
453        # * For 0.1, an inclusion promise is required; the client
454        #   MUST verify the inclusion promise.
455        #   The inclusion proof is NOT required. If provided, it might NOT
456        #   contain a checkpoint; in this case, we ignore it (since it's
457        #   useless without one).
458        #
459        # * For 0.2+, an inclusion proof is required; the client MUST
460        #   verify the inclusion proof. The inclusion prof MUST contain
461        #   a checkpoint.
462        #   The inclusion promise is NOT required; if present, the client
463        #   SHOULD verify it.
464        #
465        # Before all of this, we require that the inclusion proof be present
466        # (when constructing the LogEntry).
467        log_entry = LogEntry._from_dict_rekor(tlog_entry.to_dict())
468
469        if media_type == Bundle.BundleType.BUNDLE_0_1:
470            if not log_entry.inclusion_promise:
471                raise InvalidBundle("bundle must contain an inclusion promise")
472            if not log_entry.inclusion_proof.checkpoint:
473                _logger.debug(
474                    "0.1 bundle contains inclusion proof without checkpoint; ignoring"
475                )
476        else:
477            if not log_entry.inclusion_proof.checkpoint:
478                raise InvalidBundle("expected checkpoint in inclusion proof")
479
480        self._log_entry = log_entry
481
482    @property
483    def signing_certificate(self) -> Certificate:
484        """Returns the bundle's contained signing (i.e. leaf) certificate."""
485        return self._signing_certificate
486
487    @property
488    def log_entry(self) -> LogEntry:
489        """
490        Returns the bundle's log entry, containing an inclusion proof
491        (with checkpoint) and an inclusion promise (if the latter is present).
492        """
493        return self._log_entry
494
495    @property
496    def _dsse_envelope(self) -> dsse.Envelope | None:
497        """
498        Returns the DSSE envelope within this Bundle as a `dsse.Envelope`.
499
500        @private
501        """
502        if self._inner.dsse_envelope:
503            return dsse.Envelope(self._inner.dsse_envelope)
504        return None
505
506    @classmethod
507    def from_json(cls, raw: bytes | str) -> Bundle:
508        """
509        Deserialize the given Sigstore bundle.
510        """
511        inner = _Bundle().from_json(raw)
512        return cls(inner)
513
514    def to_json(self) -> str:
515        """
516        Return a JSON encoding of this bundle.
517        """
518        return self._inner.to_json()
519
520    def _to_parts(
521        self,
522    ) -> tuple[Certificate, common_v1.MessageSignature | dsse.Envelope, LogEntry]:
523        """
524        Decompose the `Bundle` into its core constituent parts.
525
526        @private
527        """
528
529        content: common_v1.MessageSignature | dsse.Envelope
530        if self._dsse_envelope:
531            content = self._dsse_envelope
532        else:
533            content = self._inner.message_signature
534
535        return (self.signing_certificate, content, self.log_entry)
536
537    @classmethod
538    def from_parts(cls, cert: Certificate, sig: bytes, log_entry: LogEntry) -> Bundle:
539        """
540        Construct a Sigstore bundle (of `hashedrekord` type) from its
541        constituent parts.
542        """
543
544        return cls._from_parts(
545            cert, common_v1.MessageSignature(signature=sig), log_entry
546        )
547
548    @classmethod
549    def _from_parts(
550        cls,
551        cert: Certificate,
552        content: common_v1.MessageSignature | dsse.Envelope,
553        log_entry: LogEntry,
554    ) -> Bundle:
555        """
556        @private
557        """
558
559        inner = _Bundle(
560            media_type=Bundle.BundleType.BUNDLE_0_3.value,
561            verification_material=bundle_v1.VerificationMaterial(
562                certificate=common_v1.X509Certificate(cert.public_bytes(Encoding.DER)),
563            ),
564        )
565
566        # Fill in the appropriate variants.
567        if isinstance(content, common_v1.MessageSignature):
568            inner.message_signature = content
569        else:
570            inner.dsse_envelope = content._inner
571
572        tlog_entry = log_entry._to_rekor()
573        inner.verification_material.tlog_entries = [tlog_entry]
574
575        return cls(inner)
class LogInclusionProof(pydantic.main.BaseModel):
 75class LogInclusionProof(BaseModel):
 76    """
 77    Represents an inclusion proof for a transparency log entry.
 78    """
 79
 80    model_config = ConfigDict(populate_by_name=True)
 81
 82    checkpoint: StrictStr = Field(..., alias="checkpoint")
 83    hashes: List[StrictStr] = Field(..., alias="hashes")
 84    log_index: StrictInt = Field(..., alias="logIndex")
 85    root_hash: StrictStr = Field(..., alias="rootHash")
 86    tree_size: StrictInt = Field(..., alias="treeSize")
 87
 88    @field_validator("log_index")
 89    def _log_index_positive(cls, v: int) -> int:
 90        if v < 0:
 91            raise ValueError(f"Inclusion proof has invalid log index: {v} < 0")
 92        return v
 93
 94    @field_validator("tree_size")
 95    def _tree_size_positive(cls, v: int) -> int:
 96        if v < 0:
 97            raise ValueError(f"Inclusion proof has invalid tree size: {v} < 0")
 98        return v
 99
100    @field_validator("tree_size")
101    def _log_index_within_tree_size(
102        cls, v: int, info: ValidationInfo, **kwargs: Any
103    ) -> int:
104        if "log_index" in info.data and v <= info.data["log_index"]:
105            raise ValueError(
106                "Inclusion proof has log index greater than or equal to tree size: "
107                f"{v} <= {info.data['log_index']}"
108            )
109        return v

Represents an inclusion proof for a transparency log entry.

model_config = {'populate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

checkpoint: Annotated[str, Strict(strict=True)]
hashes: List[Annotated[str, Strict(strict=True)]]
log_index: Annotated[int, Strict(strict=True)]
root_hash: Annotated[str, Strict(strict=True)]
tree_size: Annotated[int, Strict(strict=True)]
model_fields: ClassVar[Dict[str, pydantic.fields.FieldInfo]] = {'checkpoint': FieldInfo(annotation=str, required=True, alias='checkpoint', alias_priority=2, metadata=[Strict(strict=True)]), 'hashes': FieldInfo(annotation=List[Annotated[str, Strict(strict=True)]], required=True, alias='hashes', alias_priority=2), 'log_index': FieldInfo(annotation=int, required=True, alias='logIndex', alias_priority=2, metadata=[Strict(strict=True)]), 'root_hash': FieldInfo(annotation=str, required=True, alias='rootHash', alias_priority=2, metadata=[Strict(strict=True)]), 'tree_size': FieldInfo(annotation=int, required=True, alias='treeSize', alias_priority=2, metadata=[Strict(strict=True)])}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

model_computed_fields: ClassVar[Dict[str, pydantic.fields.ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
@dataclass(frozen=True)
class LogEntry:
112@dataclass(frozen=True)
113class LogEntry:
114    """
115    Represents a transparency log entry.
116
117    Log entries are retrieved from the transparency log after signing or verification events,
118    or loaded from "Sigstore" bundles provided by the user.
119
120    This representation allows for either a missing inclusion promise or a missing
121    inclusion proof, but not both: attempting to construct a `LogEntry` without
122    at least one will fail.
123    """
124
125    uuid: Optional[str]
126    """
127    This entry's unique ID in the log instance it was retrieved from.
128
129    For sharded log deployments, IDs are unique per-shard.
130
131    Not present for `LogEntry` instances loaded from Sigstore bundles.
132    """
133
134    body: B64Str
135    """
136    The base64-encoded body of the transparency log entry.
137    """
138
139    integrated_time: int
140    """
141    The UNIX time at which this entry was integrated into the transparency log.
142    """
143
144    log_id: str
145    """
146    The log's ID (as the SHA256 hash of the DER-encoded public key for the log
147    at the time of entry inclusion).
148    """
149
150    log_index: int
151    """
152    The index of this entry within the log.
153    """
154
155    inclusion_proof: LogInclusionProof
156    """
157    An inclusion proof for this log entry.
158    """
159
160    inclusion_promise: Optional[B64Str]
161    """
162    An inclusion promise for this log entry, if present.
163
164    Internally, this is a base64-encoded Signed Entry Timestamp (SET) for this
165    log entry.
166    """
167
168    @classmethod
169    def _from_response(cls, dict_: dict[str, Any]) -> LogEntry:
170        """
171        Create a new `LogEntry` from the given API response.
172        """
173
174        # Assumes we only get one entry back
175        entries = list(dict_.items())
176        if len(entries) != 1:
177            raise ValueError("Received multiple entries in response")
178
179        uuid, entry = entries[0]
180        return LogEntry(
181            uuid=uuid,
182            body=entry["body"],
183            integrated_time=entry["integratedTime"],
184            log_id=entry["logID"],
185            log_index=entry["logIndex"],
186            inclusion_proof=LogInclusionProof.model_validate(
187                entry["verification"]["inclusionProof"]
188            ),
189            inclusion_promise=entry["verification"]["signedEntryTimestamp"],
190        )
191
192    @classmethod
193    def _from_dict_rekor(cls, dict_: dict[str, Any]) -> LogEntry:
194        """
195        Create a new `LogEntry` from the given Rekor TransparencyLogEntry.
196        """
197        tlog_entry = rekor_v1.TransparencyLogEntry()
198        tlog_entry.from_dict(dict_)
199
200        inclusion_proof: InclusionProof | None = tlog_entry.inclusion_proof
201        # This check is required by us as the client, not the
202        # protobuf-specs themselves.
203        if not inclusion_proof or not inclusion_proof.checkpoint.envelope:
204            raise InvalidBundle("entry must contain inclusion proof, with checkpoint")
205
206        parsed_inclusion_proof = LogInclusionProof(
207            checkpoint=inclusion_proof.checkpoint.envelope,
208            hashes=[h.hex() for h in inclusion_proof.hashes],
209            log_index=inclusion_proof.log_index,
210            root_hash=inclusion_proof.root_hash.hex(),
211            tree_size=inclusion_proof.tree_size,
212        )
213
214        return LogEntry(
215            uuid=None,
216            body=B64Str(base64.b64encode(tlog_entry.canonicalized_body).decode()),
217            integrated_time=tlog_entry.integrated_time,
218            log_id=tlog_entry.log_id.key_id.hex(),
219            log_index=tlog_entry.log_index,
220            inclusion_proof=parsed_inclusion_proof,
221            inclusion_promise=B64Str(
222                base64.b64encode(
223                    tlog_entry.inclusion_promise.signed_entry_timestamp
224                ).decode()
225            ),
226        )
227
228    def _to_rekor(self) -> rekor_v1.TransparencyLogEntry:
229        """
230        Create a new protobuf-level `TransparencyLogEntry` from this `LogEntry`.
231
232        @private
233        """
234        inclusion_promise: rekor_v1.InclusionPromise | None = None
235        if self.inclusion_promise:
236            inclusion_promise = rekor_v1.InclusionPromise(
237                signed_entry_timestamp=base64.b64decode(self.inclusion_promise)
238            )
239
240        inclusion_proof = rekor_v1.InclusionProof(
241            log_index=self.inclusion_proof.log_index,
242            root_hash=bytes.fromhex(self.inclusion_proof.root_hash),
243            tree_size=self.inclusion_proof.tree_size,
244            hashes=[bytes.fromhex(hash_) for hash_ in self.inclusion_proof.hashes],
245            checkpoint=rekor_v1.Checkpoint(envelope=self.inclusion_proof.checkpoint),
246        )
247
248        tlog_entry = rekor_v1.TransparencyLogEntry(
249            log_index=self.log_index,
250            log_id=common_v1.LogId(key_id=bytes.fromhex(self.log_id)),
251            integrated_time=self.integrated_time,
252            inclusion_promise=inclusion_promise,  # type: ignore[arg-type]
253            inclusion_proof=inclusion_proof,
254            canonicalized_body=base64.b64decode(self.body),
255        )
256
257        # Fill in the appropriate kind
258        body_entry: ProposedEntry = TypeAdapter(ProposedEntry).validate_json(
259            tlog_entry.canonicalized_body
260        )
261        if not isinstance(body_entry, (Hashedrekord, Dsse)):
262            raise InvalidBundle("log entry is not of expected type")
263
264        tlog_entry.kind_version = rekor_v1.KindVersion(
265            kind=body_entry.kind, version=body_entry.api_version
266        )
267
268        return tlog_entry
269
270    def encode_canonical(self) -> bytes:
271        """
272        Returns a canonicalized JSON (RFC 8785) representation of the transparency log entry.
273
274        This encoded representation is suitable for verification against
275        the Signed Entry Timestamp.
276        """
277        payload: dict[str, int | str] = {
278            "body": self.body,
279            "integratedTime": self.integrated_time,
280            "logID": self.log_id,
281            "logIndex": self.log_index,
282        }
283
284        return rfc8785.dumps(payload)
285
286    def _verify_set(self, keyring: RekorKeyring) -> None:
287        """
288        Verify the inclusion promise (Signed Entry Timestamp) for a given transparency log
289        `entry` using the given `keyring`.
290
291        Fails if the given log entry does not contain an inclusion promise.
292        """
293
294        if self.inclusion_promise is None:
295            raise VerificationError("SET: invalid inclusion promise: missing")
296
297        signed_entry_ts = base64.b64decode(self.inclusion_promise)
298
299        try:
300            keyring.verify(
301                key_id=KeyID(bytes.fromhex(self.log_id)),
302                signature=signed_entry_ts,
303                data=self.encode_canonical(),
304            )
305        except VerificationError as exc:
306            raise VerificationError(f"SET: invalid inclusion promise: {exc}")
307
308    def _verify(self, keyring: RekorKeyring) -> None:
309        """
310        Verifies this log entry.
311
312        This method performs steps (5), (6), and optionally (7) in
313        the top-level verify API:
314
315        * Verifies the consistency of the entry with the given bundle;
316        * Verifies the Merkle inclusion proof and its signed checkpoint;
317        * Verifies the inclusion promise, if present.
318        """
319
320        verify_merkle_inclusion(self)
321        verify_checkpoint(keyring, self)
322
323        _logger.debug(f"successfully verified inclusion proof: index={self.log_index}")
324
325        if self.inclusion_promise:
326            self._verify_set(keyring)
327            _logger.debug(
328                f"successfully verified inclusion promise: index={self.log_index}"
329            )

Represents a transparency log entry.

Log entries are retrieved from the transparency log after signing or verification events, or loaded from "Sigstore" bundles provided by the user.

This representation allows for either a missing inclusion promise or a missing inclusion proof, but not both: attempting to construct a LogEntry without at least one will fail.

LogEntry(*args: Any, **kwargs: Any)
139    def __init__(__dataclass_self__: PydanticDataclass, *args: Any, **kwargs: Any) -> None:
140        __tracebackhide__ = True
141        s = __dataclass_self__
142        s.__pydantic_validator__.validate_python(ArgsKwargs(args, kwargs), self_instance=s)
uuid: Optional[str]

This entry's unique ID in the log instance it was retrieved from.

For sharded log deployments, IDs are unique per-shard.

Not present for LogEntry instances loaded from Sigstore bundles.

The base64-encoded body of the transparency log entry.

integrated_time: int

The UNIX time at which this entry was integrated into the transparency log.

log_id: str

The log's ID (as the SHA256 hash of the DER-encoded public key for the log at the time of entry inclusion).

log_index: int

The index of this entry within the log.

inclusion_proof: LogInclusionProof

An inclusion proof for this log entry.

inclusion_promise: Optional[sigstore._utils.B64Str]

An inclusion promise for this log entry, if present.

Internally, this is a base64-encoded Signed Entry Timestamp (SET) for this log entry.

def encode_canonical(self) -> bytes:
270    def encode_canonical(self) -> bytes:
271        """
272        Returns a canonicalized JSON (RFC 8785) representation of the transparency log entry.
273
274        This encoded representation is suitable for verification against
275        the Signed Entry Timestamp.
276        """
277        payload: dict[str, int | str] = {
278            "body": self.body,
279            "integratedTime": self.integrated_time,
280            "logID": self.log_id,
281            "logIndex": self.log_index,
282        }
283
284        return rfc8785.dumps(payload)

Returns a canonicalized JSON (RFC 8785) representation of the transparency log entry.

This encoded representation is suitable for verification against the Signed Entry Timestamp.

class InvalidBundle(sigstore.errors.Error):
332class InvalidBundle(Error):
333    """
334    Raised when the associated `Bundle` is invalid in some way.
335    """
336
337    def diagnostics(self) -> str:
338        """Returns diagnostics for the error."""
339
340        return dedent(
341            f"""\
342        An issue occurred while parsing the Sigstore bundle.
343
344        The provided bundle is malformed and may have been modified maliciously.
345
346        Additional context:
347
348        {self}
349        """
350        )

Raised when the associated Bundle is invalid in some way.

def diagnostics(self) -> str:
337    def diagnostics(self) -> str:
338        """Returns diagnostics for the error."""
339
340        return dedent(
341            f"""\
342        An issue occurred while parsing the Sigstore bundle.
343
344        The provided bundle is malformed and may have been modified maliciously.
345
346        Additional context:
347
348        {self}
349        """
350        )

Returns diagnostics for the error.

Inherited Members
builtins.Exception
Exception
sigstore.errors.Error
log_and_exit
builtins.BaseException
with_traceback
add_note
args
class Bundle:
353class Bundle:
354    """
355    Represents a Sigstore bundle.
356    """
357
358    class BundleType(str, Enum):
359        """
360        Known Sigstore bundle media types.
361        """
362
363        BUNDLE_0_1 = "application/vnd.dev.sigstore.bundle+json;version=0.1"
364        BUNDLE_0_2 = "application/vnd.dev.sigstore.bundle+json;version=0.2"
365        BUNDLE_0_3_ALT = "application/vnd.dev.sigstore.bundle+json;version=0.3"
366        BUNDLE_0_3 = "application/vnd.dev.sigstore.bundle.v0.3+json"
367
368        def __str__(self) -> str:
369            """Returns the variant's string value."""
370            return self.value
371
372    def __init__(self, inner: _Bundle) -> None:
373        """
374        Creates a new bundle. This is not a public API; use
375        `from_json` instead.
376
377        @private
378        """
379        self._inner = inner
380        self._verify()
381
382    def _verify(self) -> None:
383        """
384        Performs various feats of heroism to ensure the bundle is well-formed
385        and upholds invariants, including:
386
387        * The "leaf" (signing) certificate is present;
388        * There is a inclusion proof present, even if the Bundle's version
389           predates a mandatory inclusion proof.
390        """
391
392        # The bundle must have a recognized media type.
393        try:
394            media_type = Bundle.BundleType(self._inner.media_type)
395        except ValueError:
396            raise InvalidBundle(f"unsupported bundle format: {self._inner.media_type}")
397
398        # Extract the signing certificate.
399        if media_type in (
400            Bundle.BundleType.BUNDLE_0_3,
401            Bundle.BundleType.BUNDLE_0_3_ALT,
402        ):
403            # For "v3" bundles, the signing certificate is the only one present.
404            leaf_cert = load_der_x509_certificate(
405                self._inner.verification_material.certificate.raw_bytes
406            )
407        else:
408            # In older bundles, there is an entire pool (misleadingly called
409            # a chain) of certificates, the first of which is the signing
410            # certificate.
411            certs = (
412                self._inner.verification_material.x509_certificate_chain.certificates
413            )
414
415            if len(certs) == 0:
416                raise InvalidBundle("expected non-empty certificate chain in bundle")
417
418            # Per client policy in protobuf-specs: the first entry in the chain
419            # MUST be a leaf certificate, and the rest of the chain MUST NOT
420            # include a root CA or any intermediate CAs that appear in an
421            # independent root of trust.
422            #
423            # We expect some old bundles to violate the rules around root
424            # and intermediate CAs, so we issue warnings and not hard errors
425            # in those cases.
426            leaf_cert, *chain_certs = [
427                load_der_x509_certificate(cert.raw_bytes) for cert in certs
428            ]
429            if not cert_is_leaf(leaf_cert):
430                raise InvalidBundle(
431                    "bundle contains an invalid leaf or non-leaf certificate in the leaf position"
432                )
433
434            for chain_cert in chain_certs:
435                # TODO: We should also retrieve the root of trust here and
436                # cross-check against it.
437                if cert_is_root_ca(chain_cert):
438                    _logger.warning(
439                        "this bundle contains a root CA, making it subject to misuse"
440                    )
441
442        self._signing_certificate = leaf_cert
443
444        # Extract the log entry. For the time being, we expect
445        # bundles to only contain a single log entry.
446        tlog_entries = self._inner.verification_material.tlog_entries
447        if len(tlog_entries) != 1:
448            raise InvalidBundle("expected exactly one log entry in bundle")
449        tlog_entry = tlog_entries[0]
450
451        # Handling of inclusion promises and proofs varies between bundle
452        # format versions:
453        #
454        # * For 0.1, an inclusion promise is required; the client
455        #   MUST verify the inclusion promise.
456        #   The inclusion proof is NOT required. If provided, it might NOT
457        #   contain a checkpoint; in this case, we ignore it (since it's
458        #   useless without one).
459        #
460        # * For 0.2+, an inclusion proof is required; the client MUST
461        #   verify the inclusion proof. The inclusion prof MUST contain
462        #   a checkpoint.
463        #   The inclusion promise is NOT required; if present, the client
464        #   SHOULD verify it.
465        #
466        # Before all of this, we require that the inclusion proof be present
467        # (when constructing the LogEntry).
468        log_entry = LogEntry._from_dict_rekor(tlog_entry.to_dict())
469
470        if media_type == Bundle.BundleType.BUNDLE_0_1:
471            if not log_entry.inclusion_promise:
472                raise InvalidBundle("bundle must contain an inclusion promise")
473            if not log_entry.inclusion_proof.checkpoint:
474                _logger.debug(
475                    "0.1 bundle contains inclusion proof without checkpoint; ignoring"
476                )
477        else:
478            if not log_entry.inclusion_proof.checkpoint:
479                raise InvalidBundle("expected checkpoint in inclusion proof")
480
481        self._log_entry = log_entry
482
483    @property
484    def signing_certificate(self) -> Certificate:
485        """Returns the bundle's contained signing (i.e. leaf) certificate."""
486        return self._signing_certificate
487
488    @property
489    def log_entry(self) -> LogEntry:
490        """
491        Returns the bundle's log entry, containing an inclusion proof
492        (with checkpoint) and an inclusion promise (if the latter is present).
493        """
494        return self._log_entry
495
496    @property
497    def _dsse_envelope(self) -> dsse.Envelope | None:
498        """
499        Returns the DSSE envelope within this Bundle as a `dsse.Envelope`.
500
501        @private
502        """
503        if self._inner.dsse_envelope:
504            return dsse.Envelope(self._inner.dsse_envelope)
505        return None
506
507    @classmethod
508    def from_json(cls, raw: bytes | str) -> Bundle:
509        """
510        Deserialize the given Sigstore bundle.
511        """
512        inner = _Bundle().from_json(raw)
513        return cls(inner)
514
515    def to_json(self) -> str:
516        """
517        Return a JSON encoding of this bundle.
518        """
519        return self._inner.to_json()
520
521    def _to_parts(
522        self,
523    ) -> tuple[Certificate, common_v1.MessageSignature | dsse.Envelope, LogEntry]:
524        """
525        Decompose the `Bundle` into its core constituent parts.
526
527        @private
528        """
529
530        content: common_v1.MessageSignature | dsse.Envelope
531        if self._dsse_envelope:
532            content = self._dsse_envelope
533        else:
534            content = self._inner.message_signature
535
536        return (self.signing_certificate, content, self.log_entry)
537
538    @classmethod
539    def from_parts(cls, cert: Certificate, sig: bytes, log_entry: LogEntry) -> Bundle:
540        """
541        Construct a Sigstore bundle (of `hashedrekord` type) from its
542        constituent parts.
543        """
544
545        return cls._from_parts(
546            cert, common_v1.MessageSignature(signature=sig), log_entry
547        )
548
549    @classmethod
550    def _from_parts(
551        cls,
552        cert: Certificate,
553        content: common_v1.MessageSignature | dsse.Envelope,
554        log_entry: LogEntry,
555    ) -> Bundle:
556        """
557        @private
558        """
559
560        inner = _Bundle(
561            media_type=Bundle.BundleType.BUNDLE_0_3.value,
562            verification_material=bundle_v1.VerificationMaterial(
563                certificate=common_v1.X509Certificate(cert.public_bytes(Encoding.DER)),
564            ),
565        )
566
567        # Fill in the appropriate variants.
568        if isinstance(content, common_v1.MessageSignature):
569            inner.message_signature = content
570        else:
571            inner.dsse_envelope = content._inner
572
573        tlog_entry = log_entry._to_rekor()
574        inner.verification_material.tlog_entries = [tlog_entry]
575
576        return cls(inner)

Represents a Sigstore bundle.

signing_certificate: cryptography.x509.base.Certificate
483    @property
484    def signing_certificate(self) -> Certificate:
485        """Returns the bundle's contained signing (i.e. leaf) certificate."""
486        return self._signing_certificate

Returns the bundle's contained signing (i.e. leaf) certificate.

log_entry: LogEntry
488    @property
489    def log_entry(self) -> LogEntry:
490        """
491        Returns the bundle's log entry, containing an inclusion proof
492        (with checkpoint) and an inclusion promise (if the latter is present).
493        """
494        return self._log_entry

Returns the bundle's log entry, containing an inclusion proof (with checkpoint) and an inclusion promise (if the latter is present).

@classmethod
def from_json(cls, raw: bytes | str) -> Bundle:
507    @classmethod
508    def from_json(cls, raw: bytes | str) -> Bundle:
509        """
510        Deserialize the given Sigstore bundle.
511        """
512        inner = _Bundle().from_json(raw)
513        return cls(inner)

Deserialize the given Sigstore bundle.

def to_json(self) -> str:
515    def to_json(self) -> str:
516        """
517        Return a JSON encoding of this bundle.
518        """
519        return self._inner.to_json()

Return a JSON encoding of this bundle.

@classmethod
def from_parts( cls, cert: cryptography.x509.base.Certificate, sig: bytes, log_entry: LogEntry) -> Bundle:
538    @classmethod
539    def from_parts(cls, cert: Certificate, sig: bytes, log_entry: LogEntry) -> Bundle:
540        """
541        Construct a Sigstore bundle (of `hashedrekord` type) from its
542        constituent parts.
543        """
544
545        return cls._from_parts(
546            cert, common_v1.MessageSignature(signature=sig), log_entry
547        )

Construct a Sigstore bundle (of hashedrekord type) from its constituent parts.

class Bundle.BundleType(builtins.str, enum.Enum):
358    class BundleType(str, Enum):
359        """
360        Known Sigstore bundle media types.
361        """
362
363        BUNDLE_0_1 = "application/vnd.dev.sigstore.bundle+json;version=0.1"
364        BUNDLE_0_2 = "application/vnd.dev.sigstore.bundle+json;version=0.2"
365        BUNDLE_0_3_ALT = "application/vnd.dev.sigstore.bundle+json;version=0.3"
366        BUNDLE_0_3 = "application/vnd.dev.sigstore.bundle.v0.3+json"
367
368        def __str__(self) -> str:
369            """Returns the variant's string value."""
370            return self.value

Known Sigstore bundle media types.

BUNDLE_0_1 = <BundleType.BUNDLE_0_1: 'application/vnd.dev.sigstore.bundle+json;version=0.1'>
BUNDLE_0_2 = <BundleType.BUNDLE_0_2: 'application/vnd.dev.sigstore.bundle+json;version=0.2'>
BUNDLE_0_3_ALT = <BundleType.BUNDLE_0_3_ALT: 'application/vnd.dev.sigstore.bundle+json;version=0.3'>
BUNDLE_0_3 = <BundleType.BUNDLE_0_3: 'application/vnd.dev.sigstore.bundle.v0.3+json'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans