Skip to content
Snippets Groups Projects

v1.1.0

Merged jvfpw18 requested to merge v1.1.0 into master
2 files
+ 30
30
Compare changes
  • Side-by-side
  • Inline
Files
2
+ 166
75
"""
Copyright (C) 2018 Centro de Computacao Cientifica e Software Livre
'''
Copyright (C) 2016 Centro de Computacao Cientifica e Software Livre
Departamento de Informatica - Universidade Federal do Parana - C3SL/UFPR
This file is part of HOTMapper.
@@ -15,9 +15,8 @@ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with simcaq-cdn. If not, see <https://www.gnu.org/licenses/>.
"""
along with HOTMapper. If not, see <https://www.gnu.org/licenses/>.
'''
'''This module contains the definition of the DatabaseTable class and a constructor'''
import os
@@ -25,6 +24,7 @@ import time
import json
import re
import logging
import jsbeautifier
from sqlalchemy import Table, Column, inspect, Integer, String, Boolean,\
PrimaryKeyConstraint, ForeignKeyConstraint, text
from sqlalchemy.sql import select, insert, update, delete, func
@@ -32,9 +32,10 @@ import pandas as pd
from database.base import DatabaseColumnError, MissingProtocolError, DatabaseMappingError, \
InvalidTargetError, MissingForeignKeyError, MissingTableError, \
CircularReferenceError
CircularReferenceError, MissingDefinitionsError
from database.protocol import Protocol
from database.types import get_type
from database.definitions import Definitions
import settings
# Disable no-member warnings to silence false positives from Table instances dinamically generated
@@ -145,6 +146,8 @@ class DatabaseTable(Table):
self._mapping_table = gen_mapping_table(self)
if not hasattr(self, '_protocol'):
self._protocol = None
if not hasattr(self, '_definitions'):
self._definitions = None
if 'protocol' in kwargs.keys():
self.load_protocol(kwargs['protocol'])
@@ -228,6 +231,22 @@ class DatabaseTable(Table):
return query
def create_temporary_mirror(self, year, bind=None):
'''
Creates a new temporary table where its data mirrors the original, taken directly from the database
'''
ttable = self.get_temporary(year=year)
ttable.create(bind)
if bind is None:
bind = self.metadata.bind
original_columns = list(self.columns)
query = ttable.insert().from_select(original_columns, select(original_columns)
.where(self.c[settings.YEAR_COLUMN] == year))
bind.execute(query)
return ttable
def check_protocol(self):
'''
Raises MissingProtocolError if no protocol is loaded.
@@ -280,17 +299,44 @@ class DatabaseTable(Table):
logger.debug("Table %s not present in database.", self.name)
raise MissingTableError(self.name)
def get_definitions(self):
def get_columns_dict(self, ignore_diff=False):
'''
Returns a dictionary with definitions from a table definitions file
Get a dictionary of columns, comparing the columns of the associated protocol with the columns in definitions
:param ignore_diff: when set True will ignore differences in table_definition and get the data only from
mapping_protocol when both exists
:return: {"column_name": ["column_type(str)", "target"]}
'''
definitions = self.name + '.json'
logger.debug("Acquiring definitions from %s", definitions)
definitions = os.path.join(settings.TABLE_DEFINITIONS_FOLDER, definitions)
definitions = json.loads(open(definitions).read())
logger.debug("Definitions loaded")
self.check_definitions()
if self._protocol is None:
if self._definitions.columns is None:
raise MissingProtocolError("You must first load a protocol or add columns to the table definition")
else:
logger.warning("Table creation will be entirely based on the table definition")
return self._definitions.columns
else:
column_dict = {}
for column in self._protocol.get_targets():
try:
column = self._protocol.dbcolumn_from_target(column)
except InvalidTargetError:
continue
if column[0]:
column[0] = column[0].strip()
column_dict[column[0]] = [column[1], self._protocol.target_from_dbcolumn(column[0])]
if not ignore_diff and self._definitions.columns:
for c_name, c_type in self._definitions.columns.items():
if c_name not in column_dict.keys():
prompt = input("The column {} is not present on the mapping protocol but is on the "
"table definition, should it exist ? (Y/n): ".format(c_name))
if prompt.upper() in ['', 'Y']:
print("Column {} will be created, please update the protocol later".format(c_name))
column_dict[c_name] = c_type
else:
print("Column {} will be removed from the table_definitions.".format(c_name))
return definitions
return column_dict
def load_protocol(self, protocol):
'''
@@ -312,7 +358,7 @@ class DatabaseTable(Table):
'''
Creates the mapping table in the database
'''
self.check_protocol()
self.check_definitions()
if bind is None:
bind = self.metadata.bind
@@ -325,7 +371,7 @@ class DatabaseTable(Table):
columns = [c[1] for c in self.columns.items()]
for c in columns:
column = {}
column['target_name'] = self._protocol.target_from_dbcolumn(c.name)
column['target_name'] = self._definitions.columns[c.name][1]
if not column['target_name']:
continue
column['name'] = c.name
@@ -339,9 +385,9 @@ class DatabaseTable(Table):
'''
Inserts or updates table entry in the sources table
'''
self.check_definitions()
if bind is None:
bind = self.metadata.bind
definitions = self.get_definitions()
source_table = gen_source_table(self.metadata)
# Create source table if doesnt exist
@@ -350,8 +396,6 @@ class DatabaseTable(Table):
source_table.create(bind=bind)
logger.debug("Source table creation: no exceptions.")
source = definitions['data_source']
logger.debug("Checking for '%s' in source table", self.name)
base_select = select([source_table.c.id]).where(source_table.c.table_name == self.name)
table_id = bind.execute(base_select).fetchone()
@@ -364,41 +408,37 @@ class DatabaseTable(Table):
logger.debug("Table not found. Running insert query")
base_query = insert(source_table)
base_query = base_query.values(table_name=self.name, source=source)
base_query = base_query.values(table_name=self.name, source=self._definitions.source)
bind.execute(base_query)
def map_from_protocol(self, create=False, bind=None):
def map_from_protocol(self, create=False, bind=None, ignore_defintions=False):
'''
Uses information from a protocol to generate self columns. Table definitions must also
be defined to allow primary key and foreign keys addition.
Uses information from a protocol or, if protocol is not present, from table definitions
to generate self columns.
Table definitions must also be defined to allow primary key and foreign keys addition.
Useful for table creation.
'''
self.check_protocol()
self.check_definitions()
if self.columns.keys():
logger.warning("Table mapping already has columns. Nothing done.")
return
if bind is None:
bind = self.metadata.bind
definitions = self.get_definitions()
for column in self._protocol.get_targets():
try:
column = self._protocol.dbcolumn_from_target(column)
except InvalidTargetError:
continue
if column[0]:
column[0] = column[0].strip()
column = Column(column[0], get_type(column[1]))
column_dict = self.get_columns_dict(ignore_defintions)
for c_name, c_type in column_dict.items():
column = Column(c_name, get_type(c_type[0]))
self.append_column(column)
primary_key = [self.columns.get(c) for c in definitions['pk']]
self._definitions.update_columns(column_dict)
primary_key = [self.columns.get(c) for c in self._definitions.pkcolumns]
if primary_key:
self.constraints.add(PrimaryKeyConstraint(*primary_key))
for foreign_key in definitions["foreign_keys"]:
for foreign_key in self._definitions.fkcolumns:
keys = [self.columns.get(c) for c in foreign_key["keys"]]
ref_table = DatabaseTable(foreign_key["reference_table"], self.metadata)
@@ -407,6 +447,7 @@ class DatabaseTable(Table):
try:
protocol.load_csv(protocol_path)
ref_table.load_protocol(protocol)
ref_table.gen_definitions()
ref_table.map_from_protocol(create=create, bind=bind)
if create:
ref_table.create(bind=bind)
@@ -421,7 +462,7 @@ class DatabaseTable(Table):
self.constraints.add(ForeignKeyConstraint(keys, fkeys))
def create(self, bind=None, checkfirst=False):
def create(self, bind=None, checkfirst=False, ignore_definitions=False):
'''
Overrides sqlalchemy's create method to use map_from_protocol before creating.
'''
@@ -431,7 +472,7 @@ class DatabaseTable(Table):
logger.error("Table %s already exists", self.name)
return
self.map_from_protocol(create=True, bind=bind)
self.map_from_protocol(create=True, bind=bind, ignore_defintions=ignore_definitions)
super().create(bind=bind, checkfirst=checkfirst)
@@ -480,7 +521,6 @@ class DatabaseTable(Table):
field_type = get_type(field_type)
if target is not None and self._mapping_table.exists():
entry = {
'target_name': target,
@@ -575,13 +615,14 @@ class DatabaseTable(Table):
base_update = update(self).values(**values)
for original_pk, temp_pk in zip(list(self.primary_key.columns), temp_pk_columns):
base_update = base_update.where(original_pk == temp_pk)
connection.execute(base_update)
trans.commit()
def compare_mapping(self):
'''
Compares contents of mapping table to protocol and returns tuple with differences in
Compares contents of mapping table to table definitions and returns tuple with differences in
the following format:
new_columns, to_drop_columns, update_columns
@@ -592,19 +633,18 @@ class DatabaseTable(Table):
The method uses target_names as the criteria to decide if columns are the same or not.
'''
self.check_protocol()
protocol_target_list = self._protocol.get_targets()
self.check_definitions()
target_list = self._definitions.get_targets()
query = self._mapping_table.select()
results = self.metadata.bind.execute(query).fetchall()
db_target_list = [t[1] for t in results]
new_columns = [c for c in protocol_target_list if c not in db_target_list]
to_drop_columns = [c for c in db_target_list if c not in protocol_target_list]
new_columns = [c for c in target_list if c not in db_target_list and c != '']
to_drop_columns = [c for c in db_target_list if c not in target_list]
update_columns = []
for target in protocol_target_list:
for target in target_list:
query = select([self._mapping_table.c.name, self._mapping_table.c.type])\
.where(self._mapping_table.c.target_name == target)
result = self.metadata.bind.execute(query).fetchone()
@@ -612,7 +652,7 @@ class DatabaseTable(Table):
continue
name, field_type = result
try:
new_name, new_type = self._protocol.dbcolumn_from_target(target)
new_name, new_type = self._definitions.get_dbcolumn_from_target(target)
except InvalidTargetError:
to_drop_columns.append(target)
continue
@@ -628,20 +668,24 @@ class DatabaseTable(Table):
return new_columns, to_drop_columns, update_columns
def remap(self):
def remap(self, auto_confirmation=True, verify_definitions=False):
'''
Checks mapping protocol for differences in table structure - then
attempts to apply differences according to what is recorded in the
mapping table
mapping table.
If verify_definitions is set it will ask any difference between mapping_protocol and table_definition
'''
self.check_definitions()
if not self.exists():
print("Table {} doesn't exist".format(self.name))
return
self.check_protocol()
mtable = self._mapping_table
# Update table definitions
column_dict = self.get_columns_dict(ignore_diff=not verify_definitions)
self._definitions.update_columns(column_dict)
if not mtable.exists():
print("Mapping table for {} not found.".format(self.name))
print("Creating mapping table...")
@@ -652,17 +696,36 @@ class DatabaseTable(Table):
new_columns, to_drop_columns, update_columns = self.compare_mapping()
accept_new_columns, accept_drop_columns, accept_update_columns = [True for _ in range(3)]
if not auto_confirmation:
if new_columns:
print('The following columns will be CREATED:', ', '.join(new_columns))
prompt = input('Is it right (y/N)? ')
accept_new_columns = prompt == 'yes' or prompt == 'y' or prompt == 1
if to_drop_columns:
print('The following columns will be DROPPED:', ', '.join(to_drop_columns))
prompt = input('Is it right (y/N)? ')
accept_drop_columns = prompt == 'yes' or prompt == 'y' or prompt == 1
if update_columns:
update_list = [update_dict['name'] + ' -new name: ' + update_dict['new_name']
+ ' -new type: ' + update_dict['new_type'] for update_dict in update_columns]
print('The following columns will be UPDATED:', ', '.join(update_list))
prompt = input('Is it right (y/N)? ')
accept_update_columns = prompt == 'yes' or prompt == 'y' or prompt == 1
with self.metadata.bind.connect() as connection:
# Create new columns
for column in new_columns:
if accept_new_columns:
for target in new_columns:
try:
dbcolumn = self._protocol.dbcolumn_from_target(column)
dbcolumn = self._definitions.get_dbcolumn_from_target(target)
except InvalidTargetError:
continue
self.add_column(dbcolumn[0], dbcolumn[1], column, bind=connection)
self.add_column(dbcolumn[0], dbcolumn[1], target, bind=connection)
# Drop columns
if accept_drop_columns:
for column in to_drop_columns:
column_name = select([mtable.c.name]).where(mtable.c.target_name == column)
column_name = connection.execute(column_name).fetchone()[0]
@@ -672,6 +735,7 @@ class DatabaseTable(Table):
self.drop_column(column_name, column, bind=connection)
# Update existing columns
if accept_update_columns:
self.transfer_data(connection, update_columns)
def _get_variable_target(self, original, year):
@@ -738,7 +802,8 @@ class DatabaseTable(Table):
if table is self:
return self._derivative_recursion(column, year, recursion_list)
derivative = table._resolv_derivative(column, year)
self._derivatives[target] = {'original': original, 'dbcolumn': dbcolumn, 'level': 0,
self._derivatives[target] = {'original': original, 'dbcolumn': dbcolumn, 'level': 0, 'dbmapped': True,
'new': '.'.join([table.name, derivative['dbcolumn'][0]])}
return self._derivatives[target]
@@ -761,15 +826,17 @@ class DatabaseTable(Table):
level = derivative['level'] + 1
processed = original
dbmapped = False # column neded to execute the derivative is present on table or need a file.
for substitution in substitutions:
processed = re.sub(substitution['original'], substitution['new'], processed)
dbmapped = True
self._derivatives[target] = {'original': original, 'dbcolumn': dbcolumn, 'level': level,
'processed': processed}
'processed': processed, 'dbmapped': dbmapped}
return self._derivatives[target]
def _resolv_derivative(self, original, year):
'''
Populates self._derivatives with all necessary derivatives to satify original in a given
Populates self._derivatives with all necessary derivatives to satisfy original in a given
year.
'''
if not hasattr(self, '_derivatives'):
@@ -798,10 +865,10 @@ class DatabaseTable(Table):
fk_column = ttable.columns.get(fk_column.name)
query = query.where(fk_column == fkey)
if year:
query = query.where(ttable.c.ano_censo == year)
query = query.where(ttable.columns.get(settings.YEAR_COLUMN) == year)
yield query
def apply_derivatives(self, ttable, columns, year, bind=None):
def apply_derivatives(self, ttable, columns, year, bind=None, dbonly=False):
'''
Given a list of columns, searches for derivatives and denormalizations and applies them
in the appropriate order. Dependencies will be updated regardless of being or not in the
@@ -825,16 +892,17 @@ class DatabaseTable(Table):
ttable.schema = t_schema
if len(self._derivatives) > 0:
max_level = max([self._derivatives[d]['level'] for d in self._derivatives])
derivative_levels = []
for i in range(max_level):
i = i+1
query = {}
level = [self._derivatives[d] for d in self._derivatives if\
self._derivatives[d]['level'] == i]
for derivative in level:
if not dbonly or derivative['dbmapped']:
query[derivative['dbcolumn'][0]] = text(derivative['processed'])
query = update(ttable).values(**query)
bind.execute(query)
return self._derivatives
@@ -861,10 +929,15 @@ class DatabaseTable(Table):
referred_table.map_from_database()
selecter = select([getattr(func, aggregation)(source_column)])
for fk_column, fkey in referred_table.get_relations(self):
try:
fk_tuples = [(fk_column, fkey) for fk_column, fkey in referred_table.get_relations(self)]
except MissingForeignKeyError:
fk_tuples = [(fk_column, fkey) for fk_column, fkey in self.get_relations(referred_table)]
for fk_column, fkey in fk_tuples:
selecter = selecter.where(fk_column == fkey)
if year:
selecter = selecter.where(self.c.ano_censo == year)
selecter = selecter.where(self.columns.get(settings.YEAR_COLUMN) == year)
query = update(self).values(**{column.name: selecter})
@@ -891,6 +964,11 @@ class DatabaseTable(Table):
query = self._aggregate(column, func, source_column, year)
bind.execute(query)
# Run derivatives
ttable = self.create_temporary_mirror(year, bind)
self.apply_derivatives(ttable, ttable.columns.keys(), year, bind, dbonly=True)
self.update_from_temporary(ttable, ttable.columns.keys(), bind)
def get_relations(self, table):
'''
Yields relations between two tables in format
@@ -963,6 +1041,19 @@ class DatabaseTable(Table):
ttable.schema = temp_schema
def check_definitions(self):
''' Raises MissingDefinitionsError if the definitions is not loaded.'''
if self._definitions is None:
raise MissingDefinitionsError('You must first load the table Definitions')
def gen_definitions(self, keys=None):
''' Associates a Definitions object to the table '''
logger.debug('Generating Definitions.')
if not self._definitions:
self._definitions = Definitions(self.name, keys)
else:
logger.debug('Table definitions already loaded, nothing done.')
def gen_data_table(table, meta):
'''Returns a DatabaseTable instance with associated mapping protocol'''
table = DatabaseTable(table, meta)
Loading