Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
89e9753
test: add concurrency hazard test suite
hongwei1 Jun 11, 2026
9209722
docs: add concurrency test suite summary
hongwei1 Jun 11, 2026
cd8adee
docs: merge concurrency hazard docs into single CONCURRENCY_HAZARDS.md
hongwei1 Jun 11, 2026
5fdc6b1
docs: add ScalaTest simulation plan for concurrent race condition haz…
hongwei1 Jun 11, 2026
b501028
Merge branch 'develop' of https://github.com/OpenBankProject/OBP-API …
hongwei1 Jun 18, 2026
8bc031a
refactor: migrate JSON serialization imports from net.liftweb to org.…
hongwei1 Jun 18, 2026
5de7ff7
feat: implement atomic bank account balance updates using Doobie row …
hongwei1 Jun 18, 2026
26941ab
docs: Update outdated V6/V7 comments in RequestScopeConnection to ref…
hongwei1 Jun 21, 2026
ed1242b
fix(transaction): prevent Doobie escaping and MFA double-spend race c…
hongwei1 Jun 22, 2026
9c3c893
fix(security): prevent authentication counter lost-updates via row-le…
hongwei1 Jun 22, 2026
acdcda6
fix(consent): prevent scheduler stale-save from resurrecting revoked …
hongwei1 Jun 22, 2026
a64d931
fix(concurrency): prevent duplicate-creation races in six getOrCreate…
hongwei1 Jun 22, 2026
5a04b20
fix(concurrency): prevent view-permission races and orphaned AccountA…
hongwei1 Jun 22, 2026
92dc1d8
fix(concurrency): make incrementFutureCounter and decrementFutureCoun…
hongwei1 Jun 22, 2026
ff19daf
docs(concurrency): update CONCURRENCY_HAZARDS.md to reflect all 19 sc…
hongwei1 Jun 22, 2026
5509e50
fix(concurrency): guard getOrCreateSystemView and migrateViewPermissi…
hongwei1 Jun 22, 2026
7e68cc1
fix(concurrency): address five post-review correctness findings
hongwei1 Jun 22, 2026
8144e64
fix(concurrency): guard getOrCreate in mapping providers and bad-logi…
hongwei1 Jun 22, 2026
3bde4fd
fix(concurrency): correct type mismatch and missing import in mapping…
hongwei1 Jun 23, 2026
1141c0d
refactor: clarify variable names and add comments in future counters
hongwei1 Jun 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.openbankproject.commons.model.{AccountId, BankId, BankIdAccountId, Us
import net.liftweb.common._
import net.liftweb.mapper._
import net.liftweb.common.Box
import net.liftweb.util.Helpers.tryo


/**
Expand All @@ -31,7 +32,7 @@ object MapperAccountHolders extends MapperAccountHolders with AccountHolders wit

// NOTE: !!! Uses a DIFFERENT TABLE NAME PREFIX TO ALL OTHERS i.e. MAPPER not MAPPED !!!!!

override def dbIndexes = Index(accountBankPermalink, accountPermalink) :: Nil
override def dbIndexes = UniqueIndex(user, accountBankPermalink, accountPermalink) :: Nil

//Note, this method, will not check the existing of bankAccount, any value of BankIdAccountId
//Can create the MapperAccountHolders.
Expand All @@ -51,16 +52,25 @@ object MapperAccountHolders extends MapperAccountHolders with AccountHolders wit
mapperAccountHolder
}
case Empty => {
val holder: MapperAccountHolders = MapperAccountHolders.create
.accountBankPermalink(bankIdAccountId.bankId.value)
.accountPermalink(bankIdAccountId.accountId.value)
.user(user.userPrimaryKey.value)
.source(source.getOrElse(null))
.saveMe
logger.debug(
s"getOrCreateAccountHolder--> create account holder: $holder"
)
Full(holder)
tryo {
MapperAccountHolders.create
.accountBankPermalink(bankIdAccountId.bankId.value)
.accountPermalink(bankIdAccountId.accountId.value)
.user(user.userPrimaryKey.value)
.source(source.getOrElse(null))
.saveMe
} match {
case Full(holder) =>
logger.debug(s"getOrCreateAccountHolder--> create account holder: $holder")
Full(holder)
case Failure(_, _, _) =>
MapperAccountHolders.find(
By(MapperAccountHolders.user, user.userPrimaryKey.value),
By(MapperAccountHolders.accountBankPermalink, bankIdAccountId.bankId.value),
By(MapperAccountHolders.accountPermalink, bankIdAccountId.accountId.value)
)
case other => other
}
}
case Failure(msg, t, c) => Failure(msg, t, c)
case ParamFailure(x,y,z,q) => ParamFailure(x,y,z,q)
Expand Down
27 changes: 20 additions & 7 deletions obp-api/src/main/scala/code/api/util/APIUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4834,20 +4834,33 @@ object APIUtil extends MdcLoggable with CustomJsonFormats{
}

def incrementFutureCounter(serviceName:String) = {
val (serviceNameCounter, serviceNameOpenFuturesCounter) = serviceNameCountersMap.getOrDefault(serviceName,(0,0))
serviceNameCountersMap.put(serviceName,(serviceNameCounter + 1,serviceNameOpenFuturesCounter+1))
val (serviceNameCounterLatest, serviceNameOpenFuturesCounterLatest) = serviceNameCountersMap.getOrDefault(serviceName,(0,0))

// Atomically increment both the total-call counter and the open-futures counter for this
// service. ConcurrentHashMap.compute holds the segment lock for the entire lambda, so the
// read-modify-write is a single atomic step — no lost updates under concurrent callers.
// totalCallCount : ever-increasing; used by canOpenFuture for back-off modulo arithmetic.
// openFuturesCount: tracks how many futures are currently in-flight for this service.
val (serviceNameCounterLatest, serviceNameOpenFuturesCounterLatest) =
serviceNameCountersMap.compute(serviceName, (_, old) => {
val (totalCallCount, openFuturesCount) = if (old == null) (0, 0) else old
(totalCallCount + 1, openFuturesCount + 1)
})

if(serviceNameOpenFuturesCounterLatest>=expectedOpenFuturesPerService) {
logger.warn(s"WARNING! incrementFutureCounter says: serviceName is $serviceName, serviceNameOpenFuturesCounterLatest is ${serviceNameOpenFuturesCounterLatest}, which is over expectedOpenFuturesPerService($expectedOpenFuturesPerService)")
}
logger.debug(s"For your information: incrementFutureCounter says: serviceName is $serviceName, serviceNameCounterLatest is ${serviceNameCounterLatest}, serviceNameOpenFuturesCounterLatest is ${serviceNameOpenFuturesCounterLatest}")
}

def decrementFutureCounter(serviceName:String) = {
val (serviceNameCounter, serviceNameOpenFuturesCounter) = serviceNameCountersMap.getOrDefault(serviceName, (0, 1))
serviceNameCountersMap.put(serviceName, (serviceNameCounter, serviceNameOpenFuturesCounter - 1))
val (serviceNameCounterLatest, serviceNameOpenFuturesCounterLatest) = serviceNameCountersMap.getOrDefault(serviceName, (0, 1))
// Atomically decrement only the open-futures counter; totalCallCount is left unchanged
// because it reflects the cumulative number of calls ever started, not the current load.
// The null-guard initialises to (0, 1) and subtracts 1 → (0, 0), which is the safe
// no-op fallback if decrement is somehow called before any increment.
val (serviceNameCounterLatest, serviceNameOpenFuturesCounterLatest) =
serviceNameCountersMap.compute(serviceName, (_, old) => {
val (totalCallCount, openFuturesCount) = if (old == null) (0, 1) else old
(totalCallCount, openFuturesCount - 1)
})
logger.debug(s"decrementFutureCounter says: serviceName is $serviceName, serviceNameCounterLatest is $serviceNameCounterLatest, serviceNameOpenFuturesCounterLatest is ${serviceNameOpenFuturesCounterLatest}")
}

Expand Down
41 changes: 34 additions & 7 deletions obp-api/src/main/scala/code/api/util/DoobieTransactor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,15 @@ object DoobieUtil extends MdcLoggable {
* None otherwise (background tasks, schedulers, tests without request context).
*/
private def liftCurrentConnection: Option[java.sql.Connection] = {
// DB.currentConnection returns Box[SuperConnection]
// SuperConnection has implicit conversion to java.sql.Connection
DB.currentConnection match {
case Full(superConn) =>
val conn: java.sql.Connection = superConn.connection
if (!conn.isClosed) Some(conn) else None
case _ => None
// 1. Try to fetch the Http4s RequestScopeConnection proxy from Alibaba TTL
Option(code.api.util.http4s.RequestScopeConnection.currentProxy.get()).orElse {
// 2. Fallback: check Lift's DB.currentConnection
DB.currentConnection match {
case Full(superConn) =>
val conn: java.sql.Connection = superConn.connection
if (!conn.isClosed) Some(conn) else None
case _ => None
}
}
}

Expand Down Expand Up @@ -148,6 +150,31 @@ object DoobieUtil extends MdcLoggable {
query.transact(fallbackTransactor)
}

/**
* Fallback transactor that commits. Used for updates outside Lift requests.
*/
private lazy val fallbackUpdateTransactor: Transactor[IO] = {
val liftDataSource = APIUtil.vendor.HikariDatasource.ds
Transactor.fromDataSource[IO].apply(
liftDataSource,
ExecutionContext.global
) // Strategy.default includes commit/rollback
}

/**
* Run a Doobie update synchronously, sharing Lift's transaction when available.
* If not in a Lift request context, uses a transactor that COMMITs the connection.
*/
def runUpdate[A](query: ConnectionIO[A]): A = {
liftCurrentConnection match {
case Some(conn) =>
query.transact(transactorFromConnection(conn)).unsafeRunSync()
case None =>
logger.debug("DoobieUtil.runUpdate: No Lift request context, using fallback update transactor")
query.transact(fallbackUpdateTransactor).unsafeRunSync()
}
}

/**
* Check if the database is SQL Server (for syntax differences like TOP vs LIMIT)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import java.sql.Connection
import scala.concurrent.Future

/**
* Request-scoped transaction support for v7 http4s endpoints.
* Request-scoped transaction support for Http4s native endpoints.
*
* PROBLEM: Lift Mapper uses a plain ThreadLocal for connection tracking, while
* cats-effect IO switches compute threads across flatMap / IO.fromFuture boundaries.
Expand Down Expand Up @@ -63,11 +63,11 @@ import scala.concurrent.Future
* METRIC WRITES: recordMetric runs in IO.blocking (blocking pool, no TTL from compute
* thread). currentProxy.get() returns null there, so RequestAwareConnectionManager
* falls back to the pool — metric writes use a separate connection and commit
* independently, matching v6 behaviour.
* independently, matching traditional Lift behaviour.
*
* NON-V7 PATHS (v6 via bridge, background tasks): requestProxyLocal is not set,
* currentProxy is null — RequestAwareConnectionManager delegates to APIUtil.vendor
* as before. DB.buildLoanWrapper (v6) continues to manage its own transaction.
* BACKGROUND TASKS / NON-HTTP PATHS: requestProxyLocal is not set, currentProxy is null
* — RequestAwareConnectionManager delegates to APIUtil.vendor. Any Lift Mapper operations
* outside of a Http4s request scope will auto-commit unless wrapped in a Lift LoanWrapper.
*/
object RequestScopeConnection extends MdcLoggable {

Expand Down Expand Up @@ -186,7 +186,7 @@ object RequestScopeConnection extends MdcLoggable {
* If no DB call was made: nothing to commit or close (pool unaffected).
*
* GET/HEAD must NOT be wrapped (they run on auto-commit vendor connections). Used by
* ResourceDocMiddleware (v6/v7) and by services that build their own request scope
* ResourceDocMiddleware and by services that build their own request scope
* without the middleware (e.g. Http4sDynamicEntity).
*/
def withBusinessDBTransaction(io: IO[Response[IO]]): IO[Response[IO]] =
Expand Down Expand Up @@ -250,8 +250,8 @@ object RequestScopeConnection extends MdcLoggable {
* DB.defineConnectionManager(..., new RequestAwareConnectionManager(APIUtil.vendor))
*
* Used by:
* - v7 native endpoints (gets proxy from TTL, set right before Future submission)
* - v6 via bridge / background tasks (TTL is null → delegates to vendor as before)
* - Http4s native endpoints (gets proxy from TTL, set right before Future submission)
* - Background tasks / Non-HTTP paths (TTL is null → delegates to vendor as before)
*/
class RequestAwareConnectionManager(delegate: ConnectionManager) extends ConnectionManager with MdcLoggable {

Expand Down
4 changes: 4 additions & 0 deletions obp-api/src/main/scala/code/api/v4_0_0/Http4s400.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3474,6 +3474,10 @@ object Http4s400 {
}
_ <- NewStyle.function.checkAuthorisationToCreateTransactionRequest(
viewId, BankIdAccountId(fromAccount.bankId, fromAccount.accountId), user, Some(cc))
// Lock the transaction request row before fetching to prevent Double-Spend MFA bypass
_ <- code.util.Helper.booleanToFuture("Failed to acquire transaction request lock", cc = Some(cc)) {
code.bankconnectors.DoobieTransactionRequestQueries.lockTransactionRequest(transReqId.value).isDefined
}
(existingTransactionRequest, _) <- NewStyle.function.getTransactionRequestImpl(transReqId, Some(cc))
_ <- code.util.Helper.booleanToFuture(
TransactionRequestStatusNotInitiatedOrPendingOrForwarded, cc = Some(cc)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package code.bankconnectors

import code.api.util.DoobieUtil
import doobie._
import doobie.implicits._

object DoobieBadLoginAttemptQueries {

private def atomicIncrement(provider: String, username: String): ConnectionIO[Int] =
for {
_ <- sql"""SELECT mbadattemptssincelastsuccessorreset
FROM mappedbadloginattempt
WHERE provider = $provider AND musername = $username
FOR UPDATE""".query[Int].option
rows <- sql"""UPDATE mappedbadloginattempt
SET mbadattemptssincelastsuccessorreset = mbadattemptssincelastsuccessorreset + 1,
mlastfailuredate = NOW()
WHERE provider = $provider AND musername = $username""".update.run
} yield rows

def incrementBadLoginAttempts(provider: String, username: String): Int =
DoobieUtil.runUpdate(atomicIncrement(provider, username))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package code.bankconnectors

import code.api.util.DoobieUtil
import doobie._
import doobie.implicits._
import net.liftweb.common.Box
import net.liftweb.util.Helpers.tryo

object DoobieBankAccountQueries {

/**
* Atomically updates the bank account balance using a database row lock (SELECT FOR UPDATE).
*
* @param bankId The bank ID
* @param accountId The account ID
* @param amount The amount to add (can be negative for deductions)
* @return The new balance after the update
*/
def atomicallyUpdateBalance(bankId: String, accountId: String, amount: Long): ConnectionIO[Long] = {
for {
// 1. Lock the row and get the current balance
currentBalance <- sql"SELECT accountbalance FROM mappedbankaccount WHERE bank = $bankId AND theaccountid = $accountId FOR UPDATE".query[Long].unique

newBalance = currentBalance + amount

// 2. Update the row with the new balance
_ <- sql"UPDATE mappedbankaccount SET accountbalance = $newBalance WHERE bank = $bankId AND theaccountid = $accountId".update.run
} yield newBalance
}

def updateBalance(bankId: String, accountId: String, amount: Long): Box[Long] = {
tryo {
DoobieUtil.runUpdate(atomicallyUpdateBalance(bankId, accountId, amount))
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package code.bankconnectors

import code.api.util.DoobieUtil
import doobie._
import doobie.implicits._
import net.liftweb.common.Box
import net.liftweb.util.Helpers.tryo

object DoobieChallengeQueries {

private def incrementAndSelectCounter(challengeId: String): ConnectionIO[Int] =
for {
_ <- sql"""SELECT attemptcounter
FROM ExpectedChallengeAnswer
WHERE challengeid = $challengeId
FOR UPDATE""".query[Int].option
_ <- sql"""UPDATE ExpectedChallengeAnswer
SET attemptcounter = attemptcounter + 1
WHERE challengeid = $challengeId""".update.run
counter <- sql"""SELECT attemptcounter FROM ExpectedChallengeAnswer
WHERE challengeid = $challengeId""".query[Int].unique
} yield counter

def incrementAndGetChallengeCounter(challengeId: String): Int =
DoobieUtil.runUpdate(incrementAndSelectCounter(challengeId))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package code.bankconnectors

import code.api.util.DoobieUtil
import doobie._
import doobie.implicits._

object DoobieConsentSchedulerQueries {

def conditionallyUpdateStatus(
consentRowId: Long,
guardStatus: String,
newStatus: String,
newNote: String
): Int = DoobieUtil.runUpdate(
sql"""UPDATE mappedconsent
SET mstatus = $newStatus,
mnote = $newNote,
mstatusupdatedatetime = NOW()
WHERE id = $consentRowId
AND mstatus = $guardStatus""".update.run
)

def conditionallyExpireValidBerlinGroupConsent(
consentRowId: Long,
newNote: String
): Int = DoobieUtil.runUpdate(
sql"""UPDATE mappedconsent
SET mstatus = ${"expired"},
mnote = $newNote,
mstatusupdatedatetime = NOW()
WHERE id = $consentRowId
AND mstatus IN ('valid', 'VALID')""".update.run
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package code.bankconnectors

import code.api.util.DoobieUtil
import doobie._
import doobie.implicits._
import net.liftweb.common.Box
import net.liftweb.util.Helpers.tryo

object DoobieTransactionRequestQueries {

/**
* Atomically locks the transaction request row using SELECT FOR UPDATE.
* This ensures that concurrent MFA challenge answers cannot be processed simultaneously
* for the same transaction request.
*/
def atomicallyLockTransactionRequest(transReqId: String): ConnectionIO[String] = {
sql"SELECT mstatus FROM mappedtransactionrequest WHERE mtransactionrequestid = $transReqId FOR UPDATE".query[String].unique
}

def lockTransactionRequest(transReqId: String): Box[String] = {
tryo {
DoobieUtil.runUpdate(atomicallyLockTransactionRequest(transReqId))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2316,11 +2316,12 @@ object LocalMappedConnector extends Connector with MdcLoggable {
): Box[TransactionId] =
for {
currency <- Full(fromAccount.currency)
//update the balance of the fromAccount for which a transaction is being created
newAccountBalance <- Full(Helper.convertToSmallestCurrencyUnits(fromAccount.balance, currency) + Helper.convertToSmallestCurrencyUnits(amount, currency))

//Here is the `LocalMappedConnector`, once get this point, fromAccount must be a mappedBankAccount. So can use asInstanceOf....
_ <- tryo(fromAccount.asInstanceOf[MappedBankAccount].accountBalance(newAccountBalance).save) ?~! UpdateBankAccountException
// atomically update the balance using Doobie and SELECT FOR UPDATE row locking
newAccountBalance <- DoobieBankAccountQueries.updateBalance(
fromAccount.bankId.value,
fromAccount.accountId.value,
Helper.convertToSmallestCurrencyUnits(amount, currency)
) ?~! UpdateBankAccountException

mappedTransaction <- tryo(MappedTransaction.create
.bank(fromAccount.bankId.value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,11 +508,12 @@ object LocalMappedConnectorInternal extends MdcLoggable {
for {

currency <- Full(fromAccount.currency)
//update the balance of the fromAccount for which a transaction is being created
newAccountBalance <- Full(Helper.convertToSmallestCurrencyUnits(fromAccount.balance, currency) + Helper.convertToSmallestCurrencyUnits(amount, currency))

//Here is the `LocalMappedConnector`, once get this point, fromAccount must be a mappedBankAccount. So can use asInstanceOf....
_ <- tryo(fromAccount.asInstanceOf[MappedBankAccount].accountBalance(newAccountBalance).save) ?~! UpdateBankAccountException
// atomically update the balance using Doobie and SELECT FOR UPDATE row locking
newAccountBalance <- DoobieBankAccountQueries.updateBalance(
fromAccount.bankId.value,
fromAccount.accountId.value,
Helper.convertToSmallestCurrencyUnits(amount, currency)
) ?~! UpdateBankAccountException

mappedTransaction <- tryo(MappedTransaction.create
//No matter which type (SANDBOX_TAN,SEPA,FREE_FORM,COUNTERPARTYE), always filled the following nine fields.
Expand Down
Loading
Loading