Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
| Topic | Location |
|--------|----------|
| Architecture and components | [docs/md/architecture.md](docs/md/architecture.md) |
| Database layout | [docs/md/database.md](docs/md/database.md) |
| Database layout and migrations | [docs/md/database.md](docs/md/database.md) |
| Local dev, tests, OpenAPI | [README.md](README.md) |
| Commits, PRs, contribution style | [CONTRIBUTING.md](CONTRIBUTING.md) |

Expand All @@ -30,6 +30,7 @@ Prefer these sources over guessing when behavior or schema matters.
| Evaluation | `evaluator/`, topic names in code and `conf/` |
| Advisory sync | `tasks/vmaas_sync/` |
| Migrations | `database_admin/migrations/` (verify naming against existing migrations) |
| Migration flow and session flags | `database_admin/update.go`, [docs/md/database.md#migrations](docs/md/database.md#migrations) |
| Database schema and SQL | `database_admin/schema/` |
| Containers and local orchestration | `docker-compose.yml`, `docker-compose.test.yml`, `Dockerfile*` |
| Scheduled jobs | `tasks/` |
Expand Down Expand Up @@ -85,3 +86,29 @@ Manager Component (REST API)
Response to User
```

---

## Database migrations: `terminate_db_sessions`

When advising on migrations or deploy config, use [docs/md/database.md#migrations](docs/md/database.md#migrations). Summary for agents:

**Default:** do **not** set `terminate_db_sessions`. It defaults to `false`; normal deploys must stay unchanged.

**What it does:** After `NOLOGIN` on app DB users, database-admin optionally runs `pg_terminate_backend` on open `listener` / `evaluator` / `manager` / `vmaas_sync` sessions, then waits until `pg_stat_activity` shows none, then runs DDL. Code: `prepareForMigration()` in `database_admin/update.go`.

**Recommend `terminate_db_sessions=true` only when:**

- The migration is a **major DDL** change likely to need exclusive locks or long runtimes (large `ALTER TABLE`, partition restructuring, similar)
- Migration logs show blocking after user lock with app sessions still present
- Ops are doing a **one-off major migration deploy** via `DATABASE_ADMIN_CONFIG` on the **db-migration Job**

**Do not recommend it when:**

- The change is a routine migration or standard release deploy
- The user is working locally or in CI
- There is no session-lock symptom — it forcibly drops client connections and is not a safe default

**How to set (production):** `DATABASE_ADMIN_CONFIG=terminate_db_sessions=true` on the db-migration Job for that deploy only; remove afterward. Do not enable on manager/listener/evaluator pods.

**Related:** Session wait logic and `pg_stat_activity` queries are in `database_admin/update.go`. Deploy layout (single migration Job, `check-for-db` init) is in `deploy/clowdapp.yaml`.
2 changes: 2 additions & 0 deletions database_admin/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ var (
unlockUsers = utils.PodConfig.GetBool("unlock_users", false)
// rerun config.sql
updateDBConfig = utils.PodConfig.GetBool("update_db_config", false)
// Terminate lockUsers sessions after NOLOGIN (for major DDL migrations)
terminateDBSessions = utils.PodConfig.GetBool("terminate_db_sessions", false)
)
84 changes: 76 additions & 8 deletions database_admin/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,24 @@ package database_admin
import (
"app/base/utils"
"database/sql"
"errors"
"fmt"
"os"
"strings"
"time"

"github.com/golang-migrate/migrate/v4/database"
"github.com/lib/pq"
log "github.com/sirupsen/logrus"
)

var lockUsers = []string{"listener", "evaluator", "manager", "vmaas_sync"}

const activeAppSessionsQuery = `SELECT usename || ' ' || substring(query for 50)
FROM pg_stat_activity
WHERE usename = ANY($1)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Consider aligning the pg_stat_activity filters between the "find" and "list" queries.

activeAppSessionsQuery and activeAppSessionPIDsQuery use slightly different filters (e.g., only the latter excludes pg_backend_pid()). Please either share a common base condition or make their semantics intentionally consistent, so the “active” sessions you wait on are the same ones you terminate. Adding an explicit pg_backend_pid() exclusion to both would also guard against future config changes (e.g., if lockUsers ever includes the current user).

LIMIT 1`

func execOrPanic(db *sql.DB, query string, args ...interface{}) {
if _, err := db.Exec(query, args...); err != nil {
panic(err)
Expand All @@ -38,18 +45,71 @@ func releaseAdvisoryLock(db *sql.DB) {
execOrPanic(db, "SELECT pg_advisory_unlock(123)")
}

// findActiveAppSession returns the first open session for lockUsers, if any.
func findActiveAppSession(db *sql.DB) (session string, found bool, err error) {
err = db.QueryRow(activeAppSessionsQuery, pq.Array(lockUsers)).Scan(&session)
if errors.Is(err, sql.ErrNoRows) {
return "", false, nil
}
if err != nil {
return "", false, err
}
return session, true, nil
}

type appSession struct {
pid int
usename string
query string
}

const activeAppSessionPIDsQuery = `SELECT pid, usename, coalesce(substring(query for 50), '')
FROM pg_stat_activity
WHERE usename = ANY($1)
AND pid <> pg_backend_pid()`

func listActiveAppSessions(db *sql.DB) ([]appSession, error) {
rows, err := db.Query(activeAppSessionPIDsQuery, pq.Array(lockUsers))
if err != nil {
return nil, err
}
defer rows.Close()

var sessions []appSession
for rows.Next() {
var session appSession
if err := rows.Scan(&session.pid, &session.usename, &session.query); err != nil {
return nil, err
}
sessions = append(sessions, session)
}
return sessions, rows.Err()
}

func terminateAppSessions(db *sql.DB) {
sessions, err := listActiveAppSessions(db)
if err != nil {
panic(err)
}
for _, session := range sessions {
if _, err := db.Exec("SELECT pg_terminate_backend($1)", session.pid); err != nil {
panic(err)
}
utils.LogInfo("pid", session.pid, "user", session.usename, "query", session.query,
"terminated app database session")
}
}

// Wait for closing of all lockUsers database sessions.
func waitForSessionClosed(db *sql.DB) {
for {
session := ""
err := db.QueryRow(
"SELECT usename || ' ' || substring(query for 50) FROM pg_stat_activity WHERE "+
"usename IN (?) LIMIT 30;", lockUsers,
).Scan(&session)
session, found, err := findActiveAppSession(db)
if err != nil {
log.Info(err)
utils.LogError("err", err.Error(), "failed to check app database sessions")
time.Sleep(time.Second)
continue
}
if session == "" {
if !found {
log.Info("No ", strings.Join(lockUsers, ", "), " sessions found")
return
}
Expand Down Expand Up @@ -80,10 +140,18 @@ func unblockUsers(db *sql.DB) {
}
}

func startMigration(conn database.Driver, db *sql.DB, migrationFilesURL string) {
func prepareForMigration(db *sql.DB) {
log.Info("Blocking writing users during the migration")
blockUsers(db)
if terminateDBSessions {
log.Info("Terminating active app database sessions")
terminateAppSessions(db)
}
waitForSessionClosed(db)
}

func startMigration(conn database.Driver, db *sql.DB, migrationFilesURL string) {
prepareForMigration(db)

MigrateUp(conn, migrationFilesURL)

Expand Down
162 changes: 162 additions & 0 deletions database_admin/update_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package database_admin

import (
"app/base/database"
"app/base/utils"
"database/sql"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func openAppDB(t *testing.T, user, password string) *sql.DB {
t.Helper()
sslModeCert := utils.CoreCfg.DBSslMode
if utils.CoreCfg.DBSslRootCert != "" {
sslModeCert += "&sslrootcert=" + utils.CoreCfg.DBSslRootCert
}
url := fmt.Sprintf("postgres://%s:%s@%s:%d/%s?sslmode=%s",
user, password,
utils.CoreCfg.DBHost, utils.CoreCfg.DBPort,
utils.CoreCfg.DBName, sslModeCert,
)
db, err := sql.Open("postgres", url)
require.NoError(t, err)
require.NoError(t, db.Ping())
t.Cleanup(func() { _ = db.Close() })
return db
}

// Query errors must not be treated as "no sessions". Use an unreachable host/port so
// QueryRow fails on connect; sql.Open itself does not connect.
func TestFindActiveAppSessionInvalidDB(t *testing.T) {
db, err := sql.Open("postgres", "postgres://127.0.0.1:1/nope?sslmode=disable&connect_timeout=1")
require.NoError(t, err)
t.Cleanup(func() { _ = db.Close() })

_, found, err := findActiveAppSession(db)
assert.Error(t, err)
assert.False(t, found)
}

func TestFindActiveAppSessionNoRows(t *testing.T) {
utils.SkipWithoutDB(t)
database.Configure()

_, db := dbConn()
t.Cleanup(func() { _ = db.Close() })

_, found, err := findActiveAppSession(db)
assert.NoError(t, err)
assert.False(t, found)
}

func TestFindActiveAppSessionFound(t *testing.T) {
utils.SkipWithoutDB(t)
database.Configure()

_ = openAppDB(t, "manager", utils.Getenv("MANAGER_PASSWORD", "manager"))

_, db := dbConn()
t.Cleanup(func() { _ = db.Close() })

session, found, err := findActiveAppSession(db)
require.NoError(t, err)
assert.True(t, found)
assert.Contains(t, session, "manager")
}

func TestListActiveAppSessionsFound(t *testing.T) {
utils.SkipWithoutDB(t)
database.Configure()

_ = openAppDB(t, "manager", utils.Getenv("MANAGER_PASSWORD", "manager"))

_, db := dbConn()
t.Cleanup(func() { _ = db.Close() })

sessions, err := listActiveAppSessions(db)
require.NoError(t, err)

found := false
for _, session := range sessions {
if session.usename == "manager" {
found = true
assert.NotZero(t, session.pid)
break
}
}
assert.True(t, found)
}

func TestTerminateAppSessions(t *testing.T) {
utils.SkipWithoutDB(t)
database.Configure()

listenerDB := openAppDB(t, "listener", utils.Getenv("LISTENER_PASSWORD", "listener"))
listenerDB.SetMaxOpenConns(1)

_, db := dbConn()
t.Cleanup(func() {
unblockUsers(db)
_ = db.Close()
})

var listenerPID int
require.NoError(t, listenerDB.QueryRow("SELECT pg_backend_pid()").Scan(&listenerPID))

sessions, err := listActiveAppSessions(db)
require.NoError(t, err)
found := false
for _, session := range sessions {
if session.pid == listenerPID {
found = true
break
}
}
require.True(t, found)

blockUsers(db)
terminateAppSessions(db)

var one int
err = listenerDB.QueryRow("SELECT 1").Scan(&one)
assert.Error(t, err)
}

func TestStartMigrationBeforeMigrateUp(t *testing.T) {
utils.SkipWithoutDB(t)
database.Configure()

oldTerminate := terminateDBSessions
terminateDBSessions = true
t.Cleanup(func() { terminateDBSessions = oldTerminate })

listenerDB := openAppDB(t, "listener", utils.Getenv("LISTENER_PASSWORD", "listener"))
listenerDB.SetMaxOpenConns(1)

_, db := dbConn()
t.Cleanup(func() {
unblockUsers(db)
_ = db.Close()
})

prepareForMigration(db)

var one int
err := listenerDB.QueryRow("SELECT 1").Scan(&one)
assert.Error(t, err)

_, found, err := findActiveAppSession(db)
require.NoError(t, err)
assert.False(t, found)

for _, user := range lockUsers {
var canLogin bool
err := db.QueryRow("SELECT rolcanlogin FROM pg_roles WHERE rolname = $1", user).Scan(&canLogin)
require.NoError(t, err)
assert.False(t, canLogin, "user %s should remain blocked before MigrateUp", user)
}
}
1 change: 1 addition & 0 deletions deploy/clowdapp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,7 @@ parameters:
- {name: MEM_REQUEST_DATABASE_ADMIN, value: 128Mi}
- {name: GOMEMLIMIT_DATABASE_ADMIN, value: '115MiB'} # set to 90% of the default memory limit 128Mi (don't forget `B`)
- {name: DATABASE_ADMIN_CONFIG, value: ''} # Set 'schema_version=XXX' if need specific database schema
# 'terminate_db_sessions=true' to kill app DB sessions before DDL
# 'force_schema_version=XXX' to reset the dirty flag to false and force the specific version, it will follow up with the schema upgrade defined by schema_version

# Common parameters
Expand Down
5 changes: 4 additions & 1 deletion docs/md/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ description of the component and data layout are in [separate page](database.md)

- **database-admin** - Executes database initialization and migrations. It needs all rights for the database. It also
creates database users for all components and updates passwords for them, so it reads passwords for admin and all
components from environment variables. Using container CLI it's possible to manually manage database
components from environment variables. Before DDL it sets app users (`listener`, `evaluator`, `manager`, `vmaas_sync`)
to `NOLOGIN`, optionally terminates lingering sessions when `terminate_db_sessions=true`, waits until no app sessions
remain, runs migrations, then restores `LOGIN`. See [Database migrations](database.md#migrations) for when to enable
session termination. Using container CLI it's possible to manually manage database
(`./scripts/psql.sh`). See [component environment variables](../../conf/database_admin.env)

### Components cooperation schema
Expand Down
Loading
Loading