Skip to content

Commit

Permalink
Merge pull request #921 from hdoupe/outputs
Browse files Browse the repository at this point in the history
Merged #921
  • Loading branch information
hdoupe authored Aug 22, 2018
2 parents 9e19073 + b37390e commit a09a2b3
Show file tree
Hide file tree
Showing 170 changed files with 2,237 additions and 12,442 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ before_script:
- psql -c 'create database test_db;' -U postgres

script:
- py.test webapp/apps/
- py.test webapp/apps/ -v
81 changes: 44 additions & 37 deletions distributed/api/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@

from celery import Celery

import taxcalc
import btax
from btax.front_end_util import runner_json_tables
import taxcalc

from collections import defaultdict


CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL',
Expand All @@ -21,83 +23,88 @@
)


def dropq_task(year_n, user_mods, first_budget_year, use_puf_not_cps=True,
def dropq_task(year_n, user_mods, start_year, use_puf_not_cps=True,
use_full_sample=True):
print(
'keywords to dropq',
dict(
year_n=year_n,
start_year=int(first_budget_year),
year_n_n=year_n,
start_year=int(start_year),
use_puf_not_cps=use_puf_not_cps,
use_full_sample=use_full_sample,
user_mods=user_mods
)
)

results = taxcalc.tbi.run_nth_year_taxcalc_model(
raw_data = taxcalc.tbi.run_nth_year_taxcalc_model(
year_n=year_n,
start_year=int(first_budget_year),
start_year=int(start_year),
use_puf_not_cps=use_puf_not_cps,
use_full_sample=use_full_sample,
user_mods=user_mods
)

return raw_data


def postprocess(ans, postprocess_func):
all_to_process = defaultdict(list)
for year_data in ans:
for key, value in year_data.items():
all_to_process[key] += value
results = postprocess_func(all_to_process)
# Add taxcalc version to results
vinfo = taxcalc._version.get_versions()
results['taxcalc_version'] = vinfo['version']
# TODO: Make this the distributed app version, not the TC version
results['dropq_version'] = vinfo['version']

json_res = json.dumps(results)
return json_res
return json.dumps(results)


@celery_app.task(name='api.celery_tasks.dropq_task_async')
def dropq_task_async(year, user_mods, first_budget_year, use_puf_not_cps=True):
return dropq_task(year, user_mods, first_budget_year,
def dropq_task_async(year_n, user_mods, start_year, use_puf_not_cps=True):
return dropq_task(year_n, user_mods, start_year,
use_puf_not_cps=use_puf_not_cps, use_full_sample=True)


@celery_app.task(name='api.celery_tasks.dropq_task_small_async')
def dropq_task_small_async(year, user_mods, first_budget_year,
def dropq_task_small_async(year_n, user_mods, start_year,
use_puf_not_cps=True):
return dropq_task(year, user_mods, first_budget_year,
return dropq_task(year_n, user_mods, start_year,
use_puf_not_cps=use_puf_not_cps, use_full_sample=False)


@celery_app.task(name='api.celery_tasks.elasticity_gdp_task_async')
def elasticity_gdp_task_async(year_n, user_mods, first_budget_year,
gdp_elasticity, use_puf_not_cps=True):
print("kw to dropq", dict(
year_n=year_n,
start_year=int(first_budget_year),
use_puf_not_cps=use_puf_not_cps,
use_full_sample=True,
user_mods=user_mods,
gdp_elasticity=gdp_elasticity
))
@celery_app.task(name='api.celery_tasks.taxbrain_postprocess')
def taxbrain_postprocess(ans):
    """Celery task that post-processes collected TaxBrain results.

    Delegates to ``postprocess`` with ``taxcalc.tbi.postprocess`` as the
    aggregation function and returns whatever ``postprocess`` returns.

    ``ans`` is handed to ``postprocess`` untouched.
    NOTE(review): presumably the list of per-year outputs gathered from the
    compute tasks of a chord -- confirm against the caller.
    """
    aggregate_func = taxcalc.tbi.postprocess
    return postprocess(ans, aggregate_func)


@celery_app.task(name='api.celery_tasks.taxbrain_elast_async')
def taxbrain_elast_async(year_n, start_year,
use_puf_not_cps,
user_mods,
gdp_elasticity,
use_full_sample=True,
return_dict=True):

gdp_elast_i = taxcalc.tbi.run_nth_year_gdp_elast_model(
year_n=year_n,
start_year=int(first_budget_year),
start_year=start_year,
use_puf_not_cps=use_puf_not_cps,
use_full_sample=True,
use_full_sample=use_full_sample,
user_mods=user_mods,
gdp_elasticity=gdp_elasticity
gdp_elasticity=gdp_elasticity,
return_dict=True
)
print(gdp_elast_i)

results = {'elasticity_gdp': gdp_elast_i}
# Add taxcalc version to results
vinfo = taxcalc._version.get_versions()
results['taxcalc_version'] = vinfo['version']
results['dropq_version'] = vinfo['version']
return gdp_elast_i

json_res = json.dumps(results)
return json_res

@celery_app.task(name='api.celery_tasks.taxbrain_elast_postprocess')
def taxbrain_elast_postprocess(ans):
    """Celery task that post-processes collected GDP-elasticity results.

    Delegates to ``postprocess`` with ``taxcalc.tbi.postprocess_elast`` as
    the aggregation function and returns whatever ``postprocess`` returns.

    ``ans`` is handed to ``postprocess`` untouched.
    NOTE(review): presumably the list of per-year outputs gathered from the
    compute tasks of a chord -- confirm against the caller.
    """
    aggregate_func = taxcalc.tbi.postprocess_elast
    return postprocess(ans, aggregate_func)

# @celery_app.task(name='api.celery_tasks.ogusa_async')
# def ogusa_async(user_mods, ogusa_params, guid):
# pass

@celery_app.task(name='api.celery_tasks.btax_async')
def btax_async(user_mods, start_year):
Expand Down
35 changes: 26 additions & 9 deletions distributed/api/endpoints.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
from flask import Blueprint, request, make_response
from celery.result import AsyncResult
from celery import chord

import redis
import json
import msgpack
import os

from api.celery_tasks import (dropq_task_async,
from api.celery_tasks import (taxbrain_postprocess,
taxbrain_elast_postprocess,
dropq_task_async,
dropq_task_small_async,
elasticity_gdp_task_async,
taxbrain_elast_async,
btax_async)

bp = Blueprint('endpoints', __name__)
Expand All @@ -18,37 +21,51 @@
"redis://redis:6379/0"))


def dropq_endpoint(dropq_task):
def aggr_endpoint(compute_task, postprocess_task):
    """Start one compute job per input and aggregate them with a chord.

    The request body is decoded with msgpack and iterated; each item is
    used as the keyword arguments of one ``compute_task`` signature. Those
    signatures form the header of a Celery chord whose body is
    ``postprocess_task``.

    Args:
        compute_task: Celery task run once per decoded input item.
        postprocess_task: Celery task used as the chord body.

    Returns:
        A JSON string with ``job_id`` (``str()`` of the chord result) and
        ``qlength`` (``client.llen(queue_name) + 1``).
    """
    print('aggregating endpoint')
    data = request.get_data()
    # ``raw=False`` decodes msgpack raw values to ``str`` as UTF-8, which is
    # what ``encoding='utf8'`` used to do. The ``encoding`` keyword was
    # deprecated and then removed in msgpack 1.0, where passing it raises
    # TypeError; ``raw`` is available from msgpack 0.5.2 onwards.
    # NOTE(review): msgpack >= 1.0 also rejects non-str/bytes map keys by
    # default (``strict_map_key``); confirm payloads only use string keys.
    inputs = msgpack.loads(data, raw=False, use_list=True)
    print('inputs', inputs)
    header = [compute_task.signature(kwargs=kwargs, serializer='msgpack')
              for kwargs in inputs]
    callback = postprocess_task.signature(serializer='msgpack')
    result = chord(header)(callback)
    length = client.llen(queue_name) + 1
    data = {'job_id': str(result), 'qlength': length}
    return json.dumps(data)


def endpoint(task):
print('dropq endpoint')
data = request.get_data()
inputs = msgpack.loads(data, encoding='utf8',
use_list=True)
print('inputs', inputs)
result = dropq_task.apply_async(kwargs=inputs['inputs'],
serializer='msgpack')
result = task.apply_async(kwargs=inputs[0],
serializer='msgpack')
length = client.llen(queue_name) + 1
data = {'job_id': str(result), 'qlength': length}
return json.dumps(data)


@bp.route("/dropq_start_job", methods=['POST'])
def dropq_endpoint_full():
return dropq_endpoint(dropq_task_async)
return aggr_endpoint(dropq_task_async, taxbrain_postprocess)


@bp.route("/dropq_small_start_job", methods=['POST'])
def dropq_endpoint_small():
return dropq_endpoint(dropq_task_small_async)
return aggr_endpoint(dropq_task_small_async, taxbrain_postprocess)


@bp.route("/btax_start_job", methods=['POST'])
def btax_endpoint():
return dropq_endpoint(btax_async)
return endpoint(btax_async)


@bp.route("/elastic_gdp_start_job", methods=['POST'])
def elastic_endpoint():
return dropq_endpoint(elasticity_gdp_task_async)
return aggr_endpoint(taxbrain_elast_async, taxbrain_elast_postprocess)


@bp.route("/dropq_get_result", methods=['GET'])
Expand Down
6 changes: 6 additions & 0 deletions distributed/api/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
msgpack
celery
redis
pytest
flask
toolz
5 changes: 5 additions & 0 deletions distributed/api/setup_env.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@

pushd ../Tax-Calculator && conda env create && source activate taxcalc-dev && pip install -e . && popd
pushd ../B-Tax && pip install -e . && popd
pip install -r requirements.txt
pip install -e .
69 changes: 69 additions & 0 deletions distributed/api/tests/test_celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import pytest
from celery import chord

from api.celery_tasks import (taxbrain_elast_async,
taxbrain_elast_postprocess,
dropq_task_small_async,
taxbrain_postprocess)

@pytest.fixture(scope='session')
def celery_config():
    """Return the Celery settings used for this test session.

    Broker and result backend both point at a Redis server on localhost;
    tasks are serialized as JSON while msgpack and JSON content is accepted.
    NOTE(review): presumably picked up by the ``celery_worker`` fixture the
    tests in this module request -- confirm against Celery's pytest plugin.
    """
    redis_url = 'redis://localhost:6379'
    settings = {
        'broker_url': redis_url,
        'result_backend': redis_url,
        'task_serializer': 'json',
        'accept_content': ['msgpack', 'json'],
    }
    return settings

def test_elast_endpoint(celery_worker):
    """Run the elasticity chord for ``year_n`` 0, 1 and 2 and print the result.

    Builds three keyword-argument dicts that differ only in ``year_n``,
    submits them as the header of a chord over ``taxbrain_elast_async`` with
    ``taxbrain_elast_postprocess`` as the body, then waits on ``get()``.
    The test passes as long as no exception is raised.
    """
    user_mods = {
        'policy': {2017: {'_FICA_ss_trt': [0.1]}},
        'behavior': {},
        'growdiff_response': {},
        'consumption': {},
        'growdiff_baseline': {},
        'growmodel': {},
    }
    elast_params = {
        'year_n': 1,
        'user_mods': user_mods,
        'gdp_elasticity': 0.3,
        'use_puf_not_cps': False,
        'start_year': 2017,
        'use_full_sample': True,
        'return_dict': True,
    }

    # One kwargs dict per year offset; only 'year_n' is overridden.
    inputs = [dict(elast_params, year_n=offset) for offset in range(0, 3)]

    header = (
        taxbrain_elast_async.signature(kwargs=kwargs, serializer='msgpack')
        for kwargs in inputs
    )
    callback = taxbrain_elast_postprocess.signature(serializer='msgpack')
    result = chord(header)(callback)
    print(result.get())


def test_taxbrain_endpoint(celery_worker):
    """Run the TaxBrain chord for ``year_n`` 0, 1 and 2 and print the result.

    Builds three keyword-argument dicts that differ only in ``year_n``,
    submits them as the header of a chord over ``dropq_task_small_async``
    with ``taxbrain_postprocess`` as the body, then waits on ``get()``.
    The test passes as long as no exception is raised.
    """
    user_mods = {
        "policy": {
            2017: {"_FICA_ss_trt": [0.1]},
        },
        "consumption": {},
        "behavior": {},
        "growdiff_baseline": {},
        "growdiff_response": {},
        "growmodel": {},
    }
    tc_params = {
        'user_mods': user_mods,
        'start_year': 2017,
        'use_puf_not_cps': False,
        'year_n': 0,
    }

    # One kwargs dict per year offset; only 'year_n' is overridden.
    inputs = [dict(tc_params, year_n=offset) for offset in range(0, 3)]

    header = (
        dropq_task_small_async.signature(kwargs=kwargs, serializer='msgpack')
        for kwargs in inputs
    )
    callback = taxbrain_postprocess.signature(serializer='msgpack')
    result = chord(header)(callback)
    print(result.get())
16 changes: 8 additions & 8 deletions distributed/api/tests/test_flask.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

@pytest.fixture
def taxcalc_inputs():
return {
return [{
'user_mods': {
"policy": {
2017: {"_FICA_ss_trt": [0.1]}},
Expand All @@ -18,10 +18,10 @@ def taxcalc_inputs():
"growdiff_response": {},
"growmodel": {}
},
'first_budget_year': 2017,
'use_puf_not_cps': True,
'year': 0
}
'start_year': 2017,
'use_puf_not_cps': False,
'year_n': 0
}]


@pytest.fixture
Expand All @@ -37,7 +37,7 @@ def client(app):


def post_and_poll(client, url, data, exp_status='YES', tries=30):
packed = msgpack.dumps({'inputs': data}, use_bin_type=True)
packed = msgpack.dumps(data, use_bin_type=True)
resp = client.post(url,
data=packed,
headers={'Content-Type': 'application/octet-stream'}
Expand Down Expand Up @@ -72,11 +72,11 @@ def test_hello(client):
def test_dropq_small_start_job(client, taxcalc_inputs):
resp = post_and_poll(client, '/dropq_small_start_job', taxcalc_inputs)
result = json.loads(resp.data.decode('utf-8'))
assert 'aggr_1' in result
assert 'aggr_outputs' in result


def test_dropq_job_fails(client, taxcalc_inputs):
del taxcalc_inputs['user_mods']['policy']
del taxcalc_inputs[0]['user_mods']['policy']
resp = post_and_poll(client, '/dropq_start_job', exp_status='FAIL',
data=taxcalc_inputs)

Expand Down
1 change: 1 addition & 0 deletions distributed/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ toolz
celery
requests-mock
requests
msgpack
pytest
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ python-dateutil
toolz
whitenoise
msgpack
dataclasses # This will not be needed with Python >=3.7
Loading

0 comments on commit a09a2b3

Please sign in to comment.