Skip to content
Snippets Groups Projects
Commit 383e79db authored by Eduardo L. Buratti's avatar Eduardo L. Buratti
Browse files

Refactor core module

parent 23c8b800
No related branches found
No related tags found
No related merge requests found
/*
* Copyright (C) 2015 Centro de Computacao Cientifica e Software Livre
* Departamento de Informatica - Universidade Federal do Parana
*
* This file is part of blend.
*
* blend is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* blend is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with blend. If not, see <http://www.gnu.org/licenses/>.
*/
'use strict';
const hash = require('util/hash');
const Source = require('core/source');
const Transformer = require('core/transformer');
const Aggregate = require('core/aggregate');
class BlenDB {
constructor() {
this.sources = new Map();
this.transformers = new Map();
this.aggregates = new Map();
}
source(name, options) {
if (this.sources.has(name)) {
return this.sources.get(name);
}
else {
const source = new Source(name, options);
this.sources.set(name, source);
return source;
}
}
transformer(name, options) {
if (this.transformers.has(name)) {
return this.transformers.get(name);
}
else {
const transformer = new Transformer(name, options);
this.transformers.set(name, transformer);
return transformer;
}
}
aggregate(metrics, dimensions, options) {
metrics = Array.from(metrics);
dimensions = Array.from(dimensions);
const id = hash.sha1(metrics.sort() + dimensions.sort());
if (this.aggregates.has(id)) {
return this.aggregates.get(id);
}
else {
const aggregate = new Aggregate(metrics, dimensions, options);
this.aggregates.set(id, aggregate);
return aggregate;
}
}
process() {
this.transformers.forEach((transformer) => {
const source = this.source(transformer.source);
const aggr = this.aggregate(transformer.metrics,
transformer.dimensions);
source.forEach((doc) => {
aggr.push({
metrics: transformer.extractMetrics(doc),
dimensions: transformer.extractDimensions(doc)
});
});
// TODO: stream support
// source.stream()
// .pipe(transformer.stream());
// .pipe(aggregate.stream());
});
console.log(this.aggregates);
}
}
module.exports = { BlenDB, Source, Transformer };
......@@ -20,39 +20,21 @@
'use strict';
class Serializer {
dump(obj) {
return JSON.stringify(obj, (key, value) => {
if (typeof value === 'function') {
return value.toString()
.replace(/[\n\r\t]/g, '')
.replace(/ +/g, ' ');
}
return value;
});
}
class Aggregate {
constructor(metrics, dimensions, options) {
this.metrics = metrics;
this.dimensions = dimensions;
load(str) {
return JSON.parse(str, (key, value) => {
if (key === '') {
return value;
this.data = [];
}
if (typeof value === 'string') {
let rfunc = /function[^\(]*\(([^\)]*)\)[^\{]*\{(.*)\}[^\}]*$/;
let match = value.replace(/\n/g, '').match(rfunc);
if (match) {
let args = match[1].split(',')
.map((arg) => arg.replace(/\s+/, ''));
return new Function(args, match[2]);
}
push(data) {
this.data.push(data);
}
return value;
});
truncate() {
this.data = [];
}
}
module.exports = new Serializer();
module.exports = Aggregate;
/*
* Copyright (C) 2015 Centro de Computacao Cientifica e Software Livre
* Departamento de Informatica - Universidade Federal do Parana
*
* This file is part of blendb.
*
* blendb is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* blendb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with blendb. If not, see <http://www.gnu.org/licenses/>.
*/
'use strict';
const async = require('async');
const mongo = require('core/mongo');
const serializer = require('util/serializer');
const hash = require('util/hash');
class Aggregator {
removeAggregate(id, callback) {
const aggregates = mongo.db.collection('meta.aggregates');
aggregates.findOneAndDelete({ _id: id }, (err) => {
if (err) {
callback(err);
return;
}
let aggr = mongo.db.collection('aggr.' + id);
aggr.remove({}, callback);
});
}
createAggregate(dimensions, metrics, callback) {
const aggregates = mongo.db.collection('meta.aggregates');
let doc = {
_id: hash.sha1(serializer.dump(dimensions) +
serializer.dump(metrics)),
dimensions: dimensions,
metrics: metrics
};
aggregates.insert(doc, (err) => {
if (err) {
callback(err);
return;
}
callback(null, doc);
});
}
rebuildBaseAggregates(callback) {
this.cleanAggregates((err) => {
if (err) {
callback(err);
return;
}
this.buildBaseAggregates(callback);
});
}
buildBaseAggregates(callback) {
let classes = mongo.db.collection('meta.classes');
classes.find({}).toArray((err, result) => {
if (err) {
callback(err);
return;
}
async.map(result, (cls, cb) => { this.buildBaseAggregateFromClass(cls, cb); }, (err) => {
if (err) {
callback(err);
return;
}
return callback(null);
});
});
}
buildBaseAggregateFromClass(cls, callback) {
this.createAggregate(cls.dimensions, cls.metrics, (err, aggr) => {
const raw = mongo.db.collection('raw.' + cls.name);
const aggrData = mongo.db.collection('aggr.' + aggr._id);
const functions = serializer.load(cls.functions);
raw.find({}).forEach((doc) => {
let data = {
dimensions: functions.extractDimensions.apply(doc),
metrics: functions.extractMetrics.apply(doc)
};
// TODO: aggrData.insert(data);
}, callback);
});
}
cleanAggregates(callback) {
let aggregates = mongo.db.collection('meta.aggregates');
aggregates.find({}).toArray((err, result) => {
if (err) {
callback(err);
return;
}
async.map(result, (aggr, callback) => {
let aggrCol = mongo.db.collection('aggr.' + aggr.name);
aggrCol.remove({}, (err) => {
if (err) {
callback(err);
return;
}
aggregates.remove({ _id: aggr._id }, callback);
});
},
(err) => {
if (err) {
callback(err);
return;
}
callback(null);
});
});
}
query(metrics, dimensions, callback) {
this.findClosestAggregate(metrics, dimensions, (err, aggr) => {
if (err) {
callback(err);
return;
}
callback(null, null);
});
}
findClosestAggregate(metrics, dimensions, callback) {
let aggregates = mongo.db.collection('meta.aggregates');
aggregates.find({
metrics: {
$all: metrics
},
dimensions: {
$all: dimensions
}
}).toArray((err, result) => {
if (err) {
callback(err);
return;
}
if ((!result) || (result.length <= 0)) {
callback('Query could not be aswered, no aggregate available.');
return;
}
// fetch the closest aggregation available
let closestAggr;
for (const aggr of result) {
if ((!closestAggr) ||
(aggr.dimensions.length < closestAggr.dimensions.length)) {
closestAggr = aggr;
}
}
callback(null, closestAggr);
});
}
}
module.exports = new Aggregator();
/*
* Copyright (C) 2015 Centro de Computacao Cientifica e Software Livre
* Departamento de Informatica - Universidade Federal do Parana
*
* This file is part of blendb.
*
* blendb is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* blendb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with blendb. If not, see <http://www.gnu.org/licenses/>.
*/
'use strict';
class Source {
constructor(name, options) {
this.name = name;
this.data = [];
}
push(doc) {
this.data.push(doc);
}
forEach(callback) {
this.data.forEach(callback);
}
}
module.exports = Source;
/*
* Copyright (C) 2015 Centro de Computacao Cientifica e Software Livre
* Departamento de Informatica - Universidade Federal do Parana
*
* This file is part of blendb.
*
* blendb is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* blendb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with blendb. If not, see <http://www.gnu.org/licenses/>.
*/
'use strict';
class Transformer {
constructor(name, options) {
this.source = options.source || null;
this.metrics = options.metrics || [];
this.dimensions = options.dimensions || [];
this.extractors = {
metrics: options.extractors.metrics || ((doc) => null),
dimensions: options.extractors.dimensions || ((doc) => null)
};
}
extractMetrics(doc) {
return this.extractors.metrics(doc);
}
extractDimensions(doc) {
return this.extractors.dimensions(doc);
}
}
module.exports = Transformer;
......@@ -23,9 +23,26 @@
const crypto = require('crypto');
class Hash {
sha1(content) {
sha1(...objects) {
let hash = crypto.createHash('sha1');
hash.update(content);
objects
.map((obj) => {
switch (typeof obj) {
case 'string':
return obj;
case 'object':
return JSON.stringify(obj);
default:
throw new TypeError(typeof obj +
' cannot be hashed');
}
})
.sort()
.map((objStr) => {
hash.update(objStr);
});
return hash.digest('hex');
}
}
......
test.js 0 → 100644
#!/usr/bin/env node
/*
* Copyright (C) 2015 Centro de Computacao Cientifica e Software Livre
* Departamento de Informatica - Universidade Federal do Parana
*
* This file is part of blendb.
*
* blendb is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* blendb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with blendb. If not, see <http://www.gnu.org/licenses/>.
*/
'use strict';
// Add the ./src directory to require's search path to facilitate import
// modules later on (avoiding the require('../../../../module') problem).
require('app-module-path').addPath(__dirname + '/src');
// connect to mongodb
// const mongo = require('core/mongo');
// mongo.connect('mongodb://pyke/blend');
const blendb = require('blendb');
const db = new blendb.BlenDB();
const netSource = db.source('networkData');
for (let i = 0; i < 100; i++) {
netSource.push({
a: i
});
}
db.transformer('networkTraffic', {
source: 'networkData',
metrics: ['met:downBytes', 'met:upBytes'],
dimensions: ['dim:date', 'dim:city', 'dim:state', 'dim:point'],
extractors: {
metrics: function extractMetrics(doc) {
return {
'met:downBytes': 5464,
'met:upBytes': 342
};
},
dimensions: function extractDimensions(doc) {
return {
'dim:date': '2016/06/12',
'dim:city': 41442,
'dim:state': 41,
'dim:point': 5344
};
}
}
});
db.process();
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment