Skip to content
Snippets Groups Projects
Commit 193b0111 authored by Lucas Fernandes de Oliveira's avatar Lucas Fernandes de Oliveira
Browse files

[WIP] [ci-skip] Remake postgres adapter

parent 2f321571
Branches issue/63
No related tags found
No related merge requests found
Pipeline #18865 skipped
...@@ -24,7 +24,7 @@ import { Source } from "../core/source"; ...@@ -24,7 +24,7 @@ import { Source } from "../core/source";
import { FilterOperator } from "../core/filter"; import { FilterOperator } from "../core/filter";
import { Pool, PoolConfig } from "pg"; import { Pool, PoolConfig } from "pg";
import { DataType } from "../common/types"; import { DataType } from "../common/types";
import { each } from "async"; import * as async from "async";
/** Adapter which connects with a PostgreSQL database. */ /** Adapter which connects with a PostgreSQL database. */
export class PostgresAdapter extends SQLAdapter { export class PostgresAdapter extends SQLAdapter {
...@@ -186,7 +186,7 @@ export class PostgresAdapter extends SQLAdapter { ...@@ -186,7 +186,7 @@ export class PostgresAdapter extends SQLAdapter {
}); });
queryCopy.push(this.persistSource(source)); queryCopy.push(this.persistSource(source));
each(queryCopy, (query, callback) => { async.each(queryCopy, (query, callback) => {
this.executeQuery(query, callback); this.executeQuery(query, callback);
}, (err: Error) => { }, (err: Error) => {
if (err) { if (err) {
...@@ -198,4 +198,287 @@ export class PostgresAdapter extends SQLAdapter { ...@@ -198,4 +198,287 @@ export class PostgresAdapter extends SQLAdapter {
return; return;
}); });
} }
/**
* Asynchronously sends a message to the database to create
* database elements (tables) required to maniputate a view.
* @param view - Element to be initialized on database
* @param cb - Callback function with operation status.
* @param cb.error - Error information when the method fails.
*/
public abstract initView(view: View, cb: (err: Error) => void): void {
async.waterfall([
(cback) => {
const query = "SELECT COUNT(1) FROM " + view.name + " LIMIT 1;";
const strRegex = "relation \"" + source.name + "\" does not exist";
const regex = new RegExp(strRegex);
this.executeQuery(query, (errQuery) => {
if (errQuery) {
if (regex.test(errQuery.message.toLowerCase())) {
cback(null, false);
}
else {
cback(errQuery);
}
return;
}
cb(null, true);
return;
});
}
, (warn, cback) => {
if (warn) {
cback(null, true);
return;
}
const query = "CREATE TABLE " + view.name + ";";
this.executeQuery(query, (errQuery) => cback(errQuery, false));
return;
}
], (errQuery, warn1) => {
if (warn1) {
const msg = "[WARNING] View \"" + view.name + "\" relation already exists";
cb(new Error(msg));
}
else {
cb(errQuery);
}
return;
});
}
/**
* Asynchronously sends a message to the database to create
* database elements (tables) required to maniputate a source.
* @param source - Element to be initialized on database
* @param cb - Callback function with operation status.
* @param cb.error - Error information when the method fails.
*/
public abstract initSource(source: Source, cb: (err: Error) => void): void {
async.waterfall([
(cback) => {
const query = "SELECT COUNT(1) FROM " + source.name + " LIMIT 1;";
const strRegex = "relation \"" + source.name + "\" does not exist";
const regex = new RegExp(strRegex);
this.executeQuery(query, (errQuery) => {
if (errQuery) {
if (regex.test(errQuery.message.toLowerCase())) {
cback(null, false);
}
else {
cback(errQuery);
}
return;
}
cb(null, true);
return;
});
}
, (warn, cback) => {
if (warn) {
cback(null, true);
return;
}
const query = "CREATE TABLE " + source.name + ";";
this.executeQuery(query, (errQuery) => cback(errQuery, false));
return;
}
, (cback, warn1) => {
const query = "SELECT COUNT(1) FROM " + source.saName + " LIMIT 1;";
const strRegex = "relation \"" + source.saName + "\" does not exist";
const regex = new RegExp(strRegex);
this.executeQuery(query, (errQuery) => {
if (errQuery) {
if (regex.test(errQuery.message.toLowerCase())) {
cback(null, warn1, false);
}
else {
cback(errQuery);
}
return;
}
cb(null, warn1, true);
return;
});
}
, (warn1, warn2, cback) => {
if (warn2) {
cback(null, warn1, true);
return;
}
const query = "CREATE TABLE " + source.saName + ";";
this.executeQuery(query, (errQuery) => {
return cback(errQuery, warn1, false);
});
return;
}
], (errQuery, warn1, war2) => {
let msg = "";
if (warn1) {
msg += "[WARNING] Source \"" + source.name + "\" relation already exists";
}
if (warn2) {
msg += "[WARNING] Source \"" + source.name + "\" staging area relation already exists";
}
if (msg === "") {
cb(errQuery);
}
else {
cb(new Error(msg));
}
});
}
/**
* Asynchronously sends a message to the database to delete
* all tuples stored in the given view.
* @param view - Element to be initialized on database
* @param cb - Callback function with operation status.
* @param cb.error - Error information when the method fails.
*/
public abstract truncateView(view: View, cb: (err: Error) => void): void {
async.waterfall([
(cback) => {
const query = "SELECT COUNT(1) FROM " + view.name + " LIMIT 1;";
const strRegex = "relation \"" + source.name + "\" does not exist";
const regex = new RegExp(strRegex);
this.executeQuery(query, (errQuery) => {
if (errQuery) {
if (regex.test(errQuery.message.toLowerCase())) {
cback(null, true);
}
else {
cback(errQuery);
}
return;
}
cb(null, false);
return;
});
}
, (warn, cback) => {
if (warn) {
cback(null, true);
return;
}
const query = "TRUNCATE TABLE " + view.name + ";";
this.executeQuery(query, (errQuery) => cback(errQuery, false));
return;
}
], (errQuery, warn1) => {
if (warn1) {
const msg = "[WARNING] View \"" + view.name + "\" relation does not exists";
cb(new Error(msg));
}
else {
cb(errQuery);
}
return;
});
}
/**
* Asynchronously sends a message to the database to delete
* all tuples stored in the given source (and staging area).
* @param source - Element to be initialized on database
* @param cb - Callback function with operation status.
* @param cb.error - Error information when the method fails.
*/
public abstract truncateSource(source: Source, cb: (err: Error) => void): void {
async.waterfall([
(cback) => {
const query = "SELECT COUNT(1) FROM " + source.name + " LIMIT 1;";
const strRegex = "relation \"" + source.name + "\" does not exist";
const regex = new RegExp(strRegex);
this.executeQuery(query, (errQuery) => {
if (errQuery) {
if (regex.test(errQuery.message.toLowerCase())) {
cback(null, true);
}
else {
cback(errQuery);
}
return;
}
cb(null, false);
return;
});
}
, (warn, cback) => {
if (warn) {
cback(null, true);
return;
}
const query = "TRUNCATE TABLE " + source.name + ";";
this.executeQuery(query, (errQuery) => cback(errQuery, false));
return;
}
, (cback, warn1) => {
const query = "SELECT COUNT(1) FROM " + source.saName + " LIMIT 1;";
const strRegex = "relation \"" + source.saName + "\" does not exist";
const regex = new RegExp(strRegex);
this.executeQuery(query, (errQuery) => {
if (errQuery) {
if (regex.test(errQuery.message.toLowerCase())) {
cback(null, warn1, true);
}
else {
cback(errQuery);
}
return;
}
cb(null, warn1, false);
return;
});
}
, (warn1, warn2, cback) => {
if (warn2) {
cback(null, warn1, true);
return;
}
const query = "TRUNCATE TABLE " + source.saName + ";";
this.executeQuery(query, (errQuery) => {
return cback(errQuery, warn1, false);
});
return;
}
], (errQuery, warn1, war2) => {
let msg = "";
if (warn1) {
msg += "[WARNING] Source \"" + source.name + "\" relation already exists";
}
if (warn2) {
msg += "[WARNING] Source \"" + source.name + "\" staging area relation already exists";
}
if (msg === "") {
cb(errQuery);
}
else {
cb(new Error(msg));
}
});
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment