Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/app/api/enterprise.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ async def get_email_templates_endpoint(
DEFAULT_EMAIL_TEMPLATES,
)

templates = await get_email_templates(db=db)
templates = await get_email_templates()
return {
"templates": templates,
"variables": EMAIL_TEMPLATE_VARIABLES,
Expand Down
33 changes: 18 additions & 15 deletions backend/app/api/feishu.py
Original file line number Diff line number Diff line change
Expand Up @@ -1405,8 +1405,11 @@ async def _handle_feishu_file(
)
_history = convert_chat_messages_to_llm_format(reversed(_hist_r.scalars().all()))

await db.commit()
# Pre-load agent/model for LLM call before releasing DB connection
_agent_model_img, _llm_model_img, _fallback_model_img = await _load_agent_and_model(db, agent_id)

await db.commit()
# ── Phase 1 complete: release connection before slow LLM/HTTP work ──
# For images: call LLM so vision models can actually see the image
if msg_type == "image":
import json as _json_card_img
Expand Down Expand Up @@ -1499,20 +1502,20 @@ async def _img_heartbeat():
_img_heartbeat_task = asyncio.create_task(_img_heartbeat())

# Call LLM with image marker — vision models will parse it
async with _async_session() as _db_img:
try:
reply_text = await _call_agent_llm(
_db_img, agent_id, user_msg_content, history=_history,
user_id=platform_user_id, session_id=session_conv_id, on_chunk=_img_on_chunk,
)
finally:
_img_llm_done = True
if _img_heartbeat_task:
_img_heartbeat_task.cancel()
try:
await _img_heartbeat_task
except Exception:
pass
try:
reply_text = await _call_llm_with_config(
_agent_model_img, _llm_model_img, _fallback_model_img,
agent_id, user_msg_content, history=_history,
user_id=platform_user_id, session_id=session_conv_id, on_chunk=_img_on_chunk,
)
finally:
_img_llm_done = True
if _img_heartbeat_task:
_img_heartbeat_task.cancel()
try:
await _img_heartbeat_task
except Exception:
pass

logger.info(f"[Feishu] Image LLM reply: {reply_text[:100]}")

Expand Down
34 changes: 23 additions & 11 deletions backend/app/api/teams.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
from app.config import get_settings
from app.core.permissions import check_agent_access, is_agent_creator
from app.core.security import get_current_user
from app.database import get_db
from app.database import async_session as _async_session, get_db
from app.models.agent import Agent as AgentModel
from app.models.audit import ChatMessage
from app.models.channel_config import ChannelConfig
from app.models.user import User
from app.schemas.schemas import ChannelConfigOut
from app.services.channel_session import find_or_create_channel_session
from app.api.feishu import _call_agent_llm
from app.api.feishu import _call_llm_with_config, _load_agent_and_model
from app.services.agent_tools import channel_file_sender as _cfs_s
from app.core.security import hash_password as _hp
from pathlib import Path as _Path
Expand Down Expand Up @@ -483,7 +483,13 @@ async def teams_event_webhook(
# Save user message
db.add(ChatMessage(agent_id=agent_id, user_id=platform_user_id, role="user", content=user_text, conversation_id=session_conv_id))
sess.last_message_at = datetime.now(timezone.utc)

# Pre-load agent/model for LLM call before releasing DB connection
_agent_model, _llm_model, _fallback_model = await _load_agent_and_model(db, agent_id)

await db.commit()
# ── Phase 1 complete: release connection before slow LLM call ──
await db.close()

# Set channel_file_sender contextvar for agent → user file delivery
async def _teams_file_sender(file_path, msg: str = ""):
Expand All @@ -503,32 +509,38 @@ async def _teams_file_sender(file_path, msg: str = ""):

_cfs_s_token = _cfs_s.set(_teams_file_sender)

# Call LLM
# Call LLM (no DB session needed)
try:
reply_text = await _call_agent_llm(
db,
reply_text = await _call_llm_with_config(
_agent_model, _llm_model, _fallback_model,
agent_id,
user_text,
history=history,
user_id=platform_user_id,
session_id=session_conv_id,
)
_cfs_s.reset(_cfs_s_token)
logger.info(f"Teams: LLM reply generated: {reply_text[:80]}")
except Exception as e:
logger.exception(f"Teams: Failed to call LLM for agent {agent_id}: {e}")
reply_text = "Sorry, I encountered an error processing your message."
finally:
_cfs_s.reset(_cfs_s_token)

# Save reply
# Save reply (new short transaction)
try:
db.add(ChatMessage(agent_id=agent_id, user_id=platform_user_id, role="assistant", content=reply_text, conversation_id=session_conv_id))
sess.last_message_at = datetime.now(timezone.utc)
await db.commit()
async with _async_session() as _save_db:
_save_db.add(ChatMessage(agent_id=agent_id, user_id=platform_user_id, role="assistant", content=reply_text, conversation_id=session_conv_id))
from app.models.chat_session import ChatSession
_sess_r = await _save_db.execute(
select(ChatSession).where(ChatSession.id == uuid.UUID(session_conv_id))
)
_sess_fresh = _sess_r.scalar_one_or_none()
if _sess_fresh:
_sess_fresh.last_message_at = datetime.now(timezone.utc)
await _save_db.commit()
logger.info(f"Teams: Saved reply to database for conversation {conversation_id}")
except Exception as e:
logger.exception(f"Teams: Failed to save reply to database: {e}")
await db.rollback()

# Send to Teams
use_managed_identity = config.extra_config.get("use_managed_identity", False)
Expand Down
8 changes: 4 additions & 4 deletions backend/app/api/tenants.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ async def self_create_company(
avatar_url=new_user.avatar_url,
))
await db.flush()
await registration_service.bind_org_member(db, new_user)
await registration_service.bind_org_member(new_user)

# Generate token scoped to the new user so frontend can switch context
access_token = create_access_token(str(new_user.id), new_user.role)
Expand All @@ -227,7 +227,7 @@ async def self_create_company(
current_user.quota_max_agents = tenant.default_max_agents
current_user.quota_agent_ttl_hours = tenant.default_agent_ttl_hours
await db.flush()
await registration_service.bind_org_member(db, current_user)
await registration_service.bind_org_member(current_user)

await db.commit()

Expand Down Expand Up @@ -341,7 +341,7 @@ async def join_company(
avatar_url=new_user.avatar_url,
))
await db.flush()
await registration_service.bind_org_member(db, new_user)
await registration_service.bind_org_member(new_user)

# Generate token scoped to the new user so frontend can switch context
access_token = create_access_token(str(new_user.id), new_user.role)
Expand All @@ -358,7 +358,7 @@ async def join_company(
current_user.quota_agent_ttl_hours = tenant.default_agent_ttl_hours
final_role = current_user.role
await db.flush()
await registration_service.bind_org_member(db, current_user)
await registration_service.bind_org_member(current_user)

# Increment invitation code usage
code_obj.used_count += 1
Expand Down
30 changes: 21 additions & 9 deletions backend/app/api/whatsapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,12 @@ async def whatsapp_event_webhook(
if not user_text or not sender_phone:
continue

from app.api.feishu import _call_agent_llm
from app.api.feishu import _call_llm_with_config, _load_agent_and_model
from app.models.agent import Agent as AgentModel, DEFAULT_CONTEXT_WINDOW_SIZE
from app.models.audit import ChatMessage
from app.services.channel_session import find_or_create_channel_session
from app.services.channel_user_service import channel_user_service
from app.database import async_session as _async_session

agent_r = await db.execute(select(AgentModel).where(AgentModel.id == agent_id))
agent_obj = agent_r.scalar_one_or_none()
Expand Down Expand Up @@ -305,11 +306,17 @@ async def whatsapp_event_webhook(

db.add(ChatMessage(agent_id=agent_id, user_id=platform_user_id, role="user", content=user_text, conversation_id=session_conv_id))
sess.last_message_at = datetime.now(timezone.utc)

# Pre-load agent/model before releasing connection
_agent_model, _llm_model, _fallback_model = await _load_agent_and_model(db, agent_id)

await db.commit()
await db.close()
# ── Phase 1 complete: release connection before slow LLM call ──

try:
reply_text = await _call_agent_llm(
db,
reply_text = await _call_llm_with_config(
_agent_model, _llm_model, _fallback_model,
agent_id,
user_text,
history=history,
Expand All @@ -322,13 +329,18 @@ async def whatsapp_event_webhook(

try:
await _send_whatsapp_messages(config, sender_phone, reply_text)
config.is_connected = True
db.add(ChatMessage(agent_id=agent_id, user_id=platform_user_id, role="assistant", content=reply_text, conversation_id=session_conv_id))
sess.last_message_at = datetime.now(timezone.utc)
await db.commit()
async with _async_session() as _save_db:
Comment on lines 331 to +332

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve WhatsApp connection status updates

When WhatsApp delivery succeeds or fails, this path now only sends the message and records the assistant reply; it no longer updates ChannelConfig.is_connected. The previous code set it to True after a successful _send_whatsapp_messages call and False in the exception branch, so the admin/UI connection indicator can remain stale for WhatsApp agents despite the latest delivery result.

Useful? React with 👍 / 👎.

_save_db.add(ChatMessage(agent_id=agent_id, user_id=platform_user_id, role="assistant", content=reply_text, conversation_id=session_conv_id))
from app.models.chat_session import ChatSession
_sess_r = await _save_db.execute(
select(ChatSession).where(ChatSession.id == uuid.UUID(session_conv_id))
)
_sess_fresh = _sess_r.scalar_one_or_none()
if _sess_fresh:
_sess_fresh.last_message_at = datetime.now(timezone.utc)
await _save_db.commit()
except Exception as exc:
logger.exception(f"[WhatsApp] Send failed for agent {agent_id}: {exc}")
config.is_connected = False
await db.commit()


return {"ok": True}
20 changes: 17 additions & 3 deletions backend/app/dao/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,17 @@ async def session(self) -> AsyncGenerator[AsyncSession, None]:
yield context_session
else:
async with async_session() as session:
yield session
token = _session_ctx.set(session)
try:
yield session
if hasattr(session, "commit"):
await session.commit()
except Exception:
if hasattr(session, "rollback"):
await session.rollback()
raise
finally:
_session_ctx.reset(token)

async def get(self, id: Any) -> ModelType | None:
"""Fetch a single record by its primary key ID."""
Expand Down Expand Up @@ -71,10 +81,14 @@ async def update(self, *, db_obj: ModelType, obj_in: dict[str, Any]) -> ModelTyp
async def delete(self, *, id: Any) -> ModelType | None:
"""Delete a record by ID."""
async with self.session() as db:
obj = await self.get(id)
if hasattr(db, "get"):
obj = await db.get(self.model, id)
else:
stmt = select(self.model).where(self.model.id == id)
result = await db.execute(stmt)
obj = result.scalar_one_or_none()
if obj:
if hasattr(db, "delete"):
await db.delete(obj)
await db.flush()
return obj

3 changes: 3 additions & 0 deletions backend/app/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@ class Base(DeclarativeBase):
async def get_db() -> AsyncGenerator[AsyncSession, None]:
"""Dependency for getting async database sessions."""
async with async_session() as session:
token = _session_ctx.set(session)
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
_session_ctx.reset(token)


_session_ctx: ContextVar[AsyncSession | None] = ContextVar("db_session_ctx", default=None)
Expand Down
Loading