Commit 18bc8798 authored by Richard Fernando Heise Ferreira

Adding Indexer

parent bf594635
import csv
import os
import json
import requests
from datetime import datetime
from dotenv import load_dotenv
load_dotenv('../.env')
es_host = os.getenv('ELASTICSEARCH_HOST')
es_port = os.getenv('ELASTICSEARCH_PORT')
index_prefix = os.getenv('ELASTIC_INDEX_PREFIX')
# Elasticsearch credentials
es_username = os.getenv('ELASTICSEARCH_USER')
es_password = os.getenv('ELASTICSEARCH_PASSWORD')
print(es_host, es_port, es_password, es_username, index_prefix)
lo_file_path = './csvs/lo.csv'
users_file_path = './csvs/users.csv'
collections_file_path = './csvs/collections.csv'

def read_lo_metadata(csv_file_path):
    print("\n\nREADING LOS\n\n")
    metadata = []
    with open(csv_file_path, newline='', encoding='utf-8') as csvfile:
        reader = csv.DictReader(csvfile, delimiter="|")
        for row in reader:
            parsed_row = {
                "id": int(row["id"]),
                "name": row["name"],
                "author": row["author"],
                "description": row["description"],
                "link": row["link"],
                "created_at": datetime.fromisoformat(row["created_at"]) if row["created_at"] else None,
                "educational_stages": row["educational_stages"].split(",") if row["educational_stages"] else [],
                "languages": row["languages"].split(",") if row["languages"] else [],
                "subjects": row["subjects"].split(",") if row["subjects"] else [],
                "license": row["license"],
                "object_type": row["object_type"],
                "resource_state": row["resource_state"],
                "user": row["user"],
                "user_id": row["user_id"],
                "views": int(row["views"]),
                "downloads": int(row["downloads"]),
                "likes": int(row["likes"]),
                "shares": int(row["shares"]),
                "score": float(row["score"]),
                "comments": int(row["comments"]),
                "saves": int(row["saves"])
            }
            print(parsed_row)
            metadata.append(parsed_row)
    return metadata

def read_users_metadata(csv_file_path):
    print("\n\nREADING USERS\n\n")
    metadata = []
    with open(csv_file_path, newline='', encoding='utf-8') as csvfile:
        reader = csv.DictReader(csvfile, delimiter="|")
        for row in reader:
            parsed_row = {
                "id": int(row["id"]),
                "name": row["name"],
                "username": row["username"],
                "description": row["description"],
                "roles": row["roles"].split(",") if row["roles"] else [],
                "institutions": row["institutions"],
                "created_at": datetime.fromisoformat(row["created_at"].split('.')[0]) if row["created_at"] else None,  # strip milliseconds
                "is_active": row["is_active"] == "t" if row["is_active"] in ("t", "f") else None,  # translate Postgres booleans (t, f) to ES booleans (true, false)
                "score": float(row["score"]),
                "likes": int(row["likes"]),
                "followers": int(row["followers"]),
                "approved_resources": int(row["approved_resources"])
            }
            print(parsed_row)
            metadata.append(parsed_row)
    return metadata

def read_collections_metadata(csv_file_path):
    print("\n\nREADING COLLECTIONS\n\n")
    metadata = []
    with open(csv_file_path, newline='', encoding='utf-8') as csvfile:
        reader = csv.DictReader(csvfile, delimiter="|")
        for row in reader:
            parsed_row = {
                "id": int(row["id"]),
                "name": row["name"],
                "user": row["user"],
                "user_id": row["user_id"],
                "description": row["description"],
                "created_at": datetime.fromisoformat(row["created_at"].split('.')[0]) if row["created_at"] else None,  # strip milliseconds
                "is_private": row["is_private"] == "t" if row["is_private"] in ("t", "f") else None,  # translate Postgres booleans (t, f) to ES booleans (true, false)
                "is_active": row["is_active"] == "t" if row["is_active"] in ("t", "f") else None,  # translate Postgres booleans (t, f) to ES booleans (true, false)
                "score": float(row["score"]),
                "views": int(row["views"]),
                "downloads": int(row["downloads"]),
                "likes": int(row["likes"]),
                "shares": int(row["shares"]),
                "followers": int(row["followers"])
            }
            print(parsed_row)
            metadata.append(parsed_row)
    return metadata

def index_metadata_to_elasticsearch(metadata, index):
    for item in metadata:
        es_doc = {key.lower(): (value.isoformat() if isinstance(value, datetime) else value) for key, value in item.items()}
        # print('-------------------------')
        # print(item['name'])
        # print('-------------------------')
        response = requests.post(
            f'{es_host}:{es_port}/{index_prefix}_{index}/_doc',
            headers={"Content-Type": "application/json"},
            data=json.dumps(es_doc),
            auth=(es_username, es_password)  # Pass credentials here
        )
        print(response.json())

def main():
    metadata = read_lo_metadata(lo_file_path)
    index_metadata_to_elasticsearch(metadata, 'resources')

    metadata = read_users_metadata(users_file_path)
    index_metadata_to_elasticsearch(metadata, 'users')

    metadata = read_collections_metadata(collections_file_path)
    index_metadata_to_elasticsearch(metadata, 'collections')


if __name__ == "__main__":
    main()
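
A quick way to check the result (not part of this commit) is to hit the _count endpoint of each index, reusing the es_host, es_port, index_prefix and credential variables the script already loads from ../.env; the snippet below is a minimal sketch of that.

# Verification sketch (illustrative, not committed): count the documents
# stored in one of the freshly created indices, using the same
# environment-driven connection settings as the indexer above.
def count_indexed(index):
    response = requests.get(
        f'{es_host}:{es_port}/{index_prefix}_{index}/_count',
        auth=(es_username, es_password)
    )
    # Elasticsearch returns {"count": <n>, ...}; newly indexed documents
    # may take one refresh interval (~1s by default) to become visible.
    return response.json().get('count')

# Hypothetical usage after main() has run:
# print(count_indexed('resources'), count_indexed('users'), count_indexed('collections'))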
requests
python-dotenv

select
    col.id,
    col.name,
    users.name as user,
    users.id as user_id,
    col.description,
    col.created_at,
    col.is_private,
    col.is_active,
    colst.score,
    colst.views,
    colst.downloads,
    colst.likes,
    colst.shares,
    colst.follows as followers
from
    collections col
    left join users on users.id = col.user_id
    left join collection_stats as colst on colst.id = col.id
group by
    col.id,
    col.name,
    users.name,
    users.id,
    col.description,
    col.created_at,
    col.is_private,
    col.is_active,
    colst.score,
    colst.views,
    colst.downloads,
    colst.likes,
    colst.shares,
    colst.follows;

select
    re.id,
    re.name,
    re.author,
    re.description,
    re.link,
    re.published_at as created_at,
    string_agg(distinct es.name, ',') as educational_stages,
    string_agg(distinct lg.name, ',') as languages,
    string_agg(distinct sj.name, ',') as subjects,
    lc.name as license,
    ot.name as object_type,
    re.state as resource_state,
    us.name as user,
    us.id as user_id,
    rest.views as views,
    rest.downloads as downloads,
    rest.likes as likes,
    rest.shares as shares,
    rest.score as score,
    rest.comments as comments,
    count(distinct cr.id) as saves
from
    resources re
    left join resource_educational_stages res on res.resource_id = re.id
    left join educational_stages es on res.educational_stage_id = es.id
    left join resource_languages rel on rel.resource_id = re.id
    left join languages lg on rel.language_id = lg.id
    left join resource_subjects rs on rs.resource_id = re.id
    left join subjects sj on rs.subject_id = sj.id
    left join licenses lc on re.license_id = lc.id
    left join object_types ot on re.object_type_id = ot.id
    left join users us on us.id = re.user_id
    left join resource_stats rest on rest.id = re.resource_stats_id
    left join collection_resources cr on cr.resource_id = re.id
group by
    re.id,
    re.name,
    re.author,
    re.description,
    re.link,
    re.published_at,
    re.state,
    ot.name,
    lc.name,
    us.name,
    us.id,
    rest.views,
    rest.downloads,
    rest.likes,
    rest.shares,
    rest.score,
    rest.comments;

select
    users.id,
    users.name,
    users.username,
    users.description,
    string_agg(distinct roles.name, ',') as roles,
    string_agg(distinct inst.name, ',') as institutions,
    users.created_at,
    users.is_active,
    usst.score,
    usst.likes_received as likes,
    usst.followers,
    usst.approved_resources
from
    users
    left join user_roles ur on users.id = ur.user_id
    left join roles on roles.id = ur.role_id
    left join user_institutions ui on ui.user_id = users.id
    left join institutions inst on inst.id = ui.institution_id
    left join user_stats usst on usst.id = users.user_stats_id
group by
    users.id,
    users.name,
    users.username,
    users.description,
    users.created_at,
    users.is_active,
    usst.score,
    usst.likes_received,
    usst.followers,
    usst.approved_resources;
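
The indexer reads the results of these three queries as pipe-delimited CSVs under ./csvs/ (lo.csv, users.csv, collections.csv). How those files are produced is not part of this commit; the sketch below is one possible export path, assuming psycopg2 is available and writing booleans back as the t/f strings the parsers above expect (the DSN and the query variable are hypothetical).

# Export sketch (illustrative, not committed): run one of the queries above
# and write the result as the pipe-delimited CSV layout the indexer expects.
import csv
import psycopg2  # assumption: psycopg2 is installed

def export_query(dsn, sql, out_path):
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
        cur.execute(sql)
        header = [col.name for col in cur.description]
        with open(out_path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f, delimiter='|')
            writer.writerow(header)
            for row in cur.fetchall():
                # Keep the CSV compatible with the parsers above:
                # booleans as t/f, NULLs as empty strings.
                writer.writerow([
                    't' if v is True else 'f' if v is False else '' if v is None else v
                    for v in row
                ])

# Hypothetical usage, with users_query holding the SQL just above:
# export_query('dbname=example', users_query, './csvs/users.csv')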