-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
239 lines (204 loc) · 7.2 KB
/
main.py
File metadata and controls
239 lines (204 loc) · 7.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
"""Cloud Run entrypoint for Firstrade platform validation and dry-run cycles."""
from __future__ import annotations
import os
import traceback
from datetime import datetime, timezone
from flask import Flask, jsonify, request
from application.firstrade_client import (
FirstradeBrokerClient,
FirstradeCredentials,
FirstradePlatformError,
is_live_trading_enabled,
mask_account_id,
)
from application.rebalance_service import run_strategy_cycle
from application.session_check_service import run_session_check
from notifications.telegram import build_sender
from strategy_registry import get_platform_profile_status_matrix
# Flask application object; every route in this module is registered on it
# through the @app.get / @app.post decorators.
app = Flask(__name__)
def _flag(name: str, default: str = "false") -> bool:
    """Return True only when environment variable *name* reads as "true".

    The value (or *default* when the variable is not set) is trimmed and
    compared case-insensitively against the literal ``"true"``. An empty or
    missing value is treated as False.
    """
    raw = os.getenv(name, default)
    if not raw:
        # Covers both an empty variable and a None default.
        return False
    return raw.strip().lower() == "true"
def _split_env_list(value: str | None) -> tuple[str, ...]:
    """Split a comma- or semicolon-separated string into trimmed items.

    ``None`` and other falsy values yield an empty tuple. Items that are
    empty after trimming whitespace are dropped; order is preserved.
    """
    # Treat ';' as an alternative separator by folding it into ','.
    normalized = str(value or "").replace(";", ",")
    entries = []
    for piece in normalized.split(","):
        trimmed = piece.strip()
        if trimmed:
            entries.append(trimmed)
    return tuple(entries)
def _telegram_notification_targets() -> tuple[tuple[str, str], ...]:
    """Collect the distinct (token, chat_id) pairs to notify via Telegram.

    A pair is produced from ``TELEGRAM_TOKEN`` and ``GLOBAL_TELEGRAM_CHAT_ID``
    only when both variables are set to non-empty values. Duplicates are
    removed while keeping first-seen order.
    """
    bot_token = os.getenv("TELEGRAM_TOKEN")
    chat = os.getenv("GLOBAL_TELEGRAM_CHAT_ID")

    candidates: list[tuple[str, str]] = []
    if bot_token and chat:
        candidates.append((bot_token, chat))

    # dict keys are unique and keep insertion order, which gives an
    # order-preserving de-duplication in one step.
    return tuple(dict.fromkeys(candidates))
def _runtime_error_notification_message(exc: Exception) -> str:
    """Build the multi-line text sent when a strategy run fails.

    The message lists the service, revision, current request route, strategy
    profile and account scope (each read from the environment with a
    placeholder fallback), followed by the exception rendered as
    ``TypeName: message``. The error text is capped at 1200 characters,
    ending in ``...`` when it was cut. Reads the current Flask ``request``.
    """
    max_error_chars = 1200
    detail = f"{type(exc).__name__}: {exc}"
    if len(detail) > max_error_chars:
        # Reserve three characters for the ellipsis so the cap still holds.
        detail = detail[: max_error_chars - 3] + "..."

    service = os.getenv("K_SERVICE") or "firstrade-quant-service"
    revision = os.getenv("K_REVISION") or "<unknown>"
    strategy = os.getenv("STRATEGY_PROFILE") or "<unset>"
    scope = os.getenv("ACCOUNT_REGION") or "<unset>"

    lines = [
        "Firstrade strategy run failed",
        f"service: {service}",
        f"revision: {revision}",
        f"route: {request.method} {request.path}",
        f"strategy: {strategy}",
        f"account_scope: {scope}",
        f"error: {detail}",
    ]
    return "\n".join(lines)
def _notify_runtime_error(exc: Exception) -> bool:
    """Send the runtime-error message to every configured Telegram target.

    Returns True when at least one target was configured (a send was
    attempted, whether or not it succeeded) and False when no target is
    configured, in which case a skip notice is printed instead.
    """
    destinations = _telegram_notification_targets()
    if destinations:
        text = _runtime_error_notification_message(exc)
        for bot_token, chat in destinations:
            try:
                send = build_sender(bot_token, chat)
                send(text)
            except Exception as send_exc:  # pragma: no cover - build_sender normally handles this.
                # Best effort: a failed send must not mask the original error.
                print(f"Firstrade runtime error Telegram send failed: {send_exc}", flush=True)
        return True

    print(
        "Firstrade runtime error notification skipped: no Telegram target configured.",
        flush=True,
    )
    return False
def _handle_strategy_run_exception(exc: Exception) -> bool:
    """Log a failed strategy run and trigger the runtime-error notification.

    Prints a one-line summary to stdout, writes the traceback of *exc* to
    stderr, then delegates to ``_notify_runtime_error``.

    Returns:
        The value returned by ``_notify_runtime_error(exc)``.
    """
    print(f"Firstrade strategy run failed: {type(exc).__name__}: {exc}", flush=True)
    # Print the traceback carried by the exception object itself instead of
    # relying on the interpreter's "currently handled" exception, so the
    # output is correct even if this helper is invoked outside an except block.
    traceback.print_exception(type(exc), exc, exc.__traceback__)
    return _notify_runtime_error(exc)
@app.get("/")
def health():
    """Return a JSON status document for the service.

    Reports fixed service identifiers, whether live trading is enabled, the
    state of the three HTTP feature flags, and the current UTC timestamp.
    """
    status = {
        "service": "firstrade-platform",
        "api_kind": "unofficial-reverse-engineered",
        "strategy_domain": "us_equity",
        "live_trading_enabled": is_live_trading_enabled(),
    }
    # Each *_on_http entry mirrors the env flag that gates the matching route.
    status["smoke_on_http"] = _flag("FIRSTRADE_RUN_SMOKE_ON_HTTP")
    status["session_check_on_http"] = _flag("FIRSTRADE_RUN_SESSION_CHECK_ON_HTTP")
    status["strategy_run_on_http"] = _flag("FIRSTRADE_RUN_STRATEGY_ON_HTTP")
    status["as_of"] = datetime.now(timezone.utc).isoformat()
    return jsonify(status)
@app.get("/profiles")
def profiles():
    """Return the platform identifiers plus the profile status matrix as JSON.

    The ``profiles`` entry is whatever
    ``get_platform_profile_status_matrix()`` returns.
    """
    payload = {"platform": "firstrade", "strategy_domain": "us_equity"}
    payload["profiles"] = get_platform_profile_status_matrix()
    return jsonify(payload)
@app.get("/smoke")
def smoke():
    """Run an HTTP-triggered quote validation against the broker.

    Gated by ``FIRSTRADE_RUN_SMOKE_ON_HTTP``; when the flag is not "true" the
    route answers 403 with an explanatory JSON body. Otherwise it builds
    credentials from the environment, connects a broker client, selects the
    account named by ``FIRSTRADE_ACCOUNT`` (or the client's default when
    unset/empty), and fetches a quote for ``FIRSTRADE_SMOKE_SYMBOL``
    (default ``SPY``).

    Returns:
        200 with ``ok: True``, the masked account id and the quote, or
        500 with ``ok: False`` and an error string on any failure.
    """
    if not _flag("FIRSTRADE_RUN_SMOKE_ON_HTTP"):
        return (
            jsonify(
                {
                    "ok": False,
                    "error": "Set FIRSTRADE_RUN_SMOKE_ON_HTTP=true to allow HTTP-triggered quote validation.",
                }
            ),
            403,
        )
    try:
        credentials = FirstradeCredentials.from_env()
        client = FirstradeBrokerClient(
            credentials,
            live_trading_enabled=is_live_trading_enabled(),
        ).connect()
        account = client.select_account(os.getenv("FIRSTRADE_ACCOUNT") or None)
        symbol = os.getenv("FIRSTRADE_SMOKE_SYMBOL", "SPY")
        return jsonify(
            {
                "ok": True,
                "api_kind": "unofficial-reverse-engineered",
                "selected_account": mask_account_id(account),
                "quote": client.get_quote(account, symbol),
            }
        )
    except FirstradePlatformError as exc:
        return jsonify({"ok": False, "error": str(exc)}), 500
    except Exception as exc:
        # Keep the JSON error envelope for unexpected failures too, matching
        # the fallback used by the session-check and run routes, instead of
        # letting Flask answer with its generic HTML 500 page.
        return jsonify({"ok": False, "error": f"{type(exc).__name__}: {exc}"}), 500
@app.post("/session-check")
@app.get("/session-check")
def session_check():
    """Run the session check over HTTP and return its result as JSON.

    Gated by ``FIRSTRADE_RUN_SESSION_CHECK_ON_HTTP``: without the flag the
    route answers 403. With it, the result of ``run_session_check()`` is
    returned; any exception becomes a 500 JSON error, where platform errors
    show only their message and other errors are prefixed with the type name.
    """
    if _flag("FIRSTRADE_RUN_SESSION_CHECK_ON_HTTP"):
        try:
            return jsonify(run_session_check())
        except FirstradePlatformError as exc:
            failure = str(exc)
        except Exception as exc:
            failure = f"{type(exc).__name__}: {exc}"
        return jsonify({"ok": False, "error": failure}), 500

    disabled_reason = (
        "Set FIRSTRADE_RUN_SESSION_CHECK_ON_HTTP=true to allow HTTP-triggered "
        "read-only Firstrade session and account-state checks."
    )
    return jsonify({"ok": False, "error": disabled_reason}), 403
@app.post("/")
@app.post("/run")
@app.get("/run")
def run_strategy():
    """Run one strategy cycle over HTTP and return its result as JSON.

    Gated by ``FIRSTRADE_RUN_STRATEGY_ON_HTTP``: without the flag the route
    answers 403. On failure the exception is logged and a notification is
    attempted via ``_handle_strategy_run_exception``; the 500 response
    carries the error text and whether a notification was attempted.
    """
    if _flag("FIRSTRADE_RUN_STRATEGY_ON_HTTP"):
        try:
            return jsonify(run_strategy_cycle())
        except Exception as exc:
            attempted = _handle_strategy_run_exception(exc)
            # Anticipated error types report just their message; anything
            # else is prefixed with the exception type name.
            anticipated = isinstance(
                exc, (FirstradePlatformError, EnvironmentError, ValueError)
            )
            detail = str(exc) if anticipated else f"{type(exc).__name__}: {exc}"
            body = {
                "ok": False,
                "error": detail,
                "runtime_error_notification_attempted": attempted,
            }
            return jsonify(body), 500

    disabled_reason = (
        "Set FIRSTRADE_RUN_STRATEGY_ON_HTTP=true to allow HTTP-triggered "
        "strategy evaluation and guarded order routing."
    )
    return jsonify({"ok": False, "error": disabled_reason}), 403
@app.post("/precheck")
@app.get("/precheck")
def precheck():
    """Answer /precheck with the same response as the health route."""
    status_response = health()
    return status_response
@app.post("/probe")
@app.get("/probe")
def probe():
    """Answer /probe with the same response as the health route."""
    status_response = health()
    return status_response
if __name__ == "__main__":
    # Direct-execution entry point: listen on all interfaces, on the port
    # given by the PORT environment variable (8080 when it is not set).
    listen_port = int(os.getenv("PORT", "8080"))
    app.run(host="0.0.0.0", port=listen_port)