sigstore.oidc

API for retrieving OIDC tokens.

  1# Copyright 2022 The Sigstore Authors
  2#
  3# Licensed under the Apache License, Version 2.0 (the "License");
  4# you may not use this file except in compliance with the License.
  5# You may obtain a copy of the License at
  6#
  7#      http://www.apache.org/licenses/LICENSE-2.0
  8#
  9# Unless required by applicable law or agreed to in writing, software
 10# distributed under the License is distributed on an "AS IS" BASIS,
 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12# See the License for the specific language governing permissions and
 13# limitations under the License.
 14
 15"""
 16API for retrieving OIDC tokens.
 17"""
 18
 19from __future__ import annotations
 20
 21import logging
 22import sys
 23import time
 24import urllib.parse
 25import webbrowser
 26from datetime import datetime, timezone
 27from typing import NoReturn, Optional, cast
 28
 29import id
 30import jwt
 31import requests
 32from pydantic import BaseModel, StrictStr
 33
 34from sigstore._internal import USER_AGENT
 35from sigstore.errors import Error, NetworkError
 36
 37DEFAULT_OAUTH_ISSUER_URL = "https://oauth2.sigstore.dev/auth"
 38STAGING_OAUTH_ISSUER_URL = "https://oauth2.sigstage.dev/auth"
 39
 40# See: https://github.com/sigstore/fulcio/blob/b2186c0/pkg/config/config.go#L182-L201
 41_KNOWN_OIDC_ISSUERS = {
 42    "https://accounts.google.com": "email",
 43    "https://oauth2.sigstore.dev/auth": "email",
 44    "https://oauth2.sigstage.dev/auth": "email",
 45    "https://token.actions.githubusercontent.com": "sub",
 46}
 47_DEFAULT_AUDIENCE = "sigstore"
 48
 49
 50class _OpenIDConfiguration(BaseModel):
 51    """
 52    Represents a (subset) of the fields provided by an OpenID Connect provider's
 53    `.well-known/openid-configuration` response, as defined by OpenID Connect Discovery.
 54
 55    See: <https://openid.net/specs/openid-connect-discovery-1_0.html>
 56    """
 57
 58    authorization_endpoint: StrictStr
 59    token_endpoint: StrictStr
 60
 61
 62# See: https://github.com/sigstore/fulcio/blob/b2186c0/pkg/config/config.go#L182-L201
 63_KNOWN_OIDC_ISSUERS = {
 64    "https://accounts.google.com": "email",
 65    "https://oauth2.sigstore.dev/auth": "email",
 66    "https://oauth2.sigstage.dev/auth": "email",
 67    "https://token.actions.githubusercontent.com": "sub",
 68}
 69DEFAULT_AUDIENCE = "sigstore"
 70
 71
 72class ExpiredIdentity(Exception):
 73    """An error raised when an identity token is expired."""
 74
 75
 76class IdentityToken:
 77    """
 78    An OIDC "identity", corresponding to an underlying OIDC token with
 79    a sensible subject, issuer, and audience for Sigstore purposes.
 80    """
 81
 82    def __init__(self, raw_token: str) -> None:
 83        """
 84        Create a new `IdentityToken` from the given OIDC token.
 85        """
 86
 87        self._raw_token = raw_token
 88
 89        # NOTE: The lack of verification here is intentional, and is part of
 90        # Sigstore's verification model: clients like sigstore-python are
 91        # responsible only for forwarding the OIDC identity to Fulcio for
 92        # certificate binding and issuance.
 93        try:
 94            self._unverified_claims = jwt.decode(
 95                raw_token,
 96                options={
 97                    "verify_signature": False,
 98                    "verify_aud": True,
 99                    "verify_iat": True,
100                    "verify_exp": True,
101                    # These claims are required by OpenID Connect, so
102                    # we can strongly enforce their presence.
103                    # See: https://openid.net/specs/openid-connect-basic-1_0.html#IDToken
104                    "require": ["aud", "sub", "iat", "exp", "iss"],
105                },
106                audience=DEFAULT_AUDIENCE,
107                # NOTE: This leeway shouldn't be strictly necessary, but is
108                # included to preempt any (small) skew between the host
109                # and the originating IdP.
110                leeway=5,
111            )
112        except Exception as exc:
113            raise IdentityError(
114                "Identity token is malformed or missing claims"
115            ) from exc
116
117        self._iss: str = self._unverified_claims["iss"]
118        self._nbf: int | None = self._unverified_claims.get("nbf")
119        self._exp: int = self._unverified_claims["exp"]
120
121        # Fail early if this token isn't within its validity period.
122        if not self.in_validity_period():
123            raise IdentityError("Identity token is not within its validity period")
124
125        # When verifying the private key possession proof, Fulcio uses
126        # different claims depending on the token's issuer.
127        # We currently special-case a handful of these, and fall back
128        # on signing the "sub" claim otherwise.
129        identity_claim = _KNOWN_OIDC_ISSUERS.get(self.issuer)
130        if identity_claim is not None:
131            if identity_claim not in self._unverified_claims:
132                raise IdentityError(
133                    f"Identity token is missing the required {identity_claim!r} claim"
134                )
135
136            self._identity = str(self._unverified_claims.get(identity_claim))
137        else:
138            try:
139                self._identity = str(self._unverified_claims["sub"])
140            except KeyError:
141                raise IdentityError(
142                    "Identity token is missing the required 'sub' claim"
143                )
144
145        # This identity token might have been retrieved directly from
146        # an identity provider, or it might be a "federated" identity token
147        # retrieved from a federated IdP (e.g., Sigstore's own Dex instance).
148        # In the latter case, the claims will also include a `federated_claims`
149        # set, which in turn should include a `connector_id` that reflects
150        # the "real" token issuer. We retrieve this, despite technically
151        # being an implementation detail, because it has value to client
152        # users: a client might want to make sure that its user is identifying
153        # with a *particular* IdP, which means that they need to pierce the
154        # federation layer to check which IdP is actually being used.
155        self._federated_issuer: str | None = None
156        federated_claims = self._unverified_claims.get("federated_claims")
157        if federated_claims is not None:
158            if not isinstance(federated_claims, dict):
159                raise IdentityError(
160                    "unexpected claim type: federated_claims is not a dict"
161                )
162
163            federated_issuer = federated_claims.get("connector_id")
164            if federated_issuer is not None:
165                if not isinstance(federated_issuer, str):
166                    raise IdentityError(
167                        "unexpected claim type: federated_claims.connector_id is not a string"
168                    )
169
170                self._federated_issuer = federated_issuer
171
172    def in_validity_period(self) -> bool:
173        """
174        Returns whether or not this `Identity` is currently within its self-stated validity period.
175
176        NOTE: As noted in `Identity.__init__`, this is not a verifying wrapper;
177        the check here only asserts whether the *unverified* identity's claims
178        are within their validity period.
179        """
180
181        now = datetime.now(timezone.utc).timestamp()
182
183        if self._nbf is not None:
184            return self._nbf <= now < self._exp
185        else:
186            return now < self._exp
187
188    @property
189    def identity(self) -> str:
190        """
191        Returns this `IdentityToken`'s underlying "subject".
192
193        Note that this is **not** always the `sub` claim in the corresponding
194        identity token: depending onm the token's issuer, it may be a *different*
195        claim, such as `email`. This corresponds to the Sigstore ecosystem's
196        behavior, e.g. in each issued certificate's SAN.
197        """
198        return self._identity
199
200    @property
201    def issuer(self) -> str:
202        """
203        Returns a URL identifying this `IdentityToken`'s issuer.
204        """
205        return self._iss
206
207    @property
208    def federated_issuer(self) -> str:
209        """
210        Returns a URL identifying the **federated** issuer for any Sigstore
211        certificate issued against this identity token.
212
213        The behavior of this field is slightly subtle: for non-federated
214        identity providers (like a token issued directly by Google's IdP) it
215        should be exactly equivalent to `IdentityToken.issuer`. For federated
216        issuers (like Sigstore's own federated IdP) it should be equivalent to
217        the underlying federated issuer's URL, which is kept in an
218        implementation-defined claim.
219
220        This attribute exists so that clients who wish to inspect the expected
221        underlying issuer of their certificates can do so without relying on
222        implementation-specific behavior.
223        """
224        if self._federated_issuer is not None:
225            return self._federated_issuer
226
227        return self.issuer
228
229    def __str__(self) -> str:
230        """
231        Returns the underlying OIDC token for this identity.
232
233        That this token is secret in nature and **MUST NOT** be disclosed.
234        """
235        return self._raw_token
236
237
238class IssuerError(Exception):
239    """
240    Raised on any communication or format error with an OIDC issuer.
241    """
242
243    pass
244
245
246class Issuer:
247    """
248    Represents an OIDC issuer (IdP).
249    """
250
251    def __init__(self, base_url: str) -> None:
252        """
253        Create a new `Issuer` from the given base URL.
254
255        This URL is used to locate an OpenID Connect configuration file,
256        which is then used to bootstrap the issuer's state (such
257        as authorization and token endpoints).
258        """
259        self.session = requests.Session()
260        self.session.headers.update({"User-Agent": USER_AGENT})
261
262        oidc_config_url = urllib.parse.urljoin(
263            f"{base_url}/", ".well-known/openid-configuration"
264        )
265
266        try:
267            resp: requests.Response = self.session.get(oidc_config_url, timeout=30)
268        except (requests.ConnectionError, requests.Timeout) as exc:
269            raise NetworkError from exc
270
271        try:
272            resp.raise_for_status()
273        except requests.HTTPError as http_error:
274            raise IssuerError from http_error
275
276        try:
277            # We don't generally expect this to fail (since the provider should
278            # return a non-success HTTP code which we catch above), but we
279            # check just in case we have a misbehaving OIDC issuer.
280            self.oidc_config = _OpenIDConfiguration.model_validate(resp.json())
281        except ValueError as exc:
282            raise IssuerError(f"OIDC issuer returned invalid configuration: {exc}")
283
284    @classmethod
285    def production(cls) -> Issuer:
286        """
287        Returns an `Issuer` configured against Sigstore's production-level services.
288        """
289        return cls(DEFAULT_OAUTH_ISSUER_URL)
290
291    @classmethod
292    def staging(cls) -> Issuer:
293        """
294        Returns an `Issuer` configured against Sigstore's staging-level services.
295        """
296        return cls(STAGING_OAUTH_ISSUER_URL)
297
298    def identity_token(  # nosec: B107
299        self,
300        client_id: str = "sigstore",
301        client_secret: str = "",
302        force_oob: bool = False,
303    ) -> IdentityToken:
304        """
305        Retrieves and returns an `IdentityToken` from the current `Issuer`, via OAuth.
306
307        This function blocks on user interaction.
308
309        The `force_oob` flag controls the kind of flow performed. When `False` (the default),
310        this function attempts to open the user's web browser before falling back to
311        an out-of-band flow. When `True`, the out-of-band flow is always used.
312        """
313
314        # This function and the components that it relies on are based off of:
315        # https://github.com/psteniusubi/python-sample
316
317        from sigstore._internal.oidc.oauth import _OAuthFlow
318
319        code: str
320        with _OAuthFlow(client_id, client_secret, self) as server:
321            # Launch web browser
322            if not force_oob and webbrowser.open(server.base_uri):
323                print("Waiting for browser interaction...", file=sys.stderr)
324            else:
325                server.enable_oob()
326                print(
327                    f"Go to the following link in a browser:\n\n\t{server.auth_endpoint}",
328                    file=sys.stderr,
329                )
330
331            if not server.is_oob():
332                # Wait until the redirect server populates the response
333                while server.auth_response is None:
334                    time.sleep(0.1)
335
336                auth_error = server.auth_response.get("error")
337                if auth_error is not None:
338                    raise IdentityError(
339                        f"Error response from auth endpoint: {auth_error[0]}"
340                    )
341                code = server.auth_response["code"][0]
342            else:
343                # In the out-of-band case, we wait until the user provides the code
344                code = input("Enter verification code: ")
345
346        # Provide code to token endpoint
347        data = {
348            "grant_type": "authorization_code",
349            "redirect_uri": server.redirect_uri,
350            "code": code,
351            "code_verifier": server.oauth_session.code_verifier,
352        }
353        auth = (
354            client_id,
355            client_secret,
356        )
357        logging.debug(f"PAYLOAD: data={data}")
358        try:
359            resp = self.session.post(
360                self.oidc_config.token_endpoint,
361                data=data,
362                auth=auth,
363                timeout=30,
364            )
365        except (requests.ConnectionError, requests.Timeout) as exc:
366            raise NetworkError from exc
367
368        try:
369            resp.raise_for_status()
370        except requests.HTTPError as http_error:
371            raise IdentityError(
372                f"Token request failed with {resp.status_code}"
373            ) from http_error
374
375        token_json = resp.json()
376        token_error = token_json.get("error")
377        if token_error is not None:
378            raise IdentityError(f"Error response from token endpoint: {token_error}")
379
380        return IdentityToken(token_json["access_token"])
381
382
383class IdentityError(Error):
384    """
385    Wraps `id`'s IdentityError.
386    """
387
388    @classmethod
389    def raise_from_id(cls, exc: id.IdentityError) -> NoReturn:
390        """Raises a wrapped IdentityError from the provided `id.IdentityError`."""
391        raise cls(str(exc)) from exc
392
393    def diagnostics(self) -> str:
394        """Returns diagnostics for the error."""
395        if isinstance(self.__cause__, id.GitHubOidcPermissionCredentialError):
396            return f"""
397                Insufficient permissions for GitHub Actions workflow.
398
399                The most common reason for this is incorrect
400                configuration of the top-level `permissions` setting of the
401                workflow YAML file. It should be configured like so:
402
403                    permissions:
404                      id-token: write
405
406                Relevant documentation here:
407
408                    https://docs.github.com/en/actions/deployment/security-hardening-your-deployments/about-security-hardening-with-openid-connect#adding-permissions-settings
409
410                Another possible reason is that the workflow run has been
411                triggered by a PR from a forked repository. PRs from forked
412                repositories typically cannot be granted write access.
413
414                Relevant documentation here:
415
416                    https://docs.github.com/en/actions/security-guides/automatic-token-authentication#modifying-the-permissions-for-the-github_token
417
418                Additional context:
419
420                {self.__cause__}
421                """
422        else:
423            return f"""
424                An issue occurred with ambient credential detection.
425
426                Additional context:
427
428                {self}
429            """
430
431
432def detect_credential() -> Optional[str]:
433    """Calls `id.detect_credential`, but wraps exceptions with our own exception type."""
434    try:
435        return cast(Optional[str], id.detect_credential(_DEFAULT_AUDIENCE))
436    except id.IdentityError as exc:
437        IdentityError.raise_from_id(exc)
DEFAULT_OAUTH_ISSUER_URL = 'https://oauth2.sigstore.dev/auth'
STAGING_OAUTH_ISSUER_URL = 'https://oauth2.sigstage.dev/auth'
DEFAULT_AUDIENCE = $GITHUB_REPOSITORY_OWNER
class ExpiredIdentity(builtins.Exception):
73class ExpiredIdentity(Exception):
74    """An error raised when an identity token is expired."""

An error raised when an identity token is expired.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
add_note
args
class IdentityToken:
 77class IdentityToken:
 78    """
 79    An OIDC "identity", corresponding to an underlying OIDC token with
 80    a sensible subject, issuer, and audience for Sigstore purposes.
 81    """
 82
 83    def __init__(self, raw_token: str) -> None:
 84        """
 85        Create a new `IdentityToken` from the given OIDC token.
 86        """
 87
 88        self._raw_token = raw_token
 89
 90        # NOTE: The lack of verification here is intentional, and is part of
 91        # Sigstore's verification model: clients like sigstore-python are
 92        # responsible only for forwarding the OIDC identity to Fulcio for
 93        # certificate binding and issuance.
 94        try:
 95            self._unverified_claims = jwt.decode(
 96                raw_token,
 97                options={
 98                    "verify_signature": False,
 99                    "verify_aud": True,
100                    "verify_iat": True,
101                    "verify_exp": True,
102                    # These claims are required by OpenID Connect, so
103                    # we can strongly enforce their presence.
104                    # See: https://openid.net/specs/openid-connect-basic-1_0.html#IDToken
105                    "require": ["aud", "sub", "iat", "exp", "iss"],
106                },
107                audience=DEFAULT_AUDIENCE,
108                # NOTE: This leeway shouldn't be strictly necessary, but is
109                # included to preempt any (small) skew between the host
110                # and the originating IdP.
111                leeway=5,
112            )
113        except Exception as exc:
114            raise IdentityError(
115                "Identity token is malformed or missing claims"
116            ) from exc
117
118        self._iss: str = self._unverified_claims["iss"]
119        self._nbf: int | None = self._unverified_claims.get("nbf")
120        self._exp: int = self._unverified_claims["exp"]
121
122        # Fail early if this token isn't within its validity period.
123        if not self.in_validity_period():
124            raise IdentityError("Identity token is not within its validity period")
125
126        # When verifying the private key possession proof, Fulcio uses
127        # different claims depending on the token's issuer.
128        # We currently special-case a handful of these, and fall back
129        # on signing the "sub" claim otherwise.
130        identity_claim = _KNOWN_OIDC_ISSUERS.get(self.issuer)
131        if identity_claim is not None:
132            if identity_claim not in self._unverified_claims:
133                raise IdentityError(
134                    f"Identity token is missing the required {identity_claim!r} claim"
135                )
136
137            self._identity = str(self._unverified_claims.get(identity_claim))
138        else:
139            try:
140                self._identity = str(self._unverified_claims["sub"])
141            except KeyError:
142                raise IdentityError(
143                    "Identity token is missing the required 'sub' claim"
144                )
145
146        # This identity token might have been retrieved directly from
147        # an identity provider, or it might be a "federated" identity token
148        # retrieved from a federated IdP (e.g., Sigstore's own Dex instance).
149        # In the latter case, the claims will also include a `federated_claims`
150        # set, which in turn should include a `connector_id` that reflects
151        # the "real" token issuer. We retrieve this, despite technically
152        # being an implementation detail, because it has value to client
153        # users: a client might want to make sure that its user is identifying
154        # with a *particular* IdP, which means that they need to pierce the
155        # federation layer to check which IdP is actually being used.
156        self._federated_issuer: str | None = None
157        federated_claims = self._unverified_claims.get("federated_claims")
158        if federated_claims is not None:
159            if not isinstance(federated_claims, dict):
160                raise IdentityError(
161                    "unexpected claim type: federated_claims is not a dict"
162                )
163
164            federated_issuer = federated_claims.get("connector_id")
165            if federated_issuer is not None:
166                if not isinstance(federated_issuer, str):
167                    raise IdentityError(
168                        "unexpected claim type: federated_claims.connector_id is not a string"
169                    )
170
171                self._federated_issuer = federated_issuer
172
173    def in_validity_period(self) -> bool:
174        """
175        Returns whether or not this `Identity` is currently within its self-stated validity period.
176
177        NOTE: As noted in `Identity.__init__`, this is not a verifying wrapper;
178        the check here only asserts whether the *unverified* identity's claims
179        are within their validity period.
180        """
181
182        now = datetime.now(timezone.utc).timestamp()
183
184        if self._nbf is not None:
185            return self._nbf <= now < self._exp
186        else:
187            return now < self._exp
188
189    @property
190    def identity(self) -> str:
191        """
192        Returns this `IdentityToken`'s underlying "subject".
193
194        Note that this is **not** always the `sub` claim in the corresponding
195        identity token: depending onm the token's issuer, it may be a *different*
196        claim, such as `email`. This corresponds to the Sigstore ecosystem's
197        behavior, e.g. in each issued certificate's SAN.
198        """
199        return self._identity
200
201    @property
202    def issuer(self) -> str:
203        """
204        Returns a URL identifying this `IdentityToken`'s issuer.
205        """
206        return self._iss
207
208    @property
209    def federated_issuer(self) -> str:
210        """
211        Returns a URL identifying the **federated** issuer for any Sigstore
212        certificate issued against this identity token.
213
214        The behavior of this field is slightly subtle: for non-federated
215        identity providers (like a token issued directly by Google's IdP) it
216        should be exactly equivalent to `IdentityToken.issuer`. For federated
217        issuers (like Sigstore's own federated IdP) it should be equivalent to
218        the underlying federated issuer's URL, which is kept in an
219        implementation-defined claim.
220
221        This attribute exists so that clients who wish to inspect the expected
222        underlying issuer of their certificates can do so without relying on
223        implementation-specific behavior.
224        """
225        if self._federated_issuer is not None:
226            return self._federated_issuer
227
228        return self.issuer
229
230    def __str__(self) -> str:
231        """
232        Returns the underlying OIDC token for this identity.
233
234        That this token is secret in nature and **MUST NOT** be disclosed.
235        """
236        return self._raw_token

An OIDC "identity", corresponding to an underlying OIDC token with a sensible subject, issuer, and audience for Sigstore purposes.

IdentityToken(raw_token: str)
 83    def __init__(self, raw_token: str) -> None:
 84        """
 85        Create a new `IdentityToken` from the given OIDC token.
 86        """
 87
 88        self._raw_token = raw_token
 89
 90        # NOTE: The lack of verification here is intentional, and is part of
 91        # Sigstore's verification model: clients like sigstore-python are
 92        # responsible only for forwarding the OIDC identity to Fulcio for
 93        # certificate binding and issuance.
 94        try:
 95            self._unverified_claims = jwt.decode(
 96                raw_token,
 97                options={
 98                    "verify_signature": False,
 99                    "verify_aud": True,
100                    "verify_iat": True,
101                    "verify_exp": True,
102                    # These claims are required by OpenID Connect, so
103                    # we can strongly enforce their presence.
104                    # See: https://openid.net/specs/openid-connect-basic-1_0.html#IDToken
105                    "require": ["aud", "sub", "iat", "exp", "iss"],
106                },
107                audience=DEFAULT_AUDIENCE,
108                # NOTE: This leeway shouldn't be strictly necessary, but is
109                # included to preempt any (small) skew between the host
110                # and the originating IdP.
111                leeway=5,
112            )
113        except Exception as exc:
114            raise IdentityError(
115                "Identity token is malformed or missing claims"
116            ) from exc
117
118        self._iss: str = self._unverified_claims["iss"]
119        self._nbf: int | None = self._unverified_claims.get("nbf")
120        self._exp: int = self._unverified_claims["exp"]
121
122        # Fail early if this token isn't within its validity period.
123        if not self.in_validity_period():
124            raise IdentityError("Identity token is not within its validity period")
125
126        # When verifying the private key possession proof, Fulcio uses
127        # different claims depending on the token's issuer.
128        # We currently special-case a handful of these, and fall back
129        # on signing the "sub" claim otherwise.
130        identity_claim = _KNOWN_OIDC_ISSUERS.get(self.issuer)
131        if identity_claim is not None:
132            if identity_claim not in self._unverified_claims:
133                raise IdentityError(
134                    f"Identity token is missing the required {identity_claim!r} claim"
135                )
136
137            self._identity = str(self._unverified_claims.get(identity_claim))
138        else:
139            try:
140                self._identity = str(self._unverified_claims["sub"])
141            except KeyError:
142                raise IdentityError(
143                    "Identity token is missing the required 'sub' claim"
144                )
145
146        # This identity token might have been retrieved directly from
147        # an identity provider, or it might be a "federated" identity token
148        # retrieved from a federated IdP (e.g., Sigstore's own Dex instance).
149        # In the latter case, the claims will also include a `federated_claims`
150        # set, which in turn should include a `connector_id` that reflects
151        # the "real" token issuer. We retrieve this, despite technically
152        # being an implementation detail, because it has value to client
153        # users: a client might want to make sure that its user is identifying
154        # with a *particular* IdP, which means that they need to pierce the
155        # federation layer to check which IdP is actually being used.
156        self._federated_issuer: str | None = None
157        federated_claims = self._unverified_claims.get("federated_claims")
158        if federated_claims is not None:
159            if not isinstance(federated_claims, dict):
160                raise IdentityError(
161                    "unexpected claim type: federated_claims is not a dict"
162                )
163
164            federated_issuer = federated_claims.get("connector_id")
165            if federated_issuer is not None:
166                if not isinstance(federated_issuer, str):
167                    raise IdentityError(
168                        "unexpected claim type: federated_claims.connector_id is not a string"
169                    )
170
171                self._federated_issuer = federated_issuer

Create a new IdentityToken from the given OIDC token.

def in_validity_period(self) -> bool:
173    def in_validity_period(self) -> bool:
174        """
175        Returns whether or not this `Identity` is currently within its self-stated validity period.
176
177        NOTE: As noted in `Identity.__init__`, this is not a verifying wrapper;
178        the check here only asserts whether the *unverified* identity's claims
179        are within their validity period.
180        """
181
182        now = datetime.now(timezone.utc).timestamp()
183
184        if self._nbf is not None:
185            return self._nbf <= now < self._exp
186        else:
187            return now < self._exp

Returns whether or not this Identity is currently within its self-stated validity period.

NOTE: As noted in Identity.__init__, this is not a verifying wrapper; the check here only asserts whether the unverified identity's claims are within their validity period.

identity: str
189    @property
190    def identity(self) -> str:
191        """
192        Returns this `IdentityToken`'s underlying "subject".
193
194        Note that this is **not** always the `sub` claim in the corresponding
195        identity token: depending onm the token's issuer, it may be a *different*
196        claim, such as `email`. This corresponds to the Sigstore ecosystem's
197        behavior, e.g. in each issued certificate's SAN.
198        """
199        return self._identity

Returns this IdentityToken's underlying "subject".

Note that this is not always the sub claim in the corresponding identity token: depending onm the token's issuer, it may be a different claim, such as email. This corresponds to the Sigstore ecosystem's behavior, e.g. in each issued certificate's SAN.

issuer: str
201    @property
202    def issuer(self) -> str:
203        """
204        Returns a URL identifying this `IdentityToken`'s issuer.
205        """
206        return self._iss

Returns a URL identifying this IdentityToken's issuer.

federated_issuer: str
208    @property
209    def federated_issuer(self) -> str:
210        """
211        Returns a URL identifying the **federated** issuer for any Sigstore
212        certificate issued against this identity token.
213
214        The behavior of this field is slightly subtle: for non-federated
215        identity providers (like a token issued directly by Google's IdP) it
216        should be exactly equivalent to `IdentityToken.issuer`. For federated
217        issuers (like Sigstore's own federated IdP) it should be equivalent to
218        the underlying federated issuer's URL, which is kept in an
219        implementation-defined claim.
220
221        This attribute exists so that clients who wish to inspect the expected
222        underlying issuer of their certificates can do so without relying on
223        implementation-specific behavior.
224        """
225        if self._federated_issuer is not None:
226            return self._federated_issuer
227
228        return self.issuer

Returns a URL identifying the federated issuer for any Sigstore certificate issued against this identity token.

The behavior of this field is slightly subtle: for non-federated identity providers (like a token issued directly by Google's IdP) it should be exactly equivalent to IdentityToken.issuer. For federated issuers (like Sigstore's own federated IdP) it should be equivalent to the underlying federated issuer's URL, which is kept in an implementation-defined claim.

This attribute exists so that clients who wish to inspect the expected underlying issuer of their certificates can do so without relying on implementation-specific behavior.

class IssuerError(builtins.Exception):
239class IssuerError(Exception):
240    """
241    Raised on any communication or format error with an OIDC issuer.
242    """
243
244    pass

Raised on any communication or format error with an OIDC issuer.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
add_note
args
class Issuer:
247class Issuer:
248    """
249    Represents an OIDC issuer (IdP).
250    """
251
252    def __init__(self, base_url: str) -> None:
253        """
254        Create a new `Issuer` from the given base URL.
255
256        This URL is used to locate an OpenID Connect configuration file,
257        which is then used to bootstrap the issuer's state (such
258        as authorization and token endpoints).
259        """
260        self.session = requests.Session()
261        self.session.headers.update({"User-Agent": USER_AGENT})
262
263        oidc_config_url = urllib.parse.urljoin(
264            f"{base_url}/", ".well-known/openid-configuration"
265        )
266
267        try:
268            resp: requests.Response = self.session.get(oidc_config_url, timeout=30)
269        except (requests.ConnectionError, requests.Timeout) as exc:
270            raise NetworkError from exc
271
272        try:
273            resp.raise_for_status()
274        except requests.HTTPError as http_error:
275            raise IssuerError from http_error
276
277        try:
278            # We don't generally expect this to fail (since the provider should
279            # return a non-success HTTP code which we catch above), but we
280            # check just in case we have a misbehaving OIDC issuer.
281            self.oidc_config = _OpenIDConfiguration.model_validate(resp.json())
282        except ValueError as exc:
283            raise IssuerError(f"OIDC issuer returned invalid configuration: {exc}")
284
285    @classmethod
286    def production(cls) -> Issuer:
287        """
288        Returns an `Issuer` configured against Sigstore's production-level services.
289        """
290        return cls(DEFAULT_OAUTH_ISSUER_URL)
291
292    @classmethod
293    def staging(cls) -> Issuer:
294        """
295        Returns an `Issuer` configured against Sigstore's staging-level services.
296        """
297        return cls(STAGING_OAUTH_ISSUER_URL)
298
299    def identity_token(  # nosec: B107
300        self,
301        client_id: str = "sigstore",
302        client_secret: str = "",
303        force_oob: bool = False,
304    ) -> IdentityToken:
305        """
306        Retrieves and returns an `IdentityToken` from the current `Issuer`, via OAuth.
307
308        This function blocks on user interaction.
309
310        The `force_oob` flag controls the kind of flow performed. When `False` (the default),
311        this function attempts to open the user's web browser before falling back to
312        an out-of-band flow. When `True`, the out-of-band flow is always used.
313        """
314
315        # This function and the components that it relies on are based off of:
316        # https://github.com/psteniusubi/python-sample
317
318        from sigstore._internal.oidc.oauth import _OAuthFlow
319
320        code: str
321        with _OAuthFlow(client_id, client_secret, self) as server:
322            # Launch web browser
323            if not force_oob and webbrowser.open(server.base_uri):
324                print("Waiting for browser interaction...", file=sys.stderr)
325            else:
326                server.enable_oob()
327                print(
328                    f"Go to the following link in a browser:\n\n\t{server.auth_endpoint}",
329                    file=sys.stderr,
330                )
331
332            if not server.is_oob():
333                # Wait until the redirect server populates the response
334                while server.auth_response is None:
335                    time.sleep(0.1)
336
337                auth_error = server.auth_response.get("error")
338                if auth_error is not None:
339                    raise IdentityError(
340                        f"Error response from auth endpoint: {auth_error[0]}"
341                    )
342                code = server.auth_response["code"][0]
343            else:
344                # In the out-of-band case, we wait until the user provides the code
345                code = input("Enter verification code: ")
346
347        # Provide code to token endpoint
348        data = {
349            "grant_type": "authorization_code",
350            "redirect_uri": server.redirect_uri,
351            "code": code,
352            "code_verifier": server.oauth_session.code_verifier,
353        }
354        auth = (
355            client_id,
356            client_secret,
357        )
358        logging.debug(f"PAYLOAD: data={data}")
359        try:
360            resp = self.session.post(
361                self.oidc_config.token_endpoint,
362                data=data,
363                auth=auth,
364                timeout=30,
365            )
366        except (requests.ConnectionError, requests.Timeout) as exc:
367            raise NetworkError from exc
368
369        try:
370            resp.raise_for_status()
371        except requests.HTTPError as http_error:
372            raise IdentityError(
373                f"Token request failed with {resp.status_code}"
374            ) from http_error
375
376        token_json = resp.json()
377        token_error = token_json.get("error")
378        if token_error is not None:
379            raise IdentityError(f"Error response from token endpoint: {token_error}")
380
381        return IdentityToken(token_json["access_token"])

Represents an OIDC issuer (IdP).

Issuer(base_url: str)
252    def __init__(self, base_url: str) -> None:
253        """
254        Create a new `Issuer` from the given base URL.
255
256        This URL is used to locate an OpenID Connect configuration file,
257        which is then used to bootstrap the issuer's state (such
258        as authorization and token endpoints).
259        """
260        self.session = requests.Session()
261        self.session.headers.update({"User-Agent": USER_AGENT})
262
263        oidc_config_url = urllib.parse.urljoin(
264            f"{base_url}/", ".well-known/openid-configuration"
265        )
266
267        try:
268            resp: requests.Response = self.session.get(oidc_config_url, timeout=30)
269        except (requests.ConnectionError, requests.Timeout) as exc:
270            raise NetworkError from exc
271
272        try:
273            resp.raise_for_status()
274        except requests.HTTPError as http_error:
275            raise IssuerError from http_error
276
277        try:
278            # We don't generally expect this to fail (since the provider should
279            # return a non-success HTTP code which we catch above), but we
280            # check just in case we have a misbehaving OIDC issuer.
281            self.oidc_config = _OpenIDConfiguration.model_validate(resp.json())
282        except ValueError as exc:
283            raise IssuerError(f"OIDC issuer returned invalid configuration: {exc}")

Create a new Issuer from the given base URL.

This URL is used to locate an OpenID Connect configuration file, which is then used to bootstrap the issuer's state (such as authorization and token endpoints).

session
@classmethod
def production(cls) -> Issuer:
285    @classmethod
286    def production(cls) -> Issuer:
287        """
288        Returns an `Issuer` configured against Sigstore's production-level services.
289        """
290        return cls(DEFAULT_OAUTH_ISSUER_URL)

Returns an Issuer configured against Sigstore's production-level services.

@classmethod
def staging(cls) -> Issuer:
292    @classmethod
293    def staging(cls) -> Issuer:
294        """
295        Returns an `Issuer` configured against Sigstore's staging-level services.
296        """
297        return cls(STAGING_OAUTH_ISSUER_URL)

Returns an Issuer configured against Sigstore's staging-level services.

def identity_token( self, client_id: str = 'sigstore', client_secret: str = '', force_oob: bool = False) -> IdentityToken:
299    def identity_token(  # nosec: B107
300        self,
301        client_id: str = "sigstore",
302        client_secret: str = "",
303        force_oob: bool = False,
304    ) -> IdentityToken:
305        """
306        Retrieves and returns an `IdentityToken` from the current `Issuer`, via OAuth.
307
308        This function blocks on user interaction.
309
310        The `force_oob` flag controls the kind of flow performed. When `False` (the default),
311        this function attempts to open the user's web browser before falling back to
312        an out-of-band flow. When `True`, the out-of-band flow is always used.
313        """
314
315        # This function and the components that it relies on are based off of:
316        # https://github.com/psteniusubi/python-sample
317
318        from sigstore._internal.oidc.oauth import _OAuthFlow
319
320        code: str
321        with _OAuthFlow(client_id, client_secret, self) as server:
322            # Launch web browser
323            if not force_oob and webbrowser.open(server.base_uri):
324                print("Waiting for browser interaction...", file=sys.stderr)
325            else:
326                server.enable_oob()
327                print(
328                    f"Go to the following link in a browser:\n\n\t{server.auth_endpoint}",
329                    file=sys.stderr,
330                )
331
332            if not server.is_oob():
333                # Wait until the redirect server populates the response
334                while server.auth_response is None:
335                    time.sleep(0.1)
336
337                auth_error = server.auth_response.get("error")
338                if auth_error is not None:
339                    raise IdentityError(
340                        f"Error response from auth endpoint: {auth_error[0]}"
341                    )
342                code = server.auth_response["code"][0]
343            else:
344                # In the out-of-band case, we wait until the user provides the code
345                code = input("Enter verification code: ")
346
347        # Provide code to token endpoint
348        data = {
349            "grant_type": "authorization_code",
350            "redirect_uri": server.redirect_uri,
351            "code": code,
352            "code_verifier": server.oauth_session.code_verifier,
353        }
354        auth = (
355            client_id,
356            client_secret,
357        )
358        logging.debug(f"PAYLOAD: data={data}")
359        try:
360            resp = self.session.post(
361                self.oidc_config.token_endpoint,
362                data=data,
363                auth=auth,
364                timeout=30,
365            )
366        except (requests.ConnectionError, requests.Timeout) as exc:
367            raise NetworkError from exc
368
369        try:
370            resp.raise_for_status()
371        except requests.HTTPError as http_error:
372            raise IdentityError(
373                f"Token request failed with {resp.status_code}"
374            ) from http_error
375
376        token_json = resp.json()
377        token_error = token_json.get("error")
378        if token_error is not None:
379            raise IdentityError(f"Error response from token endpoint: {token_error}")
380
381        return IdentityToken(token_json["access_token"])

Retrieves and returns an IdentityToken from the current Issuer, via OAuth.

This function blocks on user interaction.

The force_oob flag controls the kind of flow performed. When False (the default), this function attempts to open the user's web browser before falling back to an out-of-band flow. When True, the out-of-band flow is always used.

class IdentityError(sigstore.errors.Error):
384class IdentityError(Error):
385    """
386    Wraps `id`'s IdentityError.
387    """
388
389    @classmethod
390    def raise_from_id(cls, exc: id.IdentityError) -> NoReturn:
391        """Raises a wrapped IdentityError from the provided `id.IdentityError`."""
392        raise cls(str(exc)) from exc
393
394    def diagnostics(self) -> str:
395        """Returns diagnostics for the error."""
396        if isinstance(self.__cause__, id.GitHubOidcPermissionCredentialError):
397            return f"""
398                Insufficient permissions for GitHub Actions workflow.
399
400                The most common reason for this is incorrect
401                configuration of the top-level `permissions` setting of the
402                workflow YAML file. It should be configured like so:
403
404                    permissions:
405                      id-token: write
406
407                Relevant documentation here:
408
409                    https://docs.github.com/en/actions/deployment/security-hardening-your-deployments/about-security-hardening-with-openid-connect#adding-permissions-settings
410
411                Another possible reason is that the workflow run has been
412                triggered by a PR from a forked repository. PRs from forked
413                repositories typically cannot be granted write access.
414
415                Relevant documentation here:
416
417                    https://docs.github.com/en/actions/security-guides/automatic-token-authentication#modifying-the-permissions-for-the-github_token
418
419                Additional context:
420
421                {self.__cause__}
422                """
423        else:
424            return f"""
425                An issue occurred with ambient credential detection.
426
427                Additional context:
428
429                {self}
430            """

Wraps id's IdentityError.

@classmethod
def raise_from_id(cls, exc: id.IdentityError) -> NoReturn:
389    @classmethod
390    def raise_from_id(cls, exc: id.IdentityError) -> NoReturn:
391        """Raises a wrapped IdentityError from the provided `id.IdentityError`."""
392        raise cls(str(exc)) from exc

Raises a wrapped IdentityError from the provided id.IdentityError.

def diagnostics(self) -> str:
394    def diagnostics(self) -> str:
395        """Returns diagnostics for the error."""
396        if isinstance(self.__cause__, id.GitHubOidcPermissionCredentialError):
397            return f"""
398                Insufficient permissions for GitHub Actions workflow.
399
400                The most common reason for this is incorrect
401                configuration of the top-level `permissions` setting of the
402                workflow YAML file. It should be configured like so:
403
404                    permissions:
405                      id-token: write
406
407                Relevant documentation here:
408
409                    https://docs.github.com/en/actions/deployment/security-hardening-your-deployments/about-security-hardening-with-openid-connect#adding-permissions-settings
410
411                Another possible reason is that the workflow run has been
412                triggered by a PR from a forked repository. PRs from forked
413                repositories typically cannot be granted write access.
414
415                Relevant documentation here:
416
417                    https://docs.github.com/en/actions/security-guides/automatic-token-authentication#modifying-the-permissions-for-the-github_token
418
419                Additional context:
420
421                {self.__cause__}
422                """
423        else:
424            return f"""
425                An issue occurred with ambient credential detection.
426
427                Additional context:
428
429                {self}
430            """

Returns diagnostics for the error.

Inherited Members
builtins.Exception
Exception
sigstore.errors.Error
log_and_exit
builtins.BaseException
with_traceback
add_note
args
def detect_credential() -> Optional[str]:
433def detect_credential() -> Optional[str]:
434    """Calls `id.detect_credential`, but wraps exceptions with our own exception type."""
435    try:
436        return cast(Optional[str], id.detect_credential(_DEFAULT_AUDIENCE))
437    except id.IdentityError as exc:
438        IdentityError.raise_from_id(exc)

Calls id.detect_credential, but wraps exceptions with our own exception type.