Re: Using the database to validate data

From: Adam Brusselback <adambrusselback(at)gmail(dot)com>
To: JPLapham <lapham(at)jandr(dot)org>
Cc: "pgsql-general(at)postgresql(dot)org" <pgsql-general(at)postgresql(dot)org>
Subject: Re: Using the database to validate data
Date: 2015-07-27 17:26:00
Message-ID: CAMjNa7e=st9iXPtaMwRXB+PiXAkM=61PBAnaAhsyY-b7ZvcGfA@mail.gmail.com
Lists: pgsql-general

A little late to the party, but I'll share how I do my data imports and
validation, for anyone interested.

I have a bunch of data that comes in from various sources, and it isn't
always guaranteed to be in the correct format, to have the right foreign
keys, or even to use the right data types.

I have a staging table that is in the format of the feed I have coming in,
with all columns text and no constraints at all on the data columns.
Example:

> CREATE TABLE import_sale
> (
> client_id uuid NOT NULL,
> row_id uuid NOT NULL DEFAULT gen_random_uuid(),
> row_date timestamp with time zone NOT NULL DEFAULT now(),
> file_id uuid NOT NULL,
> sale_number character varying,
> company_number character varying,
> invoice_number character varying,
> invoice_date character varying,
> order_date character varying,
> ship_date character varying,
> sale_date character varying,
> product_number character varying,
> quantity character varying,
> quantity_uom character varying,
> price character varying,
> reduction character varying,
> direct_indicator character varying,
> redistributor_company_number character varying,
> freight numeric,
> processed_ind boolean DEFAULT false,
> CONSTRAINT import_sales_client_id_fkey FOREIGN KEY (client_id)
> REFERENCES client (client_id) MATCH SIMPLE
> ON UPDATE NO ACTION ON DELETE NO ACTION,
> CONSTRAINT import_sales_file_id_fkey FOREIGN KEY (file_id)
> REFERENCES import_file (file_id) MATCH SIMPLE
> ON UPDATE NO ACTION ON DELETE NO ACTION,
> CONSTRAINT import_sales_row_id_unique UNIQUE (row_id)
> );

I use a Talend job, or COPY, to get the data into this table. However you
want to do that is up to you.
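
For the COPY route, a minimal sketch (the file path, the CSV options, and
the assumption that client_id and file_id are present in the feed file are
all mine, not from the thread):

```sql
-- Load a raw feed straight into the staging table. Since every data
-- column is text, this only fails on structural problems such as a
-- wrong column count, never on bad values -- those are caught later.
COPY import_sale (client_id, file_id, sale_number, company_number,
                  invoice_number, invoice_date, order_date, ship_date,
                  sale_date, product_number, quantity, quantity_uom,
                  price, reduction, direct_indicator,
                  redistributor_company_number)
FROM '/path/to/feed.csv'
WITH (FORMAT csv, HEADER true, NULL '');
```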

I have a final table that I want all this data to eventually get to once
there are no issues with it. Example:

> CREATE TABLE sale
> (
> sale_id uuid NOT NULL DEFAULT gen_random_uuid(),
> client_id uuid NOT NULL,
> source_row_id uuid NOT NULL,
> sale_number character varying NOT NULL,
> company_id uuid NOT NULL,
> invoice_number character varying NOT NULL,
> invoice_date date,
> order_date date,
> ship_date date,
> sale_date date NOT NULL,
> product_id uuid NOT NULL,
> quantity numeric NOT NULL,
> uom_type_id uuid NOT NULL,
> price numeric NOT NULL,
> reduction numeric NOT NULL,
> redistributor_company_id uuid,
> freight numeric,
> active_range tstzrange DEFAULT tstzrange(now(), NULL::timestamp with
> time zone),
> CONSTRAINT sale_pkey PRIMARY KEY (sale_id),
> CONSTRAINT sale_client_id_fkey FOREIGN KEY (client_id)
> REFERENCES client (client_id) MATCH SIMPLE
> ON UPDATE NO ACTION ON DELETE NO ACTION,
> CONSTRAINT sale_company_id_fkey FOREIGN KEY (company_id)
> REFERENCES company (company_id) MATCH SIMPLE
> ON UPDATE NO ACTION ON DELETE NO ACTION,
> CONSTRAINT sale_product_id_fkey FOREIGN KEY (product_id)
> REFERENCES product (product_id) MATCH SIMPLE
> ON UPDATE NO ACTION ON DELETE NO ACTION,
> CONSTRAINT sale_redistributor_company_id_fkey FOREIGN KEY
> (redistributor_company_id)
> REFERENCES company (company_id) MATCH SIMPLE
> ON UPDATE NO ACTION ON DELETE NO ACTION,
> CONSTRAINT sale_source_row_id_fkey FOREIGN KEY (source_row_id)
> REFERENCES import_sale (row_id) MATCH SIMPLE
> ON UPDATE NO ACTION ON DELETE NO ACTION,
> CONSTRAINT sale_uom_type_id_fkey FOREIGN KEY (uom_type_id)
> REFERENCES uom_type (uom_type_id) MATCH SIMPLE
> ON UPDATE NO ACTION ON DELETE NO ACTION,
> CONSTRAINT sale_sale_number_active_range_excl EXCLUDE
> USING gist (sale_number WITH =, (client_id::character varying) WITH =,
> active_range WITH &&),
> CONSTRAINT sale_unique UNIQUE (sale_number, client_id, active_range)
> );

I then have a couple of functions which run over the data and do the
validations / inserts / updates where necessary.

This one validates that the data can be mapped to all the foreign keys,
that the data types can be converted properly, and that NOT NULL
constraints are enforced.
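
Each failed check inserts a row into an import_sale_error table, whose
definition isn't shown in the thread. A minimal version (my assumption,
keyed by the staging row, with no unique constraint so a row can collect
several errors) might look like:

```sql
CREATE TABLE import_sale_error
(
  row_id uuid NOT NULL,
  error_message character varying NOT NULL,
  CONSTRAINT import_sale_error_row_id_fkey FOREIGN KEY (row_id)
      REFERENCES import_sale (row_id)
);
```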

> CREATE OR REPLACE FUNCTION import_validate_sale()
> RETURNS void AS
> $BODY$
> /*
> Remove any prior exceptions
> */
> DELETE FROM import_sale_error
> WHERE EXISTS (
> SELECT 1
> FROM import_sale
> WHERE import_sale_error.row_id = import_sale.row_id);
>
> /*
> Null checks for required fields
> */
> INSERT INTO import_sale_error(row_id, error_message)
> SELECT s.row_id, 'sale_number is null, but required.'
> FROM import_sale s
> WHERE s.sale_number IS NULL;
>
> INSERT INTO import_sale_error(row_id, error_message)
> SELECT s.row_id, 'distributor company_number is null, but required.'
> FROM import_sale s
> WHERE s.company_number IS NULL;
>
> INSERT INTO import_sale_error(row_id, error_message)
> SELECT s.row_id, 'invoice_number is null, but required.'
> FROM import_sale s
> WHERE s.invoice_number IS NULL;
>
> INSERT INTO import_sale_error(row_id, error_message)
> SELECT s.row_id, 'sale_date is null, but required.'
> FROM import_sale s
> WHERE s.sale_date IS NULL;
>
> INSERT INTO import_sale_error(row_id, error_message)
> SELECT s.row_id, 'product_number is null, but required.'
> FROM import_sale s
> WHERE s.product_number IS NULL;
>
> INSERT INTO import_sale_error(row_id, error_message)
> SELECT s.row_id, 'quantity is null, but required.'
> FROM import_sale s
> WHERE s.quantity IS NULL;
>
> INSERT INTO import_sale_error(row_id, error_message)
> SELECT s.row_id, 'quantity_uom is null, but required.'
> FROM import_sale s
> WHERE s.quantity_uom IS NULL;
>
> INSERT INTO import_sale_error(row_id, error_message)
> SELECT s.row_id, 'price is null, but required.'
> FROM import_sale s
> WHERE s.price IS NULL;
>
> /*
> Check type conversions
> */
> INSERT INTO import_sale_error(row_id, error_message)
> SELECT s.row_id, 'invoice_date cannot be converted to date.'
> FROM import_sale s
> WHERE can_convert_date(s.invoice_date) = FALSE;
>
> INSERT INTO import_sale_error(row_id, error_message)
> SELECT s.row_id, 'order_date cannot be converted to date.'
> FROM import_sale s
> WHERE can_convert_date(s.order_date) = FALSE;
>
> INSERT INTO import_sale_error(row_id, error_message)
> SELECT s.row_id, 'ship_date cannot be converted to date.'
> FROM import_sale s
> WHERE can_convert_date(s.ship_date) = FALSE;
>
> INSERT INTO import_sale_error(row_id, error_message)
> SELECT s.row_id, 'sale_date cannot be converted to date.'
> FROM import_sale s
> WHERE can_convert_date(s.sale_date) = FALSE;
>
> INSERT INTO import_sale_error(row_id, error_message)
> SELECT s.row_id, 'quantity cannot be converted to numeric.'
> FROM import_sale s
> WHERE can_convert_numeric(s.quantity) = FALSE;
>
> INSERT INTO import_sale_error(row_id, error_message)
> SELECT s.row_id, 'price cannot be converted to numeric.'
> FROM import_sale s
> WHERE can_convert_numeric(s.price) = FALSE;
>
> INSERT INTO import_sale_error(row_id, error_message)
> SELECT s.row_id, 'reduction cannot be converted to numeric.'
> FROM import_sale s
> WHERE can_convert_numeric(s.reduction) = FALSE;
>
> INSERT INTO import_sale_error(row_id, error_message)
> SELECT s.row_id, 'freight cannot be converted to numeric.'
> FROM import_sale s
> WHERE can_convert_numeric(s.freight) = FALSE;
>
> /*
> Check item resolutions
> */
> INSERT INTO import_sale_error(row_id, error_message)
> SELECT s.row_id, 'Distributor company record could not be mapped.'
> FROM import_sale s
> LEFT JOIN company c
> ON s.company_number = c.company_number
> WHERE c.company_id IS NULL;
>
> INSERT INTO import_sale_error(row_id, error_message)
> SELECT s.row_id, 'Re-Distributor company record could not be mapped.'
> FROM import_sale s
> LEFT JOIN company c
> ON s.redistributor_company_number = c.company_number
> WHERE c.company_id IS NULL
> AND s.redistributor_company_number IS NOT NULL;
>
> INSERT INTO import_sale_error(row_id, error_message)
> SELECT s.row_id, 'Product record could not be mapped.'
> FROM import_sale s
> LEFT JOIN product p
> ON s.product_number = p.product_number
> WHERE p.product_id IS NULL;
>
> INSERT INTO import_sale_error(row_id, error_message)
> SELECT s.row_id, 'Unit Of Measure record could not be mapped.'
> FROM import_sale s
> LEFT JOIN uom_type u
> ON u.uom_type_cd = s.quantity_uom
> WHERE u.uom_type_id IS NULL;
>
> INSERT INTO import_sale_error(row_id, error_message)
> SELECT s.row_id, 'Product does not have a conversion rate for this Unit
> Of Measure.'
> FROM import_sale s
> INNER JOIN product p
> ON s.product_number = p.product_number
> INNER JOIN uom_type u
> ON u.uom_type_cd = s.quantity_uom
> LEFT JOIN product_uom_conversion puc
> ON p.product_id = puc.product_id
> AND u.uom_type_id = puc.uom_type_id
> WHERE puc.rate IS NULL;
> $BODY$
> LANGUAGE sql VOLATILE
> COST 100;
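
The can_convert_date / can_convert_numeric helpers aren't shown in the
thread either. A common way to write them is a PL/pgSQL function that
attempts the cast inside an exception block (this sketch is my assumption,
not necessarily Adam's implementation):

```sql
CREATE OR REPLACE FUNCTION can_convert_date(p_value character varying)
RETURNS boolean AS
$BODY$
BEGIN
    PERFORM p_value::date;  -- attempt the cast; failure raises an error
    RETURN true;
EXCEPTION WHEN others THEN
    RETURN false;
END;
$BODY$
LANGUAGE plpgsql IMMUTABLE;
```

can_convert_numeric would be identical apart from the target type. Note
that a NULL input returns true here (NULL casts cleanly), which is what
the validation queries above rely on: NULL values are caught separately
by the NOT NULL checks.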

The other function is then in charge of the insert into the final table:

> CREATE OR REPLACE FUNCTION import_sale()
> RETURNS void AS
> $BODY$
> /**
> Insert into the production sale table where no errors exist.
> **/
>
> INSERT INTO sale(
> client_id
> , source_row_id
> , sale_number
> , company_id
> , invoice_number
> , invoice_date
> , order_date
> , ship_date
> , sale_date
> , product_id
> , quantity
> , uom_type_id
> , price
> , reduction
> , redistributor_company_id
> , freight)
> SELECT
> s.client_id
> , s.row_id
> , s.sale_number
> , cmpd.company_id
> , s.invoice_number
> , s.invoice_date::date
> , s.order_date::date
> , s.ship_date::date
> , s.sale_date::date
> , p.product_id
> , s.quantity::numeric
> , uom.uom_type_id
> , s.price::numeric
> , s.reduction::numeric
> , cmpr.company_id
> , s.freight
> FROM import_sale s
> INNER JOIN company cmpd
> ON cmpd.company_number = s.company_number
> LEFT JOIN company cmpr
> ON cmpr.company_number = s.redistributor_company_number
> INNER JOIN product p
> ON s.product_number = p.product_number
> INNER JOIN uom_type uom
> ON uom.uom_type_cd = s.quantity_uom
> WHERE NOT EXISTS (
> SELECT 1
> FROM import_sale_error se
> WHERE se.row_id = s.row_id)
> ----new sale_number
> AND (NOT EXISTS (
> SELECT 1
> FROM sale s2
> WHERE s.row_id = s2.source_row_id
> AND s.client_id = s2.client_id)
> ----existing sale_number with changes
> OR EXISTS (
> SELECT 1
> FROM sale s2
> WHERE s.processed_ind = false
> AND s.sale_number = s2.sale_number
> AND s.client_id = s2.client_id
> AND (cmpd.company_id != s2.company_id
> OR s.invoice_number != s2.invoice_number
> OR s.invoice_date::date != s2.invoice_date
> OR s.order_date::date != s2.order_date
> OR s.ship_date::date != s2.ship_date
> OR s.sale_date::date != s2.sale_date
> OR p.product_id != s2.product_id
> OR s.quantity::numeric != s2.quantity
> OR uom.uom_type_id != s2.uom_type_id
> OR s.price::numeric != s2.price
> OR s.reduction::numeric != s2.reduction
> OR cmpr.company_id != s2.redistributor_company_id)
> );
>
>
> -------------------------------------------------------------------------
> --Update processed_ind
> UPDATE import_sale AS s
> SET processed_ind = true
> WHERE NOT EXISTS (
> SELECT 1
> FROM import_sale_error se
> WHERE se.row_id = s.row_id);
>
> $BODY$
> LANGUAGE sql VOLATILE
> COST 1000;
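
With both functions in place, a load cycle after populating the staging
table is just:

```sql
SELECT import_validate_sale();  -- record any problems per staging row
SELECT import_sale();           -- promote clean rows, flag them processed

-- Whatever is left over has at least one error to show back to the user:
SELECT e.row_id, e.error_message
FROM import_sale_error e;
```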

So all of that together makes a pretty solid system for importing data,
and for showing a user exactly what went wrong with any bad rows they
sent.

The other good part of this is that it's all set-based. This process will
run through 20,000 lines on a single-core server in around 5-10 seconds.

At one of my old jobs, we had something that did the same type of
validations / inserts, but did it row by row in a cursor (not written by
me), and that took a good 5 minutes (if I remember correctly) to process
20,000 lines. That was on a 32-core machine running SQL Server.

Anyways, good luck!
-Adam

On Fri, Jul 24, 2015 at 9:55 AM, JPLapham <lapham(at)jandr(dot)org> wrote:

> Zdeněk Bělehrádek wrote
> > What about creating a SAVEPOINT before each INSERT, and if the INSERT
> > returns
> > an error, then ROLLBACK TO SAVEPOINT? This way you will have all the
> > insertable data in your table, and you can still ROLLBACK the whole
> > transaction, or COMMIT it if there were no errors.
> >
> > It will probably be quite slow, but if you have only thousands of lines,
> > it
> > should be fast enough for your usecase IMHO.
> >
> > -- Zdeněk Bělehrádek
>
> Hmmm, interesting. Thanks, if that works, it would be exactly what I'm
> looking for!
>
> You are right, speed is not an issue.
>
> -Jon
>
