Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/migration.md
Original file line number Diff line number Diff line change
Expand Up @@ -1303,6 +1303,10 @@ If you relied on extra fields round-tripping through MCP types, move that data i

## New Features

### Step-up authorization unions previously requested scopes (SEP-2350)

When a `403 insufficient_scope` challenge triggers step-up re-authorization, the OAuth client now requests the union of the previously requested scopes and the newly challenged scopes, instead of replacing the scope with only the challenged ones. This keeps permissions granted for earlier operations from being dropped when a later operation escalates. No API change; the wider scope is sent automatically on the re-authorization request.

### OAuth Dynamic Client Registration sends `application_type` (SEP-837)

`OAuthClientMetadata` now carries an `application_type` field that is sent during Dynamic Client Registration. It defaults to `"native"`, which suits MCP clients that use loopback redirect URIs (CLI and desktop apps); browser-based clients served from a non-local host should set it to `"web"`:
Expand Down
11 changes: 9 additions & 2 deletions src/mcp/client/auth/oauth2.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
handle_token_response_scopes,
is_valid_client_metadata_url,
should_use_client_metadata_url,
union_scopes,
validate_authorization_response_iss,
validate_metadata_issuer,
)
Expand Down Expand Up @@ -634,13 +635,19 @@ async def async_auth_flow(self, request: httpx.Request) -> AsyncGenerator[httpx.
# Step 2: Check if we need to step-up authorization
if error == "insufficient_scope": # pragma: no branch
try:
# Step 2a: Update the required scopes
self.context.client_metadata.scope = get_client_metadata_scopes(
# Step 2a: Union previously requested scopes with the newly challenged
# scopes (SEP-2350) so escalating one operation keeps the others' grants.
# Fold in the stored token's scope too: on a restart the token is reloaded
# but client_metadata.scope is not, so it would otherwise be the only basis.
challenged_scope = get_client_metadata_scopes(
extract_scope_from_www_auth(response),
self.context.protected_resource_metadata,
self.context.oauth_metadata,
self.context.client_metadata.grant_types,
)
granted_scope = self.context.current_tokens.scope if self.context.current_tokens else None
prior_scope = union_scopes(self.context.client_metadata.scope, granted_scope)
self.context.client_metadata.scope = union_scopes(prior_scope, challenged_scope)

# Step 2b: Perform (re-)authorization and token exchange
token_response = yield await self._perform_authorization()
Expand Down
22 changes: 22 additions & 0 deletions src/mcp/client/auth/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,28 @@ def get_client_metadata_scopes(
return selected_scope


def union_scopes(previous_scope: str | None, new_scope: str | None) -> str | None:
"""Merge two space-delimited scope strings, preserving order and dropping duplicates.

SEP-2350: on step-up re-authorization the client requests the union of previously requested
scopes and the newly challenged scopes, so escalating one operation does not drop the
permissions granted for another. Previously requested scopes come first; new scopes are
appended in order.
"""
if not previous_scope:
return new_scope
if not new_scope:
return previous_scope

merged = previous_scope.split()
seen = set(merged)
for scope in new_scope.split():
if scope not in seen:
merged.append(scope)
seen.add(scope)
return " ".join(merged)


def build_oauth_authorization_server_metadata_discovery_urls(auth_server_url: str | None, server_url: str) -> list[str]:
"""Generate an ordered list of URLs for authorization server metadata discovery.

Expand Down
88 changes: 82 additions & 6 deletions tests/client/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
handle_registration_response,
is_valid_client_metadata_url,
should_use_client_metadata_url,
union_scopes,
validate_authorization_response_iss,
validate_metadata_issuer,
)
Expand Down Expand Up @@ -1387,10 +1388,9 @@
async def capture_redirect(url: str) -> None:
nonlocal redirect_captured, captured_state
redirect_captured = True
# Verify the new scope is included in authorization URL
assert "scope=admin%3Awrite+admin%3Adelete" in url or "scope=admin:write+admin:delete" in url.replace(
"%3A", ":"
).replace("+", " ")
# SEP-2350: the authorization URL carries the union of the prior and challenged scopes
scope = parse_qs(urlparse(url).query)["scope"][0]
assert scope == "read write admin:write admin:delete"
# Extract state from redirect URL
parsed = urlparse(url)
params = parse_qs(parsed.query)
Expand Down Expand Up @@ -1420,8 +1420,8 @@
# Trigger step-up - should get token exchange request
token_exchange_request = await auth_flow.asend(response_403)

# Verify scope was updated
assert oauth_provider.context.client_metadata.scope == "admin:write admin:delete"
# Verify scope was updated to the union of prior and challenged scopes (SEP-2350)
assert oauth_provider.context.client_metadata.scope == "read write admin:write admin:delete"
assert redirect_captured

# Complete the flow with successful token response
Expand All @@ -1447,6 +1447,65 @@
except StopAsyncIteration:
pass # Expected

@pytest.mark.anyio
async def test_403_step_up_preserves_scope_from_stored_token(
self, oauth_provider: OAuthClientProvider, mock_storage: MockTokenStorage
):

Check warning on line 1453 in tests/client/test_auth.py

View check run for this annotation

Claude / Claude Code Review

New test added inside Test* class against repo convention

Per the repo's AGENTS.md testing convention, new tests should be plain top-level test_* functions, not methods of Test* classes — even when adding to a legacy file that already has them. test_403_step_up_preserves_scope_from_stored_token is added inside the TestAuthFlow class, but the fixtures it uses (oauth_provider, mock_storage) are module-level, so it can be moved to a top-level function unchanged (drop self and dedent), matching the other new test test_union_scopes.
Comment on lines +1450 to +1453

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Per the repo's AGENTS.md testing convention, new tests should be plain top-level test_* functions, not methods of Test* classes — even when adding to a legacy file that already has them. test_403_step_up_preserves_scope_from_stored_token is added inside the TestAuthFlow class, but the fixtures it uses (oauth_provider, mock_storage) are module-level, so it can be moved to a top-level function unchanged (drop self and dedent), matching the other new test test_union_scopes.

Extended reasoning...

AGENTS.md (loaded into the project guidelines via CLAUDE.md's @AGENTS.md include) states the testing convention explicitly: "Do not use Test prefixed classes — write plain top-level test_* functions. Legacy files still contain Test* classes; do NOT follow that pattern for new tests even when adding to such a file." That last clause directly addresses this situation — adding a new test to a file that already contains legacy Test* classes is not an exception.

The new test test_403_step_up_preserves_scope_from_stored_token (added in commit 33bcb51 to cover the restart/stored-token path) is defined as a method of the existing TestAuthFlow class in tests/client/test_auth.py: it takes self, is indented as a class member, and sits immediately after test_403_insufficient_scope_updates_scope_from_header. This is exactly the pattern the guideline forbids for new tests.

There is no technical reason the test needs to live in the class. Both fixtures it requests — oauth_provider (tests/client/test_auth.py:92) and mock_storage (tests/client/test_auth.py:66) — are module-level pytest fixtures, not class-scoped, so they resolve identically for a top-level function. The class provides no shared state or setup the test depends on.

The PR itself demonstrates the intended pattern: the other new test, test_union_scopes, is written as a plain top-level parametrized function at the bottom of the same file. So the convention was applied to one of the two new tests but not the other.

Step-by-step illustration of the fix:

  1. Cut the entire test_403_step_up_preserves_scope_from_stored_token method out of TestAuthFlow.
  2. Paste it at module level (e.g. after the class, near the other top-level tests), dedent one level, and remove the self parameter so the signature becomes async def test_403_step_up_preserves_scope_from_stored_token(oauth_provider: OAuthClientProvider, mock_storage: MockTokenStorage):.
  3. Keep the @pytest.mark.anyio decorator. Run pytest tests/client/test_auth.py -k stored_token — it passes unchanged because both fixtures are module-level.

Impact is purely stylistic — the test runs and asserts the same behavior either way — so this is a nit, not a blocker. Fixing it keeps the file from accumulating more class-based tests that the guideline is trying to phase out.

"""SEP-2350: a restart-loaded token's scope is folded into the step-up union.

On restart only the token is reloaded (not client_metadata.scope), so the stored token's
granted scope must seed the union, or the challenge would re-authorize for less.
"""
client_info = OAuthClientInformationFull(
client_id="test_client_id",
client_secret="test_client_secret",
redirect_uris=[AnyUrl("http://localhost:3030/callback")],
)
# Simulate a restart: a token granted "read" is loaded, but client_metadata carries no scope.
oauth_provider.context.current_tokens = OAuthToken(access_token="t", scope="read")
oauth_provider.context.token_expiry_time = time.time() + 1800
oauth_provider.context.client_info = client_info
oauth_provider.context.client_metadata.scope = None
oauth_provider._initialized = True

captured_state: str | None = None
reauthorize_scope: str | None = None

async def capture_redirect(url: str) -> None:
nonlocal captured_state, reauthorize_scope
params = parse_qs(urlparse(url).query)
reauthorize_scope = params["scope"][0]
captured_state = params.get("state", [None])[0]

async def mock_callback() -> AuthorizationCodeResult:
return AuthorizationCodeResult(code="auth_code", state=captured_state)

oauth_provider.context.redirect_handler = capture_redirect
oauth_provider.context.callback_handler = mock_callback

auth_flow = oauth_provider.async_auth_flow(httpx.Request("GET", "https://api.example.com/mcp"))
request = await auth_flow.__anext__()
response_403 = httpx.Response(
403,
headers={"WWW-Authenticate": 'Bearer error="insufficient_scope", scope="write"'},
request=request,
)
token_exchange_request = await auth_flow.asend(response_403)

assert reauthorize_scope == "read write"

# Drive the flow to completion so the context lock is released cleanly
token_response = httpx.Response(
200,
json={"access_token": "new", "token_type": "Bearer", "expires_in": 3600, "scope": "read write"},
request=token_exchange_request,
)
final_request = await auth_flow.asend(token_response)
try:
await auth_flow.asend(httpx.Response(200, request=final_request))
except StopAsyncIteration:
pass


@pytest.mark.parametrize(
(
Expand Down Expand Up @@ -2717,3 +2776,20 @@
def test_validate_metadata_issuer_rejects_mismatch():
with pytest.raises(OAuthFlowError, match="metadata issuer mismatch"):
validate_metadata_issuer(_issuer_metadata(issuer="https://attacker.example.com"), _ISSUER)


@pytest.mark.parametrize(
("previous", "new", "expected"),
[
pytest.param("mcp:basic", "mcp:write", "mcp:basic mcp:write", id="disjoint-union-order"),
pytest.param(
"mcp:basic offline_access", "mcp:write mcp:basic", "mcp:basic offline_access mcp:write", id="dedup"
),
pytest.param(None, "mcp:write", "mcp:write", id="no-previous"),
pytest.param("mcp:basic", None, "mcp:basic", id="no-new"),
pytest.param(None, None, None, id="both-empty"),
],
)
def test_union_scopes(previous: str | None, new: str | None, expected: str | None):
"""SEP-2350: union merges previous and new scopes, dedups, and preserves order."""
assert union_scopes(previous, new) == expected
9 changes: 9 additions & 0 deletions tests/interaction/_requirements.py
Original file line number Diff line number Diff line change
Expand Up @@ -3265,6 +3265,15 @@ def __post_init__(self) -> None:
transports=("streamable-http",),
note="OAuth is HTTP-only.",
),
"client-auth:403-scope-union": Requirement(
source=f"{SPEC_BASE_URL}/basic/authorization#step-up-authorization-flow",
behavior=(
"On a 403 insufficient_scope step-up, the re-authorization request carries the union of the "
"previously requested scopes and the newly challenged scopes (SEP-2350)."
),
transports=("streamable-http",),
note="OAuth is HTTP-only.",
),
"client-auth:as-metadata-discovery:priority-order": Requirement(
source=f"{SPEC_BASE_URL}/basic/authorization#authorization-server-metadata-discovery",
behavior=(
Expand Down
29 changes: 29 additions & 0 deletions tests/interaction/auth/test_lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,35 @@ async def test_a_403_insufficient_scope_triggers_one_reauthorize_with_the_challe
assert counts[("POST", "/token")] == 2


@requirement("client-auth:403-scope-union")
async def test_a_403_step_up_re_authorizes_with_the_union_of_prior_and_challenged_scopes() -> None:
"""The step-up re-authorize requests the union of the previously requested and challenged scopes.

The first authorization requests `mcp`; the 403 challenges a disjoint `write` (not naming
`mcp`). Per SEP-2350 the client must re-authorize with `mcp write`, not drop `mcp`. The client
is pre-registered with both scopes so the server's authorize handler accepts the wider request.
"""
provider = InMemoryAuthorizationServerProvider()
storage = InMemoryTokenStorage(client_info=seeded_client(provider, scope="mcp write"))
server = Server("guarded", on_list_tools=list_tools)
settings = auth_settings(required_scopes=["mcp"], valid_scopes=["mcp", "write"])
challenge = 'Bearer error="insufficient_scope", scope="write"'

with anyio.fail_after(5):
async with connect_with_oauth(
server,
provider=provider,
storage=storage,
settings=settings,
app_shim=step_up_shim(challenge),
) as (client, headless):
await client.list_tools()

assert len(headless.authorize_urls) == 2
assert authorize_params(headless.authorize_urls[0])["scope"] == "mcp"
assert authorize_params(headless.authorize_urls[1])["scope"] == "mcp write"


@requirement("client-auth:401-after-auth-throws")
async def test_a_second_401_after_a_completed_oauth_flow_surfaces_without_looping() -> None:
"""A 401 on the post-auth retry surfaces as an error rather than re-entering discovery.
Expand Down
Loading