Skip to content
Snippets Groups Projects

Add denormalizations to run aggregations

Merged jvfpw18 requested to merge fix_run_aggregations into development
4 files
+ 2202
2141
Compare changes
  • Side-by-side
  • Inline
Files
4
+ 36
7
@@ -227,6 +227,21 @@ class DatabaseTable(Table):
@@ -227,6 +227,21 @@ class DatabaseTable(Table):
return query
return query
 
def create_temporary_mirror(self, year, bind=None):
 
'''
 
Creates a new temporary table where its data mirrors the original, taken directly from the database
 
'''
 
ttable = self.get_temporary(year=year)
 
ttable.create(bind)
 
if bind is None:
 
bind = self.metadata.bind
 
 
original_columns = list(self.columns)
 
query = ttable.insert().from_select(original_columns, select(original_columns))
 
bind.execute(query)
 
 
return ttable
 
def check_protocol(self):
def check_protocol(self):
'''
'''
Raises MissingProtocolError if no protocol is loaded.
Raises MissingProtocolError if no protocol is loaded.
@@ -758,7 +773,8 @@ class DatabaseTable(Table):
@@ -758,7 +773,8 @@ class DatabaseTable(Table):
if table is self:
if table is self:
return self._derivative_recursion(column, year, recursion_list)
return self._derivative_recursion(column, year, recursion_list)
derivative = table._resolv_derivative(column, year)
derivative = table._resolv_derivative(column, year)
self._derivatives[target] = {'original': original, 'dbcolumn': dbcolumn, 'level': 0,
 
self._derivatives[target] = {'original': original, 'dbcolumn': dbcolumn, 'level': 0, 'dbmapped': True,
'new': '.'.join([table.name, derivative['dbcolumn'][0]])}
'new': '.'.join([table.name, derivative['dbcolumn'][0]])}
return self._derivatives[target]
return self._derivatives[target]
@@ -781,15 +797,17 @@ class DatabaseTable(Table):
@@ -781,15 +797,17 @@ class DatabaseTable(Table):
level = derivative['level'] + 1
level = derivative['level'] + 1
processed = original
processed = original
 
dbmapped = False # column neded to execute the derivative is present on table or need a file.
for substitution in substitutions:
for substitution in substitutions:
processed = re.sub(substitution['original'], substitution['new'], processed)
processed = re.sub(substitution['original'], substitution['new'], processed)
 
dbmapped = True
self._derivatives[target] = {'original': original, 'dbcolumn': dbcolumn, 'level': level,
self._derivatives[target] = {'original': original, 'dbcolumn': dbcolumn, 'level': level,
'processed': processed}
'processed': processed, 'dbmapped': dbmapped}
return self._derivatives[target]
return self._derivatives[target]
def _resolv_derivative(self, original, year):
def _resolv_derivative(self, original, year):
'''
'''
Populates self._derivatives with all necessary derivatives to satify original in a given
Populates self._derivatives with all necessary derivatives to satisfy original in a given
year.
year.
'''
'''
if not hasattr(self, '_derivatives'):
if not hasattr(self, '_derivatives'):
@@ -821,7 +839,7 @@ class DatabaseTable(Table):
@@ -821,7 +839,7 @@ class DatabaseTable(Table):
query = query.where(ttable.c.ano_censo == year)
query = query.where(ttable.c.ano_censo == year)
yield query
yield query
def apply_derivatives(self, ttable, columns, year, bind=None):
def apply_derivatives(self, ttable, columns, year, bind=None, dbonly=False):
'''
'''
Given a list of columns, searches for derivatives and denormalizations and applies them
Given a list of columns, searches for derivatives and denormalizations and applies them
in the appropriate order. Dependencies will be updated regardless of being or not in the
in the appropriate order. Dependencies will be updated regardless of being or not in the
@@ -845,16 +863,17 @@ class DatabaseTable(Table):
@@ -845,16 +863,17 @@ class DatabaseTable(Table):
ttable.schema = t_schema
ttable.schema = t_schema
if len(self._derivatives) > 0:
if len(self._derivatives) > 0:
max_level = max([self._derivatives[d]['level'] for d in self._derivatives])
max_level = max([self._derivatives[d]['level'] for d in self._derivatives])
derivative_levels = []
for i in range(max_level):
for i in range(max_level):
i = i+1
i = i+1
query = {}
query = {}
level = [self._derivatives[d] for d in self._derivatives if\
level = [self._derivatives[d] for d in self._derivatives if\
self._derivatives[d]['level'] == i]
self._derivatives[d]['level'] == i]
for derivative in level:
for derivative in level:
query[derivative['dbcolumn'][0]] = text(derivative['processed'])
if not dbonly or derivative['dbmapped']:
 
query[derivative['dbcolumn'][0]] = text(derivative['processed'])
query = update(ttable).values(**query)
query = update(ttable).values(**query)
 
bind.execute(query)
bind.execute(query)
return self._derivatives
return self._derivatives
@@ -881,7 +900,12 @@ class DatabaseTable(Table):
@@ -881,7 +900,12 @@ class DatabaseTable(Table):
referred_table.map_from_database()
referred_table.map_from_database()
selecter = select([getattr(func, aggregation)(source_column)])
selecter = select([getattr(func, aggregation)(source_column)])
for fk_column, fkey in referred_table.get_relations(self):
try:
 
fk_tuples = [(fk_column, fkey) for fk_column, fkey in referred_table.get_relations(self)]
 
except MissingForeignKeyError:
 
fk_tuples = [(fk_column, fkey) for fk_column, fkey in self.get_relations(referred_table)]
 
 
for fk_column, fkey in fk_tuples:
selecter = selecter.where(fk_column == fkey)
selecter = selecter.where(fk_column == fkey)
if year:
if year:
selecter = selecter.where(self.c.ano_censo == year)
selecter = selecter.where(self.c.ano_censo == year)
@@ -911,6 +935,11 @@ class DatabaseTable(Table):
@@ -911,6 +935,11 @@ class DatabaseTable(Table):
query = self._aggregate(column, func, source_column, year)
query = self._aggregate(column, func, source_column, year)
bind.execute(query)
bind.execute(query)
 
# Run derivatives
 
ttable = self.create_temporary_mirror(year, bind)
 
self.apply_derivatives(ttable, ttable.columns.keys(), year, bind, dbonly=True)
 
self.update_from_temporary(ttable, ttable.columns.keys(), bind)
 
def get_relations(self, table):
def get_relations(self, table):
'''
'''
Yields relations between two tables in format
Yields relations between two tables in format
Loading