Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 56 additions & 55 deletions tests/client/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -1447,64 +1447,65 @@ async def mock_callback() -> AuthorizationCodeResult:
except StopAsyncIteration:
pass # Expected

@pytest.mark.anyio
async def test_403_step_up_preserves_scope_from_stored_token(
self, oauth_provider: OAuthClientProvider, mock_storage: MockTokenStorage
):
"""SEP-2350: a restart-loaded token's scope is folded into the step-up union.

On restart only the token is reloaded (not client_metadata.scope), so the stored token's
granted scope must seed the union, or the challenge would re-authorize for less.
"""
client_info = OAuthClientInformationFull(
client_id="test_client_id",
client_secret="test_client_secret",
redirect_uris=[AnyUrl("http://localhost:3030/callback")],
)
# Simulate a restart: a token granted "read" is loaded, but client_metadata carries no scope.
oauth_provider.context.current_tokens = OAuthToken(access_token="t", scope="read")
oauth_provider.context.token_expiry_time = time.time() + 1800
oauth_provider.context.client_info = client_info
oauth_provider.context.client_metadata.scope = None
oauth_provider._initialized = True

captured_state: str | None = None
reauthorize_scope: str | None = None

async def capture_redirect(url: str) -> None:
nonlocal captured_state, reauthorize_scope
params = parse_qs(urlparse(url).query)
reauthorize_scope = params["scope"][0]
captured_state = params.get("state", [None])[0]

async def mock_callback() -> AuthorizationCodeResult:
return AuthorizationCodeResult(code="auth_code", state=captured_state)

oauth_provider.context.redirect_handler = capture_redirect
oauth_provider.context.callback_handler = mock_callback

auth_flow = oauth_provider.async_auth_flow(httpx.Request("GET", "https://api.example.com/mcp"))
request = await auth_flow.__anext__()
response_403 = httpx.Response(
403,
headers={"WWW-Authenticate": 'Bearer error="insufficient_scope", scope="write"'},
request=request,
)
token_exchange_request = await auth_flow.asend(response_403)
@pytest.mark.anyio
async def test_403_step_up_preserves_scope_from_stored_token(
oauth_provider: OAuthClientProvider, mock_storage: MockTokenStorage
):
"""SEP-2350: a restart-loaded token's scope is folded into the step-up union.

On restart only the token is reloaded (not client_metadata.scope), so the stored token's
granted scope must seed the union, or the challenge would re-authorize for less.
"""
client_info = OAuthClientInformationFull(
client_id="test_client_id",
client_secret="test_client_secret",
redirect_uris=[AnyUrl("http://localhost:3030/callback")],
)
# Simulate a restart: a token granted "read" is loaded, but client_metadata carries no scope.
oauth_provider.context.current_tokens = OAuthToken(access_token="t", scope="read")
oauth_provider.context.token_expiry_time = time.time() + 1800
oauth_provider.context.client_info = client_info
oauth_provider.context.client_metadata.scope = None
oauth_provider._initialized = True

captured_state: str | None = None
reauthorize_scope: str | None = None

async def capture_redirect(url: str) -> None:
nonlocal captured_state, reauthorize_scope
params = parse_qs(urlparse(url).query)
reauthorize_scope = params["scope"][0]
captured_state = params.get("state", [None])[0]

async def mock_callback() -> AuthorizationCodeResult:
return AuthorizationCodeResult(code="auth_code", state=captured_state)

oauth_provider.context.redirect_handler = capture_redirect
oauth_provider.context.callback_handler = mock_callback

auth_flow = oauth_provider.async_auth_flow(httpx.Request("GET", "https://api.example.com/mcp"))
request = await auth_flow.__anext__()
response_403 = httpx.Response(
403,
headers={"WWW-Authenticate": 'Bearer error="insufficient_scope", scope="write"'},
request=request,
)
token_exchange_request = await auth_flow.asend(response_403)

assert reauthorize_scope == "read write"
assert reauthorize_scope == "read write"

# Drive the flow to completion so the context lock is released cleanly
token_response = httpx.Response(
200,
json={"access_token": "new", "token_type": "Bearer", "expires_in": 3600, "scope": "read write"},
request=token_exchange_request,
)
final_request = await auth_flow.asend(token_response)
try:
await auth_flow.asend(httpx.Response(200, request=final_request))
except StopAsyncIteration:
pass
# Drive the flow to completion so the context lock is released cleanly
token_response = httpx.Response(
200,
json={"access_token": "new", "token_type": "Bearer", "expires_in": 3600, "scope": "read write"},
request=token_exchange_request,
)
final_request = await auth_flow.asend(token_response)
try:
await auth_flow.asend(httpx.Response(200, request=final_request))
except StopAsyncIteration:
pass


@pytest.mark.parametrize(
Expand Down
Loading