From 193b01111f6d12d9f30ef08761ad17aeebb9fd63 Mon Sep 17 00:00:00 2001 From: Lucas Fernandes de Oliveira <lfoliveira@inf.ufpr.br> Date: Thu, 13 Dec 2018 11:44:14 -0200 Subject: [PATCH] [WIP] [ci-skip] Remake postgres adapter Signed-off-by: Lucas Fernandes de Oliveira <lfoliveira@inf.ufpr.br> --- src/adapter/postgres.ts | 287 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 285 insertions(+), 2 deletions(-) diff --git a/src/adapter/postgres.ts b/src/adapter/postgres.ts index 8d34ac9d..f236f7a6 100644 --- a/src/adapter/postgres.ts +++ b/src/adapter/postgres.ts @@ -24,7 +24,7 @@ import { Source } from "../core/source"; import { FilterOperator } from "../core/filter"; import { Pool, PoolConfig } from "pg"; import { DataType } from "../common/types"; -import { each } from "async"; +import * as async from "async"; /** Adapter which connects with a PostgreSQL database. */ export class PostgresAdapter extends SQLAdapter { @@ -186,7 +186,7 @@ export class PostgresAdapter extends SQLAdapter { }); queryCopy.push(this.persistSource(source)); - each(queryCopy, (query, callback) => { + async.each(queryCopy, (query, callback) => { this.executeQuery(query, callback); }, (err: Error) => { if (err) { @@ -198,4 +198,287 @@ export class PostgresAdapter extends SQLAdapter { return; }); } + + /** + * Asynchronously sends a message to the database to create + * database elements (tables) required to maniputate a view. + * @param view - Element to be initialized on database + * @param cb - Callback function with operation status. + * @param cb.error - Error information when the method fails. + */ + public abstract initView(view: View, cb: (err: Error) => void): void { + async.waterfall([ + (cback) => { + const query = "SELECT COUNT(1) FROM " + view.name + " LIMIT 1;"; + const strRegex = "relation \"" + source.name + "\" does not exist"; + const regex = new RegExp(strRegex); + this.executeQuery(query, (errQuery) => { + if (errQuery) { + if (regex.test(errQuery.message.toLowerCase())) { + cback(null, false); + } + + else { + cback(errQuery); + } + + return; + } + + cb(null, true); + return; + }); + } + , (warn, cback) => { + if (warn) { + cback(null, true); + return; + } + const query = "CREATE TABLE " + view.name + ";"; + this.executeQuery(query, (errQuery) => cback(errQuery, false)); + return; + } + ], (errQuery, warn1) => { + if (warn1) { + const msg = "[WARNING] View \"" + view.name + "\" relation already exists"; + cb(new Error(msg)); + } + + else { + cb(errQuery); + } + + return; + }); + } + /** + * Asynchronously sends a message to the database to create + * database elements (tables) required to maniputate a source. + * @param source - Element to be initialized on database + * @param cb - Callback function with operation status. + * @param cb.error - Error information when the method fails. + */ + public abstract initSource(source: Source, cb: (err: Error) => void): void { + async.waterfall([ + (cback) => { + const query = "SELECT COUNT(1) FROM " + source.name + " LIMIT 1;"; + const strRegex = "relation \"" + source.name + "\" does not exist"; + const regex = new RegExp(strRegex); + this.executeQuery(query, (errQuery) => { + if (errQuery) { + if (regex.test(errQuery.message.toLowerCase())) { + cback(null, false); + } + + else { + cback(errQuery); + } + + return; + } + + cb(null, true); + return; + }); + } + , (warn, cback) => { + if (warn) { + cback(null, true); + return; + } + const query = "CREATE TABLE " + source.name + ";"; + this.executeQuery(query, (errQuery) => cback(errQuery, false)); + return; + } + , (cback, warn1) => { + const query = "SELECT COUNT(1) FROM " + source.saName + " LIMIT 1;"; + const strRegex = "relation \"" + source.saName + "\" does not exist"; + const regex = new RegExp(strRegex); + this.executeQuery(query, (errQuery) => { + if (errQuery) { + if (regex.test(errQuery.message.toLowerCase())) { + cback(null, warn1, false); + } + + else { + cback(errQuery); + } + + return; + } + + cb(null, warn1, true); + return; + }); + } + , (warn1, warn2, cback) => { + if (warn2) { + cback(null, warn1, true); + return; + } + const query = "CREATE TABLE " + source.saName + ";"; + this.executeQuery(query, (errQuery) => { + return cback(errQuery, warn1, false); + }); + return; + } + ], (errQuery, warn1, war2) => { + let msg = ""; + if (warn1) { + msg += "[WARNING] Source \"" + source.name + "\" relation already exists"; + } + if (warn2) { + msg += "[WARNING] Source \"" + source.name + "\" staging area relation already exists"; + } + + if (msg === "") { + cb(errQuery); + } + + else { + cb(new Error(msg)); + } + }); + } + /** + * Asynchronously sends a message to the database to delete + * all tuples stored in the given view. + * @param view - Element to be initialized on database + * @param cb - Callback function with operation status. + * @param cb.error - Error information when the method fails. + */ + public abstract truncateView(view: View, cb: (err: Error) => void): void { + async.waterfall([ + (cback) => { + const query = "SELECT COUNT(1) FROM " + view.name + " LIMIT 1;"; + const strRegex = "relation \"" + source.name + "\" does not exist"; + const regex = new RegExp(strRegex); + this.executeQuery(query, (errQuery) => { + if (errQuery) { + if (regex.test(errQuery.message.toLowerCase())) { + cback(null, true); + } + + else { + cback(errQuery); + } + + return; + } + + cb(null, false); + return; + }); + } + , (warn, cback) => { + if (warn) { + cback(null, true); + return; + } + const query = "TRUNCATE TABLE " + view.name + ";"; + this.executeQuery(query, (errQuery) => cback(errQuery, false)); + return; + } + ], (errQuery, warn1) => { + if (warn1) { + const msg = "[WARNING] View \"" + view.name + "\" relation does not exists"; + cb(new Error(msg)); + } + + else { + cb(errQuery); + } + + return; + }); + } + /** + * Asynchronously sends a message to the database to delete + * all tuples stored in the given source (and staging area). + * @param source - Element to be initialized on database + * @param cb - Callback function with operation status. + * @param cb.error - Error information when the method fails. + */ + public abstract truncateSource(source: Source, cb: (err: Error) => void): void { + async.waterfall([ + (cback) => { + const query = "SELECT COUNT(1) FROM " + source.name + " LIMIT 1;"; + const strRegex = "relation \"" + source.name + "\" does not exist"; + const regex = new RegExp(strRegex); + this.executeQuery(query, (errQuery) => { + if (errQuery) { + if (regex.test(errQuery.message.toLowerCase())) { + cback(null, true); + } + + else { + cback(errQuery); + } + + return; + } + + cb(null, false); + return; + }); + } + , (warn, cback) => { + if (warn) { + cback(null, true); + return; + } + const query = "TRUNCATE TABLE " + source.name + ";"; + this.executeQuery(query, (errQuery) => cback(errQuery, false)); + return; + } + , (cback, warn1) => { + const query = "SELECT COUNT(1) FROM " + source.saName + " LIMIT 1;"; + const strRegex = "relation \"" + source.saName + "\" does not exist"; + const regex = new RegExp(strRegex); + this.executeQuery(query, (errQuery) => { + if (errQuery) { + if (regex.test(errQuery.message.toLowerCase())) { + cback(null, warn1, true); + } + + else { + cback(errQuery); + } + + return; + } + + cb(null, warn1, false); + return; + }); + } + , (warn1, warn2, cback) => { + if (warn2) { + cback(null, warn1, true); + return; + } + const query = "TRUNCATE TABLE " + source.saName + ";"; + this.executeQuery(query, (errQuery) => { + return cback(errQuery, warn1, false); + }); + return; + } + ], (errQuery, warn1, war2) => { + let msg = ""; + if (warn1) { + msg += "[WARNING] Source \"" + source.name + "\" relation already exists"; + } + if (warn2) { + msg += "[WARNING] Source \"" + source.name + "\" staging area relation already exists"; + } + + if (msg === "") { + cb(errQuery); + } + + else { + cb(new Error(msg)); + } + }); + } } -- GitLab