Skip to content
Snippets Groups Projects
Select Git revision
  • develop default protected
  • tg-felipe
  • issue/97
  • issue/63
  • icde-2019-experiments
  • issue/85
  • master
  • issue/20
  • refactor/engine
  • issue/6
  • feature/diagrams
  • wip-transformers
12 results

postgres.ts

Blame
  • Forked from C3SL / blendb
    24 commits behind, 12 commits ahead of the upstream repository.
    postgres.ts 16.98 KiB
    /*
     * Copyright (C) 2018 Centro de Computacao Cientifica e Software Livre
     * Departamento de Informatica - Universidade Federal do Parana
     *
     * This file is part of blend.
     *
     * blend is free software: you can redistribute it and/or modify
     * it under the terms of the GNU General Public License as published by
     * the Free Software Foundation, either version 3 of the License, or
     * (at your option) any later version.
     *
     * blend is distributed in the hope that it will be useful,
     * but WITHOUT ANY WARRANTY; without even the implied warranty of
     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     * GNU General Public License for more details.
     *
     * You should have received a copy of the GNU General Public License
     * along with blend.  If not, see <http://www.gnu.org/licenses/>.
     */
    
    import { SQLAdapter } from "./sql";
    import { View } from "../core/view";
    import { Source } from "../core/source";
    import { FilterOperator } from "../core/filter";
    import { Pool, PoolConfig } from "pg";
    import { DataType } from "../common/types";
    import * as async from "async";
    
    /** Adapter which connects with a PostgreSQL database. */
    export class PostgresAdapter extends SQLAdapter {
        /** Information used to connect with a PostgreSQL database. */
        private pool: Pool;
    
        /**
         * Creates a new adapter with the database connection configuration.
         * @param config - The information required to create a connection with
         * the database.
         */
        constructor (config: PoolConfig) {
            super();
            this.pool = new Pool(config);
        }
    
        /**
         * Asynchronously reads all data from given view.
         * In other words perform a SELECT query.
         * @param view - "Location" from all data should be read.
         * @param cb - Callback function which contains the data read.
         * @param cb.error - Error information when the method fails.
         * @param cb.result - Data got from view.
         */
        public getDataFromView(view: View, cb: (error: Error, result?: any[]) => void): void {
            const query = this.getQueryFromView(view);
            this.executeQuery(query, cb);
        }
    
        /**
         * Asynchronously executes a query and get its result.
         * @param query - Query (SQL format) to be executed.
         * @param cb - Callback function which contains the data read.
         * @param cb.error - Error information when the method fails.
         * @param cb.result - Query result.
         */
        private executeQuery(query: string, cb: (err: Error, result?: any[]) =>  void): void{
            this.pool.connect((err, client, done) => {
                if (err) {
                    cb (err);
                    return;
                }
                client.query(query, [], (error, result) => {
                    // call 'done()' to release client back to pool
                    done();
                    cb(error, (result) ? result.rows : null);
                });
            });
        }
    
        /**
         * Asynchronously insert one register into a given Source.
         * @param source - Insertion "location".
         * @param cb - Callback function which contains the query result.
         * @param cb.error - Error information when the method fails.
         * @param cb.result - Query result.
         */
        public insertIntoSource(source: Source, data: any[], cb: (err: Error, result?: any[]) =>  void): void {
            const query = this.getQueryFromSource(source, data);
            this.executeQuery(query, cb);
        }
    
        /**
         * Materialize a given view.
         * @param view - View to  be materialized.
         */
        public materializeView(view: View): boolean {
            return false;
        }
    
        /**
         * Cast BlenDB data types to be used in PostgreSQL queries.
         * @param quotedValue - SQL query attribute wrapped by quotes.
         * @param dt - Attribute data type.
         */
        protected typeCast(quotedValue: string, dt: DataType): string {
            switch (dt) {
                case DataType.DATE:
                    return quotedValue + "::DATE";
                case DataType.INTEGER:
                    return quotedValue + "::INTEGER";
                case DataType.BOOLEAN:
                    return quotedValue + "::BOOLEAN";
                default:
                    return quotedValue;
            }
        }
    
        /**
         * Translate filter operator to be used in PostgreSQL queries.
         * @param lSide - Operation left side operator.
         * @param rSide - Operation right side operator.
         * @param op - Operation to be performed.
         */
        protected applyOperator(lSide: string, rSide: string, op: FilterOperator): string {
            switch (op) {
                case FilterOperator.EQUAL:
                    return lSide + " = " + rSide;
                case FilterOperator.NOTEQUAL:
                    return lSide + " != " + rSide;
                case FilterOperator.GREATER:
                    return lSide + " > " + rSide;
                case FilterOperator.LOWER:
                    return lSide + " < " + rSide;
                case FilterOperator.GREATEREQ:
                    return lSide + " >= " + rSide;
                case FilterOperator.LOWEREQ:
                    return lSide + " <= " + rSide;
                default:
                    return  "";
            }
        }
    
        /**
         * Asynchronously sends a message to the database to begin a
         * transaction.
         * @param cb - Callback function with operation status.
         * @param cb.error - Error information when the method fails.
         */
        public beginTransaction(cb: (err: Error) =>  void): void {
            this.executeQuery("BEGIN;", cb);
        }
    
        /**
         * Asynchronously sends a message to the database to commit a
         * transaction.
         * @param cb - Callback function with operation status.
         * @param cb.error - Error information when the method fails.
         */
        public commitTransaction(cb: (err: Error) =>  void): void {
            this.executeQuery("COMMIT;", cb);
        }
    
        /**
         * Asynchronously sends a message to the database to rollback a
         * transaction.
         * @param cb - Callback function with operation status.
         * @param cb.error - Error information when the method fails.
         */
        public rollbackTransaction(cb: (err: Error) =>  void): void {
            this.executeQuery("ROLLBACK;", cb);
        }
    
        /**
         * Asynchronously makes Source data available for reading, using its
         * Transformers to update the respective Views.
         * WARNING: This function MUST be used inside a transaction
         * because possibly inserts and deletes data from multiple sources
         * and a fail in the middle of the process WILL cause inconsistence.
         * The begin/end of the transaction is not put inside this function
         * to give flexibility to the user.
         * @param source - "Location" with not "readble" data.
         * @param cb - Callback function with operation status.
         * @param cb.error - Error information when the method fails.
         */
        public useTransformers(source: Source, cb: (err: Error) =>  void): void {
            const queryCopy = source.transformers.map((i) => {
                return this.translateTransformer(i, source);
            });
    
            queryCopy.push(this.persistSource(source));
            async.each(queryCopy, (query, callback) => {
                this.executeQuery(query, callback);
            }, (err: Error) => {
                if (err) {
                    cb(err);
                    return;
                }
                const query = this.truncSourceSa(source);
                this.executeQuery(query, cb);
                return;
            });
        }
    
        /**
         * Asynchronously sends a message to the database to create
         * database elements (tables) required to maniputate a view.
         * @param view - Element to be initialized on database
         * @param cb - Callback function with operation status.
         * @param cb.error - Error information when the method fails.
         */
        public abstract initView(view: View, cb: (err: Error) =>  void): void {
            async.waterfall([
                (cback) => {
                    const query = "SELECT COUNT(1) FROM " + view.name + " LIMIT 1;";
                    const strRegex = "relation \"" + source.name + "\" does not exist";
                    const regex = new RegExp(strRegex);
                    this.executeQuery(query, (errQuery) => {
                        if (errQuery)  {
                            if (regex.test(errQuery.message.toLowerCase())) {
                                cback(null, false);
                            }
    
                            else {
                                cback(errQuery);
                            }
    
                            return;
                        }
    
                        cb(null, true);
                        return;
                    });
                }
                , (warn, cback) => {
                    if (warn) {
                        cback(null, true);
                        return;
                    }
                    const query = "CREATE TABLE " + view.name + ";";
                    this.executeQuery(query, (errQuery) => cback(errQuery, false));
                    return;
                }
            ], (errQuery, warn1) =>  {
                if (warn1) {
                    const msg = "[WARNING] View \"" + view.name + "\" relation already exists";
                    cb(new Error(msg));
                }
    
                else {
                    cb(errQuery);
                }
    
                return;
            });
        }
        /**
         * Asynchronously sends a message to the database to create
         * database elements (tables) required to maniputate a source.
         * @param source - Element to be initialized on database
         * @param cb - Callback function with operation status.
         * @param cb.error - Error information when the method fails.
         */
        public abstract initSource(source: Source, cb: (err: Error) =>  void): void {
            async.waterfall([
                (cback) => {
                    const query = "SELECT COUNT(1) FROM " + source.name + " LIMIT 1;";
                    const strRegex = "relation \"" + source.name + "\" does not exist";
                    const regex = new RegExp(strRegex);
                    this.executeQuery(query, (errQuery) => {
                        if (errQuery)  {
                            if (regex.test(errQuery.message.toLowerCase())) {
                                cback(null, false);
                            }
    
                            else {
                                cback(errQuery);
                            }
    
                            return;
                        }
    
                        cb(null, true);
                        return;
                    });
                }
                , (warn, cback) => {
                    if (warn) {
                        cback(null, true);
                        return;
                    }
                    const query = "CREATE TABLE " + source.name + ";";
                    this.executeQuery(query, (errQuery) => cback(errQuery, false));
                    return;
                }
                , (cback, warn1) => {
                    const query = "SELECT COUNT(1) FROM " + source.saName + " LIMIT 1;";
                    const strRegex = "relation \"" + source.saName + "\" does not exist";
                    const regex = new RegExp(strRegex);
                    this.executeQuery(query, (errQuery) => {
                        if (errQuery)  {
                            if (regex.test(errQuery.message.toLowerCase())) {
                                cback(null, warn1, false);
                            }
    
                            else {
                                cback(errQuery);
                            }
    
                            return;
                        }
    
                        cb(null, warn1, true);
                        return;
                    });
                }
                , (warn1, warn2, cback) => {
                    if (warn2) {
                        cback(null, warn1, true);
                        return;
                    }
                    const query = "CREATE TABLE " + source.saName + ";";
                    this.executeQuery(query, (errQuery) => {
                        return cback(errQuery, warn1, false);
                    });
                    return;
                }
            ], (errQuery, warn1, war2) =>  {
                let msg = "";
                if (warn1) {
                    msg += "[WARNING] Source \"" + source.name + "\" relation already exists";
                }
                if (warn2) {
                    msg += "[WARNING] Source \"" + source.name + "\" staging area relation already exists";
                }
    
                if (msg === "") {
                    cb(errQuery);
                }
    
                else {
                    cb(new Error(msg));
                }
            });
        }
        /**
         * Asynchronously sends a message to the database to delete
         * all tuples stored in the given view.
         * @param view - Element to be initialized on database
         * @param cb - Callback function with operation status.
         * @param cb.error - Error information when the method fails.
         */
        public abstract truncateView(view: View, cb: (err: Error) =>  void): void {
            async.waterfall([
                (cback) => {
                    const query = "SELECT COUNT(1) FROM " + view.name + " LIMIT 1;";
                    const strRegex = "relation \"" + source.name + "\" does not exist";
                    const regex = new RegExp(strRegex);
                    this.executeQuery(query, (errQuery) => {
                        if (errQuery)  {
                            if (regex.test(errQuery.message.toLowerCase())) {
                                cback(null, true);
                            }
    
                            else {
                                cback(errQuery);
                            }
    
                            return;
                        }
    
                        cb(null, false);
                        return;
                    });
                }
                , (warn, cback) => {
                    if (warn) {
                        cback(null, true);
                        return;
                    }
                    const query = "TRUNCATE TABLE " + view.name + ";";
                    this.executeQuery(query, (errQuery) => cback(errQuery, false));
                    return;
                }
            ], (errQuery, warn1) =>  {
                if (warn1) {
                    const msg = "[WARNING] View \"" + view.name + "\" relation does not exists";
                    cb(new Error(msg));
                }
    
                else {
                    cb(errQuery);
                }
    
                return;
            });
        }
        /**
         * Asynchronously sends a message to the database to delete
         * all tuples stored in the given source (and staging area).
         * @param source - Element to be initialized on database
         * @param cb - Callback function with operation status.
         * @param cb.error - Error information when the method fails.
         */
        public abstract truncateSource(source: Source, cb: (err: Error) =>  void): void {
            async.waterfall([
                (cback) => {
                    const query = "SELECT COUNT(1) FROM " + source.name + " LIMIT 1;";
                    const strRegex = "relation \"" + source.name + "\" does not exist";
                    const regex = new RegExp(strRegex);
                    this.executeQuery(query, (errQuery) => {
                        if (errQuery)  {
                            if (regex.test(errQuery.message.toLowerCase())) {
                                cback(null, true);
                            }
    
                            else {
                                cback(errQuery);
                            }
    
                            return;
                        }
    
                        cb(null, false);
                        return;
                    });
                }
                , (warn, cback) => {
                    if (warn) {
                        cback(null, true);
                        return;
                    }
                    const query = "TRUNCATE TABLE " + source.name + ";";
                    this.executeQuery(query, (errQuery) => cback(errQuery, false));
                    return;
                }
                , (cback, warn1) => {
                    const query = "SELECT COUNT(1) FROM " + source.saName + " LIMIT 1;";
                    const strRegex = "relation \"" + source.saName + "\" does not exist";
                    const regex = new RegExp(strRegex);
                    this.executeQuery(query, (errQuery) => {
                        if (errQuery)  {
                            if (regex.test(errQuery.message.toLowerCase())) {
                                cback(null, warn1, true);
                            }
    
                            else {
                                cback(errQuery);
                            }
    
                            return;
                        }
    
                        cb(null, warn1, false);
                        return;
                    });
                }
                , (warn1, warn2, cback) => {
                    if (warn2) {
                        cback(null, warn1, true);
                        return;
                    }
                    const query = "TRUNCATE TABLE " + source.saName + ";";
                    this.executeQuery(query, (errQuery) => {
                        return cback(errQuery, warn1, false);
                    });
                    return;
                }
            ], (errQuery, warn1, war2) =>  {
                let msg = "";
                if (warn1) {
                    msg += "[WARNING] Source \"" + source.name + "\" relation already exists";
                }
                if (warn2) {
                    msg += "[WARNING] Source \"" + source.name + "\" staging area relation already exists";
                }
    
                if (msg === "") {
                    cb(errQuery);
                }
    
                else {
                    cb(new Error(msg));
                }
            });
        }
    }