Complex queries #171

Open: wants to merge 5 commits into master
14 changes: 9 additions & 5 deletions csvapi/parseview.py
@@ -13,12 +13,13 @@
from csvapi.errors import APIError
from csvapi.parser import parse
from csvapi.utils import (
already_exists,
should_be_parsed,
get_hash,
check_csv_detective_report_structure,
check_profile_report_structure,
create_connection,
enrich_db_with_metadata
enrich_db_with_metadata,
get_dgv_infos
)

from csv_detective.explore_csv import routine
@@ -89,12 +90,15 @@ async def do_parse(
)
return

dataset_id, resource_id, resource_url = get_dgv_infos(url)

enrich_db_with_metadata(
urlhash,
csv_detective_report,
profile_report,
None,
None
dataset_id,
resource_id,
resource_url
)

if not is_csv and analysis and analysis == 'yes':
@@ -122,7 +126,7 @@ async def get(self):
raise APIError('Malformed url parameter.', status=400)
urlhash = get_hash(url)
analysis = request.args.get('analysis')
if not await already_exists(urlhash, analysis):
if await should_be_parsed(urlhash, analysis, url):
try:
storage = app.config['DB_ROOT_DIR']
await self.do_parse(url=url,
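For context, here is a minimal sketch of the metadata flow this PR adds to do_parse: get_dgv_infos and enrich_db_with_metadata are the helpers from the diff, while the wrapper function, its name and the omitted parsing/analysis steps are purely illustrative.

```python
# Illustrative wrapper (not part of the diff): it only shows the order in
# which the two helpers are called once parsing has produced the
# csv-detective and profiling reports.
from csvapi.utils import enrich_db_with_metadata, get_dgv_infos


def store_reports_with_dgv_metadata(urlhash, csv_detective_report, profile_report, url):
    # Resolves (dataset_id, resource_id, resource_url) for stable
    # data.gouv.fr resource URLs, or (None, None, None) for any other URL.
    dataset_id, resource_id, resource_url = get_dgv_infos(url)
    enrich_db_with_metadata(
        urlhash,
        csv_detective_report,
        profile_report,
        dataset_id,
        resource_id,
        resource_url,
    )
```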
98 changes: 88 additions & 10 deletions csvapi/tableview.py
@@ -73,18 +73,27 @@ def add_filters_to_sql(self, sql, filters):
column = f_key.split('__')[0]
normalized_column = slugify(column, separator='_')
if comparator == 'exact':
wheres.append(f"[{column}] = :filter_value_{normalized_column}")
params[f'filter_value_{normalized_column}'] = f_value
wheres.append("[{}] = '{}'".format(column, f_value))
elif comparator == 'contains':
wheres.append(f"[{column}] LIKE :filter_value_{normalized_column}")
params[f'filter_value_{normalized_column}'] = f'%{f_value}%'
wheres.append("[{}] LIKE '%{}%'".format(column, f_value))
elif comparator == 'less':
wheres.append("[{}] <= {}".format(column, float(f_value)))
elif comparator == 'greater':
wheres.append("[{}] >= {}".format(column, float(f_value)))
elif comparator == 'after':
wheres.append("lower([{}]) > lower('{}')".format(column, f_value))
elif comparator == 'before':
wheres.append("lower([{}]) < lower('{}')".format(column, f_value))
elif comparator == 'different':
wheres.append("[{}] != '{}'".format(column, f_value))
else:
app.logger.warning(f'Dropped unknown comparator in {f_key}')
if wheres:
sql += ' WHERE '
sql += ' AND '.join(wheres)
return sql, params
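To make the comparator grammar concrete (exact, contains, less, greater, after, before and different are now supported), here is a hedged sketch of the translation for a filtered request; the query string, table name and resulting SQL below are illustrative rather than copied from the implementation.

```python
# Hypothetical request against an apified table:
#   GET /api/<urlhash>?value__greater=3&value__less=10&name__contains=par
# The view keeps every query parameter that is not underscore-prefixed and
# contains '__', so add_filters_to_sql receives:
filters = [('value__greater', '3'), ('value__less', '10'), ('name__contains', 'par')]

# and extends the base query to roughly:
expected_sql = (
    "SELECT rowid, * FROM [my_table] "
    "WHERE [value] >= 3.0 AND [value] <= 10.0 AND [name] LIKE '%par%'"
)
```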


async def data(self, db_info, export=False):
limit = request.args.get('_size', ROWS_LIMIT) if not export else -1
rowid = not (request.args.get('_rowid') == 'hide') and not export
@@ -102,6 +111,7 @@ async def data(self, db_info, export=False):
cols = 'rowid, *' if rowid else '*'
sql = 'SELECT {} FROM [{}]'.format(cols, db_info['table_name'])
sql, params = self.add_filters_to_sql(sql, filters)

if sort:
sql += f' ORDER BY [{sort}]'
elif sort_desc:
@@ -135,12 +145,7 @@ async def data(self, db_info, export=False):

return res

async def get(self, urlhash):
db_info = get_db_info(urlhash)
p = Path(db_info['db_path'])
if not p.exists():
raise APIError('Database has probably been removed.', status=404)

async def rawdata(self, db_info):
start = time.time()
try:
data = await self.data(db_info)
@@ -175,6 +180,79 @@ async def get(self, urlhash):

return jsonify(res)

async def vizdata(self, db_info):
start = time.time()
res = {}
try:
x = request.args.get('viz_axis_x', None)
y = request.args.get('viz_axis_y', None)
xtop = request.args.get('viz_axis_x_top', None)
op = request.args.get('viz_op', None)
gb = request.args.get('viz_axis_x_substring', None)
gb1 = ''
gb2 = ''
if gb is not None:
gb1 = 'SUBSTR('
gb2 = ', 1, ' + gb + ')'

filters = []
for key, value in request.args.items():
if not key.startswith('_') and '__' in key:
filters.append((key, value))

sql = 'SELECT * FROM [{}]'.format(db_info['table_name'])
sql, params = self.add_filters_to_sql(sql, filters)

if x and op in ['count', 'min', 'max', 'sum', 'avg']:
if op == 'count':
y = '*'
else:
y = '[' + y + ']'

if not xtop:
sql = 'SELECT {}({}) as {}, {}[{}]{} FROM ({}) GROUP BY {}[{}]{}'.format(op, y, op, gb1, x, gb2, sql, gb1, x, gb2)
else:
sql = 'SELECT {}({}) as {}, {}t.[{}]{} FROM ({}) t JOIN (SELECT {}({}) as {}, {}[{}]{} FROM ({}) GROUP BY {}[{}]{} ORDER BY {} DESC LIMIT {}) t2 ON t.{}[{}]{} = {}t2.[{}]{} GROUP BY {}t.[{}]{}'.format(op, y, op, gb1, x, gb2, sql, op, y, op, gb1, x, gb2, sql, gb1, x, gb2, op, xtop, gb1, x, gb2, gb1, x, gb2, gb1, x, gb2)

app.logger.debug(sql)
rows, description = await self.execute(sql, db_info, params=None)
columns = [r[0] for r in description]
resx = []
resy = []
for row in rows:
resx.append(row[1])
resy.append(row[0])
res['resx'] = resx
res['resy'] = resy
except (sqlite3.OperationalError, sqlite3.IntegrityError) as e:
raise APIError('Error selecting data', status=400, payload=dict(details=str(e)))

end = time.time()

res['ok'] = True
res['query_ms'] = (end - start) * 1000

return jsonify(res)

async def get(self, urlhash):
db_info = get_db_info(urlhash)
p = Path(db_info['db_path'])
if not p.exists():
raise APIError('Database has probably been removed.', status=404)


viz = request.args.get('_viz', None)
if viz and viz == 'yes':
return await self.vizdata(db_info)
else:
return await self.rawdata(db_info)


async def general_infos(self, db_info):
params = {}
sql = 'SELECT count(*) FROM sqlite_master WHERE type=\'table\' AND name=\'general_infos\''
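The new _viz=yes mode builds an aggregate query from the viz_axis_x, viz_axis_y and viz_op request arguments. Below is a minimal sketch of the simple case, without viz_axis_x_top or viz_axis_x_substring; build_viz_sql and the column names are assumptions made for the example, not code from this PR.

```python
# Rough sketch of the aggregate SQL vizdata assembles when neither a
# "top N" limit nor a SUBSTR grouping is requested.
def build_viz_sql(table, x, y, op):
    base = "SELECT * FROM [{}]".format(table)              # plus any WHERE filters
    measure = "*" if op == "count" else "[{}]".format(y)   # count ignores the y axis
    return "SELECT {op}({m}) as {op}, [{x}] FROM ({base}) GROUP BY [{x}]".format(
        op=op, m=measure, x=x, base=base
    )


# e.g. GET /api/<urlhash>?_viz=yes&viz_axis_x=region&viz_axis_y=amount&viz_op=sum
print(build_viz_sql("my_table", "region", "amount", "sum"))
# SELECT sum([amount]) as sum, [region] FROM (SELECT * FROM [my_table]) GROUP BY [region]
```

The JSON response then exposes the grouped values as resx and the aggregated values as resy.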
4 changes: 2 additions & 2 deletions csvapi/uploadview.py
@@ -5,7 +5,7 @@
from quart.views import MethodView

from csvapi.errors import APIError
from csvapi.utils import get_hash_bytes, already_exists
from csvapi.utils import get_hash_bytes, should_be_parsed
from csvapi.parser import parse


@@ -18,7 +18,7 @@ async def post(self):
raise APIError('Missing file.', status=400)
content_hash = get_hash_bytes(_file.read())
_file.seek(0)
if not already_exists(content_hash):
if should_be_parsed(content_hash):
storage = app.config['DB_ROOT_DIR']
sniff_limit = app.config.get('CSV_SNIFF_LIMIT')
try:
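Because the rename from already_exists to should_be_parsed inverts the predicate, callers simply drop their negation. A toy sketch of the equivalence follows; neither function below is the real implementation, they only capture the flipped return value (the real helpers also look at the analysis flag and the stored resource_url).

```python
from pathlib import Path


def already_exists(db_path: str) -> bool:
    # old predicate: True when a cached SQLite db is already on disk
    return Path(db_path).exists()


def should_be_parsed(db_path: str) -> bool:
    # new predicate: True when the file must be downloaded and parsed
    return not already_exists(db_path)


db_path = "/tmp/abc123.db"        # hypothetical cache path
if should_be_parsed(db_path):     # previously: if not already_exists(db_path)
    print("parse and cache the file")
```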
34 changes: 28 additions & 6 deletions csvapi/utils.py
@@ -7,6 +7,7 @@
import sqlite3
from datetime import datetime
import pandas as pd
import requests

executor = None

@@ -36,29 +37,38 @@ def get_hash_bytes(to_hash):
return hashlib.md5(to_hash).hexdigest()


async def already_exists(urlhash, analysis=None):
async def should_be_parsed(urlhash, analysis=None, url=None):
'''
Decide whether the file should be (re)parsed. If analysis is requested, we check
whether the general_infos table exists; if it does not, we bypass the cache and
download the file again to analyse it with pp and csv-detective.
'''
cache_enabled = app.config.get('CSV_CACHE_ENABLED')
if not cache_enabled:
return False
return True

db_exist = Path(get_db_info(urlhash)['db_path']).exists()

if not analysis or analysis != 'yes':
return db_exist
return not db_exist
else:
conn = create_connection(get_db_info(urlhash)['db_path'])
cur = conn.cursor()
sql = 'SELECT count(*) FROM sqlite_master WHERE type=\'table\' AND name=\'general_infos\''
cur.execute(sql)
rows = cur.fetchall()
if rows[0][0] != 0:
return True
else:
dataset_id, resource_id, resource_url = get_dgv_infos(url)
if resource_url is not None:
sql = 'SELECT resource_url FROM general_infos LIMIT 1'
cur.execute(sql)
rows = cur.fetchall()
if resource_url == rows[0][0]:
return False
else:
return True
return False
else:
return True


def create_connection(db_file):
@@ -129,7 +139,7 @@ def df_to_sql(obj, conn, name):
df.to_sql(name, con=conn, if_exists='replace', index=False)


def enrich_db_with_metadata(urlhash, csv_detective_report, profile_report, dataset_id, key):
def enrich_db_with_metadata(urlhash, csv_detective_report, profile_report, dataset_id, key, resource_url):
# Save to sql
conn = create_connection(app.config['DB_ROOT_DIR'] + '/' + urlhash + '.db')

@@ -146,6 +156,7 @@ def enrich_db_with_metadata(urlhash, csv_detective_report, profile_report, datas
'date_last_check': datetime.today().strftime('%Y-%m-%d'),
'dataset_id': dataset_id,
'resource_id': key,
'resource_url': resource_url,
'filetype': 'csv'
}
]
@@ -221,3 +232,14 @@ def enrich_db_with_metadata(urlhash, csv_detective_report, profile_report, datas
df_to_sql(numeric_plot_infos, conn, 'numeric_plot_infos')

conn.commit()


def get_dgv_infos(url):
if "https://www.data.gouv.fr/fr/datasets/r/" not in url:
return None, None, None
rid = url.split('/')[-1]
r = requests.get('https://www.data.gouv.fr/api/2/datasets/resources/{}'.format(rid))
if r.json()['resource']:
return r.json()['dataset_id'], r.json()['resource']['id'], r.json()['resource']['url']
else:
return None, None, None
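For reference, a usage sketch of the new helper; the resource identifier below is a placeholder, and only the second call performs an HTTP request.

```python
from csvapi.utils import get_dgv_infos

# Non data.gouv.fr URLs short-circuit before any HTTP request is made.
assert get_dgv_infos("https://example.org/data.csv") == (None, None, None)

# A stable data.gouv.fr resource URL (placeholder id below) is resolved through
# the data.gouv.fr API into (dataset_id, resource_id, resource_url); this call
# needs network access and a real resource id.
dataset_id, resource_id, resource_url = get_dgv_infos(
    "https://www.data.gouv.fr/fr/datasets/r/aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
)
```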
41 changes: 41 additions & 0 deletions tests/test_api.py
@@ -478,6 +478,47 @@ async def test_api_and_filters(rmock, uploaded_csv_filters, client):
]


async def test_api_filters_greater_float(rmock, csv_numeric, client):
content = csv_numeric.replace('<sep>', ';').encode('utf-8')
url = random_url()
rmock.get(url, body=content)
await client.get(f"/apify?url={url}")
res = await client.get(f"/api/{get_hash(url)}?value__greater=10")
assert res.status_code == 200
jsonres = await res.json
print(jsonres)
assert jsonres['rows'] == [
[3, 'c', 12],
]


async def test_api_filters_less_float(rmock, csv_numeric, client):
content = csv_numeric.replace('<sep>', ';').encode('utf-8')
url = random_url()
rmock.get(url, body=content)
await client.get(f"/apify?url={url}")
res = await client.get(f"/api/{get_hash(url)}?value__less=3")
assert res.status_code == 200
jsonres = await res.json
print(jsonres)
assert jsonres['rows'] == [
[1, 'a', 2],
]


async def test_api_filters_less_greater_float(rmock, csv_numeric, client):
content = csv_numeric.replace('<sep>', ';').encode('utf-8')
url = random_url()
rmock.get(url, body=content)
await client.get(f"/apify?url={url}")
res = await client.get(f"/api/{get_hash(url)}?value__greater=3&value__less=10")
assert res.status_code == 200
jsonres = await res.json
assert jsonres['rows'] == [
[2, 'b', 4],
]


async def test_api_filters_unnormalized_column(rmock, uploaded_csv_filters, client):
res = await client.get(f"/api/{MOCK_CSV_HASH_FILTERS}?id__contains=fir&another column__contains=value")
assert res.status_code == 200
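Outside the test suite, the new numeric comparators can also be exercised against a running csvapi instance; the host, hash and column name below are placeholders.

```python
import requests

BASE = "http://localhost:8000"      # hypothetical local csvapi instance
URLHASH = "abcdef0123456789"        # hash returned by /apify?url=...

# Both bounds are inclusive, matching the generated >= / <= clauses.
resp = requests.get(
    f"{BASE}/api/{URLHASH}",
    params={"value__greater": 3, "value__less": 10},
)
print(resp.json()["rows"])
```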