Skip to content

Commit

Permalink
Better connect contract to tenders and exemptions
Browse files Browse the repository at this point in the history
  • Loading branch information
akariv committed Jun 6, 2018
1 parent e2ede74 commit 949f26b
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from itertools import chain
import json

from datapackage_pipelines.wrapper import ingest, spew

Expand All @@ -8,15 +9,13 @@
tenders = {}
FIELDS = (
'tender_type', 'tender_id', 'publication_id', 'regulation', 'description')
key_fields = ('publication_id', 'tender_type', 'tender_id')


def collect_tenders(res):
for row in res:
pid = row['publication_id']
if not pid:
pid = row['tender_id']
pid = str(pid)
tenders[pid] = dict(
key = tuple(str(row(k)) for k in key_fields)
tenders[key] = dict(
(k, v)
for k, v in row.items()
if k in FIELDS
Expand All @@ -26,9 +25,10 @@ def collect_tenders(res):
def join_tenders(res):
for row in res:
manof_excerpts = []
manof_refs = row['manof_ref']
if isinstance(manof_refs, list):
for i in manof_refs:
tender_keys = row['tender_key']
if isinstance(tender_keys, list):
for tender_key in tender_keys:
i = tuple(json.loads(tender_key))
t = tenders.get(i)
if t:
manof_excerpts.append(dict(
Expand Down Expand Up @@ -63,17 +63,19 @@ def process_datapackage(dp):
lambda x: x['name'] == 'contract-spending',
dp['resources']
)))
contract_spending_res['schema']['fields'].append({
'name': 'manof_excerpts',
'type': 'array',
'es:itemType': 'object',
'es:schema': {
'fields': list(filter(
lambda x: x['name'] in FIELDS,
tenders_fields
))
contract_spending_res['schema']['fields'].extend([
{
'name': 'manof_excerpts',
'type': 'array',
'es:itemType': 'object',
'es:schema': {
'fields': list(filter(
lambda x: x['name'] in FIELDS,
tenders_fields
))
}
}
})
])
return dp


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ quarterly-contract-spending-reports-data:
resource: quarterly-contract-spending-reports
table: quarterly_contract_spending_reports
- run: stream_remote_resources
- run: resolve_manof_ref
- run: dump.to_path
parameters:
out-path: /var/datapackages/procurement/spending/quarterly-contract-spending-reports-data
Expand Down Expand Up @@ -237,6 +238,8 @@ latest-contract-spending:
aggregate: last
manof_ref:
aggregate: set
tender_key:
aggregate: set
volume:
aggregate: last
purchase_method:
Expand Down Expand Up @@ -309,6 +312,8 @@ latest-contract-spending:
es:hebrew: true
manof_ref:
es:itemType: string
tender_key:
es:itemType: string
publisher:
es:itemType: string
purchase_method:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import os
import logging
import json

from datapackage_pipelines.wrapper import process

from sqlalchemy import create_engine
from sqlalchemy.exc import ProgrammingError, OperationalError

# Name of the field this processor adds to every row.
TK = 'tender_key'
# Table queried below for the known tender keys.
db_table = 'procurement_tenders_processed'
connection_string = os.environ['DPP_DB_ENGINE']
engine = create_engine(connection_string)

# Columns that together identify one tender; the tuple order matters because
# process_row addresses the parts by position.
key_fields = ('publication_id', 'tender_type', 'tender_id')
to_select = ','.join(key_fields)

# Load every known key once, at import time. Each value is passed through
# str(), so the keys are always tuples of strings (a missing column value
# presumably shows up as the string 'None' -- verify against the DB driver).
all_tenders = {
    tuple(str(record[name]) for name in key_fields)
    for record in engine.execute(f'select {to_select} from {db_table}')
}

logging.info('Collected %d tenders and exemptions', len(all_tenders))

def modify_datapackage(dp, *_):
    """Declare the ``tender_key`` output field on the first resource.

    Args:
        dp: The datapackage descriptor; ``dp['resources'][0]['schema']
            ['fields']`` must be a list and is extended in place.
        *_: Extra positional arguments supplied by the caller; ignored.

    Returns:
        The same ``dp`` object, with one string field named ``TK`` appended.
    """
    # process_row writes a single JSON string per row under TK, so the field
    # is declared exactly once -- not once per entry of key_fields, which
    # would leave the schema with duplicate field names.
    dp['resources'][0]['schema']['fields'].append(dict(
        name=TK,
        type='string',
    ))
    return dp

def process_row(row, *_):
    """Resolve the row's free-text ``manof_ref`` to a known tender key.

    A tender matches when its publication_id (position 0 of the key) or its
    tender_id (position 2) is non-empty and occurs as a substring of the
    stripped ``manof_ref`` text. The first match found while iterating
    ``all_tenders`` wins; since that is a set, which tender wins when several
    match is not defined.

    Args:
        row: Mapping for one record; must contain ``'manof_ref'`` (a string
            or a falsy value). Modified in place.
        *_: Extra positional arguments supplied by the caller; ignored.

    Returns:
        The same ``row``, with ``row[TK]`` set to the JSON-encoded list form
        of the matching key, or to ``None`` when ``manof_ref`` is missing,
        blank, or matches nothing.
    """
    mf = row['manof_ref']
    mf = mf.strip() if mf else ''

    # Default to "no match" up front so that every row leaves with the field
    # the schema declares -- including None, empty and whitespace-only
    # references, which never reach the search below.
    row[TK] = None
    if not mf:
        return row

    for t in all_tenders:
        if (t[0] and t[0] in mf) or (t[2] and t[2] in mf):
            row[TK] = json.dumps(list(t))
            return row

    logging.info('Failed to find reference for "%s"', mf)
    return row

if __name__ == '__main__':
    # Hand both callbacks to the datapackage-pipelines wrapper, which drives
    # the processor.
    process(modify_datapackage=modify_datapackage, process_row=process_row)

0 comments on commit 949f26b

Please sign in to comment.