sigstore.oidc
API for retrieving OIDC tokens.
1# Copyright 2022 The Sigstore Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15""" 16API for retrieving OIDC tokens. 17""" 18 19from __future__ import annotations 20 21import logging 22import sys 23import time 24import urllib.parse 25import webbrowser 26from datetime import datetime, timezone 27from typing import NoReturn, Optional, cast 28 29import id 30import jwt 31import requests 32from pydantic import BaseModel, StrictStr 33 34from sigstore._internal import USER_AGENT 35from sigstore.errors import Error, NetworkError 36 37DEFAULT_OAUTH_ISSUER_URL = "https://oauth2.sigstore.dev/auth" 38STAGING_OAUTH_ISSUER_URL = "https://oauth2.sigstage.dev/auth" 39 40# See: https://github.com/sigstore/fulcio/blob/b2186c0/pkg/config/config.go#L182-L201 41_KNOWN_OIDC_ISSUERS = { 42 "https://accounts.google.com": "email", 43 "https://oauth2.sigstore.dev/auth": "email", 44 "https://oauth2.sigstage.dev/auth": "email", 45 "https://token.actions.githubusercontent.com": "sub", 46} 47_DEFAULT_AUDIENCE = "sigstore" 48 49 50class _OpenIDConfiguration(BaseModel): 51 """ 52 Represents a (subset) of the fields provided by an OpenID Connect provider's 53 `.well-known/openid-configuration` response, as defined by OpenID Connect Discovery. 54 55 See: <https://openid.net/specs/openid-connect-discovery-1_0.html> 56 """ 57 58 authorization_endpoint: StrictStr 59 token_endpoint: StrictStr 60 61 62class ExpiredIdentity(Exception): 63 """An error raised when an identity token is expired.""" 64 65 66class IdentityToken: 67 """ 68 An OIDC "identity", corresponding to an underlying OIDC token with 69 a sensible subject, issuer, and audience for Sigstore purposes. 70 """ 71 72 def __init__(self, raw_token: str) -> None: 73 """ 74 Create a new `IdentityToken` from the given OIDC token. 75 """ 76 77 self._raw_token = raw_token 78 79 # NOTE: The lack of verification here is intentional, and is part of 80 # Sigstore's verification model: clients like sigstore-python are 81 # responsible only for forwarding the OIDC identity to Fulcio for 82 # certificate binding and issuance. 83 try: 84 self._unverified_claims = jwt.decode( 85 raw_token, 86 options={ 87 "verify_signature": False, 88 "verify_aud": True, 89 "verify_iat": True, 90 "verify_exp": True, 91 # These claims are required by OpenID Connect, so 92 # we can strongly enforce their presence. 93 # See: https://openid.net/specs/openid-connect-basic-1_0.html#IDToken 94 "require": ["aud", "sub", "iat", "exp", "iss"], 95 }, 96 audience=_DEFAULT_AUDIENCE, 97 # NOTE: This leeway shouldn't be strictly necessary, but is 98 # included to preempt any (small) skew between the host 99 # and the originating IdP. 100 leeway=5, 101 ) 102 except Exception as exc: 103 raise IdentityError( 104 "Identity token is malformed or missing claims" 105 ) from exc 106 107 self._iss: str = self._unverified_claims["iss"] 108 self._nbf: int | None = self._unverified_claims.get("nbf") 109 self._exp: int = self._unverified_claims["exp"] 110 111 # Fail early if this token isn't within its validity period. 112 if not self.in_validity_period(): 113 raise IdentityError("Identity token is not within its validity period") 114 115 # When verifying the private key possession proof, Fulcio uses 116 # different claims depending on the token's issuer. 117 # We currently special-case a handful of these, and fall back 118 # on signing the "sub" claim otherwise. 119 identity_claim = _KNOWN_OIDC_ISSUERS.get(self.issuer) 120 if identity_claim is not None: 121 if identity_claim not in self._unverified_claims: 122 raise IdentityError( 123 f"Identity token is missing the required {identity_claim!r} claim" 124 ) 125 126 self._identity = str(self._unverified_claims.get(identity_claim)) 127 else: 128 try: 129 self._identity = str(self._unverified_claims["sub"]) 130 except KeyError: 131 raise IdentityError( 132 "Identity token is missing the required 'sub' claim" 133 ) 134 135 # This identity token might have been retrieved directly from 136 # an identity provider, or it might be a "federated" identity token 137 # retrieved from a federated IdP (e.g., Sigstore's own Dex instance). 138 # In the latter case, the claims will also include a `federated_claims` 139 # set, which in turn should include a `connector_id` that reflects 140 # the "real" token issuer. We retrieve this, despite technically 141 # being an implementation detail, because it has value to client 142 # users: a client might want to make sure that its user is identifying 143 # with a *particular* IdP, which means that they need to pierce the 144 # federation layer to check which IdP is actually being used. 145 self._federated_issuer: str | None = None 146 federated_claims = self._unverified_claims.get("federated_claims") 147 if federated_claims is not None: 148 if not isinstance(federated_claims, dict): 149 raise IdentityError( 150 "unexpected claim type: federated_claims is not a dict" 151 ) 152 153 federated_issuer = federated_claims.get("connector_id") 154 if federated_issuer is not None: 155 if not isinstance(federated_issuer, str): 156 raise IdentityError( 157 "unexpected claim type: federated_claims.connector_id is not a string" 158 ) 159 160 self._federated_issuer = federated_issuer 161 162 def in_validity_period(self) -> bool: 163 """ 164 Returns whether or not this `Identity` is currently within its self-stated validity period. 165 166 NOTE: As noted in `Identity.__init__`, this is not a verifying wrapper; 167 the check here only asserts whether the *unverified* identity's claims 168 are within their validity period. 169 """ 170 171 now = datetime.now(timezone.utc).timestamp() 172 173 if self._nbf is not None: 174 return self._nbf <= now < self._exp 175 else: 176 return now < self._exp 177 178 @property 179 def identity(self) -> str: 180 """ 181 Returns this `IdentityToken`'s underlying "subject". 182 183 Note that this is **not** always the `sub` claim in the corresponding 184 identity token: depending onm the token's issuer, it may be a *different* 185 claim, such as `email`. This corresponds to the Sigstore ecosystem's 186 behavior, e.g. in each issued certificate's SAN. 187 """ 188 return self._identity 189 190 @property 191 def issuer(self) -> str: 192 """ 193 Returns a URL identifying this `IdentityToken`'s issuer. 194 """ 195 return self._iss 196 197 @property 198 def federated_issuer(self) -> str: 199 """ 200 Returns a URL identifying the **federated** issuer for any Sigstore 201 certificate issued against this identity token. 202 203 The behavior of this field is slightly subtle: for non-federated 204 identity providers (like a token issued directly by Google's IdP) it 205 should be exactly equivalent to `IdentityToken.issuer`. For federated 206 issuers (like Sigstore's own federated IdP) it should be equivalent to 207 the underlying federated issuer's URL, which is kept in an 208 implementation-defined claim. 209 210 This attribute exists so that clients who wish to inspect the expected 211 underlying issuer of their certificates can do so without relying on 212 implementation-specific behavior. 213 """ 214 if self._federated_issuer is not None: 215 return self._federated_issuer 216 217 return self.issuer 218 219 def __str__(self) -> str: 220 """ 221 Returns the underlying OIDC token for this identity. 222 223 That this token is secret in nature and **MUST NOT** be disclosed. 224 """ 225 return self._raw_token 226 227 228class IssuerError(Exception): 229 """ 230 Raised on any communication or format error with an OIDC issuer. 231 """ 232 233 pass 234 235 236class Issuer: 237 """ 238 Represents an OIDC issuer (IdP). 239 """ 240 241 def __init__(self, base_url: str) -> None: 242 """ 243 Create a new `Issuer` from the given base URL. 244 245 This URL is used to locate an OpenID Connect configuration file, 246 which is then used to bootstrap the issuer's state (such 247 as authorization and token endpoints). 248 """ 249 self.session = requests.Session() 250 self.session.headers.update({"User-Agent": USER_AGENT}) 251 252 oidc_config_url = urllib.parse.urljoin( 253 f"{base_url}/", ".well-known/openid-configuration" 254 ) 255 256 try: 257 resp: requests.Response = self.session.get(oidc_config_url, timeout=30) 258 except (requests.ConnectionError, requests.Timeout) as exc: 259 raise NetworkError from exc 260 261 try: 262 resp.raise_for_status() 263 except requests.HTTPError as http_error: 264 raise IssuerError from http_error 265 266 try: 267 # We don't generally expect this to fail (since the provider should 268 # return a non-success HTTP code which we catch above), but we 269 # check just in case we have a misbehaving OIDC issuer. 270 self.oidc_config = _OpenIDConfiguration.model_validate(resp.json()) 271 except ValueError as exc: 272 raise IssuerError(f"OIDC issuer returned invalid configuration: {exc}") 273 274 @classmethod 275 def production(cls) -> Issuer: 276 """ 277 Returns an `Issuer` configured against Sigstore's production-level services. 278 """ 279 return cls(DEFAULT_OAUTH_ISSUER_URL) 280 281 @classmethod 282 def staging(cls) -> Issuer: 283 """ 284 Returns an `Issuer` configured against Sigstore's staging-level services. 285 """ 286 return cls(STAGING_OAUTH_ISSUER_URL) 287 288 def identity_token( # nosec: B107 289 self, 290 client_id: str = "sigstore", 291 client_secret: str = "", 292 force_oob: bool = False, 293 ) -> IdentityToken: 294 """ 295 Retrieves and returns an `IdentityToken` from the current `Issuer`, via OAuth. 296 297 This function blocks on user interaction. 298 299 The `force_oob` flag controls the kind of flow performed. When `False` (the default), 300 this function attempts to open the user's web browser before falling back to 301 an out-of-band flow. When `True`, the out-of-band flow is always used. 302 """ 303 304 # This function and the components that it relies on are based off of: 305 # https://github.com/psteniusubi/python-sample 306 307 from sigstore._internal.oidc.oauth import _OAuthFlow 308 309 code: str 310 with _OAuthFlow(client_id, client_secret, self) as server: 311 # Launch web browser 312 if not force_oob and webbrowser.open(server.base_uri): 313 print("Waiting for browser interaction...", file=sys.stderr) 314 else: 315 server.enable_oob() 316 print( 317 f"Go to the following link in a browser:\n\n\t{server.auth_endpoint}", 318 file=sys.stderr, 319 ) 320 321 if not server.is_oob(): 322 # Wait until the redirect server populates the response 323 while server.auth_response is None: 324 time.sleep(0.1) 325 326 auth_error = server.auth_response.get("error") 327 if auth_error is not None: 328 raise IdentityError( 329 f"Error response from auth endpoint: {auth_error[0]}" 330 ) 331 code = server.auth_response["code"][0] 332 else: 333 # In the out-of-band case, we wait until the user provides the code 334 code = input("Enter verification code: ") 335 336 # Provide code to token endpoint 337 data = { 338 "grant_type": "authorization_code", 339 "redirect_uri": server.redirect_uri, 340 "code": code, 341 "code_verifier": server.oauth_session.code_verifier, 342 } 343 auth = ( 344 client_id, 345 client_secret, 346 ) 347 logging.debug(f"PAYLOAD: data={data}") 348 try: 349 resp = self.session.post( 350 self.oidc_config.token_endpoint, 351 data=data, 352 auth=auth, 353 timeout=30, 354 ) 355 except (requests.ConnectionError, requests.Timeout) as exc: 356 raise NetworkError from exc 357 358 try: 359 resp.raise_for_status() 360 except requests.HTTPError as http_error: 361 raise IdentityError( 362 f"Token request failed with {resp.status_code}" 363 ) from http_error 364 365 token_json = resp.json() 366 token_error = token_json.get("error") 367 if token_error is not None: 368 raise IdentityError(f"Error response from token endpoint: {token_error}") 369 370 return IdentityToken(token_json["access_token"]) 371 372 373class IdentityError(Error): 374 """ 375 Wraps `id`'s IdentityError. 376 """ 377 378 @classmethod 379 def raise_from_id(cls, exc: id.IdentityError) -> NoReturn: 380 """Raises a wrapped IdentityError from the provided `id.IdentityError`.""" 381 raise cls(str(exc)) from exc 382 383 def diagnostics(self) -> str: 384 """Returns diagnostics for the error.""" 385 if isinstance(self.__cause__, id.GitHubOidcPermissionCredentialError): 386 return f""" 387 Insufficient permissions for GitHub Actions workflow. 388 389 The most common reason for this is incorrect 390 configuration of the top-level `permissions` setting of the 391 workflow YAML file. It should be configured like so: 392 393 permissions: 394 id-token: write 395 396 Relevant documentation here: 397 398 https://docs.github.com/en/actions/deployment/security-hardening-your-deployments/about-security-hardening-with-openid-connect#adding-permissions-settings 399 400 Another possible reason is that the workflow run has been 401 triggered by a PR from a forked repository. PRs from forked 402 repositories typically cannot be granted write access. 403 404 Relevant documentation here: 405 406 https://docs.github.com/en/actions/security-guides/automatic-token-authentication#modifying-the-permissions-for-the-github_token 407 408 Additional context: 409 410 {self.__cause__} 411 """ 412 else: 413 return f""" 414 An issue occurred with ambient credential detection. 415 416 Additional context: 417 418 {self} 419 """ 420 421 422def detect_credential() -> Optional[str]: 423 """Calls `id.detect_credential`, but wraps exceptions with our own exception type.""" 424 try: 425 return cast(Optional[str], id.detect_credential(_DEFAULT_AUDIENCE)) 426 except id.IdentityError as exc: 427 IdentityError.raise_from_id(exc)
An error raised when an identity token is expired.
67class IdentityToken: 68 """ 69 An OIDC "identity", corresponding to an underlying OIDC token with 70 a sensible subject, issuer, and audience for Sigstore purposes. 71 """ 72 73 def __init__(self, raw_token: str) -> None: 74 """ 75 Create a new `IdentityToken` from the given OIDC token. 76 """ 77 78 self._raw_token = raw_token 79 80 # NOTE: The lack of verification here is intentional, and is part of 81 # Sigstore's verification model: clients like sigstore-python are 82 # responsible only for forwarding the OIDC identity to Fulcio for 83 # certificate binding and issuance. 84 try: 85 self._unverified_claims = jwt.decode( 86 raw_token, 87 options={ 88 "verify_signature": False, 89 "verify_aud": True, 90 "verify_iat": True, 91 "verify_exp": True, 92 # These claims are required by OpenID Connect, so 93 # we can strongly enforce their presence. 94 # See: https://openid.net/specs/openid-connect-basic-1_0.html#IDToken 95 "require": ["aud", "sub", "iat", "exp", "iss"], 96 }, 97 audience=_DEFAULT_AUDIENCE, 98 # NOTE: This leeway shouldn't be strictly necessary, but is 99 # included to preempt any (small) skew between the host 100 # and the originating IdP. 101 leeway=5, 102 ) 103 except Exception as exc: 104 raise IdentityError( 105 "Identity token is malformed or missing claims" 106 ) from exc 107 108 self._iss: str = self._unverified_claims["iss"] 109 self._nbf: int | None = self._unverified_claims.get("nbf") 110 self._exp: int = self._unverified_claims["exp"] 111 112 # Fail early if this token isn't within its validity period. 113 if not self.in_validity_period(): 114 raise IdentityError("Identity token is not within its validity period") 115 116 # When verifying the private key possession proof, Fulcio uses 117 # different claims depending on the token's issuer. 118 # We currently special-case a handful of these, and fall back 119 # on signing the "sub" claim otherwise. 120 identity_claim = _KNOWN_OIDC_ISSUERS.get(self.issuer) 121 if identity_claim is not None: 122 if identity_claim not in self._unverified_claims: 123 raise IdentityError( 124 f"Identity token is missing the required {identity_claim!r} claim" 125 ) 126 127 self._identity = str(self._unverified_claims.get(identity_claim)) 128 else: 129 try: 130 self._identity = str(self._unverified_claims["sub"]) 131 except KeyError: 132 raise IdentityError( 133 "Identity token is missing the required 'sub' claim" 134 ) 135 136 # This identity token might have been retrieved directly from 137 # an identity provider, or it might be a "federated" identity token 138 # retrieved from a federated IdP (e.g., Sigstore's own Dex instance). 139 # In the latter case, the claims will also include a `federated_claims` 140 # set, which in turn should include a `connector_id` that reflects 141 # the "real" token issuer. We retrieve this, despite technically 142 # being an implementation detail, because it has value to client 143 # users: a client might want to make sure that its user is identifying 144 # with a *particular* IdP, which means that they need to pierce the 145 # federation layer to check which IdP is actually being used. 146 self._federated_issuer: str | None = None 147 federated_claims = self._unverified_claims.get("federated_claims") 148 if federated_claims is not None: 149 if not isinstance(federated_claims, dict): 150 raise IdentityError( 151 "unexpected claim type: federated_claims is not a dict" 152 ) 153 154 federated_issuer = federated_claims.get("connector_id") 155 if federated_issuer is not None: 156 if not isinstance(federated_issuer, str): 157 raise IdentityError( 158 "unexpected claim type: federated_claims.connector_id is not a string" 159 ) 160 161 self._federated_issuer = federated_issuer 162 163 def in_validity_period(self) -> bool: 164 """ 165 Returns whether or not this `Identity` is currently within its self-stated validity period. 166 167 NOTE: As noted in `Identity.__init__`, this is not a verifying wrapper; 168 the check here only asserts whether the *unverified* identity's claims 169 are within their validity period. 170 """ 171 172 now = datetime.now(timezone.utc).timestamp() 173 174 if self._nbf is not None: 175 return self._nbf <= now < self._exp 176 else: 177 return now < self._exp 178 179 @property 180 def identity(self) -> str: 181 """ 182 Returns this `IdentityToken`'s underlying "subject". 183 184 Note that this is **not** always the `sub` claim in the corresponding 185 identity token: depending onm the token's issuer, it may be a *different* 186 claim, such as `email`. This corresponds to the Sigstore ecosystem's 187 behavior, e.g. in each issued certificate's SAN. 188 """ 189 return self._identity 190 191 @property 192 def issuer(self) -> str: 193 """ 194 Returns a URL identifying this `IdentityToken`'s issuer. 195 """ 196 return self._iss 197 198 @property 199 def federated_issuer(self) -> str: 200 """ 201 Returns a URL identifying the **federated** issuer for any Sigstore 202 certificate issued against this identity token. 203 204 The behavior of this field is slightly subtle: for non-federated 205 identity providers (like a token issued directly by Google's IdP) it 206 should be exactly equivalent to `IdentityToken.issuer`. For federated 207 issuers (like Sigstore's own federated IdP) it should be equivalent to 208 the underlying federated issuer's URL, which is kept in an 209 implementation-defined claim. 210 211 This attribute exists so that clients who wish to inspect the expected 212 underlying issuer of their certificates can do so without relying on 213 implementation-specific behavior. 214 """ 215 if self._federated_issuer is not None: 216 return self._federated_issuer 217 218 return self.issuer 219 220 def __str__(self) -> str: 221 """ 222 Returns the underlying OIDC token for this identity. 223 224 That this token is secret in nature and **MUST NOT** be disclosed. 225 """ 226 return self._raw_token
An OIDC "identity", corresponding to an underlying OIDC token with a sensible subject, issuer, and audience for Sigstore purposes.
73 def __init__(self, raw_token: str) -> None: 74 """ 75 Create a new `IdentityToken` from the given OIDC token. 76 """ 77 78 self._raw_token = raw_token 79 80 # NOTE: The lack of verification here is intentional, and is part of 81 # Sigstore's verification model: clients like sigstore-python are 82 # responsible only for forwarding the OIDC identity to Fulcio for 83 # certificate binding and issuance. 84 try: 85 self._unverified_claims = jwt.decode( 86 raw_token, 87 options={ 88 "verify_signature": False, 89 "verify_aud": True, 90 "verify_iat": True, 91 "verify_exp": True, 92 # These claims are required by OpenID Connect, so 93 # we can strongly enforce their presence. 94 # See: https://openid.net/specs/openid-connect-basic-1_0.html#IDToken 95 "require": ["aud", "sub", "iat", "exp", "iss"], 96 }, 97 audience=_DEFAULT_AUDIENCE, 98 # NOTE: This leeway shouldn't be strictly necessary, but is 99 # included to preempt any (small) skew between the host 100 # and the originating IdP. 101 leeway=5, 102 ) 103 except Exception as exc: 104 raise IdentityError( 105 "Identity token is malformed or missing claims" 106 ) from exc 107 108 self._iss: str = self._unverified_claims["iss"] 109 self._nbf: int | None = self._unverified_claims.get("nbf") 110 self._exp: int = self._unverified_claims["exp"] 111 112 # Fail early if this token isn't within its validity period. 113 if not self.in_validity_period(): 114 raise IdentityError("Identity token is not within its validity period") 115 116 # When verifying the private key possession proof, Fulcio uses 117 # different claims depending on the token's issuer. 118 # We currently special-case a handful of these, and fall back 119 # on signing the "sub" claim otherwise. 120 identity_claim = _KNOWN_OIDC_ISSUERS.get(self.issuer) 121 if identity_claim is not None: 122 if identity_claim not in self._unverified_claims: 123 raise IdentityError( 124 f"Identity token is missing the required {identity_claim!r} claim" 125 ) 126 127 self._identity = str(self._unverified_claims.get(identity_claim)) 128 else: 129 try: 130 self._identity = str(self._unverified_claims["sub"]) 131 except KeyError: 132 raise IdentityError( 133 "Identity token is missing the required 'sub' claim" 134 ) 135 136 # This identity token might have been retrieved directly from 137 # an identity provider, or it might be a "federated" identity token 138 # retrieved from a federated IdP (e.g., Sigstore's own Dex instance). 139 # In the latter case, the claims will also include a `federated_claims` 140 # set, which in turn should include a `connector_id` that reflects 141 # the "real" token issuer. We retrieve this, despite technically 142 # being an implementation detail, because it has value to client 143 # users: a client might want to make sure that its user is identifying 144 # with a *particular* IdP, which means that they need to pierce the 145 # federation layer to check which IdP is actually being used. 146 self._federated_issuer: str | None = None 147 federated_claims = self._unverified_claims.get("federated_claims") 148 if federated_claims is not None: 149 if not isinstance(federated_claims, dict): 150 raise IdentityError( 151 "unexpected claim type: federated_claims is not a dict" 152 ) 153 154 federated_issuer = federated_claims.get("connector_id") 155 if federated_issuer is not None: 156 if not isinstance(federated_issuer, str): 157 raise IdentityError( 158 "unexpected claim type: federated_claims.connector_id is not a string" 159 ) 160 161 self._federated_issuer = federated_issuer
Create a new IdentityToken
from the given OIDC token.
163 def in_validity_period(self) -> bool: 164 """ 165 Returns whether or not this `Identity` is currently within its self-stated validity period. 166 167 NOTE: As noted in `Identity.__init__`, this is not a verifying wrapper; 168 the check here only asserts whether the *unverified* identity's claims 169 are within their validity period. 170 """ 171 172 now = datetime.now(timezone.utc).timestamp() 173 174 if self._nbf is not None: 175 return self._nbf <= now < self._exp 176 else: 177 return now < self._exp
Returns whether or not this Identity
is currently within its self-stated validity period.
NOTE: As noted in Identity.__init__
, this is not a verifying wrapper;
the check here only asserts whether the unverified identity's claims
are within their validity period.
179 @property 180 def identity(self) -> str: 181 """ 182 Returns this `IdentityToken`'s underlying "subject". 183 184 Note that this is **not** always the `sub` claim in the corresponding 185 identity token: depending onm the token's issuer, it may be a *different* 186 claim, such as `email`. This corresponds to the Sigstore ecosystem's 187 behavior, e.g. in each issued certificate's SAN. 188 """ 189 return self._identity
Returns this IdentityToken
's underlying "subject".
Note that this is not always the sub
claim in the corresponding
identity token: depending onm the token's issuer, it may be a different
claim, such as email
. This corresponds to the Sigstore ecosystem's
behavior, e.g. in each issued certificate's SAN.
191 @property 192 def issuer(self) -> str: 193 """ 194 Returns a URL identifying this `IdentityToken`'s issuer. 195 """ 196 return self._iss
Returns a URL identifying this IdentityToken
's issuer.
198 @property 199 def federated_issuer(self) -> str: 200 """ 201 Returns a URL identifying the **federated** issuer for any Sigstore 202 certificate issued against this identity token. 203 204 The behavior of this field is slightly subtle: for non-federated 205 identity providers (like a token issued directly by Google's IdP) it 206 should be exactly equivalent to `IdentityToken.issuer`. For federated 207 issuers (like Sigstore's own federated IdP) it should be equivalent to 208 the underlying federated issuer's URL, which is kept in an 209 implementation-defined claim. 210 211 This attribute exists so that clients who wish to inspect the expected 212 underlying issuer of their certificates can do so without relying on 213 implementation-specific behavior. 214 """ 215 if self._federated_issuer is not None: 216 return self._federated_issuer 217 218 return self.issuer
Returns a URL identifying the federated issuer for any Sigstore certificate issued against this identity token.
The behavior of this field is slightly subtle: for non-federated
identity providers (like a token issued directly by Google's IdP) it
should be exactly equivalent to IdentityToken.issuer
. For federated
issuers (like Sigstore's own federated IdP) it should be equivalent to
the underlying federated issuer's URL, which is kept in an
implementation-defined claim.
This attribute exists so that clients who wish to inspect the expected underlying issuer of their certificates can do so without relying on implementation-specific behavior.
229class IssuerError(Exception): 230 """ 231 Raised on any communication or format error with an OIDC issuer. 232 """ 233 234 pass
Raised on any communication or format error with an OIDC issuer.
237class Issuer: 238 """ 239 Represents an OIDC issuer (IdP). 240 """ 241 242 def __init__(self, base_url: str) -> None: 243 """ 244 Create a new `Issuer` from the given base URL. 245 246 This URL is used to locate an OpenID Connect configuration file, 247 which is then used to bootstrap the issuer's state (such 248 as authorization and token endpoints). 249 """ 250 self.session = requests.Session() 251 self.session.headers.update({"User-Agent": USER_AGENT}) 252 253 oidc_config_url = urllib.parse.urljoin( 254 f"{base_url}/", ".well-known/openid-configuration" 255 ) 256 257 try: 258 resp: requests.Response = self.session.get(oidc_config_url, timeout=30) 259 except (requests.ConnectionError, requests.Timeout) as exc: 260 raise NetworkError from exc 261 262 try: 263 resp.raise_for_status() 264 except requests.HTTPError as http_error: 265 raise IssuerError from http_error 266 267 try: 268 # We don't generally expect this to fail (since the provider should 269 # return a non-success HTTP code which we catch above), but we 270 # check just in case we have a misbehaving OIDC issuer. 271 self.oidc_config = _OpenIDConfiguration.model_validate(resp.json()) 272 except ValueError as exc: 273 raise IssuerError(f"OIDC issuer returned invalid configuration: {exc}") 274 275 @classmethod 276 def production(cls) -> Issuer: 277 """ 278 Returns an `Issuer` configured against Sigstore's production-level services. 279 """ 280 return cls(DEFAULT_OAUTH_ISSUER_URL) 281 282 @classmethod 283 def staging(cls) -> Issuer: 284 """ 285 Returns an `Issuer` configured against Sigstore's staging-level services. 286 """ 287 return cls(STAGING_OAUTH_ISSUER_URL) 288 289 def identity_token( # nosec: B107 290 self, 291 client_id: str = "sigstore", 292 client_secret: str = "", 293 force_oob: bool = False, 294 ) -> IdentityToken: 295 """ 296 Retrieves and returns an `IdentityToken` from the current `Issuer`, via OAuth. 297 298 This function blocks on user interaction. 299 300 The `force_oob` flag controls the kind of flow performed. When `False` (the default), 301 this function attempts to open the user's web browser before falling back to 302 an out-of-band flow. When `True`, the out-of-band flow is always used. 303 """ 304 305 # This function and the components that it relies on are based off of: 306 # https://github.com/psteniusubi/python-sample 307 308 from sigstore._internal.oidc.oauth import _OAuthFlow 309 310 code: str 311 with _OAuthFlow(client_id, client_secret, self) as server: 312 # Launch web browser 313 if not force_oob and webbrowser.open(server.base_uri): 314 print("Waiting for browser interaction...", file=sys.stderr) 315 else: 316 server.enable_oob() 317 print( 318 f"Go to the following link in a browser:\n\n\t{server.auth_endpoint}", 319 file=sys.stderr, 320 ) 321 322 if not server.is_oob(): 323 # Wait until the redirect server populates the response 324 while server.auth_response is None: 325 time.sleep(0.1) 326 327 auth_error = server.auth_response.get("error") 328 if auth_error is not None: 329 raise IdentityError( 330 f"Error response from auth endpoint: {auth_error[0]}" 331 ) 332 code = server.auth_response["code"][0] 333 else: 334 # In the out-of-band case, we wait until the user provides the code 335 code = input("Enter verification code: ") 336 337 # Provide code to token endpoint 338 data = { 339 "grant_type": "authorization_code", 340 "redirect_uri": server.redirect_uri, 341 "code": code, 342 "code_verifier": server.oauth_session.code_verifier, 343 } 344 auth = ( 345 client_id, 346 client_secret, 347 ) 348 logging.debug(f"PAYLOAD: data={data}") 349 try: 350 resp = self.session.post( 351 self.oidc_config.token_endpoint, 352 data=data, 353 auth=auth, 354 timeout=30, 355 ) 356 except (requests.ConnectionError, requests.Timeout) as exc: 357 raise NetworkError from exc 358 359 try: 360 resp.raise_for_status() 361 except requests.HTTPError as http_error: 362 raise IdentityError( 363 f"Token request failed with {resp.status_code}" 364 ) from http_error 365 366 token_json = resp.json() 367 token_error = token_json.get("error") 368 if token_error is not None: 369 raise IdentityError(f"Error response from token endpoint: {token_error}") 370 371 return IdentityToken(token_json["access_token"])
Represents an OIDC issuer (IdP).
242 def __init__(self, base_url: str) -> None: 243 """ 244 Create a new `Issuer` from the given base URL. 245 246 This URL is used to locate an OpenID Connect configuration file, 247 which is then used to bootstrap the issuer's state (such 248 as authorization and token endpoints). 249 """ 250 self.session = requests.Session() 251 self.session.headers.update({"User-Agent": USER_AGENT}) 252 253 oidc_config_url = urllib.parse.urljoin( 254 f"{base_url}/", ".well-known/openid-configuration" 255 ) 256 257 try: 258 resp: requests.Response = self.session.get(oidc_config_url, timeout=30) 259 except (requests.ConnectionError, requests.Timeout) as exc: 260 raise NetworkError from exc 261 262 try: 263 resp.raise_for_status() 264 except requests.HTTPError as http_error: 265 raise IssuerError from http_error 266 267 try: 268 # We don't generally expect this to fail (since the provider should 269 # return a non-success HTTP code which we catch above), but we 270 # check just in case we have a misbehaving OIDC issuer. 271 self.oidc_config = _OpenIDConfiguration.model_validate(resp.json()) 272 except ValueError as exc: 273 raise IssuerError(f"OIDC issuer returned invalid configuration: {exc}")
Create a new Issuer
from the given base URL.
This URL is used to locate an OpenID Connect configuration file, which is then used to bootstrap the issuer's state (such as authorization and token endpoints).
275 @classmethod 276 def production(cls) -> Issuer: 277 """ 278 Returns an `Issuer` configured against Sigstore's production-level services. 279 """ 280 return cls(DEFAULT_OAUTH_ISSUER_URL)
Returns an Issuer
configured against Sigstore's production-level services.
282 @classmethod 283 def staging(cls) -> Issuer: 284 """ 285 Returns an `Issuer` configured against Sigstore's staging-level services. 286 """ 287 return cls(STAGING_OAUTH_ISSUER_URL)
Returns an Issuer
configured against Sigstore's staging-level services.
289 def identity_token( # nosec: B107 290 self, 291 client_id: str = "sigstore", 292 client_secret: str = "", 293 force_oob: bool = False, 294 ) -> IdentityToken: 295 """ 296 Retrieves and returns an `IdentityToken` from the current `Issuer`, via OAuth. 297 298 This function blocks on user interaction. 299 300 The `force_oob` flag controls the kind of flow performed. When `False` (the default), 301 this function attempts to open the user's web browser before falling back to 302 an out-of-band flow. When `True`, the out-of-band flow is always used. 303 """ 304 305 # This function and the components that it relies on are based off of: 306 # https://github.com/psteniusubi/python-sample 307 308 from sigstore._internal.oidc.oauth import _OAuthFlow 309 310 code: str 311 with _OAuthFlow(client_id, client_secret, self) as server: 312 # Launch web browser 313 if not force_oob and webbrowser.open(server.base_uri): 314 print("Waiting for browser interaction...", file=sys.stderr) 315 else: 316 server.enable_oob() 317 print( 318 f"Go to the following link in a browser:\n\n\t{server.auth_endpoint}", 319 file=sys.stderr, 320 ) 321 322 if not server.is_oob(): 323 # Wait until the redirect server populates the response 324 while server.auth_response is None: 325 time.sleep(0.1) 326 327 auth_error = server.auth_response.get("error") 328 if auth_error is not None: 329 raise IdentityError( 330 f"Error response from auth endpoint: {auth_error[0]}" 331 ) 332 code = server.auth_response["code"][0] 333 else: 334 # In the out-of-band case, we wait until the user provides the code 335 code = input("Enter verification code: ") 336 337 # Provide code to token endpoint 338 data = { 339 "grant_type": "authorization_code", 340 "redirect_uri": server.redirect_uri, 341 "code": code, 342 "code_verifier": server.oauth_session.code_verifier, 343 } 344 auth = ( 345 client_id, 346 client_secret, 347 ) 348 logging.debug(f"PAYLOAD: data={data}") 349 try: 350 resp = self.session.post( 351 self.oidc_config.token_endpoint, 352 data=data, 353 auth=auth, 354 timeout=30, 355 ) 356 except (requests.ConnectionError, requests.Timeout) as exc: 357 raise NetworkError from exc 358 359 try: 360 resp.raise_for_status() 361 except requests.HTTPError as http_error: 362 raise IdentityError( 363 f"Token request failed with {resp.status_code}" 364 ) from http_error 365 366 token_json = resp.json() 367 token_error = token_json.get("error") 368 if token_error is not None: 369 raise IdentityError(f"Error response from token endpoint: {token_error}") 370 371 return IdentityToken(token_json["access_token"])
Retrieves and returns an IdentityToken
from the current Issuer
, via OAuth.
This function blocks on user interaction.
The force_oob
flag controls the kind of flow performed. When False
(the default),
this function attempts to open the user's web browser before falling back to
an out-of-band flow. When True
, the out-of-band flow is always used.
374class IdentityError(Error): 375 """ 376 Wraps `id`'s IdentityError. 377 """ 378 379 @classmethod 380 def raise_from_id(cls, exc: id.IdentityError) -> NoReturn: 381 """Raises a wrapped IdentityError from the provided `id.IdentityError`.""" 382 raise cls(str(exc)) from exc 383 384 def diagnostics(self) -> str: 385 """Returns diagnostics for the error.""" 386 if isinstance(self.__cause__, id.GitHubOidcPermissionCredentialError): 387 return f""" 388 Insufficient permissions for GitHub Actions workflow. 389 390 The most common reason for this is incorrect 391 configuration of the top-level `permissions` setting of the 392 workflow YAML file. It should be configured like so: 393 394 permissions: 395 id-token: write 396 397 Relevant documentation here: 398 399 https://docs.github.com/en/actions/deployment/security-hardening-your-deployments/about-security-hardening-with-openid-connect#adding-permissions-settings 400 401 Another possible reason is that the workflow run has been 402 triggered by a PR from a forked repository. PRs from forked 403 repositories typically cannot be granted write access. 404 405 Relevant documentation here: 406 407 https://docs.github.com/en/actions/security-guides/automatic-token-authentication#modifying-the-permissions-for-the-github_token 408 409 Additional context: 410 411 {self.__cause__} 412 """ 413 else: 414 return f""" 415 An issue occurred with ambient credential detection. 416 417 Additional context: 418 419 {self} 420 """
Wraps id
's IdentityError.
379 @classmethod 380 def raise_from_id(cls, exc: id.IdentityError) -> NoReturn: 381 """Raises a wrapped IdentityError from the provided `id.IdentityError`.""" 382 raise cls(str(exc)) from exc
Raises a wrapped IdentityError from the provided id.IdentityError
.
384 def diagnostics(self) -> str: 385 """Returns diagnostics for the error.""" 386 if isinstance(self.__cause__, id.GitHubOidcPermissionCredentialError): 387 return f""" 388 Insufficient permissions for GitHub Actions workflow. 389 390 The most common reason for this is incorrect 391 configuration of the top-level `permissions` setting of the 392 workflow YAML file. It should be configured like so: 393 394 permissions: 395 id-token: write 396 397 Relevant documentation here: 398 399 https://docs.github.com/en/actions/deployment/security-hardening-your-deployments/about-security-hardening-with-openid-connect#adding-permissions-settings 400 401 Another possible reason is that the workflow run has been 402 triggered by a PR from a forked repository. PRs from forked 403 repositories typically cannot be granted write access. 404 405 Relevant documentation here: 406 407 https://docs.github.com/en/actions/security-guides/automatic-token-authentication#modifying-the-permissions-for-the-github_token 408 409 Additional context: 410 411 {self.__cause__} 412 """ 413 else: 414 return f""" 415 An issue occurred with ambient credential detection. 416 417 Additional context: 418 419 {self} 420 """
Returns diagnostics for the error.
Inherited Members
423def detect_credential() -> Optional[str]: 424 """Calls `id.detect_credential`, but wraps exceptions with our own exception type.""" 425 try: 426 return cast(Optional[str], id.detect_credential(_DEFAULT_AUDIENCE)) 427 except id.IdentityError as exc: 428 IdentityError.raise_from_id(exc)
Calls id.detect_credential
, but wraps exceptions with our own exception type.