diff --git a/cmd/odek/schedule.go b/cmd/odek/schedule.go index 1ebdf4d..f7430b3 100644 --- a/cmd/odek/schedule.go +++ b/cmd/odek/schedule.go @@ -16,6 +16,7 @@ import ( "github.com/BackendStack21/odek" "github.com/BackendStack21/odek/internal/config" + "github.com/BackendStack21/odek/internal/llm" "github.com/BackendStack21/odek/internal/loop" "github.com/BackendStack21/odek/internal/render" "github.com/BackendStack21/odek/internal/schedule" @@ -462,9 +463,16 @@ func (r telegramRunner) Run(ctx context.Context, job schedule.Job) (string, int6 // telegramDeliverer delivers via the live bot for telegram jobs (sharing its // client and rate limiting) and falls back to the CLI deliverer for stdout/log. +// +// sessions, when set, lets a delivered result be recorded into the target +// chat's conversation (Option B) so a follow-up message in that chat sees what +// the schedule posted. The scheduled run itself stays isolated — only its +// output is written back. type telegramDeliverer struct { bot *telegram.Bot fallback cliDeliverer + sessions *telegram.SessionManager + log schedule.Logger } func (d telegramDeliverer) Deliver(ctx context.Context, job schedule.Job, result string) error { @@ -478,14 +486,79 @@ func (d telegramDeliverer) Deliver(ctx context.Context, job schedule.Job, result if chatID == 0 { return fmt.Errorf("no chat id (set the job's telegram: or telegram.default_chat_id)") } - return sendTelegramResult(ctx, d.bot, chatID, result) + if err := sendTelegramResult(ctx, d.bot, chatID, result); err != nil { + return err + } + // Best-effort: record the delivered turn into the chat's conversation so + // the agent can follow up on it. A failure here must NOT fail the run — the + // message was already sent — so it is logged, not returned. + if err := d.recordScheduledTurn(chatID, job, result); err != nil && d.log != nil { + d.log.Error("schedule: record delivered turn into session failed", "id", job.ID, "chat", chatID, "error", err) + } + return nil +} + +// recordScheduledTurn appends the scheduled task and its result to the target +// chat's EXISTING session, so a later interactive turn has them in context. It +// deliberately does NOT create a session when none exists: a notification-only +// chat (one that's never been used interactively) shouldn't accumulate an +// ever-growing transcript of scheduled posts. The write is serialized with the +// interactive handler — and with concurrent deliveries to the same chat — +// through the per-chat mutex, so it can't interleave with a live turn's +// read-modify-write of the same session. +func (d telegramDeliverer) recordScheduledTurn(chatID int64, job schedule.Job, result string) error { + if d.sessions == nil || result == "" { + return nil + } + + mu := getChatMutex(chatID) + mu.Lock() + defer mu.Unlock() + + cs, err := d.sessions.Load(chatID) + if err != nil { + return err + } + if cs == nil { + return nil // no active conversation to attach to — deliver-only + } + + label := job.Name + if label == "" { + label = job.ID + } + + msgs := make([]llm.Message, len(cs.Messages), len(cs.Messages)+2) + copy(msgs, cs.Messages) + + // Normally append a user turn (clearly marked as scheduler-originated, not + // typed live) followed by the assistant result — well-formed and ready for + // the next user message. But an existing session can already END on a bare + // user message (a turn cancelled before the agent replied, or a + // context-injection command). Appending another user turn there would put + // two user messages back-to-back, which strict providers (Anthropic) reject + // on the next call. In that case fold the label into a single assistant + // message so roles stay alternating. Either way the session ends on an + // assistant turn. Secrets are redacted by Store.Save. + if n := len(msgs); n > 0 && msgs[n-1].Role == "user" { + msgs = append(msgs, llm.Message{ + Role: "assistant", + Content: fmt.Sprintf("⏰ [scheduled task %q ran]\n%s", label, result), + }) + } else { + msgs = append(msgs, + llm.Message{Role: "user", Content: fmt.Sprintf("⏰ [scheduled task %q ran]\n%s", label, job.Task)}, + llm.Message{Role: "assistant", Content: result}, + ) + } + return d.sessions.Save(chatID, msgs) } // startSchedulerForBot starts the embedded scheduler unless an external // `odek schedule daemon` already holds the lock (in which case the bot defers // to it, to avoid double-firing). It returns a stop func that releases the // lock; the scheduler goroutine itself stops when ctx is cancelled. -func startSchedulerForBot(ctx context.Context, bot *telegram.Bot, resolved config.ResolvedConfig, system string, log telegram.Logger, st *schedule.Store) func() { +func startSchedulerForBot(ctx context.Context, bot *telegram.Bot, resolved config.ResolvedConfig, system string, log telegram.Logger, st *schedule.Store, sessions *telegram.SessionManager) func() { if !resolved.Schedules.Enabled { log.Info("schedule: embedded scheduler disabled by config") return func() {} @@ -508,7 +581,7 @@ func startSchedulerForBot(ctx context.Context, bot *telegram.Bot, resolved confi } sched := schedule.New(st, telegramRunner{resolved: resolved, system: system, bot: bot, mcpTools: mcpTools}, - telegramDeliverer{bot: bot, fallback: cliDeliverer{resolved: resolved}}, + telegramDeliverer{bot: bot, fallback: cliDeliverer{resolved: resolved}, sessions: sessions, log: log}, schedulerOptions(resolved.Schedules, log), ) scheduleUnlockRef = unlock diff --git a/cmd/odek/schedule_cli_test.go b/cmd/odek/schedule_cli_test.go index 2ab9fdf..b5e6714 100644 --- a/cmd/odek/schedule_cli_test.go +++ b/cmd/odek/schedule_cli_test.go @@ -291,7 +291,7 @@ func TestAcquireScheduleLock_HomeError(t *testing.T) { func TestStartSchedulerForBot_Disabled(t *testing.T) { stop := startSchedulerForBot(context.Background(), nil, config.ResolvedConfig{}, "system", - telegram.NewFileLogger(telegram.LogInfo, ""), nil) + telegram.NewFileLogger(telegram.LogInfo, ""), nil, nil) stop() // disabled → no-op stop, must not panic } @@ -306,7 +306,7 @@ func TestStartSchedulerForBot_StartAndStop(t *testing.T) { Schedules: config.ScheduleConfig{Enabled: true, MaxConcurrent: 2, Timezone: "UTC"}, } ctx, cancel := context.WithCancel(context.Background()) - stop := startSchedulerForBot(ctx, bot, resolved, "system", telegram.NewFileLogger(telegram.LogInfo, ""), st) + stop := startSchedulerForBot(ctx, bot, resolved, "system", telegram.NewFileLogger(telegram.LogInfo, ""), st, nil) cancel() stop() // drains the scheduler goroutine, cleans up MCP, releases the lock } diff --git a/cmd/odek/schedule_session_test.go b/cmd/odek/schedule_session_test.go new file mode 100644 index 0000000..880740a --- /dev/null +++ b/cmd/odek/schedule_session_test.go @@ -0,0 +1,198 @@ +package main + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/BackendStack21/odek/internal/config" + "github.com/BackendStack21/odek/internal/llm" + "github.com/BackendStack21/odek/internal/schedule" + "github.com/BackendStack21/odek/internal/session" + "github.com/BackendStack21/odek/internal/telegram" +) + +func newTestDeliverer(t *testing.T) (telegramDeliverer, *telegram.SessionManager, <-chan string) { + t.Helper() + t.Setenv("HOME", t.TempDir()) + store, err := session.NewStore() + if err != nil { + t.Fatalf("NewStore: %v", err) + } + sm := telegram.NewSessionManager(store, time.Hour) + bot, recv := newRecordingTestBot(t) + d := telegramDeliverer{ + bot: bot, + fallback: cliDeliverer{resolved: config.ResolvedConfig{}}, + sessions: sm, + log: schedule.NopLogger{}, + } + return d, sm, recv +} + +// TestScheduleDeliver_RecordsIntoExistingSession verifies Option B: a delivered +// scheduled result is appended to the target chat's existing conversation as a +// labeled user turn + the assistant result, so a follow-up message sees it. +func TestScheduleDeliver_RecordsIntoExistingSession(t *testing.T) { + d, sm, recv := newTestDeliverer(t) + chatID := int64(5551) + if err := sm.Save(chatID, []llm.Message{ + {Role: "system", Content: "sys"}, + {Role: "user", Content: "hi"}, + {Role: "assistant", Content: "hello"}, + }); err != nil { + t.Fatalf("seed session: %v", err) + } + + job := schedule.Job{ + ID: "jb-1", Name: "daily digest", Task: "summarize my day", + Deliver: schedule.Delivery{Kind: schedule.DeliverTelegram, ChatID: chatID}, + } + if err := d.Deliver(context.Background(), job, "the digest"); err != nil { + t.Fatalf("Deliver: %v", err) + } + + // The message was actually sent. + select { + case got := <-recv: + if !strings.Contains(got, "digest") { + t.Errorf("sent text = %q, want it to contain 'digest'", got) + } + case <-time.After(2 * time.Second): + t.Fatal("no message was sent to Telegram") + } + + // The conversation now ends with the scheduled exchange. + cs, err := sm.Load(chatID) + if err != nil || cs == nil { + t.Fatalf("Load: %v, cs=%v", err, cs) + } + if len(cs.Messages) != 5 { + t.Fatalf("expected 5 messages (3 seed + 2 scheduled), got %d", len(cs.Messages)) + } + userTurn := cs.Messages[3] + if userTurn.Role != "user" || !strings.Contains(userTurn.Content, "scheduled task") || !strings.Contains(userTurn.Content, "summarize my day") { + t.Errorf("scheduled user turn wrong: %+v", userTurn) + } + asstTurn := cs.Messages[4] + if asstTurn.Role != "assistant" || asstTurn.Content != "the digest" { + t.Errorf("assistant result turn wrong: %+v", asstTurn) + } +} + +// TestScheduleDeliver_PreservesAlternationAfterUserEndingSession guards the +// edge where the session already ends on a bare user message (a turn cancelled +// before the agent replied, or a context-injection command). Appending another +// user turn would produce two consecutive user messages, which Anthropic +// rejects on the next call. The write-back must fold into a single assistant +// turn instead — and never produce two same-role messages in a row. +func TestScheduleDeliver_PreservesAlternationAfterUserEndingSession(t *testing.T) { + d, sm, recv := newTestDeliverer(t) + chatID := int64(5560) + if err := sm.Save(chatID, []llm.Message{ + {Role: "system", Content: "sys"}, + {Role: "user", Content: "an interrupted turn"}, // session ends on user + }); err != nil { + t.Fatalf("seed: %v", err) + } + + job := schedule.Job{ + ID: "jb-9", Name: "digest", Task: "summarize", + Deliver: schedule.Delivery{Kind: schedule.DeliverTelegram, ChatID: chatID}, + } + if err := d.Deliver(context.Background(), job, "the result"); err != nil { + t.Fatalf("Deliver: %v", err) + } + <-recv + + cs, err := sm.Load(chatID) + if err != nil || cs == nil { + t.Fatalf("Load: %v cs=%v", err, cs) + } + // No two consecutive same-role messages anywhere. + for i := 1; i < len(cs.Messages); i++ { + if cs.Messages[i].Role == cs.Messages[i-1].Role { + t.Fatalf("consecutive %q messages at %d: %+v", cs.Messages[i].Role, i, cs.Messages) + } + } + // The result was recorded, and the session ends on an assistant turn. + last := cs.Messages[len(cs.Messages)-1] + if last.Role != "assistant" || !strings.Contains(last.Content, "the result") { + t.Errorf("last message should be the assistant result, got %+v", last) + } +} + +// TestScheduleDeliver_NoSessionNotCreated verifies a notification-only chat +// (never used interactively) is NOT given a session just because a schedule +// posted to it — avoiding an ever-growing transcript of scheduled posts. +func TestScheduleDeliver_NoSessionNotCreated(t *testing.T) { + d, sm, recv := newTestDeliverer(t) + chatID := int64(5552) + + job := schedule.Job{ + ID: "jb-2", Name: "ping", Task: "ping", + Deliver: schedule.Delivery{Kind: schedule.DeliverTelegram, ChatID: chatID}, + } + if err := d.Deliver(context.Background(), job, "pong"); err != nil { + t.Fatalf("Deliver: %v", err) + } + + select { + case <-recv: + case <-time.After(2 * time.Second): + t.Fatal("no message was sent to Telegram") + } + + cs, err := sm.Load(chatID) + if err != nil { + t.Fatalf("Load: %v", err) + } + if cs != nil { + t.Errorf("a notification-only chat should not get a session, got %d messages", len(cs.Messages)) + } +} + +// TestScheduleDeliver_EmptyResultNotRecorded verifies an empty result is not +// appended (nothing meaningful to record). +func TestScheduleDeliver_EmptyResultNotRecorded(t *testing.T) { + d, sm, _ := newTestDeliverer(t) + chatID := int64(5553) + if err := sm.Save(chatID, []llm.Message{{Role: "user", Content: "hi"}}); err != nil { + t.Fatalf("seed: %v", err) + } + + job := schedule.Job{ + ID: "jb-3", Name: "noop", Task: "t", + Deliver: schedule.Delivery{Kind: schedule.DeliverTelegram, ChatID: chatID}, + } + if err := d.Deliver(context.Background(), job, ""); err != nil { + t.Fatalf("Deliver: %v", err) + } + + cs, err := sm.Load(chatID) + if err != nil || cs == nil { + t.Fatalf("Load: %v cs=%v", err, cs) + } + if len(cs.Messages) != 1 { + t.Errorf("empty result must not append, got %d messages", len(cs.Messages)) + } +} + +// TestScheduleDeliver_NilSessionManagerNoPanic verifies the write-back is a +// safe no-op when no SessionManager is wired (e.g. the CLI daemon path). +func TestScheduleDeliver_NilSessionManagerNoPanic(t *testing.T) { + bot, recv := newRecordingTestBot(t) + d := telegramDeliverer{bot: bot, fallback: cliDeliverer{resolved: config.ResolvedConfig{}}} + job := schedule.Job{ + Deliver: schedule.Delivery{Kind: schedule.DeliverTelegram, ChatID: 5554}, + } + if err := d.Deliver(context.Background(), job, "result"); err != nil { + t.Fatalf("Deliver: %v", err) + } + select { + case <-recv: + case <-time.After(2 * time.Second): + t.Fatal("no message was sent") + } +} diff --git a/cmd/odek/telegram.go b/cmd/odek/telegram.go index 34ec5d3..9b7d51e 100644 --- a/cmd/odek/telegram.go +++ b/cmd/odek/telegram.go @@ -700,7 +700,7 @@ func telegramCmd(args []string) error { // process's resolved config — so no environment-inheritance problem and no // separate cron daemon. If an external `odek schedule daemon` already holds // the lock, this defers to it instead of double-firing. - stopScheduler := startSchedulerForBot(ctx, bot, resolved, systemMessage, handlerLog, scheduleStore) + stopScheduler := startSchedulerForBot(ctx, bot, resolved, systemMessage, handlerLog, scheduleStore, sessionManager) defer stopScheduler() // 17. Process updates until the channel is closed (ctx cancelled).