Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions database_admin/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,24 @@ package database_admin
import (
"app/base/utils"
"database/sql"
"errors"
"fmt"
"os"
"strings"
"time"

"github.com/golang-migrate/migrate/v4/database"
"github.com/lib/pq"
log "github.com/sirupsen/logrus"
)

var lockUsers = []string{"listener", "evaluator", "manager", "vmaas_sync"}

const activeAppSessionsQuery = `SELECT usename || ' ' || substring(query for 50)
FROM pg_stat_activity
WHERE usename = ANY($1)
LIMIT 1`

func execOrPanic(db *sql.DB, query string, args ...interface{}) {
if _, err := db.Exec(query, args...); err != nil {
panic(err)
Expand All @@ -38,18 +45,28 @@ func releaseAdvisoryLock(db *sql.DB) {
execOrPanic(db, "SELECT pg_advisory_unlock(123)")
}

// findActiveAppSession returns the first open session for lockUsers, if any.
func findActiveAppSession(db *sql.DB) (session string, found bool, err error) {
err = db.QueryRow(activeAppSessionsQuery, pq.Array(lockUsers)).Scan(&session)
if errors.Is(err, sql.ErrNoRows) {
return "", false, nil
}
if err != nil {
return "", false, err
}
return session, true, nil
}

// Wait for closing of all lockUsers database sessions.
func waitForSessionClosed(db *sql.DB) {
for {
session := ""
err := db.QueryRow(
"SELECT usename || ' ' || substring(query for 50) FROM pg_stat_activity WHERE "+
"usename IN (?) LIMIT 30;", lockUsers,
).Scan(&session)
session, found, err := findActiveAppSession(db)
if err != nil {
log.Info(err)
utils.LogError("err", err.Error(), "failed to check app database sessions")
time.Sleep(time.Second)
continue
Comment on lines 64 to +67

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Loop on error without a termination condition may mask persistent failures.

If findActiveAppSession keeps failing (e.g., due to permissions on pg_stat_activity or a persistent network issue), this loop will sleep and retry forever, causing the migration to hang on non-recoverable errors. Please add a max retry/timeout mechanism or surface the error after a threshold so the failure is explicit instead of an infinite wait.

}
if session == "" {
if !found {
log.Info("No ", strings.Join(lockUsers, ", "), " sessions found")
return
}
Expand Down
65 changes: 65 additions & 0 deletions database_admin/update_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package database_admin

import (
"app/base/database"
"app/base/utils"
"database/sql"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func openAppDB(t *testing.T, user, password string) *sql.DB {
t.Helper()
url := fmt.Sprintf("postgres://%s:%s@%s:%d/%s?sslmode=%s",
user, password,
utils.CoreCfg.DBHost, utils.CoreCfg.DBPort,
utils.CoreCfg.DBName, utils.CoreCfg.DBSslMode,
)
db, err := sql.Open("postgres", url)
require.NoError(t, err)
require.NoError(t, db.Ping())
t.Cleanup(func() { _ = db.Close() })
return db
}

// Query errors must not be treated as "no sessions". Use an unreachable host/port so
// QueryRow fails on connect; sql.Open itself does not connect.
func TestFindActiveAppSessionInvalidDB(t *testing.T) {
db, err := sql.Open("postgres", "postgres://127.0.0.1:1/nope?sslmode=disable&connect_timeout=1")
require.NoError(t, err)
t.Cleanup(func() { _ = db.Close() })

_, found, err := findActiveAppSession(db)
assert.Error(t, err)
assert.False(t, found)
}

func TestFindActiveAppSessionNoRows(t *testing.T) {
utils.SkipWithoutDB(t)
database.Configure()

_, db := dbConn()
t.Cleanup(func() { _ = db.Close() })

_, found, err := findActiveAppSession(db)
assert.NoError(t, err)
assert.False(t, found)
}

func TestFindActiveAppSessionFound(t *testing.T) {
utils.SkipWithoutDB(t)
database.Configure()

_ = openAppDB(t, "manager", utils.Getenv("MANAGER_PASSWORD", "manager"))

_, db := dbConn()
t.Cleanup(func() { _ = db.Close() })

session, found, err := findActiveAppSession(db)
require.NoError(t, err)
assert.True(t, found)
assert.Contains(t, session, "manager")
}
Loading