Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 46 additions & 10 deletions codex_switcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -1166,36 +1166,72 @@ def restore_auth_file(auth_path: Path, backup_bytes: Optional[bytes]) -> bool:
except Exception:
return False

def find_saved_account_path(record_key: str = '', email: str = '') -> Optional[Path]:
"""按账号身份或邮箱查找已存档文件"""
def normalize_identity_value(value: str) -> str:
"""Normalize an account identity component for comparisons."""
normalized = str(value or '').strip().lower()
if normalized in ('', 'unknown', 'n/a', 'none', '?'):
return ''
return normalized

def email_plan_identity(info: dict) -> Tuple[str, str]:
"""Return the email+plan identity tuple, or empty values if incomplete."""
return (
normalize_identity_value(info.get('email', '')),
normalize_identity_value(info.get('plan_type', '')),
)

def same_account_identity(existing: dict, candidate: dict) -> bool:
"""Compare account identity using record_key first, then email+plan."""
existing_record_key = normalize_identity_value(existing.get('record_key', ''))
candidate_record_key = normalize_identity_value(candidate.get('record_key', ''))
if existing_record_key and candidate_record_key and existing_record_key == candidate_record_key:
return True

existing_email, existing_plan = email_plan_identity(existing)
candidate_email, candidate_plan = email_plan_identity(candidate)
return bool(
existing_email and existing_plan and
candidate_email and candidate_plan and
existing_email == candidate_email and
existing_plan == candidate_plan
)

def find_saved_account_path(record_key: str = '', email: str = '', plan_type: str = '') -> Optional[Path]:
"""按账号身份查找已存档文件"""
accounts_dir = get_accounts_dir()
if not accounts_dir.exists():
return None

email_lower = email.lower() if email else ''
fallback_path = None
candidate = {
'record_key': record_key,
'email': email,
'plan_type': plan_type,
}
for auth_file in sorted(accounts_dir.glob('auth_*.json')):
auth_data = load_auth_data_from_path(auth_file)
if not auth_data:
continue
info = get_account_info(auth_data, str(auth_file))
if not info:
continue
if record_key and info.get('record_key') == record_key:
if same_account_identity(info, candidate):
return auth_file
if email_lower and info.get('email', '').lower() == email_lower and fallback_path is None:
fallback_path = auth_file

return fallback_path
return None

def upsert_current_auth_archive(account_info: dict) -> Tuple[bool, Optional[Path], str]:
"""按账号身份更新或创建当前 auth 存档"""
record_key = account_info.get('record_key', '')
email = account_info.get('email', '')
existing_path = find_saved_account_path(record_key, email)
plan_type = account_info.get('plan_type', '')
existing_path = find_saved_account_path(record_key, email, plan_type)
save_name = email or 'account'
normalized_plan = normalize_identity_value(plan_type)
if normalized_plan:
save_name = f"{save_name}_{normalized_plan}"
archive_path = save_auth_file_snapshot(
get_auth_file(),
email or 'account',
save_name,
replace_path=existing_path,
allow_timestamp_suffix=existing_path is None,
)
Expand Down
37 changes: 37 additions & 0 deletions test_account_identity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import unittest

import codex_switcher


class AccountIdentityTests(unittest.TestCase):
def test_same_email_different_plan_is_not_same_identity(self):
existing = {
"record_key": "",
"email": "user@example.com",
"plan_type": "plus",
}
candidate = {
"record_key": "",
"email": "USER@example.com",
"plan_type": "team",
}

self.assertFalse(codex_switcher.same_account_identity(existing, candidate))

def test_same_email_same_plan_is_same_identity(self):
existing = {
"record_key": "",
"email": "user@example.com",
"plan_type": "plus",
}
candidate = {
"record_key": "",
"email": "USER@example.com",
"plan_type": "PLUS",
}

self.assertTrue(codex_switcher.same_account_identity(existing, candidate))


if __name__ == "__main__":
unittest.main()