From d4661c9af4ecdb93e2a069c782373367e625bfa4 Mon Sep 17 00:00:00 2001 From: sjmiller609 <7516283+sjmiller609@users.noreply.github.com> Date: Sat, 6 Jun 2026 18:35:01 +0000 Subject: [PATCH 1/4] Add UFFD snapshot pager graduation Detach running UFFD-backed VMs from their snapshot memory pager after a soak period instead of leaving them pinned for the life of the restore. A new pager /sessions/{id}/complete endpoint populates the remaining pages from the backing file and unregisters userfaultfd, so the VM keeps running on resident memory with no pager dependency and no pause or network interruption. This bounds the number of active pager sessions and lets old pager versions drain to zero and exit. A background controller (lib/uffdgraduate) drives graduations subject to min_session_age, max_concurrent, and an optional max_active_sessions ceiling, prioritising sessions on outdated pager versions. Disabled by default and only active on the uffd backend. The detach is gated behind a new hypervisor capability so the controller stays hypervisor-agnostic. 
Co-Authored-By: Claude Opus 4.7 --- cmd/api/config/config.go | 62 +++++- cmd/api/main.go | 40 ++++ lib/hypervisor/firecracker/firecracker.go | 21 +- lib/hypervisor/hypervisor.go | 5 + lib/instances/firecracker_uffd_graduate.go | 90 ++++++++ lib/providers/uffd_graduation.go | 77 +++++++ lib/uffdgraduate/README.md | 44 ++++ lib/uffdgraduate/config.go | 83 +++++++ lib/uffdgraduate/config_test.go | 52 +++++ lib/uffdgraduate/controller.go | 223 +++++++++++++++++++ lib/uffdgraduate/controller_test.go | 170 +++++++++++++++ lib/uffdgraduate/metrics.go | 86 ++++++++ lib/uffdgraduate/types.go | 26 +++ lib/uffdpager/README.md | 9 + lib/uffdpager/complete_linux.go | 240 +++++++++++++++++++++ lib/uffdpager/complete_linux_test.go | 50 +++++ lib/uffdpager/server_faults_linux.go | 13 +- lib/uffdpager/server_linux.go | 9 + lib/uffdpager/server_sessions_linux.go | 19 ++ lib/uffdpager/supervisor_linux.go | 16 +- lib/uffdpager/supervisor_unsupported.go | 4 + lib/uffdpager/types.go | 6 + 22 files changed, 1322 insertions(+), 23 deletions(-) create mode 100644 lib/instances/firecracker_uffd_graduate.go create mode 100644 lib/providers/uffd_graduation.go create mode 100644 lib/uffdgraduate/README.md create mode 100644 lib/uffdgraduate/config.go create mode 100644 lib/uffdgraduate/config_test.go create mode 100644 lib/uffdgraduate/controller.go create mode 100644 lib/uffdgraduate/controller_test.go create mode 100644 lib/uffdgraduate/metrics.go create mode 100644 lib/uffdgraduate/types.go create mode 100644 lib/uffdpager/complete_linux.go create mode 100644 lib/uffdpager/complete_linux_test.go diff --git a/cmd/api/config/config.go b/cmd/api/config/config.go index 8690ad52..260b06d6 100644 --- a/cmd/api/config/config.go +++ b/cmd/api/config/config.go @@ -193,13 +193,27 @@ type CapacityConfig struct { // HypervisorConfig holds hypervisor settings. 
type HypervisorConfig struct { - Default string `koanf:"default"` - CloudHypervisorDefaultVersion string `koanf:"cloud_hypervisor_default_version"` - FirecrackerBinaryPath string `koanf:"firecracker_binary_path"` - FirecrackerSnapshotMemoryBackend string `koanf:"firecracker_snapshot_memory_backend"` - FirecrackerUFFDCacheMaxBytes string `koanf:"firecracker_uffd_cache_max_bytes"` - FirecrackerMaxConcurrentRestores int `koanf:"firecracker_max_concurrent_restores"` - Memory HypervisorMemoryConfig `koanf:"memory"` + Default string `koanf:"default"` + CloudHypervisorDefaultVersion string `koanf:"cloud_hypervisor_default_version"` + FirecrackerBinaryPath string `koanf:"firecracker_binary_path"` + FirecrackerSnapshotMemoryBackend string `koanf:"firecracker_snapshot_memory_backend"` + FirecrackerUFFDCacheMaxBytes string `koanf:"firecracker_uffd_cache_max_bytes"` + FirecrackerMaxConcurrentRestores int `koanf:"firecracker_max_concurrent_restores"` + FirecrackerUFFDGraduation FirecrackerUFFDGraduationConfig `koanf:"firecracker_uffd_graduation"` + Memory HypervisorMemoryConfig `koanf:"memory"` +} + +// FirecrackerUFFDGraduationConfig controls the background controller that +// detaches running UFFD-backed VMs from their snapshot memory pager once they +// have soaked, bounding active pager sessions and letting old pager versions +// retire. Disabled by default and only active on the uffd backend. +type FirecrackerUFFDGraduationConfig struct { + Enabled bool `koanf:"enabled"` + MinSessionAge string `koanf:"min_session_age"` + MaxConcurrent int `koanf:"max_concurrent"` + MaxActiveSessions int `koanf:"max_active_sessions"` + ScanInterval string `koanf:"scan_interval"` + CompletionTimeout string `koanf:"completion_timeout"` } // HypervisorMemoryConfig holds guest memory management settings. 
@@ -413,6 +427,14 @@ func defaultConfig() *Config { FirecrackerSnapshotMemoryBackend: "file", FirecrackerUFFDCacheMaxBytes: "4294967296", FirecrackerMaxConcurrentRestores: 32, + FirecrackerUFFDGraduation: FirecrackerUFFDGraduationConfig{ + Enabled: false, + MinSessionAge: "10m", + MaxConcurrent: 1, + MaxActiveSessions: 0, + ScanInterval: "1m", + CompletionTimeout: "10m", + }, Memory: HypervisorMemoryConfig{ Enabled: false, KernelPageInitMode: "hardened", @@ -640,6 +662,9 @@ func (c *Config) Validate() error { if err := validateByteSize("hypervisor.firecracker_uffd_cache_max_bytes", c.Hypervisor.FirecrackerUFFDCacheMaxBytes); err != nil { return err } + if err := c.validateFirecrackerUFFDGraduation(); err != nil { + return err + } if err := validateDuration("hypervisor.memory.active_ballooning.poll_interval", c.Hypervisor.Memory.ActiveBallooning.PollInterval); err != nil { return err } @@ -692,6 +717,29 @@ func validateDuration(field string, value string) error { return nil } +func (c *Config) validateFirecrackerUFFDGraduation() error { + g := c.Hypervisor.FirecrackerUFFDGraduation + if !g.Enabled { + return nil + } + for field, value := range map[string]string{ + "hypervisor.firecracker_uffd_graduation.min_session_age": g.MinSessionAge, + "hypervisor.firecracker_uffd_graduation.scan_interval": g.ScanInterval, + "hypervisor.firecracker_uffd_graduation.completion_timeout": g.CompletionTimeout, + } { + if err := validateDuration(field, value); err != nil { + return err + } + } + if g.MaxConcurrent < 0 { + return fmt.Errorf("hypervisor.firecracker_uffd_graduation.max_concurrent must not be negative") + } + if g.MaxActiveSessions < 0 { + return fmt.Errorf("hypervisor.firecracker_uffd_graduation.max_active_sessions must not be negative") + } + return nil +} + func intPtr(v int) *int { return &v } diff --git a/cmd/api/main.go b/cmd/api/main.go index da2a2326..3a92291b 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -35,8 +35,10 @@ import ( 
"github.com/kernel/hypeman/lib/ocicachegc" "github.com/kernel/hypeman/lib/otel" "github.com/kernel/hypeman/lib/paths" + "github.com/kernel/hypeman/lib/providers" "github.com/kernel/hypeman/lib/registry" "github.com/kernel/hypeman/lib/scopes" + "github.com/kernel/hypeman/lib/uffdgraduate" "github.com/kernel/hypeman/lib/vmm" nethttpmiddleware "github.com/oapi-codegen/nethttp-middleware" "github.com/riandyrn/otelchi" @@ -131,6 +133,33 @@ func startOCICacheGC(grp *errgroup.Group, ctx context.Context, runner ociCacheGC return true } +func configureUFFDGraduationController(cfg *config.Config, instanceManager instances.Manager, logger *slog.Logger) (*uffdgraduate.Controller, error) { + g := cfg.Hypervisor.FirecrackerUFFDGraduation + if !g.Enabled { + return nil, nil + } + minSessionAge, err := time.ParseDuration(g.MinSessionAge) + if err != nil { + return nil, fmt.Errorf("invalid hypervisor.firecracker_uffd_graduation.min_session_age %q: %w", g.MinSessionAge, err) + } + scanInterval, err := time.ParseDuration(g.ScanInterval) + if err != nil { + return nil, fmt.Errorf("invalid hypervisor.firecracker_uffd_graduation.scan_interval %q: %w", g.ScanInterval, err) + } + completionTimeout, err := time.ParseDuration(g.CompletionTimeout) + if err != nil { + return nil, fmt.Errorf("invalid hypervisor.firecracker_uffd_graduation.completion_timeout %q: %w", g.CompletionTimeout, err) + } + return providers.ProvideUFFDGraduationController(instanceManager, uffdgraduate.Config{ + Enabled: true, + MinSessionAge: minSessionAge, + MaxConcurrent: g.MaxConcurrent, + MaxActiveSessions: g.MaxActiveSessions, + ScanInterval: scanInterval, + CompletionTimeout: completionTimeout, + }, logger), nil +} + func run() error { // Load config early for OTel initialization // Config path can be specified via CONFIG_PATH env var or defaults to platform-specific locations @@ -565,6 +594,17 @@ func run() error { return app.AutoStandbyController.Run(gctx) }) } + + uffdGraduationController, err := 
configureUFFDGraduationController(app.Config, app.InstanceManager, logger) + if err != nil { + return err + } + if uffdGraduationController != nil { + grp.Go(func() error { + logger.Info("starting uffd graduation controller") + return uffdGraduationController.Run(gctx) + }) + } if app.HealthCheckController != nil { grp.Go(func() error { logger.Info("starting health check controller") diff --git a/lib/hypervisor/firecracker/firecracker.go b/lib/hypervisor/firecracker/firecracker.go index c2bb7db3..3040e2a7 100644 --- a/lib/hypervisor/firecracker/firecracker.go +++ b/lib/hypervisor/firecracker/firecracker.go @@ -57,16 +57,17 @@ func (f *Firecracker) Capabilities() hypervisor.Capabilities { func capabilities() hypervisor.Capabilities { return hypervisor.Capabilities{ - SupportsSnapshot: true, - SupportsHotplugMemory: false, - SupportsBalloonControl: true, - SupportsPause: true, - SupportsVsock: true, - SupportsGPUPassthrough: false, - SupportsDiskIOLimit: true, - SupportsGracefulVMMShutdown: false, - SupportsSnapshotBaseReuse: true, - SupportsConcurrentForkPrepare: true, + SupportsSnapshot: true, + SupportsHotplugMemory: false, + SupportsBalloonControl: true, + SupportsPause: true, + SupportsVsock: true, + SupportsGPUPassthrough: false, + SupportsDiskIOLimit: true, + SupportsGracefulVMMShutdown: false, + SupportsSnapshotBaseReuse: true, + SupportsConcurrentForkPrepare: true, + UsesDetachableSnapshotMemoryPager: true, } } diff --git a/lib/hypervisor/hypervisor.go b/lib/hypervisor/hypervisor.go index ce96fa85..03ef7cfd 100644 --- a/lib/hypervisor/hypervisor.go +++ b/lib/hypervisor/hypervisor.go @@ -263,6 +263,11 @@ type Capabilities struct { // SupportsDiskResize indicates if live disk resizing (/vm.resize-disk) is available. // Cloud Hypervisor v50.0+ only. 
SupportsDiskResize bool + + // UsesDetachableSnapshotMemoryPager indicates restores can be backed by an + // external snapshot-memory pager that a running VM can later be detached + // from (populate remaining pages, then release the session). + UsesDetachableSnapshotMemoryPager bool } // VsockDialer provides vsock connectivity to a guest VM. diff --git a/lib/instances/firecracker_uffd_graduate.go b/lib/instances/firecracker_uffd_graduate.go new file mode 100644 index 00000000..3a3cd59b --- /dev/null +++ b/lib/instances/firecracker_uffd_graduate.go @@ -0,0 +1,90 @@ +package instances + +import ( + "context" + "errors" + "fmt" + "strings" + + "github.com/kernel/hypeman/lib/hypervisor" + "github.com/kernel/hypeman/lib/logger" + "github.com/kernel/hypeman/lib/uffdpager" +) + +// UFFDGraduationTargetVersion returns the pager version that new restores bind +// to, or "" when no UFFD pager is configured. The graduation controller treats +// sessions on a different version as the highest priority to retire. +func (m *manager) UFFDGraduationTargetVersion() string { + if m == nil || m.firecrackerUFFDPager == nil { + return "" + } + return m.firecrackerUFFDPager.VersionKey() +} + +// GraduateSnapshotMemoryPager detaches a running UFFD-backed VM from its pager. +// The pager populates the remaining snapshot pages and unregisters the session, +// leaving the VM running on resident memory with no pager dependency. The VM is +// never paused and its network is untouched, so this is safe to run on a live +// VM that is serving traffic. 
+func (m *manager) GraduateSnapshotMemoryPager(ctx context.Context, id string) error { + lock := m.getInstanceLock(id) + lock.Lock() + defer lock.Unlock() + + log := logger.FromContext(ctx) + + meta, err := m.loadMetadata(id) + if err != nil { + return err + } + stored := &meta.StoredMetadata + + if !m.snapshotMemoryPagerDetachable(stored.HypervisorType) { + return fmt.Errorf("hypervisor %s does not use a detachable snapshot memory pager", stored.HypervisorType) + } + sessionID := strings.TrimSpace(stored.FirecrackerUFFDSessionID) + if sessionID == "" { + // Nothing bound to a pager; the VM is already independent. + return nil + } + if m.firecrackerUFFDPager == nil { + return fmt.Errorf("firecracker uffd pager is not configured") + } + + inst := m.toInstanceWithoutHydration(ctx, meta) + if inst.State != StateRunning { + return fmt.Errorf("%w: cannot graduate snapshot memory pager from state %s", ErrInvalidState, inst.State) + } + + version := strings.TrimSpace(stored.FirecrackerUFFDPagerVersion) + if version == "" { + version = m.firecrackerUFFDPager.VersionKey() + } + + log.InfoContext(ctx, "graduating instance off uffd snapshot memory pager", + "instance_id", id, "session_id", sessionID, "pager_version", version) + + err = m.firecrackerUFFDPager.CompleteSessionVersion(ctx, version, sessionID) + if err != nil && !errors.Is(err, uffdpager.ErrSessionNotFound) { + return fmt.Errorf("complete uffd session: %w", err) + } + + // The session is gone whether we completed it or the pager had already lost + // it, so clear the binding. Standby and health paths key off the session ID + // and must not chase a session that no longer exists. 
+ stored.FirecrackerUFFDSessionID = "" + stored.FirecrackerUFFDPagerVersion = "" + meta = &metadata{StoredMetadata: *stored} + if saveErr := m.saveMetadata(meta); saveErr != nil { + return fmt.Errorf("save metadata after graduation: %w", saveErr) + } + return nil +} + +func (m *manager) snapshotMemoryPagerDetachable(hvType hypervisor.Type) bool { + caps, ok := hypervisor.CapabilitiesForType(hvType) + if !ok { + return false + } + return caps.UsesDetachableSnapshotMemoryPager +} diff --git a/lib/providers/uffd_graduation.go b/lib/providers/uffd_graduation.go new file mode 100644 index 00000000..1f743c0d --- /dev/null +++ b/lib/providers/uffd_graduation.go @@ -0,0 +1,77 @@ +package providers + +import ( + "context" + "log/slog" + "strings" + + "github.com/kernel/hypeman/lib/instances" + "github.com/kernel/hypeman/lib/uffdgraduate" + "go.opentelemetry.io/otel" +) + +// uffdGraduationManager is the subset of the instance manager the graduation +// controller needs. ListInstances is on the public interface; the other two are +// concrete manager methods, matched here by type assertion. 
+type uffdGraduationManager interface { + ListInstances(ctx context.Context, filter *instances.ListInstancesFilter) ([]instances.Instance, error) + GraduateSnapshotMemoryPager(ctx context.Context, id string) error + UFFDGraduationTargetVersion() string +} + +type uffdGraduationStore struct { + manager uffdGraduationManager +} + +func (s uffdGraduationStore) ListPagerInstances(ctx context.Context) ([]uffdgraduate.Instance, error) { + insts, err := s.manager.ListInstances(ctx, nil) + if err != nil { + return nil, err + } + out := make([]uffdgraduate.Instance, 0, len(insts)) + for _, inst := range insts { + if inst.State != instances.StateRunning { + continue + } + if strings.TrimSpace(inst.FirecrackerUFFDSessionID) == "" { + continue + } + out = append(out, uffdgraduate.Instance{ + ID: inst.Id, + Name: inst.Name, + PagerVersion: inst.FirecrackerUFFDPagerVersion, + }) + } + return out, nil +} + +func (s uffdGraduationStore) GraduateInstance(ctx context.Context, id string) error { + return s.manager.GraduateSnapshotMemoryPager(ctx, id) +} + +func (s uffdGraduationStore) TargetVersion() string { + return s.manager.UFFDGraduationTargetVersion() +} + +// ProvideUFFDGraduationController builds the controller, or returns nil when the +// feature is disabled or no UFFD pager is configured (file backend / non-linux). 
+func ProvideUFFDGraduationController(instanceManager instances.Manager, cfg uffdgraduate.Config, log *slog.Logger) *uffdgraduate.Controller { + if instanceManager == nil || log == nil || !cfg.Enabled { + return nil + } + mgr, ok := instanceManager.(uffdGraduationManager) + if !ok { + return nil + } + if strings.TrimSpace(mgr.UFFDGraduationTargetVersion()) == "" { + return nil + } + return uffdgraduate.NewController( + uffdGraduationStore{manager: mgr}, + cfg, + uffdgraduate.ControllerOptions{ + Log: log.With("controller", "uffd_graduation"), + Meter: otel.GetMeterProvider().Meter("hypeman/uffdgraduate"), + }, + ) +} diff --git a/lib/uffdgraduate/README.md b/lib/uffdgraduate/README.md new file mode 100644 index 00000000..a9ec896b --- /dev/null +++ b/lib/uffdgraduate/README.md @@ -0,0 +1,44 @@ +# UFFD Graduation + +This controller detaches running VMs from the UFFD snapshot memory pager after +they have been up for a while, so the number of VMs depending on a pager stays +bounded and old pager versions can drain to zero and exit. + +## Why detach instead of migrate or fall back to file + +A UFFD-backed VM is pinned to its pager session for the life of the restore. The +memory backend (anonymous + userfaultfd vs. a private file mapping) is fixed when +the VM is restored, so there is no way to move a running VM onto the file backend +without restarting the VMM — which would drop its network connections. + +What can be done without touching the VM is to let the pager finish its job: it +populates every page that has not yet been faulted in from the backing file, then +unregisters userfaultfd and closes the session. The guest never pauses and its +network is untouched. The VM ends up running on resident memory with no pager +dependency. + +The cost is that the populated pages become resident anonymous memory (reclaimable +only to swap, unlike clean file-backed pages), and completion reads the whole +remaining image once. 
That is why graduation is paced and only applied to VMs that +have already had a soak period. + +## What it does + +On each scan the controller lists running VMs that still depend on a detachable +pager, then graduates eligible ones subject to the configured limits: + +- a session must be at least `min_session_age` old (tracked in memory; a control + plane restart restarts the soak, which is only more conservative) +- at most `max_concurrent` graduations run at once +- when `max_active_sessions` is zero, every soaked session is graduated + (time-based weaning); when positive, only enough oldest sessions are graduated + to bring the live count back to the ceiling +- sessions bound to an outdated pager version are always graduated after the soak, + so old pager versions retire quickly + +## Limits + +- The feature is disabled by default and only does anything when the host runs the + `uffd` snapshot memory backend. +- A failed graduation leaves the VM untouched (still on its pager); it is retried + on a later scan. diff --git a/lib/uffdgraduate/config.go b/lib/uffdgraduate/config.go new file mode 100644 index 00000000..b7d3f35c --- /dev/null +++ b/lib/uffdgraduate/config.go @@ -0,0 +1,83 @@ +package uffdgraduate + +import ( + "fmt" + "time" +) + +// Config controls how aggressively the controller graduates VMs off the pager. +// The zero value is disabled, so the feature is a no-op until explicitly turned +// on. +type Config struct { + Enabled bool + + // MinSessionAge is how long a session must have been observed before it is + // eligible to graduate. It gives the pager time to do its job as a restore + // accelerator and avoids churning a freshly restored VM. + MinSessionAge time.Duration + + // MaxConcurrent bounds simultaneous graduations. Each one reads the whole + // remaining memory image, so this is the IO/RAM blast-radius lever. + MaxConcurrent int + + // MaxActiveSessions is the soft ceiling on concurrent pager sessions (sessions younger than MinSessionAge are never graduated, so it can be exceeded). 
When + // zero, graduation is purely time based: every session past MinSessionAge is + // graduated. When positive, only enough oldest sessions are graduated to get + // back to the ceiling (sessions on an outdated pager version are always + // graduated regardless of the ceiling). + MaxActiveSessions int + + // ScanInterval is how often the controller evaluates sessions. + ScanInterval time.Duration + + // CompletionTimeout bounds a single graduation. Completion reads the whole + // remaining image, so this is generous. + CompletionTimeout time.Duration +} + +const ( + defaultMinSessionAge = 10 * time.Minute + defaultMaxConcurrent = 1 + defaultScanInterval = time.Minute + defaultCompletionTimeout = 10 * time.Minute +) + +// Normalize fills in defaults for unset fields. +func (c Config) Normalize() Config { + if c.MinSessionAge <= 0 { + c.MinSessionAge = defaultMinSessionAge + } + if c.MaxConcurrent <= 0 { + c.MaxConcurrent = defaultMaxConcurrent + } + if c.ScanInterval <= 0 { + c.ScanInterval = defaultScanInterval + } + if c.CompletionTimeout <= 0 { + c.CompletionTimeout = defaultCompletionTimeout + } + if c.MaxActiveSessions < 0 { + c.MaxActiveSessions = 0 + } + return c +} + +// Validate rejects nonsensical enabled configs. 
+func (c Config) Validate() error { + if !c.Enabled { + return nil + } + if c.MinSessionAge < 0 { + return fmt.Errorf("uffd graduation min_session_age must not be negative") + } + if c.MaxConcurrent < 0 { + return fmt.Errorf("uffd graduation max_concurrent must not be negative") + } + if c.MaxActiveSessions < 0 { + return fmt.Errorf("uffd graduation max_active_sessions must not be negative") + } + if c.ScanInterval < 0 { + return fmt.Errorf("uffd graduation scan_interval must not be negative") + } + return nil +} diff --git a/lib/uffdgraduate/config_test.go b/lib/uffdgraduate/config_test.go new file mode 100644 index 00000000..187386ca --- /dev/null +++ b/lib/uffdgraduate/config_test.go @@ -0,0 +1,52 @@ +package uffdgraduate + +import ( + "testing" + "time" +) + +func TestConfigNormalizeDefaults(t *testing.T) { + got := Config{Enabled: true}.Normalize() + if got.MinSessionAge != defaultMinSessionAge { + t.Fatalf("MinSessionAge = %s, want %s", got.MinSessionAge, defaultMinSessionAge) + } + if got.MaxConcurrent != defaultMaxConcurrent { + t.Fatalf("MaxConcurrent = %d, want %d", got.MaxConcurrent, defaultMaxConcurrent) + } + if got.ScanInterval != defaultScanInterval { + t.Fatalf("ScanInterval = %s, want %s", got.ScanInterval, defaultScanInterval) + } + if got.CompletionTimeout != defaultCompletionTimeout { + t.Fatalf("CompletionTimeout = %s, want %s", got.CompletionTimeout, defaultCompletionTimeout) + } + if got.MaxActiveSessions != 0 { + t.Fatalf("MaxActiveSessions = %d, want 0", got.MaxActiveSessions) + } +} + +func TestConfigNormalizeKeepsExplicit(t *testing.T) { + in := Config{ + Enabled: true, + MinSessionAge: 2 * time.Minute, + MaxConcurrent: 4, + MaxActiveSessions: 8, + ScanInterval: 30 * time.Second, + CompletionTimeout: time.Minute, + } + got := in.Normalize() + if got != in { + t.Fatalf("Normalize changed explicit config: got %+v want %+v", got, in) + } +} + +func TestConfigValidate(t *testing.T) { + if err := (Config{Enabled: false, MinSessionAge: 
-1}).Validate(); err != nil { + t.Fatalf("disabled config should always validate, got %v", err) + } + if err := (Config{Enabled: true, MaxConcurrent: -1}).Validate(); err == nil { + t.Fatal("expected error for negative max_concurrent") + } + if err := (Config{Enabled: true, MinSessionAge: time.Minute}).Validate(); err != nil { + t.Fatalf("valid enabled config should pass, got %v", err) + } +} diff --git a/lib/uffdgraduate/controller.go b/lib/uffdgraduate/controller.go new file mode 100644 index 00000000..2ceb215e --- /dev/null +++ b/lib/uffdgraduate/controller.go @@ -0,0 +1,223 @@ +package uffdgraduate + +import ( + "context" + "log/slog" + "sort" + "sync" + "time" + + "go.opentelemetry.io/otel/metric" +) + +// ControllerOptions configures logging, metrics, and time. +type ControllerOptions struct { + Log *slog.Logger + Meter metric.Meter + Now func() time.Time +} + +// Controller periodically detaches eligible running VMs from their snapshot +// memory pager so the pool of active pager sessions stays bounded and old pager +// versions can drain to zero and exit. +type Controller struct { + store InstanceStore + cfg Config + log *slog.Logger + now func() time.Time + + metrics *Metrics + wg sync.WaitGroup + + mu sync.Mutex + firstSeen map[string]time.Time + inFlight map[string]struct{} +} + +// NewController builds a controller. cfg is normalized here. +func NewController(store InstanceStore, cfg Config, opts ControllerOptions) *Controller { + log := opts.Log + if log == nil { + log = slog.Default() + } + now := opts.Now + if now == nil { + now = time.Now + } + c := &Controller{ + store: store, + cfg: cfg.Normalize(), + log: log, + now: now, + firstSeen: make(map[string]time.Time), + inFlight: make(map[string]struct{}), + } + c.metrics = newMetrics(opts.Meter, c) + return c +} + +// Run scans on an interval until ctx is cancelled. 
+func (c *Controller) Run(ctx context.Context) error { + if !c.cfg.Enabled { + <-ctx.Done() + return nil + } + c.log.Info("uffd graduation controller started", + "min_session_age", c.cfg.MinSessionAge, + "max_concurrent", c.cfg.MaxConcurrent, + "max_active_sessions", c.cfg.MaxActiveSessions, + "scan_interval", c.cfg.ScanInterval, + ) + + ticker := time.NewTicker(c.cfg.ScanInterval) + defer ticker.Stop() + + c.scan(ctx) + for { + select { + case <-ctx.Done(): + c.wg.Wait() + return nil + case <-ticker.C: + c.scan(ctx) + } + } +} + +func (c *Controller) scan(ctx context.Context) { + insts, err := c.store.ListPagerInstances(ctx) + if err != nil { + c.recordError("list") + c.log.Warn("uffd graduation scan failed to list instances", "error", err) + return + } + target := c.store.TargetVersion() + now := c.now() + + c.mu.Lock() + present := make(map[string]struct{}, len(insts)) + for _, inst := range insts { + present[inst.ID] = struct{}{} + if _, ok := c.firstSeen[inst.ID]; !ok { + c.firstSeen[inst.ID] = now + } + } + for id := range c.firstSeen { + if _, ok := present[id]; !ok { + delete(c.firstSeen, id) + } + } + + candidates := c.selectCandidatesLocked(insts, target, now) + launch := make([]Instance, 0, len(candidates)) + for _, inst := range candidates { + if len(c.inFlight) >= c.cfg.MaxConcurrent { + break + } + if _, busy := c.inFlight[inst.ID]; busy { + continue + } + c.inFlight[inst.ID] = struct{}{} + launch = append(launch, inst) + } + c.mu.Unlock() + + for _, inst := range launch { + c.wg.Add(1) + go c.graduate(ctx, inst) + } +} + +// selectCandidatesLocked returns the soaked instances that should graduate, +// ordered by priority: outdated pager versions first, then oldest first. 
+func (c *Controller) selectCandidatesLocked(insts []Instance, target string, now time.Time) []Instance { + type candidate struct { + inst Instance + outdated bool + age time.Duration + } + + soaked := make([]candidate, 0, len(insts)) + for _, inst := range insts { + seen, ok := c.firstSeen[inst.ID] + if !ok { + continue + } + if now.Sub(seen) < c.cfg.MinSessionAge { + continue + } + if _, busy := c.inFlight[inst.ID]; busy { + continue + } + outdated := target != "" && inst.PagerVersion != "" && inst.PagerVersion != target + soaked = append(soaked, candidate{inst: inst, outdated: outdated, age: now.Sub(seen)}) + } + + sort.SliceStable(soaked, func(i, j int) bool { + if soaked[i].outdated != soaked[j].outdated { + return soaked[i].outdated + } + return soaked[i].age > soaked[j].age + }) + + overCap := 0 + if c.cfg.MaxActiveSessions > 0 { + overCap = len(insts) - c.cfg.MaxActiveSessions + } + + out := make([]Instance, 0, len(soaked)) + for _, s := range soaked { + switch { + case c.cfg.MaxActiveSessions == 0: + out = append(out, s.inst) + case s.outdated: + out = append(out, s.inst) + case overCap > 0: + out = append(out, s.inst) + overCap-- + } + } + return out +} + +func (c *Controller) graduate(ctx context.Context, inst Instance) { + defer c.wg.Done() + defer func() { + c.mu.Lock() + delete(c.inFlight, inst.ID) + c.mu.Unlock() + }() + + gctx, cancel := context.WithTimeout(ctx, c.cfg.CompletionTimeout) + defer cancel() + + c.log.Info("graduating instance off uffd pager", + "instance_id", inst.ID, "instance_name", inst.Name, "pager_version", inst.PagerVersion) + + if err := c.store.GraduateInstance(gctx, inst.ID); err != nil { + c.recordAttempt("error") + c.recordError("graduate") + c.log.Warn("uffd graduation failed", "instance_id", inst.ID, "instance_name", inst.Name, "error", err) + return + } + + c.recordAttempt("success") + // Drop first-seen so a later rebind (e.g. after a future restore) restarts + // its soak rather than graduating immediately. 
+ c.mu.Lock() + delete(c.firstSeen, inst.ID) + c.mu.Unlock() + c.log.Info("instance graduated off uffd pager", "instance_id", inst.ID, "instance_name", inst.Name) +} + +func (c *Controller) trackedSessions() int { + c.mu.Lock() + defer c.mu.Unlock() + return len(c.firstSeen) +} + +func (c *Controller) inFlightCount() int { + c.mu.Lock() + defer c.mu.Unlock() + return len(c.inFlight) +} diff --git a/lib/uffdgraduate/controller_test.go b/lib/uffdgraduate/controller_test.go new file mode 100644 index 00000000..7909ada3 --- /dev/null +++ b/lib/uffdgraduate/controller_test.go @@ -0,0 +1,170 @@ +package uffdgraduate + +import ( + "context" + "sync" + "testing" + "time" +) + +type fakeClock struct { + mu sync.Mutex + t time.Time +} + +func (c *fakeClock) Now() time.Time { + c.mu.Lock() + defer c.mu.Unlock() + return c.t +} + +func (c *fakeClock) advance(d time.Duration) { + c.mu.Lock() + defer c.mu.Unlock() + c.t = c.t.Add(d) +} + +type fakeStore struct { + mu sync.Mutex + insts []Instance + target string + graduated []string + gradCh chan string + err error +} + +func newFakeStore(target string, insts ...Instance) *fakeStore { + return &fakeStore{insts: insts, target: target, gradCh: make(chan string, 16)} +} + +func (f *fakeStore) ListPagerInstances(context.Context) ([]Instance, error) { + f.mu.Lock() + defer f.mu.Unlock() + return append([]Instance(nil), f.insts...), f.err +} + +func (f *fakeStore) GraduateInstance(_ context.Context, id string) error { + f.mu.Lock() + if f.err != nil { + err := f.err + f.mu.Unlock() + return err + } + f.graduated = append(f.graduated, id) + rm := f.insts[:0] + for _, inst := range f.insts { + if inst.ID != id { + rm = append(rm, inst) + } + } + f.insts = rm + f.mu.Unlock() + f.gradCh <- id + return nil +} + +func (f *fakeStore) TargetVersion() string { return f.target } + +func newTestController(store InstanceStore, cfg Config, clock *fakeClock) *Controller { + return NewController(store, cfg, ControllerOptions{Now: clock.Now}) +} + 
+func ids(insts []Instance) []string { + out := make([]string, len(insts)) + for i, inst := range insts { + out[i] = inst.ID + } + return out +} + +func TestSelectCandidatesTimeBasedWeaning(t *testing.T) { + base := time.Unix(1_700_000_000, 0) + clock := &fakeClock{t: base} + store := newFakeStore("new", + Instance{ID: "a", PagerVersion: "old"}, + Instance{ID: "b", PagerVersion: "new"}, + Instance{ID: "fresh", PagerVersion: "new"}, + ) + c := newTestController(store, Config{Enabled: true, MinSessionAge: 10 * time.Minute}, clock) + + c.firstSeen["a"] = base.Add(-20 * time.Minute) + c.firstSeen["b"] = base.Add(-15 * time.Minute) + c.firstSeen["fresh"] = base.Add(-time.Minute) + + got := ids(c.selectCandidatesLocked(store.insts, "new", clock.Now())) + // Both soaked instances graduate; the outdated one is ordered first; the + // fresh instance is excluded. + if len(got) != 2 || got[0] != "a" || got[1] != "b" { + t.Fatalf("candidates = %v, want [a b]", got) + } +} + +func TestSelectCandidatesCapKeepsNewest(t *testing.T) { + base := time.Unix(1_700_000_000, 0) + clock := &fakeClock{t: base} + store := newFakeStore("new", + Instance{ID: "old1", PagerVersion: "new"}, + Instance{ID: "old2", PagerVersion: "new"}, + ) + c := newTestController(store, Config{Enabled: true, MinSessionAge: 10 * time.Minute, MaxActiveSessions: 1}, clock) + c.firstSeen["old1"] = base.Add(-30 * time.Minute) + c.firstSeen["old2"] = base.Add(-20 * time.Minute) + + got := ids(c.selectCandidatesLocked(store.insts, "new", clock.Now())) + // Over the ceiling by one; only the oldest is graduated, newest stays warm. 
+ if len(got) != 1 || got[0] != "old1" { + t.Fatalf("candidates = %v, want [old1]", got) + } +} + +func TestSelectCandidatesOutdatedAlwaysGraduates(t *testing.T) { + base := time.Unix(1_700_000_000, 0) + clock := &fakeClock{t: base} + store := newFakeStore("new", + Instance{ID: "outdated", PagerVersion: "old"}, + Instance{ID: "current", PagerVersion: "new"}, + ) + // Ceiling above the live count, so capacity alone would graduate nothing. + c := newTestController(store, Config{Enabled: true, MinSessionAge: 10 * time.Minute, MaxActiveSessions: 5}, clock) + c.firstSeen["outdated"] = base.Add(-20 * time.Minute) + c.firstSeen["current"] = base.Add(-20 * time.Minute) + + got := ids(c.selectCandidatesLocked(store.insts, "new", clock.Now())) + if len(got) != 1 || got[0] != "outdated" { + t.Fatalf("candidates = %v, want [outdated]", got) + } +} + +func TestScanRespectsSoak(t *testing.T) { + base := time.Unix(1_700_000_000, 0) + clock := &fakeClock{t: base} + store := newFakeStore("new", Instance{ID: "vm1", PagerVersion: "old"}) + c := newTestController(store, Config{Enabled: true, MinSessionAge: 10 * time.Minute, MaxConcurrent: 1}, clock) + + c.scan(context.Background()) // records first-seen, age 0 -> no graduation + select { + case id := <-store.gradCh: + t.Fatalf("unexpected graduation before soak: %s", id) + case <-time.After(50 * time.Millisecond): + } + + clock.advance(11 * time.Minute) + c.scan(context.Background()) + select { + case id := <-store.gradCh: + if id != "vm1" { + t.Fatalf("graduated %s, want vm1", id) + } + case <-time.After(2 * time.Second): + t.Fatal("expected graduation after soak") + } + + // First-seen is dropped on success so a rebind restarts the soak. 
+ c.wg.Wait() + c.mu.Lock() + _, tracked := c.firstSeen["vm1"] + c.mu.Unlock() + if tracked { + t.Fatal("expected first-seen cleared after successful graduation") + } +} diff --git a/lib/uffdgraduate/metrics.go b/lib/uffdgraduate/metrics.go new file mode 100644 index 00000000..f93e4c35 --- /dev/null +++ b/lib/uffdgraduate/metrics.go @@ -0,0 +1,86 @@ +package uffdgraduate + +import ( + "context" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +type Metrics struct { + attemptsTotal metric.Int64Counter + errorsTotal metric.Int64Counter + inFlightGauge metric.Int64ObservableGauge + trackedGauge metric.Int64ObservableGauge +} + +func newMetrics(meter metric.Meter, controller *Controller) *Metrics { + if meter == nil { + return &Metrics{} + } + + attemptsTotal, err := meter.Int64Counter( + "hypeman_uffd_graduation_attempts_total", + metric.WithDescription("Total UFFD pager graduation attempts"), + ) + if err != nil { + return &Metrics{} + } + errorsTotal, err := meter.Int64Counter( + "hypeman_uffd_graduation_errors_total", + metric.WithDescription("Total UFFD graduation controller errors"), + ) + if err != nil { + return &Metrics{} + } + inFlightGauge, err := meter.Int64ObservableGauge( + "hypeman_uffd_graduation_in_flight", + metric.WithDescription("Graduations currently in flight"), + ) + if err != nil { + return &Metrics{} + } + trackedGauge, err := meter.Int64ObservableGauge( + "hypeman_uffd_graduation_tracked_sessions", + metric.WithDescription("Pager-backed sessions currently tracked by the graduation controller"), + ) + if err != nil { + return &Metrics{} + } + + m := &Metrics{ + attemptsTotal: attemptsTotal, + errorsTotal: errorsTotal, + inFlightGauge: inFlightGauge, + trackedGauge: trackedGauge, + } + + _, _ = meter.RegisterCallback(func(ctx context.Context, observer metric.Observer) error { + if controller == nil { + return nil + } + observer.ObserveInt64(m.inFlightGauge, int64(controller.inFlightCount())) + 
observer.ObserveInt64(m.trackedGauge, int64(controller.trackedSessions())) + return nil + }, inFlightGauge, trackedGauge) + + return m +} + +func (c *Controller) recordAttempt(status string) { + if c.metrics == nil || c.metrics.attemptsTotal == nil { + return + } + c.metrics.attemptsTotal.Add(context.Background(), 1, metric.WithAttributes( + attribute.String("status", status), + )) +} + +func (c *Controller) recordError(operation string) { + if c.metrics == nil || c.metrics.errorsTotal == nil { + return + } + c.metrics.errorsTotal.Add(context.Background(), 1, metric.WithAttributes( + attribute.String("operation", operation), + )) +} diff --git a/lib/uffdgraduate/types.go b/lib/uffdgraduate/types.go new file mode 100644 index 00000000..82735a9c --- /dev/null +++ b/lib/uffdgraduate/types.go @@ -0,0 +1,26 @@ +package uffdgraduate + +import "context" + +// Instance is a running VM that currently depends on a detachable snapshot +// memory pager. PagerVersion is the version it is bound to. +type Instance struct { + ID string + Name string + PagerVersion string +} + +// InstanceStore is the controller's view of the instance manager. It is kept +// narrow and free of hypervisor/UFFD types so the controller stays agnostic to +// how graduation is actually performed. +type InstanceStore interface { + // ListPagerInstances returns running instances that still depend on a + // detachable snapshot memory pager, each tagged with its bound version. + ListPagerInstances(ctx context.Context) ([]Instance, error) + // GraduateInstance detaches the instance from its pager. The call blocks + // until the pager has populated remaining pages and released the session. + GraduateInstance(ctx context.Context, id string) error + // TargetVersion is the pager version new restores bind to. Sessions on a + // different version are prioritised so old pager versions can retire. 
+ TargetVersion() string +} diff --git a/lib/uffdpager/README.md b/lib/uffdpager/README.md index aec2decb..4724e22c 100644 --- a/lib/uffdpager/README.md +++ b/lib/uffdpager/README.md @@ -37,8 +37,17 @@ Hypeman talks to the pager over Unix HTTP at - `GET /stats` - `POST /sessions` - `POST /sessions/{id}/close` +- `POST /sessions/{id}/complete` - `POST /drain` +`POST /sessions/{id}/complete` populates every outstanding page of a session +from the backing file and then unregisters userfaultfd, so the restored VM stops +depending on this pager and keeps running on resident memory. The VM is never +paused and its network is untouched. Completion reads the whole remaining memory +image, so it is paced by the caller; the request is bounded by its context +rather than a fixed timeout. Unregister happens only after a full populate, +because once a range is unregistered the kernel zero-fills any still-absent page. + Firecracker does not use this control socket directly. Each restore session gets its own Unix socket that receives Firecracker's UFFD file descriptor and memory regions. diff --git a/lib/uffdpager/complete_linux.go b/lib/uffdpager/complete_linux.go new file mode 100644 index 00000000..f5d52f91 --- /dev/null +++ b/lib/uffdpager/complete_linux.go @@ -0,0 +1,240 @@ +//go:build linux + +package uffdpager + +import ( + "context" + "errors" + "fmt" + "io" + "net/http" + "strings" + "unsafe" + + "github.com/go-chi/chi/v5" + "golang.org/x/sys/unix" +) + +// _UFFDIO_UNREGISTER ioctl. Derived the same way as uffdioCopy in +// server_faults_linux.go: _IOR(UFFDIO=0xAA, _UFFDIO_UNREGISTER=1, struct +// uffdio_range{u64 start; u64 len}) on the asm-generic ioctl encoding. +const uffdioUnregister = 0x8010aa01 + +type uffdioRange struct { + start uint64 + len uint64 +} + +// completeRequest is a single /complete attempt handed to the fault loop. The +// reply channel is buffered so the fault loop never blocks delivering a result +// even if the HTTP caller has already given up. 
+type completeRequest struct {
+	// resp receives exactly one result from the fault loop. handleComplete
+	// creates it with capacity 1 so that send never blocks.
+	resp chan error
+}
+
+// handleComplete drives an existing session to fully populate its guest memory
+// from the backing file and detach userfaultfd, so the restored VM stops
+// depending on this pager. The VM keeps running throughout.
+//
+// Responses:
+//   - 204 when the fault loop reports a successful completion
+//   - 404 when no session has the requested id
+//   - 409 when the session is already closed or a completion is already queued
+//   - 500 when completion fails, or the session closes without reporting a result
+//   - 504 when the request context ends before a result arrives
+func (s *server) handleComplete(w http.ResponseWriter, r *http.Request) {
+	id := sanitizeSessionID(chi.URLParam(r, "id"))
+	s.mu.Lock()
+	sess := s.sessions[id]
+	s.mu.Unlock()
+	if sess == nil {
+		http.Error(w, "session not found", http.StatusNotFound)
+		return
+	}
+
+	req := &completeRequest{resp: make(chan error, 1)}
+	// Non-blocking hand-off: when the send would block, another completion is
+	// already queued for the fault loop, so reject instead of waiting.
+	select {
+	case sess.completeReqCh <- req:
+	case <-sess.done:
+		http.Error(w, "session closed", http.StatusConflict)
+		return
+	default:
+		http.Error(w, "completion already in progress", http.StatusConflict)
+		return
+	}
+	sess.wake()
+
+	// writeResult maps the fault loop's completion result onto the response.
+	writeResult := func(err error) {
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+		w.WriteHeader(http.StatusNoContent)
+	}
+	// reportIfReady writes the completion result if one is already queued and
+	// reports whether it did.
+	reportIfReady := func() bool {
+		select {
+		case err := <-req.resp:
+			writeResult(err)
+			return true
+		default:
+			return false
+		}
+	}
+
+	select {
+	case err := <-req.resp:
+		writeResult(err)
+	case <-sess.done:
+		// A successful completion makes the fault loop exit right after it
+		// sends the result, so done may already be closed by the time this
+		// goroutine runs. select chooses among ready cases at random, so look
+		// for a queued result before reporting the close as a failure.
+		if !reportIfReady() {
+			http.Error(w, "session closed before completion", http.StatusInternalServerError)
+		}
+	case <-r.Context().Done():
+		// Same race as above: prefer a result that arrived together with the
+		// cancellation over reporting a timeout.
+		if !reportIfReady() {
+			http.Error(w, r.Context().Err().Error(), http.StatusGatewayTimeout)
+		}
+	}
+}
+
+// wake writes the wake pipe so the fault loop breaks out of an idle poll and
+// notices a pending completion request. The write is best effort: its result
+// is ignored.
+//
+// NOTE(review): session.close appears to close wakeW without resetting the
+// field, so a wake racing with close could write to a reused descriptor.
+// Confirm callers cannot race with close.
+func (s *session) wake() {
+	if s.wakeW >= 0 {
+		var b [1]byte
+		_, _ = unix.Write(s.wakeW, b[:])
+	}
+}
+
+// takeCompletion runs a queued completion request if one is pending. It returns
+// true only when the session fully completed and the fault loop should exit so
+// the session tears down; on failure the session keeps serving faults so the VM
+// stays safely backed by the pager.
+func (s *session) takeCompletion() bool {
+	select {
+	case req := <-s.completeReqCh:
+		err := s.completeAndUnregister()
+		// resp is created with capacity 1 by handleComplete, so this send
+		// does not block even if the HTTP caller has already gone away.
+		req.resp <- err
+		return err == nil
+	default:
+		return false
+	}
+}
+
+// completeAndUnregister populates every page from the backing file and only then
+// unregisters the ranges. Unregister must happen strictly after a full populate:
+// once a range is unregistered the kernel zero-fills any page that was still
+// absent, which would be corruption. If the populate fails the ranges are left
+// registered so the fault loop can keep serving them.
+func (s *session) completeAndUnregister() error {
+	if err := s.populateAll(); err != nil {
+		return err
+	}
+	return s.unregisterAll()
+}
+
+// populateAll copies every whole page of every mapping from the backing file
+// into guest memory. It keeps going after a failure so as many pages as
+// possible are populated, and returns the first error it met (nil when every
+// page was handled).
+//
+// A non-empty mapping with a non-positive page size cannot be walked. That is
+// reported as an error instead of being skipped, because completeAndUnregister
+// unregisters every mapping as soon as this returns nil, and an unpopulated
+// range would then be zero-filled.
+func (s *session) populateAll() error {
+	var firstErr error
+	noteErr := func(err error) {
+		if firstErr == nil {
+			firstErr = err
+		}
+	}
+
+	eventBuf := make([]byte, uffdMsgSize)
+	for _, mapping := range s.mappings {
+		if mapping.Size == 0 {
+			// Nothing to populate.
+			continue
+		}
+		pageSize := int(mapping.PageSize)
+		if pageSize <= 0 {
+			noteErr(fmt.Errorf("mapping at %#x has invalid page size %d", mapping.BaseHostVirtAddr, mapping.PageSize))
+			continue
+		}
+		// One scratch page per mapping, reused for every read.
+		page := make([]byte, pageSize)
+		for off := uint64(0); off+uint64(pageSize) <= mapping.Size; off += uint64(pageSize) {
+			fileOffset := int64(mapping.Offset + off)
+			n, err := s.backingFile.ReadAt(page, fileOffset)
+			if n != pageSize {
+				noteErr(fmt.Errorf("read backing page at %d: read %d/%d bytes: %w", fileOffset, n, pageSize, err))
+				continue
+			}
+			s.server.backingBytesRead.Add(int64(n))
+			if err := s.populatePage(mapping.BaseHostVirtAddr+off, page, eventBuf); err != nil {
+				s.server.copyErrors.Add(1)
+				noteErr(fmt.Errorf("populate page at %#x: %w", mapping.BaseHostVirtAddr+off, err))
+			}
+		}
+	}
+	return firstErr
+}
+
+// unregisterAll issues UFFDIO_UNREGISTER for every mapping and returns the
+// first error that is not EINVAL. It keeps going after a failure so the
+// remaining ranges are still attempted.
+func (s *session) unregisterAll() error {
+	var firstErr error
+	for _, mapping := range s.mappings {
+		// EINVAL means the range is already unregistered, which makes a retried
+		// completion idempotent.
+		// NOTE(review): ioctl_userfaultfd(2) also documents EINVAL for
+		// misaligned or unmapped ranges; confirm that treating every EINVAL as
+		// success is intended.
+		if err := uffdUnregister(s.uffdFD, mapping.BaseHostVirtAddr, mapping.Size); err != nil && !errors.Is(err, unix.EINVAL) {
+			if firstErr == nil {
+				firstErr = fmt.Errorf("unregister range %#x len %d: %w", mapping.BaseHostVirtAddr, mapping.Size, err)
+			}
+		}
+	}
+	return firstErr
+}
+
+// populatePage copies one page into guest memory. UFFDIO_COPY returns EEXIST
+// (treated as success) for pages the guest already faulted in, and can return
+// EAGAIN when a pending remove/unmap event from ballooning blocks the copy;
+// draining those events and retrying clears it, mirroring the fault loop.
+// After maxAttempts EAGAIN results the last error is returned.
+func (s *session) populatePage(dst uint64, page, eventBuf []byte) error {
+	const maxAttempts = 5
+	var lastErr error
+	for attempt := 0; attempt < maxAttempts; attempt++ {
+		err := uffdCopy(s.uffdFD, dst, page)
+		switch {
+		case err == nil:
+			s.server.copies.Add(1)
+			return nil
+		case errors.Is(err, unix.EEXIST):
+			// The page is already present, so there is nothing to copy.
+			// uffdCopy is defined elsewhere; this makes the documented EEXIST
+			// handling explicit in case it surfaces the errno.
+			return nil
+		case !errors.Is(err, unix.EAGAIN):
+			return err
+		}
+		drainUFFDEvents(s.uffdFD, eventBuf)
+		lastErr = err
+	}
+	return lastErr
+}
+
+// drainUFFDEvents reads and discards queued events from the uffd until
+// readUFFDEvent reports an error or that no event was read.
+func drainUFFDEvents(fd int, buf []byte) {
+	for {
+		_, ok, err := readUFFDEvent(fd, buf)
+		if err != nil || !ok {
+			return
+		}
+	}
+}
+
+// drainWake reads and discards bytes from the wake pipe until a read returns
+// no data or an error.
+func drainWake(fd int) {
+	var buf [16]byte
+	for {
+		n, err := unix.Read(fd, buf[:])
+		if n <= 0 || err != nil {
+			return
+		}
+	}
+}
+
+// newWakePipe creates a non-blocking, close-on-exec pipe and returns its read
+// end and write end, in that order. Both are -1 when the pipe cannot be created.
+func newWakePipe() (int, int, error) {
+	var fds [2]int
+	if err := unix.Pipe2(fds[:], unix.O_NONBLOCK|unix.O_CLOEXEC); err != nil {
+		return -1, -1, err
+	}
+	return fds[0], fds[1], nil
+}
+
+// uffdUnregister issues UFFDIO_UNREGISTER on fd for [start, start+length) and
+// returns the raw errno on failure.
+func uffdUnregister(fd int, start, length uint64) error {
+	rng := uffdioRange{start: start, len: length}
+	_, _, errno := unix.Syscall(unix.SYS_IOCTL, uintptr(fd), uintptr(uffdioUnregister), uintptr(unsafe.Pointer(&rng)))
+	if errno != 0 {
+		return errno
+	}
+	return nil
+}
+
+// CompleteSessionVersion asks the pager for the given version to fully populate
+// and detach the session. It uses a context-bounded client with no fixed
+// timeout because completion reads the whole backing image.
+func (s *Supervisor) CompleteSessionVersion(ctx context.Context, versionKey, sessionID string) error {
+	// A nil supervisor or a blank version/session id is treated as "nothing to
+	// complete" rather than an error.
+	if s == nil || strings.TrimSpace(versionKey) == "" || strings.TrimSpace(sessionID) == "" {
+		return nil
+	}
+	client := newUnixHTTPClient(pagerControlSocket(s.dataDir, versionKey), 0)
+	// This client and its transport are built for this call only and are never
+	// cached. Release the keep-alive connection on return; otherwise every
+	// completion leaves an idle unix connection behind.
+	defer client.CloseIdleConnections()
+	path := "/sessions/" + urlPathEscape(sessionID) + "/complete"
+	req, err := http.NewRequestWithContext(ctx, http.MethodPost, "http://unix"+path, nil)
+	if err != nil {
+		return err
+	}
+	resp, err := client.Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	// The body is only used for the error message, so cap how much is read.
+	data, _ := io.ReadAll(io.LimitReader(resp.Body, 64<<10))
+	if resp.StatusCode == http.StatusNotFound {
+		return ErrSessionNotFound
+	}
+	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
+		return fmt.Errorf("uffd pager complete %s returned %s: %s", sessionID, resp.Status, strings.TrimSpace(string(data)))
+	}
+	return nil
+}
diff --git a/lib/uffdpager/complete_linux_test.go b/lib/uffdpager/complete_linux_test.go
new file mode 100644
index 00000000..e602e17a
--- /dev/null
+++ b/lib/uffdpager/complete_linux_test.go
@@ -0,0 +1,50 @@
+//go:build linux
+
+package uffdpager
+
+import (
+	"testing"
+
+	"golang.org/x/sys/unix"
+)
+
+// TestUffdioUnregisterValue guards the hand-computed ioctl number against typos
+// by recomputing it from the asm-generic _IOR encoding used on the amd64/arm64
+// hosts Hypeman runs on (the same encoding uffdioCopy relies on).
+func TestUffdioUnregisterValue(t *testing.T) { + const ( + iocRead = 2 + dirShift = 30 + sizeShift = 16 + typeShift = 8 + uffdioMagic = 0xAA + unregisterNr = 1 + rangeSize = 16 // sizeof(struct uffdio_range) + ) + want := uintptr(iocRead< 0 { + t.Fatalf("expected wake pipe drained, read %d bytes", n) + } +} diff --git a/lib/uffdpager/server_faults_linux.go b/lib/uffdpager/server_faults_linux.go index 17acb6c2..fbdb56ac 100644 --- a/lib/uffdpager/server_faults_linux.go +++ b/lib/uffdpager/server_faults_linux.go @@ -50,7 +50,10 @@ type uffdEvent struct { func (s *session) handleFaults(mappings []guestRegionUffdMapping) { fd := s.uffdFD _ = unix.SetNonblock(fd, true) - pollFDs := []unix.PollFd{{Fd: int32(fd), Events: unix.POLLIN}} + pollFDs := []unix.PollFd{ + {Fd: int32(fd), Events: unix.POLLIN}, + {Fd: int32(s.wakeR), Events: unix.POLLIN}, + } buf := make([]byte, uffdMsgSize) var deferred []uffdEvent for { @@ -62,6 +65,14 @@ func (s *session) handleFaults(mappings []guestRegionUffdMapping) { } return } + if pollFDs[1].Revents&unix.POLLIN != 0 { + drainWake(s.wakeR) + // Populate and unregister in this goroutine; only exit (and tear + // down the session) when completion fully succeeds. + if s.takeCompletion() { + return + } + } if n == 0 || pollFDs[0].Revents&unix.POLLIN == 0 { if pollFDs[0].Revents&(unix.POLLHUP|unix.POLLERR|unix.POLLNVAL) != 0 { return diff --git a/lib/uffdpager/server_linux.go b/lib/uffdpager/server_linux.go index d007ee6a..83605781 100644 --- a/lib/uffdpager/server_linux.go +++ b/lib/uffdpager/server_linux.go @@ -59,6 +59,14 @@ type session struct { closeOnce sync.Once uffdFD int conn *net.UnixConn + + // Graduation: the fault loop watches wakeR alongside the uffd fd, so a + // /complete request handed to completeReqCh can interrupt an idle poll and + // run completion in the same goroutine that owns the uffd. 
+ mappings []guestRegionUffdMapping + wakeR int + wakeW int + completeReqCh chan *completeRequest } func Main(args []string) error { @@ -110,6 +118,7 @@ func (s *server) run() error { router.Get("/stats", s.handleStats) router.Post("/sessions", s.handleCreateSession) router.Post("/sessions/{id}/close", s.handleCloseSession) + router.Post("/sessions/{id}/complete", s.handleComplete) router.Post("/drain", s.handleDrain) s.httpServer = &http.Server{Handler: router} diff --git a/lib/uffdpager/server_sessions_linux.go b/lib/uffdpager/server_sessions_linux.go index cfb14273..0d2f4337 100644 --- a/lib/uffdpager/server_sessions_linux.go +++ b/lib/uffdpager/server_sessions_linux.go @@ -31,6 +31,13 @@ func (s *server) createSession(req CreateSessionRequest) (*session, error) { _ = os.Remove(socketPath) return nil, fmt.Errorf("open backing memory for uffd session %s: %w", id, err) } + wakeR, wakeW, err := newWakePipe() + if err != nil { + _ = backingFile.Close() + _ = listener.Close() + _ = os.Remove(socketPath) + return nil, fmt.Errorf("create wake pipe for uffd session %s: %w", id, err) + } sess := &session{ id: id, @@ -43,6 +50,9 @@ func (s *server) createSession(req CreateSessionRequest) (*session, error) { done: make(chan struct{}), backingFile: backingFile, uffdFD: -1, + wakeR: wakeR, + wakeW: wakeW, + completeReqCh: make(chan *completeRequest, 1), } s.mu.Lock() @@ -135,11 +145,14 @@ func (s *session) run() { sort.Slice(mappings, func(i, j int) bool { return mappings[i].BaseHostVirtAddr < mappings[j].BaseHostVirtAddr }) + s.mappings = mappings s.handleFaults(mappings) } func (s *session) close() { s.closeOnce.Do(func() { + // Closing s.done releases any /complete waiter still blocked on this + // session; the wake pipe fds are closed below. 
if s.listener != nil { _ = s.listener.Close() } @@ -149,6 +162,12 @@ func (s *session) close() { if s.uffdFD >= 0 { _ = unix.Close(s.uffdFD) } + if s.wakeR >= 0 { + _ = unix.Close(s.wakeR) + } + if s.wakeW >= 0 { + _ = unix.Close(s.wakeW) + } if s.backingFile != nil { _ = s.backingFile.Close() } diff --git a/lib/uffdpager/supervisor_linux.go b/lib/uffdpager/supervisor_linux.go index c7fb4c47..201d01fc 100644 --- a/lib/uffdpager/supervisor_linux.go +++ b/lib/uffdpager/supervisor_linux.go @@ -252,18 +252,24 @@ func (s *Supervisor) clientForVersion(versionKey string) *http.Client { if client := s.clients[versionKey]; client != nil { return client } - socketPath := pagerControlSocket(s.dataDir, versionKey) - client := &http.Client{ + client := newUnixHTTPClient(pagerControlSocket(s.dataDir, versionKey), 10*time.Second) + s.clients[versionKey] = client + return client +} + +// newUnixHTTPClient builds an HTTP client bound to a single unix control socket. +// A timeout of zero leaves the request bounded only by its context, which the +// completion call relies on because it can run longer than a normal request. 
+func newUnixHTTPClient(socketPath string, timeout time.Duration) *http.Client { + return &http.Client{ Transport: &http.Transport{ DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) { var d net.Dialer return d.DialContext(ctx, "unix", socketPath) }, }, - Timeout: 10 * time.Second, + Timeout: timeout, } - s.clients[versionKey] = client - return client } func pagerVersionDir(dataDir, versionKey string) string { diff --git a/lib/uffdpager/supervisor_unsupported.go b/lib/uffdpager/supervisor_unsupported.go index ece0704c..52ee5f09 100644 --- a/lib/uffdpager/supervisor_unsupported.go +++ b/lib/uffdpager/supervisor_unsupported.go @@ -29,6 +29,10 @@ func (s *Supervisor) CloseSessionVersion(context.Context, string, string) error return nil } +func (s *Supervisor) CompleteSessionVersion(context.Context, string, string) error { + return fmt.Errorf("uffd pager is only supported on linux") +} + func (s *Supervisor) Stats(context.Context) (*Stats, error) { return nil, fmt.Errorf("uffd pager is only supported on linux") } diff --git a/lib/uffdpager/types.go b/lib/uffdpager/types.go index 99736983..b521b435 100644 --- a/lib/uffdpager/types.go +++ b/lib/uffdpager/types.go @@ -1,5 +1,7 @@ package uffdpager +import "errors" + const ( BackendFile = "file" BackendUFFD = "uffd" @@ -7,6 +9,10 @@ const ( defaultCacheMaxBytes = int64(4 << 30) ) +// ErrSessionNotFound reports that a pager no longer has the requested session, +// so a completion request had nothing to act on. +var ErrSessionNotFound = errors.New("uffd pager session not found") + // CreateSessionRequest describes one Firecracker UFFD restore session. 
type CreateSessionRequest struct { SessionID string `json:"session_id,omitempty"` From a90d927dc58e935f8bdceb38f1e3d7d317346010 Mon Sep 17 00:00:00 2001 From: sjmiller609 <7516283+sjmiller609@users.noreply.github.com> Date: Sat, 6 Jun 2026 18:39:48 +0000 Subject: [PATCH 2/4] Bump UFFD pager version for graduation endpoint Co-Authored-By: Claude Opus 4.7 --- lib/uffdpager/VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/uffdpager/VERSION b/lib/uffdpager/VERSION index d917d3e2..b1e80bb2 100644 --- a/lib/uffdpager/VERSION +++ b/lib/uffdpager/VERSION @@ -1 +1 @@ -0.1.2 +0.1.3 From faf991b4b88b0540767798afed8f37edc006bb06 Mon Sep 17 00:00:00 2001 From: sjmiller609 <7516283+sjmiller609@users.noreply.github.com> Date: Sat, 6 Jun 2026 19:01:30 +0000 Subject: [PATCH 3/4] Add UFFD graduation integration test Sibling of the UFFD one-shot lifecycle test that detaches a running UFFD-backed VM from its pager and asserts the VM keeps running with its guest memory and disk intact, new writes still work, and a later standby/restore preserves memory. Leaves the existing test unchanged. Co-Authored-By: Claude Opus 4.7 --- lib/instances/firecracker_test.go | 150 ++++++++++++++++++++++++++++++ 1 file changed, 150 insertions(+) diff --git a/lib/instances/firecracker_test.go b/lib/instances/firecracker_test.go index 111c11b8..d53355a6 100644 --- a/lib/instances/firecracker_test.go +++ b/lib/instances/firecracker_test.go @@ -861,6 +861,156 @@ func TestFCUFFDOneShotLifecycle(t *testing.T) { snapshotDeleted = true } +// TestFCUFFDGraduationLifecycle exercises detaching a running UFFD-backed VM +// from its pager: the pager populates the remaining pages and unregisters the +// session, and the VM must keep running on resident memory with its guest state +// intact. It is a sibling of TestFCUFFDOneShotLifecycle and leaves that test's +// coverage unchanged. 
+func TestFCUFFDGraduationLifecycle(t *testing.T) { + t.Parallel() + requireFirecrackerIntegrationPrereqs(t) + requireUserfaultfdIntegrationPrereqs(t) + if pagerBinary := strings.TrimSpace(os.Getenv("HYPEMAN_UFFD_PAGER_BINARY")); pagerBinary == "" { + t.Skip("HYPEMAN_UFFD_PAGER_BINARY must point at hypeman-uffd-pager for UFFD integration tests") + } else if st, err := os.Stat(pagerBinary); err != nil || !st.Mode().IsRegular() { + t.Skipf("HYPEMAN_UFFD_PAGER_BINARY is not a regular file: %s", pagerBinary) + } + + mgr, tmpDir := setupTestManagerForFirecrackerWithConfig(t, legacyParallelTestNetworkConfig(testNetworkSeq.Add(1)), ManagerConfig{ + FirecrackerSnapshotMemoryBackend: uffdpager.BackendUFFD, + FirecrackerUFFDCacheMaxBytes: 512 << 20, + }) + ctx := context.Background() + p := paths.New(tmpDir) + + imageManager, err := images.NewManager(p, 1, nil) + require.NoError(t, err) + imageName := integrationTestImageRef(t, "docker.io/library/alpine:latest") + snapshottest.EnsureImageReady(t, ctx, p, imageManager, imageName) + + systemManager := system.NewManager(p) + require.NoError(t, systemManager.EnsureSystemFiles(ctx)) + + source, err := mgr.CreateInstance(ctx, CreateInstanceRequest{ + Name: "fc-uffd-grad-src", + Image: imageName, + Size: 1024 * 1024 * 1024, + OverlaySize: 1024 * 1024 * 1024, + Vcpus: 1, + NetworkEnabled: false, + Hypervisor: hypervisor.TypeFirecracker, + Cmd: []string{"sleep", "infinity"}, + }) + require.NoError(t, err) + sourceID := source.Id + sourceDeleted := false + t.Cleanup(func() { + if !sourceDeleted { + _ = mgr.DeleteInstance(context.Background(), sourceID) + } + }) + + source = requireRunningSleepInstance(t, ctx, mgr, sourceID) + requireGuestTmpfs(t, ctx, source) + writeGuestFile(t, ctx, source, "/root/uffd-grad/source", "source-disk") + writeGuestFile(t, ctx, source, "/dev/shm/uffd-grad/source", "source-memory") + + // A VM with no pager session (the freshly created, file-backed source) is a + // no-op to graduate. 
+ require.NoError(t, mgr.GraduateSnapshotMemoryPager(ctx, sourceID)) + + snapshot, err := mgr.CreateSnapshot(ctx, sourceID, CreateSnapshotRequest{ + Kind: SnapshotKindStandby, + Name: "fc-uffd-grad-snap", + }) + require.NoError(t, err) + snapshotDeleted := false + t.Cleanup(func() { + if !snapshotDeleted { + _ = mgr.DeleteSnapshot(context.Background(), snapshot.Id) + } + }) + + // Forking the standby snapshot to a running VM restores it UFFD-backed and + // pins a live pager session. + parent, err := mgr.ForkSnapshot(ctx, snapshot.Id, ForkSnapshotRequest{ + Name: "fc-uffd-grad-parent", + TargetState: StateRunning, + }) + require.NoError(t, err) + parentID := parent.Id + parentDeleted := false + t.Cleanup(func() { + if !parentDeleted { + _ = mgr.DeleteInstance(context.Background(), parentID) + } + }) + + parent = requireRunningSleepInstance(t, ctx, mgr, parentID) + assertGuestFile(t, ctx, parent, "/root/uffd-grad/source", "source-disk") + assertGuestFile(t, ctx, parent, "/dev/shm/uffd-grad/source", "source-memory") + writeGuestFile(t, ctx, parent, "/root/uffd-grad/parent", "parent-disk") + writeGuestFile(t, ctx, parent, "/dev/shm/uffd-grad/parent", "parent-memory") + + parentMeta, err := mgr.loadMetadata(parentID) + require.NoError(t, err) + require.NotEmpty(t, parentMeta.StoredMetadata.FirecrackerUFFDSessionID, "running UFFD fork should hold a pager session") + target := mgr.UFFDGraduationTargetVersion() + require.NotEmpty(t, target, "uffd backend should expose a target pager version") + require.Equal(t, target, parentMeta.StoredMetadata.FirecrackerUFFDPagerVersion) + + // Graduate: the pager fully populates memory from the backing file and + // unregisters the session. The VM keeps running with no pager dependency. 
+ require.NoError(t, mgr.GraduateSnapshotMemoryPager(ctx, parentID)) + + parentMeta, err = mgr.loadMetadata(parentID) + require.NoError(t, err) + require.Empty(t, parentMeta.StoredMetadata.FirecrackerUFFDSessionID, "graduation should clear the pager session binding") + require.Empty(t, parentMeta.StoredMetadata.FirecrackerUFFDPagerVersion) + + // The VM is still running and all guest memory and disk content survived the + // populate + unregister. + parent = requireRunningSleepInstance(t, ctx, mgr, parentID) + assertGuestFile(t, ctx, parent, "/root/uffd-grad/source", "source-disk") + assertGuestFile(t, ctx, parent, "/dev/shm/uffd-grad/source", "source-memory") + assertGuestFile(t, ctx, parent, "/root/uffd-grad/parent", "parent-disk") + assertGuestFile(t, ctx, parent, "/dev/shm/uffd-grad/parent", "parent-memory") + + // New guest memory and disk writes still work, proving the guest did not hang + // on a previously untouched page after userfaultfd was unregistered. + writeGuestFile(t, ctx, parent, "/root/uffd-grad/post", "post-disk") + writeGuestFile(t, ctx, parent, "/dev/shm/uffd-grad/post", "post-memory") + assertGuestFile(t, ctx, parent, "/root/uffd-grad/post", "post-disk") + assertGuestFile(t, ctx, parent, "/dev/shm/uffd-grad/post", "post-memory") + + // Graduating again is a no-op now that the session is gone. + require.NoError(t, mgr.GraduateSnapshotMemoryPager(ctx, parentID)) + + // A graduated VM still standbys and restores via the file backend, and its + // memory survives the round trip. 
+ parent, err = mgr.StandbyInstance(ctx, parentID, StandbyInstanceRequest{}) + require.NoError(t, err) + require.Equal(t, StateStandby, parent.State) + + parent, err = mgr.RestoreInstance(ctx, parentID) + require.NoError(t, err) + parent = requireRunningSleepInstance(t, ctx, mgr, parentID) + assertGuestFile(t, ctx, parent, "/dev/shm/uffd-grad/source", "source-memory") + assertGuestFile(t, ctx, parent, "/dev/shm/uffd-grad/parent", "parent-memory") + assertGuestFile(t, ctx, parent, "/dev/shm/uffd-grad/post", "post-memory") + + parentMeta, err = mgr.loadMetadata(parentID) + require.NoError(t, err) + require.Empty(t, parentMeta.StoredMetadata.FirecrackerUFFDSessionID, "file-backed restore after graduation should not create a pager session") + + require.NoError(t, mgr.DeleteInstance(ctx, parentID)) + parentDeleted = true + require.NoError(t, mgr.DeleteInstance(ctx, sourceID)) + sourceDeleted = true + require.NoError(t, mgr.DeleteSnapshot(ctx, snapshot.Id)) + snapshotDeleted = true +} + func requireRunningSleepInstance(t *testing.T, ctx context.Context, mgr Manager, instanceID string) *Instance { t.Helper() inst, err := waitForInstanceState(ctx, mgr, instanceID, StateRunning, integrationTestTimeout(20*time.Second)) From 4a98fdd765656814cda43de34cae0dac00bf7e2f Mon Sep 17 00:00:00 2001 From: sjmiller609 <7516283+sjmiller609@users.noreply.github.com> Date: Sat, 6 Jun 2026 19:31:08 +0000 Subject: [PATCH 4/4] Run UFFD graduation test serially to avoid CI contention Overlapping the graduation test's full memory populate with the sibling UFFD lifecycle test's VMs saturated the CI runner and timed out guest-agent readiness. Drop t.Parallel so peak concurrent UFFD VM load matches the pre-existing single-test profile. 
Co-Authored-By: Claude Opus 4.7 --- lib/instances/firecracker_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/instances/firecracker_test.go b/lib/instances/firecracker_test.go index d53355a6..266c3eeb 100644 --- a/lib/instances/firecracker_test.go +++ b/lib/instances/firecracker_test.go @@ -867,7 +867,10 @@ func TestFCUFFDOneShotLifecycle(t *testing.T) { // intact. It is a sibling of TestFCUFFDOneShotLifecycle and leaves that test's // coverage unchanged. func TestFCUFFDGraduationLifecycle(t *testing.T) { - t.Parallel() + // Intentionally not parallel: graduation forces a full guest-memory populate, + // and overlapping that with the sibling UFFD lifecycle test's VMs saturated + // the CI runner and timed out guest-agent readiness. Running solo keeps peak + // concurrent UFFD VM load the same as before this test existed. requireFirecrackerIntegrationPrereqs(t) requireUserfaultfdIntegrationPrereqs(t) if pagerBinary := strings.TrimSpace(os.Getenv("HYPEMAN_UFFD_PAGER_BINARY")); pagerBinary == "" {