diff --git a/flocks/server/routes/session.py b/flocks/server/routes/session.py index 68b15a082..074f7c123 100644 --- a/flocks/server/routes/session.py +++ b/flocks/server/routes/session.py @@ -35,6 +35,7 @@ # Default agent name constant DEFAULT_AGENT = "rex" +DEFAULT_MESSAGE_PAGE_LIMIT = 50 # File extensions that are safe to persist when materialising data-URL uploads. # Intentionally narrow: any extension outside this set is rejected to prevent @@ -136,6 +137,23 @@ class SessionResponse(BaseModel): goal: Optional[SessionGoalResponse] = Field(None, description="Persisted session goal state") +class SessionListItem(BaseModel): + """Lightweight session row for the session manager sidebar.""" + model_config = ConfigDict(populate_by_name=True, by_alias=True) + + id: str + title: str + time: SessionTime + category: str = "user" + parentID: Optional[str] = None + provider: Optional[str] = None + model: Optional[str] = None + model_pinned: bool = False + canWrite: bool = False + canDelete: bool = False + isShared: bool = False + + def _session_to_response(session: SessionModel) -> SessionResponse: """ Convert SessionModel to SessionResponse @@ -174,6 +192,29 @@ def _session_to_response(session: SessionModel) -> SessionResponse: ) +def _session_to_list_item(session: SessionModel) -> SessionListItem: + """Convert a session to the lightweight manager-list response shape.""" + current_user = get_current_auth_user() + return SessionListItem( + id=session.id, + title=session.title, + time=SessionTime( + created=session.time.created, + updated=session.time.updated, + compacting=session.time.compacting, + archived=session.time.archived, + ), + category=session.category, + parentID=session.parent_id, + provider=session.provider, + model=session.model, + model_pinned=session.model_pinned, + canWrite=SessionPolicy.can_write(session, current_user), + canDelete=SessionPolicy.can_delete(session, current_user), + isShared=SessionPolicy.is_shared(session), + ) + + async def 
_session_to_response_with_goal(session: SessionModel) -> SessionResponse: """Convert SessionModel to SessionResponse and attach persisted goal state.""" response = _session_to_response(session) @@ -332,57 +373,89 @@ async def get_session_status() -> Dict[str, Any]: @router.get( "", - response_model=List[SessionResponse], + response_model=List[Union[SessionResponse, SessionListItem]], summary="List sessions", description="Get a list of all sessions, sorted by most recently updated", ) async def list_sessions( request: Request, + view: Optional[Literal["list"]] = Query(None, description="Use lightweight list rows"), + manager: Optional[bool] = Query(None, description="Apply session-manager visibility filters"), directory: Optional[str] = Query(None, description="Filter by project directory"), roots: Optional[bool] = Query(None, description="Only return root sessions (no parentID)"), start: Optional[int] = Query(None, description="Filter sessions updated on or after this timestamp"), search: Optional[str] = Query(None, description="Filter by title (case-insensitive)"), limit: Optional[int] = Query(None, ge=1, description="Maximum sessions to return"), + offset: Optional[int] = Query(None, ge=0, description="Number of filtered sessions to skip"), category: Optional[str] = Query(None, description="Filter by category: user or task"), -) -> List[SessionResponse]: +) -> List[Union[SessionResponse, SessionListItem]]: """List all sessions with optional filters""" started_at = time.perf_counter() _current_user = require_user(request) + list_started_at = time.perf_counter() all_sessions = await Session.list_all() + list_elapsed_ms = (time.perf_counter() - list_started_at) * 1000 filtered = [] term = search.lower() if search else None + manager_categories = {"user", "workflow", "entity-config"} + skip_remaining = offset or 0 for session in all_sessions: if _is_hidden_from_session_manager(session): continue if directory is not None and session.directory != directory: continue - 
if roots and session.parent_id: + if (roots or manager) and session.parent_id: continue if start is not None and session.time.updated < start: continue if term is not None and term not in session.title.lower(): continue - if category is not None: + if manager: + if session.category not in manager_categories: + continue + elif category is not None: if session.category != category: continue elif session.category == "test": # exclude test sessions from the default listing continue + + if skip_remaining > 0: + skip_remaining -= 1 + continue filtered.append(session) if limit is not None and len(filtered) >= limit: break - + + if view == "list": + response = [_session_to_list_item(s) for s in filtered] + log_route_timing(log, "session.list.light.complete", started_at=started_at, extra={ + "total": len(all_sessions), + "count": len(response), + "roots": roots, + "manager": manager, + "limit": limit, + "offset": offset, + "search": bool(search), + "category": category, + "list_ms": round(list_elapsed_ms, 2), + }) + return response + response = [await _session_to_response_with_goal(s) for s in filtered] log_route_timing(log, "session.list.complete", started_at=started_at, extra={ "count": len(response), "roots": roots, + "manager": manager, "limit": limit, + "offset": offset, "search": bool(search), "category": category, + "list_ms": round(list_elapsed_ms, 2), }) return response @@ -514,6 +587,7 @@ async def get_session(sessionID: str, request: Request) -> SessionResponse: ) async def get_session_context_usage(sessionID: str, request: Request) -> ContextUsageSnapshot: """Return current prompt/context usage for a session.""" + started_at = time.perf_counter() current_user = require_user(request) session = await _get_session_by_id_unfiltered(sessionID) if not session: @@ -522,7 +596,14 @@ async def get_session_context_usage(sessionID: str, request: Request) -> Context detail=f"Session {sessionID} not found", ) _require_session_read_access(session, current_user) - return await 
build_context_usage_snapshot(sessionID, session=session) + snapshot = await build_context_usage_snapshot(sessionID, session=session) + log_route_timing(log, "session.context_usage.complete", started_at=started_at, extra={ + "sessionID": sessionID, + "source": snapshot.source, + "used_tokens": snapshot.used_tokens, + "segments": len(snapshot.segments), + }) + return snapshot @router.get( @@ -1277,6 +1358,14 @@ class MessageWithParts(BaseModel): parts: List[MessagePartInfo] = [] +class MessagePage(BaseModel): + """Paged message response used by the WebUI session page.""" + sessionID: str + items: List[MessageWithParts] = [] + hasMore: bool = False + nextBefore: Optional[str] = None + + class MessageEditRequest(BaseModel): """Request to edit message text.""" @@ -1284,9 +1373,138 @@ class MessageEditRequest(BaseModel): partID: Optional[str] = Field(None, description="Specific text part ID to edit") +def _json_bytes(value: Any) -> bytes: + return json.dumps(value, ensure_ascii=False, default=str).encode("utf-8") + + +def _part_to_response_info( + part: Any, + *, + session_id: str, + message_id: str, + index: int = 0, +) -> MessagePartInfo: + text_value = getattr(part, "text", None) if part.type in ("text", "reasoning", "thinking") else None + + url_value = getattr(part, "url", None) if part.type == "file" else None + + state_value = None + if part.type == "tool": + raw_state = getattr(part, "state", None) + if raw_state is not None: + if hasattr(raw_state, "model_dump"): + state_value = raw_state.model_dump() + elif isinstance(raw_state, dict): + state_value = raw_state + + return MessagePartInfo( + id=part.id if hasattr(part, "id") else f"{message_id}_part_{index}", + messageID=message_id, + sessionID=session_id, + type=part.type, + text=text_value, + synthetic=getattr(part, "synthetic", None), + tool=getattr(part, "tool", None) if part.type == "tool" else None, + state=state_value, + callID=getattr(part, "callID", None) if part.type == "tool" else None, + 
metadata=getattr(part, "metadata", None), + url=url_value, + mime=getattr(part, "mime", None) if part.type == "file" else None, + filename=getattr(part, "filename", None) if part.type == "file" else None, + ) + + +async def _message_to_response_info(msg: Any, *, cwd: str) -> MessageInfo: + if msg.role == "user": + model_dict = getattr(msg, "model", None) + if model_dict and isinstance(model_dict, dict): + model_info = model_dict + else: + try: + from flocks.agent.registry import Agent + + default_agent = await Agent.default_agent() + agent_obj = await Agent.get(default_agent) + if agent_obj and hasattr(agent_obj, "model") and agent_obj.model: + model_info = agent_obj.model + else: + model_info = {"providerID": "openai", "modelID": "gpt-4-turbo-preview"} + except Exception: + model_info = {"providerID": "openai", "modelID": "gpt-4-turbo-preview"} + + return UserMessageInfo( + id=msg.id, + sessionID=msg.sessionID, + role="user", + time=msg.time, + agent=getattr(msg, "agent", None) or DEFAULT_AGENT, + model=model_info, + compacted=getattr(msg, "compacted", None), + ) + + tokens_raw = getattr(msg, "tokens", None) + if tokens_raw is not None and hasattr(tokens_raw, "model_dump"): + tokens_dict = tokens_raw.model_dump() + elif isinstance(tokens_raw, dict): + tokens_dict = tokens_raw + else: + tokens_dict = { + "input": 0, + "output": 0, + "reasoning": 0, + "cache": {"read": 0, "write": 0}, + } + + path_raw = getattr(msg, "path", None) + if path_raw is not None and hasattr(path_raw, "model_dump"): + path_dict = path_raw.model_dump() + elif isinstance(path_raw, dict): + path_dict = path_raw + else: + path_dict = {"cwd": cwd, "root": cwd} + + return AssistantMessageInfo( + id=msg.id, + sessionID=msg.sessionID, + role="assistant", + time=msg.time, + parentID=getattr(msg, "parentID", None), + modelID=getattr(msg, "modelID", None) or "claude-sonnet-4-5-20250929", + providerID=getattr(msg, "providerID", None) or "anthropic", + mode=getattr(msg, "agent", None) or DEFAULT_AGENT, 
+ agent=getattr(msg, "agent", None) or DEFAULT_AGENT, + path=path_dict, + cost=getattr(msg, "cost", 0.0) or 0.0, + tokens=tokens_dict, + error=getattr(msg, "error", None), + finish=getattr(msg, "finish", None), + compacted=getattr(msg, "compacted", None), + ) + + +async def _message_with_parts_to_response( + msg_with_parts: Any, + *, + session_id: str, + cwd: str, +) -> MessageWithParts: + msg = msg_with_parts.info + info = await _message_to_response_info(msg, cwd=cwd) + parts = [ + _part_to_response_info( + part, + session_id=session_id, + message_id=msg.id, + index=i, + ) + for i, part in enumerate(msg_with_parts.parts) + ] + return MessageWithParts(info=info, parts=parts) + + @router.get( "/{sessionID}/message", - response_model=List[MessageWithParts], + response_model=Union[List[MessageWithParts], MessagePage], summary="Get session messages", description="Get all messages in a session", ) @@ -1294,11 +1512,15 @@ async def get_session_messages( sessionID: str, http_request: Request, limit: Optional[int] = Query(None, ge=1, description="Maximum messages to return"), -) -> List[MessageWithParts]: + before: Optional[str] = Query(None, description="Load messages before this message id"), + page: bool = Query(False, description="Return paged response metadata"), + include_archived: bool = Query(True, description="Include compacted/archived messages"), +) -> Union[List[MessageWithParts], MessagePage]: """Get session messages""" from flocks.session.message import Message import os + started_at = time.perf_counter() current_user = require_user(http_request) session = await _get_session_by_id_unfiltered(sessionID) if not session: @@ -1312,121 +1534,60 @@ async def get_session_messages( from flocks.session.orphan_tools import abort_orphan_running_parts_in_messages from flocks.session.core.status import SessionStatus - messages_with_parts = await Message.list_with_parts(sessionID, include_archived=True) + paged = page or before is not None + if paged or limit: + page_limit 
= limit or DEFAULT_MESSAGE_PAGE_LIMIT + messages_with_parts, has_more, next_before = await Message.list_recent_with_parts( + sessionID, + limit=page_limit, + before=before, + include_archived=include_archived, + ) + else: + messages_with_parts = await Message.list_with_parts( + sessionID, + include_archived=include_archived, + ) + has_more = False + next_before = None + if sessionID not in SessionStatus.get_busy_session_ids(): await abort_orphan_running_parts_in_messages(sessionID, messages_with_parts) - if limit: - messages_with_parts = messages_with_parts[-limit:] - - result = [] + cwd = os.getcwd() - - for msg_with_parts in messages_with_parts: - msg = msg_with_parts.info - - # Create appropriate message info based on role - if msg.role == "user": - # Extract model from msg.model dict (UserMessageInfo has model as dict) - model_dict = getattr(msg, 'model', None) - if model_dict and isinstance(model_dict, dict): - model_info = model_dict - else: - # Fallback: try to get from Agent.default_agent's model - try: - from flocks.agent.registry import Agent - default_agent = await Agent.default_agent() - agent_obj = await Agent.get(default_agent) - if agent_obj and hasattr(agent_obj, 'model') and agent_obj.model: - model_info = agent_obj.model - else: - model_info = {"providerID": "openai", "modelID": "gpt-4-turbo-preview"} - except Exception: - model_info = {"providerID": "openai", "modelID": "gpt-4-turbo-preview"} - - info = UserMessageInfo( - id=msg.id, - sessionID=msg.sessionID, - role="user", - time=msg.time, - agent=getattr(msg, 'agent', None) or DEFAULT_AGENT, - model=model_info, - compacted=getattr(msg, 'compacted', None), - ) - else: - # Convert tokens to dict if it's a TokenUsage object - tokens_raw = getattr(msg, 'tokens', None) - if tokens_raw is not None and hasattr(tokens_raw, 'model_dump'): - tokens_dict = tokens_raw.model_dump() - elif isinstance(tokens_raw, dict): - tokens_dict = tokens_raw - else: - tokens_dict = { - "input": 0, - "output": 0, - 
"reasoning": 0, - "cache": {"read": 0, "write": 0} - } - - # Convert path to dict if it's a MessagePath object - path_raw = getattr(msg, 'path', None) - if path_raw is not None and hasattr(path_raw, 'model_dump'): - path_dict = path_raw.model_dump() - elif isinstance(path_raw, dict): - path_dict = path_raw - else: - path_dict = {"cwd": cwd, "root": cwd} - - info = AssistantMessageInfo( - id=msg.id, - sessionID=msg.sessionID, - role="assistant", - time=msg.time, - parentID=getattr(msg, 'parentID', None), - modelID=getattr(msg, 'modelID', None) or "claude-sonnet-4-5-20250929", - providerID=getattr(msg, 'providerID', None) or "anthropic", - mode=getattr(msg, 'agent', None) or DEFAULT_AGENT, - agent=getattr(msg, 'agent', None) or DEFAULT_AGENT, - path=path_dict, - cost=getattr(msg, 'cost', 0.0) or 0.0, - tokens=tokens_dict, - error=getattr(msg, 'error', None), - finish=getattr(msg, 'finish', None), - compacted=getattr(msg, 'compacted', None), - ) - - parts = [] - for i, part in enumerate(msg_with_parts.parts): - # Convert state to dict if it's a Pydantic model - state_value = None - if part.type == "tool": - raw_state = getattr(part, 'state', None) - if raw_state is not None: - if hasattr(raw_state, 'model_dump'): - state_value = raw_state.model_dump() - elif isinstance(raw_state, dict): - state_value = raw_state - - part_info = MessagePartInfo( - id=part.id if hasattr(part, 'id') else f"{msg.id}_part_{i}", - messageID=msg.id, - sessionID=sessionID, - type=part.type, - text=getattr(part, 'text', None) if part.type in ("text", "reasoning") else None, - synthetic=getattr(part, 'synthetic', None), - tool=getattr(part, 'tool', None) if part.type == "tool" else None, - state=state_value, - callID=getattr(part, 'callID', None) if part.type == "tool" else None, - metadata=getattr(part, 'metadata', None), - url=getattr(part, 'url', None) if part.type == "file" else None, - mime=getattr(part, 'mime', None) if part.type == "file" else None, - filename=getattr(part, 'filename', 
None) if part.type == "file" else None, - ) - parts.append(part_info) - result.append(MessageWithParts(info=info, parts=parts)) - + result = [ + await _message_with_parts_to_response( + msg_with_parts, + session_id=sessionID, + cwd=cwd, + ) + for msg_with_parts in messages_with_parts + ] + + payload_bytes = len(_json_bytes([item.model_dump() for item in result])) + log_route_timing(log, "session.messages.page.complete", started_at=started_at, extra={ + "sessionID": sessionID, + "count": len(result), + "parts": sum(len(message.parts) for message in result), + "paged": paged, + "limit": limit, + "before": bool(before), + "has_more": has_more, + "payload_bytes": payload_bytes, + }) + + if paged: + return MessagePage( + sessionID=sessionID, + items=result, + hasMore=has_more, + nextBefore=next_before, + ) return result except Exception as e: log.error("session.messages.error", {"error": str(e), "sessionID": sessionID}) + if page or before is not None: + return MessagePage(sessionID=sessionID, items=[], hasMore=False, nextBefore=None) return [] @@ -1450,82 +1611,13 @@ async def get_message(sessionID: str, messageID: str, http_request: Request) -> ) _require_session_read_access(session, current_user) - msg_with_parts = await Message.get_with_parts(sessionID, messageID) + msg_with_parts = await Message.get_with_parts_lazy(sessionID, messageID) if msg_with_parts: - msg = msg_with_parts.info - cwd = os.getcwd() - - # Create appropriate message info based on role - if msg.role == "user": - info = UserMessageInfo( - id=msg.id, - sessionID=msg.sessionID, - role="user", - time=msg.time, - agent=getattr(msg, 'agent', None) or DEFAULT_AGENT, - model={ - "providerID": getattr(msg, 'providerID', None) or "anthropic", - "modelID": getattr(msg, 'modelID', None) or "claude-sonnet-4-5-20250929", - }, - ) - else: - # Convert tokens to dict if it's a TokenUsage object - tokens_raw = getattr(msg, 'tokens', None) - if tokens_raw is not None and hasattr(tokens_raw, 'model_dump'): - 
tokens_dict = tokens_raw.model_dump() - elif isinstance(tokens_raw, dict): - tokens_dict = tokens_raw - else: - tokens_dict = { - "input": 0, - "output": 0, - "reasoning": 0, - "cache": {"read": 0, "write": 0} - } - - # Convert path to dict if it's a MessagePath object - path_raw = getattr(msg, 'path', None) - if path_raw is not None and hasattr(path_raw, 'model_dump'): - path_dict = path_raw.model_dump() - elif isinstance(path_raw, dict): - path_dict = path_raw - else: - path_dict = {"cwd": cwd, "root": cwd} - - info = AssistantMessageInfo( - id=msg.id, - sessionID=msg.sessionID, - role="assistant", - time=msg.time, - parentID=getattr(msg, 'parentID', None), - modelID=getattr(msg, 'modelID', None) or "claude-sonnet-4-5-20250929", - providerID=getattr(msg, 'providerID', None) or "anthropic", - mode=getattr(msg, 'agent', None) or DEFAULT_AGENT, - agent=getattr(msg, 'agent', None) or DEFAULT_AGENT, - path=path_dict, - cost=getattr(msg, 'cost', 0.0) or 0.0, - tokens=tokens_dict, - ) - - parts = [] - for i, part in enumerate(msg_with_parts.parts): - part_info = MessagePartInfo( - id=part.id if hasattr(part, 'id') else f"{msg.id}_part_{i}", - messageID=msg.id, - sessionID=sessionID, - type=part.type, - text=getattr(part, 'text', None) if part.type in ("text", "reasoning") else None, - synthetic=getattr(part, 'synthetic', None), - tool=getattr(part, 'tool', None) if part.type == "tool" else None, - state=getattr(part, 'state', None) if part.type == "tool" else None, - callID=getattr(part, 'callID', None) if part.type == "tool" else None, - metadata=getattr(part, 'metadata', None), - url=getattr(part, 'url', None) if part.type == "file" else None, - mime=getattr(part, 'mime', None) if part.type == "file" else None, - filename=getattr(part, 'filename', None) if part.type == "file" else None, - ) - parts.append(part_info) - return MessageWithParts(info=info, parts=parts) + return await _message_with_parts_to_response( + msg_with_parts, + session_id=sessionID, + 
cwd=os.getcwd(), + ) raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, diff --git a/flocks/session/message.py b/flocks/session/message.py index 9bf05da41..075c19248 100644 --- a/flocks/session/message.py +++ b/flocks/session/message.py @@ -439,6 +439,7 @@ class Message: _parts_storage_format: Dict[str, Literal["legacy", "per_message"]] = {} _parts_persisted_mids: Dict[str, set[str]] = {} _parts_flush_tasks: Dict[str, asyncio.Task] = {} + _parts_fully_loaded: set[str] = set() _lru: OrderedDict[str, bool] = OrderedDict() # LRU tracker: move_to_end() is O(1) # Maximum number of sessions to keep in cache before evicting oldest @@ -565,21 +566,49 @@ async def _flush_later() -> None: @classmethod async def _ensure_cache(cls, session_id: str) -> None: - """Ensure messages for a session are loaded into cache from storage. + """Ensure messages and all parts for a session are loaded into cache. Uses a per-session lock so operations on different sessions are fully concurrent. """ + await cls._ensure_message_cache(session_id) + if session_id in cls._parts_fully_loaded: + cls._lru.move_to_end(session_id) + return + + lock = _session_locks.get(session_id) + async with lock: + if session_id in cls._parts_fully_loaded: + cls._lru.move_to_end(session_id) + return + + message_times = { + message.id: message.time + for message in cls._messages_cache.get(session_id, []) + } + await cls._load_all_parts_locked(session_id, message_times=message_times) + cls._parts_fully_loaded.add(session_id) + cls._lru.move_to_end(session_id) + log.debug("message.cache.loaded", {"session_id": session_id, "parts": "all"}) + + @classmethod + async def _ensure_message_cache(cls, session_id: str) -> None: + """Ensure message metadata is cached without loading every part. + + The session page only needs recent messages initially. Loading all + parts here would deserialize large tool outputs even when only the + newest page is requested. 
+ """ if session_id in cls._lru: cls._lru.move_to_end(session_id) return - + lock = _session_locks.get(session_id) async with lock: if session_id in cls._lru: cls._lru.move_to_end(session_id) return - + while len(cls._lru) >= cls._MAX_CACHED_SESSIONS: evict_id, _ = cls._lru.popitem(last=False) cls._cancel_parts_flush_task(evict_id) @@ -590,13 +619,13 @@ async def _ensure_cache(cls, session_id: str) -> None: cls._parts_serialized_cache.pop(evict_id, None) cls._parts_storage_format.pop(evict_id, None) cls._parts_persisted_mids.pop(evict_id, None) + cls._parts_fully_loaded.discard(evict_id) _session_locks.discard(evict_id) log.debug("message.cache.evicted", {"session_id": evict_id}) - + storage_key = f"{cls._MESSAGE_PREFIX}:{session_id}" stored_data = await Storage.get(storage_key) - - message_times: Dict[str, Dict[str, int]] = {} + if isinstance(stored_data, list): messages = [] for msg_data in stored_data: @@ -624,61 +653,113 @@ async def _ensure_cache(cls, session_id: str) -> None: continue messages.append(message) - message_times[message.id] = message.time cls._messages_cache[session_id] = messages else: cls._messages_cache[session_id] = [] - - item_entries = await Storage.list_entries(prefix=cls._parts_item_prefix(session_id)) - stored_parts: Dict[str, List[Dict[str, Any]]] = {} - persisted_mids: set[str] = set() - - for key, value in item_entries: - if not cls._is_parts_item_key(key, session_id): - continue - if not isinstance(value, list): - continue - msg_id = key.rsplit(":", 1)[1] - stored_parts[msg_id] = value - persisted_mids.add(msg_id) - - if stored_parts: - cls._parts_storage_format[session_id] = "per_message" - else: - legacy_parts = await Storage.get(cls._parts_blob_key(session_id)) - if isinstance(legacy_parts, dict): - stored_parts = legacy_parts - persisted_mids = set(legacy_parts.keys()) - cls._parts_storage_format[session_id] = "legacy" - else: - cls._parts_storage_format[session_id] = "per_message" - - if stored_parts: - 
cls._parts_cache[session_id] = {} - cls._parts_revision_cache[session_id] = {} - for msg_id, parts_data in stored_parts.items(): - if not isinstance(parts_data, list): - continue - cls._parts_cache[session_id][msg_id] = cls._deserialize_parts_list( - session_id, - msg_id, - parts_data, - message_time=message_times.get(msg_id), - ) - cls._parts_revision_cache[session_id][msg_id] = 0 - cls._parts_serialized_cache[session_id] = { - msg_id: list(parts_data) - for msg_id, parts_data in stored_parts.items() - } - else: - cls._parts_cache[session_id] = {} - cls._parts_revision_cache[session_id] = {} - cls._parts_serialized_cache[session_id] = {} - cls._parts_persisted_mids[session_id] = persisted_mids - + + cls._parts_cache.setdefault(session_id, {}) + cls._parts_revision_cache.setdefault(session_id, {}) + cls._parts_serialized_cache.setdefault(session_id, {}) + cls._parts_storage_format.setdefault(session_id, "per_message") + cls._parts_persisted_mids.setdefault(session_id, set()) + cls._parts_fully_loaded.discard(session_id) cls._rebuild_id_index(session_id) cls._lru[session_id] = True - log.debug("message.cache.loaded", {"session_id": session_id}) + log.debug("message.cache.loaded", {"session_id": session_id, "parts": "lazy"}) + + @classmethod + async def _load_all_parts_locked( + cls, + session_id: str, + *, + message_times: Dict[str, Dict[str, int]], + ) -> None: + """Load all stored parts for a session. 
Caller must hold session lock.""" + item_entries = await Storage.list_entries(prefix=cls._parts_item_prefix(session_id)) + stored_parts: Dict[str, List[Dict[str, Any]]] = {} + persisted_mids: set[str] = set() + + for key, value in item_entries: + if not cls._is_parts_item_key(key, session_id): + continue + if not isinstance(value, list): + continue + msg_id = key.rsplit(":", 1)[1] + stored_parts[msg_id] = value + persisted_mids.add(msg_id) + + if stored_parts: + cls._parts_storage_format[session_id] = "per_message" + else: + legacy_parts = await Storage.get(cls._parts_blob_key(session_id)) + if isinstance(legacy_parts, dict): + stored_parts = legacy_parts + persisted_mids = set(legacy_parts.keys()) + cls._parts_storage_format[session_id] = "legacy" + else: + cls._parts_storage_format[session_id] = "per_message" + + cls._parts_cache[session_id] = {} + cls._parts_revision_cache[session_id] = {} + cls._parts_serialized_cache[session_id] = {} + for msg_id, parts_data in stored_parts.items(): + if not isinstance(parts_data, list): + continue + cls._parts_cache[session_id][msg_id] = cls._deserialize_parts_list( + session_id, + msg_id, + parts_data, + message_time=message_times.get(msg_id), + ) + cls._parts_revision_cache[session_id][msg_id] = 0 + cls._parts_serialized_cache[session_id][msg_id] = list(parts_data) + cls._parts_persisted_mids[session_id] = persisted_mids + + @classmethod + async def _load_parts_for_message( + cls, + session_id: str, + message_id: str, + *, + legacy_parts_cache: Optional[Dict[str, List[Dict[str, Any]]]] = None, + ) -> List[PartType]: + """Load one message's parts without hydrating every part in the session.""" + await cls._ensure_message_cache(session_id) + cached = cls._parts_cache.setdefault(session_id, {}).get(message_id) + if cached is not None: + return list(cached) + + message_times = { + message.id: message.time + for message in cls._messages_cache.get(session_id, []) + } + parts_data = await 
Storage.get(cls._parts_item_key(session_id, message_id)) + if not isinstance(parts_data, list): + legacy_parts = legacy_parts_cache + if legacy_parts is None: + stored_legacy_parts = await Storage.get(cls._parts_blob_key(session_id)) + legacy_parts = stored_legacy_parts if isinstance(stored_legacy_parts, dict) else None + if isinstance(legacy_parts, dict): + cls._parts_storage_format[session_id] = "legacy" + parts_data = legacy_parts.get(message_id, []) + cls._parts_persisted_mids.setdefault(session_id, set()).update(legacy_parts.keys()) + else: + cls._parts_storage_format.setdefault(session_id, "per_message") + parts_data = [] + else: + cls._parts_storage_format[session_id] = "per_message" + cls._parts_persisted_mids.setdefault(session_id, set()).add(message_id) + + parts = cls._deserialize_parts_list( + session_id, + message_id, + parts_data, + message_time=message_times.get(message_id), + ) + cls._parts_cache.setdefault(session_id, {})[message_id] = parts + cls._parts_revision_cache.setdefault(session_id, {})[message_id] = 0 + cls._parts_serialized_cache.setdefault(session_id, {})[message_id] = list(parts_data) + return list(parts) @classmethod def _rebuild_id_index(cls, session_id: str) -> None: @@ -1295,7 +1376,7 @@ async def list(cls, session_id: str, include_archived: bool = False) -> List[Mes Returns: List of messages """ - await cls._ensure_cache(session_id) + await cls._ensure_message_cache(session_id) messages = cls._messages_cache.get(session_id, []) if not include_archived: messages = [m for m in messages if not getattr(m, 'compacted', None)] @@ -1304,7 +1385,7 @@ async def list(cls, session_id: str, include_archived: bool = False) -> List[Mes @classmethod async def get(cls, session_id: str, message_id: str) -> Optional[MessageInfo]: """Get a specific message by ID (O(1) via index).""" - await cls._ensure_cache(session_id) + await cls._ensure_message_cache(session_id) idx = cls._msg_id_index.get(session_id, {}).get(message_id) if idx is not None: 
messages = cls._messages_cache.get(session_id, []) @@ -1594,6 +1675,72 @@ async def list_with_parts(cls, session_id: str, include_archived: bool = False) parts = cls._parts_cache.get(session_id, {}).get(message.id, []) result.append(MessageWithParts(info=message, parts=parts)) return result + + @classmethod + async def list_recent_with_parts( + cls, + session_id: str, + *, + limit: int = 50, + before: Optional[str] = None, + include_archived: bool = True, + ) -> tuple[List[MessageWithParts], bool, Optional[str]]: + """List one recent page of messages and lazily load only their parts. + + ``before`` is a message id cursor. The returned page preserves + chronological order and ``next_before`` is the first message id in the + page, suitable for loading older messages. + """ + safe_limit = max(int(limit), 1) + messages = await cls.list(session_id, include_archived=include_archived) + if before: + before_index = next( + (idx for idx, message in enumerate(messages) if message.id == before), + len(messages), + ) + candidate_messages = messages[:before_index] + else: + candidate_messages = messages + + has_more = len(candidate_messages) > safe_limit + page_messages = candidate_messages[-safe_limit:] + legacy_parts_cache: Optional[Dict[str, List[Dict[str, Any]]]] = None + missing_message_ids = [ + message.id + for message in page_messages + if message.id not in cls._parts_cache.setdefault(session_id, {}) + ] + if missing_message_ids: + has_per_message_parts = False + for message_id in missing_message_ids: + parts_data = await Storage.get(cls._parts_item_key(session_id, message_id)) + if isinstance(parts_data, list): + has_per_message_parts = True + break + if not has_per_message_parts: + stored_legacy_parts = await Storage.get(cls._parts_blob_key(session_id)) + if isinstance(stored_legacy_parts, dict): + legacy_parts_cache = stored_legacy_parts + + result = [] + for message in page_messages: + parts = await cls._load_parts_for_message( + session_id, + message.id, + 
legacy_parts_cache=legacy_parts_cache, + ) + result.append(MessageWithParts(info=message, parts=parts)) + next_before = page_messages[0].id if has_more and page_messages else None + return result, has_more, next_before + + @classmethod + async def get_with_parts_lazy(cls, session_id: str, message_id: str) -> Optional[MessageWithParts]: + """Get one message with parts without hydrating all session parts.""" + message = await cls.get(session_id, message_id) + if not message: + return None + parts = await cls._load_parts_for_message(session_id, message_id) + return MessageWithParts(info=message, parts=parts) @classmethod async def delete(cls, session_id: str, message_id: str) -> bool: @@ -1685,6 +1832,7 @@ async def clear(cls, session_id: str) -> int: cls._parts_serialized_cache[session_id] = {} cls._parts_persisted_mids[session_id] = set() cls._parts_storage_format[session_id] = "per_message" + cls._parts_fully_loaded.add(session_id) cls._cancel_parts_flush_task(session_id) # Persist changes @@ -1717,6 +1865,7 @@ def invalidate_cache(cls, session_id: Optional[str] = None) -> None: cls._parts_serialized_cache.pop(session_id, None) cls._parts_storage_format.pop(session_id, None) cls._parts_persisted_mids.pop(session_id, None) + cls._parts_fully_loaded.discard(session_id) _session_locks.discard(session_id) else: for sid in list(cls._parts_flush_tasks): @@ -1729,6 +1878,7 @@ def invalidate_cache(cls, session_id: Optional[str] = None) -> None: cls._parts_serialized_cache.clear() cls._parts_storage_format.clear() cls._parts_persisted_mids.clear() + cls._parts_fully_loaded.clear() _session_locks.clear() log.debug("message.cache.invalidated", {"session_id": session_id}) @@ -2270,5 +2420,3 @@ def update_part(cls, session_id: str, message_id: str, part_id: str, **updates) def remove_part(cls, session_id: str, message_id: str, part_id: str) -> bool: """Sync version""" return cls._run_async(Message.remove_part(session_id, message_id, part_id)) - - diff --git 
a/flocks/session/streaming/stream_processor.py b/flocks/session/streaming/stream_processor.py index e6047ef2b..1835e0ff8 100644 --- a/flocks/session/streaming/stream_processor.py +++ b/flocks/session/streaming/stream_processor.py @@ -71,6 +71,16 @@ def _resolve_tool_error(result: ToolResult) -> str: return "Unknown error" +def _invalid_tool_call_parse_error(tool_input: Dict[str, Any]) -> tuple[str, str]: + """Return original tool name and user-facing parse error for invalid tool events.""" + original_tool = str(tool_input.get("tool") or "unknown").strip() or "unknown" + raw_error = str(tool_input.get("error") or "Failed to parse tool arguments.").strip() + prefix = f"Failed to parse tool arguments for {original_tool}" + if raw_error.startswith(prefix): + return original_tool, raw_error + return original_tool, f"{prefix}: {raw_error}" + + @dataclass class ToolCallState: """State for tracking tool calls""" @@ -508,6 +518,56 @@ async def _handle_tool_call(self, event: ToolCallEvent) -> None: tool_state = self.tool_calls[tool_call_id] tool_state.input = tool_input + + if tool_name == "invalid": + original_tool, parse_error = _invalid_tool_call_parse_error(tool_input) + tool_state.name = original_tool + tool_state.status = "error" + tool_state.error = parse_error + + tool_error_time = int(datetime.now().timestamp() * 1000) + error_state = ToolStateError( + status="error", + input=tool_input, + error=parse_error, + time={"start": tool_error_time, "end": tool_error_time}, + ) + error_part = ToolPart( + id=tool_state.part_id, + sessionID=self.session_id, + messageID=self.assistant_message.id, + type="tool", + callID=tool_call_id, + tool=original_tool, + state=error_state, + ) + await Message.store_part(self.session_id, self.assistant_message.id, error_part) + + if self.event_publish_callback: + await self.event_publish_callback("message.part.updated", { + "part": { + "id": tool_state.part_id, + "messageID": self.assistant_message.id, + "sessionID": self.session_id, + 
"type": "tool", + "callID": tool_call_id, + "tool": original_tool, + "state": { + "status": "error", + "input": tool_input, + "error": parse_error, + "time": {"start": tool_error_time, "end": tool_error_time}, + }, + }, + }) + + log.warn("stream.tool_call.invalid_arguments", { + "tool_call_id": tool_call_id, + "tool_name": original_tool, + "error": parse_error, + }) + return + tool_state.status = "running" # Update ToolPart to running state (like Flocks's Session.updatePart) diff --git a/flocks/tool/agent/delegate_task.py b/flocks/tool/agent/delegate_task.py index 9cffcdd16..176a8f8ec 100644 --- a/flocks/tool/agent/delegate_task.py +++ b/flocks/tool/agent/delegate_task.py @@ -37,12 +37,6 @@ async def _subagent_session_permissions(agent_name: str) -> list: from flocks.agent.registry import Agent from flocks.session.session import PermissionRule as SessionPermissionRule - def deny_nested_delegation() -> list: - return [ - SessionPermissionRule(permission="delegate_task", action="deny", pattern="*"), - SessionPermissionRule(permission="task", action="deny", pattern="*"), - ] - try: agent = await Agent.get(agent_name) except Exception as exc: @@ -67,7 +61,6 @@ def deny_nested_delegation() -> list: pattern=getattr(rule, "pattern", None) or "*", ) ) - rules.extend(deny_nested_delegation()) return rules if agent_name == "prometheus": @@ -78,7 +71,6 @@ def deny_nested_delegation() -> list: ]) elif not rules: rules.append(SessionPermissionRule(permission="question", action="deny", pattern="*")) - rules.extend(deny_nested_delegation()) return rules diff --git a/install.ps1 b/install.ps1 index 3da8af933..adc82d82b 100644 --- a/install.ps1 +++ b/install.ps1 @@ -157,6 +157,22 @@ function Unblock-InstallFiles { } } +function Remove-ExistingInstallDirectory { + param([string]$TargetDir) + + if ([string]::IsNullOrWhiteSpace($TargetDir) -or -not (Test-Path -LiteralPath $TargetDir)) { + return + } + + Write-Info "Removing previous Flocks installation: $TargetDir" + try { + 
# Deletes an existing installation directory before a fresh copy is made.
# Does nothing when no path is given or the path does not exist; reports a
# failure through Fail when the directory cannot be removed.
function Remove-ExistingInstallDirectory {
    param([string]$TargetDir)

    # No path supplied: nothing to clean up.
    if ([string]::IsNullOrWhiteSpace($TargetDir)) {
        return
    }

    # Path supplied but absent on disk: nothing to clean up either.
    if (-not (Test-Path -LiteralPath $TargetDir)) {
        return
    }

    Write-Info "正在删除历史 Flocks 安装目录: $TargetDir"
    try {
        # -ErrorAction Stop turns a removal error into a terminating error
        # so the catch block below runs.
        Remove-Item -LiteralPath $TargetDir -Recurse -Force -ErrorAction Stop
    }
    catch {
        Fail "无法删除历史 Flocks 安装目录 '$TargetDir'。请关闭正在运行的 Flocks 进程后重试。错误: $($_.Exception.Message)"
    }
}
{ Removes the 'flocks' directory under the application directory, if it
  exists, and raises an exception when the removal fails. }
procedure RemoveExistingFlocksRepoDir;
var
  RepoDir: string;
  Removed: Boolean;
begin
  RepoDir := ExpandConstant('{app}\flocks');
  if DirExists(RepoDir) then
  begin
    WizardForm.StatusLabel.Caption := 'Removing previous Flocks installation...';
    Removed := DelTree(RepoDir, True, True, True);
    if not Removed then
      RaiseException(
        'Failed to remove previous Flocks installation directory:' + #13#10 +
        RepoDir + #13#10 + #13#10 +
        'Please close any running Flocks process and retry.'
      );
  end;
end;
@pytest.mark.asyncio
async def test_list_sessions_light_manager_filters_and_omits_heavy_fields(self, client: AsyncClient):
    """Manager list view returns only user/workflow roots with sidebar fields."""
    from flocks.session.goal import GoalManager

    async def _create_session(payload):
        # POST a session and hand back its id.
        created = await client.post("/api/session", json=payload)
        return created.json()["id"]

    user_id = await _create_session({"title": "User"})
    workflow_id = await _create_session({"title": "Workflow", "category": "workflow"})
    task_id = await _create_session({"title": "Task", "category": "task"})
    child_id = await _create_session({"title": "Child", "parentID": user_id})
    # A persisted goal must not leak into the lightweight row.
    await GoalManager.set_goal(user_id, "Do not hydrate in list mode")

    resp = await client.get(
        "/api/session",
        params={"view": "list", "manager": "true", "roots": "true", "limit": "100"},
    )
    assert resp.status_code == status.HTTP_200_OK

    rows = resp.json()
    returned_ids = {entry["id"] for entry in rows}
    assert {user_id, workflow_id} <= returned_ids
    assert returned_ids.isdisjoint({task_id, child_id})

    expected_fields = {
        "id",
        "title",
        "time",
        "category",
        "parentID",
        "provider",
        "model",
        "model_pinned",
        "canWrite",
        "canDelete",
        "isShared",
    }
    user_row = next(entry for entry in rows if entry["id"] == user_id)
    assert set(user_row) == expected_fields
    for heavy_field in ("goal", "summary"):
        assert heavy_field not in user_row
@pytest.mark.asyncio
async def test_recent_legacy_page_reads_legacy_blob_once(monkeypatch: pytest.MonkeyPatch) -> None:
    """A recent page over a legacy session reads the session-wide blob once."""
    session_id = "ses_parts_legacy_recent_page"
    legacy_texts = {
        "msg_a": "legacy a",
        "msg_b": "legacy b",
        "msg_c": "legacy c",
    }
    await _write_legacy_session(session_id, legacy_texts)

    real_get = Storage.get
    blob_key = f"message_parts:{session_id}"
    blob_reads = []

    async def counting_get(key: str):
        # Record each read of the legacy blob key, then defer to the real getter.
        if key == blob_key:
            blob_reads.append(key)
        return await real_get(key)

    monkeypatch.setattr(Storage, "get", counting_get)

    page, has_more, next_before = await Message.list_recent_with_parts(session_id, limit=3)

    assert [item.info.id for item in page] == ["msg_a", "msg_b", "msg_c"]
    assert [item.parts[0].text for item in page] == ["legacy a", "legacy b", "legacy c"]
    assert has_more is False
    assert next_before is None
    assert len(blob_reads) == 1
test_unknown_tool_still_tracked(self): state = proc.tool_calls["tc_unk"] assert state.status == "error" + @pytest.mark.asyncio + async def test_invalid_tool_call_records_parse_error_without_registry_execution(self): + event_callback = AsyncMock() + proc = _make_processor(event_callback=event_callback) + execute_mock = AsyncMock(return_value=ToolResult(success=True, output="should not run", title="invalid")) + + with ( + patch("flocks.session.streaming.stream_processor.Message.store_part", new=AsyncMock()) as mock_store, + patch("flocks.session.streaming.stream_processor.Message.update_part", new=AsyncMock()), + patch( + "flocks.session.streaming.stream_processor.ToolRegistry.execute", + new=execute_mock, + ), + ): + await proc.process_event( + ToolCallEvent( + tool_call_id="tc_invalid", + tool_name="invalid", + input={ + "tool": "edit", + "error": "Failed to parse tool arguments (123 chars). Please ensure valid JSON.", + "arguments_preview": "{\"file_path\":", + }, + ) + ) + + execute_mock.assert_not_awaited() + state = proc.tool_calls["tc_invalid"] + assert state.status == "error" + assert "Failed to parse tool arguments for edit" in state.error + assert "Failed to parse tool arguments (123 chars)" in state.error + + completed_part = mock_store.await_args_list[-1].args[2] + assert completed_part.tool == "edit" + assert completed_part.state.status == "error" + assert completed_part.state.input["tool"] == "edit" + assert completed_part.state.input["arguments_preview"] == "{\"file_path\":" + assert completed_part.state.error == state.error + + published_part = event_callback.await_args_list[-1].args[1]["part"] + assert published_part["tool"] == "edit" + assert published_part["state"]["status"] == "error" + assert published_part["state"]["error"] == state.error + @pytest.mark.asyncio async def test_tool_start_callback_called(self): callback = AsyncMock() diff --git a/tests/tool/test_delegate_task_compat.py b/tests/tool/test_delegate_task_compat.py index 
f97355f1c..047ba6d07 100644 --- a/tests/tool/test_delegate_task_compat.py +++ b/tests/tool/test_delegate_task_compat.py @@ -37,7 +37,7 @@ async def test_delegate_task_derives_description_and_ignores_blank_skills(self): patch("flocks.tool.agent.delegate_task.is_delegatable", return_value=True), patch("flocks.tool.agent.delegate_task.Skill.get", AsyncMock()) as skill_get, patch("flocks.tool.agent.delegate_task.Session.get_by_id", AsyncMock(return_value=parent_session)), - patch("flocks.tool.agent.delegate_task.Session.create", AsyncMock(return_value=child_session)), + patch("flocks.tool.agent.delegate_task.Session.create", AsyncMock(return_value=child_session)) as create_session, patch("flocks.tool.agent.delegate_task.Message.create", AsyncMock()), patch("flocks.tool.agent.delegate_task.SessionLoop.run", AsyncMock(return_value=SimpleNamespace( action="stop", @@ -57,6 +57,10 @@ async def test_delegate_task_derives_description_and_ignores_blank_skills(self): assert result.title == "Investigate threatbook.cn assets" assert result.metadata["sessionId"] == "ses-child" skill_get.assert_not_awaited() + permissions = create_session.await_args.kwargs["permission"] + denied_permissions = {rule.permission for rule in permissions if rule.action == "deny"} + assert "delegate_task" not in denied_permissions + assert "task" not in denied_permissions @pytest.mark.asyncio async def test_delegate_task_category_model_uses_runtime_override_without_pinning(self): diff --git a/uv.lock b/uv.lock index 02ef844ca..62cd1f857 100644 --- a/uv.lock +++ b/uv.lock @@ -537,7 +537,7 @@ wheels = [ [[package]] name = "flocks" -version = "2026.6.17" +version = "2026.6.18" source = { editable = "." 
} dependencies = [ { name = "aiofiles" }, diff --git a/webui/src/api/session.ts b/webui/src/api/session.ts index 0123d0f44..eee2df3a0 100644 --- a/webui/src/api/session.ts +++ b/webui/src/api/session.ts @@ -11,6 +11,9 @@ export interface SessionMessagePartPayload { state?: Record; callID?: string; metadata?: Record; + url?: string | null; + mime?: string; + filename?: string; } export interface QueuedPrompt { @@ -69,6 +72,8 @@ export interface SessionResponse { } export interface SessionListParams { + view?: 'list'; + manager?: boolean; limit?: number; offset?: number; directory?: string; @@ -78,6 +83,20 @@ export interface SessionListParams { category?: string; } +export interface SessionMessagePage { + sessionID: string; + items: Array<{ info: Record; parts: SessionMessagePartPayload[] }>; + hasMore: boolean; + nextBefore?: string | null; +} + +export interface SessionMessageListParams { + limit?: number; + before?: string | null; + page?: boolean; + include_archived?: boolean; +} + export const sessionApi = { /** * 获取会话列表 @@ -159,6 +178,13 @@ export const sessionApi = { return response.data; }, + getMessagesPage: async (sessionId: string, params?: SessionMessageListParams): Promise => { + const response = await client.get(`/api/session/${sessionId}/message`, { + params: { page: true, limit: 50, include_archived: true, ...params }, + }); + return response.data; + }, + getContextUsage: async (sessionId: string): Promise => { const response = await client.get(`/api/session/${sessionId}/context-usage`); return response.data; diff --git a/webui/src/components/common/SessionChat.test.ts b/webui/src/components/common/SessionChat.test.ts index fed4b3476..f599eabba 100644 --- a/webui/src/components/common/SessionChat.test.ts +++ b/webui/src/components/common/SessionChat.test.ts @@ -1825,19 +1825,23 @@ describe('SessionChat fallback polling', () => { truncateAfterMessage: vi.fn(), }); clientGetMock.mockResolvedValueOnce({ - data: [ - { - info: { - id: 'assistant-1', - 
sessionID: 'sess-1', - role: 'assistant', - finish: 'tool-calls', + data: { + items: [ + { + info: { + id: 'assistant-1', + sessionID: 'sess-1', + role: 'assistant', + finish: 'tool-calls', + }, + parts: [ + { id: 'tool-1', type: 'tool', state: { status: 'running' } }, + ], }, - parts: [ - { id: 'tool-1', type: 'tool', state: { status: 'running' } }, - ], - }, - ], + ], + hasMore: false, + nextBefore: null, + }, }); render(React.createElement(SessionChat, { @@ -1856,7 +1860,9 @@ describe('SessionChat fallback polling', () => { expect(refetch).not.toHaveBeenCalled(); expect(onStreamingDone).not.toHaveBeenCalled(); - expect(clientGetMock).toHaveBeenCalledWith('/api/session/sess-1/message'); + expect(clientGetMock).toHaveBeenCalledWith('/api/session/sess-1/message', { + params: { page: true, limit: 50, include_archived: true }, + }); } finally { vi.useRealTimers(); } @@ -1888,19 +1894,23 @@ describe('SessionChat fallback polling', () => { clientGetMock.mockImplementation((url: string) => { if (url === '/api/session/sess-1/message') { return Promise.resolve({ - data: [ - { - info: { - id: 'assistant-1', - sessionID: 'sess-1', - role: 'assistant', - finish: 'stop', + data: { + items: [ + { + info: { + id: 'assistant-1', + sessionID: 'sess-1', + role: 'assistant', + finish: 'stop', + }, + parts: [ + { id: 'text-1', type: 'text', text: 'done' }, + ], }, - parts: [ - { id: 'text-1', type: 'text', text: 'done' }, - ], - }, - ], + ], + hasMore: false, + nextBefore: null, + }, }); } if (url === '/api/session/status') { diff --git a/webui/src/components/common/SessionChat.tsx b/webui/src/components/common/SessionChat.tsx index 23212a683..8af70f666 100644 --- a/webui/src/components/common/SessionChat.tsx +++ b/webui/src/components/common/SessionChat.tsx @@ -1582,6 +1582,9 @@ export default function SessionChat({ }); }, []); + const loadOlderMessagesRef = useRef<(() => Promise) | null>(null); + const hasMoreMessagesRef = useRef(false); + const loadingOlderMessagesRef = 
useRef(false); const rafScheduledRef = useRef(false); const handleScroll = useCallback(() => { if (rafScheduledRef.current) return; @@ -1590,6 +1593,18 @@ export default function SessionChat({ const el = scrollContainerRef.current; if (el) { isAtBottomRef.current = el.scrollHeight - el.scrollTop - el.clientHeight <= SCROLL_BOTTOM_THRESHOLD_PX; + if (el.scrollTop <= 80 && hasMoreMessagesRef.current && !loadingOlderMessagesRef.current) { + const previousHeight = el.scrollHeight; + const previousTop = el.scrollTop; + const loadPromise = loadOlderMessagesRef.current?.(); + if (loadPromise) void loadPromise.finally(() => { + requestAnimationFrame(() => { + const current = scrollContainerRef.current; + if (!current) return; + current.scrollTop = current.scrollHeight - previousHeight + previousTop; + }); + }); + } } rafScheduledRef.current = false; }); @@ -1598,7 +1613,10 @@ export default function SessionChat({ const { messages, loading, + loadingOlder, + hasMore: hasMoreMessages, refetch, + loadOlder, addMessage, updateMessage, updateMessagePart, @@ -1607,6 +1625,9 @@ export default function SessionChat({ truncateAfterMessage, } = useSessionMessages(sessionId || undefined); + useEffect(() => { loadOlderMessagesRef.current = loadOlder; }, [loadOlder]); + useEffect(() => { hasMoreMessagesRef.current = hasMoreMessages; }, [hasMoreMessages]); + useEffect(() => { loadingOlderMessagesRef.current = loadingOlder; }, [loadingOlder]); const contextUsageMessages = contextUsageRefreshing && !contextUsageSnapshot ? 
[] : messages; const contextUsageBreakdown = useMemo( () => buildContextUsageBreakdown(contextUsageMessages, input, contextUsageSnapshot), @@ -1701,7 +1722,26 @@ export default function SessionChat({ }, [sessionId]); useEffect(() => { - void refreshContextUsage({ clear: true }); + if (!sessionId) { + void refreshContextUsage({ clear: true }); + return; + } + const requestIdle = (window as any).requestIdleCallback as + | ((cb: () => void, options?: { timeout?: number }) => number) + | undefined; + const cancelIdle = (window as any).cancelIdleCallback as + | ((id: number) => void) + | undefined; + if (requestIdle) { + const idleId = requestIdle(() => { + void refreshContextUsage({ clear: true }); + }, { timeout: 1500 }); + return () => cancelIdle?.(idleId); + } + const timer = window.setTimeout(() => { + void refreshContextUsage({ clear: true }); + }, 250); + return () => window.clearTimeout(timer); }, [refreshContextUsage]); useEffect(() => { @@ -2821,8 +2861,10 @@ export default function SessionChat({ if (!isStreaming || !sessionId) return; const timer = setInterval(async () => { try { - const res = await client.get(`/api/session/${sessionId}/message`); - const msgs: any[] = res.data || []; + const res = await client.get(`/api/session/${sessionId}/message`, { + params: { page: true, limit: 50, include_archived: true }, + }); + const msgs: any[] = Array.isArray(res.data) ? res.data : (res.data?.items || []); const lastMsg = msgs[msgs.length - 1]; if (lastMsg?.info?.role === 'assistant' && (lastMsg.info.finish || lastMsg.info.time?.completed)) { const hasFetchedActiveTool = msgs.some((msg) => hasActiveToolPart(msg.parts)); @@ -3042,6 +3084,19 @@ export default function SessionChat({ ) ) : (
+ {hasMoreMessages && ( +
+ +
+ )} {merged.map((msg, i) => { if (skipIndices.has(i)) return null; return ( @@ -3722,7 +3777,6 @@ function ChatMessageBubbleInner({ const messageErrorText = isUser ? '' : getMessageErrorText(message); const avatarSize = compact ? 'w-7 h-7 text-xs' : 'w-8 h-8 text-sm'; - const avatar = isUser ? ( @@ -3775,8 +3829,8 @@ function ChatMessageBubbleInner({ // Render attachments (file/image parts) first so the bubble shows // image previews above the textual prompt — matches typical chat // UX for "look at this image and …" style messages. - const fileParts = parts.filter((p) => p.type === 'file' && p.url); - const displayParts = parts.filter((p) => !(p.type === 'file' && p.url)); + const fileParts = parts.filter((p) => p.type === 'file'); + const displayParts = parts.filter((p) => p.type !== 'file'); const isBlockingQuestionToolPart = (part: MessagePart): boolean => { if (part.type !== 'tool') return false; if (part.callID && pendingQuestions?.[part.callID]) return true; diff --git a/webui/src/hooks/useSessions.test.ts b/webui/src/hooks/useSessions.test.ts index 1a1e0b35a..03a744a3b 100644 --- a/webui/src/hooks/useSessions.test.ts +++ b/webui/src/hooks/useSessions.test.ts @@ -1,6 +1,7 @@ import { describe, expect, it, vi, afterEach } from 'vitest'; import { renderHook, act } from '@testing-library/react'; -import { applyMessagePartUpdate, useSessionMessages } from './useSessions'; +import { applyMessagePartUpdate, useSessionMessages, useSessions } from './useSessions'; +import { sessionApi } from '@/api/session'; import client from '@/api/client'; import type { Message } from '@/types'; @@ -431,4 +432,173 @@ describe('updateMessagePart scheduling', () => { expect((msg?.parts as any[])[1].text).toBe('保留这段已输出文本'); expect((msg?.parts as any[])[0].state.status).toBe('completed'); }); + + it('fetches the first message page and prepends older messages', async () => { + vi.mocked(client.get) + .mockResolvedValueOnce({ + data: { + items: [{ + info: { + id: 'msg-new', + 
sessionID: 'sess-1', + role: 'assistant', + time: { created: 200 }, + }, + parts: [], + }], + hasMore: true, + nextBefore: 'msg-new', + }, + } as any) + .mockResolvedValueOnce({ + data: { + items: [{ + info: { + id: 'msg-old', + sessionID: 'sess-1', + role: 'user', + time: { created: 100 }, + model: { providerID: 'openai', modelID: 'gpt-4o' }, + }, + parts: [{ id: 'part-old', type: 'text', text: 'old' }], + }], + hasMore: false, + nextBefore: null, + }, + } as any); + + const { result } = renderHook(() => useSessionMessages('sess-1')); + await act(async () => {}); + + expect(result.current.messages.map((msg) => msg.id)).toEqual(['msg-new']); + expect(result.current.hasMore).toBe(true); + expect(client.get).toHaveBeenCalledWith('/api/session/sess-1/message', { + params: { page: true, limit: 50, include_archived: true }, + }); + + await act(async () => { + await result.current.loadOlder(); + }); + + expect(result.current.messages.map((msg) => msg.id)).toEqual(['msg-old', 'msg-new']); + expect(result.current.hasMore).toBe(false); + expect(client.get).toHaveBeenLastCalledWith('/api/session/sess-1/message', { + params: { page: true, limit: 50, before: 'msg-new', include_archived: true }, + }); + }); + +}); + +describe('useSessions list loading', () => { + afterEach(() => { + vi.clearAllMocks(); + }); + + it('uses the lightweight session manager list endpoint', async () => { + vi.mocked(sessionApi.list).mockResolvedValueOnce([{ + id: 'session-1', + title: 'Session', + time: { created: 1, updated: 2 }, + category: 'user', + }] as any); + + const { result } = renderHook(() => useSessions('triage')); + await act(async () => {}); + + expect(sessionApi.list).toHaveBeenCalledWith({ + view: 'list', + manager: true, + roots: true, + limit: 100, + offset: 0, + search: 'triage', + }); + expect(result.current.sessions).toHaveLength(1); + }); + + it('keeps an optimistically added session when an older list request returns without it', async () => { + let resolveList: (value: any[]) 
=> void = () => {}; + vi.mocked(sessionApi.list).mockReturnValueOnce(new Promise((resolve) => { + resolveList = resolve; + }) as any); + + const { result } = renderHook(() => useSessions()); + + act(() => { + result.current.addSession({ + id: 'session-new', + title: 'New Session', + time: { created: 2, updated: 2 }, + category: 'user', + } as any); + }); + + expect(result.current.sessions.map((session) => session.id)).toEqual(['session-new']); + + await act(async () => { + resolveList([{ + id: 'session-old', + title: 'Old Session', + time: { created: 1, updated: 1 }, + category: 'user', + }]); + }); + + expect(result.current.sessions.map((session) => session.id)).toEqual(['session-new', 'session-old']); + }); + + it('preserves the current list when a background refetch fails', async () => { + vi.mocked(sessionApi.list) + .mockResolvedValueOnce([{ + id: 'session-1', + title: 'Session', + time: { created: 1, updated: 2 }, + category: 'user', + }] as any) + .mockRejectedValueOnce(new Error('network down')); + + const { result } = renderHook(() => useSessions()); + await act(async () => {}); + + expect(result.current.sessions.map((session) => session.id)).toEqual(['session-1']); + + await act(async () => { + await result.current.refetch(); + }); + + expect(result.current.error).toBe('network down'); + expect(result.current.sessions.map((session) => session.id)).toEqual(['session-1']); + }); + + it('keeps the page mounted while refetching after search changes', async () => { + let resolveSearch: (value: any[]) => void = () => {}; + vi.mocked(sessionApi.list) + .mockResolvedValueOnce([{ + id: 'session-1', + title: 'Session', + time: { created: 1, updated: 2 }, + category: 'user', + }] as any) + .mockReturnValueOnce(new Promise((resolve) => { + resolveSearch = resolve; + }) as any); + + const { result, rerender } = renderHook( + ({ search }) => useSessions(search), + { initialProps: { search: '' } }, + ); + await act(async () => {}); + + 
expect(result.current.loading).toBe(false); + expect(result.current.sessions.map((session) => session.id)).toEqual(['session-1']); + + rerender({ search: 'triage' }); + + expect(result.current.loading).toBe(false); + expect(result.current.sessions.map((session) => session.id)).toEqual(['session-1']); + + await act(async () => { + resolveSearch([]); + }); + }); }); diff --git a/webui/src/hooks/useSessions.ts b/webui/src/hooks/useSessions.ts index 708813fd7..4b93a1699 100644 --- a/webui/src/hooks/useSessions.ts +++ b/webui/src/hooks/useSessions.ts @@ -5,6 +5,8 @@ import type { Session, Message } from '@/types'; const VISIBLE_CATEGORIES = new Set(['user', 'workflow', 'entity-config']); const ABORTED_TOOL_ERROR = 'Tool execution was interrupted'; +const SESSION_LIST_PAGE_SIZE = 100; +const MESSAGE_PAGE_SIZE = 50; function finalizeStoppedMessageParts(parts: Message['parts'], stoppedAt = Date.now()): Message['parts'] { return parts.map((part) => { @@ -54,6 +56,75 @@ function mergeFetchedMessages(prev: Message[], fetched: Message[]): Message[] { }); } +function mergeLatestFetchedMessages(prev: Message[], fetched: Message[]): Message[] { + if (prev.length === 0) return fetched; + const fetchedIds = new Set(fetched.map((message) => message.id)); + const mergedFetched = mergeFetchedMessages(prev, fetched); + const firstFetchedTimestamp = mergedFetched[0]?.timestamp ?? 
Number.POSITIVE_INFINITY; + const retainedOlder = prev.filter( + (message) => !fetchedIds.has(message.id) && message.timestamp <= firstFetchedTimestamp, + ); + const retainedNewer = prev.filter( + (message) => !fetchedIds.has(message.id) && message.timestamp > firstFetchedTimestamp, + ); + return [...retainedOlder, ...mergedFetched, ...retainedNewer]; +} + +function prependOlderMessages(prev: Message[], older: Message[]): Message[] { + const existingIds = new Set(prev.map((message) => message.id)); + return [...older.filter((message) => !existingIds.has(message.id)), ...prev]; +} + +function transformMessageResponse(data: any): { + messages: Message[]; + hasMore: boolean; + nextBefore: string | null; +} { + const items = Array.isArray(data) ? data : (data?.items ?? []); + return { + messages: items.map((msg: any) => ({ + id: msg.info.id, + sessionID: msg.info.sessionID, + role: msg.info.role, + parts: msg.parts || [], + parentID: msg.info.parentID, + agent: msg.info.agent, + model: msg.info.model, + modelID: msg.info.modelID, + providerID: msg.info.providerID, + cost: msg.info.cost, + tokens: msg.info.tokens, + timestamp: msg.info.time?.created || Date.now(), + finish: msg.info.finish || null, + error: msg.info.error || null, + compacted: msg.info.compacted || null, + })), + hasMore: Array.isArray(data) ? false : Boolean(data?.hasMore), + nextBefore: Array.isArray(data) ? null : (data?.nextBefore ?? null), + }; +} + +function markMeasure(name: string, startMark: string) { + if (typeof performance === 'undefined') return; + try { + performance.measure(name, startMark); + } catch { + // Ignore environments where the mark was cleared or performance is mocked. 
+ } +} + +function mergeSessionListWithOptimistic(fetched: Session[], optimistic: Map): Session[] { + if (optimistic.size === 0) return fetched; + const fetchedIds = new Set(fetched.map(session => session.id)); + const optimisticRows = Array.from(optimistic.values()).filter(session => !fetchedIds.has(session.id)); + return [...optimisticRows, ...fetched]; +} + +function appendSessionList(prev: Session[], fetched: Session[]): Session[] { + const existingIds = new Set(prev.map(session => session.id)); + return [...prev, ...fetched.filter(session => !existingIds.has(session.id))]; +} + /** * Pure reducer for updating a message part in the messages list. * Exported for unit testing. @@ -110,42 +181,83 @@ export function applyMessagePartUpdate( return updated; } -export function useSessions() { +export function useSessions(search = '') { const [sessions, setSessions] = useState([]); const [loading, setLoading] = useState(true); + const [loadingMore, setLoadingMore] = useState(false); + const [hasMore, setHasMore] = useState(false); const [error, setError] = useState(null); // Track whether the initial fetch has completed — refetches should be silent const initializedRef = useRef(false); + const sessionsRef = useRef([]); + const hasLoadedOnceRef = useRef(false); + const requestSeqRef = useRef(0); + const optimisticSessionsRef = useRef>(new Map()); + useEffect(() => { + sessionsRef.current = sessions; + }, [sessions]); - const fetchSessions = useCallback(async () => { + const fetchSessions = useCallback(async (options?: { append?: boolean }) => { + const append = Boolean(options?.append); + const requestSeq = ++requestSeqRef.current; try { // Only show the full-page loading state on the very first fetch. // Subsequent refetches (triggered by SSE events) update data silently // to avoid unmounting SessionChat and disrupting the active conversation. 
- if (!initializedRef.current) setLoading(true); + if (append) { + setLoadingMore(true); + } else if (!initializedRef.current) { + setLoading(true); + } setError(null); // Fetch only root sessions: child sessions are internal and never shown // in the sidebar, so excluding them avoids extra payload and filtering. - const response = await sessionApi.list({ roots: true }); + const startMark = append ? 'sessions:list:older-start' : 'sessions:list:first-start'; + if (typeof performance !== 'undefined') performance.mark(startMark); + const response = await sessionApi.list({ + view: 'list', + manager: true, + roots: true, + limit: SESSION_LIST_PAGE_SIZE, + offset: append + ? sessionsRef.current.filter(session => !optimisticSessionsRef.current.has(session.id)).length + : 0, + search: search.trim() || undefined, + }); + if (requestSeq !== requestSeqRef.current) return; + markMeasure(append ? 'sessions:list:older-page' : 'sessions:list:first-render', startMark); if (Array.isArray(response)) { - setSessions( - response.filter( - (s: any) => (!s.category || VISIBLE_CATEGORIES.has(s.category)) && !s.parentID, - ), + const nextSessions = response.filter( + (s: any) => (!s.category || VISIBLE_CATEGORIES.has(s.category)) && !s.parentID, ); + nextSessions.forEach((session: Session) => optimisticSessionsRef.current.delete(session.id)); + setSessions(prev => append + ? 
appendSessionList(prev, nextSessions) + : mergeSessionListWithOptimistic(nextSessions, optimisticSessionsRef.current)); + setHasMore(response.length >= SESSION_LIST_PAGE_SIZE); + hasLoadedOnceRef.current = true; } else { - setSessions([]); + if (!append && !hasLoadedOnceRef.current) setSessions([]); + setHasMore(false); } } catch (err: any) { + if (requestSeq !== requestSeqRef.current) return; setError(err.message || 'Failed to fetch sessions'); - setSessions([]); + if (!append && !hasLoadedOnceRef.current) setSessions([]); } finally { - setLoading(false); - initializedRef.current = true; + if (requestSeq === requestSeqRef.current) { + setLoading(false); + setLoadingMore(false); + initializedRef.current = true; + } } - }, []); + }, [search]); const updateSessionTitle = useCallback((sessionId: string, title: string) => { + const optimistic = optimisticSessionsRef.current.get(sessionId); + if (optimistic) { + optimisticSessionsRef.current.set(sessionId, { ...optimistic, title }); + } setSessions(prev => prev.map(session => session.id === sessionId ? { ...session, title } : session, @@ -154,20 +266,26 @@ export function useSessions() { }, []); useEffect(() => { + if (!hasLoadedOnceRef.current) { + initializedRef.current = false; + } fetchSessions(); - }, []); + }, [fetchSessions]); const removeSession = useCallback((sessionId: string) => { + optimisticSessionsRef.current.delete(sessionId); setSessions(prev => prev.filter(s => s.id !== sessionId)); }, []); const removeSessions = useCallback((sessionIds: string[]) => { const idSet = new Set(sessionIds); + sessionIds.forEach(sessionId => optimisticSessionsRef.current.delete(sessionId)); setSessions(prev => prev.filter(s => !idSet.has(s.id))); }, []); /** Optimistically prepend a newly created session without a full refetch. 
*/ const addSession = useCallback((session: Session) => { + optimisticSessionsRef.current.set(session.id, session); setSessions(prev => { if (prev.some(s => s.id === session.id)) return prev; return [session, ...prev]; @@ -183,12 +301,18 @@ export function useSessions() { removeSession, removeSessions, addSession, + hasMore, + loadingMore, + loadMore: () => fetchSessions({ append: true }), }; } export function useSessionMessages(sessionId?: string) { const [messages, setMessages] = useState([]); const [loading, setLoading] = useState(false); + const [loadingOlder, setLoadingOlder] = useState(false); + const [hasMore, setHasMore] = useState(false); + const [nextBefore, setNextBefore] = useState(null); const [error, setError] = useState(null); // Tracks part IDs seen in this session to distinguish first-time creation // (structural change → immediate update) from content deltas (low-priority). @@ -200,29 +324,16 @@ export function useSessionMessages(sessionId?: string) { try { setLoading(true); setError(null); - const response = await client.get(`/api/session/${sessionId}/message`); - - // Backend returns MessageWithParts[] format: { info: {...}, parts: [...] 
} - // Transform to flat message structure for UI - const messagesData = response.data.map((msg: any) => ({ - id: msg.info.id, - sessionID: msg.info.sessionID, - role: msg.info.role, - parts: msg.parts || [], - parentID: msg.info.parentID, - agent: msg.info.agent, - model: msg.info.model, - modelID: msg.info.modelID, - providerID: msg.info.providerID, - cost: msg.info.cost, - tokens: msg.info.tokens, - timestamp: msg.info.time?.created || Date.now(), - finish: msg.info.finish || null, - error: msg.info.error || null, - compacted: msg.info.compacted || null, - })); - - setMessages(prev => mergeFetchedMessages(prev, messagesData)); + const startMark = 'session:messages:first-page-start'; + if (typeof performance !== 'undefined') performance.mark(startMark); + const response = await client.get(`/api/session/${sessionId}/message`, { + params: { page: true, limit: MESSAGE_PAGE_SIZE, include_archived: true }, + }); + markMeasure('session:messages:first-page', startMark); + const { messages: messagesData, hasMore, nextBefore } = transformMessageResponse(response.data); + setMessages(prev => mergeLatestFetchedMessages(prev, messagesData)); + setHasMore(hasMore); + setNextBefore(nextBefore); } catch (err: any) { setError(err.message || 'Failed to fetch messages'); } finally { @@ -230,11 +341,41 @@ export function useSessionMessages(sessionId?: string) { } }, [sessionId]); + const loadOlder = useCallback(async () => { + if (!sessionId || !hasMore || !nextBefore || loadingOlder) return; + + try { + setLoadingOlder(true); + setError(null); + const startMark = 'session:messages:older-page-start'; + if (typeof performance !== 'undefined') performance.mark(startMark); + const response = await client.get(`/api/session/${sessionId}/message`, { + params: { + page: true, + limit: MESSAGE_PAGE_SIZE, + before: nextBefore, + include_archived: true, + }, + }); + markMeasure('session:messages:older-page', startMark); + const page = transformMessageResponse(response.data); + 
setMessages(prev => prependOlderMessages(prev, page.messages)); + setHasMore(page.hasMore); + setNextBefore(page.nextBefore); + } catch (err: any) { + setError(err.message || 'Failed to fetch older messages'); + } finally { + setLoadingOlder(false); + } + }, [hasMore, loadingOlder, nextBefore, sessionId]); + // Reset state synchronously before paint when session changes // to prevent flash of welcome screen (useEffect runs AFTER paint) useLayoutEffect(() => { setMessages([]); setError(null); + setHasMore(false); + setNextBefore(null); knownPartIdsRef.current.clear(); if (sessionId) { setLoading(true); @@ -250,8 +391,11 @@ export function useSessionMessages(sessionId?: string) { return { messages, loading, + loadingOlder, + hasMore, error, refetch: fetchMessages, + loadOlder, addMessage: (message: Message) => { setMessages(prev => [...prev, message]); }, diff --git a/webui/src/pages/Home/index.tsx b/webui/src/pages/Home/index.tsx index b59d39c2c..2dda6f562 100644 --- a/webui/src/pages/Home/index.tsx +++ b/webui/src/pages/Home/index.tsx @@ -149,6 +149,7 @@ export default function Home() {
diff --git a/webui/src/pages/Session/index.test.tsx b/webui/src/pages/Session/index.test.tsx index 0c6703358..e83075e88 100644 --- a/webui/src/pages/Session/index.test.tsx +++ b/webui/src/pages/Session/index.test.tsx @@ -96,6 +96,7 @@ vi.mock('@/components/common/SessionChat', () => ({ agentName, model, display, + hideInput, }: { sessionId?: string | null; agentName?: string; @@ -103,6 +104,7 @@ vi.mock('@/components/common/SessionChat', () => ({ toolbarSlot?: React.ReactNode; centerToolbarSlot?: React.ReactNode; model?: { providerID: string; modelID: string } | null; + hideInput?: boolean; display?: { compact?: boolean; showActions?: boolean; @@ -119,6 +121,7 @@ vi.mock('@/components/common/SessionChat', () => ({ data-model={model ? `${model.providerID}/${model.modelID}` : ''} data-collapse-intermediate={String(Boolean(display?.collapseIntermediateSteps))} data-process-groups-default-open={String(Boolean(display?.processGroupsDefaultOpen))} + data-hide-input={String(Boolean(hideInput))} > {sessionId ?? 
'no-session'} {toolbarSlot} @@ -215,6 +218,7 @@ describe('SessionPage session actions menu', () => { beforeEach(() => { vi.clearAllMocks(); localStorage.clear(); + sessionStorage.clear(); useSessions.mockReturnValue({ sessions: [session], @@ -384,17 +388,36 @@ describe('SessionPage session actions menu', () => { expect(screen.getByTestId('session-chat')).toHaveTextContent('no-session'); }); - it('attaches the previously selected session on initial load', () => { + it('does not auto-attach the previously selected session on first app visit', () => { localStorage.setItem('flocks:last-selected-session', 'session-1'); renderSessionPage(); + expect(screen.getByTestId('session-chat')).toHaveTextContent('no-session'); + }); + + it('attaches the previously selected session after the session page has been visited', () => { + localStorage.setItem('flocks:last-selected-session', 'session-1'); + sessionStorage.setItem('flocks:sessions:visited', 'true'); + + renderSessionPage(); + expect(screen.getByTestId('session-chat')).toHaveTextContent('session-1'); }); - it('defaults session process groups open on the session management page', () => { + it('does not auto-attach the previously selected session when entering from home', () => { localStorage.setItem('flocks:last-selected-session', 'session-1'); + sessionStorage.setItem('flocks:sessions:visited', 'true'); + + renderSessionPage({ + pathname: '/sessions', + state: { skipLastSelectedSessionRestore: true }, + }); + + expect(screen.getByTestId('session-chat')).toHaveTextContent('no-session'); + }); + it('defaults session process groups open on the session management page', () => { renderSessionPage(); expect(screen.getByTestId('session-chat')).toHaveAttribute('data-collapse-intermediate', 'true'); @@ -440,6 +463,54 @@ describe('SessionPage session actions menu', () => { }); }); + it('keeps a selected session that is valid but missing from the current list', async () => { + useSessions.mockReturnValue({ + sessions: [], + loading: 
false, + error: null, + refetch: refetchSessions, + updateSessionTitle, + removeSession, + removeSessions, + addSession, + }); + sessionApi.get.mockResolvedValue({ + ...session, + id: 'session-missing-from-list', + title: 'Fetched Session', + canWrite: false, + }); + + renderSessionPage('/sessions?session=session-missing-from-list'); + + await waitFor(() => { + expect(sessionApi.get).toHaveBeenCalledWith('session-missing-from-list'); + expect(screen.getByTestId('session-chat')).toHaveTextContent('session-missing-from-list'); + expect(screen.getByTestId('session-chat')).toHaveAttribute('data-hide-input', 'true'); + }); + }); + + it('clears the selected session after confirming it no longer exists', async () => { + useSessions.mockReturnValue({ + sessions: [], + loading: false, + error: null, + refetch: refetchSessions, + updateSessionTitle, + removeSession, + removeSessions, + addSession, + }); + sessionApi.get.mockRejectedValue({ response: { status: 404 } }); + + renderSessionPage('/sessions?session=session-deleted'); + + await waitFor(() => { + expect(sessionApi.get).toHaveBeenCalledWith('session-deleted'); + expect(screen.getByTestId('session-chat')).toHaveTextContent('no-session'); + }); + }); + it('lists the same visible agents as the Agent page selector logic', async () => { const user = userEvent.setup(); useAgents.mockReturnValue({ @@ -545,7 +616,6 @@ describe('SessionPage session actions menu', () => { }); it('shows the pinned model for the selected session on load', async () => { - localStorage.setItem('flocks:last-selected-session', 'session-1'); useSessions.mockReturnValue({ sessions: [{ ...session, @@ -571,7 +641,7 @@ describe('SessionPage session actions menu', () => { defaultModelAPI.getResolved.mockResolvedValue({ data: { provider_id: 'openai', model_id: 'gpt-4o' } }); modelV2API.listDefinitions.mockResolvedValue({ data: { models: modelDefinitions } }); - renderSessionPage(); + renderSessionPage('/sessions?session=session-1'); await waitFor(() => { 
expect(screen.getByTestId('session-chat')).toHaveAttribute('data-model', 'minimax/minimax-m3'); @@ -581,7 +651,6 @@ describe('SessionPage session actions menu', () => { it('persists model changes to the selected session', async () => { const user = userEvent.setup(); - localStorage.setItem('flocks:last-selected-session', 'session-1'); useSessions.mockReturnValue({ sessions: [session], loading: false, @@ -608,7 +677,7 @@ describe('SessionPage session actions menu', () => { model_pinned: true, }); - renderSessionPage(); + renderSessionPage('/sessions?session=session-1'); await waitFor(() => { expect(screen.getByTestId('session-chat')).toHaveAttribute('data-model', 'openai/gpt-4o'); @@ -629,7 +698,6 @@ describe('SessionPage session actions menu', () => { it('resets the selected model to the default when creating a new session', async () => { const user = userEvent.setup(); - localStorage.setItem('flocks:last-selected-session', 'session-1'); useSessions.mockReturnValue({ sessions: [{ ...session, @@ -655,7 +723,7 @@ describe('SessionPage session actions menu', () => { defaultModelAPI.getResolved.mockResolvedValue({ data: { provider_id: 'openai', model_id: 'gpt-4o' } }); modelV2API.listDefinitions.mockResolvedValue({ data: { models: modelDefinitions } }); - renderSessionPage(); + renderSessionPage('/sessions?session=session-1'); await waitFor(() => { expect(screen.getByTestId('session-chat')).toHaveAttribute('data-model', 'minimax/minimax-m3'); diff --git a/webui/src/pages/Session/index.tsx b/webui/src/pages/Session/index.tsx index 0c1a731a7..1526e4af2 100644 --- a/webui/src/pages/Session/index.tsx +++ b/webui/src/pages/Session/index.tsx @@ -8,7 +8,7 @@ import { } from 'lucide-react'; import { useTranslation } from 'react-i18next'; import i18n from '@/i18n'; -import { useNavigate, useSearchParams } from 'react-router-dom'; +import { useLocation, useNavigate, useSearchParams } from 'react-router-dom'; import LoadingSpinner from '@/components/common/LoadingSpinner'; import 
{ useToast } from '@/components/common/Toast'; import SessionChat, { type SSEChatEvent, type SSEConnectionStatus } from '@/components/common/SessionChat'; @@ -23,7 +23,7 @@ import { useDefaultModelVision } from '@/hooks/useDefaultModelVision'; import { buildPromptParts, type ImagePartData } from '@/utils/imageUpload'; import { getAgentDisplayDescription, getAgentDisplayName, isAgentUsableInChat } from '@/utils/agentDisplay'; import { formatSessionDate } from '@/utils/time'; -import type { ModelDefinitionV2 } from '@/types'; +import type { ModelDefinitionV2, Session } from '@/types'; function sanitizeSessionExportName(value: string) { const trimmed = value.trim(); @@ -35,6 +35,7 @@ function sanitizeSessionExportName(value: string) { } const LAST_SELECTED_SESSION_STORAGE_KEY = 'flocks:last-selected-session'; +const SESSION_PAGE_VISITED_STORAGE_KEY = 'flocks:sessions:visited'; type AgentSourceFilter = 'all' | 'builtin' | 'custom'; type ChatModelOption = { key: string; @@ -83,12 +84,38 @@ function writeLastSelectedSessionId(sessionId: string | null) { } } +function hasVisitedSessionPage(): boolean { + try { + return window.sessionStorage.getItem(SESSION_PAGE_VISITED_STORAGE_KEY) === 'true'; + } catch { + return false; + } +} + +function markSessionPageVisited() { + try { + window.sessionStorage.setItem(SESSION_PAGE_VISITED_STORAGE_KEY, 'true'); + } catch { + // Ignore storage failures; this only controls best-effort restore behavior. 
+ } +} + +function shouldSkipLastSelectedSessionRestore(state: unknown): boolean { + return Boolean( + state + && typeof state === 'object' + && 'skipLastSelectedSessionRestore' in state + && (state as { skipLastSelectedSessionRestore?: unknown }).skipLastSelectedSessionRestore, + ); +} + function makeModelKey(providerID: string, modelID: string): string { return `${providerID}::${modelID}`; } export default function SessionPage() { const { t, i18n } = useTranslation('session'); + const location = useLocation(); const navigate = useNavigate(); const [searchParams, setSearchParams] = useSearchParams(); const [selectedSessionId, setSelectedSessionId] = useState(null); @@ -114,12 +141,24 @@ export default function SessionPage() { const supportsVision = useDefaultModelVision(); const [searchQuery, setSearchQuery] = useState(''); const [agentSourceFilter, setAgentSourceFilter] = useState('all'); + const [selectedSessionFallback, setSelectedSessionFallback] = useState(null); const [selectorTooltip, setSelectorTooltip] = useState(null); const renameInputRef = useRef(null); const renameSubmitInFlightRef = useRef(false); const toast = useToast(); - const { sessions, loading: loadingSessions, refetch: refetchSessions, updateSessionTitle, removeSession, removeSessions, addSession } = useSessions(); + const { + sessions, + loading: loadingSessions, + refetch: refetchSessions, + updateSessionTitle, + removeSession, + removeSessions, + addSession, + hasMore: hasMoreSessions, + loadingMore: loadingMoreSessions, + loadMore: loadMoreSessions, + } = useSessions(searchQuery); const { agents, loading: loadingAgents } = useAgents(); const { providers, loading: loadingProviders } = useProviders(); const primaryAgents = useMemo(() => agents.filter((a) => a.mode === 'primary' && isAgentUsableInChat(a)), [agents]); @@ -213,10 +252,12 @@ export default function SessionPage() { ? 
{ providerID: selectedModelOption.providerID, modelID: selectedModelOption.modelID } : null; const effectiveSupportsVision = selectedModelOption?.supportsVision ?? supportsVision; - const selectedSession = useMemo( + const listedSelectedSession = useMemo( () => sessions.find(s => s.id === selectedSessionId) ?? null, [sessions, selectedSessionId], ); + const selectedSession = listedSelectedSession + ?? (selectedSessionFallback?.id === selectedSessionId ? selectedSessionFallback : null); // 今天/昨天不限制;本周/上周/更早默认只显示 5 条 const GROUP_DEFAULT_LIMIT: Record = { @@ -306,11 +347,59 @@ export default function SessionPage() { } }, [searchParams, selectedSessionId, setSearchParams]); + useEffect(() => { + if (loadingSessions) return; + + const alreadyVisited = hasVisitedSessionPage(); + markSessionPageVisited(); + + if (selectedSessionId) return; + if (searchParams.get('session')) return; + if (!alreadyVisited) return; + if (shouldSkipLastSelectedSessionRestore(location.state)) return; + + const lastSelectedSessionId = readLastSelectedSessionId(); + if (lastSelectedSessionId) { + setSelectedSessionId(lastSelectedSessionId); + } + }, [loadingSessions, location.state, searchParams, selectedSessionId]); + useEffect(() => { if (!selectedSessionId) return; writeLastSelectedSessionId(selectedSessionId); }, [selectedSessionId]); + useEffect(() => { + if (!selectedSessionId) { + setSelectedSessionFallback(null); + return; + } + if (listedSelectedSession) { + setSelectedSessionFallback(null); + return; + } + if (loadingSessions) return; + + let cancelled = false; + sessionApi.get(selectedSessionId) + .then((session) => { + if (cancelled) return; + setSelectedSessionFallback(session as unknown as Session); + }) + .catch((err: any) => { + if (cancelled) return; + const statusCode = err?.response?.status ?? err?.status; + if (statusCode === 403 || statusCode === 404) { + setSelectedSessionId((current) => (current === selectedSessionId ? 
null : current)); + setSelectedSessionFallback(null); + writeLastSelectedSessionId(null); + } + }); + return () => { + cancelled = true; + }; + }, [listedSelectedSession, loadingSessions, selectedSessionId]); + useEffect(() => { let cancelled = false; setLoadingEnabledModels(true); @@ -458,34 +547,6 @@ export default function SessionPage() { } }, [refetchSessions, selectedSessionId, toast, t]); - useEffect(() => { - if (loadingSessions) return; - if (searchParams.get('session')) return; - - if (selectedSessionId && sessions.some((session) => session.id === selectedSessionId)) { - return; - } - - const lastSelectedSessionId = readLastSelectedSessionId(); - const fallbackSession = lastSelectedSessionId - ? sessions.find((session) => session.id === lastSelectedSessionId) - : undefined; - - if (fallbackSession && fallbackSession.id !== selectedSessionId) { - setSelectedSessionId(fallbackSession.id); - return; - } - - if (!fallbackSession && selectedSessionId) { - setSelectedSessionId(null); - } - }, [ - loadingSessions, - searchParams, - selectedSessionId, - sessions, - ]); - const handleCreateAndSend = useCallback(async ( text: string, imageParts?: ImagePartData[], @@ -869,6 +930,18 @@ export default function SessionPage() { ); }) )} + {hasMoreSessions && ( +
+ +
+ )}
{/* Bottom:批量操作栏 / 批量选择入口 */} diff --git a/webui/src/types/index.ts b/webui/src/types/index.ts index eb584929d..ade721ab1 100644 --- a/webui/src/types/index.ts +++ b/webui/src/types/index.ts @@ -111,6 +111,8 @@ export interface MessageError { */ export interface MessagePart { id: string; + messageID?: string; + sessionID?: string; type: 'text' | 'tool' | 'file' | 'reasoning' | 'toolCall' | 'toolResult' | 'thinking' | 'image' | 'step-start' | 'step-finish'; // Text part text?: string;