Re: [Solved] Generic logging system for pre-hstore using plperl triggers

From: Diego Augusto Molina <diegoaugustomolina(at)gmail(dot)com>
To: Filip Rembiałkowski <plk(dot)zuber(at)gmail(dot)com>
Cc: pgsql-general(at)postgresql(dot)org
Subject: Re: [Solved] Generic logging system for pre-hstore using plperl triggers
Date: 2011-09-27 19:41:14
Message-ID: CAGOxLdHDUE8Oc+b4fwEZYuBxvHit2MfTRYnk_7ipDQAT581Kgg@mail.gmail.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-general

/* Created by Diego Augusto Molina in 2011 for Tucuman Government,
Argentina. */

/*
-- Execute the following accordingly to your needs.
CREATE TRUSTED PROCEDURAL LANGUAGE 'plpgsql';
CREATE TRUSTED PROCEDURAL LANGUAGE 'plperl';
*/

CREATE ROLE auditor NOSUPERUSER NOINHERIT NOCREATEDB NOCREATEROLE;
CREATE ROLE audit LOGIN ENCRYPTED PASSWORD 'test.1234' NOSUPERUSER
NOINHERIT NOCREATEDB NOCREATEROLE;
CREATE SCHEMA audit AUTHORIZATION audit;
ALTER ROLE auditor SET search_path=audit;
ALTER ROLE audit SET search_path=audit;
SET search_path=audit;
SET SESSION AUTHORIZATION audit;

CREATE SEQUENCE seq_audit
INCREMENT 1
MINVALUE -9223372036854775808
MAXVALUE 9223372036854775807
START 0
CACHE 1
CYCLE;
ALTER TABLE seq_audit OWNER TO audit;

CREATE SEQUENCE seq_elems
INCREMENT 1
MINVALUE -32768
MAXVALUE 32767
START 0
CACHE 1
CYCLE;
ALTER TABLE seq_elems OWNER TO audit;

CREATE TABLE field
(
id smallint NOT NULL DEFAULT nextval('seq_elems'::regclass),
value name NOT NULL,
CONSTRAINT field_pk PRIMARY KEY (id)
WITH (FILLFACTOR=100),
CONSTRAINT field_uq_value UNIQUE (value)
WITH (FILLFACTOR=100)
)
WITH (
OIDS=FALSE
);
ALTER TABLE field OWNER TO audit;
GRANT ALL ON TABLE field TO audit;
GRANT SELECT ON TABLE field TO auditor;

CREATE TABLE client_inet
(
id smallint NOT NULL DEFAULT nextval('seq_elems'::regclass),
value inet NOT NULL DEFAULT inet_client_addr(),
CONSTRAINT dir_inet_pk PRIMARY KEY (id )
WITH (FILLFACTOR=100),
CONSTRAINT dir_inet_uq_value UNIQUE (value)
WITH (FILLFACTOR=95)
)
WITH (
OIDS=FALSE
);
ALTER TABLE client_inet
OWNER TO audit;
GRANT ALL ON TABLE client_inet TO audit;
GRANT SELECT ON TABLE client_inet TO auditor;

CREATE TABLE schema
(
id smallint NOT NULL DEFAULT nextval('seq_elems'::regclass),
value name NOT NULL,
CONSTRAINT schema_pk PRIMARY KEY (id )
WITH (FILLFACTOR=100),
CONSTRAINT schema_uq_value UNIQUE (value )
WITH (FILLFACTOR=100)
)
WITH (
OIDS=FALSE
);
ALTER TABLE schema
OWNER TO audit;
GRANT ALL ON TABLE schema TO audit;
GRANT SELECT ON TABLE schema TO auditor;

CREATE TABLE table
(
id smallint NOT NULL DEFAULT nextval('seq_elems'::regclass),
value name NOT NULL,
CONSTRAINT table_pk PRIMARY KEY (id )
WITH (FILLFACTOR=100),
CONSTRAINT table_uq_value UNIQUE (value )
WITH (FILLFACTOR=100)
)
WITH (
OIDS=FALSE
);
ALTER TABLE table
OWNER TO audit;
GRANT ALL ON TABLE table TO audit;
GRANT SELECT ON TABLE table TO auditor;

CREATE TABLE user
(
id smallint NOT NULL DEFAULT nextval('seq_elems'::regclass),
value name NOT NULL DEFAULT "current_user"(),
CONSTRAINT user_pk PRIMARY KEY (id )
WITH (FILLFACTOR=100),
CONSTRAINT user_uq_value UNIQUE (value )
WITH (FILLFACTOR=95)
)
WITH (
OIDS=FALSE
);
ALTER TABLE user
OWNER TO audit;
GRANT ALL ON TABLE user TO audit;
GRANT SELECT ON TABLE user TO auditor;

CREATE TABLE audit
(
id bigint,
type character(1),
tstmp timestamp with time zone DEFAULT now(),
schema smallint,
table smallint,
user smallint,
client_inet smallint,
client_port integer DEFAULT inet_client_port(),
pid integer DEFAULT pg_backend_pid()
)
WITH (
OIDS=FALSE
);
ALTER TABLE audit OWNER TO audit;
GRANT ALL ON TABLE audit TO audit;
GRANT SELECT ON TABLE audit TO auditor;

CREATE TABLE audet
(
id bigint,
field smallint,
is_pk boolean,
before text,
after text
)
WITH (
OIDS=FALSE
);
ALTER TABLE audet OWNER TO audit;
GRANT ALL ON TABLE audet TO audit;
GRANT SELECT ON TABLE audet TO auditor;

CREATE OR REPLACE FUNCTION tgf_ins_audet()
RETURNS trigger AS
$BODY$
begin
execute E'insert into audet_' || tg_argv[0] || E'
(
id,
field,
is_pk,
before,
after
) values
(
'||coalesce(new.id::text,'NULL')||E',
'||coalesce(new.field::text,'NULL')||E',
'||coalesce(new.is_pk::text,'NULL')||E',
'||coalesce(quote_literal(new.before),'NULL')||E',
'||coalesce(quote_literal(new.after),'NULL')||E'
)';
return null;
end;$BODY$ LANGUAGE plpgsql VOLATILE;
ALTER FUNCTION tgf_ins_audet() SET search_path=auditoria;
ALTER FUNCTION tgf_ins_audet() OWNER TO audit;
GRANT EXECUTE ON FUNCTION tgf_ins_audet() TO audit;

CREATE OR REPLACE FUNCTION tgf_ins_audit()
RETURNS trigger AS
$BODY$
begin
execute E'insert into audit_' || tg_argv[0] || E'
(
id,
type,
tstmp,
schema,
table,
user,
client_inet,
client_port,
pid
) values
(
'||coalesce(new.id::text,'NULL')||E',
'||coalesce(quote_literal(new.type),'NULL')||E',
'||coalesce(quote_literal(new.tstmp),'NULL')||E',
'||coalesce(new.schema::text,'NULL')||E',
'||coalesce(new.table::text,'NULL')||E',
'||coalesce(new.user::text,'NULL')||E',
'||coalesce(new.client_inet::text,'NULL')||E',
'||coalesce(new.client_port::text,'NULL')||E',
'||coalesce(new.pid::text,'NULL')||E'
)';
return null;
end;$BODY$ LANGUAGE plpgsql VOLATILE;
ALTER FUNCTION tgf_ins_audit() SET search_path=auditoria;
ALTER FUNCTION tgf_ins_audit() OWNER TO audit;
GRANT EXECUTE ON FUNCTION tgf_ins_audit() TO audit;

CREATE TRIGGER tg_audit_20110518
BEFORE INSERT
ON audit
FOR EACH ROW
EXECUTE PROCEDURE tgf_ins_audit(20110518);

CREATE TRIGGER tg_audet_20110907
BEFORE INSERT
ON audet
FOR EACH ROW
EXECUTE PROCEDURE tgf_ins_audet(20110907);

CREATE OR REPLACE FUNCTION rotate(character)
RETURNS void AS
$BODY$
/* Created by Diego Augusto Molina in 2011 for Tucuman Government,
Argentina. */
declare
first_execution boolean := false;
cur_start char(8) := null;
cur_tstmp_min timestamp with time zone;
cur_tstmp_max timestamp with time zone;
cur_id_min bigint;
cur_id_max bigint;
new_start char(8);
begin
/* Determine the creation tstmp of the tables
=========================================================================
*/
select substring(max(c.relname::text) from $1 || E'_(........)')
into cur_start
from
pg_namespace n inner join
pg_class c on (n.oid = c.relnamespace)
where
n.nspname = 'audit'::name and
c.relname::text like $1 || '_%';
if cur_start is null then
first_execution := true;
cur_start := '';
end if;
new_start := cast(to_char(current_timestamp,'YYYYMMDD') as name);

case $1
when 'audit' then /* if I'm rotating the table audit
================================================================== */
/* current table */
if not first_execution then
execute 'select min(tstmp), max(tstmp) from audit_' || cur_start
into cur_tstmp_min, cur_tstmp_max;
execute $$
alter index idx_audit_$$|| cur_start ||$$_id
set (fillfactor = 100);
alter index idx_audit_$$|| cur_start ||$$_tstmp
set (fillfactor = 100);
alter index idx_audit_$$|| cur_start ||$$_schema__table
set (fillfactor = 100);
alter index idx_audit_$$|| cur_start ||$$_user
set (fillfactor = 100);

cluster audit_$$|| cur_start ||$$;
analyze audit_$$|| cur_start ||$$;
alter table audit_$$|| cur_start ||$$ add
constraint audit_$$|| cur_start ||$$_ck_exclusion
check (
tstmp >= '$$|| cur_tstmp_min ||$$'
and
tstmp <= '$$|| cur_tstmp_max ||$$'
)
$$;
end if;

execute $$
/* new table */
create table audit_$$|| new_start ||$$ () inherits (audit);
create index idx_audit_$$|| new_start ||$$_id
on audit_$$|| new_start ||$$ using btree (id) with
(fillfactor = 99);
create index idx_audit_$$|| new_start ||$$_tstmp
on audit_$$|| new_start ||$$ using btree (tstmp) with
(fillfactor = 99);
create index idx_audit_$$|| new_start ||$$_schema__tabla
on audit_$$|| new_start ||$$ using btree (schema, table) with
(fillfactor = 95);
create index idx_audit_$$|| new_start ||$$_user
on audit_$$|| new_start ||$$ using btree (usuario) with
(fillfactor = 95);
cluster audit_$$|| new_start ||$$ using idx_audit_$$||
new_start ||$$_tstmp;

/* Parent table */
drop trigger if exists tg_audit_$$|| cur_start ||$$ on audit;
create trigger tg_audit_$$|| new_start ||$$
before insert
on audit
for each row
execute procedure tgf_ins_audit('$$|| new_start ||$$');
$$;
when 'audet' then /* if I'm rotating the table audet
================================================================== */
/* current table */
if not first_execution then
execute 'select min(id), max(id) from audet_' || cur_start
into cur_id_min, cur_id_max;
execute $$
alter index idx_audet_$$|| cur_start ||$$_id set
(fillfactor = 100);
alter index idx_audet_$$|| cur_start ||$$_fieldpk set
(fillfactor = 100);

cluster audet_$$|| cur_start ||$$;
analyze audet_$$|| cur_start ||$$;
alter table audet_$$|| cur_start ||$$ add
constraint audet_$$|| cur_start ||$$_ck_exclusion
check (
id >= '$$|| cur_id_min ||$$'
and
id <= '$$|| cur_id_max ||$$'
);

/* Parent table */
drop trigger tg_audet_$$|| cur_start ||$$ on audet;
$$;
end if;

execute $$
/* new table */
create table audet_$$|| new_start ||$$ () inherits (audet);
create index idx_audet_$$|| new_start ||$$_id on
audet_$$|| new_start ||$$ using btree (id) with (fillfactor = 99);
create index idx_audet_$$|| new_start ||$$_fieldpk on
audet_$$|| new_start ||$$ using btree (field) with (fillfactor = 99)
where es_pk;
cluster audet_$$|| new_start ||$$ using idx_audet_$$||
new_start ||$$_id;

/* Parent table */
create trigger tg_audet_$$|| new_start ||$$
before insert
on audet
for each row
execute procedure tgf_ins_audet('$$|| new_start ||$$');
$$;
else /* if I got a wrong argument
=====================================================================================
*/
raise notice E'Error: expected \'audit\' o \'audet\'. Got \'%\'.', $1;
return;
end case;
end;$BODY$ LANGUAGE plpgsql VOLATILE SECURITY DEFINER;
ALTER FUNCTION rotate(character) SET search_path=auditoria;
ALTER FUNCTION rotate(character) OWNER TO audit;
GRANT EXECUTE ON FUNCTION rotate(character) TO audit;

CREATE OR REPLACE FUNCTION audit() RETURNS trigger AS $BODY$

## Created by Diego Augusto Molina in 2011 for Tucuman Government, Argentina.

## This is the trigger which should be called by any table we want to
audit. If it receives
## arguments, they will be interpreted as the primary key of the
table. If any of the
## arguments is not a column of the table, or no arguments is received
at all, a probing process
## is taken which ends up determining the pk of the table. Thus, it is
better to create the trigger
## calling this function with no arguments. See the TODO list.

## Usage:
## CREATE TRIGGER <tg_name> AFTER {INSERT | UPDATE | DELETE} [ OR
...] ON <table_name>
## FOR EACH ROW EXECUTE PROCEDURE audit.audit( { <column_list> |
<nothing_at_all> } );

## KNOWN ISSUE #1: you don't want to use this trigger on a table which
has a column of some type
## that doesn't have an implicit cast to string. That
would cause a runtime error
## killing your transaction! See TODO #2. Be easy,
most 'common' types have an
## implicit cast to string by default.

## TODO #1: In 'P3', instead of asking '( scalar keys %pk == 0 )' each
time, put an 'else'.
## TODO #1.1: if the pk was not passed as argument, at the end of the
probing execute an 'alter
## trigger' so that next time there's no probing at all. This
would be unfriendly with
## modifications in the table definition, which should carry
an update in the trigger
## (putting no arguments at all would imply probing again for
the first time and then we
## just use it!).
## TODO #2: search for a way to save the binary contents of the
columns instead of the formatted
## content. The table 'field' would have an extra column of
type 'type' and that would
## help describing the field audited. That would solve the
problem with strange fields
## (assuming that _any_ value can be converted to it's
binary/internal representation).
## This may carry some extra complexity, maybe needing extra
tables holding information
## about types.
## TODO #3: make this function receive only two parameters: two arrays
of type name[], the first
## holding the set of columns which are the primary key, the
second one is the set of
## columns which in addition to the pk one's are to be
registered. Note that pk columns
## will always be registered because that will identify the
tuple modified.
## TODO #4: support for TRUNCATE event.

# P0. Declarations and general definitions
###################################################
##############################################################################################

my $elog_pref =
"(schm:$_TD->{table_schema};tab:$_TD->{table_name};trg:$_TD->{name};evt:$_TD->{event}):";
my $rv = ""; # Query execution
my $val = ""; # Iterating value
my %tables = ( # Value of the respective
tables inserted in "audit"
"user" => 'pg_catalog."session_user"()',
"table" => "'$_TD->{table_name}'",
"schema" => "'$_TD->{table_schema}'",
"client_inet" => "pg_catalog.inet_client_addr()"
);
my $id = ""; # Id of the tuple inserted
in "audit"
my $field = ""; # Field id
my $is_pk = 0; # Determines if a field is
part of the PK
my $before = ""; # Value of a field in OLD
my $after = ""; # Value of a field in NEW
my %cols = (); # Columns of the table
my %pk = (); # Primary key

# Copy columns from some available transitional variable
-------------------------------------
if (exists $_TD->{new}){
%cols = %{$_TD->{new}};
} else {
%cols = %{$_TD->{old}};
}

# P1. Create necessary tuples in user, table, schema and
client_inet #########################
##############################################################################################

foreach $val (keys %tables){
$rv = spi_exec_query("select id from $val where value = $tables{$val}");
if ( $rv->{status} != SPI_OK_SELECT ){
elog(ERROR, "$elog_pref Error querying table '$val'.");
}
if ( $rv->{processed} == 1 ){
$tables{$val} = $rv->{rows}[0]->{id};
} else {
$rv = spi_exec_query("insert into $val (value) values
($tables{$val}) returning id");
if ( $rv->{status} != SPI_OK_INSERT_RETURNING ){
elog(ERROR, "$elog_pref Error inserting in table '$val'.");
}
$tables{$val} = $rv->{rows}[0]->{id};
}
}

# P2. Insert in audit
########################################################################
##############################################################################################

$rv = spi_exec_query("select nextval('seq_audit'::regclass) as id");
if ( $rv->{status} != SPI_OK_SELECT ){
elog(ERROR, "$elog_pref Error querying next value of sequence
'seq_audit'.");
}
$id = $rv->{rows}[0]->{id};
$rv = spi_exec_query("insert into audit (id, type, schema, table,
user, client_inet)
values (
$id,
substring('$_TD->{event}', 1, 1),
$tables{'schema'},
$tables{'table'},
$tables{'user'},
$tables{'client_inet'}
)
");
if ($rv->{status} != SPI_OK_INSERT){
elog(ERROR,"$elog_pref Error inserting tuple in table 'audit'.");
}

# P3. Determine PK of the table
##############################################################
##############################################################################################

if ( scalar keys %pk == 0){
# Criterion 1: if got params, each of them is a column of the
table, and all of them make
# up the pk of the table
-------------------------------------------------------------------
elog(DEBUG, "$elog_pref Searching pk in the trigger's params.");
if ($_TD->{argc} > 0){
ARGS: foreach $val ( @{$_TD->{args}} ){
if (exists $cols{$val}){
$pk{$val} = "-";
} else {
%pk = ();
elog(DEBUG, "$elog_pref The column '$val' given as
argument does not exist. Skipping to next criterion.");
last ARGS;
}
}
}
}
if ( scalar keys %pk == 0 ) {
# Criterion 2: search the pk in the system catalogs
---------------------------------------
elog(DEBUG, "$elog_pref Searching pk in system catalogs.");
$rv = spi_exec_query("
select a.attname from
( select cl.oid, unnest(c.conkey) as att
from
pg_catalog.pg_constraint c inner join
pg_catalog.pg_class cl on (c.conrelid = cl.oid)
where
c.contype = 'p'
and cl.oid = $_TD->{relid}
) as c inner join
pg_catalog.pg_attribute a on (c.att = a.attnum and c.oid = a.attrelid)
");
if ( $rv->{status} == SPI_OK_SELECT ){
if ( $rv->{processed} > 0 ){
foreach $val ($rv->{rows}){
$pk{$val->{attname}} = "-";
}
}
} else {
elog(DEBUG, "$elog_pref Error querying the system catalogs.
Skipping to next criterion.");
}
}
if ( scalar keys %pk == 0) {
# Criterion 3: if the table has OIDs, use that as pk and emit
a warning -------------------
elog(DEBUG, "$elog_pref Searching OIDs in the table.");
$rv = spi_exec_query("select * from pg_catalog.pg_class where
oid = $_TD->{relid} and relhasoids = true");
if( $rv->{status} == SPI_OK_SELECT ){
if ( $rv->{processed} > 0 ){
%pk = ("oid","-");
elog(DEBUG, "$elog_pref Using OIDs as table pk for
'$_TD->{table_name}' because no previous criterion could find one.");
}
} else {
elog(DEBUG, "$elog_pref Error querying the system catalogs.
Skipping to next criterion.");
}
}
if ( scalar keys %pk == 0){
# Default criterion: all tuples
-----------------------------------------------------------
elog(DEBUG, "$elog_pref Could not find a suitable pk. Logging
every column.");
%pk = %cols;
}

# P4. Insert in audet
########################################################################
##############################################################################################

foreach $val (keys %cols){
$is_pk = 0 + exists($pk{$val});
if ( $_TD->{event} ne "UPDATE" || $is_pk || $_TD->{new}{$val} ne
$_TD->{old}{$val} ){
$before = (exists $_TD->{old}) ? "'".$_TD->{old}{$val}."'" : "NULL";
$after = (exists $_TD->{new}) ? "'".$_TD->{new}{$val}."'" : "NULL";
if ( $_TD->{event} eq "UPDATE" && $_TD->{new}{$val} eq
$_TD->{old}{$val}){
# We don't save the previous state of the column which
is part of the pk while updating
# if it hasn't changed.
$before = "NULL";
}
$rv = spi_exec_query("select id from field where value = '$val'");
if ( $rv->{status} != SPI_OK_SELECT ){
elog(ERROR, "$elog_pref Error querying table 'field'.");
}
if ( $rv->{processed} > 0 ){
$field = $rv->{rows}[0]->{id};
} else {
$rv = spi_exec_query("insert into field (value) values
('$val') returning id");
if ( $rv->{status} != SPI_OK_INSERT_RETURNING ){
elog(ERROR, "$elog_pref Error executing insert returning
in table 'field'.");
}
$field = $rv->{rows}[0]->{id};
}
$rv = spi_exec_query("insert into audet (id, field, is_pk,
before, after)
values ($id, $field, cast($is_pk as boolean), cast($before
as text), cast($after as text))");
if ( $rv->{status} ne SPI_OK_INSERT ){
elog(ERROR, "$elog_pref Error inserting tuples in table 'audet'.");
}
}
}

# P5. Finishing
##############################################################################
##############################################################################################

return;

$BODY$
LANGUAGE plperl VOLATILE SECURITY DEFINER;
ALTER FUNCTION audit() SET search_path=auditoria;
ALTER FUNCTION audit() OWNER TO audit;
GRANT EXECUTE ON FUNCTION audit() TO public;

In response to

Responses

Browse pgsql-general by date

  From Date Subject
Next Message Diego Augusto Molina 2011-09-27 19:47:31 Re: [Solved] Generic logging system for pre-hstore using plperl triggers
Previous Message John R Pierce 2011-09-27 18:41:04 Re: Quick-and-Dirty Data Entry with LibreOffice3?