From 3a476fd39a3c35247a6b251fbe36d4e311224d25 Mon Sep 17 00:00:00 2001 From: Acho Arnold Date: Sun, 17 May 2026 12:47:32 +0300 Subject: [PATCH] feat: switch heartbeat repositories to use MongoDB directly Replace the hedging dual-write pattern with direct MongoDB usage for heartbeat monitors and heartbeats. When HEARTBEAT_DB_BACKEND is set to 'hedging', it now routes to MongoDB instead of the dual-write hedging repository. This is the first migration step before switching to MongoDB completely. - Remove hedging_heartbeat_monitor_repository.go - Remove hedging_heartbeat_repository.go - Remove HedgingFailureCounter from DI container - Route 'hedging' env value to MongoDB in both repository factories Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- api/go.mod | 4 + api/go.sum | 10 ++ api/pkg/di/container.go | 39 +----- .../hedging_heartbeat_monitor_repository.go | 128 ------------------ .../hedging_heartbeat_repository.go | 79 ----------- 5 files changed, 16 insertions(+), 244 deletions(-) delete mode 100644 api/pkg/repositories/hedging_heartbeat_monitor_repository.go delete mode 100644 api/pkg/repositories/hedging_heartbeat_repository.go diff --git a/api/go.mod b/api/go.mod index 1fe7dca1..08f6a5c0 100644 --- a/api/go.mod +++ b/api/go.mod @@ -41,6 +41,7 @@ require ( github.com/redis/go-redis/extra/redisotel/v9 v9.19.0 github.com/redis/go-redis/v9 v9.19.0 github.com/rs/zerolog v1.35.1 + github.com/schollz/progressbar/v3 v3.19.0 github.com/stretchr/testify v1.11.1 github.com/swaggo/swag v1.16.6 github.com/thedevsaddam/govalidator v1.9.10 @@ -141,6 +142,7 @@ require ( github.com/mattn/go-isatty v0.0.22 // indirect github.com/mattn/go-runewidth v0.0.23 // indirect github.com/mattn/go-sqlite3 v1.14.44 // indirect + github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect @@ -153,6 +155,7 @@ require ( github.com/redis/go-redis/extra/rediscmd/v9 v9.19.0 // indirect github.com/richardlehane/mscfb v1.0.6 // indirect github.com/richardlehane/msoleps v1.0.6 // indirect + github.com/rivo/uniseg v0.4.7 // indirect github.com/segmentio/asm v1.2.1 // indirect github.com/shopspring/decimal v1.4.0 // indirect github.com/spiffe/go-spiffe/v2 v2.6.0 // indirect @@ -193,6 +196,7 @@ require ( golang.org/x/net v0.53.0 // indirect golang.org/x/oauth2 v0.36.0 // indirect golang.org/x/sys v0.44.0 // indirect + golang.org/x/term v0.43.0 // indirect golang.org/x/text v0.37.0 // indirect golang.org/x/time v0.15.0 // indirect golang.org/x/tools v0.44.0 // indirect diff --git a/api/go.sum b/api/go.sum index fa1163a2..0f6dcce5 100644 --- a/api/go.sum +++ b/api/go.sum @@ -78,6 +78,8 @@ github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1x github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/chengxilo/virtualterm v1.0.4 h1:Z6IpERbRVlfB8WkOmtbHiDbBANU7cimRIof7mk9/PwM= +github.com/chengxilo/virtualterm v1.0.4/go.mod h1:DyxxBZz/x1iqJjFxTFcr6/x+jSpqN0iwWCOK1q10rlY= github.com/clipperhouse/displaywidth v0.11.0 h1:lBc6kY44VFw+TDx4I8opi/EtL9m20WSEFgwIwO+UVM8= github.com/clipperhouse/displaywidth v0.11.0/go.mod h1:bkrFNkf81G8HyVqmKGxsPufD3JhNl3dSqnGhOoSD/o0= github.com/clipperhouse/uax29/v2 v2.7.0 h1:+gs4oBZ2gPfVrKPthwbMzWZDaAFPGYK72F0NJv2v7Vk= @@ -245,6 +247,8 @@ github.com/mattn/go-runewidth v0.0.23 h1:7ykA0T0jkPpzSvMS5i9uoNn2Xy3R383f9HDx3Ry github.com/mattn/go-runewidth v0.0.23/go.mod h1:XBkDxAl56ILZc9knddidhrOlY5R/pDhgLpndooCuJAs= github.com/mattn/go-sqlite3 v1.14.44 h1:3VSe+xafpbzsLbdr2AWlAZk9yRHiBhTBakioXaCKTF8= github.com/mattn/go-sqlite3 v1.14.44/go.mod h1:pjEuOr8IwzLJP2MfGeTb0A35jauH+C2kbHKBr7yXKVQ= +github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db h1:62I3jR2EmQ4l5rM/4FEfDWcRD+abF5XlKShorW5LRoQ= +github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw= github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= @@ -291,10 +295,14 @@ github.com/richardlehane/mscfb v1.0.6 h1:eN3bvvZCp00bs7Zf52bxNwAx5lJDBK1tCuH19qq github.com/richardlehane/mscfb v1.0.6/go.mod h1:pe0+IUIc0AHh0+teNzBlJCtSyZdFOGgV4ZK9bsoV+Jo= github.com/richardlehane/msoleps v1.0.6 h1:9BvkpjvD+iUBalUY4esMwv6uBkfOip/Lzvd93jvR9gg= github.com/richardlehane/msoleps v1.0.6/go.mod h1:BWev5JBpU9Ko2WAgmZEuiz4/u3ZYTKbjLycmwiWUfWg= +github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= +github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/rs/zerolog v1.35.1 h1:m7xQeoiLIiV0BCEY4Hs+j2NG4Gp2o2KPKmhnnLiazKI= github.com/rs/zerolog v1.35.1/go.mod h1:EjML9kdfa/RMA7h/6z6pYmq1ykOuA8/mjWaEvGI+jcw= +github.com/schollz/progressbar/v3 v3.19.0 h1:Ea18xuIRQXLAUidVDox3AbwfUhD0/1IvohyTutOIFoc= +github.com/schollz/progressbar/v3 v3.19.0/go.mod h1:IsO3lpbaGuzh8zIMzgY3+J8l4C8GjO0Y9S69eFvNsec= github.com/segmentio/asm v1.2.1 h1:DTNbBqs57ioxAD4PrArqftgypG4/qNpXoJx8TVXxPR0= github.com/segmentio/asm v1.2.1/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= @@ -477,6 +485,8 @@ golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM= +golang.org/x/term v0.43.0 h1:S4RLU2sB31O/NCl+zFN9Aru9A/Cq2aqKpTZJ6B+DwT4= +golang.org/x/term v0.43.0/go.mod h1:lrhlHNdQJHO+1qVYiHfFKVuVioJIheAc3fBSMFYEIsk= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= diff --git a/api/pkg/di/container.go b/api/pkg/di/container.go index e9ca3718..6da12033 100644 --- a/api/pkg/di/container.go +++ b/api/pkg/di/container.go @@ -310,23 +310,6 @@ func (container *Container) MongoDB() *mongoDriver.Database { return container.mongoDB } -// HedgingFailureCounter creates an OTel counter for hedging secondary write failures -func (container *Container) HedgingFailureCounter() otelMetric.Int64Counter { - meter := otel.GetMeterProvider().Meter( - container.projectID, - otelMetric.WithInstrumentationVersion(otel.Version()), - ) - counter, err := meter.Int64Counter( - "hedging.secondary.write.failures", - otelMetric.WithUnit("1"), - otelMetric.WithDescription("Number of failed secondary writes in hedging repositories"), - ) - if err != nil { - container.logger.Fatal(stacktrace.Propagate(err, "cannot create hedging failure counter")) - } - return counter -} - // DBWithoutMigration creates an instance of gorm.DB if it has not been created already func (container *Container) DBWithoutMigration() (db *gorm.DB) { if container.db != nil { @@ -922,22 +905,13 @@ func (container *Container) MessageThreadRepository() (repository repositories.M // HeartbeatMonitorRepository creates a new instance of repositories.HeartbeatMonitorRepository func (container *Container) HeartbeatMonitorRepository() (repository repositories.HeartbeatMonitorRepository) { switch os.Getenv("HEARTBEAT_DB_BACKEND") { - case "mongodb": + case "mongodb", "hedging": container.logger.Debug("creating MongoDB repositories.HeartbeatMonitorRepository") return repositories.NewMongoHeartbeatMonitorRepository( container.Logger(), container.Tracer(), container.MongoDB(), ) - case "hedging": - container.logger.Debug("creating hedging repositories.HeartbeatMonitorRepository") - return repositories.NewHedgingHeartbeatMonitorRepository( - container.Logger(), - container.Tracer(), - repositories.NewGormHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.DedicatedDB()), - repositories.NewMongoHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.MongoDB()), - container.HedgingFailureCounter(), - ) default: container.logger.Debug("creating GORM repositories.HeartbeatMonitorRepository") return repositories.NewGormHeartbeatMonitorRepository( @@ -1760,22 +1734,13 @@ func (container *Container) RegisterSwaggerRoutes() { // HeartbeatRepository registers a new instance of repositories.HeartbeatRepository func (container *Container) HeartbeatRepository() repositories.HeartbeatRepository { switch os.Getenv("HEARTBEAT_DB_BACKEND") { - case "mongodb": + case "mongodb", "hedging": container.logger.Debug("creating MongoDB repositories.HeartbeatRepository") return repositories.NewMongoHeartbeatRepository( container.Logger(), container.Tracer(), container.MongoDB(), ) - case "hedging": - container.logger.Debug("creating hedging repositories.HeartbeatRepository") - return repositories.NewHedgingHeartbeatRepository( - container.Logger(), - container.Tracer(), - repositories.NewGormHeartbeatRepository(container.Logger(), container.Tracer(), container.DedicatedDB()), - repositories.NewMongoHeartbeatRepository(container.Logger(), container.Tracer(), container.MongoDB()), - container.HedgingFailureCounter(), - ) default: container.logger.Debug("creating GORM repositories.HeartbeatRepository") return repositories.NewGormHeartbeatRepository( diff --git a/api/pkg/repositories/hedging_heartbeat_monitor_repository.go b/api/pkg/repositories/hedging_heartbeat_monitor_repository.go deleted file mode 100644 index 3304230c..00000000 --- a/api/pkg/repositories/hedging_heartbeat_monitor_repository.go +++ /dev/null @@ -1,128 +0,0 @@ -package repositories - -import ( - "context" - "fmt" - - "github.com/google/uuid" - otelMetric "go.opentelemetry.io/otel/metric" - - "github.com/NdoleStudio/httpsms/pkg/entities" - "github.com/NdoleStudio/httpsms/pkg/telemetry" - "github.com/palantir/stacktrace" -) - -// hedgingHeartbeatMonitorRepository writes to both primary and secondary repositories. -// Reads only hit primary. Secondary writes are fail-open. -type hedgingHeartbeatMonitorRepository struct { - logger telemetry.Logger - tracer telemetry.Tracer - primary HeartbeatMonitorRepository - secondary HeartbeatMonitorRepository - failureCounter otelMetric.Int64Counter -} - -// NewHedgingHeartbeatMonitorRepository creates a hedging HeartbeatMonitorRepository -func NewHedgingHeartbeatMonitorRepository( - logger telemetry.Logger, - tracer telemetry.Tracer, - primary HeartbeatMonitorRepository, - secondary HeartbeatMonitorRepository, - failureCounter otelMetric.Int64Counter, -) HeartbeatMonitorRepository { - return &hedgingHeartbeatMonitorRepository{ - logger: logger.WithService(fmt.Sprintf("%T", &hedgingHeartbeatMonitorRepository{})), - tracer: tracer, - primary: primary, - secondary: secondary, - failureCounter: failureCounter, - } -} - -func (repository *hedgingHeartbeatMonitorRepository) Store(ctx context.Context, monitor *entities.HeartbeatMonitor) error { - ctx, span := repository.tracer.Start(ctx) - defer span.End() - - if err := repository.primary.Store(ctx, monitor); err != nil { - return err - } - - if err := repository.secondary.Store(ctx, monitor); err != nil { - repository.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("hedging: secondary write failed for monitor [%s]", monitor.ID))) - repository.failureCounter.Add(ctx, 1) - } - - return nil -} - -func (repository *hedgingHeartbeatMonitorRepository) Load(ctx context.Context, userID entities.UserID, phoneNumber string) (*entities.HeartbeatMonitor, error) { - return repository.primary.Load(ctx, userID, phoneNumber) -} - -func (repository *hedgingHeartbeatMonitorRepository) Exists(ctx context.Context, userID entities.UserID, monitorID uuid.UUID) (bool, error) { - return repository.primary.Exists(ctx, userID, monitorID) -} - -func (repository *hedgingHeartbeatMonitorRepository) UpdateQueueID(ctx context.Context, monitorID uuid.UUID, queueID string) error { - ctx, span := repository.tracer.Start(ctx) - defer span.End() - - if err := repository.primary.UpdateQueueID(ctx, monitorID, queueID); err != nil { - return err - } - - if err := repository.secondary.UpdateQueueID(ctx, monitorID, queueID); err != nil { - repository.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("hedging: secondary UpdateQueueID failed for monitor [%s]", monitorID))) - repository.failureCounter.Add(ctx, 1) - } - - return nil -} - -func (repository *hedgingHeartbeatMonitorRepository) Delete(ctx context.Context, userID entities.UserID, phoneNumber string) error { - ctx, span := repository.tracer.Start(ctx) - defer span.End() - - if err := repository.primary.Delete(ctx, userID, phoneNumber); err != nil { - return err - } - - if err := repository.secondary.Delete(ctx, userID, phoneNumber); err != nil { - repository.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("hedging: secondary delete failed for monitor with owner [%s]", phoneNumber))) - repository.failureCounter.Add(ctx, 1) - } - - return nil -} - -func (repository *hedgingHeartbeatMonitorRepository) UpdatePhoneOnline(ctx context.Context, userID entities.UserID, monitorID uuid.UUID, online bool) error { - ctx, span := repository.tracer.Start(ctx) - defer span.End() - - if err := repository.primary.UpdatePhoneOnline(ctx, userID, monitorID, online); err != nil { - return err - } - - if err := repository.secondary.UpdatePhoneOnline(ctx, userID, monitorID, online); err != nil { - repository.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("hedging: secondary UpdatePhoneOnline failed for monitor [%s]", monitorID))) - repository.failureCounter.Add(ctx, 1) - } - - return nil -} - -func (repository *hedgingHeartbeatMonitorRepository) DeleteAllForUser(ctx context.Context, userID entities.UserID) error { - ctx, span := repository.tracer.Start(ctx) - defer span.End() - - if err := repository.primary.DeleteAllForUser(ctx, userID); err != nil { - return err - } - - if err := repository.secondary.DeleteAllForUser(ctx, userID); err != nil { - repository.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("hedging: secondary delete all failed for user [%s]", userID))) - repository.failureCounter.Add(ctx, 1) - } - - return nil -} diff --git a/api/pkg/repositories/hedging_heartbeat_repository.go b/api/pkg/repositories/hedging_heartbeat_repository.go deleted file mode 100644 index 07346264..00000000 --- a/api/pkg/repositories/hedging_heartbeat_repository.go +++ /dev/null @@ -1,79 +0,0 @@ -package repositories - -import ( - "context" - "fmt" - - otelMetric "go.opentelemetry.io/otel/metric" - - "github.com/NdoleStudio/httpsms/pkg/entities" - "github.com/NdoleStudio/httpsms/pkg/telemetry" - "github.com/palantir/stacktrace" -) - -// hedgingHeartbeatRepository writes to both primary and secondary repositories. -// Reads only hit primary. Secondary writes are fail-open. -type hedgingHeartbeatRepository struct { - logger telemetry.Logger - tracer telemetry.Tracer - primary HeartbeatRepository - secondary HeartbeatRepository - failureCounter otelMetric.Int64Counter -} - -// NewHedgingHeartbeatRepository creates a hedging HeartbeatRepository -func NewHedgingHeartbeatRepository( - logger telemetry.Logger, - tracer telemetry.Tracer, - primary HeartbeatRepository, - secondary HeartbeatRepository, - failureCounter otelMetric.Int64Counter, -) HeartbeatRepository { - return &hedgingHeartbeatRepository{ - logger: logger.WithService(fmt.Sprintf("%T", &hedgingHeartbeatRepository{})), - tracer: tracer, - primary: primary, - secondary: secondary, - failureCounter: failureCounter, - } -} - -func (repository *hedgingHeartbeatRepository) Store(ctx context.Context, heartbeat *entities.Heartbeat) error { - ctx, span := repository.tracer.Start(ctx) - defer span.End() - - if err := repository.primary.Store(ctx, heartbeat); err != nil { - return err - } - - if err := repository.secondary.Store(ctx, heartbeat); err != nil { - repository.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("hedging: secondary write failed for heartbeat [%s]", heartbeat.ID))) - repository.failureCounter.Add(ctx, 1) - } - - return nil -} - -func (repository *hedgingHeartbeatRepository) Index(ctx context.Context, userID entities.UserID, owner string, params IndexParams) (*[]entities.Heartbeat, error) { - return repository.primary.Index(ctx, userID, owner, params) -} - -func (repository *hedgingHeartbeatRepository) Last(ctx context.Context, userID entities.UserID, owner string) (*entities.Heartbeat, error) { - return repository.primary.Last(ctx, userID, owner) -} - -func (repository *hedgingHeartbeatRepository) DeleteAllForUser(ctx context.Context, userID entities.UserID) error { - ctx, span := repository.tracer.Start(ctx) - defer span.End() - - if err := repository.primary.DeleteAllForUser(ctx, userID); err != nil { - return err - } - - if err := repository.secondary.DeleteAllForUser(ctx, userID); err != nil { - repository.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("hedging: secondary delete failed for user [%s]", userID))) - repository.failureCounter.Add(ctx, 1) - } - - return nil -}