Select Git revision
postgres.ts
Forked from
C3SL / blendb
24 commits behind, 12 commits ahead of the upstream repository.
-
Lucas Fernandes de Oliveira authored
Signed-off-by:
Lucas Fernandes de Oliveira <lfoliveira@inf.ufpr.br>
Lucas Fernandes de Oliveira authoredSigned-off-by:
Lucas Fernandes de Oliveira <lfoliveira@inf.ufpr.br>
postgres.ts 16.98 KiB
/*
* Copyright (C) 2018 Centro de Computacao Cientifica e Software Livre
* Departamento de Informatica - Universidade Federal do Parana
*
* This file is part of blend.
*
* blend is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* blend is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with blend. If not, see <http://www.gnu.org/licenses/>.
*/
import { SQLAdapter } from "./sql";
import { View } from "../core/view";
import { Source } from "../core/source";
import { FilterOperator } from "../core/filter";
import { Pool, PoolConfig } from "pg";
import { DataType } from "../common/types";
import * as async from "async";
/** Adapter which connects with a PostgreSQL database. */
export class PostgresAdapter extends SQLAdapter {
/** Information used to connect with a PostgreSQL database. */
private pool: Pool;
/**
* Creates a new adapter with the database connection configuration.
* @param config - The information required to create a connection with
* the database.
*/
constructor (config: PoolConfig) {
super();
this.pool = new Pool(config);
}
/**
* Asynchronously reads all data from given view.
* In other words perform a SELECT query.
* @param view - "Location" from all data should be read.
* @param cb - Callback function which contains the data read.
* @param cb.error - Error information when the method fails.
* @param cb.result - Data got from view.
*/
public getDataFromView(view: View, cb: (error: Error, result?: any[]) => void): void {
const query = this.getQueryFromView(view);
this.executeQuery(query, cb);
}
/**
* Asynchronously executes a query and get its result.
* @param query - Query (SQL format) to be executed.
* @param cb - Callback function which contains the data read.
* @param cb.error - Error information when the method fails.
* @param cb.result - Query result.
*/
private executeQuery(query: string, cb: (err: Error, result?: any[]) => void): void{
this.pool.connect((err, client, done) => {
if (err) {
cb (err);
return;
}
client.query(query, [], (error, result) => {
// call 'done()' to release client back to pool
done();
cb(error, (result) ? result.rows : null);
});
});
}
/**
* Asynchronously insert one register into a given Source.
* @param source - Insertion "location".
* @param cb - Callback function which contains the query result.
* @param cb.error - Error information when the method fails.
* @param cb.result - Query result.
*/
public insertIntoSource(source: Source, data: any[], cb: (err: Error, result?: any[]) => void): void {
const query = this.getQueryFromSource(source, data);
this.executeQuery(query, cb);
}
/**
* Materialize a given view.
* @param view - View to be materialized.
*/
public materializeView(view: View): boolean {
return false;
}
/**
* Cast BlenDB data types to be used in PostgreSQL queries.
* @param quotedValue - SQL query attribute wrapped by quotes.
* @param dt - Attribute data type.
*/
protected typeCast(quotedValue: string, dt: DataType): string {
switch (dt) {
case DataType.DATE:
return quotedValue + "::DATE";
case DataType.INTEGER:
return quotedValue + "::INTEGER";
case DataType.BOOLEAN:
return quotedValue + "::BOOLEAN";
default:
return quotedValue;
}
}
/**
* Translate filter operator to be used in PostgreSQL queries.
* @param lSide - Operation left side operator.
* @param rSide - Operation right side operator.
* @param op - Operation to be performed.
*/
protected applyOperator(lSide: string, rSide: string, op: FilterOperator): string {
switch (op) {
case FilterOperator.EQUAL:
return lSide + " = " + rSide;
case FilterOperator.NOTEQUAL:
return lSide + " != " + rSide;
case FilterOperator.GREATER:
return lSide + " > " + rSide;
case FilterOperator.LOWER:
return lSide + " < " + rSide;
case FilterOperator.GREATEREQ:
return lSide + " >= " + rSide;
case FilterOperator.LOWEREQ:
return lSide + " <= " + rSide;
default:
return "";
}
}
/**
* Asynchronously sends a message to the database to begin a
* transaction.
* @param cb - Callback function with operation status.
* @param cb.error - Error information when the method fails.
*/
public beginTransaction(cb: (err: Error) => void): void {
this.executeQuery("BEGIN;", cb);
}
/**
* Asynchronously sends a message to the database to commit a
* transaction.
* @param cb - Callback function with operation status.
* @param cb.error - Error information when the method fails.
*/
public commitTransaction(cb: (err: Error) => void): void {
this.executeQuery("COMMIT;", cb);
}
/**
* Asynchronously sends a message to the database to rollback a
* transaction.
* @param cb - Callback function with operation status.
* @param cb.error - Error information when the method fails.
*/
public rollbackTransaction(cb: (err: Error) => void): void {
this.executeQuery("ROLLBACK;", cb);
}
/**
* Asynchronously makes Source data available for reading, using its
* Transformers to update the respective Views.
* WARNING: This function MUST be used inside a transaction
* because possibly inserts and deletes data from multiple sources
* and a fail in the middle of the process WILL cause inconsistence.
* The begin/end of the transaction is not put inside this function
* to give flexibility to the user.
* @param source - "Location" with not "readble" data.
* @param cb - Callback function with operation status.
* @param cb.error - Error information when the method fails.
*/
public useTransformers(source: Source, cb: (err: Error) => void): void {
const queryCopy = source.transformers.map((i) => {
return this.translateTransformer(i, source);
});
queryCopy.push(this.persistSource(source));
async.each(queryCopy, (query, callback) => {
this.executeQuery(query, callback);
}, (err: Error) => {
if (err) {
cb(err);
return;
}
const query = this.truncSourceSa(source);
this.executeQuery(query, cb);
return;
});
}
/**
* Asynchronously sends a message to the database to create
* database elements (tables) required to maniputate a view.
* @param view - Element to be initialized on database
* @param cb - Callback function with operation status.
* @param cb.error - Error information when the method fails.
*/
public abstract initView(view: View, cb: (err: Error) => void): void {
async.waterfall([
(cback) => {
const query = "SELECT COUNT(1) FROM " + view.name + " LIMIT 1;";
const strRegex = "relation \"" + source.name + "\" does not exist";
const regex = new RegExp(strRegex);
this.executeQuery(query, (errQuery) => {
if (errQuery) {
if (regex.test(errQuery.message.toLowerCase())) {
cback(null, false);
}
else {
cback(errQuery);
}
return;
}
cb(null, true);
return;
});
}
, (warn, cback) => {
if (warn) {
cback(null, true);
return;
}
const query = "CREATE TABLE " + view.name + ";";
this.executeQuery(query, (errQuery) => cback(errQuery, false));
return;
}
], (errQuery, warn1) => {
if (warn1) {
const msg = "[WARNING] View \"" + view.name + "\" relation already exists";
cb(new Error(msg));
}
else {
cb(errQuery);
}
return;
});
}
/**
* Asynchronously sends a message to the database to create
* database elements (tables) required to maniputate a source.
* @param source - Element to be initialized on database
* @param cb - Callback function with operation status.
* @param cb.error - Error information when the method fails.
*/
public abstract initSource(source: Source, cb: (err: Error) => void): void {
async.waterfall([
(cback) => {
const query = "SELECT COUNT(1) FROM " + source.name + " LIMIT 1;";
const strRegex = "relation \"" + source.name + "\" does not exist";
const regex = new RegExp(strRegex);
this.executeQuery(query, (errQuery) => {
if (errQuery) {
if (regex.test(errQuery.message.toLowerCase())) {
cback(null, false);
}
else {
cback(errQuery);
}
return;
}
cb(null, true);
return;
});
}
, (warn, cback) => {
if (warn) {
cback(null, true);
return;
}
const query = "CREATE TABLE " + source.name + ";";
this.executeQuery(query, (errQuery) => cback(errQuery, false));
return;
}
, (cback, warn1) => {
const query = "SELECT COUNT(1) FROM " + source.saName + " LIMIT 1;";
const strRegex = "relation \"" + source.saName + "\" does not exist";
const regex = new RegExp(strRegex);
this.executeQuery(query, (errQuery) => {
if (errQuery) {
if (regex.test(errQuery.message.toLowerCase())) {
cback(null, warn1, false);
}
else {
cback(errQuery);
}
return;
}
cb(null, warn1, true);
return;
});
}
, (warn1, warn2, cback) => {
if (warn2) {
cback(null, warn1, true);
return;
}
const query = "CREATE TABLE " + source.saName + ";";
this.executeQuery(query, (errQuery) => {
return cback(errQuery, warn1, false);
});
return;
}
], (errQuery, warn1, war2) => {
let msg = "";
if (warn1) {
msg += "[WARNING] Source \"" + source.name + "\" relation already exists";
}
if (warn2) {
msg += "[WARNING] Source \"" + source.name + "\" staging area relation already exists";
}
if (msg === "") {
cb(errQuery);
}
else {
cb(new Error(msg));
}
});
}
/**
* Asynchronously sends a message to the database to delete
* all tuples stored in the given view.
* @param view - Element to be initialized on database
* @param cb - Callback function with operation status.
* @param cb.error - Error information when the method fails.
*/
public abstract truncateView(view: View, cb: (err: Error) => void): void {
async.waterfall([
(cback) => {
const query = "SELECT COUNT(1) FROM " + view.name + " LIMIT 1;";
const strRegex = "relation \"" + source.name + "\" does not exist";
const regex = new RegExp(strRegex);
this.executeQuery(query, (errQuery) => {
if (errQuery) {
if (regex.test(errQuery.message.toLowerCase())) {
cback(null, true);
}
else {
cback(errQuery);
}
return;
}
cb(null, false);
return;
});
}
, (warn, cback) => {
if (warn) {
cback(null, true);
return;
}
const query = "TRUNCATE TABLE " + view.name + ";";
this.executeQuery(query, (errQuery) => cback(errQuery, false));
return;
}
], (errQuery, warn1) => {
if (warn1) {
const msg = "[WARNING] View \"" + view.name + "\" relation does not exists";
cb(new Error(msg));
}
else {
cb(errQuery);
}
return;
});
}
/**
* Asynchronously sends a message to the database to delete
* all tuples stored in the given source (and staging area).
* @param source - Element to be initialized on database
* @param cb - Callback function with operation status.
* @param cb.error - Error information when the method fails.
*/
public abstract truncateSource(source: Source, cb: (err: Error) => void): void {
async.waterfall([
(cback) => {
const query = "SELECT COUNT(1) FROM " + source.name + " LIMIT 1;";
const strRegex = "relation \"" + source.name + "\" does not exist";
const regex = new RegExp(strRegex);
this.executeQuery(query, (errQuery) => {
if (errQuery) {
if (regex.test(errQuery.message.toLowerCase())) {
cback(null, true);
}
else {
cback(errQuery);
}
return;
}
cb(null, false);
return;
});
}
, (warn, cback) => {
if (warn) {
cback(null, true);
return;
}
const query = "TRUNCATE TABLE " + source.name + ";";
this.executeQuery(query, (errQuery) => cback(errQuery, false));
return;
}
, (cback, warn1) => {
const query = "SELECT COUNT(1) FROM " + source.saName + " LIMIT 1;";
const strRegex = "relation \"" + source.saName + "\" does not exist";
const regex = new RegExp(strRegex);
this.executeQuery(query, (errQuery) => {
if (errQuery) {
if (regex.test(errQuery.message.toLowerCase())) {
cback(null, warn1, true);
}
else {
cback(errQuery);
}
return;
}
cb(null, warn1, false);
return;
});
}
, (warn1, warn2, cback) => {
if (warn2) {
cback(null, warn1, true);
return;
}
const query = "TRUNCATE TABLE " + source.saName + ";";
this.executeQuery(query, (errQuery) => {
return cback(errQuery, warn1, false);
});
return;
}
], (errQuery, warn1, war2) => {
let msg = "";
if (warn1) {
msg += "[WARNING] Source \"" + source.name + "\" relation already exists";
}
if (warn2) {
msg += "[WARNING] Source \"" + source.name + "\" staging area relation already exists";
}
if (msg === "") {
cb(errQuery);
}
else {
cb(new Error(msg));
}
});
}
}