diff --git a/src/blendb.js b/src/blendb.js new file mode 100644 index 0000000000000000000000000000000000000000..d1968a761a91f2f7bda2223104105cc2d8a77123 --- /dev/null +++ b/src/blendb.js @@ -0,0 +1,97 @@ +/* + * Copyright (C) 2015 Centro de Computacao Cientifica e Software Livre + * Departamento de Informatica - Universidade Federal do Parana + * + * This file is part of blend. + * + * blend is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * blend is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with blend. If not, see <http://www.gnu.org/licenses/>. + */ + +'use strict'; + +const hash = require('util/hash'); + +const Source = require('core/source'); +const Transformer = require('core/transformer'); +const Aggregate = require('core/aggregate'); + +class BlenDB { + constructor() { + this.sources = new Map(); + this.transformers = new Map(); + this.aggregates = new Map(); + } + + source(name, options) { + if (this.sources.has(name)) { + return this.sources.get(name); + } + else { + const source = new Source(name, options); + this.sources.set(name, source); + return source; + } + } + + transformer(name, options) { + if (this.transformers.has(name)) { + return this.transformers.get(name); + } + else { + const transformer = new Transformer(name, options); + this.transformers.set(name, transformer); + return transformer; + } + } + + aggregate(metrics, dimensions, options) { + metrics = Array.from(metrics); + dimensions = Array.from(dimensions); + + const id = hash.sha1(metrics.sort() + dimensions.sort()); + + if (this.aggregates.has(id)) { + return this.aggregates.get(id); + } + else { + const aggregate = new Aggregate(metrics, dimensions, options); + this.aggregates.set(id, aggregate); + return aggregate; + } + } + + process() { + this.transformers.forEach((transformer) => { + const source = this.source(transformer.source); + const aggr = this.aggregate(transformer.metrics, + transformer.dimensions); + + source.forEach((doc) => { + aggr.push({ + metrics: transformer.extractMetrics(doc), + dimensions: transformer.extractDimensions(doc) + }); + }); + + // TODO: stream support + // source.stream() + // .pipe(transformer.stream()); + // .pipe(aggregate.stream()); + }); + + console.log(this.aggregates); + } +} + +module.exports = { BlenDB, Source, Transformer }; diff --git a/src/core/aggregate.js b/src/core/aggregate.js new file mode 100644 index 0000000000000000000000000000000000000000..aece430c41bc67c6ff4a7092b40144bbd0ffa507 --- /dev/null +++ b/src/core/aggregate.js @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2015 Centro de Computacao Cientifica e Software Livre + * Departamento de Informatica - Universidade Federal do Parana + * + * This file is part of blendb. + * + * blendb is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * blendb is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with blendb. If not, see <http://www.gnu.org/licenses/>. + */ + +'use strict'; + +class Aggregate { + constructor(metrics, dimensions, options) { + this.metrics = metrics; + this.dimensions = dimensions; + + this.data = []; + } + + push(data) { + this.data.push(data); + } + + truncate() { + this.data = []; + } +} + +module.exports = Aggregate; diff --git a/src/core/aggregator.js b/src/core/aggregator.js deleted file mode 100644 index d46a3dd9484381ef771971ac5d5b103e5263df37..0000000000000000000000000000000000000000 --- a/src/core/aggregator.js +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Copyright (C) 2015 Centro de Computacao Cientifica e Software Livre - * Departamento de Informatica - Universidade Federal do Parana - * - * This file is part of blendb. - * - * blendb is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * blendb is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with blendb. If not, see <http://www.gnu.org/licenses/>. - */ - -'use strict'; - -const async = require('async'); -const mongo = require('core/mongo'); -const serializer = require('util/serializer'); -const hash = require('util/hash'); - -class Aggregator { - removeAggregate(id, callback) { - const aggregates = mongo.db.collection('meta.aggregates'); - aggregates.findOneAndDelete({ _id: id }, (err) => { - if (err) { - callback(err); - return; - } - - let aggr = mongo.db.collection('aggr.' + id); - aggr.remove({}, callback); - }); - } - - createAggregate(dimensions, metrics, callback) { - const aggregates = mongo.db.collection('meta.aggregates'); - - let doc = { - _id: hash.sha1(serializer.dump(dimensions) + - serializer.dump(metrics)), - dimensions: dimensions, - metrics: metrics - }; - - aggregates.insert(doc, (err) => { - if (err) { - callback(err); - return; - } - - callback(null, doc); - }); - } - - rebuildBaseAggregates(callback) { - this.cleanAggregates((err) => { - if (err) { - callback(err); - return; - } - - this.buildBaseAggregates(callback); - }); - } - - buildBaseAggregates(callback) { - let classes = mongo.db.collection('meta.classes'); - - classes.find({}).toArray((err, result) => { - if (err) { - callback(err); - return; - } - - async.map(result, (cls, cb) => { this.buildBaseAggregateFromClass(cls, cb); }, (err) => { - if (err) { - callback(err); - return; - } - - return callback(null); - }); - }); - } - - buildBaseAggregateFromClass(cls, callback) { - this.createAggregate(cls.dimensions, cls.metrics, (err, aggr) => { - const raw = mongo.db.collection('raw.' + cls.name); - const aggrData = mongo.db.collection('aggr.' + aggr._id); - - const functions = serializer.load(cls.functions); - - raw.find({}).forEach((doc) => { - let data = { - dimensions: functions.extractDimensions.apply(doc), - metrics: functions.extractMetrics.apply(doc) - }; - - // TODO: aggrData.insert(data); - }, callback); - }); - } - - cleanAggregates(callback) { - let aggregates = mongo.db.collection('meta.aggregates'); - - aggregates.find({}).toArray((err, result) => { - if (err) { - callback(err); - return; - } - - async.map(result, (aggr, callback) => { - let aggrCol = mongo.db.collection('aggr.' + aggr.name); - aggrCol.remove({}, (err) => { - if (err) { - callback(err); - return; - } - - aggregates.remove({ _id: aggr._id }, callback); - }); - }, - (err) => { - if (err) { - callback(err); - return; - } - - callback(null); - }); - }); - } - - query(metrics, dimensions, callback) { - this.findClosestAggregate(metrics, dimensions, (err, aggr) => { - if (err) { - callback(err); - return; - } - - callback(null, null); - }); - } - - findClosestAggregate(metrics, dimensions, callback) { - let aggregates = mongo.db.collection('meta.aggregates'); - - aggregates.find({ - metrics: { - $all: metrics - }, - dimensions: { - $all: dimensions - } - }).toArray((err, result) => { - if (err) { - callback(err); - return; - } - - if ((!result) || (result.length <= 0)) { - callback('Query could not be aswered, no aggregate available.'); - return; - } - - // fetch the closest aggregation available - let closestAggr; - for (const aggr of result) { - if ((!closestAggr) || - (aggr.dimensions.length < closestAggr.dimensions.length)) { - closestAggr = aggr; - } - } - - callback(null, closestAggr); - }); - } -} - -module.exports = new Aggregator(); diff --git a/src/core/source.js b/src/core/source.js new file mode 100644 index 0000000000000000000000000000000000000000..07ec973568fadec1ef9df03ab2a22b9b4db50404 --- /dev/null +++ b/src/core/source.js @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2015 Centro de Computacao Cientifica e Software Livre + * Departamento de Informatica - Universidade Federal do Parana + * + * This file is part of blendb. + * + * blendb is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * blendb is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with blendb. If not, see <http://www.gnu.org/licenses/>. + */ + +'use strict'; + +class Source { + constructor(name, options) { + this.name = name; + + this.data = []; + } + + push(doc) { + this.data.push(doc); + } + + forEach(callback) { + this.data.forEach(callback); + } +} + +module.exports = Source; diff --git a/src/core/transformer.js b/src/core/transformer.js new file mode 100644 index 0000000000000000000000000000000000000000..c8e27a257777a48494a2533d65b251d205569163 --- /dev/null +++ b/src/core/transformer.js @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2015 Centro de Computacao Cientifica e Software Livre + * Departamento de Informatica - Universidade Federal do Parana + * + * This file is part of blendb. + * + * blendb is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * blendb is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with blendb. If not, see <http://www.gnu.org/licenses/>. + */ + +'use strict'; + +class Transformer { + constructor(name, options) { + this.source = options.source || null; + this.metrics = options.metrics || []; + this.dimensions = options.dimensions || []; + this.extractors = { + metrics: options.extractors.metrics || ((doc) => null), + dimensions: options.extractors.dimensions || ((doc) => null) + }; + } + + extractMetrics(doc) { + return this.extractors.metrics(doc); + } + + extractDimensions(doc) { + return this.extractors.dimensions(doc); + } +} + +module.exports = Transformer; diff --git a/src/util/hash.js b/src/util/hash.js index abe9f7a01aa68290151323b95315a0a88d0765ad..9a7501f66f812ab18b2075ade47cac16218fc6d6 100644 --- a/src/util/hash.js +++ b/src/util/hash.js @@ -23,9 +23,26 @@ const crypto = require('crypto'); class Hash { - sha1(content) { + sha1(...objects) { let hash = crypto.createHash('sha1'); - hash.update(content); + + objects + .map((obj) => { + switch (typeof obj) { + case 'string': + return obj; + case 'object': + return JSON.stringify(obj); + default: + throw new TypeError(typeof obj + + ' cannot be hashed'); + } + }) + .sort() + .map((objStr) => { + hash.update(objStr); + }); + return hash.digest('hex'); } } diff --git a/src/util/serializer.js b/src/util/serializer.js deleted file mode 100644 index 9c10e68cdf45e1389ee6cef8d35ff5465abb224a..0000000000000000000000000000000000000000 --- a/src/util/serializer.js +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright (C) 2015 Centro de Computacao Cientifica e Software Livre - * Departamento de Informatica - Universidade Federal do Parana - * - * This file is part of blendb. - * - * blendb is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * blendb is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with blendb. If not, see <http://www.gnu.org/licenses/>. - */ - -'use strict'; - -class Serializer { - dump(obj) { - return JSON.stringify(obj, (key, value) => { - if (typeof value === 'function') { - return value.toString() - .replace(/[\n\r\t]/g, '') - .replace(/ +/g, ' '); - } - - return value; - }); - } - - load(str) { - return JSON.parse(str, (key, value) => { - if (key === '') { - return value; - } - - if (typeof value === 'string') { - let rfunc = /function[^\(]*\(([^\)]*)\)[^\{]*\{(.*)\}[^\}]*$/; - let match = value.replace(/\n/g, '').match(rfunc); - - if (match) { - let args = match[1].split(',') - .map((arg) => arg.replace(/\s+/, '')); - return new Function(args, match[2]); - } - } - - return value; - }); - } -} - -module.exports = new Serializer(); diff --git a/test.js b/test.js new file mode 100644 index 0000000000000000000000000000000000000000..0bd182e7c4a7c69b232d55a6359f022af0ce459f --- /dev/null +++ b/test.js @@ -0,0 +1,66 @@ +#!/usr/bin/env node +/* + * Copyright (C) 2015 Centro de Computacao Cientifica e Software Livre + * Departamento de Informatica - Universidade Federal do Parana + * + * This file is part of blendb. + * + * blendb is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * blendb is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with blendb. If not, see <http://www.gnu.org/licenses/>. + */ + +'use strict'; + +// Add the ./src directory to require's search path to facilitate import +// modules later on (avoiding the require('../../../../module') problem). +require('app-module-path').addPath(__dirname + '/src'); + +// connect to mongodb +// const mongo = require('core/mongo'); +// mongo.connect('mongodb://pyke/blend'); + +const blendb = require('blendb'); + +const db = new blendb.BlenDB(); + +const netSource = db.source('networkData'); + +for (let i = 0; i < 100; i++) { + netSource.push({ + a: i + }); +} + +db.transformer('networkTraffic', { + source: 'networkData', + metrics: ['met:downBytes', 'met:upBytes'], + dimensions: ['dim:date', 'dim:city', 'dim:state', 'dim:point'], + extractors: { + metrics: function extractMetrics(doc) { + return { + 'met:downBytes': 5464, + 'met:upBytes': 342 + }; + }, + dimensions: function extractDimensions(doc) { + return { + 'dim:date': '2016/06/12', + 'dim:city': 41442, + 'dim:state': 41, + 'dim:point': 5344 + }; + } + } +}); + +db.process();