diff --git a/specs/blendb-api-v1.raml b/specs/blendb-api-v1.raml index 2693c9cae169ea0dbc0c6d65322bc9d88f869b71..4e4c0d5793bf30219ac76237e0ca9ce635a782fa 100644 --- a/specs/blendb-api-v1.raml +++ b/specs/blendb-api-v1.raml @@ -5,59 +5,6 @@ version: v1 baseUri: http://blendb.c3sl.ufpr.br/api/{version} mediaType: application/json -securitySchemes: - - oauth_2_0: - description: | - OAuth2 is a protocol that lets apps request authorization to - private details in the system while avoiding the use of passwords. - This is preferred over Basic Authentication because tokens can be - limited to specific types of data, and can be revoked by users at - any time. - type: OAuth 2.0 - describedBy: - headers: - Authorization: - description: | - Used to send a valid OAuth 2 access token. Do not use - together with the "access_token" query string parameter. - type: string - queryParameters: - access_token: - description: | - Used to send a valid OAuth 2 access token. Do not use - together with the "Authorization" header. - type: string - responses: - 401: - description: | - Bad or expired token. This can happen if access token - has expired or has been revoked by the user. - body: - application/json: - example: | - { - id: "invalid_oauth_token", - message: "Bad or expired token. This can happen if access token has expired or has been revoked by the user." - } - 403: - description: | - Bad OAuth2 request (wrong consumer key, bad nonce, - expired timestamp, ...). - body: - application/json: - example: | - { - id: "invalid_oauth_request", - message: "Bad OAuth2 request (wrong consumer key, bad nonce, expired timestamp, ...)." - } - settings: - authorizationUri: http://simmc.c3sl.ufpr.br/oauth/authorize - accessTokenUri: http://simmc.c3sl.ufpr.br/oauth/access_token - authorizationGrants: [ code, token ] - scopes: - - "user" - - "user:email" - resourceTypes: - base: get?: &common @@ -272,20 +219,21 @@ traits: Fields to be returned. type: string -/metrics: +/meta/diagram: + get: + +/meta/metrics: description: | A Metric represents a statistic that can be queried to generate reports. This collection allows the user to list all the metrics available in the system and their descriptions. - securedBy: [ null, oauth_2_0 ] get: -/dimensions: +/meta/dimensions: description: | A Dimension allows the data to be aggregated by one or more columns. This collection allows the user to list all the dimensions available in the system and their descriptions. - securedBy: [ null, oauth_2_0 ] get: /data: diff --git a/src/api/controllers/data.ts b/src/api/controllers/data.ts index 855cca85b06d91ef68a308660c58ea53fb31374d..14ebfa70151cb02e6518bbee7b2503a9def16f98 100644 --- a/src/api/controllers/data.ts +++ b/src/api/controllers/data.ts @@ -22,10 +22,22 @@ import * as express from "express"; export class DataCtrl { public static read(req: express.Request, res: express.Response, next: express.NextFunction) { + const blendb = req.app.locals.blendb; + let metrics = req.query.metrics.split(","); let dimensions = req.query.dimensions.split(","); - res.status(500).json({ message: "Query execution failed " + - "because of an unknown error." }); + blendb.query({ metrics, dimensions }, (err: any, data: any[]) => { + if (err) { + console.error(err); + res.status(500).json({ message: "Query execution failed " + + "because of an unknown error." }); + return; + } + + console.log(data); + + res.json({ data }); + }); } } diff --git a/src/api/controllers/meta.ts b/src/api/controllers/meta.ts new file mode 100644 index 0000000000000000000000000000000000000000..d281108e1c6a149e129ada5513ca2b31337574f8 --- /dev/null +++ b/src/api/controllers/meta.ts @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2015 Centro de Computacao Cientifica e Software Livre + * Departamento de Informatica - Universidade Federal do Parana + * + * This file is part of blendb. + * + * blendb is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * blendb is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with blendb. If not, see <http://www.gnu.org/licenses/>. + */ + +import * as express from "express"; + +export class MetaCtrl { + public static getDiagram(req: express.Request, res: express.Response, next: express.NextFunction) { + const blendb = req.app.locals.blendb; + + let graph: { + nodes: any[], + links: any[] + } = { + nodes: [], + links: [] + }; + + for (const [name, source] of blendb.sources) { + graph.nodes.push({ + id: source.id, + name: name, + tags: source.tags + }); + } + + for (const [name, transformer] of blendb.transformers) { + graph.nodes.push({ + id: transformer.id, + name: name + }); + } + + for (const [source, transformer] of blendb.pipes) { + graph.links.push({ + source: source.id, + target: transformer.id + }); + } + + res.render("meta/diagram", { graph: graph }); + } +} diff --git a/src/api/router-v1.ts b/src/api/router-v1.ts index 1ac9ba57447635d537afc4507616cffd8ee3cbbc..d444868891a35bfd5b8b88ae88e7b77575632690 100644 --- a/src/api/router-v1.ts +++ b/src/api/router-v1.ts @@ -21,10 +21,13 @@ const osprey = require("osprey"); // import controllers +import { MetaCtrl } from "./controllers/meta"; import { DataCtrl } from "./controllers/data"; -import { CollectCtrl } from "./controllers/collect"; +// import { CollectCtrl } from "./controllers/collect"; export const router = osprey.Router(); router.get("/data", DataCtrl.read); -router.post("/collect/{class}", CollectCtrl.write); +// router.post("/collect/{class}", CollectCtrl.write); + +router.get("/meta/diagram", MetaCtrl.getDiagram); diff --git a/src/api/views/meta/diagram.pug b/src/api/views/meta/diagram.pug new file mode 100644 index 0000000000000000000000000000000000000000..2c07f49edca13a8f79851495d1db01be6835ff43 --- /dev/null +++ b/src/api/views/meta/diagram.pug @@ -0,0 +1,109 @@ +doctype html +html(lang="en", charset="utf-8") + head + title BlenDB Diagram + style. + * { + margin: 0; + padding: 0; + border: 0; + font-size: 100%; + font: inherit; + vertical-align: baseline; + } + + body { + overflow: hidden; + } + + .link line { + stroke: #999; + stroke-opacity: 0.6; + } + + .node circle { + stroke: #fff; + stroke-width: 1.5px; + } + + body + script(type="text/javascript", src="https://cdnjs.cloudflare.com/ajax/libs/d3/4.2.6/d3.min.js") + script(type="text/javascript"). + var graph = JSON.parse('!{JSON.stringify(graph)}'); + + var color = d3.scaleOrdinal(d3.schemeCategory20); + + var svg = d3.select("body").append("svg"); + + var link = svg.append("g") + .attr("class", "link") + .selectAll("line") + .data(graph.links) + .enter().append("line"); + + var node = svg.append("g") + .attr("class", "node") + .selectAll("circle") + .data(graph.nodes) + .enter() + .append("circle") + .attr("r", 10) + .call(d3.drag() + .on("start", dragstarted) + .on("drag", dragged) + .on("end", dragended)); + + node.append("title") + .text(function(d) { return d.name; }); + + var simulation = d3.forceSimulation() + .force("link", d3.forceLink().id(function(d) { return d.id; })) + .force("charge", d3.forceManyBody()); + + simulation + .nodes(graph.nodes) + .on("tick", tick); + + simulation + .force("link") + .links(graph.links); + + d3.select(window).on("resize", resize); + resize(); + + function tick() { + link + .attr("x1", function(d) { return d.source.x; }) + .attr("y1", function(d) { return d.source.y; }) + .attr("x2", function(d) { return d.target.x; }) + .attr("y2", function(d) { return d.target.y; }); + + node + .attr("cx", function(d) { return d.x; }) + .attr("cy", function(d) { return d.y; }); + } + + function dragstarted(d) { + if (!d3.event.active) simulation.alphaTarget(0.3).restart(); + d.fx = d.x; + d.fy = d.y; + } + + function dragged(d) { + d.fx = d3.event.x; + d.fy = d3.event.y; + } + + function dragended(d) { + if (!d3.event.active) simulation.alphaTarget(0); + d.fx = null; + d.fy = null; + } + + function resize() { + var width = window.innerWidth; + var height = window.innerHeight; + svg.attr("width", width).attr("height", height); + /*force.size([width, height]).resume();*/ + simulation.force("center", d3.forceCenter(width / 2, height / 2)); + } diff --git a/src/core/aggregate.ts b/src/core/aggregate.ts index a1e3ccf54da6dde4b858b744bb34124995d9c685..1c1102d99c43c6980cf9ddd2d109b6b8bac68165 100644 --- a/src/core/aggregate.ts +++ b/src/core/aggregate.ts @@ -18,53 +18,42 @@ * along with blendb. If not, see <http://www.gnu.org/licenses/>. */ -export interface IAggregateData { - metrics: any; - dimensions: any; -} - -export class Aggregate { - public metrics: string[]; - public dimensions: string[]; - private data: IAggregateData[]; +// const miss = require("mississippi"); +const uuid = require("node-uuid"); - constructor(metrics: string[], dimensions: string[], options?: any) { - this.metrics = metrics; - this.dimensions = dimensions; +import { Server } from "./server"; +import { Storage } from "./storage"; - this.data = []; - } +export interface IAggregateOptions { + tags: string[]; +} - public push(data: IAggregateData) { - this.data.push(data); +export class Aggregate { + public id: string; + private server: Server; + private storage: Storage; + private data: any[]; + + constructor(server: Server, options?: IAggregateOptions) { + this.id = uuid.v4(); + this.server = server; + this.storage = server.storage.l3; + this.data = new Array(); } - public truncate() { - this.data = []; + public insert(dimensions: any, metrics: any): void { + this.data.push({ dimensions, metrics }); } - public find(query: any) { - let result: any = []; - - this.data.forEach((doc: IAggregateData) => { - let match = true; - - for (let key in query) { - if (query.hasOwnProperty(key)) { - let value = query[key]; - - if (doc.dimensions[key] !== value) { - match = false; - break; - } - } - } - - if (match) { - result.push(doc); - } - }); - - return result; - } + // public stream() { + // let i = 0; + // return miss.from.obj((size: number, next: Function) => { + // if (i >= this.data.length) { + // next(null, null); + // return; + // } + // + // next(null, this.data[i++]); + // }); + // } } diff --git a/src/core/server.spec.ts b/src/core/server.spec.ts deleted file mode 100644 index 9e0dfbbb0d2f9fa30333608a826674ac3e7d0659..0000000000000000000000000000000000000000 --- a/src/core/server.spec.ts +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Copyright (C) 2015 Centro de Computacao Cientifica e Software Livre - * Departamento de Informatica - Universidade Federal do Parana - * - * This file is part of blendb. - * - * blendb is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * blendb is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with blendb. If not, see <http://www.gnu.org/licenses/>. - */ - -import { expect } from "chai"; - -import { Server } from "./server"; - -describe("server class", () => { - it("should be able to create and retrieve sources", () => { - const server = new Server(); - - // create two sources - const source1 = server.source("source1"); - const source2 = server.source("source2"); - - // retrieve the first one - const retrieved = server.source("source1"); - - // check if sources were actually created/retrieved - expect(source1).to.be.an("object"); - expect(source2).to.be.an("object"); - expect(retrieved).to.be.an("object"); - - // check if the two created sources are different - expect(source1).to.not.be.equal(source2); - - // check if the retrieved source is the same as the created one - expect(source1).to.be.equal(retrieved); - }); - - it("should be able to create and retrieve aggregates", () => { - const server = new Server(); - - // create two aggregates - const aggr1 = server.aggregate(["met:one"], ["dim:one", "dim:two"]); - const aggr2 = server.aggregate(["met:two"], ["dim:one", "dim:two"]); - - // retrieve the first one - const retrieved = server.aggregate(["met:one"], ["dim:two", "dim:one"]); - - // check if aggregates were actually created/retrieved - expect(aggr1).to.be.an("object"); - expect(aggr2).to.be.an("object"); - expect(retrieved).to.be.an("object"); - - // check if the two created aggregates are different - expect(aggr1).to.not.be.equal(aggr2); - - // check if the retrieved aggregate is the same as the created one - expect(aggr1).to.be.equal(retrieved); - }); - - it("should be able to create and retrieve transformers", () => { - const server = new Server(); - - const source1 = server.source("source1"); - const aggr1 = server.aggregate(["met:one"], ["dim:one", "dim:two"]); - const aggr2 = server.aggregate(["met:one", "met:two"], ["dim:one"]); - - // create two transformers - const transformer1 = server.transformer("transformer1", { - source: source1, - destination: aggr1, - functions: { - map: (doc: any, emit: Function) => { return; }, - reduce: (dimensions: any, metrics: any) => { return {}; } - } - }); - const transformer2 = server.transformer("transformer2", { - source: source1, - destination: aggr2, - functions: { - map: (doc: any, emit: Function) => { return; }, - reduce: (dimensions: any, metrics: any) => { return {}; } - } - }); - - // retrieve the first one - const retrieved = server.transformer("transformer1"); - - // check if transformers were actually created/retrieved - expect(transformer1).to.be.an("object"); - expect(transformer2).to.be.an("object"); - expect(retrieved).to.be.an("object"); - - // check if the two created transformers are different - expect(transformer1).to.not.be.equal(transformer2); - - // check if the retrieved transformer is the same as the created one - expect(transformer1).to.be.equal(retrieved); - }); - - it("should fail to create two transformer with name collision", () => { - const server = new Server(); - - const source1 = server.source("source1"); - const aggr1 = server.aggregate(["met:one"], ["dim:one", "dim:two"]); - const aggr2 = server.aggregate(["met:one", "met:two"], ["dim:one"]); - - server.transformer("transformer1", { - source: source1, - destination: aggr1, - functions: { - map: (doc: any, emit: Function) => { return; }, - reduce: (dimensions: any, metrics: any) => { return {}; } - } - }); - - expect(() => { - server.transformer("transformer1", { - source: source1, - destination: aggr2, - functions: { - map: (doc: any, emit: Function) => { return; }, - reduce: (dimensions: any, metrics: any) => { return {}; } - } - }); - }).to.throw(Error); - }); - - it("should fail to retrieve a transformer that doesn't exist", () => { - const server = new Server(); - - expect(() => { - server.transformer("transformerX"); - }).to.throw(Error); - }); -}); diff --git a/src/core/server.ts b/src/core/server.ts index 235cbca2bcbe488b6eaacb8dc34c8969e00583ee..5c0b616f1efd875918946f1f494779aa8d1f81a1 100644 --- a/src/core/server.ts +++ b/src/core/server.ts @@ -18,54 +18,72 @@ * along with blend. If not, see <http://www.gnu.org/licenses/>. */ -import { Hash } from "../util/hash"; +const miss = require("mississippi"); -import { Source } from "./source"; +import { Source, ISourceOptions } from "./source"; import { Transformer, ITransformerOptions } from "./transformer"; -import { Aggregate } from "./aggregate"; +// import { Aggregate, IAggregateOptions } from "./aggregate"; +import { Storage } from "./storage"; + +import { MongoStorage } from "../storage/mongo"; +import { RedisStorage } from "../storage/redis"; +import { MonetStorage } from "../storage/monet"; + +export interface IQuery { + metrics: string[]; + dimensions: string[]; +} export class Server { - private sources: Map<string, Source>; - private transformers: Map<string, Transformer>; - private aggregates: Map<string, Aggregate>; + public storage: { + l1: Storage, + l2: Storage, + l3: Storage + }; + public sources: Map<string, Source>; + public transformers: Map<string, Transformer>; + // public pipes: Array<[Source, Transformer]>; + // public aggregates: Map<string, Aggregate>; constructor() { + this.storage = { + l1: new MongoStorage(), + l2: new RedisStorage(), + l3: new MonetStorage() + }; + this.sources = new Map(); this.transformers = new Map(); - this.aggregates = new Map(); + // this.pipes = new Array(); + // this.aggregates = new Map(); } - public source(name: string, options?: any) { - if (this.sources.has(name)) { - return this.sources.get(name); - } - else { - const source = new Source(name, options); + public source(name: string, options?: ISourceOptions): Source { + if (typeof options !== "undefined") { + if (this.sources.has(name)) { + throw new Error("A source named '" + name + "' already exists"); + } + + const source = new Source(this, options); this.sources.set(name, source); return source; } - } - - public aggregate(metrics: string[], dimensions: string[], options?: any) { - const id = Hash.sha1(metrics.sort(), dimensions.sort()); - - if (this.aggregates.has(id)) { - return this.aggregates.get(id); - } else { - const aggregate = new Aggregate(metrics, dimensions, options); - this.aggregates.set(id, aggregate); - return aggregate; + if (!this.sources.has(name)) { + throw new Error("A source named '" + name + "' does not exist"); + } + + return this.sources.get(name); } } - public transformer(name: string, options?: ITransformerOptions) { + public transformer(name: string, options?: ITransformerOptions): Transformer { if (typeof options !== "undefined") { if (this.transformers.has(name)) { throw new Error("A transformer named '" + name + "' already exists"); } - const transformer = new Transformer(name, options); + const transformer = new Transformer(this, options); this.transformers.set(name, transformer); return transformer; } @@ -77,4 +95,130 @@ export class Server { return this.transformers.get(name); } } + + // public pipe(source: Source, transformer: Transformer): void { + // function findPipe(pipe: [Source, Transformer]) { + // return (pipe[0] === source) || (pipe[1] === transformer); + // } + // + // if (this.pipes.find(findPipe)) { + // throw new Error("A pipe containing the same source or destination already exists"); + // } + // + // this.pipes.push([source, transformer]); + // } + + public query(query: IQuery, callback: Function) { + let coverageMap = this.buildCoverageMap(query); + + // check if all the metrics are covered by the existing transfomers + for (const metric of query.metrics) { + if (!coverageMap.has(metric)) { + callback("The metric " + metric + " is not covered by any transformer."); + return; + } + } + + // list all the transformers we are going to need + let transformers = new Map(); + for (const [, t] of coverageMap) { + if (!transformers.has(t[0].id)) { + transformers.set(t[0].id, t[0]); + } + } + + // apply transformers + for (const transformer of transformers.values()) { + transformer.query(query.metrics, query.dimensions); + // function write(data: any, enc: string, cb: Function) { + // console.log(data); + // cb(); + // } + // + // function flush(cb: Function) { + // cb(); + // } + // + // miss.pipeline( + // transformer.source.stream(), + // transformer.position(), + // transformer.measure(query.metrics), + // transformer.aggregate(query.dimensions), + // miss.to.obj(write, flush) + // ); + } + + callback(null, []); + } + + private buildCoverageMap(query: IQuery): Map<string, Transformer[]> { + let coverageMap: Map<string, Transformer[]> = new Map(); + + // checks if l1 is covered by l2 + function isCovered(l1: string[], l2: string[]): boolean { + for (const s1 of l1) { + let res = l2.find((s2: string) => { + return s2 === s1; + }); + if (typeof res === "undefined") { + return false; + } + } + return true; + } + + // find the transformers that cover the query + for (const [, transformer] of this.transformers) { + if (isCovered(query.dimensions, transformer.dimensions)) { + for (const metric of transformer.metrics) { + if (query.metrics.indexOf(metric) < 0) { + // if the metric is not queried, it doesn't need to be + // added to the coverage map + continue; + } + + if (!coverageMap.has(metric)) { + coverageMap.set(metric, [transformer]); + } + else { + coverageMap.get(metric).push(transformer); + } + } + } + } + + return this.sortCoverageMap(query, coverageMap); + } + + private sortCoverageMap(query: IQuery, coverageMap: Map<string, Transformer[]>): Map<string, Transformer[]> { + let newCoverageMap: Map<string, Transformer[]> = new Map(); + + let numAppearences: { [tId: string]: number; } = {}; + + // attribute a point for every appearence of a transformer in the map + for (const [, transformers] of coverageMap) { + for (const transformer of transformers) { + numAppearences[transformer.id]++; + } + } + + // select the best transformer for each metric + for (const [metric, transformers] of coverageMap) { + let newTransformers = transformers.sort((a, b) => { + let aAppear = numAppearences[a.id] / query.metrics.length; + let bAppear = numAppearences[b.id] / query.metrics.length; + let aDist = Math.max((a.dimensions.length - query.dimensions.length) * 0.1, 1); + let bDist = Math.max((b.dimensions.length - query.dimensions.length) * 0.1, 1); + + let aPoints = aAppear * 0.5 + (1 - aDist) * 0.5; + let bPoints = bAppear * 0.5 + (1 - bDist) * 0.5; + + return (aPoints > bPoints) ? -1 : 1; + }); + + newCoverageMap.set(metric, newTransformers); + } + + return newCoverageMap; + } } diff --git a/src/core/source.ts b/src/core/source.ts index f030c5660300275aeb7afc12b83ad5a5d159d603..b49a5f3aa5f7575e2eb45f5cb61f7d35be2787f0 100644 --- a/src/core/source.ts +++ b/src/core/source.ts @@ -18,27 +18,53 @@ * along with blendb. If not, see <http://www.gnu.org/licenses/>. */ +const miss = require("mississippi"); +const uuid = require("node-uuid"); + +import { Server } from "./server"; +import { Storage } from "./storage"; + +export interface ISourceOptions { + tags: string[]; +} + export class Source { - public name: string; + public id: string; + public tags: string[]; + private server: Server; + private storage: Storage; private data: any[]; - constructor(name: string, options?: any) { - this.name = name; - - this.data = []; + constructor(server: Server, options: ISourceOptions) { + this.id = uuid.v4(); + this.server = server; + this.storage = server.storage.l1; + this.tags = options.tags; + this.data = new Array(); } - public push(doc: any) { - this.data.push(doc); + public insert(doc: any): void { + this.data.push({ + doc: doc + }); } - public forEach(callback: Function) { - this.data.forEach((value: any, index: number, array: any[]) => { - callback(value); + public stream() { + let i = 0; + return miss.from.obj((size: number, next: Function) => { + if (i >= this.data.length) { + next(null, null); + return; + } + + next(null, this.data[i++]); }); } - public truncate() { - this.data = []; - } + // insert(data: any): void { + // this.storage.insert({ + // tags: this.tags, + // data: data + // }); + // } } diff --git a/src/core/storage.ts b/src/core/storage.ts new file mode 100644 index 0000000000000000000000000000000000000000..83edda589ee1eb1852bb6789bc306af0e18096a7 --- /dev/null +++ b/src/core/storage.ts @@ -0,0 +1,24 @@ +/* + * Copyright (C) 2015 Centro de Computacao Cientifica e Software Livre + * Departamento de Informatica - Universidade Federal do Parana + * + * This file is part of blendb. + * + * blendb is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * blendb is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with blendb. If not, see <http://www.gnu.org/licenses/>. + */ + +export abstract class Storage { + public abstract read(): void; + public abstract filter(query: any): void; +} diff --git a/src/core/transformer.spec.ts b/src/core/transformer.spec.ts index 003deec9c3f582199dc5d7ae15bcc49efe3eac12..8ee290713c41184fe9fe5848cd530e769aae4b63 100644 --- a/src/core/transformer.spec.ts +++ b/src/core/transformer.spec.ts @@ -18,68 +18,99 @@ * along with blendb. If not, see <http://www.gnu.org/licenses/>. */ -import { expect } from "chai"; +const miss = require("mississippi"); +import { expect } from "chai"; import { Hash } from "../util/hash"; - +import { Server } from "./server"; import { Transformer } from "./transformer"; -import { Source } from "./source"; -import { Aggregate } from "./aggregate"; describe("transformer class", () => { - const source = new Source("testSource"); - const aggregate = new Aggregate(["met:one"], ["dim:one", "dim:two"]); + const mockServer = new (class MockServer extends Server { + // + })(); + + const transformer = new Transformer(mockServer, { + dimensions: ["dim:one", "dim:two"], + metrics: ["met:one"], + operators: { + position: ((data: any) => { + return { + "dim:one": data.id, + "dim:two": Hash.sha1(data.seed) + }; + }), + measure: ((data: any) => { + return { + "met:one": Math.floor(data.seed * 100000) / 100 + }; + }), + aggregate: { + "met:one": "sum" + } + } + }); - it("should be able to aggregate data", () => { - source.truncate(); + it("should be able to retrieve dimensions with by using a position operator", (callback: Function) => { + let i = 0; + let source = miss.from.obj((size: Number, next: Function) => { + if (i >= 1024) { + next(null, null); + return; + } - for (let i = 0; i < 1000; i++) { - source.push({ - id: i % 10, - seed: Math.random() + next(null, { + doc: { + id: i % 10, + seed: ((i % 10) === 0) ? 5 : Math.random() + } }); - } + i++; + }); + + let sink = miss.to.obj((data: any, enc: string, cb: Function) => { + expect(data).to.have.property("dimensions"); + expect(data).to.have.property("doc"); + expect(data.dimensions).to.have.property("dim:one"); + expect(data.dimensions).to.have.property("dim:two"); + cb(); + }, (err: any) => { + callback(); + }); - source.push({ id: 10, seed: 5 }); - source.push({ id: 10, seed: 5 }); - source.push({ id: 10, seed: 5 }); - source.push({ id: 10, seed: 5 }); - source.push({ id: 10, seed: 5 }); + miss.pipeline(source, transformer.position(), sink); + }); - const transformer = new Transformer("processTransformer", { - source: source, - destination: aggregate, - functions: { - map: ((doc: any, emit: Function) => { - emit({ - "dim:one": doc.id, - "dim:two": Hash.sha1(doc.seed) - }, { - "met:one": Math.floor(doc.seed * 100000) / 100 - }); - }), - reduce: ((dimensions: any, metrics: any) => { - let tmp = 0; - metrics.forEach((met: any) => { - tmp += met["met:one"]; - }); - return { - "met:one": tmp - }; - }) + it("should be able to retrieve metrics with by using a measure operator", (callback: Function) => { + let i = 0; + let source = miss.from.obj((size: Number, next: Function) => { + if (i >= 1024) { + next(null, null); + return; } - }); - transformer.apply(); + next(null, { + dimensions: { + "dim:one": i % 10, + "dim:two": Hash.sha1(((i % 10) === 0) ? 5 : Math.random()) + }, + doc: { + id: i % 10, + seed: ((i % 10) === 0) ? 5 : Math.random() + } + }); + i++; + }); - let result = aggregate.find({ - "dim:one": 10, - "dim:two": Hash.sha1(5) + let sink = miss.to.obj((data: any, enc: string, cb: Function) => { + expect(data).to.have.property("dimensions"); + expect(data).to.have.property("metrics"); + expect(data.metrics).to.have.property("met:one"); + cb(); + }, (err: any) => { + callback(); }); - expect(result).to.have.length(1); - expect(result[0]).to.have.property("metrics"); - expect(result[0].metrics).to.have.property("met:one"); - expect(result[0].metrics["met:one"]).to.be.equal(25000); + miss.pipeline(source, transformer.measure(), sink); }); }); diff --git a/src/core/transformer.ts b/src/core/transformer.ts index e3c6734cde7b6939e72619d2e3f399dfa9613153..b76a724de6de401519e353f1ecc59bcf89e3d98f 100644 --- a/src/core/transformer.ts +++ b/src/core/transformer.ts @@ -18,70 +18,89 @@ * along with blendb. If not, see <http://www.gnu.org/licenses/>. */ -import { Hash } from "../util/hash"; +const miss = require("mississippi"); +const uuid = require("node-uuid"); +import { Server } from "./server"; import { Source } from "./source"; -import { Aggregate } from "./aggregate"; export interface ITransformerOptions { - source: Source; - destination: Aggregate; - functions: { - map: (doc: any, emit: Function) => void; - reduce: (dimensions: any, metrics: any) => any; + dimensions: string[]; + metrics: string[]; + operators: { + position: (doc: any) => any; + measure: (doc: any) => any; + aggregate: (doc: any, newDimensions: string[]) => any; }; } export class Transformer { - public name: string; - public source: Source; - public destination: Aggregate; - private functions: any; - - constructor(name: string, options: ITransformerOptions) { - this.name = name; - this.source = options.source; - this.destination = options.destination; - this.functions = { - map: options.functions.map, - reduce: options.functions.reduce - }; + public id: string; + public dimensions: string[]; + public metrics: string[]; + + private server: Server; + private operators: any; + private source: Source; + + constructor(server: Server, options: ITransformerOptions) { + this.id = uuid.v4(); + this.server = server; + this.dimensions = options.dimensions; + this.metrics = options.metrics; + this.operators = options.operators; + this.source = null; + } + + public setSource(source: Source) { + this.source = source; } - public apply() { - let temp = new Map(); + public position() { + if (!this.source) { + throw new Error("Source not defined"); + } - this.destination.truncate(); + return miss.through.obj((data: any, enc: string, callback: Function) => { + let dimensions = this.operators.position(data.doc); + callback(null, { dimensions: dimensions, doc: data.doc }); + }); + } - this.source.forEach((doc: any) => { - let emit = (dimensions: any, metrics: any) => { - let key = Hash.sha1(dimensions); - let current = temp.get(key) || { dimensions, metrics: [] }; - temp.set(key, { - dimensions, - metrics: current.metrics.concat([metrics]) - }); - }; + public measure() { + if (!this.source) { + throw new Error("Source not defined"); + } - this.functions.map(doc, emit); + return miss.through.obj((data: any, enc: string, callback: Function) => { + let metrics = this.operators.measure(data.doc); + callback(null, { dimensions: data.dimensions, metrics: metrics }); }); + } + + public aggregate(newDimensions: string[]) { + if (!this.source) { + throw new Error("Source not defined"); + } - temp.forEach((value, key) => { - let dimensions = value.dimensions; - let metrics = value.metrics; - - if (metrics.length > 1) { - this.destination.push({ - dimensions: dimensions, - metrics: this.functions.reduce(dimensions, metrics) - }); - } - else { - this.destination.push({ - dimensions: dimensions, - metrics: metrics - }); - } + return miss.through.obj((data: any, enc: string, callback: Function) => { + let newMetrics = this.operators.aggregate(data, newDimensions); + callback(null, { dimensions: newDimensions, metrics: newMetrics }); }); } + + public query(queryMetrics: string[], queryDimensions: string[]) { + if (!this.baseView) { + this.baseView = new View(this.metrics, this.dimensions); + + miss.pipeline( + this.source.stream(), + this.position(), + this.measure(), + this.baseView.sink() + ); + } + + this.baseView.aggregate(queryMetrics, queryDimensions); + } } diff --git a/src/core/aggregate.spec.ts b/src/core/view.ts similarity index 54% rename from src/core/aggregate.spec.ts rename to src/core/view.ts index bcc3a3f66649c1e1ee0abaec24d0584e340f9d8c..d2b53872c51543a016990c049cac538c0dcf5c96 100644 --- a/src/core/aggregate.spec.ts +++ b/src/core/view.ts @@ -18,23 +18,29 @@ * along with blendb. If not, see <http://www.gnu.org/licenses/>. */ -import { expect } from "chai"; +const miss = require("mississippi"); +const uuid = require("node-uuid"); -import { Aggregate } from "./aggregate"; +export class View { + public id: string; -describe("aggregate class", () => { - it("should be instantiated with an array metrics and one of dimensions", () => { - let aggr = new Aggregate(["met:one"], ["dim:one", "dim:two"]); - expect(aggr).to.be.an("object"); - }); + private data: any[]; - it("should not be instantiated with an empty array of metrics", () => { - let aggr = new Aggregate([], ["dim:one", "dim:two"]); - expect(aggr).to.be.an("object"); - }); + constructor() { + this.id = uuid.v4(); + this.data = new Array(); + } - it("should not be instantiated with an empty array of dimensions", () => { - let aggr = new Aggregate(["met:one"], []); - expect(aggr).to.be.an("object"); - }); -}); + public sink() { + function write(data: any, enc: string, cb: Function) { + this.data.push(data); + cb(); + } + + function flush(cb: Function) { + cb(); + } + + return miss.to.obj(write, flush); + } +} diff --git a/src/main.ts b/src/main.ts index c33b4562e2aa29facbcb24a575c254d73cca0091..91d0785931013cefce68514670b8843d6d2e3e06 100755 --- a/src/main.ts +++ b/src/main.ts @@ -27,9 +27,91 @@ const ramlParser = require("raml-parser"); // load router import { router } from "./api/router-v1"; +// load server +import { Server } from "./core/server"; + // create a new express app const app = module.exports = express(); +const blendb = new Server(); +app.locals.blendb = blendb; + +////// TESTING //////// +let s1 = blendb.source("s1", { tags: ["telecentro", "v1"] }); +let s2 = blendb.source("s2", { tags: ["telecentro", "v2"] }); +let s3 = blendb.source("s3", { tags: ["gesac"] }); + +for (let i = 0; i < 1024; i++) { + s1.insert({ + id: i % 10, + seed: ((i % 10) === 0) ? 5 : Math.random() + }); +} +for (let i = 0; i < 256; i++) { + s2.insert({ + id: i % 5, + seed: ((i % 5) === 0) ? 26 : Math.random() + }); +} +for (let i = 0; i < 2048; i++) { + s3.insert({ + id: i % 8 + 5, + seed: ((i % 8) === 0) ? 13 : Math.random() + }); +} + +let position = ((data: any): any => { + return { + id: data.id * 2, + local: data.seed + 1 + }; +}); +let measure = ((data: any): any => { + return { + id: data.id * 3, + local: data.seed + 2 + }; +}); +let aggregate = ((data: any, newDimensions: string[]): any => { + return data; +}); + +let t1 = blendb.transformer("t1", { + dimensions: ["dim:one", "dim:two"], + metrics: ["met:one"], + operators: { + position: position, + measure: measure, + aggregate: aggregate, + } +}); +let t2 = blendb.transformer("t2", { + dimensions: ["dim:one", "dim:three", "dim:four"], + metrics: ["met:two", "met:three"], + operators: { + position: position, + measure: measure, + aggregate: aggregate + } +}); +let t3 = blendb.transformer("t3", { + dimensions: ["dim:two"], + metrics: ["met:four", "met:five"], + operators: { + position: position, + measure: measure, + aggregate: aggregate + } +}); + +t1.setSource(s1); +t2.setSource(s2); +t3.setSource(s3); +/////////////////////// + +app.set("view engine", "pug"); +app.set("views", "src/api/views"); + // parse the RAML spec and load osprey middleware ramlParser.loadFile("specs/blendb-api-v1.raml") .then((raml: any) => { diff --git a/src/storage/monet.ts b/src/storage/monet.ts new file mode 100644 index 0000000000000000000000000000000000000000..0899a461633ebcae28f4a7b886a27639d9d9d74c --- /dev/null +++ b/src/storage/monet.ts @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2015 Centro de Computacao Cientifica e Software Livre + * Departamento de Informatica - Universidade Federal do Parana + * + * This file is part of blendb. + * + * blendb is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * blendb is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with blendb. If not, see <http://www.gnu.org/licenses/>. + */ + +import { Storage } from "../core/storage"; + +export class MonetStorage extends Storage { + public read(): void { + // + }; + public filter(query: any): void { + // + }; +} diff --git a/src/storage/mongo.ts b/src/storage/mongo.ts new file mode 100644 index 0000000000000000000000000000000000000000..c12c0235ca0d906f64f2b3f601f48d1befb01246 --- /dev/null +++ b/src/storage/mongo.ts @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2015 Centro de Computacao Cientifica e Software Livre + * Departamento de Informatica - Universidade Federal do Parana + * + * This file is part of blendb. + * + * blendb is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * blendb is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with blendb. If not, see <http://www.gnu.org/licenses/>. + */ + +import { Storage } from "../core/storage"; + +export class MongoStorage extends Storage { + public read(): void { + // + }; + public filter(query: any): void { + // + }; +} diff --git a/src/storage/redis.ts b/src/storage/redis.ts new file mode 100644 index 0000000000000000000000000000000000000000..978d9e24d3f70c69cc35ecadae9e094d9db0ad4b --- /dev/null +++ b/src/storage/redis.ts @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2015 Centro de Computacao Cientifica e Software Livre + * Departamento de Informatica - Universidade Federal do Parana + * + * This file is part of blendb. + * + * blendb is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * blendb is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with blendb. If not, see <http://www.gnu.org/licenses/>. + */ + +import { Storage } from "../core/storage"; + +export class RedisStorage extends Storage { + public read(): void { + // + }; + public filter(query: any): void { + // + }; +}