diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 9fff288..22115bd 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -726,10 +726,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, recordDependencyOnOwner(SubscriptionRelationId, subid, owner); /* - * XXX: If the subscription is for a sequence-only publication, - * creating this origin is unnecessary at this point. It can be created - * later during the ALTER SUBSCRIPTION ... REFRESH command, if the - * publication is updated to include tables or tables in schemas. + * XXX: If the subscription is for a sequence-only publication, creating + * this origin is unnecessary. It can be created later during the ALTER + * SUBSCRIPTION ... REFRESH command, if the publication is updated to + * include tables or tables in schemas. */ ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname)); replorigin_create(originname); @@ -800,9 +800,9 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * export it. * * XXX: If the subscription is for a sequence-only publication, - * creating this slot is not necessary at the moment. It can be - * created during the ALTER SUBSCRIPTION ... REFRESH command if the - * publication is updated to include tables or tables in schema. + * creating this slot. It can be created later during the ALTER + * SUBSCRIPTION ... REFRESH command, if the publication is updated + * to include tables or tables in schema. */ if (opts.create_slot) { @@ -1021,9 +1021,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, InvalidXLogRecPtr, true); ereport(DEBUG1, - (errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"", - relkind == RELKIND_SEQUENCE ? "sequence" : "table", - rv->schemaname, rv->relname, sub->name))); + errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"", + relkind == RELKIND_SEQUENCE ? "sequence" : "table", + rv->schemaname, rv->relname, sub->name)); } } @@ -1125,11 +1125,11 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, } ereport(DEBUG1, - (errmsg_internal("%s \"%s.%s\" removed from subscription \"%s\"", + errmsg_internal("%s \"%s.%s\" removed from subscription \"%s\"", relkind == RELKIND_SEQUENCE ? "sequence" : "table", get_namespace_name(get_rel_namespace(relid)), get_rel_name(relid), - sub->name))); + sub->name)); } } @@ -1615,8 +1615,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, { if (!sub->enabled) ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES is not allowed for disabled subscriptions"))); + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES is not allowed for disabled subscriptions")); PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES"); @@ -2494,8 +2494,8 @@ fetch_sequence_list(WalReceiverConn *wrconn, char *subname, List *publications) if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, - (errmsg("could not receive list of sequences from the publisher: %s", - res->err))); + errmsg("could not receive list of sequences from the publisher: %s", + res->err)); /* Process sequences. */ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c index 8211121..1f45564 100644 --- a/src/backend/replication/logical/sequencesync.c +++ b/src/backend/replication/logical/sequencesync.c @@ -82,9 +82,8 @@ List *sequence_states_not_ready = NIL; * - last_value: The last value of the sequence. * * Returns: - * - TRUE if there are discrepancies between the sequence parameters in - * the publisher and subscriber. - * - FALSE if the parameters match. + * - TRUE if parameters match for the local and remote sequences. + * - FALSE if parameters differ for the local and remote sequences. */ static bool fetch_remote_sequence_data(WalReceiverConn *conn, Oid relid, Oid remoteid, @@ -101,17 +100,17 @@ fetch_remote_sequence_data(WalReceiverConn *conn, Oid relid, Oid remoteid, Oid seqtypid; int64 seqstart; int64 seqincrement; - int64 seqmax; int64 seqmin; + int64 seqmax; bool seqcycle; - bool seq_not_match = false; + bool seq_params_match; HeapTuple tup; Form_pg_sequence seqform; initStringInfo(&cmd); appendStringInfo(&cmd, "SELECT last_value, log_cnt, is_called, page_lsn,\n" - "seqtypid, seqstart, seqincrement, seqmax, seqmin, seqcycle\n" + "seqtypid, seqstart, seqincrement, seqmin, seqmax, seqcycle\n" "FROM pg_sequence_state(%d), pg_sequence where seqrelid = %d", remoteid, remoteid); @@ -120,16 +119,16 @@ fetch_remote_sequence_data(WalReceiverConn *conn, Oid relid, Oid remoteid, if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, - (errmsg("could not receive sequence list from the publisher: %s", - res->err))); + errmsg("could not receive sequence list from the publisher: %s", + res->err)); /* Process the sequence. */ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot)) ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_OBJECT), - errmsg("sequence \"%s.%s\" not found on publisher", - nspname, relname))); + errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("sequence \"%s.%s\" not found on publisher", + nspname, relname)); *last_value = DatumGetInt64(slot_getattr(slot, 1, &isnull)); Assert(!isnull); @@ -152,10 +151,10 @@ fetch_remote_sequence_data(WalReceiverConn *conn, Oid relid, Oid remoteid, seqincrement = DatumGetInt64(slot_getattr(slot, 7, &isnull)); Assert(!isnull); - seqmax = DatumGetInt64(slot_getattr(slot, 8, &isnull)); + seqmin = DatumGetInt64(slot_getattr(slot, 8, &isnull)); Assert(!isnull); - seqmin = DatumGetInt64(slot_getattr(slot, 9, &isnull)); + seqmax = DatumGetInt64(slot_getattr(slot, 9, &isnull)); Assert(!isnull); seqcycle = DatumGetBool(slot_getattr(slot, 10, &isnull)); @@ -169,16 +168,17 @@ fetch_remote_sequence_data(WalReceiverConn *conn, Oid relid, Oid remoteid, seqform = (Form_pg_sequence) GETSTRUCT(tup); - if (seqform->seqtypid != seqtypid || seqform->seqmin != seqmin || - seqform->seqmax != seqmax || seqform->seqstart != seqstart || - seqform->seqincrement != seqincrement || seqform->seqcycle != seqcycle) - seq_not_match = true; + seq_params_match = seqform->seqtypid == seqtypid && + seqform->seqmin == seqmin && seqform->seqmax == seqmax && + seqform->seqcycle == seqcycle && + seqform->seqstart == seqstart && + seqform->seqincrement == seqincrement; ReleaseSysCache(tup); ExecDropSingleTupleTableSlot(slot); walrcv_clear_result(res); - return seq_not_match; + return seq_params_match; } /* @@ -187,6 +187,9 @@ fetch_remote_sequence_data(WalReceiverConn *conn, Oid relid, Oid remoteid, * Fetch the sequence value from the publisher and set the subscriber sequence * with the same value. Caller is responsible for locking the local * relation. + * + * The output parameter 'sequence_mismatch' indicates if a local/remote + * sequence parameter mismatch was detected. */ static XLogRecPtr copy_sequence(WalReceiverConn *conn, Relation rel, @@ -207,14 +210,15 @@ copy_sequence(WalReceiverConn *conn, Relation rel, char *relname = RelationGetRelationName(rel); Oid relid = RelationGetRelid(rel); + Assert(!*sequence_mismatch); + /* Fetch Oid. */ initStringInfo(&cmd); - appendStringInfo(&cmd, "SELECT c.oid, c.relkind" - " FROM pg_catalog.pg_class c" - " INNER JOIN pg_catalog.pg_namespace n" - " ON (c.relnamespace = n.oid)" - " WHERE n.nspname = %s" - " AND c.relname = %s", + appendStringInfo(&cmd, "SELECT c.oid, c.relkind\n" + "FROM pg_catalog.pg_class c\n" + "INNER JOIN pg_catalog.pg_namespace n\n" + " ON (c.relnamespace = n.oid)\n" + "WHERE n.nspname = %s AND c.relname = %s", quote_literal_cstr(nspname), quote_literal_cstr(relname)); @@ -222,16 +226,16 @@ copy_sequence(WalReceiverConn *conn, Relation rel, lengthof(tableRow), tableRow); if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("sequence \"%s.%s\" info could not be fetched from publisher: %s", - nspname, relname, res->err))); + errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("sequence \"%s.%s\" info could not be fetched from publisher: %s", + nspname, relname, res->err)); slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot)) ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_OBJECT), - errmsg("sequence \"%s.%s\" not found on publisher", - nspname, relname))); + errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("sequence \"%s.%s\" not found on publisher", + nspname, relname)); remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull)); Assert(!isnull); @@ -242,7 +246,7 @@ copy_sequence(WalReceiverConn *conn, Relation rel, ExecDropSingleTupleTableSlot(slot); walrcv_clear_result(res); - *sequence_mismatch = fetch_remote_sequence_data(conn, relid, remoteid, + *sequence_mismatch = !fetch_remote_sequence_data(conn, relid, remoteid, nspname, relname, &seq_log_cnt, &seq_is_called, &seq_page_lsn, &seq_last_value); @@ -255,12 +259,12 @@ copy_sequence(WalReceiverConn *conn, Relation rel, } /* - * report_sequence_mismatch + * report_mismatched_sequences * - * Records details of sequence mismatches as a warning. + * Report any sequence mismatches as a single warning log. */ static void -report_sequence_mismatch(StringInfo warning_sequences) +report_mismatched_sequences(StringInfo warning_sequences) { if (warning_sequences->len) { @@ -269,6 +273,7 @@ report_sequence_mismatch(StringInfo warning_sequences) errmsg("parameters differ for the remote and local sequences (%s) for subscription \"%s\"", warning_sequences->data, MySubscription->name), errhint("Alter/Re-create local sequences to have the same parameters as the remote sequences.")); + resetStringInfo(warning_sequences); } } @@ -280,14 +285,14 @@ report_sequence_mismatch(StringInfo warning_sequences) * and subscriber to the warning_sequences string. */ static void -append_mismatched_sequences(StringInfo warning_sequences, Relation rel) +append_mismatched_sequences(StringInfo warning_sequences, Relation seqrel) { if (warning_sequences->len) appendStringInfoString(warning_sequences, ", "); appendStringInfo(warning_sequences, "\"%s.%s\"", - get_namespace_name(RelationGetNamespace(rel)), - RelationGetRelationName(rel)); + get_namespace_name(RelationGetNamespace(seqrel)), + RelationGetRelationName(seqrel)); } /* @@ -355,15 +360,15 @@ LogicalRepSyncSequences(void) slotname, &err); if (LogRepWorkerWalRcvConn == NULL) ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("could not connect to the publisher: %s", err))); + errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the publisher: %s", err)); seq_count = list_length(sequences_not_synced); foreach_ptr(SubscriptionRelState, seqinfo, sequences_not_synced) { Relation sequence_rel; XLogRecPtr sequence_lsn; - bool sequence_mismatch; + bool sequence_mismatch = false; CHECK_FOR_INTERRUPTS(); @@ -422,7 +427,7 @@ LogicalRepSyncSequences(void) if (sequence_mismatch) append_mismatched_sequences(warning_sequences, sequence_rel); - report_sequence_mismatch(warning_sequences); + report_mismatched_sequences(warning_sequences); PG_RE_THROW(); } PG_END_TRY(); @@ -444,11 +449,9 @@ LogicalRepSyncSequences(void) if (((curr_seq % MAX_SEQUENCES_SYNC_PER_BATCH) == 0) || curr_seq == seq_count) { - /* Obtain the starting index of the current batch. */ - int i = (curr_seq - 1) - ((curr_seq - 1) % MAX_SEQUENCES_SYNC_PER_BATCH); - /* LOG all the sequences synchronized during current batch. */ - for (; i < curr_seq; i++) + for (int i = (curr_seq - 1) - ((curr_seq - 1) % MAX_SEQUENCES_SYNC_PER_BATCH); + i < curr_seq; i++) { SubscriptionRelState *done_seq; @@ -459,7 +462,7 @@ LogicalRepSyncSequences(void) get_subscription_name(subid, false), get_rel_name(done_seq->relid))); } - report_sequence_mismatch(warning_sequences); + report_mismatched_sequences(warning_sequences); ereport(LOG, errmsg("logical replication synchronized %d of %d sequences for subscription \"%s\" ", @@ -469,7 +472,6 @@ LogicalRepSyncSequences(void) CommitTransactionCommand(); start_txn = true; } - } list_free_deep(sequences_not_synced); @@ -554,7 +556,7 @@ process_syncing_sequences_for_apply(void) foreach_ptr(SubscriptionRelState, rstate, sequence_states_not_ready) { LogicalRepWorker *syncworker; - int nsyncworkers; + int nsyncworkers; if (!started_tx) {