sigstore.oidc

API for retrieving OIDC tokens.

  1# Copyright 2022 The Sigstore Authors
  2#
  3# Licensed under the Apache License, Version 2.0 (the "License");
  4# you may not use this file except in compliance with the License.
  5# You may obtain a copy of the License at
  6#
  7#      http://www.apache.org/licenses/LICENSE-2.0
  8#
  9# Unless required by applicable law or agreed to in writing, software
 10# distributed under the License is distributed on an "AS IS" BASIS,
 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12# See the License for the specific language governing permissions and
 13# limitations under the License.
 14
 15"""
 16API for retrieving OIDC tokens.
 17"""
 18
 19from __future__ import annotations
 20
 21import logging
 22import sys
 23import time
 24import urllib.parse
 25import webbrowser
 26from datetime import datetime, timezone
 27from typing import NoReturn, Optional, cast
 28
 29import id
 30import jwt
 31import requests
 32from pydantic import BaseModel, StrictStr
 33
 34from sigstore._internal import USER_AGENT
 35from sigstore.errors import Error, NetworkError
 36
 37DEFAULT_OAUTH_ISSUER_URL = "https://oauth2.sigstore.dev/auth"
 38STAGING_OAUTH_ISSUER_URL = "https://oauth2.sigstage.dev/auth"
 39
 40# See: https://github.com/sigstore/fulcio/blob/b2186c0/pkg/config/config.go#L182-L201
 41_KNOWN_OIDC_ISSUERS = {
 42    "https://accounts.google.com": "email",
 43    "https://oauth2.sigstore.dev/auth": "email",
 44    "https://oauth2.sigstage.dev/auth": "email",
 45    "https://token.actions.githubusercontent.com": "sub",
 46}
 47_DEFAULT_AUDIENCE = "sigstore"
 48
 49
class _OpenIDConfiguration(BaseModel):
    """
    Represents a (subset) of the fields provided by an OpenID Connect provider's
    `.well-known/openid-configuration` response, as defined by OpenID Connect Discovery.

    Only the two endpoints declared below are modeled. Both are required
    `StrictStr` fields, so validation fails unless the provider supplies
    each of them as a string.

    See: <https://openid.net/specs/openid-connect-discovery-1_0.html>
    """

    # URL of the provider's authorization endpoint.
    authorization_endpoint: StrictStr
    # URL of the provider's token endpoint; `Issuer.identity_token` POSTs the
    # authorization-code exchange to it.
    token_endpoint: StrictStr
 60
 61
class ExpiredIdentity(Exception):
    """
    An error raised when an identity token is expired.

    NOTE(review): nothing in this module raises this exception; `IdentityToken`
    reports a token outside its validity period with `IdentityError` instead.
    Presumably it is raised by callers elsewhere -- confirm before relying on it.
    """
 64
 65
 66class IdentityToken:
 67    """
 68    An OIDC "identity", corresponding to an underlying OIDC token with
 69    a sensible subject, issuer, and audience for Sigstore purposes.
 70    """
 71
 72    def __init__(self, raw_token: str) -> None:
 73        """
 74        Create a new `IdentityToken` from the given OIDC token.
 75        """
 76
 77        self._raw_token = raw_token
 78
 79        # NOTE: The lack of verification here is intentional, and is part of
 80        # Sigstore's verification model: clients like sigstore-python are
 81        # responsible only for forwarding the OIDC identity to Fulcio for
 82        # certificate binding and issuance.
 83        try:
 84            self._unverified_claims = jwt.decode(
 85                raw_token,
 86                options={
 87                    "verify_signature": False,
 88                    "verify_aud": True,
 89                    "verify_iat": True,
 90                    "verify_exp": True,
 91                    # These claims are required by OpenID Connect, so
 92                    # we can strongly enforce their presence.
 93                    # See: https://openid.net/specs/openid-connect-basic-1_0.html#IDToken
 94                    "require": ["aud", "sub", "iat", "exp", "iss"],
 95                },
 96                audience=_DEFAULT_AUDIENCE,
 97                # NOTE: This leeway shouldn't be strictly necessary, but is
 98                # included to preempt any (small) skew between the host
 99                # and the originating IdP.
100                leeway=5,
101            )
102        except Exception as exc:
103            raise IdentityError(
104                "Identity token is malformed or missing claims"
105            ) from exc
106
107        self._iss: str = self._unverified_claims["iss"]
108        self._nbf: int | None = self._unverified_claims.get("nbf")
109        self._exp: int = self._unverified_claims["exp"]
110
111        # Fail early if this token isn't within its validity period.
112        if not self.in_validity_period():
113            raise IdentityError("Identity token is not within its validity period")
114
115        # When verifying the private key possession proof, Fulcio uses
116        # different claims depending on the token's issuer.
117        # We currently special-case a handful of these, and fall back
118        # on signing the "sub" claim otherwise.
119        identity_claim = _KNOWN_OIDC_ISSUERS.get(self.issuer)
120        if identity_claim is not None:
121            if identity_claim not in self._unverified_claims:
122                raise IdentityError(
123                    f"Identity token is missing the required {identity_claim!r} claim"
124                )
125
126            self._identity = str(self._unverified_claims.get(identity_claim))
127        else:
128            try:
129                self._identity = str(self._unverified_claims["sub"])
130            except KeyError:
131                raise IdentityError(
132                    "Identity token is missing the required 'sub' claim"
133                )
134
135        # This identity token might have been retrieved directly from
136        # an identity provider, or it might be a "federated" identity token
137        # retrieved from a federated IdP (e.g., Sigstore's own Dex instance).
138        # In the latter case, the claims will also include a `federated_claims`
139        # set, which in turn should include a `connector_id` that reflects
140        # the "real" token issuer. We retrieve this, despite technically
141        # being an implementation detail, because it has value to client
142        # users: a client might want to make sure that its user is identifying
143        # with a *particular* IdP, which means that they need to pierce the
144        # federation layer to check which IdP is actually being used.
145        self._federated_issuer: str | None = None
146        federated_claims = self._unverified_claims.get("federated_claims")
147        if federated_claims is not None:
148            if not isinstance(federated_claims, dict):
149                raise IdentityError(
150                    "unexpected claim type: federated_claims is not a dict"
151                )
152
153            federated_issuer = federated_claims.get("connector_id")
154            if federated_issuer is not None:
155                if not isinstance(federated_issuer, str):
156                    raise IdentityError(
157                        "unexpected claim type: federated_claims.connector_id is not a string"
158                    )
159
160                self._federated_issuer = federated_issuer
161
162    def in_validity_period(self) -> bool:
163        """
164        Returns whether or not this `Identity` is currently within its self-stated validity period.
165
166        NOTE: As noted in `Identity.__init__`, this is not a verifying wrapper;
167        the check here only asserts whether the *unverified* identity's claims
168        are within their validity period.
169        """
170
171        now = datetime.now(timezone.utc).timestamp()
172
173        if self._nbf is not None:
174            return self._nbf <= now < self._exp
175        else:
176            return now < self._exp
177
178    @property
179    def identity(self) -> str:
180        """
181        Returns this `IdentityToken`'s underlying "subject".
182
183        Note that this is **not** always the `sub` claim in the corresponding
184        identity token: depending onm the token's issuer, it may be a *different*
185        claim, such as `email`. This corresponds to the Sigstore ecosystem's
186        behavior, e.g. in each issued certificate's SAN.
187        """
188        return self._identity
189
190    @property
191    def issuer(self) -> str:
192        """
193        Returns a URL identifying this `IdentityToken`'s issuer.
194        """
195        return self._iss
196
197    @property
198    def federated_issuer(self) -> str:
199        """
200        Returns a URL identifying the **federated** issuer for any Sigstore
201        certificate issued against this identity token.
202
203        The behavior of this field is slightly subtle: for non-federated
204        identity providers (like a token issued directly by Google's IdP) it
205        should be exactly equivalent to `IdentityToken.issuer`. For federated
206        issuers (like Sigstore's own federated IdP) it should be equivalent to
207        the underlying federated issuer's URL, which is kept in an
208        implementation-defined claim.
209
210        This attribute exists so that clients who wish to inspect the expected
211        underlying issuer of their certificates can do so without relying on
212        implementation-specific behavior.
213        """
214        if self._federated_issuer is not None:
215            return self._federated_issuer
216
217        return self.issuer
218
219    def __str__(self) -> str:
220        """
221        Returns the underlying OIDC token for this identity.
222
223        That this token is secret in nature and **MUST NOT** be disclosed.
224        """
225        return self._raw_token
226
227
class IssuerError(Exception):
    """
    Signals a communication failure with, or a malformed response from,
    an OIDC issuer.
    """
234
235
class Issuer:
    """
    Represents an OIDC issuer (IdP).
    """

    def __init__(self, base_url: str) -> None:
        """
        Create a new `Issuer` from the given base URL.

        This URL is used to locate an OpenID Connect configuration file,
        which is then used to bootstrap the issuer's state (such
        as authorization and token endpoints).

        Raises `NetworkError` if the configuration request cannot connect or
        times out, and `IssuerError` if the issuer answers with an HTTP error
        status or with an invalid configuration document.
        """
        self.session = requests.Session()
        self.session.headers.update({"User-Agent": USER_AGENT})

        # The trailing slash makes `urljoin` append to `base_url` instead of
        # replacing its last path segment.
        oidc_config_url = urllib.parse.urljoin(
            f"{base_url}/", ".well-known/openid-configuration"
        )

        try:
            resp: requests.Response = self.session.get(oidc_config_url, timeout=30)
        except (requests.ConnectionError, requests.Timeout) as exc:
            raise NetworkError from exc

        try:
            resp.raise_for_status()
        except requests.HTTPError as http_error:
            raise IssuerError from http_error

        try:
            # We don't generally expect this to fail (since the provider should
            # return a non-success HTTP code which we catch above), but we
            # check just in case we have a misbehaving OIDC issuer.
            self.oidc_config = _OpenIDConfiguration.model_validate(resp.json())
        except ValueError as exc:
            raise IssuerError(
                f"OIDC issuer returned invalid configuration: {exc}"
            ) from exc

    @classmethod
    def production(cls) -> Issuer:
        """
        Returns an `Issuer` configured against Sigstore's production-level services.
        """
        return cls(DEFAULT_OAUTH_ISSUER_URL)

    @classmethod
    def staging(cls) -> Issuer:
        """
        Returns an `Issuer` configured against Sigstore's staging-level services.
        """
        return cls(STAGING_OAUTH_ISSUER_URL)

    def identity_token(  # nosec: B107
        self,
        client_id: str = "sigstore",
        client_secret: str = "",
        force_oob: bool = False,
    ) -> IdentityToken:
        """
        Retrieves and returns an `IdentityToken` from the current `Issuer`, via OAuth.

        This function blocks on user interaction.

        The `force_oob` flag controls the kind of flow performed. When `False` (the default),
        this function attempts to open the user's web browser before falling back to
        an out-of-band flow. When `True`, the out-of-band flow is always used.

        Raises `NetworkError` if the token request cannot connect or times
        out, and `IdentityError` if the authorization or token endpoint
        reports an error, returns an unusable response, or yields a token
        that `IdentityToken` rejects.
        """

        # This function and the components that it relies on are based off of:
        # https://github.com/psteniusubi/python-sample

        from sigstore._internal.oidc.oauth import _OAuthFlow

        code: str
        with _OAuthFlow(client_id, client_secret, self) as server:
            # Launch web browser
            if not force_oob and webbrowser.open(server.base_uri):
                print("Waiting for browser interaction...", file=sys.stderr)
            else:
                server.enable_oob()
                print(
                    f"Go to the following link in a browser:\n\n\t{server.auth_endpoint}",
                    file=sys.stderr,
                )

            if not server.is_oob():
                # Wait until the redirect server populates the response
                while server.auth_response is None:
                    time.sleep(0.1)

                auth_error = server.auth_response.get("error")
                if auth_error is not None:
                    raise IdentityError(
                        f"Error response from auth endpoint: {auth_error[0]}"
                    )

                # A redirect carrying neither an error nor a code is reported
                # as an identity failure rather than as a bare `KeyError`.
                auth_code = server.auth_response.get("code")
                if not auth_code:
                    raise IdentityError(
                        "Auth endpoint response is missing an authorization code"
                    )
                code = auth_code[0]
            else:
                # In the out-of-band case, we wait until the user provides the code
                code = input("Enter verification code: ")

        # Provide code to token endpoint
        data = {
            "grant_type": "authorization_code",
            "redirect_uri": server.redirect_uri,
            "code": code,
            "code_verifier": server.oauth_session.code_verifier,
        }
        auth = (
            client_id,
            client_secret,
        )
        # The authorization code and the PKCE verifier are credentials, so
        # they are masked before the payload is logged.
        logging.debug(
            "PAYLOAD: data=%s",
            {**data, "code": "<redacted>", "code_verifier": "<redacted>"},
        )
        try:
            resp = self.session.post(
                self.oidc_config.token_endpoint,
                data=data,
                auth=auth,
                timeout=30,
            )
        except (requests.ConnectionError, requests.Timeout) as exc:
            raise NetworkError from exc

        try:
            resp.raise_for_status()
        except requests.HTTPError as http_error:
            raise IdentityError(
                f"Token request failed with {resp.status_code}"
            ) from http_error

        try:
            token_json = resp.json()
        except ValueError as exc:
            raise IdentityError("Token endpoint returned a non-JSON response") from exc

        if not isinstance(token_json, dict):
            raise IdentityError("Token endpoint returned an unexpected response")

        token_error = token_json.get("error")
        if token_error is not None:
            raise IdentityError(f"Error response from token endpoint: {token_error}")

        raw_token = token_json.get("access_token")
        if raw_token is None:
            raise IdentityError("Token endpoint response is missing an access token")

        return IdentityToken(raw_token)
371
372
class IdentityError(Error):
    """
    Sigstore's own wrapper around the `id` package's IdentityError.
    """

    @classmethod
    def raise_from_id(cls, exc: id.IdentityError) -> NoReturn:
        """Wraps the given `id.IdentityError` in this class and raises it, chained to `exc`."""
        wrapped = cls(str(exc))
        raise wrapped from exc

    def diagnostics(self) -> str:
        """Returns a human-readable explanation of this error."""
        cause = self.__cause__

        # Anything other than a GitHub Actions permission failure gets the
        # generic message.
        if not isinstance(cause, id.GitHubOidcPermissionCredentialError):
            return f"""
                An issue occurred with ambient credential detection.

                Additional context:

                {self}
            """

        return f"""
                Insufficient permissions for GitHub Actions workflow.

                The most common reason for this is incorrect
                configuration of the top-level `permissions` setting of the
                workflow YAML file. It should be configured like so:

                    permissions:
                      id-token: write

                Relevant documentation here:

                    https://docs.github.com/en/actions/deployment/security-hardening-your-deployments/about-security-hardening-with-openid-connect#adding-permissions-settings

                Another possible reason is that the workflow run has been
                triggered by a PR from a forked repository. PRs from forked
                repositories typically cannot be granted write access.

                Relevant documentation here:

                    https://docs.github.com/en/actions/security-guides/automatic-token-authentication#modifying-the-permissions-for-the-github_token

                Additional context:

                {cause}
                """
420
421
def detect_credential() -> Optional[str]:
    """Delegates to `id.detect_credential`, re-raising its `IdentityError` as this module's own."""
    try:
        credential = id.detect_credential(_DEFAULT_AUDIENCE)
    except id.IdentityError as exc:
        IdentityError.raise_from_id(exc)
    return cast(Optional[str], credential)
DEFAULT_OAUTH_ISSUER_URL = 'https://oauth2.sigstore.dev/auth'
STAGING_OAUTH_ISSUER_URL = 'https://oauth2.sigstage.dev/auth'
class ExpiredIdentity(builtins.Exception):
63class ExpiredIdentity(Exception):
64    """An error raised when an identity token is expired."""

An error raised when an identity token is expired.

class IdentityToken:
 67class IdentityToken:
 68    """
 69    An OIDC "identity", corresponding to an underlying OIDC token with
 70    a sensible subject, issuer, and audience for Sigstore purposes.
 71    """
 72
 73    def __init__(self, raw_token: str) -> None:
 74        """
 75        Create a new `IdentityToken` from the given OIDC token.
 76        """
 77
 78        self._raw_token = raw_token
 79
 80        # NOTE: The lack of verification here is intentional, and is part of
 81        # Sigstore's verification model: clients like sigstore-python are
 82        # responsible only for forwarding the OIDC identity to Fulcio for
 83        # certificate binding and issuance.
 84        try:
 85            self._unverified_claims = jwt.decode(
 86                raw_token,
 87                options={
 88                    "verify_signature": False,
 89                    "verify_aud": True,
 90                    "verify_iat": True,
 91                    "verify_exp": True,
 92                    # These claims are required by OpenID Connect, so
 93                    # we can strongly enforce their presence.
 94                    # See: https://openid.net/specs/openid-connect-basic-1_0.html#IDToken
 95                    "require": ["aud", "sub", "iat", "exp", "iss"],
 96                },
 97                audience=_DEFAULT_AUDIENCE,
 98                # NOTE: This leeway shouldn't be strictly necessary, but is
 99                # included to preempt any (small) skew between the host
100                # and the originating IdP.
101                leeway=5,
102            )
103        except Exception as exc:
104            raise IdentityError(
105                "Identity token is malformed or missing claims"
106            ) from exc
107
108        self._iss: str = self._unverified_claims["iss"]
109        self._nbf: int | None = self._unverified_claims.get("nbf")
110        self._exp: int = self._unverified_claims["exp"]
111
112        # Fail early if this token isn't within its validity period.
113        if not self.in_validity_period():
114            raise IdentityError("Identity token is not within its validity period")
115
116        # When verifying the private key possession proof, Fulcio uses
117        # different claims depending on the token's issuer.
118        # We currently special-case a handful of these, and fall back
119        # on signing the "sub" claim otherwise.
120        identity_claim = _KNOWN_OIDC_ISSUERS.get(self.issuer)
121        if identity_claim is not None:
122            if identity_claim not in self._unverified_claims:
123                raise IdentityError(
124                    f"Identity token is missing the required {identity_claim!r} claim"
125                )
126
127            self._identity = str(self._unverified_claims.get(identity_claim))
128        else:
129            try:
130                self._identity = str(self._unverified_claims["sub"])
131            except KeyError:
132                raise IdentityError(
133                    "Identity token is missing the required 'sub' claim"
134                )
135
136        # This identity token might have been retrieved directly from
137        # an identity provider, or it might be a "federated" identity token
138        # retrieved from a federated IdP (e.g., Sigstore's own Dex instance).
139        # In the latter case, the claims will also include a `federated_claims`
140        # set, which in turn should include a `connector_id` that reflects
141        # the "real" token issuer. We retrieve this, despite technically
142        # being an implementation detail, because it has value to client
143        # users: a client might want to make sure that its user is identifying
144        # with a *particular* IdP, which means that they need to pierce the
145        # federation layer to check which IdP is actually being used.
146        self._federated_issuer: str | None = None
147        federated_claims = self._unverified_claims.get("federated_claims")
148        if federated_claims is not None:
149            if not isinstance(federated_claims, dict):
150                raise IdentityError(
151                    "unexpected claim type: federated_claims is not a dict"
152                )
153
154            federated_issuer = federated_claims.get("connector_id")
155            if federated_issuer is not None:
156                if not isinstance(federated_issuer, str):
157                    raise IdentityError(
158                        "unexpected claim type: federated_claims.connector_id is not a string"
159                    )
160
161                self._federated_issuer = federated_issuer
162
163    def in_validity_period(self) -> bool:
164        """
165        Returns whether or not this `Identity` is currently within its self-stated validity period.
166
167        NOTE: As noted in `Identity.__init__`, this is not a verifying wrapper;
168        the check here only asserts whether the *unverified* identity's claims
169        are within their validity period.
170        """
171
172        now = datetime.now(timezone.utc).timestamp()
173
174        if self._nbf is not None:
175            return self._nbf <= now < self._exp
176        else:
177            return now < self._exp
178
179    @property
180    def identity(self) -> str:
181        """
182        Returns this `IdentityToken`'s underlying "subject".
183
184        Note that this is **not** always the `sub` claim in the corresponding
185        identity token: depending onm the token's issuer, it may be a *different*
186        claim, such as `email`. This corresponds to the Sigstore ecosystem's
187        behavior, e.g. in each issued certificate's SAN.
188        """
189        return self._identity
190
191    @property
192    def issuer(self) -> str:
193        """
194        Returns a URL identifying this `IdentityToken`'s issuer.
195        """
196        return self._iss
197
198    @property
199    def federated_issuer(self) -> str:
200        """
201        Returns a URL identifying the **federated** issuer for any Sigstore
202        certificate issued against this identity token.
203
204        The behavior of this field is slightly subtle: for non-federated
205        identity providers (like a token issued directly by Google's IdP) it
206        should be exactly equivalent to `IdentityToken.issuer`. For federated
207        issuers (like Sigstore's own federated IdP) it should be equivalent to
208        the underlying federated issuer's URL, which is kept in an
209        implementation-defined claim.
210
211        This attribute exists so that clients who wish to inspect the expected
212        underlying issuer of their certificates can do so without relying on
213        implementation-specific behavior.
214        """
215        if self._federated_issuer is not None:
216            return self._federated_issuer
217
218        return self.issuer
219
220    def __str__(self) -> str:
221        """
222        Returns the underlying OIDC token for this identity.
223
224        That this token is secret in nature and **MUST NOT** be disclosed.
225        """
226        return self._raw_token

An OIDC "identity", corresponding to an underlying OIDC token with a sensible subject, issuer, and audience for Sigstore purposes.

IdentityToken(raw_token: str)
 73    def __init__(self, raw_token: str) -> None:
 74        """
 75        Create a new `IdentityToken` from the given OIDC token.
 76        """
 77
 78        self._raw_token = raw_token
 79
 80        # NOTE: The lack of verification here is intentional, and is part of
 81        # Sigstore's verification model: clients like sigstore-python are
 82        # responsible only for forwarding the OIDC identity to Fulcio for
 83        # certificate binding and issuance.
 84        try:
 85            self._unverified_claims = jwt.decode(
 86                raw_token,
 87                options={
 88                    "verify_signature": False,
 89                    "verify_aud": True,
 90                    "verify_iat": True,
 91                    "verify_exp": True,
 92                    # These claims are required by OpenID Connect, so
 93                    # we can strongly enforce their presence.
 94                    # See: https://openid.net/specs/openid-connect-basic-1_0.html#IDToken
 95                    "require": ["aud", "sub", "iat", "exp", "iss"],
 96                },
 97                audience=_DEFAULT_AUDIENCE,
 98                # NOTE: This leeway shouldn't be strictly necessary, but is
 99                # included to preempt any (small) skew between the host
100                # and the originating IdP.
101                leeway=5,
102            )
103        except Exception as exc:
104            raise IdentityError(
105                "Identity token is malformed or missing claims"
106            ) from exc
107
108        self._iss: str = self._unverified_claims["iss"]
109        self._nbf: int | None = self._unverified_claims.get("nbf")
110        self._exp: int = self._unverified_claims["exp"]
111
112        # Fail early if this token isn't within its validity period.
113        if not self.in_validity_period():
114            raise IdentityError("Identity token is not within its validity period")
115
116        # When verifying the private key possession proof, Fulcio uses
117        # different claims depending on the token's issuer.
118        # We currently special-case a handful of these, and fall back
119        # on signing the "sub" claim otherwise.
120        identity_claim = _KNOWN_OIDC_ISSUERS.get(self.issuer)
121        if identity_claim is not None:
122            if identity_claim not in self._unverified_claims:
123                raise IdentityError(
124                    f"Identity token is missing the required {identity_claim!r} claim"
125                )
126
127            self._identity = str(self._unverified_claims.get(identity_claim))
128        else:
129            try:
130                self._identity = str(self._unverified_claims["sub"])
131            except KeyError:
132                raise IdentityError(
133                    "Identity token is missing the required 'sub' claim"
134                )
135
136        # This identity token might have been retrieved directly from
137        # an identity provider, or it might be a "federated" identity token
138        # retrieved from a federated IdP (e.g., Sigstore's own Dex instance).
139        # In the latter case, the claims will also include a `federated_claims`
140        # set, which in turn should include a `connector_id` that reflects
141        # the "real" token issuer. We retrieve this, despite technically
142        # being an implementation detail, because it has value to client
143        # users: a client might want to make sure that its user is identifying
144        # with a *particular* IdP, which means that they need to pierce the
145        # federation layer to check which IdP is actually being used.
146        self._federated_issuer: str | None = None
147        federated_claims = self._unverified_claims.get("federated_claims")
148        if federated_claims is not None:
149            if not isinstance(federated_claims, dict):
150                raise IdentityError(
151                    "unexpected claim type: federated_claims is not a dict"
152                )
153
154            federated_issuer = federated_claims.get("connector_id")
155            if federated_issuer is not None:
156                if not isinstance(federated_issuer, str):
157                    raise IdentityError(
158                        "unexpected claim type: federated_claims.connector_id is not a string"
159                    )
160
161                self._federated_issuer = federated_issuer

Create a new IdentityToken from the given OIDC token.

def in_validity_period(self) -> bool:
163    def in_validity_period(self) -> bool:
164        """
165        Returns whether or not this `Identity` is currently within its self-stated validity period.
166
167        NOTE: As noted in `Identity.__init__`, this is not a verifying wrapper;
168        the check here only asserts whether the *unverified* identity's claims
169        are within their validity period.
170        """
171
172        now = datetime.now(timezone.utc).timestamp()
173
174        if self._nbf is not None:
175            return self._nbf <= now < self._exp
176        else:
177            return now < self._exp

Returns whether or not this IdentityToken is currently within its self-stated validity period.

NOTE: As noted in IdentityToken.__init__, this is not a verifying wrapper; the check here only asserts whether the unverified identity's claims are within their validity period.

identity: str
179    @property
180    def identity(self) -> str:
181        """
182        Returns this `IdentityToken`'s underlying "subject".
183
184        Note that this is **not** always the `sub` claim in the corresponding
185        identity token: depending onm the token's issuer, it may be a *different*
186        claim, such as `email`. This corresponds to the Sigstore ecosystem's
187        behavior, e.g. in each issued certificate's SAN.
188        """
189        return self._identity

Returns this IdentityToken's underlying "subject".

Note that this is not always the sub claim in the corresponding identity token: depending on the token's issuer, it may be a different claim, such as email. This corresponds to the Sigstore ecosystem's behavior, e.g. in each issued certificate's SAN.

issuer: str
191    @property
192    def issuer(self) -> str:
193        """
194        Returns a URL identifying this `IdentityToken`'s issuer.
195        """
196        return self._iss

Returns a URL identifying this IdentityToken's issuer.

federated_issuer: str
198    @property
199    def federated_issuer(self) -> str:
200        """
201        Returns a URL identifying the **federated** issuer for any Sigstore
202        certificate issued against this identity token.
203
204        The behavior of this field is slightly subtle: for non-federated
205        identity providers (like a token issued directly by Google's IdP) it
206        should be exactly equivalent to `IdentityToken.issuer`. For federated
207        issuers (like Sigstore's own federated IdP) it should be equivalent to
208        the underlying federated issuer's URL, which is kept in an
209        implementation-defined claim.
210
211        This attribute exists so that clients who wish to inspect the expected
212        underlying issuer of their certificates can do so without relying on
213        implementation-specific behavior.
214        """
215        if self._federated_issuer is not None:
216            return self._federated_issuer
217
218        return self.issuer

Returns a URL identifying the federated issuer for any Sigstore certificate issued against this identity token.

The behavior of this field is slightly subtle: for non-federated identity providers (like a token issued directly by Google's IdP) it should be exactly equivalent to IdentityToken.issuer. For federated issuers (like Sigstore's own federated IdP) it should be equivalent to the underlying federated issuer's URL, which is kept in an implementation-defined claim.

This attribute exists so that clients who wish to inspect the expected underlying issuer of their certificates can do so without relying on implementation-specific behavior.

class IssuerError(builtins.Exception):
class IssuerError(Exception):
    """
    Signals a failure while talking to an OIDC issuer.

    Raised on any communication or format error with the issuer, such as
    an HTTP error status or an invalid OpenID Connect configuration.
    """

Raised on any communication or format error with an OIDC issuer.

class Issuer:
class Issuer:
    """
    Represents an OIDC issuer (IdP).

    Constructing an `Issuer` fetches the issuer's OpenID Connect
    configuration over the network; `identity_token` then uses that
    configuration to run an interactive OAuth flow.
    """

    def __init__(self, base_url: str) -> None:
        """
        Create a new `Issuer` from the given base URL.

        This URL is used to locate an OpenID Connect configuration file,
        which is then used to bootstrap the issuer's state (such
        as authorization and token endpoints).

        Raises `NetworkError` if the configuration request cannot connect or
        times out, and `IssuerError` if the issuer answers with an HTTP error
        status or with a configuration document that fails validation.
        """
        self.session = requests.Session()
        self.session.headers.update({"User-Agent": USER_AGENT})

        # The trailing slash makes `urljoin` append to `base_url` instead of
        # replacing its final path segment.
        oidc_config_url = urllib.parse.urljoin(
            f"{base_url}/", ".well-known/openid-configuration"
        )

        try:
            resp: requests.Response = self.session.get(oidc_config_url, timeout=30)
        except (requests.ConnectionError, requests.Timeout) as exc:
            raise NetworkError from exc

        try:
            resp.raise_for_status()
        except requests.HTTPError as http_error:
            raise IssuerError from http_error

        try:
            # We don't generally expect this to fail (since the provider should
            # return a non-success HTTP code which we catch above), but we
            # check just in case we have a misbehaving OIDC issuer.
            self.oidc_config = _OpenIDConfiguration.model_validate(resp.json())
        except ValueError as exc:
            # Chain explicitly, like the other handlers in this method, so the
            # decoding/validation failure is recorded as the direct cause.
            raise IssuerError(
                f"OIDC issuer returned invalid configuration: {exc}"
            ) from exc

    @classmethod
    def production(cls) -> Issuer:
        """
        Returns an `Issuer` configured against Sigstore's production-level services.
        """
        return cls(DEFAULT_OAUTH_ISSUER_URL)

    @classmethod
    def staging(cls) -> Issuer:
        """
        Returns an `Issuer` configured against Sigstore's staging-level services.
        """
        return cls(STAGING_OAUTH_ISSUER_URL)

    def identity_token(  # nosec: B107
        self,
        client_id: str = "sigstore",
        client_secret: str = "",
        force_oob: bool = False,
    ) -> IdentityToken:
        """
        Retrieves and returns an `IdentityToken` from the current `Issuer`, via OAuth.

        This function blocks on user interaction.

        The `force_oob` flag controls the kind of flow performed. When `False` (the default),
        this function attempts to open the user's web browser before falling back to
        an out-of-band flow. When `True`, the out-of-band flow is always used.

        Raises `NetworkError` if the token request cannot connect or times out,
        and `IdentityError` if the authorization or token endpoint reports an
        error, answers with an HTTP error status, or returns a malformed
        response (no authorization code, non-JSON body, or no access token).
        """

        # This function and the components that it relies on are based off of:
        # https://github.com/psteniusubi/python-sample

        from sigstore._internal.oidc.oauth import _OAuthFlow

        code: str
        with _OAuthFlow(client_id, client_secret, self) as server:
            # Launch web browser
            if not force_oob and webbrowser.open(server.base_uri):
                print("Waiting for browser interaction...", file=sys.stderr)
            else:
                server.enable_oob()
                print(
                    f"Go to the following link in a browser:\n\n\t{server.auth_endpoint}",
                    file=sys.stderr,
                )

            if not server.is_oob():
                # Wait until the redirect server populates the response
                while server.auth_response is None:
                    time.sleep(0.1)

                auth_error = server.auth_response.get("error")
                if auth_error is not None:
                    raise IdentityError(
                        f"Error response from auth endpoint: {auth_error[0]}"
                    )
                try:
                    code = server.auth_response["code"][0]
                except (KeyError, IndexError) as exc:
                    # A redirect carrying neither `error` nor `code` is
                    # malformed; report it as an identity failure instead of
                    # leaking a bare lookup error.
                    raise IdentityError(
                        "Auth endpoint response did not include an authorization code"
                    ) from exc
            else:
                # In the out-of-band case, we wait until the user provides the code
                code = input("Enter verification code: ")

        # Provide code to token endpoint
        data = {
            "grant_type": "authorization_code",
            "redirect_uri": server.redirect_uri,
            "code": code,
            "code_verifier": server.oauth_session.code_verifier,
        }
        auth = (
            client_id,
            client_secret,
        )
        # NOTE(review): this writes the authorization code and PKCE verifier
        # to the debug log -- confirm that is acceptable for this logger.
        logging.debug(f"PAYLOAD: data={data}")
        try:
            resp = self.session.post(
                self.oidc_config.token_endpoint,
                data=data,
                auth=auth,
                timeout=30,
            )
        except (requests.ConnectionError, requests.Timeout) as exc:
            raise NetworkError from exc

        try:
            resp.raise_for_status()
        except requests.HTTPError as http_error:
            raise IdentityError(
                f"Token request failed with {resp.status_code}"
            ) from http_error

        try:
            token_json = resp.json()
        except ValueError as exc:
            # JSON decoding failures from `requests` are `ValueError`s.
            raise IdentityError(
                "Token endpoint returned a response that is not valid JSON"
            ) from exc

        if not isinstance(token_json, dict):
            raise IdentityError("Token endpoint returned a non-object JSON response")

        token_error = token_json.get("error")
        if token_error is not None:
            raise IdentityError(f"Error response from token endpoint: {token_error}")

        try:
            raw_token = token_json["access_token"]
        except KeyError as exc:
            raise IdentityError(
                "Token endpoint response did not include an access token"
            ) from exc

        # Constructed outside the `try` so that errors raised by
        # `IdentityToken` itself propagate unchanged.
        return IdentityToken(raw_token)

Represents an OIDC issuer (IdP).

Issuer(base_url: str)
242    def __init__(self, base_url: str) -> None:
243        """
244        Create a new `Issuer` from the given base URL.
245
246        This URL is used to locate an OpenID Connect configuration file,
247        which is then used to bootstrap the issuer's state (such
248        as authorization and token endpoints).
249        """
250        self.session = requests.Session()
251        self.session.headers.update({"User-Agent": USER_AGENT})
252
253        oidc_config_url = urllib.parse.urljoin(
254            f"{base_url}/", ".well-known/openid-configuration"
255        )
256
257        try:
258            resp: requests.Response = self.session.get(oidc_config_url, timeout=30)
259        except (requests.ConnectionError, requests.Timeout) as exc:
260            raise NetworkError from exc
261
262        try:
263            resp.raise_for_status()
264        except requests.HTTPError as http_error:
265            raise IssuerError from http_error
266
267        try:
268            # We don't generally expect this to fail (since the provider should
269            # return a non-success HTTP code which we catch above), but we
270            # check just in case we have a misbehaving OIDC issuer.
271            self.oidc_config = _OpenIDConfiguration.model_validate(resp.json())
272        except ValueError as exc:
273            raise IssuerError(f"OIDC issuer returned invalid configuration: {exc}")

Create a new Issuer from the given base URL.

This URL is used to locate an OpenID Connect configuration file, which is then used to bootstrap the issuer's state (such as authorization and token endpoints).

session
@classmethod
def production(cls) -> Issuer:
275    @classmethod
276    def production(cls) -> Issuer:
277        """
278        Returns an `Issuer` configured against Sigstore's production-level services.
279        """
280        return cls(DEFAULT_OAUTH_ISSUER_URL)

Returns an Issuer configured against Sigstore's production-level services.

@classmethod
def staging(cls) -> Issuer:
282    @classmethod
283    def staging(cls) -> Issuer:
284        """
285        Returns an `Issuer` configured against Sigstore's staging-level services.
286        """
287        return cls(STAGING_OAUTH_ISSUER_URL)

Returns an Issuer configured against Sigstore's staging-level services.

def identity_token( self, client_id: str = 'sigstore', client_secret: str = '', force_oob: bool = False) -> IdentityToken:
289    def identity_token(  # nosec: B107
290        self,
291        client_id: str = "sigstore",
292        client_secret: str = "",
293        force_oob: bool = False,
294    ) -> IdentityToken:
295        """
296        Retrieves and returns an `IdentityToken` from the current `Issuer`, via OAuth.
297
298        This function blocks on user interaction.
299
300        The `force_oob` flag controls the kind of flow performed. When `False` (the default),
301        this function attempts to open the user's web browser before falling back to
302        an out-of-band flow. When `True`, the out-of-band flow is always used.
303        """
304
305        # This function and the components that it relies on are based off of:
306        # https://github.com/psteniusubi/python-sample
307
308        from sigstore._internal.oidc.oauth import _OAuthFlow
309
310        code: str
311        with _OAuthFlow(client_id, client_secret, self) as server:
312            # Launch web browser
313            if not force_oob and webbrowser.open(server.base_uri):
314                print("Waiting for browser interaction...", file=sys.stderr)
315            else:
316                server.enable_oob()
317                print(
318                    f"Go to the following link in a browser:\n\n\t{server.auth_endpoint}",
319                    file=sys.stderr,
320                )
321
322            if not server.is_oob():
323                # Wait until the redirect server populates the response
324                while server.auth_response is None:
325                    time.sleep(0.1)
326
327                auth_error = server.auth_response.get("error")
328                if auth_error is not None:
329                    raise IdentityError(
330                        f"Error response from auth endpoint: {auth_error[0]}"
331                    )
332                code = server.auth_response["code"][0]
333            else:
334                # In the out-of-band case, we wait until the user provides the code
335                code = input("Enter verification code: ")
336
337        # Provide code to token endpoint
338        data = {
339            "grant_type": "authorization_code",
340            "redirect_uri": server.redirect_uri,
341            "code": code,
342            "code_verifier": server.oauth_session.code_verifier,
343        }
344        auth = (
345            client_id,
346            client_secret,
347        )
348        logging.debug(f"PAYLOAD: data={data}")
349        try:
350            resp = self.session.post(
351                self.oidc_config.token_endpoint,
352                data=data,
353                auth=auth,
354                timeout=30,
355            )
356        except (requests.ConnectionError, requests.Timeout) as exc:
357            raise NetworkError from exc
358
359        try:
360            resp.raise_for_status()
361        except requests.HTTPError as http_error:
362            raise IdentityError(
363                f"Token request failed with {resp.status_code}"
364            ) from http_error
365
366        token_json = resp.json()
367        token_error = token_json.get("error")
368        if token_error is not None:
369            raise IdentityError(f"Error response from token endpoint: {token_error}")
370
371        return IdentityToken(token_json["access_token"])

Retrieves and returns an IdentityToken from the current Issuer, via OAuth.

This function blocks on user interaction.

The force_oob flag controls the kind of flow performed. When False (the default), this function attempts to open the user's web browser before falling back to an out-of-band flow. When True, the out-of-band flow is always used.

class IdentityError(sigstore.errors.Error):
class IdentityError(Error):
    """
    Sigstore's own identity error type; wraps `id`'s IdentityError.
    """

    @classmethod
    def raise_from_id(cls, exc: id.IdentityError) -> NoReturn:
        """
        Re-raises the given `id.IdentityError` as this error type.

        The new error carries the original's message and records the
        original as its cause. This method never returns.
        """
        wrapped = cls(str(exc))
        raise wrapped from exc

    def diagnostics(self) -> str:
        """
        Returns a multi-line, human-readable explanation of the error.

        When this error was caused by an
        `id.GitHubOidcPermissionCredentialError`, the text explains the
        GitHub Actions `id-token: write` permission; otherwise a generic
        message containing the error itself is returned.
        """
        if isinstance(self.__cause__, id.GitHubOidcPermissionCredentialError):
            return f"""
                Insufficient permissions for GitHub Actions workflow.

                The most common reason for this is incorrect
                configuration of the top-level `permissions` setting of the
                workflow YAML file. It should be configured like so:

                    permissions:
                      id-token: write

                Relevant documentation here:

                    https://docs.github.com/en/actions/deployment/security-hardening-your-deployments/about-security-hardening-with-openid-connect#adding-permissions-settings

                Another possible reason is that the workflow run has been
                triggered by a PR from a forked repository. PRs from forked
                repositories typically cannot be granted write access.

                Relevant documentation here:

                    https://docs.github.com/en/actions/security-guides/automatic-token-authentication#modifying-the-permissions-for-the-github_token

                Additional context:

                {self.__cause__}
                """

        # Any other cause (or none at all) gets the generic message.
        return f"""
                An issue occurred with ambient credential detection.

                Additional context:

                {self}
            """

Wraps id's IdentityError.

@classmethod
def raise_from_id(cls, exc: id.IdentityError) -> NoReturn:
379    @classmethod
380    def raise_from_id(cls, exc: id.IdentityError) -> NoReturn:
381        """Raises a wrapped IdentityError from the provided `id.IdentityError`."""
382        raise cls(str(exc)) from exc

Raises a wrapped IdentityError from the provided id.IdentityError.

def diagnostics(self) -> str:
384    def diagnostics(self) -> str:
385        """Returns diagnostics for the error."""
386        if isinstance(self.__cause__, id.GitHubOidcPermissionCredentialError):
387            return f"""
388                Insufficient permissions for GitHub Actions workflow.
389
390                The most common reason for this is incorrect
391                configuration of the top-level `permissions` setting of the
392                workflow YAML file. It should be configured like so:
393
394                    permissions:
395                      id-token: write
396
397                Relevant documentation here:
398
399                    https://docs.github.com/en/actions/deployment/security-hardening-your-deployments/about-security-hardening-with-openid-connect#adding-permissions-settings
400
401                Another possible reason is that the workflow run has been
402                triggered by a PR from a forked repository. PRs from forked
403                repositories typically cannot be granted write access.
404
405                Relevant documentation here:
406
407                    https://docs.github.com/en/actions/security-guides/automatic-token-authentication#modifying-the-permissions-for-the-github_token
408
409                Additional context:
410
411                {self.__cause__}
412                """
413        else:
414            return f"""
415                An issue occurred with ambient credential detection.
416
417                Additional context:
418
419                {self}
420            """

Returns diagnostics for the error.

def detect_credential() -> Optional[str]:
def detect_credential() -> Optional[str]:
    """
    Calls `id.detect_credential` for the default audience and returns its result.

    Any `id.IdentityError` raised by the call is re-raised as this module's
    own `IdentityError`, with the original error as its cause.
    """
    try:
        credential = id.detect_credential(_DEFAULT_AUDIENCE)
    except id.IdentityError as exc:
        # `raise_from_id` always raises, so control never falls through.
        IdentityError.raise_from_id(exc)
    return cast(Optional[str], credential)

Calls id.detect_credential, but wraps exceptions with our own exception type.