From c230788310bb01d4afbccf9ffff4ae7ae34d0dd1 Mon Sep 17 00:00:00 2001
From: Geoffrey Aldebert
Date: Sat, 17 Sep 2022 17:40:09 +0200
Subject: [PATCH 1/5] Add filters for numeric fields - greater than or less than

---
 csvapi/tableview.py | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/csvapi/tableview.py b/csvapi/tableview.py
index a3b8a76..18ea73f 100644
--- a/csvapi/tableview.py
+++ b/csvapi/tableview.py
@@ -72,17 +72,25 @@ def add_filters_to_sql(self, sql, filters):
             comparator = f_key.split('__')[1]
             column = f_key.split('__')[0]
             normalized_column = slugify(column, separator='_')
+            print(f_value)
             if comparator == 'exact':
                 wheres.append(f"[{column}] = :filter_value_{normalized_column}")
                 params[f'filter_value_{normalized_column}'] = f_value
             elif comparator == 'contains':
                 wheres.append(f"[{column}] LIKE :filter_value_{normalized_column}")
                 params[f'filter_value_{normalized_column}'] = f'%{f_value}%'
+            elif comparator == 'less':
+                wheres.append(f"[{column}] <= :filter_value_l_{normalized_column}")
+                params[f'filter_value_l_{normalized_column}'] = float(f_value)
+            elif comparator == 'greater':
+                wheres.append(f"[{column}] >= :filter_value_gt_{normalized_column}")
+                params[f'filter_value_gt_{normalized_column}'] = float(f_value)
             else:
                 app.logger.warning(f'Dropped unknown comparator in {f_key}')
         if wheres:
             sql += ' WHERE '
             sql += ' AND '.join(wheres)
+        print(params)
         return sql, params
 
     async def data(self, db_info, export=False):

From 655aad60b421e56b8d45cb3b5f543b0aab60fb1c Mon Sep 17 00:00:00 2001
From: Geoffrey Aldebert
Date: Sat, 17 Sep 2022 17:40:17 +0200
Subject: [PATCH 2/5] Add tests

---
 tests/test_api.py | 42 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 42 insertions(+)

diff --git a/tests/test_api.py b/tests/test_api.py
index 811ae77..1f2477a 100644
--- a/tests/test_api.py
+++ b/tests/test_api.py
@@ -478,6 +478,48 @@ async def test_api_and_filters(rmock, uploaded_csv_filters, client):
     ]
 
 
+async def test_api_filters_greater_float(rmock, csv_numeric, client):
+    content = csv_numeric.replace('', ';').encode('utf-8')
+    url = random_url()
+    rmock.get(url, body=content)
+    await client.get(f"/apify?url={url}")
+    res = await client.get(f"/api/{get_hash(url)}?value__greater=10")
+    assert res.status_code == 200
+    jsonres = await res.json
+    print(jsonres)
+    assert jsonres['rows'] == [
+        [3, 'c', 12],
+    ]
+
+
+async def test_api_filters_less_float(rmock, csv_numeric, client):
+    content = csv_numeric.replace('', ';').encode('utf-8')
+    url = random_url()
+    rmock.get(url, body=content)
+    await client.get(f"/apify?url={url}")
+    res = await client.get(f"/api/{get_hash(url)}?value__less=3")
+    assert res.status_code == 200
+    jsonres = await res.json
+    print(jsonres)
+    assert jsonres['rows'] == [
+        [1, 'a', 2],
+    ]
+
+
+async def test_api_filters_less_greater_float(rmock, csv_numeric, client):
+    content = csv_numeric.replace('', ';').encode('utf-8')
+    url = random_url()
+    rmock.get(url, body=content)
+    await client.get(f"/apify?url={url}")
+    res = await client.get(f"/api/{get_hash(url)}?value__greater=3&value__less=10")
+    assert res.status_code == 200
+    jsonres = await res.json
+    print(jsonres)
+    assert jsonres['rows'] == [
+        [2, 'b', 4],
+    ]
+
+
 async def test_api_filters_unnormalized_column(rmock, uploaded_csv_filters, client):
     res = await client.get(f"/api/{MOCK_CSV_HASH_FILTERS}?id__contains=fir&another column__contains=value")
     assert res.status_code == 200
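
For reference, the behaviour added by the two patches above can be reproduced in isolation: a `column__greater=` / `column__less=` query parameter becomes a `>=` / `<=` comparison bound as a named SQLite parameter, so the value is cast with `float()` rather than spliced into the SQL text. The sketch below is illustrative only — the `build_where()` helper and the in-memory table are not part of csvapi — and its sample rows mirror the numeric fixture used by the new tests.

    # Standalone sketch of the comparator-to-SQL mapping (hypothetical helper).
    import sqlite3


    def build_where(filters):
        """Translate (column__comparator, value) pairs into a WHERE clause."""
        wheres, params = [], {}
        for f_key, f_value in filters:
            column, comparator = f_key.split('__')
            if comparator == 'less':
                wheres.append(f"[{column}] <= :filter_value_l_{column}")
                params[f'filter_value_l_{column}'] = float(f_value)
            elif comparator == 'greater':
                wheres.append(f"[{column}] >= :filter_value_gt_{column}")
                params[f'filter_value_gt_{column}'] = float(f_value)
        return ' AND '.join(wheres), params


    conn = sqlite3.connect(':memory:')
    conn.execute('CREATE TABLE t (id INTEGER, label TEXT, value REAL)')
    conn.executemany('INSERT INTO t VALUES (?, ?, ?)',
                     [(1, 'a', 2), (2, 'b', 4), (3, 'c', 12)])

    where, params = build_where([('value__greater', '3'), ('value__less', '10')])
    rows = conn.execute(f'SELECT id, label, value FROM t WHERE {where}', params).fetchall()
    print(rows)  # [(2, 'b', 4.0)] -- same row the combined greater/less test expects
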
From 6123827b3a6766c6f88bd764f806bb4c2d19dafb Mon Sep 17 00:00:00 2001
From: Geoffrey Aldebert
Date: Sat, 17 Sep 2022 17:41:14 +0200
Subject: [PATCH 3/5] Remove prints

---
 csvapi/tableview.py | 2 --
 tests/test_api.py   | 1 -
 2 files changed, 3 deletions(-)

diff --git a/csvapi/tableview.py b/csvapi/tableview.py
index 18ea73f..a0d98a1 100644
--- a/csvapi/tableview.py
+++ b/csvapi/tableview.py
@@ -72,7 +72,6 @@ def add_filters_to_sql(self, sql, filters):
             comparator = f_key.split('__')[1]
             column = f_key.split('__')[0]
             normalized_column = slugify(column, separator='_')
-            print(f_value)
             if comparator == 'exact':
                 wheres.append(f"[{column}] = :filter_value_{normalized_column}")
                 params[f'filter_value_{normalized_column}'] = f_value
@@ -90,7 +89,6 @@ def add_filters_to_sql(self, sql, filters):
         if wheres:
             sql += ' WHERE '
             sql += ' AND '.join(wheres)
-        print(params)
         return sql, params
 
     async def data(self, db_info, export=False):
diff --git a/tests/test_api.py b/tests/test_api.py
index 1f2477a..84e5d71 100644
--- a/tests/test_api.py
+++ b/tests/test_api.py
@@ -514,7 +514,6 @@ async def test_api_filters_less_greater_float(rmock, csv_numeric, client):
     res = await client.get(f"/api/{get_hash(url)}?value__greater=3&value__less=10")
     assert res.status_code == 200
     jsonres = await res.json
-    print(jsonres)
     assert jsonres['rows'] == [
         [2, 'b', 4],
     ]

From 109f8b53984f42aa2033d79c2f32358f123ff7c8 Mon Sep 17 00:00:00 2001
From: Geoffrey Aldebert
Date: Mon, 19 Sep 2022 12:16:42 +0200
Subject: [PATCH 4/5] Rerun analysis if we detect change on resource_url in
 datagouv API

---
 csvapi/parseview.py  | 14 +++++++++-----
 csvapi/uploadview.py |  4 ++--
 csvapi/utils.py      | 34 ++++++++++++++++++++++++++++++++++------
 3 files changed, 39 insertions(+), 13 deletions(-)

diff --git a/csvapi/parseview.py b/csvapi/parseview.py
index 2951a84..41c37d5 100644
--- a/csvapi/parseview.py
+++ b/csvapi/parseview.py
@@ -13,12 +13,13 @@
 from csvapi.errors import APIError
 from csvapi.parser import parse
 from csvapi.utils import (
-    already_exists,
+    should_be_parsed,
     get_hash,
     check_csv_detective_report_structure,
     check_profile_report_structure,
     create_connection,
-    enrich_db_with_metadata
+    enrich_db_with_metadata,
+    get_dgv_infos
 )
 
 from csv_detective.explore_csv import routine
@@ -89,12 +90,15 @@ async def do_parse(
             )
             return
 
+        dataset_id, resource_id, resource_url = get_dgv_infos(url)
+
         enrich_db_with_metadata(
             urlhash,
             csv_detective_report,
             profile_report,
-            None,
-            None
+            dataset_id,
+            resource_id,
+            resource_url
         )
 
         if not is_csv and analysis and analysis == 'yes':
@@ -122,7 +126,7 @@ async def get(self):
             raise APIError('Malformed url parameter.', status=400)
         urlhash = get_hash(url)
         analysis = request.args.get('analysis')
-        if not await already_exists(urlhash, analysis):
+        if await should_be_parsed(urlhash, analysis, url):
             try:
                 storage = app.config['DB_ROOT_DIR']
                 await self.do_parse(url=url,
diff --git a/csvapi/uploadview.py b/csvapi/uploadview.py
index 85bce83..61c19a9 100644
--- a/csvapi/uploadview.py
+++ b/csvapi/uploadview.py
@@ -5,7 +5,7 @@
 from quart.views import MethodView
 
 from csvapi.errors import APIError
-from csvapi.utils import get_hash_bytes, already_exists
+from csvapi.utils import get_hash_bytes, should_be_parsed
 from csvapi.parser import parse
 
 
@@ -18,7 +18,7 @@ async def post(self):
             raise APIError('Missing file.', status=400)
         content_hash = get_hash_bytes(_file.read())
         _file.seek(0)
-        if not already_exists(content_hash):
+        if should_be_parsed(content_hash):
             storage = app.config['DB_ROOT_DIR']
             sniff_limit = app.config.get('CSV_SNIFF_LIMIT')
             try:
diff --git a/csvapi/utils.py b/csvapi/utils.py
index 45de1ad..e70cc7c 100644
--- a/csvapi/utils.py
+++ b/csvapi/utils.py
@@ -7,6 +7,7 @@
 import sqlite3
 from datetime import datetime
 
 import pandas as pd
+import requests
 
 executor = None
@@ -36,19 +37,19 @@ def get_hash_bytes(to_hash):
     return hashlib.md5(to_hash).hexdigest()
 
 
-async def already_exists(urlhash, analysis=None):
+async def should_be_parsed(urlhash, analysis=None, url=None):
     '''
     Check if db exist. If analysis is requested, we check if general_infos table exist.
     If not, we bypass cache and do a new download of file to analyse it with pp and csv-detective.
     '''
     cache_enabled = app.config.get('CSV_CACHE_ENABLED')
     if not cache_enabled:
-        return False
+        return True
 
     db_exist = Path(get_db_info(urlhash)['db_path']).exists()
 
     if not analysis or analysis != 'yes':
-        return db_exist
+        return not db_exist
     else:
         conn = create_connection(get_db_info(urlhash)['db_path'])
         cur = conn.cursor()
@@ -56,9 +57,18 @@ async def already_exists(urlhash, analysis=None):
         cur.execute(sql)
         rows = cur.fetchall()
         if rows[0][0] != 0:
-            return True
-        else:
+            dataset_id, resource_id, resource_url = get_dgv_infos(url)
+            if resource_url is not None:
+                sql = 'SELECT resource_url FROM general_infos LIMIT 1'
+                cur.execute(sql)
+                rows = cur.fetchall()
+                if resource_url == rows[0][0]:
+                    return False
+                else:
+                    return True
             return False
+        else:
+            return True
 
 
 def create_connection(db_file):
@@ -129,7 +139,7 @@ def df_to_sql(obj, conn, name):
     df.to_sql(name, con=conn, if_exists='replace', index=False)
 
 
-def enrich_db_with_metadata(urlhash, csv_detective_report, profile_report, dataset_id, key):
+def enrich_db_with_metadata(urlhash, csv_detective_report, profile_report, dataset_id, key, resource_url):
     # Save to sql
     conn = create_connection(app.config['DB_ROOT_DIR'] + '/' + urlhash + '.db')
 
@@ -146,6 +156,7 @@ def enrich_db_with_metadata(urlhash, csv_detective_report, profile_report, datas
             'date_last_check': datetime.today().strftime('%Y-%m-%d'),
             'dataset_id': dataset_id,
             'resource_id': key,
+            'resource_url': resource_url,
             'filetype': 'csv'
         }
     ]
@@ -221,3 +232,14 @@ def enrich_db_with_metadata(urlhash, csv_detective_report, profile_report, data
         df_to_sql(numeric_plot_infos, conn, 'numeric_plot_infos')
 
     conn.commit()
+
+
+def get_dgv_infos(url):
+    if "https://www.data.gouv.fr/fr/datasets/r/" not in url:
+        return None, None, None
+    rid = url.split('/')[-1]
+    r = requests.get('https://www.data.gouv.fr/api/2/datasets/resources/{}'.format(rid))
+    if r.json()['resource']:
+        return r.json()['dataset_id'], r.json()['resource']['id'], r.json()['resource']['url']
+    else:
+        return None, None, None
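
For orientation, the cache rule this patch introduces in should_be_parsed() boils down to: when the URL is a data.gouv.fr permalink, the cached analysis is reused only while the resource URL recorded in the general_infos table still matches the URL reported by the data.gouv.fr API (queried with requests in get_dgv_infos()). A minimal sketch of that decision, with illustrative function and variable names and made-up URLs (none of them come from the patch):

    def should_rerun(stored_resource_url, latest_resource_url):
        """Re-run the analysis when the remote file behind the permalink moved."""
        if latest_resource_url is None:
            # Not a data.gouv.fr permalink (or the lookup failed): keep the cached db.
            return False
        return stored_resource_url != latest_resource_url


    # The resource behind the permalink was replaced, so its URL changed:
    print(should_rerun('https://static.data.gouv.fr/resources/old.csv',
                       'https://static.data.gouv.fr/resources/new.csv'))  # True
    # Same URL as at the last check: the cached analysis is still valid.
    print(should_rerun('https://static.data.gouv.fr/resources/old.csv',
                       'https://static.data.gouv.fr/resources/old.csv'))  # False
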
'%{}%'".format(column, f_value)) elif comparator == 'less': - wheres.append(f"[{column}] <= :filter_value_l_{normalized_column}") - params[f'filter_value_l_{normalized_column}'] = float(f_value) + wheres.append("[{}] <= {}".format(column, float(f_value))) elif comparator == 'greater': - wheres.append(f"[{column}] >= :filter_value_gt_{normalized_column}") - params[f'filter_value_gt_{normalized_column}'] = float(f_value) + wheres.append("[{}] >= {}".format(column, float(f_value))) + elif comparator == 'after': + wheres.append("lower([{}]) > lower('{}')".format(column, f_value)) + elif comparator == 'before': + wheres.append("lower([{}]) < lower('{}')".format(column, f_value)) + elif comparator == 'different': + wheres.append("[{}] != '{}'".format(column, f_value)) else: app.logger.warning(f'Dropped unknown comparator in {f_key}') if wheres: @@ -91,6 +93,7 @@ def add_filters_to_sql(self, sql, filters): sql += ' AND '.join(wheres) return sql, params + async def data(self, db_info, export=False): limit = request.args.get('_size', ROWS_LIMIT) if not export else -1 rowid = not (request.args.get('_rowid') == 'hide') and not export @@ -108,6 +111,7 @@ async def data(self, db_info, export=False): cols = 'rowid, *' if rowid else '*' sql = 'SELECT {} FROM [{}]'.format(cols, db_info['table_name']) sql, params = self.add_filters_to_sql(sql, filters) + if sort: sql += f' ORDER BY [{sort}]' elif sort_desc: @@ -141,12 +145,7 @@ async def data(self, db_info, export=False): return res - async def get(self, urlhash): - db_info = get_db_info(urlhash) - p = Path(db_info['db_path']) - if not p.exists(): - raise APIError('Database has probably been removed.', status=404) - + async def rawdata(self, db_info): start = time.time() try: data = await self.data(db_info) @@ -181,6 +180,79 @@ async def get(self, urlhash): return jsonify(res) + async def vizdata(self, db_info): + start = time.time() + res = {} + try: + x = request.args.get('viz_axis_x', None) + y = request.args.get('viz_axis_y', None) + xtop = request.args.get('viz_axis_x_top', None) + op = request.args.get('viz_op', None) + gb = request.args.get('viz_axis_x_substring', None) + gb1 = '' + gb2 = '' + if gb is not None: + gb1 = 'SUBSTR(' + gb2 = ', 1, ' + gb + ')' + + filters = [] + for key, value in request.args.items(): + if not key.startswith('_') and '__' in key: + filters.append((key, value)) + + sql = 'SELECT * FROM [{}]'.format(db_info['table_name']) + sql, params = self.add_filters_to_sql(sql, filters) + + if(x and op in ['count', 'min', 'max', 'sum', 'avg']): + if op == 'count': + y = '*' + else: + y = '[' + y + ']' + + if not xtop: + sql = 'SELECT {}({}) as {}, {}[{}]{} FROM ({}) GROUP BY {}[{}]{}'.format(op, y, op, gb1, x, gb2, sql, gb1, x, gb2) + else: + sql = 'SELECT {}({}) as {}, {}t.[{}]{} FROM ({}) t JOIN (SELECT {}({}) as {}, {}[{}]{} FROM ({}) GROUP BY {}[{}]{} ORDER BY {} DESC LIMIT {}) t2 ON t.{}[{}]{} = {}t2.[{}]{} GROUP BY {}t.[{}]{}'.format(op, y, op, gb1, x, gb2, sql, op, y, op, gb1, x, gb2, sql, gb1, x, gb2, op, xtop, gb1, x, gb2, gb1, x, gb2, gb1, x, gb2) + + print(sql) + rows, description = await self.execute(sql, db_info, params=None) + columns = [r[0] for r in description] + resx = [] + resy = [] + cpt = 0 + for row in rows: + resx.append(row[1]) + resy.append(row[0]) + res['resx'] = resx + res['resy'] = resy + + + + + except (sqlite3.OperationalError, sqlite3.IntegrityError) as e: + raise APIError('Error selecting data', status=400, payload=dict(details=str(e))) + + end = time.time() + + res['ok'] = True, + res['query_ms'] = 
+
+        return jsonify(res)
+
+    async def get(self, urlhash):
+        db_info = get_db_info(urlhash)
+        p = Path(db_info['db_path'])
+        if not p.exists():
+            raise APIError('Database has probably been removed.', status=404)
+
+
+        viz = request.args.get('_viz', None)
+        if viz and viz == 'yes':
+            return await self.vizdata(db_info)
+        else:
+            return await self.rawdata(db_info)
+
+
     async def general_infos(self, db_info):
         params = {}
         sql = 'SELECT count(*) FROM sqlite_master WHERE type=\'table\' AND name=\'general_infos\''
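
The patch is truncated above, but the new `_viz` mode it adds to the table endpoint can be exercised with a plain HTTP call. A hedged usage sketch, assuming a csvapi instance reachable on localhost:8000 and an already-apified file whose hash is `<urlhash>` (host, port and hash are placeholders, not values from the patch):

    # Hypothetical client-side call to the new _viz mode of /api/<urlhash>.
    import requests

    params = {
        '_viz': 'yes',
        'viz_axis_x': 'label',    # column to group by
        'viz_axis_y': 'value',    # column to aggregate
        'viz_op': 'sum',          # one of count/min/max/sum/avg
        'viz_axis_x_top': 10,     # keep only the 10 largest groups
    }
    resp = requests.get('http://localhost:8000/api/<urlhash>', params=params)
    data = resp.json()
    # vizdata() returns the grouped labels and the aggregated values side by side.
    print(data['resx'], data['resy'])
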