Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 76 additions & 3 deletions cmd/odek/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/BackendStack21/odek"
"github.com/BackendStack21/odek/internal/config"
"github.com/BackendStack21/odek/internal/llm"
"github.com/BackendStack21/odek/internal/loop"
"github.com/BackendStack21/odek/internal/render"
"github.com/BackendStack21/odek/internal/schedule"
Expand Down Expand Up @@ -462,9 +463,16 @@ func (r telegramRunner) Run(ctx context.Context, job schedule.Job) (string, int6

// telegramDeliverer delivers via the live bot for telegram jobs (sharing its
// client and rate limiting) and falls back to the CLI deliverer for stdout/log.
//
// sessions, when set, lets a delivered result be recorded into the target
// chat's conversation (Option B) so a follow-up message in that chat sees what
// the schedule posted. The scheduled run itself stays isolated — only its
// output is written back.
type telegramDeliverer struct {
bot *telegram.Bot
fallback cliDeliverer
sessions *telegram.SessionManager
log schedule.Logger
}

func (d telegramDeliverer) Deliver(ctx context.Context, job schedule.Job, result string) error {
Expand All @@ -478,14 +486,79 @@ func (d telegramDeliverer) Deliver(ctx context.Context, job schedule.Job, result
if chatID == 0 {
return fmt.Errorf("no chat id (set the job's telegram:<chatID> or telegram.default_chat_id)")
}
return sendTelegramResult(ctx, d.bot, chatID, result)
if err := sendTelegramResult(ctx, d.bot, chatID, result); err != nil {
return err
}
// Best-effort: record the delivered turn into the chat's conversation so
// the agent can follow up on it. A failure here must NOT fail the run — the
// message was already sent — so it is logged, not returned.
if err := d.recordScheduledTurn(chatID, job, result); err != nil && d.log != nil {
d.log.Error("schedule: record delivered turn into session failed", "id", job.ID, "chat", chatID, "error", err)
}
return nil
}

// recordScheduledTurn appends the scheduled task and its result to the target
// chat's EXISTING session, so a later interactive turn has them in context. It
// deliberately does NOT create a session when none exists: a notification-only
// chat (one that's never been used interactively) shouldn't accumulate an
// ever-growing transcript of scheduled posts. The write is serialized with the
// interactive handler — and with concurrent deliveries to the same chat —
// through the per-chat mutex, so it can't interleave with a live turn's
// read-modify-write of the same session.
func (d telegramDeliverer) recordScheduledTurn(chatID int64, job schedule.Job, result string) error {
if d.sessions == nil || result == "" {
return nil
}

mu := getChatMutex(chatID)
mu.Lock()
defer mu.Unlock()

cs, err := d.sessions.Load(chatID)
if err != nil {
return err
}
if cs == nil {
return nil // no active conversation to attach to — deliver-only
}

label := job.Name
if label == "" {
label = job.ID
}

msgs := make([]llm.Message, len(cs.Messages), len(cs.Messages)+2)
copy(msgs, cs.Messages)

// Normally append a user turn (clearly marked as scheduler-originated, not
// typed live) followed by the assistant result — well-formed and ready for
// the next user message. But an existing session can already END on a bare
// user message (a turn cancelled before the agent replied, or a
// context-injection command). Appending another user turn there would put
// two user messages back-to-back, which strict providers (Anthropic) reject
// on the next call. In that case fold the label into a single assistant
// message so roles stay alternating. Either way the session ends on an
// assistant turn. Secrets are redacted by Store.Save.
if n := len(msgs); n > 0 && msgs[n-1].Role == "user" {
msgs = append(msgs, llm.Message{
Role: "assistant",
Content: fmt.Sprintf("⏰ [scheduled task %q ran]\n%s", label, result),
})
} else {
msgs = append(msgs,
llm.Message{Role: "user", Content: fmt.Sprintf("⏰ [scheduled task %q ran]\n%s", label, job.Task)},
llm.Message{Role: "assistant", Content: result},
)
}
return d.sessions.Save(chatID, msgs)
}

// startSchedulerForBot starts the embedded scheduler unless an external
// `odek schedule daemon` already holds the lock (in which case the bot defers
// to it, to avoid double-firing). It returns a stop func that releases the
// lock; the scheduler goroutine itself stops when ctx is cancelled.
func startSchedulerForBot(ctx context.Context, bot *telegram.Bot, resolved config.ResolvedConfig, system string, log telegram.Logger, st *schedule.Store) func() {
func startSchedulerForBot(ctx context.Context, bot *telegram.Bot, resolved config.ResolvedConfig, system string, log telegram.Logger, st *schedule.Store, sessions *telegram.SessionManager) func() {
if !resolved.Schedules.Enabled {
log.Info("schedule: embedded scheduler disabled by config")
return func() {}
Expand All @@ -508,7 +581,7 @@ func startSchedulerForBot(ctx context.Context, bot *telegram.Bot, resolved confi
}
sched := schedule.New(st,
telegramRunner{resolved: resolved, system: system, bot: bot, mcpTools: mcpTools},
telegramDeliverer{bot: bot, fallback: cliDeliverer{resolved: resolved}},
telegramDeliverer{bot: bot, fallback: cliDeliverer{resolved: resolved}, sessions: sessions, log: log},
schedulerOptions(resolved.Schedules, log),
)
scheduleUnlockRef = unlock
Expand Down
4 changes: 2 additions & 2 deletions cmd/odek/schedule_cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func TestAcquireScheduleLock_HomeError(t *testing.T) {

func TestStartSchedulerForBot_Disabled(t *testing.T) {
stop := startSchedulerForBot(context.Background(), nil, config.ResolvedConfig{}, "system",
telegram.NewFileLogger(telegram.LogInfo, ""), nil)
telegram.NewFileLogger(telegram.LogInfo, ""), nil, nil)
stop() // disabled → no-op stop, must not panic
}

Expand All @@ -306,7 +306,7 @@ func TestStartSchedulerForBot_StartAndStop(t *testing.T) {
Schedules: config.ScheduleConfig{Enabled: true, MaxConcurrent: 2, Timezone: "UTC"},
}
ctx, cancel := context.WithCancel(context.Background())
stop := startSchedulerForBot(ctx, bot, resolved, "system", telegram.NewFileLogger(telegram.LogInfo, ""), st)
stop := startSchedulerForBot(ctx, bot, resolved, "system", telegram.NewFileLogger(telegram.LogInfo, ""), st, nil)
cancel()
stop() // drains the scheduler goroutine, cleans up MCP, releases the lock
}
Expand Down
198 changes: 198 additions & 0 deletions cmd/odek/schedule_session_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
package main

import (
"context"
"strings"
"testing"
"time"

"github.com/BackendStack21/odek/internal/config"
"github.com/BackendStack21/odek/internal/llm"
"github.com/BackendStack21/odek/internal/schedule"
"github.com/BackendStack21/odek/internal/session"
"github.com/BackendStack21/odek/internal/telegram"
)

func newTestDeliverer(t *testing.T) (telegramDeliverer, *telegram.SessionManager, <-chan string) {
t.Helper()
t.Setenv("HOME", t.TempDir())
store, err := session.NewStore()
if err != nil {
t.Fatalf("NewStore: %v", err)
}
sm := telegram.NewSessionManager(store, time.Hour)
bot, recv := newRecordingTestBot(t)
d := telegramDeliverer{
bot: bot,
fallback: cliDeliverer{resolved: config.ResolvedConfig{}},
sessions: sm,
log: schedule.NopLogger{},
}
return d, sm, recv
}

// TestScheduleDeliver_RecordsIntoExistingSession verifies Option B: a delivered
// scheduled result is appended to the target chat's existing conversation as a
// labeled user turn + the assistant result, so a follow-up message sees it.
func TestScheduleDeliver_RecordsIntoExistingSession(t *testing.T) {
d, sm, recv := newTestDeliverer(t)
chatID := int64(5551)
if err := sm.Save(chatID, []llm.Message{
{Role: "system", Content: "sys"},
{Role: "user", Content: "hi"},
{Role: "assistant", Content: "hello"},
}); err != nil {
t.Fatalf("seed session: %v", err)
}

job := schedule.Job{
ID: "jb-1", Name: "daily digest", Task: "summarize my day",
Deliver: schedule.Delivery{Kind: schedule.DeliverTelegram, ChatID: chatID},
}
if err := d.Deliver(context.Background(), job, "the digest"); err != nil {
t.Fatalf("Deliver: %v", err)
}

// The message was actually sent.
select {
case got := <-recv:
if !strings.Contains(got, "digest") {
t.Errorf("sent text = %q, want it to contain 'digest'", got)
}
case <-time.After(2 * time.Second):
t.Fatal("no message was sent to Telegram")
}

// The conversation now ends with the scheduled exchange.
cs, err := sm.Load(chatID)
if err != nil || cs == nil {
t.Fatalf("Load: %v, cs=%v", err, cs)
}
if len(cs.Messages) != 5 {
t.Fatalf("expected 5 messages (3 seed + 2 scheduled), got %d", len(cs.Messages))
}
userTurn := cs.Messages[3]
if userTurn.Role != "user" || !strings.Contains(userTurn.Content, "scheduled task") || !strings.Contains(userTurn.Content, "summarize my day") {
t.Errorf("scheduled user turn wrong: %+v", userTurn)
}
asstTurn := cs.Messages[4]
if asstTurn.Role != "assistant" || asstTurn.Content != "the digest" {
t.Errorf("assistant result turn wrong: %+v", asstTurn)
}
}

// TestScheduleDeliver_PreservesAlternationAfterUserEndingSession guards the
// edge where the session already ends on a bare user message (a turn cancelled
// before the agent replied, or a context-injection command). Appending another
// user turn would produce two consecutive user messages, which Anthropic
// rejects on the next call. The write-back must fold into a single assistant
// turn instead — and never produce two same-role messages in a row.
func TestScheduleDeliver_PreservesAlternationAfterUserEndingSession(t *testing.T) {
d, sm, recv := newTestDeliverer(t)
chatID := int64(5560)
if err := sm.Save(chatID, []llm.Message{
{Role: "system", Content: "sys"},
{Role: "user", Content: "an interrupted turn"}, // session ends on user
}); err != nil {
t.Fatalf("seed: %v", err)
}

job := schedule.Job{
ID: "jb-9", Name: "digest", Task: "summarize",
Deliver: schedule.Delivery{Kind: schedule.DeliverTelegram, ChatID: chatID},
}
if err := d.Deliver(context.Background(), job, "the result"); err != nil {
t.Fatalf("Deliver: %v", err)
}
<-recv

cs, err := sm.Load(chatID)
if err != nil || cs == nil {
t.Fatalf("Load: %v cs=%v", err, cs)
}
// No two consecutive same-role messages anywhere.
for i := 1; i < len(cs.Messages); i++ {
if cs.Messages[i].Role == cs.Messages[i-1].Role {
t.Fatalf("consecutive %q messages at %d: %+v", cs.Messages[i].Role, i, cs.Messages)
}
}
// The result was recorded, and the session ends on an assistant turn.
last := cs.Messages[len(cs.Messages)-1]
if last.Role != "assistant" || !strings.Contains(last.Content, "the result") {
t.Errorf("last message should be the assistant result, got %+v", last)
}
}

// TestScheduleDeliver_NoSessionNotCreated verifies a notification-only chat
// (never used interactively) is NOT given a session just because a schedule
// posted to it — avoiding an ever-growing transcript of scheduled posts.
func TestScheduleDeliver_NoSessionNotCreated(t *testing.T) {
d, sm, recv := newTestDeliverer(t)
chatID := int64(5552)

job := schedule.Job{
ID: "jb-2", Name: "ping", Task: "ping",
Deliver: schedule.Delivery{Kind: schedule.DeliverTelegram, ChatID: chatID},
}
if err := d.Deliver(context.Background(), job, "pong"); err != nil {
t.Fatalf("Deliver: %v", err)
}

select {
case <-recv:
case <-time.After(2 * time.Second):
t.Fatal("no message was sent to Telegram")
}

cs, err := sm.Load(chatID)
if err != nil {
t.Fatalf("Load: %v", err)
}
if cs != nil {
t.Errorf("a notification-only chat should not get a session, got %d messages", len(cs.Messages))
}
}

// TestScheduleDeliver_EmptyResultNotRecorded verifies an empty result is not
// appended (nothing meaningful to record).
func TestScheduleDeliver_EmptyResultNotRecorded(t *testing.T) {
d, sm, _ := newTestDeliverer(t)
chatID := int64(5553)
if err := sm.Save(chatID, []llm.Message{{Role: "user", Content: "hi"}}); err != nil {
t.Fatalf("seed: %v", err)
}

job := schedule.Job{
ID: "jb-3", Name: "noop", Task: "t",
Deliver: schedule.Delivery{Kind: schedule.DeliverTelegram, ChatID: chatID},
}
if err := d.Deliver(context.Background(), job, ""); err != nil {
t.Fatalf("Deliver: %v", err)
}

cs, err := sm.Load(chatID)
if err != nil || cs == nil {
t.Fatalf("Load: %v cs=%v", err, cs)
}
if len(cs.Messages) != 1 {
t.Errorf("empty result must not append, got %d messages", len(cs.Messages))
}
}

// TestScheduleDeliver_NilSessionManagerNoPanic verifies the write-back is a
// safe no-op when no SessionManager is wired (e.g. the CLI daemon path).
func TestScheduleDeliver_NilSessionManagerNoPanic(t *testing.T) {
bot, recv := newRecordingTestBot(t)
d := telegramDeliverer{bot: bot, fallback: cliDeliverer{resolved: config.ResolvedConfig{}}}
job := schedule.Job{
Deliver: schedule.Delivery{Kind: schedule.DeliverTelegram, ChatID: 5554},
}
if err := d.Deliver(context.Background(), job, "result"); err != nil {
t.Fatalf("Deliver: %v", err)
}
select {
case <-recv:
case <-time.After(2 * time.Second):
t.Fatal("no message was sent")
}
}
2 changes: 1 addition & 1 deletion cmd/odek/telegram.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ func telegramCmd(args []string) error {
// process's resolved config — so no environment-inheritance problem and no
// separate cron daemon. If an external `odek schedule daemon` already holds
// the lock, this defers to it instead of double-firing.
stopScheduler := startSchedulerForBot(ctx, bot, resolved, systemMessage, handlerLog, scheduleStore)
stopScheduler := startSchedulerForBot(ctx, bot, resolved, systemMessage, handlerLog, scheduleStore, sessionManager)
defer stopScheduler()

// 17. Process updates until the channel is closed (ctx cancelled).
Expand Down
Loading