population.py
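"""Update subnational population figures on HDX.

Population values are read per country and admin level from COD-PS CSV
resources on HDX, falling back to WorldPop constrained GeoTIFF rasters
summed with zonal statistics, and merged into a subnational boundaries
table that is then used to refresh the overall population resource.
"""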
import logging
import re

from pandas import concat, read_csv
from rasterstats import zonal_stats
from slugify import slugify

from hdx.data.dataset import Dataset
from hdx.location.country import Country
from hdx.utilities.downloader import DownloadError

logger = logging.getLogger()

class Population:
    def __init__(self, configuration, downloader, subnational_json, temp_folder):
        self.downloader = downloader
        self.boundaries = subnational_json
        self.temp_folder = temp_folder
        # Per-country overrides for dataset names and resource name patterns
        self.exceptions = {
            "dataset": configuration["inputs"].get("dataset_exceptions", {}),
            "resource": configuration["inputs"].get("resource_exceptions", {}),
        }
        self.headers = configuration["pcode_mappings"]
        self.skip = configuration["inputs"].get("do_not_process", [])
    def find_resource(self, iso, level):
        # Prefer the country's COD-PS dataset; fall back to WorldPop rasters
        dataset = Dataset.read_from_hdx(self.exceptions["dataset"].get(iso, f"cod-ps-{iso.lower()}"))
        if not dataset:
            logger.warning(f"{iso}: Could not find PS dataset")
            dataset = Dataset.read_from_hdx(
                f"worldpop-population-counts-for-{slugify(Country.get_country_name_from_iso3(iso))}"
            )
            if not dataset:
                return None, "geotiff"
            # re.search rather than re.match: the lookbehind implies the year
            # is not necessarily at the start of the resource name
            pop_resource = [
                r for r in dataset.get_resources()
                if r.get_file_type() == "geotiff"
                and bool(re.search(r"(?<!\d)\d{4}_constrained", r["name"], re.IGNORECASE))
            ]
            if len(pop_resource) == 0:
                return None, "geotiff"
            return pop_resource[0], "geotiff"
        resources = dataset.get_resources()
        resource_name = self.exceptions["resource"].get(iso, f"adm(in)?{level}")
        pop_resource = [
            r for r in resources
            if r.get_file_type() == "csv"
            and bool(re.match(f".*{resource_name}.*", r["name"], re.IGNORECASE))
        ]
        if len(pop_resource) == 0:
            logger.warning(f"{iso}: Could not find csv resource at adm{level}")
            return None, "geotiff"
        if len(pop_resource) > 1:
            # If several resources match, prefer the one for the latest year
            yearmatches = [
                re.findall(r"(?<!\d)\d{4}(?!\d)", r["name"], re.IGNORECASE)
                for r in pop_resource
            ]
            yearmatches = sum(yearmatches, [])
            if len(yearmatches) > 0:
                yearmatches = [int(y) for y in yearmatches]
                maxyear = [r for r in pop_resource if str(max(yearmatches)) in r["name"]]
                if len(maxyear) == 1:
                    pop_resource = maxyear
        if len(pop_resource) > 1:
            logger.warning(f"{iso}: Found multiple resources, using first in list")
        return pop_resource[0], "csv"
    def analyze_raster(self, resource, iso, level):
        try:
            _, pop_raster = resource.download(folder=self.temp_folder)
        except DownloadError:
            logger.error(f"{iso}: Could not download geotiff")
            return None
        pop_stats = zonal_stats(
            vectors=self.boundaries.loc[
                (self.boundaries["alpha_3"] == iso)
                & (self.boundaries["ADM_LEVEL"] == level)
            ],
            raster=pop_raster,
            stats="sum",
            geojson_out=True,
        )
        for row in pop_stats:
            pcode = row["properties"]["ADM_PCODE"]
            pop = row["properties"]["sum"]
            if pop:
                pop = int(round(pop, 0))
            self.boundaries.loc[self.boundaries["ADM_PCODE"] == pcode, "Population"] = pop
        return iso
    def analyze_tabular(self, resource, iso, level):
        headers, iterator = self.downloader.get_tabular_rows(
            resource["url"], dict_form=True
        )
        # Identify the level-specific p-code column and the total population column
        pcode_header = None
        pop_header = None
        for header in headers:
            if not pcode_header and header.upper() in [h.replace("#", str(level)) for h in self.headers]:
                pcode_header = header
            if not pop_header and header.upper() == "T_TL":
                pop_header = header
        if not pcode_header:
            logger.error(f"{iso}: Could not find pcode header at adm{level}")
            return None
        if not pop_header:
            logger.error(f"{iso}: Could not find pop header at adm{level}")
            return None
        updated = False
        known_pcodes = set(self.boundaries["ADM_PCODE"])
        for row in iterator:
            pcode = row[pcode_header]
            pop = row[pop_header]
            if pcode not in known_pcodes:
                logger.warning(f"{iso}: Could not find unit {pcode} in boundaries at adm{level}")
            else:
                self.boundaries.loc[self.boundaries["ADM_PCODE"] == pcode, "Population"] = pop
                updated = True
        if not updated:
            return None
        return iso
    def update_population(self, countries):
        updated_countries = dict()
        for iso in countries:
            levels = list(set(self.boundaries["ADM_LEVEL"].loc[self.boundaries["alpha_3"] == iso]))
            for level in levels:
                if level not in updated_countries:
                    updated_countries[level] = list()
                logger.info(f"{iso}: Processing population at adm{level}")
                # find dataset and resource to use
                updated = False
                resource, resource_type = self.find_resource(iso, level)
                if not resource:
                    logger.error(f"{iso}: Could not find any {resource_type} data at adm{level}")
                    continue
                if resource_type == "geotiff":
                    updated = self.analyze_raster(resource, iso, level)
                if resource_type == "csv":
                    updated = self.analyze_tabular(resource, iso, level)
                if updated and iso not in updated_countries[level]:
                    updated_countries[level].append(iso)
        return updated_countries
    def update_hdx_resource(self, dataset_name, updated_countries):
        dataset = Dataset.read_from_hdx(dataset_name)
        if not dataset:
            logger.error("Could not find overall pop dataset")
            return None, None
        resource = dataset.get_resources()[0]
        try:
            _, pop_data = resource.download(folder=self.temp_folder)
        except DownloadError:
            logger.error("Could not download population csv")
            return None, None
        pop_data = read_csv(pop_data)
        updated_data = self.boundaries.drop(columns="geometry")
        # Replace rows for updated countries at each admin level with fresh values
        for level in updated_countries:
            pop_data.drop(
                pop_data[
                    (pop_data["alpha_3"].isin(updated_countries[level]))
                    & (pop_data["ADM_LEVEL"] == level)
                ].index,
                inplace=True,
            )
            pop_data = concat([
                pop_data,
                updated_data.loc[
                    (updated_data["alpha_3"].isin(updated_countries[level]))
                    & (updated_data["ADM_LEVEL"] == level)
                ],
            ])
        pop_data.sort_values(by=["alpha_3", "ADM_LEVEL", "ADM_PCODE"], inplace=True)
        return pop_data, resource