diff --git a/README.md b/README.md index d37e39f..79a5697 100644 --- a/README.md +++ b/README.md @@ -131,6 +131,8 @@ The server dynamically filters the available tools based on the permissions asso - **Required Permission**: `metrics-data.read` - **Sample Prompt**: "Show the top 10 underutilized pods by memory quota in cluster 'production'" +> **Note:** When a time window is provided, the underlying PromQL is wrapped in the aggregation appropriate for each tool (`avg_over_time`, `max_over_time`, `min_over_time`, `increase`, etc.) and evaluated at `end`. See [`internal/infra/mcp/tools/README.md`](./internal/infra/mcp/tools/README.md) for the per-tool aggregation table. + ### Sysdig Secure - **`list_runtime_events`** diff --git a/cmd/server/main.go b/cmd/server/main.go index 708455c..ff7a4ba 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -119,22 +119,22 @@ func setupHandler(sysdigClient sysdig.ExtendedClientWithResponsesInterface) *mcp tools.NewToolRunSysql(sysdigClient), tools.NewToolGenerateSysql(sysdigClient), - tools.NewK8sListClusters(sysdigClient), - tools.NewK8sListNodes(sysdigClient), - tools.NewK8sListCronjobs(sysdigClient), - tools.NewK8sListWorkloads(sysdigClient), - tools.NewK8sListPodContainers(sysdigClient), - tools.NewK8sListTopUnavailablePods(sysdigClient), - tools.NewK8sListTopRestartedPods(sysdigClient), - tools.NewK8sListTopHttpErrorsInPods(sysdigClient), - tools.NewK8sListTopNetworkErrorsInPods(sysdigClient), - tools.NewK8sListCountPodsPerCluster(sysdigClient), - tools.NewK8sListUnderutilizedPodsCPUQuota(sysdigClient), - tools.NewK8sListTopCPUConsumedWorkload(sysdigClient), - tools.NewK8sListTopCPUConsumedContainer(sysdigClient), - tools.NewK8sListUnderutilizedPodsMemoryQuota(sysdigClient), - tools.NewK8sListTopMemoryConsumedWorkload(sysdigClient), - tools.NewK8sListTopMemoryConsumedContainer(sysdigClient), + tools.NewK8sListClusters(sysdigClient, systemClock), + tools.NewK8sListNodes(sysdigClient, systemClock), + 
tools.NewK8sListCronjobs(sysdigClient, systemClock), + tools.NewK8sListWorkloads(sysdigClient, systemClock), + tools.NewK8sListPodContainers(sysdigClient, systemClock), + tools.NewK8sListTopUnavailablePods(sysdigClient, systemClock), + tools.NewK8sListTopRestartedPods(sysdigClient, systemClock), + tools.NewK8sListTopHttpErrorsInPods(sysdigClient, systemClock), + tools.NewK8sListTopNetworkErrorsInPods(sysdigClient, systemClock), + tools.NewK8sListCountPodsPerCluster(sysdigClient, systemClock), + tools.NewK8sListUnderutilizedPodsCPUQuota(sysdigClient, systemClock), + tools.NewK8sListTopCPUConsumedWorkload(sysdigClient, systemClock), + tools.NewK8sListTopCPUConsumedContainer(sysdigClient, systemClock), + tools.NewK8sListUnderutilizedPodsMemoryQuota(sysdigClient, systemClock), + tools.NewK8sListTopMemoryConsumedWorkload(sysdigClient, systemClock), + tools.NewK8sListTopMemoryConsumedContainer(sysdigClient, systemClock), ) return handler } diff --git a/internal/infra/mcp/tools/README.md b/internal/infra/mcp/tools/README.md index f69c95a..4f3e312 100644 --- a/internal/infra/mcp/tools/README.md +++ b/internal/infra/mcp/tools/README.md @@ -38,6 +38,35 @@ The handler filters tools dynamically based on the Sysdig user's permissions. Ea |---|---|---|---|---| | `generate_sysql` | `tool_generate_sysql.go` | Convert natural language to SysQL via Sysdig Sage. | `sage.exec` (does not work with Service Accounts) | "Create a SysQL to list S3 buckets." | +## Historical range (start / end) + +All Sysdig Monitor `k8s_list_*` tools accept two optional parameters: + +- `start` — RFC3339 timestamp, e.g. `2026-04-16T00:00:00Z` +- `end` — RFC3339 timestamp, e.g. `2026-04-16T01:00:00Z` + +When omitted, tools return an instant snapshot (current behaviour). 
When provided, +the underlying PromQL is wrapped in the aggregation appropriate for each tool and +evaluated at `end`: + +| Tool group | Wrapping applied when windowed | +|---|---| +| CPU / memory usage, underutilized quota, pod count | `avg_over_time(metric[Ns])` | +| Top restarted pods | `increase(kube_pod_container_status_restarts_total[Ns])` | +| Top unavailable pods | `min_over_time(kube_workload_status_unavailable[Ns]) >= 1` (Sysdig-canonical pattern — requires continuous unavailability for the entire window) | +| HTTP / network errors | `sum_over_time(metric[Ns]) / N` (rate per second) | +| Inventory tools (clusters, nodes, workloads, pod_containers, cronjobs) | `max_over_time(metric[Ns]) > 0` (workloads with status=ready/desired/running drop the `> 0` guard) | + +Validation rules (helper: `utils.go`): + +- `end` without `start` → error. +- `start` without `end` → `end` defaults to now. +- `end` in the future → clamped to now. +- `end <= start` → error. + +Windowed queries carry a 60 s client-side PromQL `Timeout` to fail fast before the +Sysdig edge proxy's own 80–90 s cut-off. + # Adding a New Tool 1. **See other tools:** Check how other tools are implemented so you can have the context on how they should look like. 
diff --git a/internal/infra/mcp/tools/tool_k8s_list_clusters.go b/internal/infra/mcp/tools/tool_k8s_list_clusters.go index 85ac4ef..7942083 100644 --- a/internal/infra/mcp/tools/tool_k8s_list_clusters.go +++ b/internal/infra/mcp/tools/tool_k8s_list_clusters.go @@ -8,27 +8,31 @@ import ( "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" + "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" ) type K8sListClusters struct { SysdigClient sysdig.ExtendedClientWithResponsesInterface + clock clock.Clock } -func NewK8sListClusters(sysdigClient sysdig.ExtendedClientWithResponsesInterface) *K8sListClusters { +func NewK8sListClusters(sysdigClient sysdig.ExtendedClientWithResponsesInterface, clk clock.Clock) *K8sListClusters { return &K8sListClusters{ SysdigClient: sysdigClient, + clock: clk, } } func (t *K8sListClusters) RegisterInServer(s *server.MCPServer) { tool := mcp.NewTool("k8s_list_clusters", - mcp.WithDescription("Lists the cluster information for all clusters or just the cluster specified."), + mcp.WithDescription("Lists the cluster information for all clusters or just the cluster specified. 
Optionally pass start/end (RFC3339) to list clusters that existed at any point in the window."), mcp.WithString("cluster_name", mcp.Description("The name of the cluster to filter by.")), mcp.WithNumber("limit", mcp.Description("Maximum number of clusters to return."), mcp.DefaultNumber(10), ), + WithTimeWindowParams(), mcp.WithOutputSchema[map[string]any](), mcp.WithReadOnlyHintAnnotation(true), mcp.WithDestructiveHintAnnotation(false), @@ -41,16 +45,21 @@ func (t *K8sListClusters) handle(ctx context.Context, request mcp.CallToolReques clusterName := mcp.ParseString(request, "cluster_name", "") limit := mcp.ParseInt(request, "limit", 10) - query := "kube_cluster_info" - if clusterName != "" { - query = fmt.Sprintf("kube_cluster_info{cluster=\"%s\"}", clusterName) + tw, err := ParseTimeWindow(request, t.clock) + if err != nil { + return mcp.NewToolResultErrorFromErr("invalid time window", err), nil } + query := buildKubeClusterInfoQuery(clusterName, tw) + limitQuery := sysdig.LimitQuery(limit) params := &sysdig.GetQueryV1Params{ Query: query, Limit: &limitQuery, } + if err := tw.ApplyToParams(params); err != nil { + return mcp.NewToolResultErrorFromErr("failed to build eval time", err), nil + } httpResp, err := t.SysdigClient.GetQueryV1(ctx, params) if err != nil { @@ -69,3 +78,14 @@ func (t *K8sListClusters) handle(ctx context.Context, request mcp.CallToolReques return mcp.NewToolResultJSON(queryResponse) } + +func buildKubeClusterInfoQuery(clusterName string, tw TimeWindow) string { + metric := "kube_cluster_info" + if clusterName != "" { + metric = fmt.Sprintf(`kube_cluster_info{cluster="%s"}`, clusterName) + } + if !tw.IsZero() { + return fmt.Sprintf("max_over_time(%s%s) > 0", metric, tw.RangeSelector()) + } + return metric +} diff --git a/internal/infra/mcp/tools/tool_k8s_list_clusters_test.go b/internal/infra/mcp/tools/tool_k8s_list_clusters_test.go index 0aaf394..5acffe1 100644 --- a/internal/infra/mcp/tools/tool_k8s_list_clusters_test.go +++ 
b/internal/infra/mcp/tools/tool_k8s_list_clusters_test.go @@ -5,21 +5,25 @@ import ( "context" "io" "net/http" + "time" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "go.uber.org/mock/gomock" + + mocks_clock "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock/mocks" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/mcp/tools" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig/mocks" - "go.uber.org/mock/gomock" ) var _ = Describe("KubernetesListClusters Tool", func() { var ( tool *tools.K8sListClusters mockSysdig *mocks.MockExtendedClientWithResponsesInterface + mockClock *mocks_clock.MockClock mcpServer *server.MCPServer ctrl *gomock.Controller ) @@ -27,7 +31,9 @@ var _ = Describe("KubernetesListClusters Tool", func() { BeforeEach(func() { ctrl = gomock.NewController(GinkgoT()) mockSysdig = mocks.NewMockExtendedClientWithResponsesInterface(ctrl) - tool = tools.NewK8sListClusters(mockSysdig) + mockClock = mocks_clock.NewMockClock(ctrl) + mockClock.EXPECT().Now().AnyTimes().Return(time.Date(2026, time.April, 16, 12, 0, 0, 0, time.UTC)) + tool = tools.NewK8sListClusters(mockSysdig, mockClock) mcpServer = server.NewMCPServer("test", "test") tool.RegisterInServer(mcpServer) }) @@ -103,6 +109,39 @@ var _ = Describe("KubernetesListClusters Tool", func() { Limit: new(sysdig.LimitQuery(20)), }, ), + Entry("windowed, no filters", + "k8s_list_clusters", + mcp.CallToolRequest{ + Params: mcp.CallToolParams{ + Name: "k8s_list_clusters", + Arguments: map[string]any{ + "start": "2026-04-16T10:00:00Z", + "end": "2026-04-16T11:00:00Z", + }, + }, + }, + mergeLimit(newWindowedQueryParams( + `max_over_time(kube_cluster_info[3600s]) > 0`, + time.Date(2026, time.April, 16, 11, 0, 0, 0, time.UTC), + ), 10), + ), + Entry("windowed, cluster_name filter", + "k8s_list_clusters", + mcp.CallToolRequest{ + Params: 
mcp.CallToolParams{ + Name: "k8s_list_clusters", + Arguments: map[string]any{ + "cluster_name": "my_cluster", + "start": "2026-04-16T10:00:00Z", + "end": "2026-04-16T11:00:00Z", + }, + }, + }, + mergeLimit(newWindowedQueryParams( + `max_over_time(kube_cluster_info{cluster="my_cluster"}[3600s]) > 0`, + time.Date(2026, time.April, 16, 11, 0, 0, 0, time.UTC), + ), 10), + ), ) }) }) diff --git a/internal/infra/mcp/tools/tool_k8s_list_count_pods_per_cluster.go b/internal/infra/mcp/tools/tool_k8s_list_count_pods_per_cluster.go index 3ec4ab0..a058682 100644 --- a/internal/infra/mcp/tools/tool_k8s_list_count_pods_per_cluster.go +++ b/internal/infra/mcp/tools/tool_k8s_list_count_pods_per_cluster.go @@ -9,28 +9,32 @@ import ( "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" + "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" ) type K8sListCountPodsPerCluster struct { SysdigClient sysdig.ExtendedClientWithResponsesInterface + clock clock.Clock } -func NewK8sListCountPodsPerCluster(sysdigClient sysdig.ExtendedClientWithResponsesInterface) *K8sListCountPodsPerCluster { +func NewK8sListCountPodsPerCluster(sysdigClient sysdig.ExtendedClientWithResponsesInterface, clk clock.Clock) *K8sListCountPodsPerCluster { return &K8sListCountPodsPerCluster{ SysdigClient: sysdigClient, + clock: clk, } } func (t *K8sListCountPodsPerCluster) RegisterInServer(s *server.MCPServer) { tool := mcp.NewTool("k8s_list_count_pods_per_cluster", - mcp.WithDescription("List the count of running Kubernetes Pods grouped by cluster and namespace."), + mcp.WithDescription("List the count of running Kubernetes Pods grouped by cluster and namespace. 
Optionally pass start/end (RFC3339) to count pods averaged over a historical window instead of the current instant snapshot."), mcp.WithString("cluster_name", mcp.Description("The name of the cluster to filter by.")), mcp.WithString("namespace_name", mcp.Description("The name of the namespace to filter by.")), mcp.WithNumber("limit", mcp.Description("Maximum number of results to return."), mcp.DefaultNumber(20), ), + WithTimeWindowParams(), mcp.WithOutputSchema[map[string]any](), mcp.WithReadOnlyHintAnnotation(true), mcp.WithDestructiveHintAnnotation(false), @@ -44,13 +48,21 @@ func (t *K8sListCountPodsPerCluster) handle(ctx context.Context, request mcp.Cal namespaceName := mcp.ParseString(request, "namespace_name", "") limit := mcp.ParseInt(request, "limit", 20) - query := buildKubePodCountQuery(clusterName, namespaceName) + tw, err := ParseTimeWindow(request, t.clock) + if err != nil { + return mcp.NewToolResultErrorFromErr("invalid time window", err), nil + } + + query := buildKubePodCountQuery(clusterName, namespaceName, tw) limitQuery := sysdig.LimitQuery(limit) params := &sysdig.GetQueryV1Params{ Query: query, Limit: &limitQuery, } + if err := tw.ApplyToParams(params); err != nil { + return mcp.NewToolResultErrorFromErr("failed to build eval time", err), nil + } httpResp, err := t.SysdigClient.GetQueryV1(ctx, params) if err != nil { @@ -70,7 +82,7 @@ func (t *K8sListCountPodsPerCluster) handle(ctx context.Context, request mcp.Cal return mcp.NewToolResultJSON(queryResponse) } -func buildKubePodCountQuery(clusterName, namespaceName string) string { +func buildKubePodCountQuery(clusterName, namespaceName string, tw TimeWindow) string { filters := []string{} if clusterName != "" { filters = append(filters, fmt.Sprintf("kube_cluster_name=\"%s\"", clusterName)) @@ -84,5 +96,10 @@ func buildKubePodCountQuery(clusterName, namespaceName string) string { filterString = fmt.Sprintf("{%s}", strings.Join(filters, ",")) } - return fmt.Sprintf("sum by (kube_cluster_name, 
kube_namespace_name) (kube_pod_info%s)", filterString) + metric := fmt.Sprintf("kube_pod_info%s", filterString) + if !tw.IsZero() { + metric = fmt.Sprintf("avg_over_time(%s%s)", metric, tw.RangeSelector()) + } + + return fmt.Sprintf("sum by (kube_cluster_name, kube_namespace_name) (%s)", metric) } diff --git a/internal/infra/mcp/tools/tool_k8s_list_count_pods_per_cluster_test.go b/internal/infra/mcp/tools/tool_k8s_list_count_pods_per_cluster_test.go index f9b5c33..618d678 100644 --- a/internal/infra/mcp/tools/tool_k8s_list_count_pods_per_cluster_test.go +++ b/internal/infra/mcp/tools/tool_k8s_list_count_pods_per_cluster_test.go @@ -5,21 +5,25 @@ import ( "context" "io" "net/http" + "time" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "go.uber.org/mock/gomock" + + mocks_clock "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock/mocks" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/mcp/tools" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig/mocks" - "go.uber.org/mock/gomock" ) var _ = Describe("KubernetesListCountPodsPerCluster Tool", func() { var ( tool *tools.K8sListCountPodsPerCluster mockSysdig *mocks.MockExtendedClientWithResponsesInterface + mockClock *mocks_clock.MockClock mcpServer *server.MCPServer ctrl *gomock.Controller ) @@ -27,7 +31,9 @@ var _ = Describe("KubernetesListCountPodsPerCluster Tool", func() { BeforeEach(func() { ctrl = gomock.NewController(GinkgoT()) mockSysdig = mocks.NewMockExtendedClientWithResponsesInterface(ctrl) - tool = tools.NewK8sListCountPodsPerCluster(mockSysdig) + mockClock = mocks_clock.NewMockClock(ctrl) + mockClock.EXPECT().Now().AnyTimes().Return(time.Date(2026, time.April, 16, 12, 0, 0, 0, time.UTC)) + tool = tools.NewK8sListCountPodsPerCluster(mockSysdig, mockClock) mcpServer = server.NewMCPServer("test", "test") tool.RegisterInServer(mcpServer) }) 
@@ -116,6 +122,22 @@ var _ = Describe("KubernetesListCountPodsPerCluster Tool", func() { Limit: new(sysdig.LimitQuery(20)), }, ), + Entry("windowed, both start and end", + "k8s_list_count_pods_per_cluster", + mcp.CallToolRequest{ + Params: mcp.CallToolParams{ + Name: "k8s_list_count_pods_per_cluster", + Arguments: map[string]any{ + "start": "2026-04-16T10:00:00Z", + "end": "2026-04-16T11:00:00Z", + }, + }, + }, + mergeLimit(newWindowedQueryParams( + `sum by (kube_cluster_name, kube_namespace_name) (avg_over_time(kube_pod_info[3600s]))`, + time.Date(2026, time.April, 16, 11, 0, 0, 0, time.UTC), + ), 20), + ), ) }) }) diff --git a/internal/infra/mcp/tools/tool_k8s_list_cronjobs.go b/internal/infra/mcp/tools/tool_k8s_list_cronjobs.go index 15199e5..4e6e626 100644 --- a/internal/infra/mcp/tools/tool_k8s_list_cronjobs.go +++ b/internal/infra/mcp/tools/tool_k8s_list_cronjobs.go @@ -9,22 +9,25 @@ import ( "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" + "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" ) type K8sListCronjobs struct { SysdigClient sysdig.ExtendedClientWithResponsesInterface + clock clock.Clock } -func NewK8sListCronjobs(sysdigClient sysdig.ExtendedClientWithResponsesInterface) *K8sListCronjobs { +func NewK8sListCronjobs(sysdigClient sysdig.ExtendedClientWithResponsesInterface, clk clock.Clock) *K8sListCronjobs { return &K8sListCronjobs{ SysdigClient: sysdigClient, + clock: clk, } } func (t *K8sListCronjobs) RegisterInServer(s *server.MCPServer) { tool := mcp.NewTool("k8s_list_cronjobs", - mcp.WithDescription("Retrieves information from the cronjobs in the cluster."), + mcp.WithDescription("Retrieves information from the cronjobs in the cluster. 
Optionally pass start/end (RFC3339) to list cronjobs that existed at any point in the window."), mcp.WithString("cluster_name", mcp.Description("The name of the cluster to filter by.")), mcp.WithString("namespace_name", mcp.Description("The name of the namespace to filter by.")), mcp.WithString("cronjob_name", mcp.Description("The name of the cronjob to filter by.")), @@ -32,6 +35,7 @@ func (t *K8sListCronjobs) RegisterInServer(s *server.MCPServer) { mcp.Description("Maximum number of cronjobs to return."), mcp.DefaultNumber(10), ), + WithTimeWindowParams(), mcp.WithOutputSchema[map[string]any](), mcp.WithReadOnlyHintAnnotation(true), mcp.WithDestructiveHintAnnotation(false), @@ -46,13 +50,21 @@ func (t *K8sListCronjobs) handle(ctx context.Context, request mcp.CallToolReques cronjobName := mcp.ParseString(request, "cronjob_name", "") limit := mcp.ParseInt(request, "limit", 10) - query := buildKubeCronjobInfoQuery(clusterName, namespaceName, cronjobName) + tw, err := ParseTimeWindow(request, t.clock) + if err != nil { + return mcp.NewToolResultErrorFromErr("invalid time window", err), nil + } + + query := buildKubeCronjobInfoQuery(clusterName, namespaceName, cronjobName, tw) limitQuery := sysdig.LimitQuery(limit) params := &sysdig.GetQueryV1Params{ Query: query, Limit: &limitQuery, } + if err := tw.ApplyToParams(params); err != nil { + return mcp.NewToolResultErrorFromErr("failed to build eval time", err), nil + } httpResp, err := t.SysdigClient.GetQueryV1(ctx, params) if err != nil { @@ -72,7 +84,7 @@ func (t *K8sListCronjobs) handle(ctx context.Context, request mcp.CallToolReques return mcp.NewToolResultJSON(queryResponse) } -func buildKubeCronjobInfoQuery(clusterName, namespaceName, cronjobName string) string { +func buildKubeCronjobInfoQuery(clusterName, namespaceName, cronjobName string, tw TimeWindow) string { filters := []string{} if clusterName != "" { filters = append(filters, fmt.Sprintf("kube_cluster_name=\"%s\"", clusterName)) @@ -84,9 +96,12 @@ func 
buildKubeCronjobInfoQuery(clusterName, namespaceName, cronjobName string) s filters = append(filters, fmt.Sprintf("kube_cronjob_name=\"%s\"", cronjobName)) } - if len(filters) == 0 { - return "kube_cronjob_info" + metric := "kube_cronjob_info" + if len(filters) > 0 { + metric = fmt.Sprintf("kube_cronjob_info{%s}", strings.Join(filters, ",")) } - - return fmt.Sprintf("kube_cronjob_info{%s}", strings.Join(filters, ",")) + if !tw.IsZero() { + return fmt.Sprintf("max_over_time(%s%s) > 0", metric, tw.RangeSelector()) + } + return metric } diff --git a/internal/infra/mcp/tools/tool_k8s_list_cronjobs_test.go b/internal/infra/mcp/tools/tool_k8s_list_cronjobs_test.go index 15f0b2c..f94445c 100644 --- a/internal/infra/mcp/tools/tool_k8s_list_cronjobs_test.go +++ b/internal/infra/mcp/tools/tool_k8s_list_cronjobs_test.go @@ -5,21 +5,25 @@ import ( "context" "io" "net/http" + "time" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "go.uber.org/mock/gomock" + + mocks_clock "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock/mocks" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/mcp/tools" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig/mocks" - "go.uber.org/mock/gomock" ) var _ = Describe("KubernetesListCronjobs Tool", func() { var ( tool *tools.K8sListCronjobs mockSysdig *mocks.MockExtendedClientWithResponsesInterface + mockClock *mocks_clock.MockClock mcpServer *server.MCPServer ctrl *gomock.Controller ) @@ -27,7 +31,9 @@ var _ = Describe("KubernetesListCronjobs Tool", func() { BeforeEach(func() { ctrl = gomock.NewController(GinkgoT()) mockSysdig = mocks.NewMockExtendedClientWithResponsesInterface(ctrl) - tool = tools.NewK8sListCronjobs(mockSysdig) + mockClock = mocks_clock.NewMockClock(ctrl) + mockClock.EXPECT().Now().AnyTimes().Return(time.Date(2026, time.April, 16, 12, 0, 0, 0, time.UTC)) + tool = 
tools.NewK8sListCronjobs(mockSysdig, mockClock) mcpServer = server.NewMCPServer("test", "test") tool.RegisterInServer(mcpServer) }) @@ -129,6 +135,23 @@ var _ = Describe("KubernetesListCronjobs Tool", func() { Limit: new(sysdig.LimitQuery(10)), }, ), + Entry("windowed, with cluster filter", + "k8s_list_cronjobs", + mcp.CallToolRequest{ + Params: mcp.CallToolParams{ + Name: "k8s_list_cronjobs", + Arguments: map[string]any{ + "cluster_name": "my_cluster", + "start": "2026-04-16T10:00:00Z", + "end": "2026-04-16T11:00:00Z", + }, + }, + }, + mergeLimit(newWindowedQueryParams( + `max_over_time(kube_cronjob_info{kube_cluster_name="my_cluster"}[3600s]) > 0`, + time.Date(2026, time.April, 16, 11, 0, 0, 0, time.UTC), + ), 10), + ), ) }) }) diff --git a/internal/infra/mcp/tools/tool_k8s_list_nodes.go b/internal/infra/mcp/tools/tool_k8s_list_nodes.go index 1aa9c81..f44e2f3 100644 --- a/internal/infra/mcp/tools/tool_k8s_list_nodes.go +++ b/internal/infra/mcp/tools/tool_k8s_list_nodes.go @@ -9,28 +9,32 @@ import ( "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" + "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" ) type K8sListNodes struct { SysdigClient sysdig.ExtendedClientWithResponsesInterface + clock clock.Clock } -func NewK8sListNodes(sysdigClient sysdig.ExtendedClientWithResponsesInterface) *K8sListNodes { +func NewK8sListNodes(sysdigClient sysdig.ExtendedClientWithResponsesInterface, clk clock.Clock) *K8sListNodes { return &K8sListNodes{ SysdigClient: sysdigClient, + clock: clk, } } func (t *K8sListNodes) RegisterInServer(s *server.MCPServer) { tool := mcp.NewTool("k8s_list_nodes", - mcp.WithDescription("Lists the information from all nodes, all nodes from a cluster or a specific node with some name."), + mcp.WithDescription("Lists the information from all nodes, all nodes from a cluster or a specific node with some name. 
Optionally pass start/end (RFC3339) to list nodes that existed at any point in the window."), mcp.WithString("cluster_name", mcp.Description("The name of the cluster to filter by.")), mcp.WithString("node_name", mcp.Description("The name of the node to filter by.")), mcp.WithNumber("limit", mcp.Description("Maximum number of nodes to return."), mcp.DefaultNumber(10), ), + WithTimeWindowParams(), mcp.WithOutputSchema[map[string]any](), mcp.WithReadOnlyHintAnnotation(true), mcp.WithDestructiveHintAnnotation(false), @@ -44,13 +48,21 @@ func (t *K8sListNodes) handle(ctx context.Context, request mcp.CallToolRequest) nodeName := mcp.ParseString(request, "node_name", "") limit := mcp.ParseInt(request, "limit", 10) - query := buildKubeNodeInfoQuery(clusterName, nodeName) + tw, err := ParseTimeWindow(request, t.clock) + if err != nil { + return mcp.NewToolResultErrorFromErr("invalid time window", err), nil + } + + query := buildKubeNodeInfoQuery(clusterName, nodeName, tw) limitQuery := sysdig.LimitQuery(limit) params := &sysdig.GetQueryV1Params{ Query: query, Limit: &limitQuery, } + if err := tw.ApplyToParams(params); err != nil { + return mcp.NewToolResultErrorFromErr("failed to build eval time", err), nil + } httpResp, err := t.SysdigClient.GetQueryV1(ctx, params) if err != nil { @@ -70,7 +82,7 @@ func (t *K8sListNodes) handle(ctx context.Context, request mcp.CallToolRequest) return mcp.NewToolResultJSON(queryResponse) } -func buildKubeNodeInfoQuery(clusterName, nodeName string) string { +func buildKubeNodeInfoQuery(clusterName, nodeName string, tw TimeWindow) string { filters := []string{} if clusterName != "" { filters = append(filters, fmt.Sprintf("cluster=\"%s\"", clusterName)) @@ -79,9 +91,12 @@ func buildKubeNodeInfoQuery(clusterName, nodeName string) string { filters = append(filters, fmt.Sprintf("kube_node_name=\"%s\"", nodeName)) } - if len(filters) == 0 { - return "kube_node_info" + metric := "kube_node_info" + if len(filters) > 0 { + metric = 
fmt.Sprintf("kube_node_info{%s}", strings.Join(filters, ",")) } - - return fmt.Sprintf("kube_node_info{%s}", strings.Join(filters, ",")) + if !tw.IsZero() { + return fmt.Sprintf("max_over_time(%s%s) > 0", metric, tw.RangeSelector()) + } + return metric } diff --git a/internal/infra/mcp/tools/tool_k8s_list_nodes_test.go b/internal/infra/mcp/tools/tool_k8s_list_nodes_test.go index 9c9d4f2..a10f7a1 100644 --- a/internal/infra/mcp/tools/tool_k8s_list_nodes_test.go +++ b/internal/infra/mcp/tools/tool_k8s_list_nodes_test.go @@ -5,21 +5,25 @@ import ( "context" "io" "net/http" + "time" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "go.uber.org/mock/gomock" + + mocks_clock "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock/mocks" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/mcp/tools" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig/mocks" - "go.uber.org/mock/gomock" ) var _ = Describe("KubernetesListNodes Tool", func() { var ( tool *tools.K8sListNodes mockSysdig *mocks.MockExtendedClientWithResponsesInterface + mockClock *mocks_clock.MockClock mcpServer *server.MCPServer ctrl *gomock.Controller ) @@ -27,7 +31,9 @@ var _ = Describe("KubernetesListNodes Tool", func() { BeforeEach(func() { ctrl = gomock.NewController(GinkgoT()) mockSysdig = mocks.NewMockExtendedClientWithResponsesInterface(ctrl) - tool = tools.NewK8sListNodes(mockSysdig) + mockClock = mocks_clock.NewMockClock(ctrl) + mockClock.EXPECT().Now().AnyTimes().Return(time.Date(2026, time.April, 16, 12, 0, 0, 0, time.UTC)) + tool = tools.NewK8sListNodes(mockSysdig, mockClock) mcpServer = server.NewMCPServer("test", "test") tool.RegisterInServer(mcpServer) }) @@ -129,6 +135,23 @@ var _ = Describe("KubernetesListNodes Tool", func() { Limit: new(sysdig.LimitQuery(20)), }, ), + Entry("windowed, cluster filter", + "k8s_list_nodes", + 
mcp.CallToolRequest{ + Params: mcp.CallToolParams{ + Name: "k8s_list_nodes", + Arguments: map[string]any{ + "cluster_name": "my_cluster", + "start": "2026-04-16T10:00:00Z", + "end": "2026-04-16T11:00:00Z", + }, + }, + }, + mergeLimit(newWindowedQueryParams( + `max_over_time(kube_node_info{cluster="my_cluster"}[3600s]) > 0`, + time.Date(2026, time.April, 16, 11, 0, 0, 0, time.UTC), + ), 10), + ), ) }) }) diff --git a/internal/infra/mcp/tools/tool_k8s_list_pod_containers.go b/internal/infra/mcp/tools/tool_k8s_list_pod_containers.go index 5a42a65..572af57 100644 --- a/internal/infra/mcp/tools/tool_k8s_list_pod_containers.go +++ b/internal/infra/mcp/tools/tool_k8s_list_pod_containers.go @@ -9,22 +9,25 @@ import ( "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" + "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" ) type K8sListPodContainers struct { SysdigClient sysdig.ExtendedClientWithResponsesInterface + clock clock.Clock } -func NewK8sListPodContainers(sysdigClient sysdig.ExtendedClientWithResponsesInterface) *K8sListPodContainers { +func NewK8sListPodContainers(sysdigClient sysdig.ExtendedClientWithResponsesInterface, clk clock.Clock) *K8sListPodContainers { return &K8sListPodContainers{ SysdigClient: sysdigClient, + clock: clk, } } func (t *K8sListPodContainers) RegisterInServer(s *server.MCPServer) { tool := mcp.NewTool("k8s_list_pod_containers", - mcp.WithDescription("Retrieves information from a particular pod and container."), + mcp.WithDescription("Retrieves information from a particular pod and container. 
Optionally pass start/end (RFC3339) to list pod containers that existed at any point in the window."), mcp.WithString("cluster_name", mcp.Description("The name of the cluster to filter by.")), mcp.WithString("namespace_name", mcp.Description("The name of the namespace to filter by.")), mcp.WithString("workload_type", mcp.Description("The type of the workload to filter by.")), @@ -37,6 +40,7 @@ func (t *K8sListPodContainers) RegisterInServer(s *server.MCPServer) { mcp.Description("Maximum number of pod containers to return."), mcp.DefaultNumber(10), ), + WithTimeWindowParams(), mcp.WithOutputSchema[map[string]any](), mcp.WithReadOnlyHintAnnotation(true), mcp.WithDestructiveHintAnnotation(false), @@ -56,13 +60,21 @@ func (t *K8sListPodContainers) handle(ctx context.Context, request mcp.CallToolR nodeName := mcp.ParseString(request, "node_name", "") limit := mcp.ParseInt(request, "limit", 10) - query := buildKubePodContainerInfoQuery(clusterName, namespaceName, workloadType, workloadName, podName, containerName, imagePullstring, nodeName) + tw, err := ParseTimeWindow(request, t.clock) + if err != nil { + return mcp.NewToolResultErrorFromErr("invalid time window", err), nil + } + + query := buildKubePodContainerInfoQuery(clusterName, namespaceName, workloadType, workloadName, podName, containerName, imagePullstring, nodeName, tw) limitQuery := sysdig.LimitQuery(limit) params := &sysdig.GetQueryV1Params{ Query: query, Limit: &limitQuery, } + if err := tw.ApplyToParams(params); err != nil { + return mcp.NewToolResultErrorFromErr("failed to build eval time", err), nil + } httpResp, err := t.SysdigClient.GetQueryV1(ctx, params) if err != nil { @@ -82,7 +94,7 @@ func (t *K8sListPodContainers) handle(ctx context.Context, request mcp.CallToolR return mcp.NewToolResultJSON(queryResponse) } -func buildKubePodContainerInfoQuery(clusterName, namespaceName, workloadType, workloadName, podName, containerName, imagePullstring, nodeName string) string { +func 
buildKubePodContainerInfoQuery(clusterName, namespaceName, workloadType, workloadName, podName, containerName, imagePullstring, nodeName string, tw TimeWindow) string { filters := []string{} if clusterName != "" { filters = append(filters, fmt.Sprintf("kube_cluster_name=\"%s\"", clusterName)) @@ -109,9 +121,12 @@ func buildKubePodContainerInfoQuery(clusterName, namespaceName, workloadType, wo filters = append(filters, fmt.Sprintf("kube_node_name=\"%s\"", nodeName)) } - if len(filters) == 0 { - return "kube_pod_container_info" + metric := "kube_pod_container_info" + if len(filters) > 0 { + metric = fmt.Sprintf("kube_pod_container_info{%s}", strings.Join(filters, ",")) } - - return fmt.Sprintf("kube_pod_container_info{%s}", strings.Join(filters, ",")) + if !tw.IsZero() { + return fmt.Sprintf("max_over_time(%s%s) > 0", metric, tw.RangeSelector()) + } + return metric } diff --git a/internal/infra/mcp/tools/tool_k8s_list_pod_containers_test.go b/internal/infra/mcp/tools/tool_k8s_list_pod_containers_test.go index 7dafb51..6a4b946 100644 --- a/internal/infra/mcp/tools/tool_k8s_list_pod_containers_test.go +++ b/internal/infra/mcp/tools/tool_k8s_list_pod_containers_test.go @@ -5,21 +5,25 @@ import ( "context" "io" "net/http" + "time" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + "go.uber.org/mock/gomock" + + mocks_clock "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock/mocks" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/mcp/tools" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig/mocks" - "go.uber.org/mock/gomock" ) var _ = Describe("KubernetesListPodContainers Tool", func() { var ( tool *tools.K8sListPodContainers mockSysdig *mocks.MockExtendedClientWithResponsesInterface + mockClock *mocks_clock.MockClock mcpServer *server.MCPServer ctrl *gomock.Controller ) @@ -27,7 +31,9 @@ var _ = Describe("KubernetesListPodContainers Tool", func() { BeforeEach(func() { ctrl = gomock.NewController(GinkgoT()) mockSysdig = mocks.NewMockExtendedClientWithResponsesInterface(ctrl) - tool = tools.NewK8sListPodContainers(mockSysdig) + mockClock = mocks_clock.NewMockClock(ctrl) + mockClock.EXPECT().Now().AnyTimes().Return(time.Date(2026, time.April, 16, 12, 0, 0, 0, time.UTC)) + tool = tools.NewK8sListPodContainers(mockSysdig, mockClock) mcpServer = server.NewMCPServer("test", "test") tool.RegisterInServer(mcpServer) }) @@ -203,6 +209,23 @@ var _ = Describe("KubernetesListPodContainers Tool", func() { Limit: new(sysdig.LimitQuery(10)), }, ), + Entry("windowed, with cluster filter", + "k8s_list_pod_containers", + mcp.CallToolRequest{ + Params: mcp.CallToolParams{ + Name: "k8s_list_pod_containers", + Arguments: map[string]any{ + "cluster_name": "my_cluster", + "start": "2026-04-16T10:00:00Z", + "end": "2026-04-16T11:00:00Z", + }, + }, + }, + mergeLimit(newWindowedQueryParams( + `max_over_time(kube_pod_container_info{kube_cluster_name="my_cluster"}[3600s]) > 0`, + time.Date(2026, time.April, 16, 11, 0, 0, 0, time.UTC), + ), 10), + ), ) }) }) diff --git a/internal/infra/mcp/tools/tool_k8s_list_top_cpu_consumed_container.go b/internal/infra/mcp/tools/tool_k8s_list_top_cpu_consumed_container.go index da42d23..f315176 100644 --- 
a/internal/infra/mcp/tools/tool_k8s_list_top_cpu_consumed_container.go +++ b/internal/infra/mcp/tools/tool_k8s_list_top_cpu_consumed_container.go @@ -9,22 +9,25 @@ import ( "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" + "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" ) type K8sListTopCPUConsumedContainer struct { SysdigClient sysdig.ExtendedClientWithResponsesInterface + clock clock.Clock } -func NewK8sListTopCPUConsumedContainer(sysdigClient sysdig.ExtendedClientWithResponsesInterface) *K8sListTopCPUConsumedContainer { +func NewK8sListTopCPUConsumedContainer(sysdigClient sysdig.ExtendedClientWithResponsesInterface, clk clock.Clock) *K8sListTopCPUConsumedContainer { return &K8sListTopCPUConsumedContainer{ SysdigClient: sysdigClient, + clock: clk, } } func (t *K8sListTopCPUConsumedContainer) RegisterInServer(s *server.MCPServer) { tool := mcp.NewTool("k8s_list_top_cpu_consumed_container", - mcp.WithDescription("Identifies the Kubernetes containers consuming the most CPU (in cores)."), + mcp.WithDescription("Identifies the Kubernetes containers consuming the most CPU (in cores). 
Optionally pass start/end (RFC3339) to query a historical window (averaged over the window) instead of the current instant snapshot."), mcp.WithString("cluster_name", mcp.Description("The name of the cluster to filter by.")), mcp.WithString("namespace_name", mcp.Description("The name of the namespace to filter by.")), mcp.WithString("workload_type", mcp.Description("The type of the workload to filter by.")), @@ -33,6 +36,7 @@ func (t *K8sListTopCPUConsumedContainer) RegisterInServer(s *server.MCPServer) { mcp.Description("Maximum number of containers to return."), mcp.DefaultNumber(20), ), + WithTimeWindowParams(), mcp.WithOutputSchema[map[string]any](), mcp.WithReadOnlyHintAnnotation(true), mcp.WithDestructiveHintAnnotation(false), @@ -48,11 +52,19 @@ func (t *K8sListTopCPUConsumedContainer) handle(ctx context.Context, request mcp workloadName := mcp.ParseString(request, "workload_name", "") limit := mcp.ParseInt(request, "limit", 20) - query := buildTopCPUConsumedByContainerQuery(clusterName, namespaceName, workloadType, workloadName, limit) + tw, err := ParseTimeWindow(request, t.clock) + if err != nil { + return mcp.NewToolResultErrorFromErr("invalid time window", err), nil + } + + query := buildTopCPUConsumedByContainerQuery(clusterName, namespaceName, workloadType, workloadName, limit, tw) params := &sysdig.GetQueryV1Params{ Query: query, } + if err := tw.ApplyToParams(params); err != nil { + return mcp.NewToolResultErrorFromErr("failed to build eval time", err), nil + } httpResp, err := t.SysdigClient.GetQueryV1(ctx, params) if err != nil { @@ -72,7 +84,7 @@ func (t *K8sListTopCPUConsumedContainer) handle(ctx context.Context, request mcp return mcp.NewToolResultJSON(queryResponse) } -func buildTopCPUConsumedByContainerQuery(clusterName, namespaceName, workloadType, workloadName string, limit int) string { +func buildTopCPUConsumedByContainerQuery(clusterName, namespaceName, workloadType, workloadName string, limit int, tw TimeWindow) string { filters := 
[]string{} if clusterName != "" { filters = append(filters, fmt.Sprintf(`kube_cluster_name="%s"`, clusterName)) @@ -92,5 +104,10 @@ func buildTopCPUConsumedByContainerQuery(clusterName, namespaceName, workloadTyp filterString = fmt.Sprintf("{%s}", strings.Join(filters, ",")) } - return fmt.Sprintf("topk(%d, sum by (kube_cluster_name, kube_namespace_name, kube_workload_type, kube_workload_name, container_label_io_kubernetes_container_name)(sysdig_container_cpu_cores_used%s))", limit, filterString) + metric := fmt.Sprintf("sysdig_container_cpu_cores_used%s", filterString) + if !tw.IsZero() { + metric = fmt.Sprintf("avg_over_time(%s%s)", metric, tw.RangeSelector()) + } + + return fmt.Sprintf("topk(%d, sum by (kube_cluster_name, kube_namespace_name, kube_workload_type, kube_workload_name, container_label_io_kubernetes_container_name)(%s))", limit, metric) } diff --git a/internal/infra/mcp/tools/tool_k8s_list_top_cpu_consumed_container_test.go b/internal/infra/mcp/tools/tool_k8s_list_top_cpu_consumed_container_test.go index 04ada33..f5fcbcd 100644 --- a/internal/infra/mcp/tools/tool_k8s_list_top_cpu_consumed_container_test.go +++ b/internal/infra/mcp/tools/tool_k8s_list_top_cpu_consumed_container_test.go @@ -5,21 +5,25 @@ import ( "context" "io" "net/http" + "time" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + "go.uber.org/mock/gomock" + + mocks_clock "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock/mocks" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/mcp/tools" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig/mocks" - "go.uber.org/mock/gomock" ) var _ = Describe("KubernetesListTopCPUConsumedContainer Tool", func() { var ( tool *tools.K8sListTopCPUConsumedContainer mockSysdig *mocks.MockExtendedClientWithResponsesInterface + mockClock *mocks_clock.MockClock mcpServer *server.MCPServer ctrl *gomock.Controller ) @@ -27,7 +31,9 @@ var _ = Describe("KubernetesListTopCPUConsumedContainer Tool", func() { BeforeEach(func() { ctrl = gomock.NewController(GinkgoT()) mockSysdig = mocks.NewMockExtendedClientWithResponsesInterface(ctrl) - tool = tools.NewK8sListTopCPUConsumedContainer(mockSysdig) + mockClock = mocks_clock.NewMockClock(ctrl) + mockClock.EXPECT().Now().AnyTimes().Return(time.Date(2026, time.April, 16, 12, 0, 0, 0, time.UTC)) + tool = tools.NewK8sListTopCPUConsumedContainer(mockSysdig, mockClock) mcpServer = server.NewMCPServer("test", "test") tool.RegisterInServer(mcpServer) }) @@ -79,6 +85,21 @@ var _ = Describe("KubernetesListTopCPUConsumedContainer Tool", func() { Query: `topk(10, sum by (kube_cluster_name, kube_namespace_name, kube_workload_type, kube_workload_name, container_label_io_kubernetes_container_name)(sysdig_container_cpu_cores_used{kube_cluster_name="test-cluster",kube_namespace_name="test-namespace",kube_workload_type="deployment",kube_workload_name="test-workload"}))`, }, ), + Entry("windowed, both start and end", context.Background(), "k8s_list_top_cpu_consumed_container", + mcp.CallToolRequest{ + Params: mcp.CallToolParams{ + Name: "k8s_list_top_cpu_consumed_container", + Arguments: map[string]any{ + "start": "2026-04-16T10:00:00Z", + "end": "2026-04-16T11:00:00Z", + }, + }, + }, + newWindowedQueryParams( + `topk(20, sum by 
(kube_cluster_name, kube_namespace_name, kube_workload_type, kube_workload_name, container_label_io_kubernetes_container_name)(avg_over_time(sysdig_container_cpu_cores_used[3600s])))`, + time.Date(2026, time.April, 16, 11, 0, 0, 0, time.UTC), + ), + ), ) }) }) diff --git a/internal/infra/mcp/tools/tool_k8s_list_top_cpu_consumed_workload.go b/internal/infra/mcp/tools/tool_k8s_list_top_cpu_consumed_workload.go index 99c03ca..8fd0cba 100644 --- a/internal/infra/mcp/tools/tool_k8s_list_top_cpu_consumed_workload.go +++ b/internal/infra/mcp/tools/tool_k8s_list_top_cpu_consumed_workload.go @@ -9,22 +9,25 @@ import ( "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" + "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" ) type K8sListTopCPUConsumedWorkload struct { SysdigClient sysdig.ExtendedClientWithResponsesInterface + clock clock.Clock } -func NewK8sListTopCPUConsumedWorkload(sysdigClient sysdig.ExtendedClientWithResponsesInterface) *K8sListTopCPUConsumedWorkload { +func NewK8sListTopCPUConsumedWorkload(sysdigClient sysdig.ExtendedClientWithResponsesInterface, clk clock.Clock) *K8sListTopCPUConsumedWorkload { return &K8sListTopCPUConsumedWorkload{ SysdigClient: sysdigClient, + clock: clk, } } func (t *K8sListTopCPUConsumedWorkload) RegisterInServer(s *server.MCPServer) { tool := mcp.NewTool("k8s_list_top_cpu_consumed_workload", - mcp.WithDescription("Identifies the Kubernetes workloads (all containers) consuming the most CPU (in cores)."), + mcp.WithDescription("Identifies the Kubernetes workloads (all containers) consuming the most CPU (in cores). 
Optionally pass start/end (RFC3339) to query a historical window (averaged over the window) instead of the current instant snapshot."), mcp.WithString("cluster_name", mcp.Description("The name of the cluster to filter by.")), mcp.WithString("namespace_name", mcp.Description("The name of the namespace to filter by.")), mcp.WithString("workload_type", mcp.Description("The type of the workload to filter by.")), @@ -33,6 +36,7 @@ func (t *K8sListTopCPUConsumedWorkload) RegisterInServer(s *server.MCPServer) { mcp.Description("Maximum number of workloads to return."), mcp.DefaultNumber(20), ), + WithTimeWindowParams(), mcp.WithOutputSchema[map[string]any](), mcp.WithReadOnlyHintAnnotation(true), mcp.WithDestructiveHintAnnotation(false), @@ -48,11 +52,19 @@ func (t *K8sListTopCPUConsumedWorkload) handle(ctx context.Context, request mcp. workloadName := mcp.ParseString(request, "workload_name", "") limit := mcp.ParseInt(request, "limit", 20) - query := buildTopCPUConsumedByWorkloadQuery(clusterName, namespaceName, workloadType, workloadName, limit) + tw, err := ParseTimeWindow(request, t.clock) + if err != nil { + return mcp.NewToolResultErrorFromErr("invalid time window", err), nil + } + + query := buildTopCPUConsumedByWorkloadQuery(clusterName, namespaceName, workloadType, workloadName, limit, tw) params := &sysdig.GetQueryV1Params{ Query: query, } + if err := tw.ApplyToParams(params); err != nil { + return mcp.NewToolResultErrorFromErr("failed to build eval time", err), nil + } httpResp, err := t.SysdigClient.GetQueryV1(ctx, params) if err != nil { @@ -72,7 +84,7 @@ func (t *K8sListTopCPUConsumedWorkload) handle(ctx context.Context, request mcp. 
return mcp.NewToolResultJSON(queryResponse) } -func buildTopCPUConsumedByWorkloadQuery(clusterName, namespaceName, workloadType, workloadName string, limit int) string { +func buildTopCPUConsumedByWorkloadQuery(clusterName, namespaceName, workloadType, workloadName string, limit int, tw TimeWindow) string { filters := []string{} if clusterName != "" { filters = append(filters, fmt.Sprintf(`kube_cluster_name="%s"`, clusterName)) @@ -92,5 +104,10 @@ func buildTopCPUConsumedByWorkloadQuery(clusterName, namespaceName, workloadType filterString = fmt.Sprintf("{%s}", strings.Join(filters, ",")) } - return fmt.Sprintf("topk(%d, sum by (kube_cluster_name, kube_namespace_name, kube_workload_type, kube_workload_name)(sysdig_container_cpu_cores_used%s))", limit, filterString) + metric := fmt.Sprintf("sysdig_container_cpu_cores_used%s", filterString) + if !tw.IsZero() { + metric = fmt.Sprintf("avg_over_time(%s%s)", metric, tw.RangeSelector()) + } + + return fmt.Sprintf("topk(%d, sum by (kube_cluster_name, kube_namespace_name, kube_workload_type, kube_workload_name)(%s))", limit, metric) } diff --git a/internal/infra/mcp/tools/tool_k8s_list_top_cpu_consumed_workload_test.go b/internal/infra/mcp/tools/tool_k8s_list_top_cpu_consumed_workload_test.go index a1e4082..f2f8f2d 100644 --- a/internal/infra/mcp/tools/tool_k8s_list_top_cpu_consumed_workload_test.go +++ b/internal/infra/mcp/tools/tool_k8s_list_top_cpu_consumed_workload_test.go @@ -5,21 +5,25 @@ import ( "context" "io" "net/http" + "time" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + "go.uber.org/mock/gomock" + + mocks_clock "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock/mocks" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/mcp/tools" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig/mocks" - "go.uber.org/mock/gomock" ) var _ = Describe("KubernetesListTopCPUConsumedWorkload Tool", func() { var ( tool *tools.K8sListTopCPUConsumedWorkload mockSysdig *mocks.MockExtendedClientWithResponsesInterface + mockClock *mocks_clock.MockClock mcpServer *server.MCPServer ctrl *gomock.Controller ) @@ -27,7 +31,9 @@ var _ = Describe("KubernetesListTopCPUConsumedWorkload Tool", func() { BeforeEach(func() { ctrl = gomock.NewController(GinkgoT()) mockSysdig = mocks.NewMockExtendedClientWithResponsesInterface(ctrl) - tool = tools.NewK8sListTopCPUConsumedWorkload(mockSysdig) + mockClock = mocks_clock.NewMockClock(ctrl) + mockClock.EXPECT().Now().AnyTimes().Return(time.Date(2026, time.April, 16, 12, 0, 0, 0, time.UTC)) + tool = tools.NewK8sListTopCPUConsumedWorkload(mockSysdig, mockClock) mcpServer = server.NewMCPServer("test", "test") tool.RegisterInServer(mcpServer) }) @@ -51,7 +57,7 @@ var _ = Describe("KubernetesListTopCPUConsumedWorkload Tool", func() { Expect(ok).To(BeTrue()) Expect(resultData.Text).To(MatchJSON(`{"status":"success"}`)) }, - Entry(nil, + Entry("instant snapshot, no params", "k8s_list_top_cpu_consumed_workload", mcp.CallToolRequest{ Params: mcp.CallToolParams{ @@ -63,7 +69,7 @@ var _ = Describe("KubernetesListTopCPUConsumedWorkload Tool", func() { Query: `topk(20, sum by (kube_cluster_name, kube_namespace_name, kube_workload_type, kube_workload_name)(sysdig_container_cpu_cores_used))`, }, ), - Entry(nil, + Entry("instant snapshot, cluster+namespace filter", "k8s_list_top_cpu_consumed_workload", mcp.CallToolRequest{ Params: mcp.CallToolParams{ @@ -79,7 +85,7 @@ var _ = Describe("KubernetesListTopCPUConsumedWorkload 
Tool", func() { Query: `topk(10, sum by (kube_cluster_name, kube_namespace_name, kube_workload_type, kube_workload_name)(sysdig_container_cpu_cores_used{kube_cluster_name="prod",kube_namespace_name="default"}))`, }, ), - Entry(nil, + Entry("instant snapshot, all filters", "k8s_list_top_cpu_consumed_workload", mcp.CallToolRequest{ Params: mcp.CallToolParams{ @@ -97,6 +103,83 @@ var _ = Describe("KubernetesListTopCPUConsumedWorkload Tool", func() { Query: `topk(5, sum by (kube_cluster_name, kube_namespace_name, kube_workload_type, kube_workload_name)(sysdig_container_cpu_cores_used{kube_cluster_name="prod",kube_namespace_name="default",kube_workload_type="deployment",kube_workload_name="api"}))`, }, ), + Entry("windowed, both start and end", + "k8s_list_top_cpu_consumed_workload", + mcp.CallToolRequest{ + Params: mcp.CallToolParams{ + Name: "k8s_list_top_cpu_consumed_workload", + Arguments: map[string]any{ + "start": "2026-04-16T10:00:00Z", + "end": "2026-04-16T11:00:00Z", + }, + }, + }, + newWindowedQueryParams( + `topk(20, sum by (kube_cluster_name, kube_namespace_name, kube_workload_type, kube_workload_name)(avg_over_time(sysdig_container_cpu_cores_used[3600s])))`, + time.Date(2026, time.April, 16, 11, 0, 0, 0, time.UTC), + ), + ), + Entry("windowed, start only defaults end to now", + "k8s_list_top_cpu_consumed_workload", + mcp.CallToolRequest{ + Params: mcp.CallToolParams{ + Name: "k8s_list_top_cpu_consumed_workload", + Arguments: map[string]any{ + "start": "2026-04-16T11:00:00Z", + }, + }, + }, + newWindowedQueryParams( + `topk(20, sum by (kube_cluster_name, kube_namespace_name, kube_workload_type, kube_workload_name)(avg_over_time(sysdig_container_cpu_cores_used[3600s])))`, + time.Date(2026, time.April, 16, 12, 0, 0, 0, time.UTC), + ), + ), + Entry("windowed, with filters", + "k8s_list_top_cpu_consumed_workload", + mcp.CallToolRequest{ + Params: mcp.CallToolParams{ + Name: "k8s_list_top_cpu_consumed_workload", + Arguments: map[string]any{ + "cluster_name": 
"prod", + "start": "2026-04-16T10:00:00Z", + "end": "2026-04-16T11:00:00Z", + }, + }, + }, + newWindowedQueryParams( + `topk(20, sum by (kube_cluster_name, kube_namespace_name, kube_workload_type, kube_workload_name)(avg_over_time(sysdig_container_cpu_cores_used{kube_cluster_name="prod"}[3600s])))`, + time.Date(2026, time.April, 16, 11, 0, 0, 0, time.UTC), + ), + ), ) }) + + When("the time window is invalid", func() { + It("rejects an unparseable start", func() { + serverTool := mcpServer.GetTool("k8s_list_top_cpu_consumed_workload") + result, err := serverTool.Handler(context.Background(), mcp.CallToolRequest{ + Params: mcp.CallToolParams{ + Name: "k8s_list_top_cpu_consumed_workload", + Arguments: map[string]any{"start": "not-a-timestamp"}, + }, + }) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsError).To(BeTrue()) + }) + + It("rejects an end before start", func() { + serverTool := mcpServer.GetTool("k8s_list_top_cpu_consumed_workload") + result, err := serverTool.Handler(context.Background(), mcp.CallToolRequest{ + Params: mcp.CallToolParams{ + Name: "k8s_list_top_cpu_consumed_workload", + Arguments: map[string]any{ + "start": "2026-04-16T11:00:00Z", + "end": "2026-04-16T10:00:00Z", + }, + }, + }) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsError).To(BeTrue()) + }) + }) }) diff --git a/internal/infra/mcp/tools/tool_k8s_list_top_http_errors_in_pods.go b/internal/infra/mcp/tools/tool_k8s_list_top_http_errors_in_pods.go index 90d9dd9..b6c2778 100644 --- a/internal/infra/mcp/tools/tool_k8s_list_top_http_errors_in_pods.go +++ b/internal/infra/mcp/tools/tool_k8s_list_top_http_errors_in_pods.go @@ -10,23 +10,26 @@ import ( "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" + "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" ) type K8sListTopHttpErrorsInPods struct { SysdigClient sysdig.ExtendedClientWithResponsesInterface + clock clock.Clock } -func 
NewK8sListTopHttpErrorsInPods(sysdigClient sysdig.ExtendedClientWithResponsesInterface) *K8sListTopHttpErrorsInPods { +func NewK8sListTopHttpErrorsInPods(sysdigClient sysdig.ExtendedClientWithResponsesInterface, clk clock.Clock) *K8sListTopHttpErrorsInPods { return &K8sListTopHttpErrorsInPods{ SysdigClient: sysdigClient, + clock: clk, } } func (t *K8sListTopHttpErrorsInPods) RegisterInServer(s *server.MCPServer) { tool := mcp.NewTool("k8s_list_top_http_errors_in_pods", - mcp.WithDescription("Lists the pods with the highest rate of HTTP 4xx and 5xx errors over a specified time interval, allowing filtering by cluster, namespace, workload type, and workload name."), - mcp.WithString("interval", mcp.Description("Time interval for the query (e.g. '1h', '30m'). Default is '1h'.")), + mcp.WithDescription("Lists the pods with the highest rate of HTTP 4xx and 5xx errors over a time window, allowing filtering by cluster, namespace, workload type, and workload name. Pass start/end (RFC3339) to specify the window. The legacy 'interval' param is retained for backward compatibility; start/end take precedence whenever provided (end defaults to now if only start is given)."), + mcp.WithString("interval", mcp.Description("Time interval for the query (e.g. '1h', '30m'). Default is '1h'. 
Ignored when start/end are provided.")), mcp.WithString("cluster_name", mcp.Description("The name of the cluster to filter by.")), mcp.WithString("namespace_name", mcp.Description("The name of the namespace to filter by.")), mcp.WithString("workload_type", mcp.Description("The type of the workload to filter by.")), @@ -35,6 +38,7 @@ func (t *K8sListTopHttpErrorsInPods) RegisterInServer(s *server.MCPServer) { mcp.Description("Maximum number of pods to return."), mcp.DefaultNumber(20), ), + WithTimeWindowParams(), mcp.WithOutputSchema[map[string]any](), mcp.WithReadOnlyHintAnnotation(true), mcp.WithDestructiveHintAnnotation(false), @@ -51,9 +55,19 @@ func (t *K8sListTopHttpErrorsInPods) handle(ctx context.Context, request mcp.Cal workloadName := mcp.ParseString(request, "workload_name", "") limit := mcp.ParseInt(request, "limit", 20) - query, err := buildTopHttpErrorsQuery(interval, limit, clusterName, namespaceName, workloadType, workloadName) + tw, err := ParseTimeWindow(request, t.clock) if err != nil { - return mcp.NewToolResultErrorFromErr("failed to build query", err), nil + return mcp.NewToolResultErrorFromErr("invalid time window", err), nil + } + + var query string + if !tw.IsZero() { + query = buildTopHttpErrorsWindowedQuery(tw.RangeSelector(), tw.WindowSeconds(), limit, clusterName, namespaceName, workloadType, workloadName) + } else { + query, err = buildTopHttpErrorsLegacyQuery(interval, limit, clusterName, namespaceName, workloadType, workloadName) + if err != nil { + return mcp.NewToolResultErrorFromErr("failed to build query", err), nil + } } limitQuery := sysdig.LimitQuery(limit) @@ -61,6 +75,9 @@ func (t *K8sListTopHttpErrorsInPods) handle(ctx context.Context, request mcp.Cal Query: query, Limit: &limitQuery, } + if err := tw.ApplyToParams(params); err != nil { + return mcp.NewToolResultErrorFromErr("failed to build eval time", err), nil + } httpResp, err := t.SysdigClient.GetQueryV1(ctx, params) if err != nil { @@ -80,13 +97,25 @@ func (t 
*K8sListTopHttpErrorsInPods) handle(ctx context.Context, request mcp.Cal return mcp.NewToolResultJSON(queryResponse) } -func buildTopHttpErrorsQuery(interval string, limit int, clusterName, namespaceName, workloadType, workloadName string) (string, error) { +func buildTopHttpErrorsLegacyQuery(interval string, limit int, clusterName, namespaceName, workloadType, workloadName string) (string, error) { duration, err := time.ParseDuration(interval) if err != nil { return "", fmt.Errorf("invalid interval format: %w", err) } seconds := duration.Seconds() + filterStr := httpErrorsFilterString(clusterName, namespaceName, workloadType, workloadName) + return fmt.Sprintf("topk(%d,sum(sum_over_time(sysdig_container_net_http_error_count{%s}[%s])) by (kube_cluster_name, kube_namespace_name, kube_workload_type, kube_workload_name, kube_pod_name)) / %f", + limit, filterStr, interval, seconds), nil +} + +func buildTopHttpErrorsWindowedQuery(rangeSelector string, windowSeconds int64, limit int, clusterName, namespaceName, workloadType, workloadName string) string { + filterStr := httpErrorsFilterString(clusterName, namespaceName, workloadType, workloadName) + return fmt.Sprintf("topk(%d,sum(sum_over_time(sysdig_container_net_http_error_count{%s}%s)) by (kube_cluster_name, kube_namespace_name, kube_workload_type, kube_workload_name, kube_pod_name)) / %d", + limit, filterStr, rangeSelector, windowSeconds) +} + +func httpErrorsFilterString(clusterName, namespaceName, workloadType, workloadName string) string { filters := []string{} if clusterName != "" { filters = append(filters, fmt.Sprintf("kube_cluster_name=~\"%s\"", clusterName)) @@ -100,13 +129,5 @@ func buildTopHttpErrorsQuery(interval string, limit int, clusterName, namespaceN if workloadName != "" { filters = append(filters, fmt.Sprintf("kube_workload_name=\"%s\"", workloadName)) } - - filterStr := "" - if len(filters) > 0 { - filterStr = strings.Join(filters, ",") - } - - // 
topk(20,sum(sum_over_time(sysdig_container_net_http_error_count{kube_cluster_name=~"demo-kube-gke"}[1h])) by (kube_cluster_name, kube_namespace_name, kube_workload_type, kube_workload_name, kube_pod_name)) / 3600 - return fmt.Sprintf("topk(%d,sum(sum_over_time(sysdig_container_net_http_error_count{%s}[%s])) by (kube_cluster_name, kube_namespace_name, kube_workload_type, kube_workload_name, kube_pod_name)) / %f", - limit, filterStr, interval, seconds), nil + return strings.Join(filters, ",") } diff --git a/internal/infra/mcp/tools/tool_k8s_list_top_http_errors_in_pods_test.go b/internal/infra/mcp/tools/tool_k8s_list_top_http_errors_in_pods_test.go index 3f7ee3e..e3c011d 100644 --- a/internal/infra/mcp/tools/tool_k8s_list_top_http_errors_in_pods_test.go +++ b/internal/infra/mcp/tools/tool_k8s_list_top_http_errors_in_pods_test.go @@ -5,21 +5,25 @@ import ( "context" "io" "net/http" + "time" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + "go.uber.org/mock/gomock" + + mocks_clock "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock/mocks" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/mcp/tools" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig/mocks" - "go.uber.org/mock/gomock" ) var _ = Describe("KubernetesListTopHttpErrorsInPods Tool", func() { var ( tool *tools.K8sListTopHttpErrorsInPods mockSysdig *mocks.MockExtendedClientWithResponsesInterface + mockClock *mocks_clock.MockClock mcpServer *server.MCPServer ctrl *gomock.Controller ctx context.Context @@ -28,7 +32,9 @@ var _ = Describe("KubernetesListTopHttpErrorsInPods Tool", func() { BeforeEach(func() { ctrl = gomock.NewController(GinkgoT()) mockSysdig = mocks.NewMockExtendedClientWithResponsesInterface(ctrl) - tool = tools.NewK8sListTopHttpErrorsInPods(mockSysdig) + mockClock = mocks_clock.NewMockClock(ctrl) + mockClock.EXPECT().Now().AnyTimes().Return(time.Date(2026, time.April, 16, 12, 0, 0, 0, time.UTC)) + tool = tools.NewK8sListTopHttpErrorsInPods(mockSysdig, mockClock) mcpServer = server.NewMCPServer("test", "test") tool.RegisterInServer(mcpServer) ctx = context.Background() @@ -53,7 +59,7 @@ var _ = Describe("KubernetesListTopHttpErrorsInPods Tool", func() { Expect(ok).To(BeTrue()) Expect(resultData.Text).To(MatchJSON(`{"status":"success"}`)) }, - Entry("default params", + Entry("default params (legacy path, no interval explicitly set)", "k8s_list_top_http_errors_in_pods", mcp.CallToolRequest{ Params: mcp.CallToolParams{ @@ -66,7 +72,7 @@ var _ = Describe("KubernetesListTopHttpErrorsInPods Tool", func() { Limit: new(sysdig.LimitQuery(20)), }, ), - Entry("with custom params", + Entry("legacy path, explicit interval", "k8s_list_top_http_errors_in_pods", mcp.CallToolRequest{ Params: mcp.CallToolParams{ @@ -84,7 +90,7 @@ var _ = Describe("KubernetesListTopHttpErrorsInPods Tool", func() { Limit: new(sysdig.LimitQuery(5)), 
}, ), - Entry("with all params", + Entry("legacy path, all filters", "k8s_list_top_http_errors_in_pods", mcp.CallToolRequest{ Params: mcp.CallToolParams{ @@ -104,6 +110,40 @@ var _ = Describe("KubernetesListTopHttpErrorsInPods Tool", func() { Limit: new(sysdig.LimitQuery(10)), }, ), + Entry("windowed path via start/end", + "k8s_list_top_http_errors_in_pods", + mcp.CallToolRequest{ + Params: mcp.CallToolParams{ + Name: "k8s_list_top_http_errors_in_pods", + Arguments: map[string]any{ + "start": "2026-04-16T10:00:00Z", + "end": "2026-04-16T11:00:00Z", + }, + }, + }, + mergeLimit(newWindowedQueryParams( + `topk(20,sum(sum_over_time(sysdig_container_net_http_error_count{}[3600s])) by (kube_cluster_name, kube_namespace_name, kube_workload_type, kube_workload_name, kube_pod_name)) / 3600`, + time.Date(2026, time.April, 16, 11, 0, 0, 0, time.UTC), + ), 20), + ), + Entry("windowed path takes precedence over interval", + "k8s_list_top_http_errors_in_pods", + mcp.CallToolRequest{ + Params: mcp.CallToolParams{ + Name: "k8s_list_top_http_errors_in_pods", + Arguments: map[string]any{ + "interval": "5m", // should be ignored + "cluster_name": "prod", + "start": "2026-04-16T10:00:00Z", + "end": "2026-04-16T11:00:00Z", + }, + }, + }, + mergeLimit(newWindowedQueryParams( + `topk(20,sum(sum_over_time(sysdig_container_net_http_error_count{kube_cluster_name=~"prod"}[3600s])) by (kube_cluster_name, kube_namespace_name, kube_workload_type, kube_workload_name, kube_pod_name)) / 3600`, + time.Date(2026, time.April, 16, 11, 0, 0, 0, time.UTC), + ), 20), + ), ) It("returns error for invalid interval", func() { diff --git a/internal/infra/mcp/tools/tool_k8s_list_top_memory_consumed_container.go b/internal/infra/mcp/tools/tool_k8s_list_top_memory_consumed_container.go index 72ece7b..942c89e 100644 --- a/internal/infra/mcp/tools/tool_k8s_list_top_memory_consumed_container.go +++ b/internal/infra/mcp/tools/tool_k8s_list_top_memory_consumed_container.go @@ -9,22 +9,25 @@ import ( 
"github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" + "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" ) type K8sListTopMemoryConsumedContainer struct { SysdigClient sysdig.ExtendedClientWithResponsesInterface + clock clock.Clock } -func NewK8sListTopMemoryConsumedContainer(sysdigClient sysdig.ExtendedClientWithResponsesInterface) *K8sListTopMemoryConsumedContainer { +func NewK8sListTopMemoryConsumedContainer(sysdigClient sysdig.ExtendedClientWithResponsesInterface, clk clock.Clock) *K8sListTopMemoryConsumedContainer { return &K8sListTopMemoryConsumedContainer{ SysdigClient: sysdigClient, + clock: clk, } } func (t *K8sListTopMemoryConsumedContainer) RegisterInServer(s *server.MCPServer) { tool := mcp.NewTool("k8s_list_top_memory_consumed_container", - mcp.WithDescription("Lists memory-intensive containers."), + mcp.WithDescription("Lists memory-intensive containers. Optionally pass start/end (RFC3339) to query a historical window (averaged over the window) instead of the current instant snapshot."), mcp.WithString("cluster_name", mcp.Description("The name of the cluster to filter by.")), mcp.WithString("namespace_name", mcp.Description("The name of the namespace to filter by.")), mcp.WithString("workload_type", mcp.Description("The type of the workload to filter by.")), @@ -33,6 +36,7 @@ func (t *K8sListTopMemoryConsumedContainer) RegisterInServer(s *server.MCPServer mcp.Description("Maximum number of containers to return."), mcp.DefaultNumber(20), ), + WithTimeWindowParams(), mcp.WithOutputSchema[map[string]any](), mcp.WithReadOnlyHintAnnotation(true), mcp.WithDestructiveHintAnnotation(false), @@ -48,13 +52,21 @@ func (t *K8sListTopMemoryConsumedContainer) handle(ctx context.Context, request workloadName := mcp.ParseString(request, "workload_name", "") limit := mcp.ParseInt(request, "limit", 20) - query := buildTopMemoryConsumedByContainerQuery(clusterName, 
namespaceName, workloadType, workloadName, limit) + tw, err := ParseTimeWindow(request, t.clock) + if err != nil { + return mcp.NewToolResultErrorFromErr("invalid time window", err), nil + } + + query := buildTopMemoryConsumedByContainerQuery(clusterName, namespaceName, workloadType, workloadName, limit, tw) limitQuery := sysdig.LimitQuery(limit) params := &sysdig.GetQueryV1Params{ Query: query, Limit: &limitQuery, } + if err := tw.ApplyToParams(params); err != nil { + return mcp.NewToolResultErrorFromErr("failed to build eval time", err), nil + } httpResp, err := t.SysdigClient.GetQueryV1(ctx, params) if err != nil { @@ -74,7 +86,7 @@ func (t *K8sListTopMemoryConsumedContainer) handle(ctx context.Context, request return mcp.NewToolResultJSON(queryResponse) } -func buildTopMemoryConsumedByContainerQuery(clusterName, namespaceName, workloadType, workloadName string, limit int) string { +func buildTopMemoryConsumedByContainerQuery(clusterName, namespaceName, workloadType, workloadName string, limit int, tw TimeWindow) string { filters := []string{} if clusterName != "" { filters = append(filters, fmt.Sprintf(`kube_cluster_name="%s"`, clusterName)) @@ -94,5 +106,10 @@ func buildTopMemoryConsumedByContainerQuery(clusterName, namespaceName, workload filterString = "{" + strings.Join(filters, ", ") + "}" } - return fmt.Sprintf(`topk(%d, sum by (kube_cluster_name, kube_namespace_name, kube_workload_type, kube_workload_name, container_label_io_kubernetes_container_name) (sysdig_container_memory_used_bytes%s))`, limit, filterString) + metric := fmt.Sprintf("sysdig_container_memory_used_bytes%s", filterString) + if !tw.IsZero() { + metric = fmt.Sprintf("avg_over_time(%s%s)", metric, tw.RangeSelector()) + } + + return fmt.Sprintf(`topk(%d, sum by (kube_cluster_name, kube_namespace_name, kube_workload_type, kube_workload_name, container_label_io_kubernetes_container_name) (%s))`, limit, metric) } diff --git 
a/internal/infra/mcp/tools/tool_k8s_list_top_memory_consumed_container_test.go b/internal/infra/mcp/tools/tool_k8s_list_top_memory_consumed_container_test.go index 6cd7186..660a2dd 100644 --- a/internal/infra/mcp/tools/tool_k8s_list_top_memory_consumed_container_test.go +++ b/internal/infra/mcp/tools/tool_k8s_list_top_memory_consumed_container_test.go @@ -5,21 +5,25 @@ import ( "context" "io" "net/http" + "time" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "go.uber.org/mock/gomock" + + mocks_clock "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock/mocks" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/mcp/tools" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig/mocks" - "go.uber.org/mock/gomock" ) var _ = Describe("KubernetesListTopMemoryConsumedContainer Tool", func() { var ( tool *tools.K8sListTopMemoryConsumedContainer mockSysdig *mocks.MockExtendedClientWithResponsesInterface + mockClock *mocks_clock.MockClock mcpServer *server.MCPServer ctrl *gomock.Controller ) @@ -27,7 +31,9 @@ var _ = Describe("KubernetesListTopMemoryConsumedContainer Tool", func() { BeforeEach(func() { ctrl = gomock.NewController(GinkgoT()) mockSysdig = mocks.NewMockExtendedClientWithResponsesInterface(ctrl) - tool = tools.NewK8sListTopMemoryConsumedContainer(mockSysdig) + mockClock = mocks_clock.NewMockClock(ctrl) + mockClock.EXPECT().Now().AnyTimes().Return(time.Date(2026, time.April, 16, 12, 0, 0, 0, time.UTC)) + tool = tools.NewK8sListTopMemoryConsumedContainer(mockSysdig, mockClock) mcpServer = server.NewMCPServer("test", "test") tool.RegisterInServer(mcpServer) }) @@ -100,6 +106,22 @@ var _ = Describe("KubernetesListTopMemoryConsumedContainer Tool", func() { Limit: new(sysdig.LimitQuery(5)), }, ), + Entry("windowed, both start and end", + "k8s_list_top_memory_consumed_container", + mcp.CallToolRequest{ + 
Params: mcp.CallToolParams{ + Name: "k8s_list_top_memory_consumed_container", + Arguments: map[string]any{ + "start": "2026-04-16T10:00:00Z", + "end": "2026-04-16T11:00:00Z", + }, + }, + }, + mergeLimit(newWindowedQueryParams( + `topk(20, sum by (kube_cluster_name, kube_namespace_name, kube_workload_type, kube_workload_name, container_label_io_kubernetes_container_name) (avg_over_time(sysdig_container_memory_used_bytes[3600s])))`, + time.Date(2026, time.April, 16, 11, 0, 0, 0, time.UTC), + ), 20), + ), ) }) }) diff --git a/internal/infra/mcp/tools/tool_k8s_list_top_memory_consumed_workload.go b/internal/infra/mcp/tools/tool_k8s_list_top_memory_consumed_workload.go index 1ec7158..32cc2d7 100644 --- a/internal/infra/mcp/tools/tool_k8s_list_top_memory_consumed_workload.go +++ b/internal/infra/mcp/tools/tool_k8s_list_top_memory_consumed_workload.go @@ -9,22 +9,25 @@ import ( "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" + "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" ) type K8sListTopMemoryConsumedWorkload struct { SysdigClient sysdig.ExtendedClientWithResponsesInterface + clock clock.Clock } -func NewK8sListTopMemoryConsumedWorkload(sysdigClient sysdig.ExtendedClientWithResponsesInterface) *K8sListTopMemoryConsumedWorkload { +func NewK8sListTopMemoryConsumedWorkload(sysdigClient sysdig.ExtendedClientWithResponsesInterface, clk clock.Clock) *K8sListTopMemoryConsumedWorkload { return &K8sListTopMemoryConsumedWorkload{ SysdigClient: sysdigClient, + clock: clk, } } func (t *K8sListTopMemoryConsumedWorkload) RegisterInServer(s *server.MCPServer) { tool := mcp.NewTool("k8s_list_top_memory_consumed_workload", - mcp.WithDescription("Lists memory-intensive workloads (all containers)."), + mcp.WithDescription("Lists memory-intensive workloads (all containers). 
Optionally pass start/end (RFC3339) to query a historical window (averaged over the window) instead of the current instant snapshot."), mcp.WithString("cluster_name", mcp.Description("The name of the cluster to filter by.")), mcp.WithString("namespace_name", mcp.Description("The name of the namespace to filter by.")), mcp.WithString("workload_type", mcp.Description("The type of the workload to filter by.")), @@ -33,6 +36,7 @@ func (t *K8sListTopMemoryConsumedWorkload) RegisterInServer(s *server.MCPServer) mcp.Description("Maximum number of workloads to return."), mcp.DefaultNumber(20), ), + WithTimeWindowParams(), mcp.WithOutputSchema[map[string]any](), mcp.WithReadOnlyHintAnnotation(true), mcp.WithDestructiveHintAnnotation(false), @@ -48,13 +52,21 @@ func (t *K8sListTopMemoryConsumedWorkload) handle(ctx context.Context, request m workloadName := mcp.ParseString(request, "workload_name", "") limit := mcp.ParseInt(request, "limit", 20) - query := buildTopMemoryConsumedByWorkloadQuery(clusterName, namespaceName, workloadType, workloadName, limit) + tw, err := ParseTimeWindow(request, t.clock) + if err != nil { + return mcp.NewToolResultErrorFromErr("invalid time window", err), nil + } + + query := buildTopMemoryConsumedByWorkloadQuery(clusterName, namespaceName, workloadType, workloadName, limit, tw) limitQuery := sysdig.LimitQuery(limit) params := &sysdig.GetQueryV1Params{ Query: query, Limit: &limitQuery, } + if err := tw.ApplyToParams(params); err != nil { + return mcp.NewToolResultErrorFromErr("failed to build eval time", err), nil + } httpResp, err := t.SysdigClient.GetQueryV1(ctx, params) if err != nil { @@ -74,7 +86,7 @@ func (t *K8sListTopMemoryConsumedWorkload) handle(ctx context.Context, request m return mcp.NewToolResultJSON(queryResponse) } -func buildTopMemoryConsumedByWorkloadQuery(clusterName, namespaceName, workloadType, workloadName string, limit int) string { +func buildTopMemoryConsumedByWorkloadQuery(clusterName, namespaceName, workloadType, 
workloadName string, limit int, tw TimeWindow) string { filters := []string{} if clusterName != "" { filters = append(filters, fmt.Sprintf("kube_cluster_name=\"%s\"", clusterName)) @@ -94,6 +106,11 @@ func buildTopMemoryConsumedByWorkloadQuery(clusterName, namespaceName, workloadT filterString = fmt.Sprintf("{%s}", strings.Join(filters, ",")) } - innerQuery := fmt.Sprintf("sum by (kube_cluster_name, kube_namespace_name, kube_workload_type, kube_workload_name) (sysdig_container_memory_used_bytes%s)", filterString) + metric := fmt.Sprintf("sysdig_container_memory_used_bytes%s", filterString) + if !tw.IsZero() { + metric = fmt.Sprintf("avg_over_time(%s%s)", metric, tw.RangeSelector()) + } + + innerQuery := fmt.Sprintf("sum by (kube_cluster_name, kube_namespace_name, kube_workload_type, kube_workload_name) (%s)", metric) return fmt.Sprintf("topk(%d, %s)", limit, innerQuery) } diff --git a/internal/infra/mcp/tools/tool_k8s_list_top_memory_consumed_workload_test.go b/internal/infra/mcp/tools/tool_k8s_list_top_memory_consumed_workload_test.go index 8c7e94d..e3f8b4e 100644 --- a/internal/infra/mcp/tools/tool_k8s_list_top_memory_consumed_workload_test.go +++ b/internal/infra/mcp/tools/tool_k8s_list_top_memory_consumed_workload_test.go @@ -5,21 +5,25 @@ import ( "context" "io" "net/http" + "time" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + "go.uber.org/mock/gomock" + + mocks_clock "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock/mocks" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/mcp/tools" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig/mocks" - "go.uber.org/mock/gomock" ) var _ = Describe("KubernetesListTopMemoryConsumedWorkload Tool", func() { var ( tool *tools.K8sListTopMemoryConsumedWorkload mockSysdig *mocks.MockExtendedClientWithResponsesInterface + mockClock *mocks_clock.MockClock mcpServer *server.MCPServer ctrl *gomock.Controller ) @@ -27,7 +31,9 @@ var _ = Describe("KubernetesListTopMemoryConsumedWorkload Tool", func() { BeforeEach(func() { ctrl = gomock.NewController(GinkgoT()) mockSysdig = mocks.NewMockExtendedClientWithResponsesInterface(ctrl) - tool = tools.NewK8sListTopMemoryConsumedWorkload(mockSysdig) + mockClock = mocks_clock.NewMockClock(ctrl) + mockClock.EXPECT().Now().AnyTimes().Return(time.Date(2026, time.April, 16, 12, 0, 0, 0, time.UTC)) + tool = tools.NewK8sListTopMemoryConsumedWorkload(mockSysdig, mockClock) mcpServer = server.NewMCPServer("test", "test") tool.RegisterInServer(mcpServer) }) @@ -122,6 +128,22 @@ var _ = Describe("KubernetesListTopMemoryConsumedWorkload Tool", func() { Limit: new(sysdig.LimitQuery(5)), }, ), + Entry("windowed, both start and end", + "k8s_list_top_memory_consumed_workload", + mcp.CallToolRequest{ + Params: mcp.CallToolParams{ + Name: "k8s_list_top_memory_consumed_workload", + Arguments: map[string]any{ + "start": "2026-04-16T10:00:00Z", + "end": "2026-04-16T11:00:00Z", + }, + }, + }, + mergeLimit(newWindowedQueryParams( + `topk(20, sum by (kube_cluster_name, kube_namespace_name, kube_workload_type, kube_workload_name) (avg_over_time(sysdig_container_memory_used_bytes[3600s])))`, + time.Date(2026, time.April, 16, 11, 0, 0, 0, time.UTC), + ), 20), + ), ) }) }) diff --git 
a/internal/infra/mcp/tools/tool_k8s_list_top_network_errors_in_pods.go b/internal/infra/mcp/tools/tool_k8s_list_top_network_errors_in_pods.go index 6703a2e..83290a0 100644 --- a/internal/infra/mcp/tools/tool_k8s_list_top_network_errors_in_pods.go +++ b/internal/infra/mcp/tools/tool_k8s_list_top_network_errors_in_pods.go @@ -10,23 +10,26 @@ import ( "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" + "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" ) type K8sListTopNetworkErrorsInPods struct { SysdigClient sysdig.ExtendedClientWithResponsesInterface + clock clock.Clock } -func NewK8sListTopNetworkErrorsInPods(sysdigClient sysdig.ExtendedClientWithResponsesInterface) *K8sListTopNetworkErrorsInPods { +func NewK8sListTopNetworkErrorsInPods(sysdigClient sysdig.ExtendedClientWithResponsesInterface, clk clock.Clock) *K8sListTopNetworkErrorsInPods { return &K8sListTopNetworkErrorsInPods{ SysdigClient: sysdigClient, + clock: clk, } } func (t *K8sListTopNetworkErrorsInPods) RegisterInServer(s *server.MCPServer) { tool := mcp.NewTool("k8s_list_top_network_errors_in_pods", - mcp.WithDescription("Shows the top network errors by pod over a given interval, aggregated by cluster, namespace, workload type, and workload name. The result is an average rate of network errors per second."), - mcp.WithString("interval", mcp.Description("Time interval for the query (e.g. '1h', '30m'). Default is '1h'.")), + mcp.WithDescription("Shows the top network errors by pod over a time window, aggregated by cluster, namespace, workload type, and workload name. The result is an average rate of network errors per second. Pass start/end (RFC3339) to specify the window. The legacy 'interval' param is retained for backward compatibility; start/end take precedence when both are provided."), + mcp.WithString("interval", mcp.Description("Time interval for the query (e.g. '1h', '30m'). Default is '1h'. 
Ignored when start/end are provided.")), mcp.WithString("cluster_name", mcp.Description("The name of the cluster to filter by.")), mcp.WithString("namespace_name", mcp.Description("The name of the namespace to filter by.")), mcp.WithString("workload_type", mcp.Description("The type of the workload to filter by.")), @@ -35,6 +38,7 @@ func (t *K8sListTopNetworkErrorsInPods) RegisterInServer(s *server.MCPServer) { mcp.Description("Maximum number of pods to return."), mcp.DefaultNumber(20), ), + WithTimeWindowParams(), mcp.WithOutputSchema[map[string]any](), mcp.WithReadOnlyHintAnnotation(true), mcp.WithDestructiveHintAnnotation(false), @@ -51,9 +55,19 @@ func (t *K8sListTopNetworkErrorsInPods) handle(ctx context.Context, request mcp. workloadName := mcp.ParseString(request, "workload_name", "") limit := mcp.ParseInt(request, "limit", 20) - query, err := buildTopNetworkErrorsQuery(interval, limit, clusterName, namespaceName, workloadType, workloadName) + tw, err := ParseTimeWindow(request, t.clock) if err != nil { - return mcp.NewToolResultErrorFromErr("failed to build query", err), nil + return mcp.NewToolResultErrorFromErr("invalid time window", err), nil + } + + var query string + if !tw.IsZero() { + query = buildTopNetworkErrorsWindowedQuery(tw.RangeSelector(), tw.WindowSeconds(), limit, clusterName, namespaceName, workloadType, workloadName) + } else { + query, err = buildTopNetworkErrorsLegacyQuery(interval, limit, clusterName, namespaceName, workloadType, workloadName) + if err != nil { + return mcp.NewToolResultErrorFromErr("failed to build query", err), nil + } } limitQuery := sysdig.LimitQuery(limit) @@ -61,6 +75,9 @@ func (t *K8sListTopNetworkErrorsInPods) handle(ctx context.Context, request mcp. 
Query: query, Limit: &limitQuery, } + if err := tw.ApplyToParams(params); err != nil { + return mcp.NewToolResultErrorFromErr("failed to build eval time", err), nil + } httpResp, err := t.SysdigClient.GetQueryV1(ctx, params) if err != nil { @@ -80,13 +97,25 @@ func (t *K8sListTopNetworkErrorsInPods) handle(ctx context.Context, request mcp. return mcp.NewToolResultJSON(queryResponse) } -func buildTopNetworkErrorsQuery(interval string, limit int, clusterName, namespaceName, workloadType, workloadName string) (string, error) { +func buildTopNetworkErrorsLegacyQuery(interval string, limit int, clusterName, namespaceName, workloadType, workloadName string) (string, error) { duration, err := time.ParseDuration(interval) if err != nil { return "", fmt.Errorf("invalid interval format: %w", err) } seconds := duration.Seconds() + filterStr := networkErrorsFilterString(clusterName, namespaceName, workloadType, workloadName) + return fmt.Sprintf("topk(%d,sum(sum_over_time(sysdig_container_net_error_count{%s}[%s])) by (kube_cluster_name, kube_namespace_name, kube_workload_type, kube_workload_name, kube_pod_name)) / %f", + limit, filterStr, interval, seconds), nil +} + +func buildTopNetworkErrorsWindowedQuery(rangeSelector string, windowSeconds int64, limit int, clusterName, namespaceName, workloadType, workloadName string) string { + filterStr := networkErrorsFilterString(clusterName, namespaceName, workloadType, workloadName) + return fmt.Sprintf("topk(%d,sum(sum_over_time(sysdig_container_net_error_count{%s}%s)) by (kube_cluster_name, kube_namespace_name, kube_workload_type, kube_workload_name, kube_pod_name)) / %d", + limit, filterStr, rangeSelector, windowSeconds) +} + +func networkErrorsFilterString(clusterName, namespaceName, workloadType, workloadName string) string { filters := []string{} if clusterName != "" { filters = append(filters, fmt.Sprintf("kube_cluster_name=~\"%s\"", clusterName)) @@ -100,12 +129,5 @@ func buildTopNetworkErrorsQuery(interval string, limit int, 
clusterName, namespa if workloadName != "" { filters = append(filters, fmt.Sprintf("kube_workload_name=\"%s\"", workloadName)) } - - filterStr := "" - if len(filters) > 0 { - filterStr = strings.Join(filters, ",") - } - - return fmt.Sprintf("topk(%d,sum(sum_over_time(sysdig_container_net_error_count{%s}[%s])) by (kube_cluster_name, kube_namespace_name, kube_workload_type, kube_workload_name, kube_pod_name)) / %f", - limit, filterStr, interval, seconds), nil + return strings.Join(filters, ",") } diff --git a/internal/infra/mcp/tools/tool_k8s_list_top_network_errors_in_pods_test.go b/internal/infra/mcp/tools/tool_k8s_list_top_network_errors_in_pods_test.go index 24aa986..b2bc677 100644 --- a/internal/infra/mcp/tools/tool_k8s_list_top_network_errors_in_pods_test.go +++ b/internal/infra/mcp/tools/tool_k8s_list_top_network_errors_in_pods_test.go @@ -5,21 +5,25 @@ import ( "context" "io" "net/http" + "time" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + "go.uber.org/mock/gomock" + + mocks_clock "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock/mocks" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/mcp/tools" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig/mocks" - "go.uber.org/mock/gomock" ) var _ = Describe("KubernetesListTopNetworkErrorsInPods Tool", func() { var ( tool *tools.K8sListTopNetworkErrorsInPods mockSysdig *mocks.MockExtendedClientWithResponsesInterface + mockClock *mocks_clock.MockClock mcpServer *server.MCPServer ctrl *gomock.Controller ctx context.Context @@ -28,7 +32,9 @@ var _ = Describe("KubernetesListTopNetworkErrorsInPods Tool", func() { BeforeEach(func() { ctrl = gomock.NewController(GinkgoT()) mockSysdig = mocks.NewMockExtendedClientWithResponsesInterface(ctrl) - tool = tools.NewK8sListTopNetworkErrorsInPods(mockSysdig) + mockClock = mocks_clock.NewMockClock(ctrl) + mockClock.EXPECT().Now().AnyTimes().Return(time.Date(2026, time.April, 16, 12, 0, 0, 0, time.UTC)) + tool = tools.NewK8sListTopNetworkErrorsInPods(mockSysdig, mockClock) mcpServer = server.NewMCPServer("test", "test") tool.RegisterInServer(mcpServer) ctx = context.Background() @@ -53,7 +59,7 @@ var _ = Describe("KubernetesListTopNetworkErrorsInPods Tool", func() { Expect(ok).To(BeTrue()) Expect(resultData.Text).To(MatchJSON(`{"status":"success"}`)) }, - Entry("default params", + Entry("default params (legacy path)", "k8s_list_top_network_errors_in_pods", mcp.CallToolRequest{ Params: mcp.CallToolParams{ @@ -66,7 +72,7 @@ var _ = Describe("KubernetesListTopNetworkErrorsInPods Tool", func() { Limit: new(sysdig.LimitQuery(20)), }, ), - Entry("with custom params", + Entry("legacy path, explicit interval", "k8s_list_top_network_errors_in_pods", mcp.CallToolRequest{ Params: mcp.CallToolParams{ @@ -84,7 +90,7 @@ var _ = Describe("KubernetesListTopNetworkErrorsInPods Tool", func() { Limit: 
new(sysdig.LimitQuery(5)), }, ), - Entry("with all params", + Entry("legacy path, all filters", "k8s_list_top_network_errors_in_pods", mcp.CallToolRequest{ Params: mcp.CallToolParams{ @@ -104,6 +110,22 @@ var _ = Describe("KubernetesListTopNetworkErrorsInPods Tool", func() { Limit: new(sysdig.LimitQuery(10)), }, ), + Entry("windowed path via start/end", + "k8s_list_top_network_errors_in_pods", + mcp.CallToolRequest{ + Params: mcp.CallToolParams{ + Name: "k8s_list_top_network_errors_in_pods", + Arguments: map[string]any{ + "start": "2026-04-16T10:00:00Z", + "end": "2026-04-16T11:00:00Z", + }, + }, + }, + mergeLimit(newWindowedQueryParams( + `topk(20,sum(sum_over_time(sysdig_container_net_error_count{}[3600s])) by (kube_cluster_name, kube_namespace_name, kube_workload_type, kube_workload_name, kube_pod_name)) / 3600`, + time.Date(2026, time.April, 16, 11, 0, 0, 0, time.UTC), + ), 20), + ), ) It("returns error for invalid interval", func() { diff --git a/internal/infra/mcp/tools/tool_k8s_list_top_restarted_pods.go b/internal/infra/mcp/tools/tool_k8s_list_top_restarted_pods.go index 22e04ad..24c8f60 100644 --- a/internal/infra/mcp/tools/tool_k8s_list_top_restarted_pods.go +++ b/internal/infra/mcp/tools/tool_k8s_list_top_restarted_pods.go @@ -9,22 +9,25 @@ import ( "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" + "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" ) type K8sListTopRestartedPods struct { SysdigClient sysdig.ExtendedClientWithResponsesInterface + clock clock.Clock } -func NewK8sListTopRestartedPods(sysdigClient sysdig.ExtendedClientWithResponsesInterface) *K8sListTopRestartedPods { +func NewK8sListTopRestartedPods(sysdigClient sysdig.ExtendedClientWithResponsesInterface, clk clock.Clock) *K8sListTopRestartedPods { return &K8sListTopRestartedPods{ SysdigClient: sysdigClient, + clock: clk, } } func (t *K8sListTopRestartedPods) RegisterInServer(s 
*server.MCPServer) { tool := mcp.NewTool("k8s_list_top_restarted_pods", - mcp.WithDescription("Lists the pods with the highest number of container restarts in the specified scope (cluster, namespace, workload, or individual pod). By default, it returns the top 10."), + mcp.WithDescription("Lists the pods with the highest number of container restarts in the specified scope (cluster, namespace, workload, or individual pod). By default, it returns the top 10. Optionally pass start/end (RFC3339) to count restarts that occurred *within* the window (via increase() on the counter) instead of total lifetime restarts."), mcp.WithString("cluster_name", mcp.Description("The name of the cluster to filter by.")), mcp.WithString("namespace_name", mcp.Description("The name of the namespace to filter by.")), mcp.WithString("workload_type", mcp.Description("The type of the workload to filter by.")), @@ -34,6 +37,7 @@ func (t *K8sListTopRestartedPods) RegisterInServer(s *server.MCPServer) { mcp.Description("Maximum number of pods to return."), mcp.DefaultNumber(10), ), + WithTimeWindowParams(), mcp.WithOutputSchema[map[string]any](), mcp.WithReadOnlyHintAnnotation(true), mcp.WithDestructiveHintAnnotation(false), @@ -50,11 +54,19 @@ func (t *K8sListTopRestartedPods) handle(ctx context.Context, request mcp.CallTo podName := mcp.ParseString(request, "pod_name", "") limit := mcp.ParseInt(request, "limit", 10) - query := buildKubeTopRestartsQuery(clusterName, namespaceName, workloadType, workloadName, podName, limit) + tw, err := ParseTimeWindow(request, t.clock) + if err != nil { + return mcp.NewToolResultErrorFromErr("invalid time window", err), nil + } + + query := buildKubeTopRestartsQuery(clusterName, namespaceName, workloadType, workloadName, podName, limit, tw) params := &sysdig.GetQueryV1Params{ Query: query, } + if err := tw.ApplyToParams(params); err != nil { + return mcp.NewToolResultErrorFromErr("failed to build eval time", err), nil + } httpResp, err := 
t.SysdigClient.GetQueryV1(ctx, params) if err != nil { @@ -74,7 +86,7 @@ func (t *K8sListTopRestartedPods) handle(ctx context.Context, request mcp.CallTo return mcp.NewToolResultJSON(queryResponse) } -func buildKubeTopRestartsQuery(clusterName, namespaceName, workloadType, workloadName, podName string, limit int) string { +func buildKubeTopRestartsQuery(clusterName, namespaceName, workloadType, workloadName, podName string, limit int, tw TimeWindow) string { filters := []string{} if clusterName != "" { filters = append(filters, fmt.Sprintf("kube_cluster_name=\"%s\"", clusterName)) @@ -97,5 +109,10 @@ func buildKubeTopRestartsQuery(clusterName, namespaceName, workloadType, workloa filterString = "{" + strings.Join(filters, ",") + "}" } - return fmt.Sprintf("topk(%d, sum by(pod, kube_cluster_name, kube_namespace_name) (kube_pod_container_status_restarts_total%s) > 0)", limit, filterString) + metric := fmt.Sprintf("kube_pod_container_status_restarts_total%s", filterString) + if !tw.IsZero() { + metric = fmt.Sprintf("increase(%s%s)", metric, tw.RangeSelector()) + } + + return fmt.Sprintf("topk(%d, sum by(pod, kube_cluster_name, kube_namespace_name) (%s) > 0)", limit, metric) } diff --git a/internal/infra/mcp/tools/tool_k8s_list_top_restarted_pods_test.go b/internal/infra/mcp/tools/tool_k8s_list_top_restarted_pods_test.go index 1043c45..2908cdc 100644 --- a/internal/infra/mcp/tools/tool_k8s_list_top_restarted_pods_test.go +++ b/internal/infra/mcp/tools/tool_k8s_list_top_restarted_pods_test.go @@ -5,21 +5,25 @@ import ( "context" "io" "net/http" + "time" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + "go.uber.org/mock/gomock" + + mocks_clock "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock/mocks" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/mcp/tools" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig/mocks" - "go.uber.org/mock/gomock" ) var _ = Describe("KubernetesListTopRestartedPods Tool", func() { var ( tool *tools.K8sListTopRestartedPods mockSysdig *mocks.MockExtendedClientWithResponsesInterface + mockClock *mocks_clock.MockClock mcpServer *server.MCPServer ctrl *gomock.Controller ) @@ -27,7 +31,9 @@ var _ = Describe("KubernetesListTopRestartedPods Tool", func() { BeforeEach(func() { ctrl = gomock.NewController(GinkgoT()) mockSysdig = mocks.NewMockExtendedClientWithResponsesInterface(ctrl) - tool = tools.NewK8sListTopRestartedPods(mockSysdig) + mockClock = mocks_clock.NewMockClock(ctrl) + mockClock.EXPECT().Now().AnyTimes().Return(time.Date(2026, time.April, 16, 12, 0, 0, 0, time.UTC)) + tool = tools.NewK8sListTopRestartedPods(mockSysdig, mockClock) mcpServer = server.NewMCPServer("test", "test") tool.RegisterInServer(mcpServer) }) @@ -117,6 +123,39 @@ var _ = Describe("KubernetesListTopRestartedPods Tool", func() { Query: `topk(10, sum by(pod, kube_cluster_name, kube_namespace_name) (kube_pod_container_status_restarts_total{kube_cluster_name="my_cluster",kube_namespace_name="my_namespace",kube_workload_type="deployment",kube_workload_name="my_workload",kube_pod_name="my_pod"}) > 0)`, }, ), + Entry("windowed, no filters", + "k8s_list_top_restarted_pods", + mcp.CallToolRequest{ + Params: mcp.CallToolParams{ + Name: "k8s_list_top_restarted_pods", + Arguments: map[string]any{ + "start": "2026-04-16T10:00:00Z", + "end": "2026-04-16T11:00:00Z", + }, + }, + }, + newWindowedQueryParams( + `topk(10, sum by(pod, kube_cluster_name, kube_namespace_name) (increase(kube_pod_container_status_restarts_total[3600s])) > 0)`, + time.Date(2026, 
time.April, 16, 11, 0, 0, 0, time.UTC), + ), + ), + Entry("windowed, with filters", + "k8s_list_top_restarted_pods", + mcp.CallToolRequest{ + Params: mcp.CallToolParams{ + Name: "k8s_list_top_restarted_pods", + Arguments: map[string]any{ + "cluster_name": "prod", + "start": "2026-04-16T10:00:00Z", + "end": "2026-04-16T11:00:00Z", + }, + }, + }, + newWindowedQueryParams( + `topk(10, sum by(pod, kube_cluster_name, kube_namespace_name) (increase(kube_pod_container_status_restarts_total{kube_cluster_name="prod"}[3600s])) > 0)`, + time.Date(2026, time.April, 16, 11, 0, 0, 0, time.UTC), + ), + ), ) }) }) diff --git a/internal/infra/mcp/tools/tool_k8s_list_top_unavailable_pods.go b/internal/infra/mcp/tools/tool_k8s_list_top_unavailable_pods.go index 41feb87..64f677b 100644 --- a/internal/infra/mcp/tools/tool_k8s_list_top_unavailable_pods.go +++ b/internal/infra/mcp/tools/tool_k8s_list_top_unavailable_pods.go @@ -9,22 +9,25 @@ import ( "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" + "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" ) type K8sListTopUnavailablePods struct { SysdigClient sysdig.ExtendedClientWithResponsesInterface + clock clock.Clock } -func NewK8sListTopUnavailablePods(sysdigClient sysdig.ExtendedClientWithResponsesInterface) *K8sListTopUnavailablePods { +func NewK8sListTopUnavailablePods(sysdigClient sysdig.ExtendedClientWithResponsesInterface, clk clock.Clock) *K8sListTopUnavailablePods { return &K8sListTopUnavailablePods{ SysdigClient: sysdigClient, + clock: clk, } } func (t *K8sListTopUnavailablePods) RegisterInServer(s *server.MCPServer) { tool := mcp.NewTool("k8s_list_top_unavailable_pods", - mcp.WithDescription("Shows the top N pods with the highest number of unavailable or unready replicas in a Kubernetes cluster, ordered from highest to lowest."), + mcp.WithDescription("Shows the top N pods with the highest number of unavailable or unready replicas 
in a Kubernetes cluster, ordered from highest to lowest. Optionally pass start/end (RFC3339) to report workloads that were *continuously* unavailable for the entire window (matches Sysdig's `WorkloadReplicasMismatch` advisory semantics)."), mcp.WithString("cluster_name", mcp.Description("The name of the cluster to filter by.")), mcp.WithString("namespace_name", mcp.Description("The name of the namespace to filter by.")), mcp.WithString("workload_type", mcp.Description("The type of the workload to filter by.")), @@ -33,6 +36,7 @@ func (t *K8sListTopUnavailablePods) RegisterInServer(s *server.MCPServer) { mcp.Description("Maximum number of pods to return."), mcp.DefaultNumber(20), ), + WithTimeWindowParams(), mcp.WithOutputSchema[map[string]any](), mcp.WithReadOnlyHintAnnotation(true), mcp.WithDestructiveHintAnnotation(false), @@ -48,11 +52,19 @@ func (t *K8sListTopUnavailablePods) handle(ctx context.Context, request mcp.Call workloadName := mcp.ParseString(request, "workload_name", "") limit := mcp.ParseInt(request, "limit", 20) - query := buildTopUnavailablePodsQuery(limit, clusterName, namespaceName, workloadType, workloadName) + tw, err := ParseTimeWindow(request, t.clock) + if err != nil { + return mcp.NewToolResultErrorFromErr("invalid time window", err), nil + } + + query := buildTopUnavailablePodsQuery(limit, clusterName, namespaceName, workloadType, workloadName, tw) params := &sysdig.GetQueryV1Params{ Query: query, } + if err := tw.ApplyToParams(params); err != nil { + return mcp.NewToolResultErrorFromErr("failed to build eval time", err), nil + } httpResp, err := t.SysdigClient.GetQueryV1(ctx, params) if err != nil { @@ -72,7 +84,7 @@ func (t *K8sListTopUnavailablePods) handle(ctx context.Context, request mcp.Call return mcp.NewToolResultJSON(queryResponse) } -func buildTopUnavailablePodsQuery(limit int, clusterName, namespaceName, workloadType, workloadName string) string { +func buildTopUnavailablePodsQuery(limit int, clusterName, namespaceName, 
workloadType, workloadName string, tw TimeWindow) string { baseFilters := []string{} if clusterName != "" { baseFilters = append(baseFilters, fmt.Sprintf("kube_cluster_name=\"%s\"", clusterName)) @@ -87,6 +99,12 @@ func buildTopUnavailablePodsQuery(limit int, clusterName, namespaceName, workloa baseFilters = append(baseFilters, fmt.Sprintf("kube_workload_name=\"%s\"", workloadName)) } + if !tw.IsZero() { + filtersStr := strings.Join(baseFilters, ",") + return fmt.Sprintf("topk(%d, sum by (kube_cluster_name, kube_namespace_name, kube_workload_name) (min_over_time(kube_workload_status_unavailable{%s}%s) >= 1))", + limit, filtersStr, tw.RangeSelector()) + } + // Filters for kube_workload_status_desired and kube_daemonset_status_number_ready commonFiltersStr := strings.Join(baseFilters, ",") diff --git a/internal/infra/mcp/tools/tool_k8s_list_top_unavailable_pods_test.go b/internal/infra/mcp/tools/tool_k8s_list_top_unavailable_pods_test.go index 297a064..cb276d2 100644 --- a/internal/infra/mcp/tools/tool_k8s_list_top_unavailable_pods_test.go +++ b/internal/infra/mcp/tools/tool_k8s_list_top_unavailable_pods_test.go @@ -5,21 +5,25 @@ import ( "context" "io" "net/http" + "time" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + "go.uber.org/mock/gomock" + + mocks_clock "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock/mocks" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/mcp/tools" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig/mocks" - "go.uber.org/mock/gomock" ) var _ = Describe("KubernetesListTopUnavailablePods Tool", func() { var ( tool *tools.K8sListTopUnavailablePods mockSysdig *mocks.MockExtendedClientWithResponsesInterface + mockClock *mocks_clock.MockClock mcpServer *server.MCPServer ctrl *gomock.Controller ) @@ -27,7 +31,9 @@ var _ = Describe("KubernetesListTopUnavailablePods Tool", func() { BeforeEach(func() { ctrl = gomock.NewController(GinkgoT()) mockSysdig = mocks.NewMockExtendedClientWithResponsesInterface(ctrl) - tool = tools.NewK8sListTopUnavailablePods(mockSysdig) + mockClock = mocks_clock.NewMockClock(ctrl) + mockClock.EXPECT().Now().AnyTimes().Return(time.Date(2026, time.April, 16, 12, 0, 0, 0, time.UTC)) + tool = tools.NewK8sListTopUnavailablePods(mockSysdig, mockClock) mcpServer = server.NewMCPServer("test", "test") tool.RegisterInServer(mcpServer) }) @@ -153,6 +159,40 @@ var _ = Describe("KubernetesListTopUnavailablePods Tool", func() { )`, }, ), + Entry("windowed, no filters (Sysdig-canonical pattern)", + "k8s_list_top_unavailable_pods", + mcp.CallToolRequest{ + Params: mcp.CallToolParams{ + Name: "k8s_list_top_unavailable_pods", + Arguments: map[string]any{ + "start": "2026-04-16T10:00:00Z", + "end": "2026-04-16T11:00:00Z", + }, + }, + }, + newWindowedQueryParams( + `topk(20, sum by (kube_cluster_name, kube_namespace_name, kube_workload_name) (min_over_time(kube_workload_status_unavailable{}[3600s]) >= 1))`, + time.Date(2026, time.April, 16, 11, 0, 0, 0, time.UTC), + ), + ), + Entry("windowed, with cluster filter", + "k8s_list_top_unavailable_pods", + mcp.CallToolRequest{ + Params: mcp.CallToolParams{ + Name: 
"k8s_list_top_unavailable_pods", + Arguments: map[string]any{ + "cluster_name": "my-cluster", + "limit": 5, + "start": "2026-04-16T10:00:00Z", + "end": "2026-04-16T11:00:00Z", + }, + }, + }, + newWindowedQueryParams( + `topk(5, sum by (kube_cluster_name, kube_namespace_name, kube_workload_name) (min_over_time(kube_workload_status_unavailable{kube_cluster_name="my-cluster"}[3600s]) >= 1))`, + time.Date(2026, time.April, 16, 11, 0, 0, 0, time.UTC), + ), + ), ) }) }) diff --git a/internal/infra/mcp/tools/tool_k8s_list_underutilized_pods_cpu_quota.go b/internal/infra/mcp/tools/tool_k8s_list_underutilized_pods_cpu_quota.go index 4525c3d..d900afe 100644 --- a/internal/infra/mcp/tools/tool_k8s_list_underutilized_pods_cpu_quota.go +++ b/internal/infra/mcp/tools/tool_k8s_list_underutilized_pods_cpu_quota.go @@ -9,28 +9,32 @@ import ( "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" + "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" ) type K8sListUnderutilizedPodsCPUQuota struct { SysdigClient sysdig.ExtendedClientWithResponsesInterface + clock clock.Clock } -func NewK8sListUnderutilizedPodsCPUQuota(sysdigClient sysdig.ExtendedClientWithResponsesInterface) *K8sListUnderutilizedPodsCPUQuota { +func NewK8sListUnderutilizedPodsCPUQuota(sysdigClient sysdig.ExtendedClientWithResponsesInterface, clk clock.Clock) *K8sListUnderutilizedPodsCPUQuota { return &K8sListUnderutilizedPodsCPUQuota{ SysdigClient: sysdigClient, + clock: clk, } } func (t *K8sListUnderutilizedPodsCPUQuota) RegisterInServer(s *server.MCPServer) { tool := mcp.NewTool("k8s_list_underutilized_pods_cpu_quota", - mcp.WithDescription("List Kubernetes pods with CPU usage below 25% of the quota limit."), + mcp.WithDescription("List Kubernetes pods with CPU usage below 25% of the quota limit. 
Optionally pass start/end (RFC3339) to evaluate the ratio averaged over a historical window instead of the current instant snapshot."), mcp.WithString("cluster_name", mcp.Description("The name of the cluster to filter by.")), mcp.WithString("namespace_name", mcp.Description("The name of the namespace to filter by.")), mcp.WithNumber("limit", mcp.Description("Maximum number of pods to return."), mcp.DefaultNumber(10), ), + WithTimeWindowParams(), mcp.WithOutputSchema[map[string]any](), mcp.WithReadOnlyHintAnnotation(true), mcp.WithDestructiveHintAnnotation(false), @@ -44,13 +48,21 @@ func (t *K8sListUnderutilizedPodsCPUQuota) handle(ctx context.Context, request m namespaceName := mcp.ParseString(request, "namespace_name", "") limit := mcp.ParseInt(request, "limit", 10) - query := buildUnderutilizedPodsQuery(clusterName, namespaceName) + tw, err := ParseTimeWindow(request, t.clock) + if err != nil { + return mcp.NewToolResultErrorFromErr("invalid time window", err), nil + } + + query := buildUnderutilizedPodsQuery(clusterName, namespaceName, tw) limitQuery := sysdig.LimitQuery(limit) params := &sysdig.GetQueryV1Params{ Query: query, Limit: &limitQuery, } + if err := tw.ApplyToParams(params); err != nil { + return mcp.NewToolResultErrorFromErr("failed to build eval time", err), nil + } httpResp, err := t.SysdigClient.GetQueryV1(ctx, params) if err != nil { @@ -70,7 +82,7 @@ func (t *K8sListUnderutilizedPodsCPUQuota) handle(ctx context.Context, request m return mcp.NewToolResultJSON(queryResponse) } -func buildUnderutilizedPodsQuery(clusterName, namespaceName string) string { +func buildUnderutilizedPodsQuery(clusterName, namespaceName string, tw TimeWindow) string { filters := []string{} if clusterName != "" { filters = append(filters, fmt.Sprintf("kube_cluster_name=\"%s\"", clusterName)) @@ -84,5 +96,13 @@ func buildUnderutilizedPodsQuery(clusterName, namespaceName string) string { filterString = fmt.Sprintf("{%s}", strings.Join(filters, ",")) } - return 
fmt.Sprintf("sum by (kube_cluster_name, kube_namespace_name, kube_pod_name)(sysdig_container_cpu_cores_used%s) / (sum by (kube_cluster_name, kube_namespace_name, kube_pod_name)(sysdig_container_cpu_cores_quota_limit%s) > 0) < 0.25", filterString, filterString) + used := fmt.Sprintf("sysdig_container_cpu_cores_used%s", filterString) + quota := fmt.Sprintf("sysdig_container_cpu_cores_quota_limit%s", filterString) + if !tw.IsZero() { + sel := tw.RangeSelector() + used = fmt.Sprintf("avg_over_time(%s%s)", used, sel) + quota = fmt.Sprintf("avg_over_time(%s%s)", quota, sel) + } + + return fmt.Sprintf("sum by (kube_cluster_name, kube_namespace_name, kube_pod_name)(%s) / (sum by (kube_cluster_name, kube_namespace_name, kube_pod_name)(%s) > 0) < 0.25", used, quota) } diff --git a/internal/infra/mcp/tools/tool_k8s_list_underutilized_pods_cpu_quota_test.go b/internal/infra/mcp/tools/tool_k8s_list_underutilized_pods_cpu_quota_test.go index 4d9e702..2184808 100644 --- a/internal/infra/mcp/tools/tool_k8s_list_underutilized_pods_cpu_quota_test.go +++ b/internal/infra/mcp/tools/tool_k8s_list_underutilized_pods_cpu_quota_test.go @@ -5,21 +5,25 @@ import ( "context" "io" "net/http" + "time" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + "go.uber.org/mock/gomock" + + mocks_clock "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock/mocks" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/mcp/tools" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig/mocks" - "go.uber.org/mock/gomock" ) var _ = Describe("KubernetesListUnderutilizedPodsCPUQuota Tool", func() { var ( tool *tools.K8sListUnderutilizedPodsCPUQuota mockSysdig *mocks.MockExtendedClientWithResponsesInterface + mockClock *mocks_clock.MockClock mcpServer *server.MCPServer ctrl *gomock.Controller ) @@ -27,7 +31,9 @@ var _ = Describe("KubernetesListUnderutilizedPodsCPUQuota Tool", func() { BeforeEach(func() { ctrl = gomock.NewController(GinkgoT()) mockSysdig = mocks.NewMockExtendedClientWithResponsesInterface(ctrl) - tool = tools.NewK8sListUnderutilizedPodsCPUQuota(mockSysdig) + mockClock = mocks_clock.NewMockClock(ctrl) + mockClock.EXPECT().Now().AnyTimes().Return(time.Date(2026, time.April, 16, 12, 0, 0, 0, time.UTC)) + tool = tools.NewK8sListUnderutilizedPodsCPUQuota(mockSysdig, mockClock) mcpServer = server.NewMCPServer("test", "test") tool.RegisterInServer(mcpServer) }) @@ -116,6 +122,22 @@ var _ = Describe("KubernetesListUnderutilizedPodsCPUQuota Tool", func() { Limit: new(sysdig.LimitQuery(10)), }, ), + Entry("windowed, both start and end", + "k8s_list_underutilized_pods_cpu_quota", + mcp.CallToolRequest{ + Params: mcp.CallToolParams{ + Name: "k8s_list_underutilized_pods_cpu_quota", + Arguments: map[string]any{ + "start": "2026-04-16T10:00:00Z", + "end": "2026-04-16T11:00:00Z", + }, + }, + }, + mergeLimit(newWindowedQueryParams( + `sum by (kube_cluster_name, kube_namespace_name, kube_pod_name)(avg_over_time(sysdig_container_cpu_cores_used[3600s])) / (sum by (kube_cluster_name, kube_namespace_name, kube_pod_name)(avg_over_time(sysdig_container_cpu_cores_quota_limit[3600s])) > 0) < 0.25`, + time.Date(2026, time.April, 16, 11, 
0, 0, 0, time.UTC), + ), 10), + ), ) }) }) diff --git a/internal/infra/mcp/tools/tool_k8s_list_underutilized_pods_memory_quota.go b/internal/infra/mcp/tools/tool_k8s_list_underutilized_pods_memory_quota.go index bfd2251..f13bcb1 100644 --- a/internal/infra/mcp/tools/tool_k8s_list_underutilized_pods_memory_quota.go +++ b/internal/infra/mcp/tools/tool_k8s_list_underutilized_pods_memory_quota.go @@ -9,28 +9,32 @@ import ( "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" + "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" ) type K8sListUnderutilizedPodsMemoryQuota struct { SysdigClient sysdig.ExtendedClientWithResponsesInterface + clock clock.Clock } -func NewK8sListUnderutilizedPodsMemoryQuota(sysdigClient sysdig.ExtendedClientWithResponsesInterface) *K8sListUnderutilizedPodsMemoryQuota { +func NewK8sListUnderutilizedPodsMemoryQuota(sysdigClient sysdig.ExtendedClientWithResponsesInterface, clk clock.Clock) *K8sListUnderutilizedPodsMemoryQuota { return &K8sListUnderutilizedPodsMemoryQuota{ SysdigClient: sysdigClient, + clock: clk, } } func (t *K8sListUnderutilizedPodsMemoryQuota) RegisterInServer(s *server.MCPServer) { tool := mcp.NewTool("k8s_list_underutilized_pods_memory_quota", - mcp.WithDescription("List Kubernetes pods with memory usage below 25% of the limit."), + mcp.WithDescription("List Kubernetes pods with memory usage below 25% of the limit. 
Optionally pass start/end (RFC3339) to evaluate the ratio averaged over a historical window instead of the current instant snapshot."), mcp.WithString("cluster_name", mcp.Description("The name of the cluster to filter by.")), mcp.WithString("namespace_name", mcp.Description("The name of the namespace to filter by.")), mcp.WithNumber("limit", mcp.Description("Maximum number of pods to return."), mcp.DefaultNumber(10), ), + WithTimeWindowParams(), mcp.WithOutputSchema[map[string]any](), mcp.WithReadOnlyHintAnnotation(true), mcp.WithDestructiveHintAnnotation(false), @@ -44,13 +48,21 @@ func (t *K8sListUnderutilizedPodsMemoryQuota) handle(ctx context.Context, reques namespaceName := mcp.ParseString(request, "namespace_name", "") limit := mcp.ParseInt(request, "limit", 10) - query := buildUnderutilizedPodsByMemoryQuery(clusterName, namespaceName) + tw, err := ParseTimeWindow(request, t.clock) + if err != nil { + return mcp.NewToolResultErrorFromErr("invalid time window", err), nil + } + + query := buildUnderutilizedPodsByMemoryQuery(clusterName, namespaceName, tw) limitQuery := sysdig.LimitQuery(limit) params := &sysdig.GetQueryV1Params{ Query: query, Limit: &limitQuery, } + if err := tw.ApplyToParams(params); err != nil { + return mcp.NewToolResultErrorFromErr("failed to build eval time", err), nil + } httpResp, err := t.SysdigClient.GetQueryV1(ctx, params) if err != nil { @@ -70,7 +82,7 @@ func (t *K8sListUnderutilizedPodsMemoryQuota) handle(ctx context.Context, reques return mcp.NewToolResultJSON(queryResponse) } -func buildUnderutilizedPodsByMemoryQuery(clusterName, namespaceName string) string { +func buildUnderutilizedPodsByMemoryQuery(clusterName, namespaceName string, tw TimeWindow) string { filters := []string{} if clusterName != "" { filters = append(filters, fmt.Sprintf(`kube_cluster_name="%s"`, clusterName)) @@ -84,5 +96,13 @@ func buildUnderutilizedPodsByMemoryQuery(clusterName, namespaceName string) stri filterString = fmt.Sprintf("{%s}", 
strings.Join(filters, ",")) } - return fmt.Sprintf("sum by (kube_cluster_name, kube_namespace_name, kube_pod_name)(sysdig_container_memory_used_bytes%s) / (sum by (kube_cluster_name, kube_namespace_name, kube_pod_name)(sysdig_container_memory_limit_bytes%s) > 0) < 0.25", filterString, filterString) + used := fmt.Sprintf("sysdig_container_memory_used_bytes%s", filterString) + limit := fmt.Sprintf("sysdig_container_memory_limit_bytes%s", filterString) + if !tw.IsZero() { + sel := tw.RangeSelector() + used = fmt.Sprintf("avg_over_time(%s%s)", used, sel) + limit = fmt.Sprintf("avg_over_time(%s%s)", limit, sel) + } + + return fmt.Sprintf("sum by (kube_cluster_name, kube_namespace_name, kube_pod_name)(%s) / (sum by (kube_cluster_name, kube_namespace_name, kube_pod_name)(%s) > 0) < 0.25", used, limit) } diff --git a/internal/infra/mcp/tools/tool_k8s_list_underutilized_pods_memory_quota_test.go b/internal/infra/mcp/tools/tool_k8s_list_underutilized_pods_memory_quota_test.go index 91f55ff..0a93bc2 100644 --- a/internal/infra/mcp/tools/tool_k8s_list_underutilized_pods_memory_quota_test.go +++ b/internal/infra/mcp/tools/tool_k8s_list_underutilized_pods_memory_quota_test.go @@ -5,21 +5,25 @@ import ( "context" "io" "net/http" + "time" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + "go.uber.org/mock/gomock" + + mocks_clock "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock/mocks" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/mcp/tools" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig/mocks" - "go.uber.org/mock/gomock" ) var _ = Describe("KubernetesListUnderutilizedPodsMemoryQuota Tool", func() { var ( tool *tools.K8sListUnderutilizedPodsMemoryQuota mockSysdig *mocks.MockExtendedClientWithResponsesInterface + mockClock *mocks_clock.MockClock mcpServer *server.MCPServer ctrl *gomock.Controller ) @@ -27,7 +31,9 @@ var _ = Describe("KubernetesListUnderutilizedPodsMemoryQuota Tool", func() { BeforeEach(func() { ctrl = gomock.NewController(GinkgoT()) mockSysdig = mocks.NewMockExtendedClientWithResponsesInterface(ctrl) - tool = tools.NewK8sListUnderutilizedPodsMemoryQuota(mockSysdig) + mockClock = mocks_clock.NewMockClock(ctrl) + mockClock.EXPECT().Now().AnyTimes().Return(time.Date(2026, time.April, 16, 12, 0, 0, 0, time.UTC)) + tool = tools.NewK8sListUnderutilizedPodsMemoryQuota(mockSysdig, mockClock) mcpServer = server.NewMCPServer("test", "test") tool.RegisterInServer(mcpServer) }) @@ -81,6 +87,22 @@ var _ = Describe("KubernetesListUnderutilizedPodsMemoryQuota Tool", func() { Limit: new(sysdig.LimitQuery(20)), }, ), + Entry("windowed, both start and end", + "k8s_list_underutilized_pods_memory_quota", + mcp.CallToolRequest{ + Params: mcp.CallToolParams{ + Name: "k8s_list_underutilized_pods_memory_quota", + Arguments: map[string]any{ + "start": "2026-04-16T10:00:00Z", + "end": "2026-04-16T11:00:00Z", + }, + }, + }, + mergeLimit(newWindowedQueryParams( + `sum by (kube_cluster_name, kube_namespace_name, kube_pod_name)(avg_over_time(sysdig_container_memory_used_bytes[3600s])) / (sum by (kube_cluster_name, kube_namespace_name, kube_pod_name)(avg_over_time(sysdig_container_memory_limit_bytes[3600s])) > 0) < 0.25`, + 
time.Date(2026, time.April, 16, 11, 0, 0, 0, time.UTC), + ), 10), + ), ) }) }) diff --git a/internal/infra/mcp/tools/tool_k8s_list_workloads.go b/internal/infra/mcp/tools/tool_k8s_list_workloads.go index 0d3b9a5..53ba894 100644 --- a/internal/infra/mcp/tools/tool_k8s_list_workloads.go +++ b/internal/infra/mcp/tools/tool_k8s_list_workloads.go @@ -9,22 +9,25 @@ import ( "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" + "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" ) type K8sListWorkloads struct { SysdigClient sysdig.ExtendedClientWithResponsesInterface + clock clock.Clock } -func NewK8sListWorkloads(sysdigClient sysdig.ExtendedClientWithResponsesInterface) *K8sListWorkloads { +func NewK8sListWorkloads(sysdigClient sysdig.ExtendedClientWithResponsesInterface, clk clock.Clock) *K8sListWorkloads { return &K8sListWorkloads{ SysdigClient: sysdigClient, + clock: clk, } } func (t *K8sListWorkloads) RegisterInServer(s *server.MCPServer) { tool := mcp.NewTool("k8s_list_workloads", - mcp.WithDescription("Lists all the workloads that are in a particular state, desired, ready, running or unavailable. The LLM can filter by cluster, namespace, workload name or type."), + mcp.WithDescription("Lists all the workloads that are in a particular state, desired, ready, running or unavailable. The LLM can filter by cluster, namespace, workload name or type. 
Optionally pass start/end (RFC3339) to query over a historical window (peak value per workload; for the 'unavailable' status only workloads unavailable at any point in the window are returned)."), mcp.WithString("status", mcp.Description("The status of the workload."), mcp.Enum("desired", "ready", "running", "unavailable"), @@ -41,6 +44,7 @@ func (t *K8sListWorkloads) RegisterInServer(s *server.MCPServer) { mcp.Description("Maximum number of workloads to return."), mcp.DefaultNumber(10), ), + WithTimeWindowParams(), mcp.WithOutputSchema[map[string]any](), mcp.WithReadOnlyHintAnnotation(true), mcp.WithDestructiveHintAnnotation(false), @@ -57,13 +61,21 @@ func (t *K8sListWorkloads) handle(ctx context.Context, request mcp.CallToolReque workloadType := mcp.ParseString(request, "workload_type", "") limit := mcp.ParseInt(request, "limit", 10) - query := buildKubeWorkloadInfoQuery(status, clusterName, namespaceName, workloadName, workloadType) + tw, err := ParseTimeWindow(request, t.clock) + if err != nil { + return mcp.NewToolResultErrorFromErr("invalid time window", err), nil + } + + query := buildKubeWorkloadInfoQuery(status, clusterName, namespaceName, workloadName, workloadType, tw) limitQuery := sysdig.LimitQuery(limit) params := &sysdig.GetQueryV1Params{ Query: query, Limit: &limitQuery, } + if err := tw.ApplyToParams(params); err != nil { + return mcp.NewToolResultErrorFromErr("failed to build eval time", err), nil + } httpResp, err := t.SysdigClient.GetQueryV1(ctx, params) if err != nil { @@ -83,7 +95,7 @@ func (t *K8sListWorkloads) handle(ctx context.Context, request mcp.CallToolReque return mcp.NewToolResultJSON(queryResponse) } -func buildKubeWorkloadInfoQuery(status, clusterName, namespaceName, workloadName, workloadType string) string { +func buildKubeWorkloadInfoQuery(status, clusterName, namespaceName, workloadName, workloadType string, tw TimeWindow) string { filters := []string{} if clusterName != "" { filters = append(filters, 
fmt.Sprintf("kube_cluster_name=\"%s\"", clusterName)) @@ -98,11 +110,19 @@ func buildKubeWorkloadInfoQuery(status, clusterName, namespaceName, workloadName filters = append(filters, fmt.Sprintf("kube_workload_type=\"%s\"", workloadType)) } - metric := fmt.Sprintf("kube_workload_status_%s", status) + base := fmt.Sprintf("kube_workload_status_%s", status) - if len(filters) == 0 { - return metric + metric := base + if len(filters) > 0 { + metric = fmt.Sprintf("%s{%s}", base, strings.Join(filters, ",")) } - return fmt.Sprintf("%s{%s}", metric, strings.Join(filters, ",")) + if !tw.IsZero() { + wrapped := fmt.Sprintf("max_over_time(%s%s)", metric, tw.RangeSelector()) + if status == "unavailable" { + return wrapped + " > 0" + } + return wrapped + } + return metric } diff --git a/internal/infra/mcp/tools/tool_k8s_list_workloads_test.go b/internal/infra/mcp/tools/tool_k8s_list_workloads_test.go index 7e49679..c1dd0ee 100644 --- a/internal/infra/mcp/tools/tool_k8s_list_workloads_test.go +++ b/internal/infra/mcp/tools/tool_k8s_list_workloads_test.go @@ -5,21 +5,25 @@ import ( "context" "io" "net/http" + "time" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + "go.uber.org/mock/gomock" + + mocks_clock "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock/mocks" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/mcp/tools" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig/mocks" - "go.uber.org/mock/gomock" ) var _ = Describe("KubernetesListWorkloads Tool", func() { var ( tool *tools.K8sListWorkloads mockSysdig *mocks.MockExtendedClientWithResponsesInterface + mockClock *mocks_clock.MockClock mcpServer *server.MCPServer ctrl *gomock.Controller ) @@ -27,7 +31,9 @@ var _ = Describe("KubernetesListWorkloads Tool", func() { BeforeEach(func() { ctrl = gomock.NewController(GinkgoT()) mockSysdig = mocks.NewMockExtendedClientWithResponsesInterface(ctrl) - tool = tools.NewK8sListWorkloads(mockSysdig) + mockClock = mocks_clock.NewMockClock(ctrl) + mockClock.EXPECT().Now().AnyTimes().Return(time.Date(2026, time.April, 16, 12, 0, 0, 0, time.UTC)) + tool = tools.NewK8sListWorkloads(mockSysdig, mockClock) mcpServer = server.NewMCPServer("test", "test") tool.RegisterInServer(mcpServer) }) @@ -148,6 +154,58 @@ var _ = Describe("KubernetesListWorkloads Tool", func() { Limit: new(sysdig.LimitQuery(10)), }, ), + Entry("windowed, ready status (no > 0 guard)", + "k8s_list_workloads", + mcp.CallToolRequest{ + Params: mcp.CallToolParams{ + Name: "k8s_list_workloads", + Arguments: map[string]any{ + "status": "ready", + "start": "2026-04-16T10:00:00Z", + "end": "2026-04-16T11:00:00Z", + }, + }, + }, + mergeLimit(newWindowedQueryParams( + `max_over_time(kube_workload_status_ready[3600s])`, + time.Date(2026, time.April, 16, 11, 0, 0, 0, time.UTC), + ), 10), + ), + Entry("windowed, desired status (no > 0 guard)", + "k8s_list_workloads", + mcp.CallToolRequest{ + Params: mcp.CallToolParams{ + Name: "k8s_list_workloads", + Arguments: map[string]any{ + "status": "desired", + "cluster_name": "prod", + "start": "2026-04-16T10:00:00Z", + 
"end": "2026-04-16T11:00:00Z", + }, + }, + }, + mergeLimit(newWindowedQueryParams( + `max_over_time(kube_workload_status_desired{kube_cluster_name="prod"}[3600s])`, + time.Date(2026, time.April, 16, 11, 0, 0, 0, time.UTC), + ), 10), + ), + Entry("windowed, unavailable status (> 0 guard)", + "k8s_list_workloads", + mcp.CallToolRequest{ + Params: mcp.CallToolParams{ + Name: "k8s_list_workloads", + Arguments: map[string]any{ + "status": "unavailable", + "start": "2026-04-16T10:00:00Z", + "end": "2026-04-16T11:00:00Z", + }, + }, + }, + mergeLimit(newWindowedQueryParams( + `max_over_time(kube_workload_status_unavailable[3600s]) > 0`, + time.Date(2026, time.April, 16, 11, 0, 0, 0, time.UTC), + ), 10), + ), ) }) }) diff --git a/internal/infra/mcp/tools/tools_suite_test.go b/internal/infra/mcp/tools/tools_suite_test.go index 4c7fa3b..398cd28 100644 --- a/internal/infra/mcp/tools/tools_suite_test.go +++ b/internal/infra/mcp/tools/tools_suite_test.go @@ -2,12 +2,32 @@ package tools_test import ( "testing" + "time" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + + "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" ) func TestTools(t *testing.T) { RegisterFailHandler(Fail) RunSpecs(t, "Tools Suite") } + +func newWindowedQueryParams(query string, end time.Time) sysdig.GetQueryV1Params { + var qt sysdig.Time + Expect(qt.FromQueryTime1(end.Unix())).To(Succeed()) + timeout := sysdig.Timeout("60s") + return sysdig.GetQueryV1Params{ + Query: query, + Time: &qt, + Timeout: &timeout, + } +} + +func mergeLimit(p sysdig.GetQueryV1Params, limit int) sysdig.GetQueryV1Params { + lq := sysdig.LimitQuery(limit) + p.Limit = &lq + return p +} diff --git a/internal/infra/mcp/tools/utils.go b/internal/infra/mcp/tools/utils.go index 7a09045..56612e2 100644 --- a/internal/infra/mcp/tools/utils.go +++ b/internal/infra/mcp/tools/utils.go @@ -1,14 +1,18 @@ package tools import ( + "fmt" "log/slog" + "time" "github.com/mark3labs/mcp-go/mcp" + "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock" + "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/sysdig" ) func Examples[T any](examples ...T) mcp.PropertyOption { return func(schema map[string]any) { - schema["exampes"] = examples + schema["examples"] = examples } } @@ -52,3 +56,100 @@ func RequiredPermissionsFromTool(tool mcp.Tool) []string { return requiredPermissions } + +const ( + windowedQueryTimeout = "60s" + timeParamStart = "start" + timeParamEnd = "end" + startParamDescription = "Start of the query window as an RFC3339 timestamp (e.g. 2026-04-01T00:00:00Z). When omitted, the tool returns an instant snapshot (current behavior). When provided without end, end defaults to now." + endParamDescription = "End of the query window as an RFC3339 timestamp (e.g. 2026-04-01T01:00:00Z). Requires start. If in the future, clamped to now." 
+) + +type TimeWindow struct { + Start time.Time + End time.Time +} + +func (w TimeWindow) IsZero() bool { + return w.Start.IsZero() && w.End.IsZero() +} + +func (w TimeWindow) RangeSelector() string { + return fmt.Sprintf("[%ds]", int64(w.End.Sub(w.Start).Seconds())) +} + +func (w TimeWindow) WindowSeconds() int64 { + return int64(w.End.Sub(w.Start).Seconds()) +} + +func (w TimeWindow) EvalTime() (*sysdig.Time, error) { + if w.IsZero() { + return nil, nil + } + var qt sysdig.Time + if err := qt.FromQueryTime1(w.End.Unix()); err != nil { + return nil, fmt.Errorf("building eval time: %w", err) + } + return &qt, nil +} + +func (w TimeWindow) ApplyToParams(params *sysdig.GetQueryV1Params) error { + evalTime, err := w.EvalTime() + if err != nil { + return err + } + params.Time = evalTime + if !w.IsZero() { + timeout := sysdig.Timeout(windowedQueryTimeout) + params.Timeout = &timeout + } + return nil +} + +func WithTimeWindowParams() mcp.ToolOption { + return func(t *mcp.Tool) { + mcp.WithString(timeParamStart, mcp.Description(startParamDescription))(t) + mcp.WithString(timeParamEnd, mcp.Description(endParamDescription))(t) + } +} + + +// ParseTimeWindow reads "start" and "end" from the request, validates them, and returns the resolved TimeWindow. +func ParseTimeWindow(request mcp.CallToolRequest, clk clock.Clock) (TimeWindow, error) { + startStr := mcp.ParseString(request, timeParamStart, "") + endStr := mcp.ParseString(request, timeParamEnd, "") + + if startStr == "" && endStr == "" { + return TimeWindow{}, nil + } + + if startStr == "" && endStr != "" { + return TimeWindow{}, fmt.Errorf("end requires start") + } + + start, err := time.Parse(time.RFC3339, startStr) + if err != nil { + return TimeWindow{}, fmt.Errorf("invalid start timestamp %q: must be RFC3339 (e.g. 
2026-04-01T00:00:00Z)", startStr) + } + + var end time.Time + if endStr == "" { + end = clk.Now().Truncate(time.Second) + } else { + end, err = time.Parse(time.RFC3339, endStr) + if err != nil { + return TimeWindow{}, fmt.Errorf("invalid end timestamp %q: must be RFC3339 (e.g. 2026-04-01T01:00:00Z)", endStr) + } + now := clk.Now().Truncate(time.Second) + if end.After(now) { + end = now + } + } + + if !end.After(start) { + return TimeWindow{}, fmt.Errorf("end (%s) must be after start (%s)", end.Format(time.RFC3339), start.Format(time.RFC3339)) + } + + return TimeWindow{Start: start, End: end}, nil +} + diff --git a/internal/infra/mcp/tools/utils_test.go b/internal/infra/mcp/tools/utils_test.go new file mode 100644 index 0000000..5263c53 --- /dev/null +++ b/internal/infra/mcp/tools/utils_test.go @@ -0,0 +1,111 @@ +package tools_test + +import ( + "time" + + "github.com/mark3labs/mcp-go/mcp" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "go.uber.org/mock/gomock" + + mocks_clock "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/clock/mocks" + "github.com/sysdiglabs/sysdig-mcp-server/internal/infra/mcp/tools" +) + +var _ = Describe("ParseTimeWindow", func() { + var ( + ctrl *gomock.Controller + mockClock *mocks_clock.MockClock + now time.Time + ) + + BeforeEach(func() { + ctrl = gomock.NewController(GinkgoT()) + mockClock = mocks_clock.NewMockClock(ctrl) + now = time.Date(2026, time.April, 16, 12, 0, 0, 500000000, time.UTC) + }) + + AfterEach(func() { ctrl.Finish() }) + + makeRequest := func(args map[string]any) mcp.CallToolRequest { + return mcp.CallToolRequest{ + Params: mcp.CallToolParams{Arguments: args}, + } + } + + It("returns zero TimeWindow when neither start nor end is provided", func() { + tw, err := tools.ParseTimeWindow(makeRequest(map[string]any{}), mockClock) + Expect(err).NotTo(HaveOccurred()) + Expect(tw.IsZero()).To(BeTrue()) + }) + + It("returns error when end is provided without start", func() { + _, err := 
tools.ParseTimeWindow(makeRequest(map[string]any{ + "end": "2026-04-16T11:00:00Z", + }), mockClock) + Expect(err).To(MatchError(ContainSubstring("end requires start"))) + }) + + It("returns error for invalid RFC3339 start", func() { + _, err := tools.ParseTimeWindow(makeRequest(map[string]any{ + "start": "not-a-timestamp", + }), mockClock) + Expect(err).To(MatchError(ContainSubstring("invalid start timestamp"))) + }) + + It("returns error for invalid RFC3339 end", func() { + _, err := tools.ParseTimeWindow(makeRequest(map[string]any{ + "start": "2026-04-16T10:00:00Z", + "end": "not-a-timestamp", + }), mockClock) + Expect(err).To(MatchError(ContainSubstring("invalid end timestamp"))) + }) + + It("clamps end to now when end is in the future", func() { + mockClock.EXPECT().Now().Return(now) + tw, err := tools.ParseTimeWindow(makeRequest(map[string]any{ + "start": "2026-04-16T10:00:00Z", + "end": "2099-01-01T00:00:00Z", + }), mockClock) + Expect(err).NotTo(HaveOccurred()) + Expect(tw.End).To(Equal(now.Truncate(time.Second))) + }) + + It("defaults end to now when only start is provided", func() { + mockClock.EXPECT().Now().Return(now) + tw, err := tools.ParseTimeWindow(makeRequest(map[string]any{ + "start": "2026-04-16T10:00:00Z", + }), mockClock) + Expect(err).NotTo(HaveOccurred()) + Expect(tw.End).To(Equal(now.Truncate(time.Second))) + Expect(tw.Start).To(Equal(time.Date(2026, time.April, 16, 10, 0, 0, 0, time.UTC))) + }) + + It("returns error when end is not after start", func() { + mockClock.EXPECT().Now().Return(now) + _, err := tools.ParseTimeWindow(makeRequest(map[string]any{ + "start": "2026-04-16T11:00:00Z", + "end": "2026-04-16T10:00:00Z", + }), mockClock) + Expect(err).To(MatchError(ContainSubstring("must be after start"))) + }) + + It("returns correct TimeWindow when both start and end are valid past timestamps", func() { + mockClock.EXPECT().Now().Return(now) + tw, err := tools.ParseTimeWindow(makeRequest(map[string]any{ + "start": "2026-04-16T10:00:00Z", + 
"end": "2026-04-16T11:00:00Z", + }), mockClock) + Expect(err).NotTo(HaveOccurred()) + Expect(tw.Start).To(Equal(time.Date(2026, time.April, 16, 10, 0, 0, 0, time.UTC))) + Expect(tw.End).To(Equal(time.Date(2026, time.April, 16, 11, 0, 0, 0, time.UTC))) + }) + + It("truncates sub-second precision from now so RangeSelector never emits [0s]", func() { + mockClock.EXPECT().Now().Return(now) + _, err := tools.ParseTimeWindow(makeRequest(map[string]any{ + "start": "2026-04-16T12:00:00Z", + }), mockClock) + Expect(err).To(HaveOccurred()) + }) +})