diff --git a/page_analyzer/app.py b/page_analyzer/app.py index 42f06e6..dd863f2 100644 --- a/page_analyzer/app.py +++ b/page_analyzer/app.py @@ -30,24 +30,12 @@ def index(): @app.get('/urls') def urls_get(): conn = db.connect(app.config['DATABASE_URL']) - # можно спрятать try: - with conn: - urls = db.get_urls(conn) - with conn: - checks = db.get_last_checks(conn) + urls = db.get_urls_with_last_checks(conn) except psycopg2.Error as e: raise e finally: conn.close() - - if checks: - for url in urls: - check = [c for c in checks if c['url_id'] == url['id']] - if check: - url['status_code'] = check[0]['status_code'] - url['created_at'] = check[0]['created_at'] - return render_template('urls.html', urls=urls) @@ -62,23 +50,33 @@ def urls_post(): normalized_url = normalize_url(url_string) - with db.connect(app.config['DATABASE_URL']) as conn: + conn = db.connect(app.config['DATABASE_URL']) + try: url = db.get_url_by_name(conn, normalized_url) - if not url: + if url: + url_id = url.id + flash('Страница уже существует', 'info') + else: url_id = db.add_url(conn, normalized_url) flash('Страница успешно добавлена', 'success') - return redirect(url_for('url_info', id=url_id)) + except psycopg2.Error as e: + raise e + finally: + conn.close() - url_id = url.id - flash('Страница уже существует', 'info') return redirect(url_for('url_info', id=url_id)) @app.get('/urls/') def url_info(id: int): - with db.connect(app.config['DATABASE_URL']) as conn: + conn = db.connect(app.config['DATABASE_URL']) + try: url = db.get_url_by_id(conn, id) checks = db.get_url_checks(conn, id) + except psycopg2.Error as e: + raise e + finally: + conn.close() if not url: return abort(404) @@ -87,15 +85,20 @@ def url_info(id: int): @app.post('/urls//checks') def check_url(id: int): - with db.connect(app.config['DATABASE_URL']) as conn: + conn = db.connect(app.config['DATABASE_URL']) + try: url = db.get_url_by_id(conn, id) check = make_check(url.name) - if not check: + if check: + db.add_url_check(conn, check, id) + flash('Страница успешно проверена', 'success') + else: flash('Произошла ошибка при проверке', 'error') - return redirect(url_for('url_info', id=id)) - db.add_url_check(conn, check, id) + except psycopg2.Error as e: + raise e + finally: + conn.close() - flash('Страница успешно проверена', 'success') return redirect(url_for('url_info', id=id)) diff --git a/page_analyzer/db.py b/page_analyzer/db.py index d822c97..389112d 100644 --- a/page_analyzer/db.py +++ b/page_analyzer/db.py @@ -11,68 +11,89 @@ def connect(db_url): raise e -def execute(connection, factory, query: str, data=None, fetch: str = None): - with connection.cursor(cursor_factory=factory) as cursor: - cursor.execute(query, data) - result = None - match fetch: - case 'all': - result = cursor.fetchall() - case 'one': - result = cursor.fetchone() - return result - - def get_urls(connection): - query = 'SELECT id, name FROM urls ORDER BY id DESC' - urls = execute(connection, RealDictCursor, query, fetch='all') + with connection: + with connection.cursor(cursor_factory=RealDictCursor) as cursor: + cursor.execute('SELECT id, name FROM urls ORDER BY id DESC') + urls = cursor.fetchall() return urls def get_last_checks(connection): - query = ''' - SELECT DISTINCT - ON (url_id) - url_id, - created_at, - status_code - FROM urls_checks - ORDER BY url_id DESC - ''' - last_checks = execute(connection, RealDictCursor, query, fetch='all') + with connection: + with connection.cursor(cursor_factory=RealDictCursor) as cursor: + cursor.execute( + ''' + SELECT DISTINCT ON (url_id) + url_id, created_at, status_code + FROM urls_checks ORDER BY url_id DESC + ''' + ) + last_checks = cursor.fetchall() return last_checks +def get_urls_with_last_checks(connection): + urls = get_urls(connection) + checks = get_last_checks(connection) + if checks: + for url in urls: + check = [c for c in checks if c['url_id'] == url['id']] + if check: + url['status_code'] = check[0]['status_code'] + url['created_at'] = check[0]['created_at'] + return urls + + def get_url_checks(connection, url_check_id: int): - query = "SELECT * FROM urls_checks WHERE url_id = (%s)" - url_checks = execute(connection, NamedTupleCursor, query, (url_check_id,), fetch='all') + with connection: + with connection.cursor(cursor_factory=NamedTupleCursor) as cursor: + cursor.execute( + 'SELECT * FROM urls_checks WHERE url_id = (%s)', + (url_check_id,)) + url_checks = cursor.fetchall() return url_checks def get_url_by_id(connection, url_id: int): - query = "SELECT * FROM urls WHERE id = (%s)" - result = execute(connection, NamedTupleCursor, query, (url_id,), fetch='one') + with connection: + with connection.cursor(cursor_factory=NamedTupleCursor) as cursor: + cursor.execute( + 'SELECT * FROM urls WHERE id = (%s)', + (url_id,)) + result = cursor.fetchone() return result def get_url_by_name(connection, url: str): - query = "SELECT * FROM urls WHERE name = (%s)" - result = execute(connection, NamedTupleCursor, query, (url,), fetch='one') + with connection: + with connection.cursor(cursor_factory=NamedTupleCursor) as cursor: + cursor.execute( + 'SELECT * FROM urls WHERE name = (%s)', + (url,)) + result = cursor.fetchone() return result def add_url(connection, url: str): - query = "INSERT INTO urls (name) VALUES (%s) RETURNING id" - result = execute(connection, NamedTupleCursor, query, (url,), fetch='one') + with connection: + with connection.cursor(cursor_factory=NamedTupleCursor) as cursor: + cursor.execute('INSERT INTO urls (name) VALUES (%s) RETURNING id', + (url,)) + result = cursor.fetchone() return result.id def add_url_check(connection, check: dict, url_id: int): - query = """ - INSERT INTO urls_checks (url_id, status_code, - h1, title, description) - VALUES (%(url_id)s, %(status_code)s, %(h1)s, - %(title)s, %(description)s) - """ check['url_id'] = url_id - execute(connection, NamedTupleCursor, query, check) + with connection: + with connection.cursor(cursor_factory=NamedTupleCursor) as cursor: + cursor.execute( + ''' + INSERT INTO urls_checks (url_id, status_code, + h1, title, description) + VALUES (%(url_id)s, %(status_code)s, %(h1)s, + %(title)s, %(description)s) + ''', + check + )