diff --git a/pg_query_state.c b/pg_query_state.c index 739a44e..d3056ff 100644 --- a/pg_query_state.c +++ b/pg_query_state.c @@ -64,21 +64,12 @@ static shm_mq_result receive_msg_by_parts(shm_mq_handle *mqh, Size *total, /* Global variables */ List *QueryDescStack = NIL; -static ProcSignalReason UserIdPollReason = INVALID_PROCSIGNAL; -static ProcSignalReason QueryStatePollReason = INVALID_PROCSIGNAL; -static ProcSignalReason WorkerPollReason = INVALID_PROCSIGNAL; +ProcSignalReason UserIdPollReason = INVALID_PROCSIGNAL; +ProcSignalReason QueryStatePollReason = INVALID_PROCSIGNAL; +ProcSignalReason WorkerPollReason = INVALID_PROCSIGNAL; static bool module_initialized = false; static int reqid = 0; -typedef struct -{ - slock_t mutex; /* protect concurrent access to `userid` */ - Oid userid; - Latch *caller; - pg_atomic_uint32 n_peers; -} RemoteUserIdResult; - -static void SendCurrentUserId(void); static void SendBgWorkerPids(void); static Oid GetRemoteBackendUserId(PGPROC *proc); static List *GetRemoteBackendWorkers(PGPROC *proc); @@ -90,12 +81,27 @@ static List *GetRemoteBackendQueryStates(PGPROC *leader, bool buffers, bool triggers, ExplainFormat format); +static shm_mq_result shm_mq_receive_with_timeout(shm_mq_handle *mqh, + Size *nbytesp, + void **datap, + int64 timeout); /* Shared memory variables */ static shm_toc *toc = NULL; -static RemoteUserIdResult *counterpart_userid = NULL; pg_qs_params *params = NULL; shm_mq *mq = NULL; +/* + * reqid and *mq_req_id are used to control request/response match on requestor side. + * + * - reqid is static variable used for tracking current request id on requestor side. + * - *mq_req_id is current shared message queue reqid + * + * *mq_req_id is used on signal handler as processed reqid and is set to shm_mq_msg.reqid, + * futher, when response arrives, this value is compared with reqid on requestor side. + * This prevents processing stale or outdated replies from previous attempts, timeouts, + * or concurrent calls. 
+ */ +uint32 *mq_req_id = NULL; /* * Estimate amount of shared memory needed. @@ -111,9 +117,9 @@ pg_qs_shmem_size() nkeys = 3; - shm_toc_estimate_chunk(&e, sizeof(RemoteUserIdResult)); shm_toc_estimate_chunk(&e, sizeof(pg_qs_params)); shm_toc_estimate_chunk(&e, (Size) QUEUE_SIZE); + shm_toc_estimate_chunk(&e, sizeof(uint32)); shm_toc_estimate_keys(&e, nkeys); size = shm_toc_estimate(&e); @@ -132,36 +138,37 @@ pg_qs_shmem_startup(void) void *shmem; int num_toc = 0; + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); shmem = ShmemInitStruct("pg_query_state", shmem_size, &found); if (!found) { toc = shm_toc_create(PG_QS_MODULE_KEY, shmem, shmem_size); - counterpart_userid = shm_toc_allocate(toc, sizeof(RemoteUserIdResult)); - shm_toc_insert(toc, num_toc++, counterpart_userid); - SpinLockInit(&counterpart_userid->mutex); - pg_atomic_init_u32(&counterpart_userid->n_peers, 0); - params = shm_toc_allocate(toc, sizeof(pg_qs_params)); shm_toc_insert(toc, num_toc++, params); mq = shm_toc_allocate(toc, QUEUE_SIZE); shm_toc_insert(toc, num_toc++, mq); + + mq_req_id = shm_toc_allocate(toc, sizeof(uint32)); + shm_toc_insert(toc, num_toc++, mq_req_id); + *mq_req_id = 0; } else { toc = shm_toc_attach(PG_QS_MODULE_KEY, shmem); #if PG_VERSION_NUM < 100000 - counterpart_userid = shm_toc_lookup(toc, num_toc++); params = shm_toc_lookup(toc, num_toc++); mq = shm_toc_lookup(toc, num_toc++); + mq_req_id = shm_toc_lookup(toc, num_toc++); #else - counterpart_userid = shm_toc_lookup(toc, num_toc++, false); params = shm_toc_lookup(toc, num_toc++, false); mq = shm_toc_lookup(toc, num_toc++, false); + mq_req_id = shm_toc_lookup(toc, num_toc++, false); #endif } + LWLockRelease(AddinShmemInitLock); if (prev_shmem_startup_hook) prev_shmem_startup_hook(); @@ -260,8 +267,7 @@ pg_qs_shmem_request(void) /* * ExecutorStart hook: - * set up flags to store runtime statistics, - * push current query description in global stack + * Set up flags to store runtime statistics. 
*/ static void qs_ExecutorStart(QueryDesc *queryDesc, int eflags) @@ -523,8 +529,6 @@ pg_query_state(PG_FUNCTION_ARGS) shm_mq_msg *msg; List *bg_worker_procs = NIL; List *msgs; - instr_time start_time; - instr_time cur_time; if (!module_initialized) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), @@ -563,22 +567,7 @@ pg_query_state(PG_FUNCTION_ARGS) */ LockShmem(&tag, PG_QS_RCV_KEY); - INSTR_TIME_SET_CURRENT(start_time); - - while (pg_atomic_read_u32(&counterpart_userid->n_peers) != 0) - { - pg_usleep(1000000); /* wait one second */ - CHECK_FOR_INTERRUPTS(); - - INSTR_TIME_SET_CURRENT(cur_time); - INSTR_TIME_SUBTRACT(cur_time, start_time); - - if (INSTR_TIME_GET_MILLISEC(cur_time) > MAX_RCV_TIMEOUT) - { - elog(WARNING, "pg_query_state: last request was interrupted"); - break; - } - } + reqid = *mq_req_id + 1; counterpart_user_id = GetRemoteBackendUserId(proc); if (!(superuser() || GetUserId() == counterpart_user_id)) @@ -588,10 +577,6 @@ pg_query_state(PG_FUNCTION_ARGS) errmsg("permission denied"))); } - pg_atomic_write_u32(&counterpart_userid->n_peers, 1); - params->reqid = ++reqid; - pg_write_barrier(); - bg_worker_procs = GetRemoteBackendWorkers(proc); msgs = GetRemoteBackendQueryStates(proc, @@ -744,16 +729,6 @@ pg_query_state(PG_FUNCTION_ARGS) SRF_RETURN_DONE(funcctx); } -static void -SendCurrentUserId(void) -{ - SpinLockAcquire(&counterpart_userid->mutex); - counterpart_userid->userid = GetUserId(); - SpinLockRelease(&counterpart_userid->mutex); - - SetLatch(counterpart_userid->caller); -} - /* * Extract effective user id from backend on which `proc` points. 
* @@ -765,7 +740,12 @@ SendCurrentUserId(void) static Oid GetRemoteBackendUserId(PGPROC *proc) { - Oid result; + int sig_result; + shm_mq_handle *mqh; + shm_mq_result mq_receive_result; + shm_mq_userid_msg *msg; + Size msg_len; + LOCKTAG tag; #if PG_VERSION_NUM >= 170000 Assert(proc && proc->vxid.procNumber != INVALID_PROC_NUMBER); @@ -774,40 +754,53 @@ GetRemoteBackendUserId(PGPROC *proc) #endif Assert(UserIdPollReason != INVALID_PROCSIGNAL); - Assert(counterpart_userid); + Assert(mq); - counterpart_userid->userid = InvalidOid; - counterpart_userid->caller = MyLatch; - pg_write_barrier(); + LockShmem(&tag, PG_QS_SND_KEY); + /* fill in parameters of query state request */ + params->reason = UserIdPollReason; + mq = shm_mq_create(mq, QUEUE_SIZE); + shm_mq_set_sender(mq, proc); + shm_mq_set_receiver(mq, MyProc); + *mq_req_id = reqid; + UnlockShmem(&tag); #if PG_VERSION_NUM >= 170000 - SendProcSignal(proc->pid, UserIdPollReason, proc->vxid.procNumber); + sig_result = SendProcSignal(proc->pid, UserIdPollReason, proc->vxid.procNumber); #else - SendProcSignal(proc->pid, UserIdPollReason, proc->backendId); + sig_result = SendProcSignal(proc->pid, UserIdPollReason, proc->backendId); #endif - for (;;) + if (sig_result == -1) { - SpinLockAcquire(&counterpart_userid->mutex); - result = counterpart_userid->userid; - SpinLockRelease(&counterpart_userid->mutex); - - if (result != InvalidOid) - break; + ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("can't send signal to get remote backend userid"))); + return InvalidOid; + } + mqh = shm_mq_attach(mq, NULL, NULL); + mq_receive_result = shm_mq_receive_with_timeout(mqh, + &msg_len, + (void **) &msg, + MAX_RCV_TIMEOUT); + if (mq_receive_result != SHM_MQ_SUCCESS || msg == NULL || msg->reqid != reqid || msg_len != sizeof(shm_mq_userid_msg)) + { #if PG_VERSION_NUM < 100000 - WaitLatch(MyLatch, WL_LATCH_SET, 0); -#elif PG_VERSION_NUM < 120000 - WaitLatch(MyLatch, WL_LATCH_SET, 0, PG_WAIT_EXTENSION); + shm_mq_detach(mq); #else 
- WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0, - PG_WAIT_EXTENSION); + shm_mq_detach(mqh); #endif - CHECK_FOR_INTERRUPTS(); - ResetLatch(MyLatch); + ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("error in message queue data transmitting"))); + return InvalidOid; } - return result; +#if PG_VERSION_NUM < 100000 + shm_mq_detach(mq); +#else + shm_mq_detach(mqh); +#endif + return msg->userid; } /* @@ -833,7 +826,7 @@ shm_mq_receive_with_timeout(shm_mq_handle *mqh, { shm_mq_result mq_receive_result; - mq_receive_result = receive_msg_by_parts(mqh, nbytesp, datap, timeout, &rc, true); + mq_receive_result = receive_msg_by_parts(mqh, nbytesp, datap, delay, &rc, true); if (mq_receive_result != SHM_MQ_WOULD_BLOCK) return mq_receive_result; if (rc & WL_TIMEOUT || delay <= 0) @@ -916,12 +909,17 @@ SendBgWorkerPids(void) int i; shm_mq_handle *mqh; LOCKTAG tag; - shm_mq_result result; + msg_by_parts_result result; LockShmem(&tag, PG_QS_SND_KEY); - mqh = shm_mq_attach(mq, NULL, NULL); + if (shm_mq_get_sender(mq) != MyProc || params->reason != WorkerPollReason) + { + elog(WARNING, "could not send message queue to shared-memory queue: receiver has been interrupted and new request is being processed now."); + goto connection_cleanup; + } + foreach(iter, QueryDescStack) { QueryDesc *curQueryDesc = (QueryDesc *) lfirst(iter); @@ -934,7 +932,7 @@ SendBgWorkerPids(void) msg_len = offsetof(BgWorkerPids, pids) + sizeof(pid_t) * list_length(all_workers); msg = palloc(msg_len); - msg->reqid = params->reqid; + msg->reqid = *mq_req_id; msg->number = list_length(all_workers); i = 0; foreach(iter, all_workers) @@ -945,16 +943,23 @@ SendBgWorkerPids(void) msg->pids[i++] = current_pid; } -#if PG_VERSION_NUM < 150000 - result = shm_mq_send(mqh, msg_len, msg, false); -#else - result = shm_mq_send(mqh, msg_len, msg, false, true); -#endif + result = send_msg_by_parts(mqh, msg_len, msg); + + pfree(msg); /* Check for failure. 
*/ - if(result == SHM_MQ_DETACHED) - elog(WARNING, "could not send message queue to shared-memory queue: receiver has been detached"); + if(result != MSG_BY_PARTS_SUCCEEDED) + { + elog(WARNING, "pg_query_state: peer seems to have detached"); + goto connection_cleanup; + } +connection_cleanup: +#if PG_VERSION_NUM < 100000 + shm_mq_detach(mq); +#else + shm_mq_detach(mqh); +#endif UnlockShmem(&tag); } @@ -983,9 +988,12 @@ GetRemoteBackendWorkers(PGPROC *proc) Assert(mq); LockShmem(&tag, PG_QS_SND_KEY); + /* fill in parameters of query state request */ + params->reason = WorkerPollReason; mq = shm_mq_create(mq, QUEUE_SIZE); shm_mq_set_sender(mq, proc); shm_mq_set_receiver(mq, MyProc); + *mq_req_id = reqid; UnlockShmem(&tag); #if PG_VERSION_NUM >= 170000 @@ -998,7 +1006,10 @@ GetRemoteBackendWorkers(PGPROC *proc) goto signal_error; mqh = shm_mq_attach(mq, NULL, NULL); - mq_receive_result = shm_mq_receive(mqh, &msg_len, (void **) &msg, false); + mq_receive_result = shm_mq_receive_with_timeout(mqh, + &msg_len, + (void **) &msg, + MAX_RCV_TIMEOUT); if (mq_receive_result != SHM_MQ_SUCCESS || msg == NULL || msg->reqid != reqid || msg_len != offsetof(BgWorkerPids, pids) + msg->number*sizeof(pid_t)) goto mq_error; @@ -1010,7 +1021,6 @@ GetRemoteBackendWorkers(PGPROC *proc) continue; result = lcons(current_proc, result); } - #if PG_VERSION_NUM < 100000 shm_mq_detach(mq); #else @@ -1023,21 +1033,17 @@ GetRemoteBackendWorkers(PGPROC *proc) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("invalid send signal"))); mq_error: +#if PG_VERSION_NUM < 100000 + shm_mq_detach(mq); +#else + shm_mq_detach(mqh); +#endif ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("error in message queue data transmitting"))); return NIL; } -static shm_mq_msg * -copy_msg(shm_mq_msg *msg) -{ - shm_mq_msg *result = palloc(msg->length); - - memcpy(result, msg, msg->length); - return result; -} - static shm_mq_result receive_msg_by_parts(shm_mq_handle *mqh, Size *total, void **datap, int64 
timeout, int *rc, bool nowait) @@ -1113,7 +1119,6 @@ GetRemoteBackendQueryStates(PGPROC *leader, ExplainFormat format) { List *result = NIL; - List *alive_procs = NIL; ListCell *iter; int sig_result; shm_mq_handle *mqh; @@ -1125,20 +1130,20 @@ GetRemoteBackendQueryStates(PGPROC *leader, Assert(QueryStatePollReason != INVALID_PROCSIGNAL); Assert(mq); + /* initialize message queue that will transfer query states */ + LockShmem(&tag, PG_QS_SND_KEY); /* fill in parameters of query state request */ + params->reason = QueryStatePollReason; params->verbose = verbose; params->costs = costs; params->timing = timing; params->buffers = buffers; params->triggers = triggers; params->format = format; - pg_write_barrier(); - - /* initialize message queue that will transfer query states */ - LockShmem(&tag, PG_QS_SND_KEY); mq = shm_mq_create(mq, QUEUE_SIZE); shm_mq_set_sender(mq, leader); shm_mq_set_receiver(mq, MyProc); + *mq_req_id = reqid; UnlockShmem(&tag); /* @@ -1157,58 +1162,34 @@ GetRemoteBackendQueryStates(PGPROC *leader, if (sig_result == -1) goto signal_error; - foreach(iter, pworkers) - { - PGPROC *proc = (PGPROC *) lfirst(iter); - if (!proc || !proc->pid) - continue; - - pg_atomic_add_fetch_u32(&counterpart_userid->n_peers, 1); - -#if PG_VERSION_NUM >= 170000 - sig_result = SendProcSignal(proc->pid, - QueryStatePollReason, - proc->vxid.procNumber); -#else - sig_result = SendProcSignal(proc->pid, - QueryStatePollReason, - proc->backendId); -#endif - - if (sig_result == -1) - { - if (errno != ESRCH) - goto signal_error; - continue; - } - - alive_procs = lappend(alive_procs, proc); - } /* extract query state from leader process */ mqh = shm_mq_attach(mq, NULL, NULL); elog(DEBUG1, "Wait response from leader %d", leader->pid); - mq_receive_result = receive_msg_by_parts(mqh, &len, (void **) &msg, - 0, NULL, false); + mq_receive_result = shm_mq_receive_with_timeout(mqh, + &len, + (void **) &msg, + MAX_RCV_TIMEOUT); if (mq_receive_result != SHM_MQ_SUCCESS) goto mq_error; if 
(msg->reqid != reqid) goto mq_error; Assert(len == msg->length); - result = lappend(result, copy_msg(msg)); + result = lappend(result, msg); #if PG_VERSION_NUM < 100000 shm_mq_detach(mq); #else shm_mq_detach(mqh); #endif - /* * collect results from all alived parallel workers */ - foreach(iter, alive_procs) + foreach(iter, pworkers) { PGPROC *proc = (PGPROC *) lfirst(iter); + if (!proc || !proc->pid) + continue; /* prepare message queue to transfer data */ elog(DEBUG1, "Wait response from worker %d", proc->pid); @@ -1218,8 +1199,26 @@ GetRemoteBackendQueryStates(PGPROC *leader, shm_mq_set_receiver(mq, MyProc); /* this function notifies the counterpart to come into data transfer */ + *mq_req_id = reqid; UnlockShmem(&tag); +#if PG_VERSION_NUM >= 170000 + sig_result = SendProcSignal(proc->pid, + QueryStatePollReason, + proc->vxid.procNumber); +#else + sig_result = SendProcSignal(proc->pid, + QueryStatePollReason, + proc->backendId); +#endif + + if (sig_result == -1) + { + if (errno != ESRCH) + goto signal_error; + continue; + } + /* retrieve result data from message queue */ mqh = shm_mq_attach(mq, NULL, NULL); mq_receive_result = shm_mq_receive_with_timeout(mqh, @@ -1236,7 +1235,7 @@ GetRemoteBackendQueryStates(PGPROC *leader, Assert(len == msg->length); /* aggregate result data */ - result = lappend(result, copy_msg(msg)); + result = lappend(result, msg); #if PG_VERSION_NUM < 100000 shm_mq_detach(mq); @@ -1261,15 +1260,6 @@ GetRemoteBackendQueryStates(PGPROC *leader, return NIL; } -void -DetachPeer(void) -{ - int n_peers = pg_atomic_fetch_sub_u32(&counterpart_userid->n_peers, 1); - if (n_peers <= 0) - ereport(LOG, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("pg_query_state peer is not responding"))); -} - /* * Extract the number of actual rows and planned rows from * the plan for one node in text format. 
Returns their ratio, @@ -1434,6 +1424,7 @@ pg_progress_bar(PG_FUNCTION_ARGS) List *msgs; double progress; double old_progress; + LOCKTAG tag; if (PG_NARGS() == 2) { @@ -1467,6 +1458,10 @@ pg_progress_bar(PG_FUNCTION_ARGS) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("backend with pid=%d not found", pid))); + LockShmem(&tag, PG_QS_RCV_KEY); + + reqid = *mq_req_id + 1; + counterpart_user_id = GetRemoteBackendUserId(proc); if (!(superuser() || GetUserId() == counterpart_user_id)) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), @@ -1474,11 +1469,6 @@ pg_progress_bar(PG_FUNCTION_ARGS) old_progress = 0; progress = 0; - if (SRF_IS_FIRSTCALL()) - { - pg_atomic_write_u32(&counterpart_userid->n_peers, 1); - params->reqid = ++reqid; - } bg_worker_procs = GetRemoteBackendWorkers(proc); msgs = GetRemoteBackendQueryStates(proc, @@ -1486,23 +1476,30 @@ pg_progress_bar(PG_FUNCTION_ARGS) 0, 1, 0, 0, 0, EXPLAIN_FORMAT_JSON); if (list_length(msgs) == 0) + { elog(WARNING, "backend does not reply"); + UnlockShmem(&tag); + PG_RETURN_FLOAT8((float8) -1); + } msg = (shm_mq_msg *) linitial(msgs); switch (msg->result_code) { case QUERY_NOT_RUNNING: elog(INFO, "query not runing"); + UnlockShmem(&tag); PG_RETURN_FLOAT8((float8) -1); break; case STAT_DISABLED: elog(INFO, "query execution statistics disabled"); + UnlockShmem(&tag); PG_RETURN_FLOAT8((float8) -1); default: break; } if (msg->result_code == QS_RETURNED && delay == 0) { + UnlockShmem(&tag); progress = GetCurrentNumericState(msg); if (progress < 0) { @@ -1540,12 +1537,18 @@ pg_progress_bar(PG_FUNCTION_ARGS) 0, 1, 0, 0, 0, EXPLAIN_FORMAT_JSON); if (list_length(msgs) == 0) + { elog(WARNING, "backend does not reply"); + UnlockShmem(&tag); + PG_RETURN_FLOAT8((float8) -1); + } msg = (shm_mq_msg *) linitial(msgs); } if (progress > -1) elog(INFO, "\rProgress = 1.000000"); + UnlockShmem(&tag); PG_RETURN_FLOAT8((float8) 1); } + UnlockShmem(&tag); PG_RETURN_FLOAT8((float8) -1); } diff --git a/pg_query_state.h 
b/pg_query_state.h index 8272069..2ccaba5 100644
--- a/pg_query_state.h
+++ b/pg_query_state.h
@@ -50,9 +50,18 @@ typedef enum
 {
 	QUERY_NOT_RUNNING, /* Backend doesn't execute any query */
 	STAT_DISABLED, /* Collection of execution statistics is disabled */
-	QS_RETURNED /* Backend succx[esfully returned its query state */
+	QS_RETURNED /* Backend successfully returned its query state */
 } PG_QS_RequestResult;
+/*
+ * A self-explanatory enum describing the send_msg_by_parts results
+ */
+typedef enum
+{
+	MSG_BY_PARTS_SUCCEEDED,
+	MSG_BY_PARTS_FAILED
+} msg_by_parts_result;
+
 /*
  * Format of transmited data through message queue
  */
@@ -68,12 +77,21 @@ typedef struct
 	text records */
 } shm_mq_msg;
+/*
+ * Reply to a user id poll, sent through the message queue by SendCurrentUserId.
+ */
+typedef struct
+{
+	Oid userid; /* GetUserId() of the polled backend */
+	uint32 reqid; /* copy of *mq_req_id; the requester compares it with its own reqid */
+} shm_mq_userid_msg;
+
 #define BASE_SIZEOF_SHM_MQ_MSG (offsetof(shm_mq_msg, stack_depth))
 /* pg_query_state arguments */
 typedef struct
 {
-	int reqid;
+	ProcSignalReason reason; /* poll reason set by the requester; each handler checks it against its own reason */
 	bool verbose;
 	bool costs;
 	bool timing;
@@ -83,17 +101,23 @@ typedef struct
 } pg_qs_params;
 /* pg_query_state */
-extern bool pg_qs_enable;
-extern bool pg_qs_timing;
-extern bool pg_qs_buffers;
-extern List *QueryDescStack;
-extern pg_qs_params *params;
-extern shm_mq *mq;
+extern bool pg_qs_enable;
+extern bool pg_qs_timing;
+extern bool pg_qs_buffers;
+extern List *QueryDescStack;
+extern pg_qs_params * params;
+extern shm_mq *mq;
+extern uint32 *mq_req_id; /* request id stored in shared memory next to the queue */
+
+extern ProcSignalReason UserIdPollReason;
+extern ProcSignalReason QueryStatePollReason;
+extern ProcSignalReason WorkerPollReason;
 /* signal_handler.c */
 extern void SendQueryState(void);
-extern void DetachPeer(void);
+extern void SendCurrentUserId(void);
 extern void UnlockShmem(LOCKTAG *tag);
 extern void LockShmem(LOCKTAG *tag, uint32 key);
+extern msg_by_parts_result send_msg_by_parts(shm_mq_handle *mqh, Size nbytes, const void *data);
 #endif
diff --git a/signal_handler.c b/signal_handler.c
index 09b1cf2..e4962a5 100644
--- a/signal_handler.c
+++
b/signal_handler.c @@ -31,17 +31,6 @@ typedef struct char *plan; } stack_frame; -/* - * An self-explanarory enum describing the send_msg_by_parts results - */ -typedef enum -{ - MSG_BY_PARTS_SUCCEEDED, - MSG_BY_PARTS_FAILED -} msg_by_parts_result; - -static msg_by_parts_result send_msg_by_parts(shm_mq_handle *mqh, Size nbytes, const void *data); - /* * Get List of stack_frames as a stack of function calls starting from outermost call. * Each entry contains query text and query state in form of EXPLAIN ANALYZE output. @@ -197,7 +186,7 @@ shm_mq_send_nonblocking(shm_mq_handle *mqh, Size nbytes, const void *data, Size * send_msg_by_parts sends data through the queue as a bunch of messages * of smaller size */ -static msg_by_parts_result +msg_by_parts_result send_msg_by_parts(shm_mq_handle *mqh, Size nbytes, const void *data) { int bytes_left; @@ -229,55 +218,22 @@ void SendQueryState(void) { shm_mq_handle *mqh; - instr_time start_time; - instr_time cur_time; - int64 delay = MAX_SND_TIMEOUT; - int reqid = params->reqid; LOCKTAG tag; - INSTR_TIME_SET_CURRENT(start_time); - - /* wait until caller sets this process as sender to message queue */ - for (;;) - { - if (shm_mq_get_sender(mq) == MyProc) - break; - -#if PG_VERSION_NUM < 100000 - WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT, delay); -#elif PG_VERSION_NUM < 120000 - WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT, delay, PG_WAIT_IPC); -#else - WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT, delay, PG_WAIT_IPC); -#endif - INSTR_TIME_SET_CURRENT(cur_time); - INSTR_TIME_SUBTRACT(cur_time, start_time); - - delay = MAX_SND_TIMEOUT - (int64) INSTR_TIME_GET_MILLISEC(cur_time); - if (delay <= 0) - { - elog(WARNING, "pg_query_state: failed to receive request from leader"); - DetachPeer(); - return; - } - CHECK_FOR_INTERRUPTS(); - ResetLatch(MyLatch); - } - LockShmem(&tag, PG_QS_SND_KEY); elog(DEBUG1, "Worker %d receives pg_query_state request from %d", shm_mq_get_sender(mq)->pid, 
shm_mq_get_receiver(mq)->pid); mqh = shm_mq_attach(mq, NULL, NULL); - if (reqid != params->reqid || shm_mq_get_sender(mq) != MyProc) + if (shm_mq_get_sender(mq) != MyProc || params->reason != QueryStatePollReason) { - UnlockShmem(&tag); - return; + elog(WARNING, "could not send message queue to shared-memory queue: receiver has been interrupted and new request is being processed now."); + goto connection_cleanup; } /* check if module is enabled */ if (!pg_qs_enable) { - shm_mq_msg msg = { reqid, BASE_SIZEOF_SHM_MQ_MSG, MyProc, STAT_DISABLED }; + shm_mq_msg msg = { *mq_req_id, BASE_SIZEOF_SHM_MQ_MSG, MyProc, STAT_DISABLED }; if(send_msg_by_parts(mqh, msg.length, &msg) != MSG_BY_PARTS_SUCCEEDED) goto connection_cleanup; @@ -286,7 +242,7 @@ SendQueryState(void) /* check if backend doesn't execute any query */ else if (list_length(QueryDescStack) == 0) { - shm_mq_msg msg = { reqid, BASE_SIZEOF_SHM_MQ_MSG, MyProc, QUERY_NOT_RUNNING }; + shm_mq_msg msg = { *mq_req_id, BASE_SIZEOF_SHM_MQ_MSG, MyProc, QUERY_NOT_RUNNING }; if(send_msg_by_parts(mqh, msg.length, &msg) != MSG_BY_PARTS_SUCCEEDED) goto connection_cleanup; @@ -299,7 +255,7 @@ SendQueryState(void) int msglen = sizeof(shm_mq_msg) + serialized_stack_length(qs_stack); shm_mq_msg *msg = palloc(msglen); - msg->reqid = reqid; + msg->reqid = *mq_req_id; msg->length = msglen; msg->proc = MyProc; msg->result_code = QS_RETURNED; @@ -313,24 +269,49 @@ SendQueryState(void) msg->stack_depth = list_length(qs_stack); serialize_stack(msg->stack, qs_stack); + list_free_deep(qs_stack); + if(send_msg_by_parts(mqh, msglen, msg) != MSG_BY_PARTS_SUCCEEDED) { elog(WARNING, "pg_query_state: peer seems to have detached"); + pfree(msg); goto connection_cleanup; } + else + pfree(msg); } elog(DEBUG1, "Worker %d sends response for pg_query_state to %d", shm_mq_get_sender(mq)->pid, shm_mq_get_receiver(mq)->pid); - DetachPeer(); + +connection_cleanup: +#if PG_VERSION_NUM < 100000 + shm_mq_detach(mq); +#else + shm_mq_detach(mqh); +#endif 
UnlockShmem(&tag); +} - return; +void +SendCurrentUserId(void) +{ + shm_mq_handle *mqh; + shm_mq_userid_msg msg; + LOCKTAG tag; + + msg.userid = GetUserId(); + LockShmem(&tag, PG_QS_SND_KEY); + + mqh = shm_mq_attach(mq, NULL, NULL); + msg.reqid = *mq_req_id; + if (shm_mq_get_sender(mq) != MyProc || params->reason != UserIdPollReason) + elog(WARNING, "could not send message queue to shared-memory queue: receiver has been interrupted and new request is being processed now."); + else if (send_msg_by_parts(mqh, sizeof(msg), &msg) != MSG_BY_PARTS_SUCCEEDED) + elog(WARNING, "could not send message queue to shared-memory queue."); -connection_cleanup: #if PG_VERSION_NUM < 100000 shm_mq_detach(mq); #else shm_mq_detach(mqh); #endif - DetachPeer(); UnlockShmem(&tag); }