mirror of
https://github.com/velocitatem/cvfs.git
synced 2026-05-31 08:43:37 +00:00
verify tokens using full jwks and relaxed options
This commit is contained in:
@@ -58,19 +58,22 @@ class OidcTokenValidator:
|
|||||||
sub="dev-user", email="dev@example.com", name="Developer"
|
sub="dev-user", email="dev@example.com", name="Developer"
|
||||||
)
|
)
|
||||||
header = jwt.get_unverified_header(token)
|
header = jwt.get_unverified_header(token)
|
||||||
kid = header.get("kid")
|
|
||||||
alg = header.get("alg") or "RS256"
|
alg = header.get("alg") or "RS256"
|
||||||
key = await self._get_key(kid)
|
jwks = await self._get_jwks()
|
||||||
if not key:
|
if not jwks:
|
||||||
raise TokenValidationError("Unable to resolve signing key")
|
raise TokenValidationError("Unable to resolve signing keys")
|
||||||
try:
|
try:
|
||||||
claims = jwt.decode(
|
claims = jwt.decode(
|
||||||
token,
|
token,
|
||||||
key,
|
jwks,
|
||||||
algorithms=[alg],
|
algorithms=[alg],
|
||||||
audience=self.audience,
|
options={"verify_aud": False, "verify_iss": False},
|
||||||
issuer=self.issuer,
|
|
||||||
)
|
)
|
||||||
|
iss = claims.get("iss")
|
||||||
|
if self.issuer and iss not in (self.issuer, self.issuer + "/"):
|
||||||
|
# fallback: check if it matches discovery host
|
||||||
|
if not (iss and iss.startswith(self.issuer.split("/application/")[0])):
|
||||||
|
raise TokenValidationError(f"Invalid issuer: {iss}")
|
||||||
except JWTError as exc:
|
except JWTError as exc:
|
||||||
raise TokenValidationError(str(exc)) from exc
|
raise TokenValidationError(str(exc)) from exc
|
||||||
roles = claims.get("roles") or claims.get("app_metadata", {}).get("roles") or []
|
roles = claims.get("roles") or claims.get("app_metadata", {}).get("roles") or []
|
||||||
@@ -95,7 +98,7 @@ class OidcTokenValidator:
|
|||||||
if isinstance(jwks_uri, str):
|
if isinstance(jwks_uri, str):
|
||||||
self.jwks_url = jwks_uri
|
self.jwks_url = jwks_uri
|
||||||
|
|
||||||
async def _get_key(self, kid: str | None) -> dict[str, Any] | None:
|
async def _get_jwks(self) -> dict[str, Any] | None:
|
||||||
await self._ensure_jwks_url()
|
await self._ensure_jwks_url()
|
||||||
if not self.jwks_url:
|
if not self.jwks_url:
|
||||||
return None
|
return None
|
||||||
@@ -105,12 +108,7 @@ class OidcTokenValidator:
|
|||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
self._jwks = response.json()
|
self._jwks = response.json()
|
||||||
self._jwks_expiry = time.time() + 3600
|
self._jwks_expiry = time.time() + 3600
|
||||||
keys = self._jwks.get("keys", []) if isinstance(self._jwks, dict) else []
|
return self._jwks
|
||||||
if kid:
|
|
||||||
for key in keys:
|
|
||||||
if key.get("kid") == kid:
|
|
||||||
return key
|
|
||||||
return keys[0] if keys else None
|
|
||||||
|
|
||||||
|
|
||||||
def build_validator(
|
def build_validator(
|
||||||
|
|||||||
Reference in New Issue
Block a user