diff --git a/src/core/aggregate.ts b/src/core/aggregate.ts index 489b82b67e3732344fc6a6655a2a223c4e8c6ee3..a1e3ccf54da6dde4b858b744bb34124995d9c685 100644 --- a/src/core/aggregate.ts +++ b/src/core/aggregate.ts @@ -18,10 +18,15 @@ * along with blendb. If not, see <http://www.gnu.org/licenses/>. */ +export interface IAggregateData { + metrics: any; + dimensions: any; +} + export class Aggregate { public metrics: string[]; public dimensions: string[]; - private data: any[]; + private data: IAggregateData[]; constructor(metrics: string[], dimensions: string[], options?: any) { this.metrics = metrics; @@ -30,11 +35,36 @@ export class Aggregate { this.data = []; } - public push(data: any) { + public push(data: IAggregateData) { this.data.push(data); } public truncate() { this.data = []; } + + public find(query: any) { + let result: any = []; + + this.data.forEach((doc: IAggregateData) => { + let match = true; + + for (let key in query) { + if (query.hasOwnProperty(key)) { + let value = query[key]; + + if (doc.dimensions[key] !== value) { + match = false; + break; + } + } + } + + if (match) { + result.push(doc); + } + }); + + return result; + } } diff --git a/src/core/server.spec.ts b/src/core/server.spec.ts index f817c727dbfcfb5da533595c9d57bcb5fcb32aa3..9e0dfbbb0d2f9fa30333608a826674ac3e7d0659 100644 --- a/src/core/server.spec.ts +++ b/src/core/server.spec.ts @@ -23,9 +23,9 @@ import { expect } from "chai"; import { Server } from "./server"; describe("server class", () => { - const server = new Server(); - it("should be able to create and retrieve sources", () => { + const server = new Server(); + // create two sources const source1 = server.source("source1"); const source2 = server.source("source2"); @@ -45,24 +45,50 @@ describe("server class", () => { expect(source1).to.be.equal(retrieved); }); + it("should be able to create and retrieve aggregates", () => { + const server = new Server(); + + // create two aggregates + const aggr1 = server.aggregate(["met:one"], ["dim:one", "dim:two"]); + const aggr2 = server.aggregate(["met:two"], ["dim:one", "dim:two"]); + + // retrieve the first one + const retrieved = server.aggregate(["met:one"], ["dim:two", "dim:one"]); + + // check if aggregates were actually created/retrieved + expect(aggr1).to.be.an("object"); + expect(aggr2).to.be.an("object"); + expect(retrieved).to.be.an("object"); + + // check if the two created aggregates are different + expect(aggr1).to.not.be.equal(aggr2); + + // check if the retrieved aggregate is the same as the created one + expect(aggr1).to.be.equal(retrieved); + }); + it("should be able to create and retrieve transformers", () => { + const server = new Server(); + + const source1 = server.source("source1"); + const aggr1 = server.aggregate(["met:one"], ["dim:one", "dim:two"]); + const aggr2 = server.aggregate(["met:one", "met:two"], ["dim:one"]); + // create two transformers const transformer1 = server.transformer("transformer1", { - source: "source1", - metrics: ["met:one"], - dimensions: ["dim:one"], - extractors: { - metrics: ((doc: any) => null), - dimensions: ((doc: any) => null), + source: source1, + destination: aggr1, + functions: { + map: (doc: any, emit: Function) => { return; }, + reduce: (dimensions: any, metrics: any) => { return {}; } } }); const transformer2 = server.transformer("transformer2", { - source: "source2", - metrics: ["met:one"], - dimensions: ["dim:one"], - extractors: { - metrics: ((doc: any) => null), - dimensions: ((doc: any) => null), + source: source1, + destination: aggr2, + functions: { + map: (doc: any, emit: Function) => { return; }, + reduce: (dimensions: any, metrics: any) => { return {}; } } }); @@ -81,23 +107,39 @@ describe("server class", () => { expect(transformer1).to.be.equal(retrieved); }); - it("should be able to create and retrieve aggregates", () => { - // create two aggregates - const aggr1 = server.aggregate(["met:one"], ["dim:one", "dim:two"]); - const aggr2 = server.aggregate(["met:two"], ["dim:one", "dim:two"]); + it("should fail to create two transformer with name collision", () => { + const server = new Server(); - // retrieve the first one - const retrieved = server.aggregate(["met:one"], ["dim:one", "dim:two"]); + const source1 = server.source("source1"); + const aggr1 = server.aggregate(["met:one"], ["dim:one", "dim:two"]); + const aggr2 = server.aggregate(["met:one", "met:two"], ["dim:one"]); + + server.transformer("transformer1", { + source: source1, + destination: aggr1, + functions: { + map: (doc: any, emit: Function) => { return; }, + reduce: (dimensions: any, metrics: any) => { return {}; } + } + }); - // check if aggregates were actually created/retrieved - expect(aggr1).to.be.an("object"); - expect(aggr2).to.be.an("object"); - expect(retrieved).to.be.an("object"); + expect(() => { + server.transformer("transformer1", { + source: source1, + destination: aggr2, + functions: { + map: (doc: any, emit: Function) => { return; }, + reduce: (dimensions: any, metrics: any) => { return {}; } + } + }); + }).to.throw(Error); + }); - // check if the two created aggregates are different - expect(aggr1).to.not.be.equal(aggr2); + it("should fail to retrieve a transformer that doesn't exist", () => { + const server = new Server(); - // check if the retrieved aggregate is the same as the created one - expect(aggr1).to.be.equal(retrieved); + expect(() => { + server.transformer("transformerX"); + }).to.throw(Error); }); }); diff --git a/src/core/server.ts b/src/core/server.ts index bf110d060d0cb3be798786df149b73f8d2f072db..235cbca2bcbe488b6eaacb8dc34c8969e00583ee 100644 --- a/src/core/server.ts +++ b/src/core/server.ts @@ -46,6 +46,19 @@ export class Server { } } + public aggregate(metrics: string[], dimensions: string[], options?: any) { + const id = Hash.sha1(metrics.sort(), dimensions.sort()); + + if (this.aggregates.has(id)) { + return this.aggregates.get(id); + } + else { + const aggregate = new Aggregate(metrics, dimensions, options); + this.aggregates.set(id, aggregate); + return aggregate; + } + } + public transformer(name: string, options?: ITransformerOptions) { if (typeof options !== "undefined") { if (this.transformers.has(name)) { @@ -64,37 +77,4 @@ export class Server { return this.transformers.get(name); } } - - public aggregate(metrics: string[], dimensions: string[], options?: any) { - const id = Hash.sha1(metrics.sort(), dimensions.sort()); - - if (this.aggregates.has(id)) { - return this.aggregates.get(id); - } - else { - const aggregate = new Aggregate(metrics, dimensions, options); - this.aggregates.set(id, aggregate); - return aggregate; - } - } - - public process() { - this.transformers.forEach((transformer: Transformer) => { - const source = this.source(transformer.source); - const aggr = this.aggregate(transformer.metrics, - transformer.dimensions); - - source.forEach((doc: any) => { - aggr.push({ - metrics: transformer.extractMetrics(doc), - dimensions: transformer.extractDimensions(doc) - }); - }); - - // TODO: stream support - // source.stream() - // .pipe(transformer.stream()); - // .pipe(aggregate.stream()); - }); - } } diff --git a/src/core/source.ts b/src/core/source.ts index f2784fad2358e17f74c00857e5992f724751b211..f030c5660300275aeb7afc12b83ad5a5d159d603 100644 --- a/src/core/source.ts +++ b/src/core/source.ts @@ -22,7 +22,7 @@ export class Source { public name: string; private data: any[]; - constructor(name: string, options: any) { + constructor(name: string, options?: any) { this.name = name; this.data = []; @@ -37,4 +37,8 @@ export class Source { callback(value); }); } + + public truncate() { + this.data = []; + } } diff --git a/src/core/transformer.spec.ts b/src/core/transformer.spec.ts new file mode 100644 index 0000000000000000000000000000000000000000..003deec9c3f582199dc5d7ae15bcc49efe3eac12 --- /dev/null +++ b/src/core/transformer.spec.ts @@ -0,0 +1,85 @@ +/* + * Copyright (C) 2015 Centro de Computacao Cientifica e Software Livre + * Departamento de Informatica - Universidade Federal do Parana + * + * This file is part of blendb. + * + * blendb is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * blendb is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with blendb. If not, see <http://www.gnu.org/licenses/>. + */ + +import { expect } from "chai"; + +import { Hash } from "../util/hash"; + +import { Transformer } from "./transformer"; +import { Source } from "./source"; +import { Aggregate } from "./aggregate"; + +describe("transformer class", () => { + const source = new Source("testSource"); + const aggregate = new Aggregate(["met:one"], ["dim:one", "dim:two"]); + + it("should be able to aggregate data", () => { + source.truncate(); + + for (let i = 0; i < 1000; i++) { + source.push({ + id: i % 10, + seed: Math.random() + }); + } + + source.push({ id: 10, seed: 5 }); + source.push({ id: 10, seed: 5 }); + source.push({ id: 10, seed: 5 }); + source.push({ id: 10, seed: 5 }); + source.push({ id: 10, seed: 5 }); + + const transformer = new Transformer("processTransformer", { + source: source, + destination: aggregate, + functions: { + map: ((doc: any, emit: Function) => { + emit({ + "dim:one": doc.id, + "dim:two": Hash.sha1(doc.seed) + }, { + "met:one": Math.floor(doc.seed * 100000) / 100 + }); + }), + reduce: ((dimensions: any, metrics: any) => { + let tmp = 0; + metrics.forEach((met: any) => { + tmp += met["met:one"]; + }); + return { + "met:one": tmp + }; + }) + } + }); + + transformer.apply(); + + let result = aggregate.find({ + "dim:one": 10, + "dim:two": Hash.sha1(5) + }); + + expect(result).to.have.length(1); + expect(result[0]).to.have.property("metrics"); + expect(result[0].metrics).to.have.property("met:one"); + expect(result[0].metrics["met:one"]).to.be.equal(25000); + }); +}); diff --git a/src/core/transformer.ts b/src/core/transformer.ts index cc62f6b7ece5a95c52dafc332175a58a6955d449..e3c6734cde7b6939e72619d2e3f399dfa9613153 100644 --- a/src/core/transformer.ts +++ b/src/core/transformer.ts @@ -18,40 +18,70 @@ * along with blendb. If not, see <http://www.gnu.org/licenses/>. */ +import { Hash } from "../util/hash"; + +import { Source } from "./source"; +import { Aggregate } from "./aggregate"; + export interface ITransformerOptions { - source: string; - metrics: string[]; - dimensions: string[]; - extractors: { - metrics: (doc: any) => any; - dimensions: (doc: any) => any; + source: Source; + destination: Aggregate; + functions: { + map: (doc: any, emit: Function) => void; + reduce: (dimensions: any, metrics: any) => any; }; } export class Transformer { public name: string; - public source: string; - public metrics: string[]; - public dimensions: string[]; - private extractors: any; + public source: Source; + public destination: Aggregate; + private functions: any; constructor(name: string, options: ITransformerOptions) { this.name = name; - this.source = options.source; - this.metrics = options.metrics; - this.dimensions = options.dimensions; - this.extractors = { - metrics: options.extractors.metrics, - dimensions: options.extractors.dimensions + this.destination = options.destination; + this.functions = { + map: options.functions.map, + reduce: options.functions.reduce }; } - public extractMetrics(doc: any) { - return this.extractors.metrics(doc); - } + public apply() { + let temp = new Map(); + + this.destination.truncate(); + + this.source.forEach((doc: any) => { + let emit = (dimensions: any, metrics: any) => { + let key = Hash.sha1(dimensions); + let current = temp.get(key) || { dimensions, metrics: [] }; + temp.set(key, { + dimensions, + metrics: current.metrics.concat([metrics]) + }); + }; + + this.functions.map(doc, emit); + }); + + temp.forEach((value, key) => { + let dimensions = value.dimensions; + let metrics = value.metrics; - public extractDimensions(doc: any) { - return this.extractors.dimensions(doc); + if (metrics.length > 1) { + this.destination.push({ + dimensions: dimensions, + metrics: this.functions.reduce(dimensions, metrics) + }); + } + else { + this.destination.push({ + dimensions: dimensions, + metrics: metrics + }); + } + }); } } diff --git a/src/util/hash.spec.ts b/src/util/hash.spec.ts index 9ce39c2f72b24cdb467c782bedd442d4afb55c81..37688cb470453cf1c0ed47f19c5cb7334fed9ff0 100644 --- a/src/util/hash.spec.ts +++ b/src/util/hash.spec.ts @@ -31,8 +31,8 @@ describe("hash utility library", () => { }); it("should generate the same hash for the same input", () => { - let h1 = Hash.sha1("test", { obj: "test" }, ["list", "of", "things"]); - let h2 = Hash.sha1("test", { obj: "test" }, ["list", "of", "things"]); + let h1 = Hash.sha1("test", { obj: "test" }, 43.2, ["list", "of", "things"]); + let h2 = Hash.sha1("test", { obj: "test" }, 43.2, ["list", "of", "things"]); expect(h1).to.be.a("string"); expect(h2).to.be.a("string"); @@ -66,4 +66,14 @@ describe("hash utility library", () => { expect(h2).to.be.a("string"); expect(h1).to.not.be.equal(h2); }); + + it("should throw an error for unhashable objects", () => { + expect(() => { + Hash.sha1( + "test", + function (a: number, b: number) { return a + b; }, + ["of", "list", "things"] + ); + }).to.throw(TypeError); + }); }); diff --git a/src/util/hash.ts b/src/util/hash.ts index 39a33d9b480434584171e3d515813bb421774cd1..bfe777670811b3f1690b9a2698baa6112d19d8a6 100644 --- a/src/util/hash.ts +++ b/src/util/hash.ts @@ -31,6 +31,8 @@ export class Hash { return obj; case "object": return JSON.stringify(obj); + case "number": + return obj.toString(); default: throw new TypeError(typeof obj + " cannot be hashed");