Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
304 changes: 246 additions & 58 deletions msal/authority.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import re
try:
from urllib.parse import urlparse
except ImportError: # Fall back to Python 2
Expand All @@ -16,21 +17,82 @@
AZURE_GOV_SG = "login.sovcloud-identity.sg"

WORLD_WIDE = 'login.microsoftonline.com' # There was an alias login.windows.net
# NOTE(review): WELL_KNOWN_AUTHORITY_HOSTS is assigned a second time further
# down, derived from _HOSTS_BY_CLOUD, and that later binding replaces this
# literal at import time. This copy looks like the pre-change side of the
# diff -- confirm it should be dropped rather than kept alongside the table.
WELL_KNOWN_AUTHORITY_HOSTS = frozenset([
WORLD_WIDE,
"login.microsoft.com",
"login.windows.net",
"sts.windows.net",
DEPRECATED_AZURE_CHINA,
"login.partner.microsoftonline.cn",
"login.microsoftonline.de", # deprecated
'login-us.microsoftonline.com',
AZURE_US_GOVERNMENT,
"login.usgovcloudapi.net",
AZURE_GOV_FR,
AZURE_GOV_DE,
AZURE_GOV_SG,
])

# Sovereign-cloud sentinels. Aliases of the same cloud map to the same value
# so callers can compare clouds with simple equality.
_CLOUD_PUBLIC = "PUBLIC"
_CLOUD_CHINA = "CHINA"
_CLOUD_GERMANY = "GERMANY"
_CLOUD_US_GOV = "US_GOV"
# Kept distinct from _CLOUD_US_GOV: its single host
# (login-us.microsoftonline.com) therefore never compares equal to the
# US Gov hosts in a same-cloud check.
_CLOUD_US_ALT = "US_ALT"
# Has no entry in _HOSTS_BY_CLOUD; its hosts are listed only in
# _EXTRA_HOSTS_BY_CLOUD below.
_CLOUD_PPE = "PPE"
_CLOUD_BLEU = "BLEU"
_CLOUD_DELOS = "DELOS"
_CLOUD_GOV_SG = "GOV_SG"

# Single source of truth for known Microsoft authority hosts. Add an alias
# here and WELL_KNOWN_AUTHORITY_HOSTS / _KNOWN_HOST_TO_CLOUD pick it up.
# Shape: {cloud sentinel: tuple of host names}. Single-host clouds still use
# a one-element tuple (note the trailing comma) so every value iterates the
# same way.
# NOTE(review): AZURE_PUBLIC, DEPRECATED_AZURE_CHINA, AZURE_US_GOVERNMENT,
# AZURE_GOV_FR and AZURE_GOV_DE are presumably defined earlier in the module
# next to AZURE_GOV_SG -- not visible in this chunk, confirm.
_HOSTS_BY_CLOUD = {
_CLOUD_PUBLIC: (
AZURE_PUBLIC,
"login.microsoft.com",
"login.windows.net",
"sts.windows.net",
),
_CLOUD_CHINA: (
"login.partner.microsoftonline.cn",
DEPRECATED_AZURE_CHINA,
),
_CLOUD_GERMANY: ("login.microsoftonline.de",), # deprecated
_CLOUD_US_GOV: (
AZURE_US_GOVERNMENT,
"login.usgovcloudapi.net",
),
_CLOUD_US_ALT: ("login-us.microsoftonline.com",),
_CLOUD_BLEU: (AZURE_GOV_FR,),
_CLOUD_DELOS: (AZURE_GOV_DE,),
_CLOUD_GOV_SG: (AZURE_GOV_SG,),
}

# Hosts that resolve to a cloud for the cross-cloud check but MUST NOT enter
# WELL_KNOWN_AUTHORITY_HOSTS (which gates instance-discovery skipping).
# - PPE: non-production.
# - ciamlogin.com: bare suffix, never a usable authority on its own; tenant
# subdomains resolve to Public via _resolve_known_cloud's regional logic.
# Same shape as _HOSTS_BY_CLOUD. A cloud key may appear in both tables
# (_CLOUD_PUBLIC does); host names must not.
_EXTRA_HOSTS_BY_CLOUD = {
_CLOUD_PPE: (
"login.windows-ppe.net",
"sts.windows-ppe.net",
"login.microsoft-ppe.com",
),
_CLOUD_PUBLIC: ("ciamlogin.com",),
}

# Both lookups below are computed from the cloud tables, never hand-edited,
# so adding an alias to a table is the only step needed to register it.

# Every host in _HOSTS_BY_CLOUD, flattened into one immutable set.
# _EXTRA_HOSTS_BY_CLOUD is deliberately left out of this set.
WELL_KNOWN_AUTHORITY_HOSTS = frozenset().union(*_HOSTS_BY_CLOUD.values())

# Reverse index: host name -> cloud sentinel, covering the primary table
# first and then the extra (cross-cloud-check-only) hosts.
_KNOWN_HOST_TO_CLOUD = {}
for _table in (_HOSTS_BY_CLOUD, _EXTRA_HOSTS_BY_CLOUD):
    _KNOWN_HOST_TO_CLOUD.update(
        (alias, sentinel)
        for sentinel, aliases in _table.items()
        for alias in aliases)
del _table

# Catch a duplicated host at import time rather than in production.
# The check raises explicitly instead of using ``assert`` so that it still
# runs under ``python -O``, which strips assert statements. AssertionError
# is kept as the exception type so the failure looks the same as before.
def _find_duplicate_hosts(*tables):
    """Return a sorted list of hosts listed more than once across *tables*.

    Each table maps a cloud sentinel to an iterable of host names. A host
    counts as duplicated whether it repeats inside one table or appears in
    two different tables.
    """
    seen = set()
    duplicates = set()
    for table in tables:
        for hosts in table.values():
            for host in hosts:
                if host in seen:
                    duplicates.add(host)
                seen.add(host)
    return sorted(duplicates)


_duplicate_hosts = _find_duplicate_hosts(_HOSTS_BY_CLOUD, _EXTRA_HOSTS_BY_CLOUD)
if _duplicate_hosts:
    raise AssertionError(
        "Duplicate host in cloud tables: {}".format(_duplicate_hosts))
del _duplicate_hosts

WELL_KNOWN_B2C_HOSTS = [
"b2clogin.com",
Expand All @@ -41,6 +103,84 @@
]
_CIAM_DOMAIN_SUFFIX = ".ciamlogin.com"

# RFC 1035 / RFC 1123 DNS label: 1-63 chars, no leading/trailing hyphen.
# Used as a shape gate on the region prefix; not an allow-list of regions.
# Anchored with \Z rather than $: in Python, $ also matches just before a
# trailing newline, which would let a prefix such as "westus\n" pass.
_REGION_PREFIX_PATTERN = re.compile(r"^[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?\Z")


def _resolve_known_cloud(host):
    """Map *host* to its cloud sentinel; return None when it is not known.

    This is the trust gate behind the cross-cloud check: it is consulted by
    `_are_in_same_cloud`, `_ensure_endpoint_same_cloud_as_authority`, and
    Rule 2 of `has_valid_issuer`. Making it stricter is safe; making it
    looser relaxes all three callers together.

    A host is recognised when, after lowercasing, it is either

    * a key of :data:`_KNOWN_HOST_TO_CLOUD`, or
    * ``{region}.{alias}`` where ``{alias}`` is such a key and ``{region}``
      is a single label accepted by :data:`_REGION_PREFIX_PATTERN`.

    Lowercasing happens here, so callers may hand in the raw
    ``urlparse(...).hostname`` or any untrusted string. Falsy input
    (``None``, ``""``) yields None.
    """
    if not host:
        return None
    name = host.lower()

    # Exact alias.
    direct_hit = _KNOWN_HOST_TO_CLOUD.get(name)
    if direct_hit is not None:
        return direct_hit

    # Otherwise peel off exactly one leading label and retry on the rest.
    label, separator, parent = name.partition(".")
    if not separator or not label:
        return None  # No dot at all, or the host starts with a dot
    if not _REGION_PREFIX_PATTERN.match(label):
        return None
    return _KNOWN_HOST_TO_CLOUD.get(parent)


def _are_in_same_cloud(host_a, host_b):
    """Tell whether two hosts belong to one and the same known cloud.

    Default-deny: the answer is False as soon as either host fails to
    resolve through `_resolve_known_cloud`. *host_b* is only resolved when
    *host_a* resolved successfully.
    """
    first = _resolve_known_cloud(host_a)
    if first is None:
        return False
    # `first` is not None here, so an unresolved host_b compares unequal.
    return first == _resolve_known_cloud(host_b)


def _ensure_endpoint_same_cloud_as_authority(
        authority_url, endpoint_url, endpoint_name):
    """Refuse a discovered endpoint that lives in another sovereign cloud.

    :param authority_url: The configured authority URL (may be empty).
    :param endpoint_url: The endpoint URL taken from OIDC discovery.
    :param endpoint_name: Label for the endpoint kind, used in the error
        text only (e.g. ``"token_endpoint"``).
    :raises ValueError: When the authority host resolves to a known cloud
        and the endpoint host does not resolve to that same cloud. The
        message names the authority, the endpoint kind, and the URL.

    Returns None without checking anything when *endpoint_url* is empty or
    lacks a scheme/host, or when the authority host is not a known
    Microsoft host (a custom domain is left unconstrained).
    """
    if not endpoint_url:
        return
    parsed_endpoint = urlparse(endpoint_url)
    if not (parsed_endpoint.scheme and parsed_endpoint.hostname):
        return  # Not absolute; downstream parsing reports that problem

    expected_cloud = _resolve_known_cloud(
        urlparse(authority_url).hostname if authority_url else None)
    if expected_cloud is None:
        return  # Custom-domain authority

    # expected_cloud is not None, so an unknown endpoint host (None) is
    # rejected by the same comparison as a cross-cloud one.
    if _resolve_known_cloud(parsed_endpoint.hostname) == expected_cloud:
        return

    message_template = (
        "OIDC discovery for authority '{authority}' returned a "
        "{name} '{endpoint}' whose host is not in the same Microsoft "
        "sovereign cloud as the authority. MSAL refused to use that "
        "endpoint. Verify the OIDC discovery endpoint is not being "
        "intercepted and that the configured authority points at the "
        "correct sovereign cloud.")
    raise ValueError(message_template.format(
        authority=authority_url,
        name=endpoint_name,
        endpoint=endpoint_url,
    ))


def _get_instance_discovery_host(instance):
    """Choose the host to use for instance discovery.

    *instance* is returned unchanged when it is a member of
    WELL_KNOWN_AUTHORITY_HOSTS; any other value falls back to WORLD_WIDE.
    """
    if instance in WELL_KNOWN_AUTHORITY_HOSTS:
        return instance
    return WORLD_WIDE
Expand Down Expand Up @@ -118,16 +258,29 @@ def __init__(
self.device_authorization_endpoint = openid_config.get('device_authorization_endpoint')
_, _, self.tenant = canonicalize(self.token_endpoint) # Usually a GUID

# Validate the issuer if using OIDC authority
if self._oidc_authority_url and not self.has_valid_issuer():
raise ValueError((
"The issuer '{iss}' does not match the authority '{auth}' or a known pattern. "
"When using the 'oidc_authority' parameter in ClientApplication, the authority "
"will be validated against the issuer from {auth}/.well-known/openid-configuration ."
"If using a known Entra authority (e.g. login.microsoftonline.com) the "
"'authority' parameter should be used instead of 'oidc_authority'. "
""
).format(iss=self._issuer, auth=oidc_authority_url))
# Validate the issuer and enforce same-cloud endpoints (OIDC only).
# See #5927 for the cross-cloud hardening.
if self._oidc_authority_url:
if not self.has_valid_issuer():
raise ValueError((
"The issuer '{iss}' does not match the authority '{auth}' or a known pattern. "
"When using the 'oidc_authority' parameter in ClientApplication, the authority "
"will be validated against the issuer from {auth}/.well-known/openid-configuration ."
"If using a known Entra authority (e.g. login.microsoftonline.com) the "
"'authority' parameter should be used instead of 'oidc_authority'. "
""
).format(iss=self._issuer, auth=self._oidc_authority_url))
_ensure_endpoint_same_cloud_as_authority(
self._oidc_authority_url, self.token_endpoint, "token_endpoint")
_ensure_endpoint_same_cloud_as_authority(
self._oidc_authority_url, self.authorization_endpoint,
"authorization_endpoint")
if self.device_authorization_endpoint:
_ensure_endpoint_same_cloud_as_authority(
self._oidc_authority_url,
self.device_authorization_endpoint,
"device_authorization_endpoint")

def _initialize_oidc_authority(self, oidc_authority_url):
authority, self.instance, tenant = canonicalize(oidc_authority_url)
self.is_adfs = tenant.lower() == 'adfs' # As a convention
Expand Down Expand Up @@ -201,58 +354,93 @@ def user_realm_discovery(self, username, correlation_id=None, response=None):
return {} # This can guide the caller to fall back normal ROPC flow

def has_valid_issuer(self):
    """True if the OIDC issuer is valid for this authority.

    Reads ``self._issuer`` and ``self._oidc_authority_url``; returns False
    when either is empty or when the issuer has no parseable host.

    Steps below are evaluated in this order; the bracketed labels are
    the historical rule names retained for cross-reference with the
    MSAL.NET port (#5927). Order is security-sensitive.

    Step 1 [Case 1]: Exact match.
    Step 2 [Case 4]: Same scheme + netloc (paths may differ).
    Step 3 [Rule 3]: CIAM tenant pattern (cross-host only). Must run
        before Step 4 so a ``<x>.ciamlogin.com`` issuer cannot bypass
        tenant matching via Rule 2b (CIAM resolves to Public).
    Step 4 [Rule 2]: Same Microsoft cloud. 2a accepts any known-MS
        issuer under a custom-domain authority (#5927 federation);
        2b accepts a known-MS issuer under a known-MS authority only
        when the two clouds are identical.
    Step 5 [Case 3b]: Region-shaped prefix on the authority host.
    Step 6 [Case 5]: B2C subdomain (excluding ``.ciamlogin.com``,
        handled by Step 3).
    """
    if not self._issuer or not self._oidc_authority_url:
        return False

    # Step 1 [Case 1]: exact match (trailing slash insensitive)
    if self._issuer.rstrip("/") == self._oidc_authority_url.rstrip("/"):
        return True

    issuer_parsed = urlparse(self._issuer)
    authority_parsed = urlparse(self._oidc_authority_url)
    issuer_host = issuer_parsed.hostname.lower() if issuer_parsed.hostname else None
    authority_host = (
        authority_parsed.hostname.lower() if authority_parsed.hostname else "")

    if not issuer_host:
        return False

    # Step 2 [Case 4]: same scheme + host. Runs before Step 3 so a CIAM
    # authority/issuer pair on the same host (different paths) passes.
    if (authority_parsed.scheme == issuer_parsed.scheme and
            authority_parsed.netloc == issuer_parsed.netloc):
        return True

    # Step 3 [Rule 3]: cross-host CIAM issuer. Tenant must match
    # authority's first path segment (or first hostname label). Must run
    # before Step 4 to block the Rule 2b CIAM bypass.
    if issuer_host.endswith(_CIAM_DOMAIN_SUFFIX):
        issuer_tenant = issuer_host[:-len(_CIAM_DOMAIN_SUFFIX)]
        auth_path_parts = [p for p in authority_parsed.path.split("/") if p]
        if auth_path_parts:
            authority_tenant = auth_path_parts[0].lower()
        else:
            authority_tenant = authority_host.split(".", 1)[0]
        if issuer_tenant and issuer_tenant == authority_tenant:
            # Only the bare host, "/<tenant>" and "/<tenant>/v2.0" issuer
            # paths are accepted for a matching tenant.
            normalized_issuer_path = issuer_parsed.path.rstrip("/").lower()
            if normalized_issuer_path in (
                    "",
                    "/" + issuer_tenant,
                    "/" + issuer_tenant + "/v2.0"):
                return True
        # Tenant (or path) mismatch: reject here so a CIAM issuer never
        # reaches Step 4, where it would resolve to the Public cloud.
        return False

    # Step 4 [Rule 2]: known Microsoft issuer over HTTPS.
    #   2a: custom-domain authority -> accept (#5927 federation).
    #   2b: known-MS authority -> accept only if same cloud.
    issuer_cloud = _resolve_known_cloud(issuer_host)
    if issuer_cloud is not None and issuer_parsed.scheme == "https":
        authority_cloud = _resolve_known_cloud(authority_host)
        if authority_cloud is None:
            return True  # 2a
        if authority_cloud == issuer_cloud:
            return True  # 2b
        # Cross-cloud: fall through to reject.

    # Step 5 [Case 3b]: region-shaped prefix on the authority host
    # (e.g. issuer=us.someweb.com, authority=someweb.com).
    dot_index = issuer_host.find(".")
    if dot_index > 0:
        prefix = issuer_host[:dot_index]
        potential_base = issuer_host[dot_index + 1:]
        if (_REGION_PREFIX_PATTERN.match(prefix)
                and potential_base == authority_host):
            return True

    # Step 6 [Case 5]: B2C subdomain. .ciamlogin.com handled by Step 3.
    # The "." prefix keeps look-alikes such as fakeb2clogin.com out.
    if any(
            issuer_host.endswith("." + h)
            for h in WELL_KNOWN_B2C_HOSTS
            if h != "ciamlogin.com"):
        return True

    return False
Expand Down
Loading
Loading