diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 3593712791..ae40f7164d 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -939,7 +939,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) snprintf(originname, sizeof(originname), "pg_%u", subid); originid = replorigin_by_name(originname, true); if (originid != InvalidRepOriginId) - replorigin_drop(originid); + replorigin_drop(originid, false); /* * If there is no slot associated with the subscription, we can finish diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index 1c665312a4..4f32e7861c 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -79,15 +79,15 @@ #include "access/xact.h" #include "catalog/indexing.h" - #include "nodes/execnodes.h" #include "replication/origin.h" #include "replication/logical.h" - +#include "pgstat.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/lmgr.h" +#include "storage/condition_variable.h" #include "storage/copydir.h" #include "utils/builtins.h" @@ -125,6 +125,11 @@ typedef struct ReplicationState int acquired_by; /* + * Condition variable that's signalled when acquired_by changes. + */ + ConditionVariable origin_cv; + + /* * Lock protecting remote_lsn and local_lsn. */ LWLock lock; @@ -324,9 +329,9 @@ replorigin_create(char *roname) * Needs to be called in a transaction. */ void -replorigin_drop(RepOriginId roident) +replorigin_drop(RepOriginId roident, bool nowait) { - HeapTuple tuple = NULL; + HeapTuple tuple; Relation rel; int i; @@ -334,6 +339,8 @@ replorigin_drop(RepOriginId roident) rel = heap_open(ReplicationOriginRelationId, ExclusiveLock); +restart: + tuple = NULL; /* cleanup the slot state info */ LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); @@ -346,11 +353,21 @@ replorigin_drop(RepOriginId roident) { if (state->acquired_by != 0) { - ereport(ERROR, - (errcode(ERRCODE_OBJECT_IN_USE), - errmsg("could not drop replication origin with OID %d, in use by PID %d", - state->roident, - state->acquired_by))); + ConditionVariable *cv; + + if (nowait) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_IN_USE), + errmsg("could not drop replication origin with OID %d, in use by PID %d", + state->roident, + state->acquired_by))); + cv = &state->origin_cv; + + LWLockRelease(ReplicationOriginLock); + ConditionVariablePrepareToSleep(cv); + ConditionVariableSleep(cv, PG_WAIT_LOCK); + ConditionVariableCancelSleep(); + goto restart; } /* first WAL log */ @@ -476,8 +493,11 @@ ReplicationOriginShmemInit(void) MemSet(replication_states, 0, ReplicationOriginShmemSize()); for (i = 0; i < max_replication_slots; i++) + { LWLockInitialize(&replication_states[i].lock, replication_states_ctl->tranche_id); + ConditionVariableInit(&replication_states[i].origin_cv); + } } LWLockRegisterTranche(replication_states_ctl->tranche_id, @@ -957,16 +977,23 @@ replorigin_get_progress(RepOriginId node, bool flush) static void ReplicationOriginExitCleanup(int code, Datum arg) { + ConditionVariable *cv = NULL; + LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); if (session_replication_state != NULL && session_replication_state->acquired_by == MyProcPid) { + cv = &session_replication_state->origin_cv; + session_replication_state->acquired_by = 0; session_replication_state = NULL; } LWLockRelease(ReplicationOriginLock); + + if (cv) + ConditionVariableBroadcast(cv); } /* @@ -1056,6 +1083,9 @@ replorigin_session_setup(RepOriginId node) session_replication_state->acquired_by = MyProcPid; LWLockRelease(ReplicationOriginLock); + + /* probably this one is pointless */ + ConditionVariableBroadcast(&session_replication_state->origin_cv); } /* @@ -1067,6 +1097,8 @@ replorigin_session_setup(RepOriginId node) void replorigin_session_reset(void) { + ConditionVariable *cv; + Assert(max_replication_slots != 0); if (session_replication_state == NULL) @@ -1077,9 +1109,12 @@ replorigin_session_reset(void) LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); session_replication_state->acquired_by = 0; + cv = &session_replication_state->origin_cv; session_replication_state = NULL; LWLockRelease(ReplicationOriginLock); + + ConditionVariableBroadcast(cv); } /* @@ -1170,7 +1205,7 @@ pg_replication_origin_drop(PG_FUNCTION_ARGS) roident = replorigin_by_name(name, false); Assert(OidIsValid(roident)); - replorigin_drop(roident); + replorigin_drop(roident, false); pfree(name); diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h index ca56c01469..a9595c3c3d 100644 --- a/src/include/replication/origin.h +++ b/src/include/replication/origin.h @@ -41,7 +41,7 @@ extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp; /* API for querying & manipulating replication origins */ extern RepOriginId replorigin_by_name(char *name, bool missing_ok); extern RepOriginId replorigin_create(char *name); -extern void replorigin_drop(RepOriginId roident); +extern void replorigin_drop(RepOriginId roident, bool nowait); extern bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname);