'''
Copyright (C) 2016 Centro de Computacao Cientifica e Software Livre
Departamento de Informatica - Universidade Federal do Parana - C3SL/UFPR
This file is part of HOTMapper.
HOTMapper is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
HOTMapper is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with HOTMapper. If not, see <https://www.gnu.org/licenses/>.
'''
'''Database manipulation actions - these can be used as models for other modules.'''
import logging
from datetime import datetime
from os import chdir

import pandas as pd
from sqlalchemy import create_engine, MetaData, text

import settings
from database.base import MissingTableError
from database.database_table import gen_data_table, copy_tabbed_to_csv
from database.groups import DATA_GROUP, DATABASE_TABLE_NAME

ENGINE = create_engine(settings.DATABASE_URI, echo=settings.ECHO)
META = MetaData(bind=ENGINE)
logging.basicConfig(format=settings.LOGGING_FORMAT)
logger = logging.getLogger(__name__)

database_table_logger = logging.getLogger('database.database_table')
database_table_logger.setLevel(settings.LOGGING_LEVEL)
protocol_logger = logging.getLogger('database.protocol')
protocol_logger.setLevel(settings.LOGGING_LEVEL)
sqlalchemy_logger = logging.getLogger('sqlalchemy.engine')
sqlalchemy_logger.setLevel(settings.LOGGING_LEVEL)

def temporary_data(connection, file_name, table, year, offset=2,
                   delimiters=[';', '\\n', '"'], null=''):
    '''Creates and populates a temporary table from a CSV file, returning it'''
    # Read only the header row to discover the column names present in the file
    header = pd.read_csv(file_name, encoding="ISO-8859-9", sep=delimiters[0], nrows=1)
    header = [h.strip() for h in header.columns.values]

    ttable = table.get_temporary(header, year)
    ttable.create(bind=connection)

    # Resolve the path as seen by the database server before the bulk load
    file_name = settings.db_env_file_path(file_name)
    table.populate_temporary(ttable, file_name, header, year, delimiters, null,
                             offset, bind=connection)
    table.apply_derivatives(ttable, ttable.columns.keys(), year, bind=connection)
    table.add_pk_to_temporary(ttable, bind=connection)
    return ttable
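
# Sketch of how the helper above is meant to be used (names are hypothetical;
# insert() and update_from_file() below follow exactly this pattern):
#   with ENGINE.connect() as connection:
#       trans = connection.begin()
#       ttable = temporary_data(connection, 'data/matricula_2015.csv', table, 2015)
#       ...  # bulk insert/update from ttable, then drop it
#       trans.commit()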

def insert(file_name, table, year, offset=2, delimiters=[';', '\\n', '"'], null='',
           notifybackup=None):
    '''Inserts the contents of the CSV in file_name into table, using year to select the mapping'''
    table = gen_data_table(table, META)
    table.map_from_database()
    if not table.exists():
        raise MissingTableError(table.name)

    with ENGINE.connect() as connection:
        trans = connection.begin()
        ttable = temporary_data(connection, file_name, table, year, offset, delimiters, null)
        table.insert_from_temporary(ttable, bind=connection)
        # Drop on the same connection that created the temporary table
        ttable.drop(bind=connection)
        trans.commit()
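
# Usage sketch (hypothetical file and table names; the table must already
# exist and have a mapping protocol entry for the given year):
#   insert('open_data/matricula_2015.csv', 'matricula', 2015)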

def create(table, ignore_definitions=False):
    '''Creates table from mapping protocol metadata'''
    table = gen_data_table(table, META)
    table.gen_definitions()

    with ENGINE.connect() as connection:
        trans = connection.begin()
        table.create(bind=connection, ignore_definitions=ignore_definitions)
        table.set_source(bind=connection)
        table.create_mapping_table(bind=connection)
        trans.commit()
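
# Usage sketch (hypothetical table name, assuming a mapping protocol exists for it):
#   create('matricula')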

def drop(table):
    '''Drops table'''
    table = gen_data_table(table, META)
    table.drop()

def remap(table, auto_confirmation=True, verify_definitions=False):
    '''Applies changes made in the mapping protocol to the database'''
    table = gen_data_table(table, META)
    table.gen_definitions()
    table.map_from_database()
    table.remap(auto_confirmation, verify_definitions)
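
# Usage sketch (hypothetical): apply protocol edits to an existing table,
# asking for confirmation before each change instead of applying silently:
#   remap('matricula', auto_confirmation=False, verify_definitions=True)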

def csv_from_tabbed(table_name, input_file, output_file, year, sep=';'):
    '''Converts a tabbed input file into a CSV, using the column mapping
    defined in the table's protocol for the given year'''
    table = gen_data_table(table_name, META)
    protocol = table.get_protocol()
    column_names, column_mappings = protocol.get_tabbed_mapping(year)

    copy_tabbed_to_csv(input_file, column_mappings, settings.CHUNK_SIZE, output_file,
                       column_names=column_names, sep=sep)
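
# Usage sketch (hypothetical file names; the year selects which tabbed layout
# the protocol describes):
#   csv_from_tabbed('matricula', 'data/MATRICULA_1997.txt', 'data/matricula_1997.csv', 1997)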

def update_from_file(file_name, table, year, columns=None,
                     offset=2, delimiters=[';', '\\n', '"'], null=''):
    '''Updates table columns from an input CSV file'''
    table = gen_data_table(table, META)
    table.map_from_database()
    if not table.exists():
        raise MissingTableError(table.name)

    # Default to updating every mapped column
    if columns is None:
        columns = [c.name for c in table.columns]

    with ENGINE.connect() as connection:
        trans = connection.begin()
        ttable = temporary_data(connection, file_name, table, year, offset, delimiters, null)
        table.update_from_temporary(ttable, columns, bind=connection)
        # Drop on the same connection that created the temporary table
        ttable.drop(bind=connection)
        trans.commit()
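
# Usage sketch (hypothetical file, table, and column names; columns defaults
# to every mapped column when omitted):
#   update_from_file('data/matricula_2015.csv', 'matricula', 2015,
#                    columns=['nome_escola', 'cod_municipio'])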

def run_aggregations(table, year):
    '''Runs aggregation queries from protocol'''
    table = gen_data_table(table, META)
    table.map_from_database()

    with ENGINE.connect() as connection:
        trans = connection.begin()
        table.run_aggregations(year, bind=connection)
        trans.commit()
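
# Usage sketch (hypothetical): recompute the protocol's aggregations for one year:
#   run_aggregations('matricula', 2015)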

def generate_backup():
    '''Creates/recreates the file monitored by the backup script in production'''
    chdir(settings.BACKUP_FOLDER)
    with open(settings.BACKUP_FILE, "w") as f:
        f.write(str(datetime.now()))

def execute_sql_script(sql_scripts, sql_path=settings.SCRIPTS_FOLDER):
    '''Executes one SQL script, or a list of SQL scripts, from sql_path in a single transaction'''
    if isinstance(sql_scripts, str):
        sql_scripts = [sql_scripts]

    with ENGINE.connect() as connection:
        trans = connection.begin()
        for script in sql_scripts:
            with open(sql_path + '/' + script) as sql:
                connection.execute(text(sql.read()))
        trans.commit()
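
# Usage sketch (hypothetical script names, resolved relative to settings.SCRIPTS_FOLDER):
#   execute_sql_script('create_views.sql')
#   execute_sql_script(['dimensions.sql', 'facts.sql'])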

def execute_sql_group(script_group, sql_path=settings.SCRIPTS_FOLDER, files=False):
    '''Executes the scripts of one or more comma-separated script groups. If files
    is True, script_group is interpreted as a comma-separated list of file names'''
    if not files:
        sql_script = [DATA_GROUP[group.upper()] for group in script_group.split(",")]
    else:
        sql_script = script_group.split(",")

    for sql in sql_script:
        execute_sql_script(sql, sql_path)
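
# Usage sketch (hypothetical group names; groups must be keys of DATA_GROUP):
#   execute_sql_group('escola,matricula')          # run every script of both groups
#   execute_sql_group('a.sql,b.sql', files=True)   # or run explicit files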

def drop_group(script_group, files=False):
    '''Drops the tables created by one or more comma-separated script groups. If
    files is True, script_group is interpreted as a comma-separated list of
    script file names'''
    script_group = script_group.split(",")
    selected_tables = []
    if not files:
        for group in script_group:
            selected_tables += DATA_GROUP[group.upper()]
    else:
        selected_tables = script_group

    # Drop in reverse creation order so dependent tables are removed first
    for table in reversed(selected_tables):
        if table in DATABASE_TABLE_NAME:
            table_name = DATABASE_TABLE_NAME[table]
        else:
            table_name = table.replace('.sql', '')
        drop(table_name)
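
# Usage sketch (hypothetical group name): drop everything a group created:
#   drop_group('matricula')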