Re: Logical Replication WIP

From: Petr Jelinek <petr(dot)jelinek(at)2ndquadrant(dot)com>
To: Andres Freund <andres(at)anarazel(dot)de>
Cc: Peter Eisentraut <peter(dot)eisentraut(at)2ndquadrant(dot)com>, Erik Rijkers <er(at)xs4all(dot)nl>, Steve Singer <steve(at)ssinger(dot)info>, Craig Ringer <craig(at)2ndquadrant(dot)com>, Simon Riggs <simon(at)2ndquadrant(dot)com>, PostgreSQL-development <pgsql-hackers(at)postgresql(dot)org>, pgsql-hackers-owner(at)postgresql(dot)org
Subject: Re: Logical Replication WIP
Date: 2016-12-13 02:26:57
Message-ID: 8b6a1362-b8e1-0912-193a-f8822884fe69@2ndquadrant.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

On 13/12/16 02:41, Andres Freund wrote:
> On 2016-12-10 08:48:55 +0100, Petr Jelinek wrote:
>
>> diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
>> new file mode 100644
>> index 0000000..e3560b7
>> --- /dev/null
>> +++ b/src/backend/catalog/pg_publication.c
>> +
>> +Datum pg_get_publication_tables(PG_FUNCTION_ARGS);
>
> Don't we usually put these in a header?
>

We put these to rather random places, I don't mind either way.

>
>> +/*
>> + * Gets list of relation oids for a publication.
>> + *
>> + * This should only be used for normal publications, the FOR ALL TABLES
>> + * should use GetAllTablesPublicationRelations().
>> + */
>> +List *
>> +GetPublicationRelations(Oid pubid)
>> +{
>> + List *result;
>> + Relation pubrelsrel;
>> + ScanKeyData scankey;
>> + SysScanDesc scan;
>> + HeapTuple tup;
>> +
>> + /* Find all publications associated with the relation. */
>> + pubrelsrel = heap_open(PublicationRelRelationId, AccessShareLock);
>> +
>> + ScanKeyInit(&scankey,
>> + Anum_pg_publication_rel_prpubid,
>> + BTEqualStrategyNumber, F_OIDEQ,
>> + ObjectIdGetDatum(pubid));
>> +
>> + scan = systable_beginscan(pubrelsrel, PublicationRelMapIndexId, true,
>> + NULL, 1, &scankey);
>> +
>> + result = NIL;
>> + while (HeapTupleIsValid(tup = systable_getnext(scan)))
>> + {
>> + Form_pg_publication_rel pubrel;
>> +
>> + pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
>> +
>> + result = lappend_oid(result, pubrel->prrelid);
>> + }
>> +
>> + systable_endscan(scan);
>> + heap_close(pubrelsrel, NoLock);
>
> In other parts of this you drop the lock, but not here?
>
>
>> + heap_close(rel, NoLock);
>> +
>> + return result;
>> +}
>
> and here.
>

Meh, ignore, that's some pglogical legacy.

>
> Btw, why are matviews not publishable?
>

Because standard way of updating them is REFRESH MATERIALIZED VIEW which
is decoded as inserts into pg_temp_<oid> table. I think we'll have to
rethink how we do this before we can sanely support them.

>> +/*
>> + * Get Publication using name.
>> + */
>> +Publication *
>> +GetPublicationByName(const char *pubname, bool missing_ok)
>> +{
>> + Oid oid;
>> +
>> + oid = GetSysCacheOid1(PUBLICATIONNAME, CStringGetDatum(pubname));
>> + if (!OidIsValid(oid))
>> + {
>> + if (missing_ok)
>> + return NULL;
>> +
>> + ereport(ERROR,
>> + (errcode(ERRCODE_UNDEFINED_OBJECT),
>> + errmsg("publication \"%s\" does not exist", pubname)));
>> + }
>> +
>> + return GetPublication(oid);
>> +}
>
> That's racy... Also, shouldn't we specify for how to deal with the
> returned memory for Publication * returning methods?
>

So are most of the other existing functions with similar purpose. The
worst case is that with enough concurrency around same publication name
DDL you'll get cache lookup failure.

I added comment to GetPublication saying that memory is palloced.

>
>> diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
>> new file mode 100644
>> index 0000000..954b2bd
>> --- /dev/null
>> +++ b/src/backend/commands/publicationcmds.c
>> @@ -0,0 +1,613 @@
>
>> +/*
>> + * Create new publication.
>> + */
>> +ObjectAddress
>> +CreatePublication(CreatePublicationStmt *stmt)
>> +{
>> + Relation rel;
>
>> +
>> + values[Anum_pg_publication_puballtables - 1] =
>> + BoolGetDatum(stmt->for_all_tables);
>> + values[Anum_pg_publication_pubinsert - 1] =
>> + BoolGetDatum(publish_insert);
>> + values[Anum_pg_publication_pubupdate - 1] =
>> + BoolGetDatum(publish_update);
>> + values[Anum_pg_publication_pubdelete - 1] =
>> + BoolGetDatum(publish_delete);
>
> I remain convinced that a different representation would be
> better. There'll be more options over time (truncate, DDL at least).
>

So? It's boolean properties, it's not like we store bitmaps in catalogs
much. I very much expect DDL to be much more complex than boolean btw.

>
>> +static void
>> +AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
>> + HeapTuple tup)
>> +{
>> + bool publish_insert_given;
>> + bool publish_update_given;
>> + bool publish_delete_given;
>> + bool publish_insert;
>> + bool publish_update;
>> + bool publish_delete;
>> + ObjectAddress obj;
>> +
>> + parse_publication_options(stmt->options,
>> + &publish_insert_given, &publish_insert,
>> + &publish_update_given, &publish_update,
>> + &publish_delete_given, &publish_delete);
>
> You could pass it a struct instead...
>

Here yes, but in similar code for subscription not, I slightly prefer
consistency between those similar functions.

>
>> +static List *
>> +OpenTableList(List *tables)
>> +{
>> + List *relids = NIL;
>> + List *rels = NIL;
>> + ListCell *lc;
>> +
>> + /*
>> + * Open, share-lock, and check all the explicitly-specified relations
>> + */
>> + foreach(lc, tables)
>> + {
>> + RangeVar *rv = lfirst(lc);
>> + Relation rel;
>> + bool recurse = interpretInhOption(rv->inhOpt);
>> + Oid myrelid;
>> +
>> + rel = heap_openrv(rv, ShareUpdateExclusiveLock);
>> + myrelid = RelationGetRelid(rel);
>> + /* filter out duplicates when user specifies "foo, foo" */
>> + if (list_member_oid(relids, myrelid))
>> + {
>> + heap_close(rel, ShareUpdateExclusiveLock);
>> + continue;
>> + }
>
> This is a quadratic algorithm - that could bite us... Not sure if we
> need to care. If we want to fix it, one approach owuld be to use
> RangeVarGetRelid() instead, and then do a qsort/deduplicate before
> actually opening the relations.
>

I guess it could get really slow only with big inheritance tree, I'll
look into how much work is the other way of doing things (this is not
exactly hot code path).

>>
>> -def_elem: ColLabel '=' def_arg
>> +def_elem: def_key '=' def_arg
>> {
>> $$ = makeDefElem($1, (Node *) $3, @1);
>> }
>> - | ColLabel
>> + | def_key
>> {
>> $$ = makeDefElem($1, NULL, @1);
>> }
>> ;
>
>> +def_key:
>> + ColLabel { $$ = $1; }
>> + | ColLabel ColLabel { $$ = psprintf("%s %s", $1, $2); }
>> + ;
>> +
>
> Not quite sure what this is about? Doesn't that change the accepted
> syntax in a bunch of places?
>

Well all those places have to check the actual values in the C code
later. It will change the error message a bit in some DDL. I made it
this way so that we don't have to introduce same thing as definition
with just this small change.

>
>> @@ -2337,6 +2338,8 @@ RelationDestroyRelation(Relation relation, bool remember_tupdesc)
>> bms_free(relation->rd_indexattr);
>> bms_free(relation->rd_keyattr);
>> bms_free(relation->rd_idattr);
>> + if (relation->rd_pubactions)
>> + pfree(relation->rd_pubactions);
>> if (relation->rd_options)
>> pfree(relation->rd_options);
>> if (relation->rd_indextuple)
>> @@ -4992,6 +4995,67 @@ RelationGetExclusionInfo(Relation indexRelation,
>> MemoryContextSwitchTo(oldcxt);
>> }
>>
>> +/*
>> + * Get publication actions for the given relation.
>> + */
>> +struct PublicationActions *
>> +GetRelationPublicationActions(Relation relation)
>> +{
>> + List *puboids;
>> + ListCell *lc;
>> + MemoryContext oldcxt;
>> + PublicationActions *pubactions = palloc0(sizeof(PublicationActions));
>> +
>> + if (relation->rd_pubactions)
>> + return memcpy(pubactions, relation->rd_pubactions,
>> + sizeof(PublicationActions));
>> +
>> + /* Fetch the publication membership info. */
>> + puboids = GetRelationPublications(RelationGetRelid(relation));
>> + puboids = list_concat_unique_oid(puboids, GetAllTablesPublications());
>> +
>> + foreach(lc, puboids)
>> + {
>> + Oid pubid = lfirst_oid(lc);
>> + HeapTuple tup;
>> + Form_pg_publication pubform;
>> +
>> + tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
>> +
>> + if (!HeapTupleIsValid(tup))
>> + elog(ERROR, "cache lookup failed for publication %u", pubid);
>> +
>> + pubform = (Form_pg_publication) GETSTRUCT(tup);
>> +
>> + pubactions->pubinsert |= pubform->pubinsert;
>> + pubactions->pubupdate |= pubform->pubupdate;
>> + pubactions->pubdelete |= pubform->pubdelete;
>> +
>> + ReleaseSysCache(tup);
>> +
>> + /*
>> + * If we know everything is replicated, there is no point to check
>> + * for other publications.
>> + */
>> + if (pubactions->pubinsert && pubactions->pubupdate &&
>> + pubactions->pubdelete)
>> + break;
>> + }
>> +
>> + if (relation->rd_pubactions)
>> + {
>> + pfree(relation->rd_pubactions);
>> + relation->rd_pubactions = NULL;
>> + }
>> +
>> + /* Now save copy of the actions in the relcache entry. */
>> + oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
>> + relation->rd_pubactions = palloc(sizeof(PublicationActions));
>> + memcpy(relation->rd_pubactions, pubactions, sizeof(PublicationActions));
>> + MemoryContextSwitchTo(oldcxt);
>> +
>> + return pubactions;
>> +}
>
>
> Hm. Do we actually have enough cache invalidation support to make this
> cached version correct? I haven't seen anything in that regard? Seems
> to mean that all changes to an ALL TABLES publication need to do a
> global relcache invalidation?
>

Yeah you're right, we definitely don't do enough relcache invalidation
for this.

--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

In response to

Responses

Browse pgsql-hackers by date

  From Date Subject
Next Message CK Tan 2016-12-13 02:29:42 Re: WIP: Faster Expression Processing and Tuple Deforming (including JIT)
Previous Message Amit Kapila 2016-12-13 02:21:52 Re: Hash Indexes