/* * Copyright (C) 2018 Centro de Computacao Cientifica e Software Livre * Departamento de Informatica - Universidade Federal do Parana * * This file is part of blend. * * blend is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * blend is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with blend. If not, see <http://www.gnu.org/licenses/>. */ import { SQLAdapter } from "./sql"; import { View } from "../core/view"; import { Source } from "../core/source"; import { FilterOperator } from "../core/filter"; import { Pool, PoolConfig } from "pg"; import { DataType } from "../common/types"; import * as async from "async"; /** Adapter which connects with a PostgreSQL database. */ export class PostgresAdapter extends SQLAdapter { /** Information used to connect with a PostgreSQL database. */ private pool: Pool; /** * Creates a new adapter with the database connection configuration. * @param config - The information required to create a connection with * the database. */ constructor (config: PoolConfig) { super(); this.pool = new Pool(config); } /** * Asynchronously reads all data from given view. * In other words perform a SELECT query. * @param view - "Location" from all data should be read. * @param cb - Callback function which contains the data read. * @param cb.error - Error information when the method fails. * @param cb.result - Data got from view. */ public getDataFromView(view: View, cb: (error: Error, result?: any[]) => void): void { const query = this.getQueryFromView(view); this.executeQuery(query, cb); } /** * Asynchronously executes a query and get its result. * @param query - Query (SQL format) to be executed. * @param cb - Callback function which contains the data read. * @param cb.error - Error information when the method fails. * @param cb.result - Query result. */ private executeQuery(query: string, cb: (err: Error, result?: any[]) => void): void{ this.pool.connect((err, client, done) => { if (err) { cb (err); return; } client.query(query, [], (error, result) => { // call 'done()' to release client back to pool done(); cb(error, (result) ? result.rows : null); }); }); } /** * Asynchronously insert one register into a given Source. * @param source - Insertion "location". * @param cb - Callback function which contains the query result. * @param cb.error - Error information when the method fails. * @param cb.result - Query result. */ public insertIntoSource(source: Source, data: any[], cb: (err: Error, result?: any[]) => void): void { const query = this.getQueryFromSource(source, data); this.executeQuery(query, cb); } /** * Materialize a given view. * @param view - View to be materialized. */ public materializeView(view: View): boolean { return false; } /** * Cast BlenDB data types to be used in PostgreSQL queries. * @param quotedValue - SQL query attribute wrapped by quotes. * @param dt - Attribute data type. */ protected typeCast(quotedValue: string, dt: DataType): string { switch (dt) { case DataType.DATE: return quotedValue + "::DATE"; case DataType.INTEGER: return quotedValue + "::INTEGER"; case DataType.BOOLEAN: return quotedValue + "::BOOLEAN"; default: return quotedValue; } } /** * Translate filter operator to be used in PostgreSQL queries. * @param lSide - Operation left side operator. * @param rSide - Operation right side operator. * @param op - Operation to be performed. */ protected applyOperator(lSide: string, rSide: string, op: FilterOperator): string { switch (op) { case FilterOperator.EQUAL: return lSide + " = " + rSide; case FilterOperator.NOTEQUAL: return lSide + " != " + rSide; case FilterOperator.GREATER: return lSide + " > " + rSide; case FilterOperator.LOWER: return lSide + " < " + rSide; case FilterOperator.GREATEREQ: return lSide + " >= " + rSide; case FilterOperator.LOWEREQ: return lSide + " <= " + rSide; default: return ""; } } /** * Asynchronously sends a message to the database to begin a * transaction. * @param cb - Callback function with operation status. * @param cb.error - Error information when the method fails. */ public beginTransaction(cb: (err: Error) => void): void { this.executeQuery("BEGIN;", cb); } /** * Asynchronously sends a message to the database to commit a * transaction. * @param cb - Callback function with operation status. * @param cb.error - Error information when the method fails. */ public commitTransaction(cb: (err: Error) => void): void { this.executeQuery("COMMIT;", cb); } /** * Asynchronously sends a message to the database to rollback a * transaction. * @param cb - Callback function with operation status. * @param cb.error - Error information when the method fails. */ public rollbackTransaction(cb: (err: Error) => void): void { this.executeQuery("ROLLBACK;", cb); } /** * Asynchronously makes Source data available for reading, using its * Transformers to update the respective Views. * WARNING: This function MUST be used inside a transaction * because possibly inserts and deletes data from multiple sources * and a fail in the middle of the process WILL cause inconsistence. * The begin/end of the transaction is not put inside this function * to give flexibility to the user. * @param source - "Location" with not "readble" data. * @param cb - Callback function with operation status. * @param cb.error - Error information when the method fails. */ public useTransformers(source: Source, cb: (err: Error) => void): void { const queryCopy = source.transformers.map((i) => { return this.translateTransformer(i, source); }); queryCopy.push(this.persistSource(source)); async.each(queryCopy, (query, callback) => { this.executeQuery(query, callback); }, (err: Error) => { if (err) { cb(err); return; } const query = this.truncSourceSa(source); this.executeQuery(query, cb); return; }); } /** * Asynchronously sends a message to the database to create * database elements (tables) required to maniputate a view. * @param view - Element to be initialized on database * @param cb - Callback function with operation status. * @param cb.error - Error information when the method fails. */ public abstract initView(view: View, cb: (err: Error) => void): void { async.waterfall([ (cback) => { const query = "SELECT COUNT(1) FROM " + view.name + " LIMIT 1;"; const strRegex = "relation \"" + source.name + "\" does not exist"; const regex = new RegExp(strRegex); this.executeQuery(query, (errQuery) => { if (errQuery) { if (regex.test(errQuery.message.toLowerCase())) { cback(null, false); } else { cback(errQuery); } return; } cb(null, true); return; }); } , (warn, cback) => { if (warn) { cback(null, true); return; } const query = "CREATE TABLE " + view.name + ";"; this.executeQuery(query, (errQuery) => cback(errQuery, false)); return; } ], (errQuery, warn1) => { if (warn1) { const msg = "[WARNING] View \"" + view.name + "\" relation already exists"; cb(new Error(msg)); } else { cb(errQuery); } return; }); } /** * Asynchronously sends a message to the database to create * database elements (tables) required to maniputate a source. * @param source - Element to be initialized on database * @param cb - Callback function with operation status. * @param cb.error - Error information when the method fails. */ public abstract initSource(source: Source, cb: (err: Error) => void): void { async.waterfall([ (cback) => { const query = "SELECT COUNT(1) FROM " + source.name + " LIMIT 1;"; const strRegex = "relation \"" + source.name + "\" does not exist"; const regex = new RegExp(strRegex); this.executeQuery(query, (errQuery) => { if (errQuery) { if (regex.test(errQuery.message.toLowerCase())) { cback(null, false); } else { cback(errQuery); } return; } cb(null, true); return; }); } , (warn, cback) => { if (warn) { cback(null, true); return; } const query = "CREATE TABLE " + source.name + ";"; this.executeQuery(query, (errQuery) => cback(errQuery, false)); return; } , (cback, warn1) => { const query = "SELECT COUNT(1) FROM " + source.saName + " LIMIT 1;"; const strRegex = "relation \"" + source.saName + "\" does not exist"; const regex = new RegExp(strRegex); this.executeQuery(query, (errQuery) => { if (errQuery) { if (regex.test(errQuery.message.toLowerCase())) { cback(null, warn1, false); } else { cback(errQuery); } return; } cb(null, warn1, true); return; }); } , (warn1, warn2, cback) => { if (warn2) { cback(null, warn1, true); return; } const query = "CREATE TABLE " + source.saName + ";"; this.executeQuery(query, (errQuery) => { return cback(errQuery, warn1, false); }); return; } ], (errQuery, warn1, war2) => { let msg = ""; if (warn1) { msg += "[WARNING] Source \"" + source.name + "\" relation already exists"; } if (warn2) { msg += "[WARNING] Source \"" + source.name + "\" staging area relation already exists"; } if (msg === "") { cb(errQuery); } else { cb(new Error(msg)); } }); } /** * Asynchronously sends a message to the database to delete * all tuples stored in the given view. * @param view - Element to be initialized on database * @param cb - Callback function with operation status. * @param cb.error - Error information when the method fails. */ public abstract truncateView(view: View, cb: (err: Error) => void): void { async.waterfall([ (cback) => { const query = "SELECT COUNT(1) FROM " + view.name + " LIMIT 1;"; const strRegex = "relation \"" + source.name + "\" does not exist"; const regex = new RegExp(strRegex); this.executeQuery(query, (errQuery) => { if (errQuery) { if (regex.test(errQuery.message.toLowerCase())) { cback(null, true); } else { cback(errQuery); } return; } cb(null, false); return; }); } , (warn, cback) => { if (warn) { cback(null, true); return; } const query = "TRUNCATE TABLE " + view.name + ";"; this.executeQuery(query, (errQuery) => cback(errQuery, false)); return; } ], (errQuery, warn1) => { if (warn1) { const msg = "[WARNING] View \"" + view.name + "\" relation does not exists"; cb(new Error(msg)); } else { cb(errQuery); } return; }); } /** * Asynchronously sends a message to the database to delete * all tuples stored in the given source (and staging area). * @param source - Element to be initialized on database * @param cb - Callback function with operation status. * @param cb.error - Error information when the method fails. */ public abstract truncateSource(source: Source, cb: (err: Error) => void): void { async.waterfall([ (cback) => { const query = "SELECT COUNT(1) FROM " + source.name + " LIMIT 1;"; const strRegex = "relation \"" + source.name + "\" does not exist"; const regex = new RegExp(strRegex); this.executeQuery(query, (errQuery) => { if (errQuery) { if (regex.test(errQuery.message.toLowerCase())) { cback(null, true); } else { cback(errQuery); } return; } cb(null, false); return; }); } , (warn, cback) => { if (warn) { cback(null, true); return; } const query = "TRUNCATE TABLE " + source.name + ";"; this.executeQuery(query, (errQuery) => cback(errQuery, false)); return; } , (cback, warn1) => { const query = "SELECT COUNT(1) FROM " + source.saName + " LIMIT 1;"; const strRegex = "relation \"" + source.saName + "\" does not exist"; const regex = new RegExp(strRegex); this.executeQuery(query, (errQuery) => { if (errQuery) { if (regex.test(errQuery.message.toLowerCase())) { cback(null, warn1, true); } else { cback(errQuery); } return; } cb(null, warn1, false); return; }); } , (warn1, warn2, cback) => { if (warn2) { cback(null, warn1, true); return; } const query = "TRUNCATE TABLE " + source.saName + ";"; this.executeQuery(query, (errQuery) => { return cback(errQuery, warn1, false); }); return; } ], (errQuery, warn1, war2) => { let msg = ""; if (warn1) { msg += "[WARNING] Source \"" + source.name + "\" relation already exists"; } if (warn2) { msg += "[WARNING] Source \"" + source.name + "\" staging area relation already exists"; } if (msg === "") { cb(errQuery); } else { cb(new Error(msg)); } }); } }