sigstore.oidc
API for retrieving OIDC tokens.
1# Copyright 2022 The Sigstore Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15""" 16API for retrieving OIDC tokens. 17""" 18 19from __future__ import annotations 20 21import logging 22import sys 23import time 24import urllib.parse 25import webbrowser 26from datetime import datetime, timezone 27from typing import NoReturn, Optional, cast 28 29import id 30import jwt 31import requests 32from pydantic import BaseModel, StrictStr 33 34from sigstore._internal import USER_AGENT 35from sigstore.errors import Error, NetworkError 36 37DEFAULT_OAUTH_ISSUER_URL = "https://oauth2.sigstore.dev/auth" 38STAGING_OAUTH_ISSUER_URL = "https://oauth2.sigstage.dev/auth" 39 40# See: https://github.com/sigstore/fulcio/blob/b2186c0/pkg/config/config.go#L182-L201 41_KNOWN_OIDC_ISSUERS = { 42 "https://accounts.google.com": "email", 43 "https://oauth2.sigstore.dev/auth": "email", 44 "https://oauth2.sigstage.dev/auth": "email", 45 "https://token.actions.githubusercontent.com": "sub", 46} 47_DEFAULT_AUDIENCE = "sigstore" 48 49 50class _OpenIDConfiguration(BaseModel): 51 """ 52 Represents a (subset) of the fields provided by an OpenID Connect provider's 53 `.well-known/openid-configuration` response, as defined by OpenID Connect Discovery. 54 55 See: <https://openid.net/specs/openid-connect-discovery-1_0.html> 56 """ 57 58 authorization_endpoint: StrictStr 59 token_endpoint: StrictStr 60 61 62# See: https://github.com/sigstore/fulcio/blob/b2186c0/pkg/config/config.go#L182-L201 63_KNOWN_OIDC_ISSUERS = { 64 "https://accounts.google.com": "email", 65 "https://oauth2.sigstore.dev/auth": "email", 66 "https://oauth2.sigstage.dev/auth": "email", 67 "https://token.actions.githubusercontent.com": "sub", 68} 69DEFAULT_AUDIENCE = "sigstore" 70 71 72class ExpiredIdentity(Exception): 73 """An error raised when an identity token is expired.""" 74 75 76class IdentityToken: 77 """ 78 An OIDC "identity", corresponding to an underlying OIDC token with 79 a sensible subject, issuer, and audience for Sigstore purposes. 80 """ 81 82 def __init__(self, raw_token: str) -> None: 83 """ 84 Create a new `IdentityToken` from the given OIDC token. 85 """ 86 87 self._raw_token = raw_token 88 89 # NOTE: The lack of verification here is intentional, and is part of 90 # Sigstore's verification model: clients like sigstore-python are 91 # responsible only for forwarding the OIDC identity to Fulcio for 92 # certificate binding and issuance. 93 try: 94 self._unverified_claims = jwt.decode( 95 raw_token, 96 options={ 97 "verify_signature": False, 98 "verify_aud": True, 99 "verify_iat": True, 100 "verify_exp": True, 101 # These claims are required by OpenID Connect, so 102 # we can strongly enforce their presence. 103 # See: https://openid.net/specs/openid-connect-basic-1_0.html#IDToken 104 "require": ["aud", "sub", "iat", "exp", "iss"], 105 }, 106 audience=DEFAULT_AUDIENCE, 107 # NOTE: This leeway shouldn't be strictly necessary, but is 108 # included to preempt any (small) skew between the host 109 # and the originating IdP. 110 leeway=5, 111 ) 112 except Exception as exc: 113 raise IdentityError( 114 "Identity token is malformed or missing claims" 115 ) from exc 116 117 self._iss: str = self._unverified_claims["iss"] 118 self._nbf: int | None = self._unverified_claims.get("nbf") 119 self._exp: int = self._unverified_claims["exp"] 120 121 # Fail early if this token isn't within its validity period. 122 if not self.in_validity_period(): 123 raise IdentityError("Identity token is not within its validity period") 124 125 # When verifying the private key possession proof, Fulcio uses 126 # different claims depending on the token's issuer. 127 # We currently special-case a handful of these, and fall back 128 # on signing the "sub" claim otherwise. 129 identity_claim = _KNOWN_OIDC_ISSUERS.get(self.issuer) 130 if identity_claim is not None: 131 if identity_claim not in self._unverified_claims: 132 raise IdentityError( 133 f"Identity token is missing the required {identity_claim!r} claim" 134 ) 135 136 self._identity = str(self._unverified_claims.get(identity_claim)) 137 else: 138 try: 139 self._identity = str(self._unverified_claims["sub"]) 140 except KeyError: 141 raise IdentityError( 142 "Identity token is missing the required 'sub' claim" 143 ) 144 145 # This identity token might have been retrieved directly from 146 # an identity provider, or it might be a "federated" identity token 147 # retrieved from a federated IdP (e.g., Sigstore's own Dex instance). 148 # In the latter case, the claims will also include a `federated_claims` 149 # set, which in turn should include a `connector_id` that reflects 150 # the "real" token issuer. We retrieve this, despite technically 151 # being an implementation detail, because it has value to client 152 # users: a client might want to make sure that its user is identifying 153 # with a *particular* IdP, which means that they need to pierce the 154 # federation layer to check which IdP is actually being used. 155 self._federated_issuer: str | None = None 156 federated_claims = self._unverified_claims.get("federated_claims") 157 if federated_claims is not None: 158 if not isinstance(federated_claims, dict): 159 raise IdentityError( 160 "unexpected claim type: federated_claims is not a dict" 161 ) 162 163 federated_issuer = federated_claims.get("connector_id") 164 if federated_issuer is not None: 165 if not isinstance(federated_issuer, str): 166 raise IdentityError( 167 "unexpected claim type: federated_claims.connector_id is not a string" 168 ) 169 170 self._federated_issuer = federated_issuer 171 172 def in_validity_period(self) -> bool: 173 """ 174 Returns whether or not this `Identity` is currently within its self-stated validity period. 175 176 NOTE: As noted in `Identity.__init__`, this is not a verifying wrapper; 177 the check here only asserts whether the *unverified* identity's claims 178 are within their validity period. 179 """ 180 181 now = datetime.now(timezone.utc).timestamp() 182 183 if self._nbf is not None: 184 return self._nbf <= now < self._exp 185 else: 186 return now < self._exp 187 188 @property 189 def identity(self) -> str: 190 """ 191 Returns this `IdentityToken`'s underlying "subject". 192 193 Note that this is **not** always the `sub` claim in the corresponding 194 identity token: depending onm the token's issuer, it may be a *different* 195 claim, such as `email`. This corresponds to the Sigstore ecosystem's 196 behavior, e.g. in each issued certificate's SAN. 197 """ 198 return self._identity 199 200 @property 201 def issuer(self) -> str: 202 """ 203 Returns a URL identifying this `IdentityToken`'s issuer. 204 """ 205 return self._iss 206 207 @property 208 def federated_issuer(self) -> str: 209 """ 210 Returns a URL identifying the **federated** issuer for any Sigstore 211 certificate issued against this identity token. 212 213 The behavior of this field is slightly subtle: for non-federated 214 identity providers (like a token issued directly by Google's IdP) it 215 should be exactly equivalent to `IdentityToken.issuer`. For federated 216 issuers (like Sigstore's own federated IdP) it should be equivalent to 217 the underlying federated issuer's URL, which is kept in an 218 implementation-defined claim. 219 220 This attribute exists so that clients who wish to inspect the expected 221 underlying issuer of their certificates can do so without relying on 222 implementation-specific behavior. 223 """ 224 if self._federated_issuer is not None: 225 return self._federated_issuer 226 227 return self.issuer 228 229 def __str__(self) -> str: 230 """ 231 Returns the underlying OIDC token for this identity. 232 233 That this token is secret in nature and **MUST NOT** be disclosed. 234 """ 235 return self._raw_token 236 237 238class IssuerError(Exception): 239 """ 240 Raised on any communication or format error with an OIDC issuer. 241 """ 242 243 pass 244 245 246class Issuer: 247 """ 248 Represents an OIDC issuer (IdP). 249 """ 250 251 def __init__(self, base_url: str) -> None: 252 """ 253 Create a new `Issuer` from the given base URL. 254 255 This URL is used to locate an OpenID Connect configuration file, 256 which is then used to bootstrap the issuer's state (such 257 as authorization and token endpoints). 258 """ 259 self.session = requests.Session() 260 self.session.headers.update({"User-Agent": USER_AGENT}) 261 262 oidc_config_url = urllib.parse.urljoin( 263 f"{base_url}/", ".well-known/openid-configuration" 264 ) 265 266 try: 267 resp: requests.Response = self.session.get(oidc_config_url, timeout=30) 268 except (requests.ConnectionError, requests.Timeout) as exc: 269 raise NetworkError from exc 270 271 try: 272 resp.raise_for_status() 273 except requests.HTTPError as http_error: 274 raise IssuerError from http_error 275 276 try: 277 # We don't generally expect this to fail (since the provider should 278 # return a non-success HTTP code which we catch above), but we 279 # check just in case we have a misbehaving OIDC issuer. 280 self.oidc_config = _OpenIDConfiguration.model_validate(resp.json()) 281 except ValueError as exc: 282 raise IssuerError(f"OIDC issuer returned invalid configuration: {exc}") 283 284 @classmethod 285 def production(cls) -> Issuer: 286 """ 287 Returns an `Issuer` configured against Sigstore's production-level services. 288 """ 289 return cls(DEFAULT_OAUTH_ISSUER_URL) 290 291 @classmethod 292 def staging(cls) -> Issuer: 293 """ 294 Returns an `Issuer` configured against Sigstore's staging-level services. 295 """ 296 return cls(STAGING_OAUTH_ISSUER_URL) 297 298 def identity_token( # nosec: B107 299 self, 300 client_id: str = "sigstore", 301 client_secret: str = "", 302 force_oob: bool = False, 303 ) -> IdentityToken: 304 """ 305 Retrieves and returns an `IdentityToken` from the current `Issuer`, via OAuth. 306 307 This function blocks on user interaction. 308 309 The `force_oob` flag controls the kind of flow performed. When `False` (the default), 310 this function attempts to open the user's web browser before falling back to 311 an out-of-band flow. When `True`, the out-of-band flow is always used. 312 """ 313 314 # This function and the components that it relies on are based off of: 315 # https://github.com/psteniusubi/python-sample 316 317 from sigstore._internal.oidc.oauth import _OAuthFlow 318 319 code: str 320 with _OAuthFlow(client_id, client_secret, self) as server: 321 # Launch web browser 322 if not force_oob and webbrowser.open(server.base_uri): 323 print("Waiting for browser interaction...", file=sys.stderr) 324 else: 325 server.enable_oob() 326 print( 327 f"Go to the following link in a browser:\n\n\t{server.auth_endpoint}", 328 file=sys.stderr, 329 ) 330 331 if not server.is_oob(): 332 # Wait until the redirect server populates the response 333 while server.auth_response is None: 334 time.sleep(0.1) 335 336 auth_error = server.auth_response.get("error") 337 if auth_error is not None: 338 raise IdentityError( 339 f"Error response from auth endpoint: {auth_error[0]}" 340 ) 341 code = server.auth_response["code"][0] 342 else: 343 # In the out-of-band case, we wait until the user provides the code 344 code = input("Enter verification code: ") 345 346 # Provide code to token endpoint 347 data = { 348 "grant_type": "authorization_code", 349 "redirect_uri": server.redirect_uri, 350 "code": code, 351 "code_verifier": server.oauth_session.code_verifier, 352 } 353 auth = ( 354 client_id, 355 client_secret, 356 ) 357 logging.debug(f"PAYLOAD: data={data}") 358 try: 359 resp = self.session.post( 360 self.oidc_config.token_endpoint, 361 data=data, 362 auth=auth, 363 timeout=30, 364 ) 365 except (requests.ConnectionError, requests.Timeout) as exc: 366 raise NetworkError from exc 367 368 try: 369 resp.raise_for_status() 370 except requests.HTTPError as http_error: 371 raise IdentityError( 372 f"Token request failed with {resp.status_code}" 373 ) from http_error 374 375 token_json = resp.json() 376 token_error = token_json.get("error") 377 if token_error is not None: 378 raise IdentityError(f"Error response from token endpoint: {token_error}") 379 380 return IdentityToken(token_json["access_token"]) 381 382 383class IdentityError(Error): 384 """ 385 Wraps `id`'s IdentityError. 386 """ 387 388 @classmethod 389 def raise_from_id(cls, exc: id.IdentityError) -> NoReturn: 390 """Raises a wrapped IdentityError from the provided `id.IdentityError`.""" 391 raise cls(str(exc)) from exc 392 393 def diagnostics(self) -> str: 394 """Returns diagnostics for the error.""" 395 if isinstance(self.__cause__, id.GitHubOidcPermissionCredentialError): 396 return f""" 397 Insufficient permissions for GitHub Actions workflow. 398 399 The most common reason for this is incorrect 400 configuration of the top-level `permissions` setting of the 401 workflow YAML file. It should be configured like so: 402 403 permissions: 404 id-token: write 405 406 Relevant documentation here: 407 408 https://docs.github.com/en/actions/deployment/security-hardening-your-deployments/about-security-hardening-with-openid-connect#adding-permissions-settings 409 410 Another possible reason is that the workflow run has been 411 triggered by a PR from a forked repository. PRs from forked 412 repositories typically cannot be granted write access. 413 414 Relevant documentation here: 415 416 https://docs.github.com/en/actions/security-guides/automatic-token-authentication#modifying-the-permissions-for-the-github_token 417 418 Additional context: 419 420 {self.__cause__} 421 """ 422 else: 423 return f""" 424 An issue occurred with ambient credential detection. 425 426 Additional context: 427 428 {self} 429 """ 430 431 432def detect_credential() -> Optional[str]: 433 """Calls `id.detect_credential`, but wraps exceptions with our own exception type.""" 434 try: 435 return cast(Optional[str], id.detect_credential(_DEFAULT_AUDIENCE)) 436 except id.IdentityError as exc: 437 IdentityError.raise_from_id(exc)
An error raised when an identity token is expired.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- add_note
- args
77class IdentityToken: 78 """ 79 An OIDC "identity", corresponding to an underlying OIDC token with 80 a sensible subject, issuer, and audience for Sigstore purposes. 81 """ 82 83 def __init__(self, raw_token: str) -> None: 84 """ 85 Create a new `IdentityToken` from the given OIDC token. 86 """ 87 88 self._raw_token = raw_token 89 90 # NOTE: The lack of verification here is intentional, and is part of 91 # Sigstore's verification model: clients like sigstore-python are 92 # responsible only for forwarding the OIDC identity to Fulcio for 93 # certificate binding and issuance. 94 try: 95 self._unverified_claims = jwt.decode( 96 raw_token, 97 options={ 98 "verify_signature": False, 99 "verify_aud": True, 100 "verify_iat": True, 101 "verify_exp": True, 102 # These claims are required by OpenID Connect, so 103 # we can strongly enforce their presence. 104 # See: https://openid.net/specs/openid-connect-basic-1_0.html#IDToken 105 "require": ["aud", "sub", "iat", "exp", "iss"], 106 }, 107 audience=DEFAULT_AUDIENCE, 108 # NOTE: This leeway shouldn't be strictly necessary, but is 109 # included to preempt any (small) skew between the host 110 # and the originating IdP. 111 leeway=5, 112 ) 113 except Exception as exc: 114 raise IdentityError( 115 "Identity token is malformed or missing claims" 116 ) from exc 117 118 self._iss: str = self._unverified_claims["iss"] 119 self._nbf: int | None = self._unverified_claims.get("nbf") 120 self._exp: int = self._unverified_claims["exp"] 121 122 # Fail early if this token isn't within its validity period. 123 if not self.in_validity_period(): 124 raise IdentityError("Identity token is not within its validity period") 125 126 # When verifying the private key possession proof, Fulcio uses 127 # different claims depending on the token's issuer. 128 # We currently special-case a handful of these, and fall back 129 # on signing the "sub" claim otherwise. 130 identity_claim = _KNOWN_OIDC_ISSUERS.get(self.issuer) 131 if identity_claim is not None: 132 if identity_claim not in self._unverified_claims: 133 raise IdentityError( 134 f"Identity token is missing the required {identity_claim!r} claim" 135 ) 136 137 self._identity = str(self._unverified_claims.get(identity_claim)) 138 else: 139 try: 140 self._identity = str(self._unverified_claims["sub"]) 141 except KeyError: 142 raise IdentityError( 143 "Identity token is missing the required 'sub' claim" 144 ) 145 146 # This identity token might have been retrieved directly from 147 # an identity provider, or it might be a "federated" identity token 148 # retrieved from a federated IdP (e.g., Sigstore's own Dex instance). 149 # In the latter case, the claims will also include a `federated_claims` 150 # set, which in turn should include a `connector_id` that reflects 151 # the "real" token issuer. We retrieve this, despite technically 152 # being an implementation detail, because it has value to client 153 # users: a client might want to make sure that its user is identifying 154 # with a *particular* IdP, which means that they need to pierce the 155 # federation layer to check which IdP is actually being used. 156 self._federated_issuer: str | None = None 157 federated_claims = self._unverified_claims.get("federated_claims") 158 if federated_claims is not None: 159 if not isinstance(federated_claims, dict): 160 raise IdentityError( 161 "unexpected claim type: federated_claims is not a dict" 162 ) 163 164 federated_issuer = federated_claims.get("connector_id") 165 if federated_issuer is not None: 166 if not isinstance(federated_issuer, str): 167 raise IdentityError( 168 "unexpected claim type: federated_claims.connector_id is not a string" 169 ) 170 171 self._federated_issuer = federated_issuer 172 173 def in_validity_period(self) -> bool: 174 """ 175 Returns whether or not this `Identity` is currently within its self-stated validity period. 176 177 NOTE: As noted in `Identity.__init__`, this is not a verifying wrapper; 178 the check here only asserts whether the *unverified* identity's claims 179 are within their validity period. 180 """ 181 182 now = datetime.now(timezone.utc).timestamp() 183 184 if self._nbf is not None: 185 return self._nbf <= now < self._exp 186 else: 187 return now < self._exp 188 189 @property 190 def identity(self) -> str: 191 """ 192 Returns this `IdentityToken`'s underlying "subject". 193 194 Note that this is **not** always the `sub` claim in the corresponding 195 identity token: depending onm the token's issuer, it may be a *different* 196 claim, such as `email`. This corresponds to the Sigstore ecosystem's 197 behavior, e.g. in each issued certificate's SAN. 198 """ 199 return self._identity 200 201 @property 202 def issuer(self) -> str: 203 """ 204 Returns a URL identifying this `IdentityToken`'s issuer. 205 """ 206 return self._iss 207 208 @property 209 def federated_issuer(self) -> str: 210 """ 211 Returns a URL identifying the **federated** issuer for any Sigstore 212 certificate issued against this identity token. 213 214 The behavior of this field is slightly subtle: for non-federated 215 identity providers (like a token issued directly by Google's IdP) it 216 should be exactly equivalent to `IdentityToken.issuer`. For federated 217 issuers (like Sigstore's own federated IdP) it should be equivalent to 218 the underlying federated issuer's URL, which is kept in an 219 implementation-defined claim. 220 221 This attribute exists so that clients who wish to inspect the expected 222 underlying issuer of their certificates can do so without relying on 223 implementation-specific behavior. 224 """ 225 if self._federated_issuer is not None: 226 return self._federated_issuer 227 228 return self.issuer 229 230 def __str__(self) -> str: 231 """ 232 Returns the underlying OIDC token for this identity. 233 234 That this token is secret in nature and **MUST NOT** be disclosed. 235 """ 236 return self._raw_token
An OIDC "identity", corresponding to an underlying OIDC token with a sensible subject, issuer, and audience for Sigstore purposes.
83 def __init__(self, raw_token: str) -> None: 84 """ 85 Create a new `IdentityToken` from the given OIDC token. 86 """ 87 88 self._raw_token = raw_token 89 90 # NOTE: The lack of verification here is intentional, and is part of 91 # Sigstore's verification model: clients like sigstore-python are 92 # responsible only for forwarding the OIDC identity to Fulcio for 93 # certificate binding and issuance. 94 try: 95 self._unverified_claims = jwt.decode( 96 raw_token, 97 options={ 98 "verify_signature": False, 99 "verify_aud": True, 100 "verify_iat": True, 101 "verify_exp": True, 102 # These claims are required by OpenID Connect, so 103 # we can strongly enforce their presence. 104 # See: https://openid.net/specs/openid-connect-basic-1_0.html#IDToken 105 "require": ["aud", "sub", "iat", "exp", "iss"], 106 }, 107 audience=DEFAULT_AUDIENCE, 108 # NOTE: This leeway shouldn't be strictly necessary, but is 109 # included to preempt any (small) skew between the host 110 # and the originating IdP. 111 leeway=5, 112 ) 113 except Exception as exc: 114 raise IdentityError( 115 "Identity token is malformed or missing claims" 116 ) from exc 117 118 self._iss: str = self._unverified_claims["iss"] 119 self._nbf: int | None = self._unverified_claims.get("nbf") 120 self._exp: int = self._unverified_claims["exp"] 121 122 # Fail early if this token isn't within its validity period. 123 if not self.in_validity_period(): 124 raise IdentityError("Identity token is not within its validity period") 125 126 # When verifying the private key possession proof, Fulcio uses 127 # different claims depending on the token's issuer. 128 # We currently special-case a handful of these, and fall back 129 # on signing the "sub" claim otherwise. 130 identity_claim = _KNOWN_OIDC_ISSUERS.get(self.issuer) 131 if identity_claim is not None: 132 if identity_claim not in self._unverified_claims: 133 raise IdentityError( 134 f"Identity token is missing the required {identity_claim!r} claim" 135 ) 136 137 self._identity = str(self._unverified_claims.get(identity_claim)) 138 else: 139 try: 140 self._identity = str(self._unverified_claims["sub"]) 141 except KeyError: 142 raise IdentityError( 143 "Identity token is missing the required 'sub' claim" 144 ) 145 146 # This identity token might have been retrieved directly from 147 # an identity provider, or it might be a "federated" identity token 148 # retrieved from a federated IdP (e.g., Sigstore's own Dex instance). 149 # In the latter case, the claims will also include a `federated_claims` 150 # set, which in turn should include a `connector_id` that reflects 151 # the "real" token issuer. We retrieve this, despite technically 152 # being an implementation detail, because it has value to client 153 # users: a client might want to make sure that its user is identifying 154 # with a *particular* IdP, which means that they need to pierce the 155 # federation layer to check which IdP is actually being used. 156 self._federated_issuer: str | None = None 157 federated_claims = self._unverified_claims.get("federated_claims") 158 if federated_claims is not None: 159 if not isinstance(federated_claims, dict): 160 raise IdentityError( 161 "unexpected claim type: federated_claims is not a dict" 162 ) 163 164 federated_issuer = federated_claims.get("connector_id") 165 if federated_issuer is not None: 166 if not isinstance(federated_issuer, str): 167 raise IdentityError( 168 "unexpected claim type: federated_claims.connector_id is not a string" 169 ) 170 171 self._federated_issuer = federated_issuer
Create a new IdentityToken
from the given OIDC token.
173 def in_validity_period(self) -> bool: 174 """ 175 Returns whether or not this `Identity` is currently within its self-stated validity period. 176 177 NOTE: As noted in `Identity.__init__`, this is not a verifying wrapper; 178 the check here only asserts whether the *unverified* identity's claims 179 are within their validity period. 180 """ 181 182 now = datetime.now(timezone.utc).timestamp() 183 184 if self._nbf is not None: 185 return self._nbf <= now < self._exp 186 else: 187 return now < self._exp
Returns whether or not this Identity
is currently within its self-stated validity period.
NOTE: As noted in Identity.__init__
, this is not a verifying wrapper;
the check here only asserts whether the unverified identity's claims
are within their validity period.
189 @property 190 def identity(self) -> str: 191 """ 192 Returns this `IdentityToken`'s underlying "subject". 193 194 Note that this is **not** always the `sub` claim in the corresponding 195 identity token: depending onm the token's issuer, it may be a *different* 196 claim, such as `email`. This corresponds to the Sigstore ecosystem's 197 behavior, e.g. in each issued certificate's SAN. 198 """ 199 return self._identity
Returns this IdentityToken
's underlying "subject".
Note that this is not always the sub
claim in the corresponding
identity token: depending onm the token's issuer, it may be a different
claim, such as email
. This corresponds to the Sigstore ecosystem's
behavior, e.g. in each issued certificate's SAN.
201 @property 202 def issuer(self) -> str: 203 """ 204 Returns a URL identifying this `IdentityToken`'s issuer. 205 """ 206 return self._iss
Returns a URL identifying this IdentityToken
's issuer.
208 @property 209 def federated_issuer(self) -> str: 210 """ 211 Returns a URL identifying the **federated** issuer for any Sigstore 212 certificate issued against this identity token. 213 214 The behavior of this field is slightly subtle: for non-federated 215 identity providers (like a token issued directly by Google's IdP) it 216 should be exactly equivalent to `IdentityToken.issuer`. For federated 217 issuers (like Sigstore's own federated IdP) it should be equivalent to 218 the underlying federated issuer's URL, which is kept in an 219 implementation-defined claim. 220 221 This attribute exists so that clients who wish to inspect the expected 222 underlying issuer of their certificates can do so without relying on 223 implementation-specific behavior. 224 """ 225 if self._federated_issuer is not None: 226 return self._federated_issuer 227 228 return self.issuer
Returns a URL identifying the federated issuer for any Sigstore certificate issued against this identity token.
The behavior of this field is slightly subtle: for non-federated
identity providers (like a token issued directly by Google's IdP) it
should be exactly equivalent to IdentityToken.issuer
. For federated
issuers (like Sigstore's own federated IdP) it should be equivalent to
the underlying federated issuer's URL, which is kept in an
implementation-defined claim.
This attribute exists so that clients who wish to inspect the expected underlying issuer of their certificates can do so without relying on implementation-specific behavior.
239class IssuerError(Exception): 240 """ 241 Raised on any communication or format error with an OIDC issuer. 242 """ 243 244 pass
Raised on any communication or format error with an OIDC issuer.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- add_note
- args
247class Issuer: 248 """ 249 Represents an OIDC issuer (IdP). 250 """ 251 252 def __init__(self, base_url: str) -> None: 253 """ 254 Create a new `Issuer` from the given base URL. 255 256 This URL is used to locate an OpenID Connect configuration file, 257 which is then used to bootstrap the issuer's state (such 258 as authorization and token endpoints). 259 """ 260 self.session = requests.Session() 261 self.session.headers.update({"User-Agent": USER_AGENT}) 262 263 oidc_config_url = urllib.parse.urljoin( 264 f"{base_url}/", ".well-known/openid-configuration" 265 ) 266 267 try: 268 resp: requests.Response = self.session.get(oidc_config_url, timeout=30) 269 except (requests.ConnectionError, requests.Timeout) as exc: 270 raise NetworkError from exc 271 272 try: 273 resp.raise_for_status() 274 except requests.HTTPError as http_error: 275 raise IssuerError from http_error 276 277 try: 278 # We don't generally expect this to fail (since the provider should 279 # return a non-success HTTP code which we catch above), but we 280 # check just in case we have a misbehaving OIDC issuer. 281 self.oidc_config = _OpenIDConfiguration.model_validate(resp.json()) 282 except ValueError as exc: 283 raise IssuerError(f"OIDC issuer returned invalid configuration: {exc}") 284 285 @classmethod 286 def production(cls) -> Issuer: 287 """ 288 Returns an `Issuer` configured against Sigstore's production-level services. 289 """ 290 return cls(DEFAULT_OAUTH_ISSUER_URL) 291 292 @classmethod 293 def staging(cls) -> Issuer: 294 """ 295 Returns an `Issuer` configured against Sigstore's staging-level services. 296 """ 297 return cls(STAGING_OAUTH_ISSUER_URL) 298 299 def identity_token( # nosec: B107 300 self, 301 client_id: str = "sigstore", 302 client_secret: str = "", 303 force_oob: bool = False, 304 ) -> IdentityToken: 305 """ 306 Retrieves and returns an `IdentityToken` from the current `Issuer`, via OAuth. 307 308 This function blocks on user interaction. 309 310 The `force_oob` flag controls the kind of flow performed. When `False` (the default), 311 this function attempts to open the user's web browser before falling back to 312 an out-of-band flow. When `True`, the out-of-band flow is always used. 313 """ 314 315 # This function and the components that it relies on are based off of: 316 # https://github.com/psteniusubi/python-sample 317 318 from sigstore._internal.oidc.oauth import _OAuthFlow 319 320 code: str 321 with _OAuthFlow(client_id, client_secret, self) as server: 322 # Launch web browser 323 if not force_oob and webbrowser.open(server.base_uri): 324 print("Waiting for browser interaction...", file=sys.stderr) 325 else: 326 server.enable_oob() 327 print( 328 f"Go to the following link in a browser:\n\n\t{server.auth_endpoint}", 329 file=sys.stderr, 330 ) 331 332 if not server.is_oob(): 333 # Wait until the redirect server populates the response 334 while server.auth_response is None: 335 time.sleep(0.1) 336 337 auth_error = server.auth_response.get("error") 338 if auth_error is not None: 339 raise IdentityError( 340 f"Error response from auth endpoint: {auth_error[0]}" 341 ) 342 code = server.auth_response["code"][0] 343 else: 344 # In the out-of-band case, we wait until the user provides the code 345 code = input("Enter verification code: ") 346 347 # Provide code to token endpoint 348 data = { 349 "grant_type": "authorization_code", 350 "redirect_uri": server.redirect_uri, 351 "code": code, 352 "code_verifier": server.oauth_session.code_verifier, 353 } 354 auth = ( 355 client_id, 356 client_secret, 357 ) 358 logging.debug(f"PAYLOAD: data={data}") 359 try: 360 resp = self.session.post( 361 self.oidc_config.token_endpoint, 362 data=data, 363 auth=auth, 364 timeout=30, 365 ) 366 except (requests.ConnectionError, requests.Timeout) as exc: 367 raise NetworkError from exc 368 369 try: 370 resp.raise_for_status() 371 except requests.HTTPError as http_error: 372 raise IdentityError( 373 f"Token request failed with {resp.status_code}" 374 ) from http_error 375 376 token_json = resp.json() 377 token_error = token_json.get("error") 378 if token_error is not None: 379 raise IdentityError(f"Error response from token endpoint: {token_error}") 380 381 return IdentityToken(token_json["access_token"])
Represents an OIDC issuer (IdP).
252 def __init__(self, base_url: str) -> None: 253 """ 254 Create a new `Issuer` from the given base URL. 255 256 This URL is used to locate an OpenID Connect configuration file, 257 which is then used to bootstrap the issuer's state (such 258 as authorization and token endpoints). 259 """ 260 self.session = requests.Session() 261 self.session.headers.update({"User-Agent": USER_AGENT}) 262 263 oidc_config_url = urllib.parse.urljoin( 264 f"{base_url}/", ".well-known/openid-configuration" 265 ) 266 267 try: 268 resp: requests.Response = self.session.get(oidc_config_url, timeout=30) 269 except (requests.ConnectionError, requests.Timeout) as exc: 270 raise NetworkError from exc 271 272 try: 273 resp.raise_for_status() 274 except requests.HTTPError as http_error: 275 raise IssuerError from http_error 276 277 try: 278 # We don't generally expect this to fail (since the provider should 279 # return a non-success HTTP code which we catch above), but we 280 # check just in case we have a misbehaving OIDC issuer. 281 self.oidc_config = _OpenIDConfiguration.model_validate(resp.json()) 282 except ValueError as exc: 283 raise IssuerError(f"OIDC issuer returned invalid configuration: {exc}")
Create a new Issuer
from the given base URL.
This URL is used to locate an OpenID Connect configuration file, which is then used to bootstrap the issuer's state (such as authorization and token endpoints).
285 @classmethod 286 def production(cls) -> Issuer: 287 """ 288 Returns an `Issuer` configured against Sigstore's production-level services. 289 """ 290 return cls(DEFAULT_OAUTH_ISSUER_URL)
Returns an Issuer
configured against Sigstore's production-level services.
292 @classmethod 293 def staging(cls) -> Issuer: 294 """ 295 Returns an `Issuer` configured against Sigstore's staging-level services. 296 """ 297 return cls(STAGING_OAUTH_ISSUER_URL)
Returns an Issuer
configured against Sigstore's staging-level services.
299 def identity_token( # nosec: B107 300 self, 301 client_id: str = "sigstore", 302 client_secret: str = "", 303 force_oob: bool = False, 304 ) -> IdentityToken: 305 """ 306 Retrieves and returns an `IdentityToken` from the current `Issuer`, via OAuth. 307 308 This function blocks on user interaction. 309 310 The `force_oob` flag controls the kind of flow performed. When `False` (the default), 311 this function attempts to open the user's web browser before falling back to 312 an out-of-band flow. When `True`, the out-of-band flow is always used. 313 """ 314 315 # This function and the components that it relies on are based off of: 316 # https://github.com/psteniusubi/python-sample 317 318 from sigstore._internal.oidc.oauth import _OAuthFlow 319 320 code: str 321 with _OAuthFlow(client_id, client_secret, self) as server: 322 # Launch web browser 323 if not force_oob and webbrowser.open(server.base_uri): 324 print("Waiting for browser interaction...", file=sys.stderr) 325 else: 326 server.enable_oob() 327 print( 328 f"Go to the following link in a browser:\n\n\t{server.auth_endpoint}", 329 file=sys.stderr, 330 ) 331 332 if not server.is_oob(): 333 # Wait until the redirect server populates the response 334 while server.auth_response is None: 335 time.sleep(0.1) 336 337 auth_error = server.auth_response.get("error") 338 if auth_error is not None: 339 raise IdentityError( 340 f"Error response from auth endpoint: {auth_error[0]}" 341 ) 342 code = server.auth_response["code"][0] 343 else: 344 # In the out-of-band case, we wait until the user provides the code 345 code = input("Enter verification code: ") 346 347 # Provide code to token endpoint 348 data = { 349 "grant_type": "authorization_code", 350 "redirect_uri": server.redirect_uri, 351 "code": code, 352 "code_verifier": server.oauth_session.code_verifier, 353 } 354 auth = ( 355 client_id, 356 client_secret, 357 ) 358 logging.debug(f"PAYLOAD: data={data}") 359 try: 360 resp = self.session.post( 361 self.oidc_config.token_endpoint, 362 data=data, 363 auth=auth, 364 timeout=30, 365 ) 366 except (requests.ConnectionError, requests.Timeout) as exc: 367 raise NetworkError from exc 368 369 try: 370 resp.raise_for_status() 371 except requests.HTTPError as http_error: 372 raise IdentityError( 373 f"Token request failed with {resp.status_code}" 374 ) from http_error 375 376 token_json = resp.json() 377 token_error = token_json.get("error") 378 if token_error is not None: 379 raise IdentityError(f"Error response from token endpoint: {token_error}") 380 381 return IdentityToken(token_json["access_token"])
Retrieves and returns an IdentityToken
from the current Issuer
, via OAuth.
This function blocks on user interaction.
The force_oob
flag controls the kind of flow performed. When False
(the default),
this function attempts to open the user's web browser before falling back to
an out-of-band flow. When True
, the out-of-band flow is always used.
384class IdentityError(Error): 385 """ 386 Wraps `id`'s IdentityError. 387 """ 388 389 @classmethod 390 def raise_from_id(cls, exc: id.IdentityError) -> NoReturn: 391 """Raises a wrapped IdentityError from the provided `id.IdentityError`.""" 392 raise cls(str(exc)) from exc 393 394 def diagnostics(self) -> str: 395 """Returns diagnostics for the error.""" 396 if isinstance(self.__cause__, id.GitHubOidcPermissionCredentialError): 397 return f""" 398 Insufficient permissions for GitHub Actions workflow. 399 400 The most common reason for this is incorrect 401 configuration of the top-level `permissions` setting of the 402 workflow YAML file. It should be configured like so: 403 404 permissions: 405 id-token: write 406 407 Relevant documentation here: 408 409 https://docs.github.com/en/actions/deployment/security-hardening-your-deployments/about-security-hardening-with-openid-connect#adding-permissions-settings 410 411 Another possible reason is that the workflow run has been 412 triggered by a PR from a forked repository. PRs from forked 413 repositories typically cannot be granted write access. 414 415 Relevant documentation here: 416 417 https://docs.github.com/en/actions/security-guides/automatic-token-authentication#modifying-the-permissions-for-the-github_token 418 419 Additional context: 420 421 {self.__cause__} 422 """ 423 else: 424 return f""" 425 An issue occurred with ambient credential detection. 426 427 Additional context: 428 429 {self} 430 """
Wraps id
's IdentityError.
389 @classmethod 390 def raise_from_id(cls, exc: id.IdentityError) -> NoReturn: 391 """Raises a wrapped IdentityError from the provided `id.IdentityError`.""" 392 raise cls(str(exc)) from exc
Raises a wrapped IdentityError from the provided id.IdentityError
.
394 def diagnostics(self) -> str: 395 """Returns diagnostics for the error.""" 396 if isinstance(self.__cause__, id.GitHubOidcPermissionCredentialError): 397 return f""" 398 Insufficient permissions for GitHub Actions workflow. 399 400 The most common reason for this is incorrect 401 configuration of the top-level `permissions` setting of the 402 workflow YAML file. It should be configured like so: 403 404 permissions: 405 id-token: write 406 407 Relevant documentation here: 408 409 https://docs.github.com/en/actions/deployment/security-hardening-your-deployments/about-security-hardening-with-openid-connect#adding-permissions-settings 410 411 Another possible reason is that the workflow run has been 412 triggered by a PR from a forked repository. PRs from forked 413 repositories typically cannot be granted write access. 414 415 Relevant documentation here: 416 417 https://docs.github.com/en/actions/security-guides/automatic-token-authentication#modifying-the-permissions-for-the-github_token 418 419 Additional context: 420 421 {self.__cause__} 422 """ 423 else: 424 return f""" 425 An issue occurred with ambient credential detection. 426 427 Additional context: 428 429 {self} 430 """
Returns diagnostics for the error.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- add_note
- args
433def detect_credential() -> Optional[str]: 434 """Calls `id.detect_credential`, but wraps exceptions with our own exception type.""" 435 try: 436 return cast(Optional[str], id.detect_credential(_DEFAULT_AUDIENCE)) 437 except id.IdentityError as exc: 438 IdentityError.raise_from_id(exc)
Calls id.detect_credential
, but wraps exceptions with our own exception type.