refactor(project): move files into specific folders
This commit is contained in:
125
disclosures/financial_disclosure.py
Normal file
125
disclosures/financial_disclosure.py
Normal file
@@ -0,0 +1,125 @@
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
import os
|
||||
from util.telegram_bot import Telegram
|
||||
from util.db import DB
|
||||
from dotenv import load_dotenv
|
||||
import asyncio
|
||||
import datetime
|
||||
|
||||
load_dotenv()
|
||||
|
||||
|
||||
class Disclosures:
|
||||
def __init__(
|
||||
self, telegram_api_key, telegram_channel, db_name, schema_path, seed_path
|
||||
):
|
||||
self.telegram = Telegram(telegram_api_key, telegram_channel)
|
||||
self.db = DB(db_name, schema_path, seed_path)
|
||||
|
||||
async def send_message(self, message, return_value=True):
|
||||
try:
|
||||
await self.telegram.send_message(message)
|
||||
return return_value
|
||||
except Exception as e:
|
||||
self.log(f"Error sending message: {e}, message: {message}, return_value: {return_value}")
|
||||
return False
|
||||
|
||||
def getDocuments(self, name="pelosi"):
|
||||
disclosures_url = "https://disclosures-clerk.house.gov"
|
||||
data = {"LastName": name}
|
||||
response = requests.post(
|
||||
f"{disclosures_url}/FinancialDisclosure/ViewMemberSearchResult", data=data
|
||||
)
|
||||
|
||||
parsed_html = BeautifulSoup(response.text, "html.parser")
|
||||
fillings = parsed_html.find_all("tr", attrs={"role": "row"})
|
||||
fillings.pop(0)
|
||||
|
||||
# sort fillings by year
|
||||
fillings.sort(
|
||||
key=lambda x: int(
|
||||
x.find_all("td", attrs={"data-label": "Filing Year"})[0].text
|
||||
)
|
||||
)
|
||||
documents = []
|
||||
for filling in fillings:
|
||||
year = self.formatString(
|
||||
filling.find_all("td", attrs={"data-label": "Filing Year"})[0].text
|
||||
)
|
||||
name = self.formatString(
|
||||
filling.find_all("td", attrs={"data-label": "Name"})[0].text
|
||||
)
|
||||
filing = self.formatString(
|
||||
filling.find_all("td", attrs={"data-label": "Filing"})[0].text
|
||||
)
|
||||
url = f'{disclosures_url}/{filling.a.get("href")}'
|
||||
documents.append({"name": name, "year": year, "filing": filing, "url": url})
|
||||
|
||||
return documents
|
||||
|
||||
def formatString(self, text):
|
||||
# remove \n and \t
|
||||
text = text.replace("\n", " ").replace("\t", " ")
|
||||
# remove extra spaces
|
||||
text = " ".join(text.split())
|
||||
return text
|
||||
|
||||
def prepareValues(self, documents):
|
||||
already_inserted = self.db.query("SELECT link FROM disclosures;")
|
||||
already_inserted = [x[0] for x in already_inserted]
|
||||
values = []
|
||||
for member_id, d in documents.items():
|
||||
for document in d:
|
||||
if document["url"] not in already_inserted:
|
||||
values.append(
|
||||
(
|
||||
member_id,
|
||||
int(document["year"]),
|
||||
document["filing"],
|
||||
document["url"],
|
||||
)
|
||||
)
|
||||
|
||||
return values
|
||||
|
||||
def insertDisclosures(self, values):
|
||||
self.db.insertDisclosures(values)
|
||||
|
||||
async def run(self):
|
||||
members = self.db.query("SELECT * FROM members;")
|
||||
documents = {}
|
||||
for id, name in members:
|
||||
documents[id] = self.getDocuments(name)
|
||||
values = self.prepareValues(documents)
|
||||
tasks = []
|
||||
sent = []
|
||||
for v in values:
|
||||
message = f"New disclosure from {[name for id, name in members if id == v[0]][0]} for the year {v[1]}. {v[2]} {v[3]}"
|
||||
tasks.append(asyncio.create_task(self.send_message(message, v)))
|
||||
await asyncio.sleep(0.2)
|
||||
|
||||
results = await asyncio.gather(*tasks)
|
||||
sent = [r for r in results if r]
|
||||
|
||||
self.insertDisclosures(sent)
|
||||
|
||||
self.log(
|
||||
f"Sent {len(sent)} disclosures. Inserted {len(sent)} disclosures. Total disclosures {len(values)}"
|
||||
)
|
||||
|
||||
def log(self, message):
|
||||
print(f"{datetime.datetime.now()} - {message}")
|
||||
|
||||
|
||||
d = Disclosures(
|
||||
os.getenv("TELEGRAM_API_KEY"),
|
||||
os.getenv("TELEGRAM_CHANNEL_ID"),
|
||||
os.getenv("DB_PATH"),
|
||||
os.getenv("SCHEMA_PATH"),
|
||||
os.getenv("SEED_PATH"),
|
||||
)
|
||||
try:
|
||||
asyncio.run(d.run())
|
||||
except Exception as e:
|
||||
d.log(f"Error while running: {e}")
|
||||
24
disclosures/util/db.py
Normal file
24
disclosures/util/db.py
Normal file
@@ -0,0 +1,24 @@
|
||||
import sqlite3
|
||||
|
||||
|
||||
class DB:
|
||||
def __init__(self, db_path, schema_path="schema.sql", seed_path="seed.sql"):
|
||||
self.con = sqlite3.connect(db_path)
|
||||
self.cur = self.con.cursor()
|
||||
with open(schema_path) as f:
|
||||
self.cur.executescript(f.read())
|
||||
|
||||
with open(seed_path) as f:
|
||||
self.cur.executescript(f.read())
|
||||
|
||||
def __del__(self):
|
||||
self.con.close()
|
||||
|
||||
def query(self, query, params=[]):
|
||||
self.cur.execute(query, params)
|
||||
return self.cur.fetchall()
|
||||
|
||||
def insertDisclosures(self, disclosures):
|
||||
sql = "INSERT INTO disclosures (member_id, filing_year, filing, link) VALUES (?, ?, ?, ?)"
|
||||
self.cur.executemany(sql, disclosures)
|
||||
self.con.commit()
|
||||
11
disclosures/util/telegram_bot.py
Normal file
11
disclosures/util/telegram_bot.py
Normal file
@@ -0,0 +1,11 @@
|
||||
import telegram
|
||||
|
||||
class Telegram:
|
||||
def __init__(self, token, channel_id):
|
||||
self.token = token
|
||||
self.bot = telegram.Bot(token=token)
|
||||
self.channel_id = channel_id
|
||||
|
||||
|
||||
async def send_message(self, message: str):
|
||||
await self.bot.send_message(chat_id=self.channel_id, text=message)
|
||||
Reference in New Issue
Block a user