Merge pull request #5 from skybristol/dev
Update master
skybristol authored Oct 31, 2020
2 parents a7f289e + a7ae1b8 commit d4fd088
Showing 7 changed files with 175 additions and 57 deletions.
25 changes: 8 additions & 17 deletions examples/isaid_scripts/cache_pw_update.py
@@ -54,25 +54,16 @@ def accumulator(pw_doc):
) for pw_item in tqdm.tqdm(modified_records)
)

urls_for_query = str(
[
i["url"] for i in summarization["assets"]
]).replace('"', "'").replace("[", "(").replace("]", ")")

delete_statements = [
f"DELETE FROM assets WHERE url in {urls_for_query}",
f"DELETE FROM claims WHERE reference in {urls_for_query}",
f"DELETE FROM links WHERE url in {urls_for_query}",
f"DELETE FROM sentences WHERE url in {urls_for_query}",
f"DELETE FROM contacts WHERE url in {urls_for_query}",
]

with pg_engine.connect() as con:
for statement in delete_statements:
rs = con.execute(statement)

for k, v in summarization.items():
if len(v) > 0:
urls_for_query = str(
[
i["url"] for i in v
]).replace('"', "'").replace("[", "(").replace("]", ")")

with pg_engine.connect() as con:
rs = con.execute(f"DELETE FROM {k} WHERE url in {urls_for_query}")

pd.DataFrame(v).to_sql(
k,
pg_engine,
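This script and cache_sb_catalog.py below now share the same refresh pattern: gather the URLs of the records being reprocessed, delete any matching rows from each target table, and append the freshly summarized rows with pandas to_sql. As a rough sketch of that step (not part of this commit), an expanding bind parameter can build the IN (...) list instead of the str(list).replace() quoting used above; refresh_table, table_name, and records are illustrative names, and pg_engine is assumed to be the SQLAlchemy engine these scripts create.

import pandas as pd
from sqlalchemy import text, bindparam

def refresh_table(pg_engine, table_name, records):
    # records is assumed to be a list of dicts that each carry a "url" key
    urls = [r["url"] for r in records]
    if not urls:
        return

    # An expanding bind parameter renders the IN (...) list safely
    delete_stmt = text(f"DELETE FROM {table_name} WHERE url IN :urls").bindparams(
        bindparam("urls", expanding=True)
    )

    with pg_engine.connect() as con:
        con.execute(delete_stmt, {"urls": urls})

    # Append the refreshed records back into the same table
    pd.DataFrame(records).to_sql(
        table_name,
        pg_engine,
        index=False,
        if_exists="append",
        chunksize=1000
    )
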
39 changes: 16 additions & 23 deletions examples/isaid_scripts/cache_sb_catalog.py
@@ -53,9 +53,6 @@ def accumulator(sb_doc):
) for pw_item in tqdm.tqdm(data_release_items)
)

#for k, v in summarization.items():
# print(k, len(v))

pg_user = os.environ["PG_USER"]
pg_pass = os.environ["PG_PASS"]
pg_host = os.environ["PG_HOST"]
@@ -65,23 +62,19 @@ def accumulator(sb_doc):
pg_engine = create_engine(f'postgresql://{pg_user}:{pg_pass}@{pg_host}:{pg_port}/{pg_db}')

for k, v in summarization.items():
if len(v) > 0 and k in ["links"]:
try:
pd.DataFrame(v).to_sql(
k,
pg_engine,
index=False,
if_exists="append",
chunksize=1000
)
except:
new_data = pd.DataFrame(v)
existing_data = pd.read_sql(f'SELECT * FROM {k}', pg_engine)
df_combined = pd.concat([new_data, existing_data])
df_combined.to_sql(
k,
pg_engine,
index=False,
if_exists="replace",
chunksize=1000
)
if len(v) > 0:
urls_for_query = str(
[
i["url"] for i in v
]).replace('"', "'").replace("[", "(").replace("]", ")")

with pg_engine.connect() as con:
rs = con.execute(f"DELETE FROM {k} WHERE url in {urls_for_query}")

pd.DataFrame(v).to_sql(
k,
pg_engine,
index=False,
if_exists="append",
chunksize=1000
)
13 changes: 10 additions & 3 deletions examples/isaid_scripts/cache_sb_directory.py
@@ -6,11 +6,18 @@

cmd_sb = pylinkedcmd.pylinkedcmd.Sciencebase()

usgs_staff = cmd_sb.get_active_usgs_staff()
#usgs_staff = cmd_sb.get_active_usgs_staff()
usgs_staff = cmd_sb.get_staff_by_email(["[email protected]","[email protected]"])

sbid_list_for_query = str([i["identifier_sbid"]
for i in usgs_staff]).replace('"', "'").replace("[", "(").replace("]", ")")

df_people = pd.DataFrame(usgs_staff)
df_people = df_people.drop_duplicates(subset='identifier_sbid', keep="first")
df_people.loc[df_people['identifier_orcid'].duplicated(), 'identifier_orcid'] = np.NaN
try:
df_people.loc[df_people['identifier_orcid'].duplicated(), 'identifier_orcid'] = np.NaN
except:
pass
df_people.loc[df_people['identifier_email'].duplicated(), 'identifier_email'] = np.NaN

pg_user = os.environ["PG_USER"]
@@ -22,7 +29,7 @@
pg_engine = create_engine(f'postgresql://{pg_user}:{pg_pass}@{pg_host}:{pg_port}/{pg_db}')

with pg_engine.connect() as con:
con.execute("DELETE FROM people")
con.execute(f"DELETE FROM people where identifier_sbid IN {sbid_list_for_query}")
con.close()

df_people.to_sql(
45 changes: 45 additions & 0 deletions examples/isaid_scripts/cache_sb_orgs.py
@@ -0,0 +1,45 @@
import sciencebasepy
import pylinkedcmd
from joblib import Parallel, delayed
import tqdm
from sqlalchemy import create_engine
import pandas as pd
import os

cmd_sb = pylinkedcmd.pylinkedcmd.Sciencebase()
cmd_isaid = pylinkedcmd.pylinkedcmd.Isaid()

orgs = cmd_isaid.get_organizations()
print(len(orgs))

org_summaries = list()

def accumulator(identifier):
org_summary = cmd_sb.get_sb_org(identifier)
if org_summary is not None:
org_summaries.append(org_summary)


Parallel(n_jobs=4, prefer="threads")(
delayed(accumulator)
(
identifier
) for identifier in tqdm.tqdm([o["organization_uri"] for o in orgs])
)

if len(org_summaries) > 0:
pg_user = os.environ["PG_USER"]
pg_pass = os.environ["PG_PASS"]
pg_host = os.environ["PG_HOST"]
pg_port = os.environ["PG_PORT"]
pg_db = os.environ["PG_DB"]

pg_engine = create_engine(f'postgresql://{pg_user}:{pg_pass}@{pg_host}:{pg_port}/{pg_db}')

pd.DataFrame(org_summaries).to_sql(
"organizations",
pg_engine,
index=False,
if_exists="replace",
chunksize=1000
)
9 changes: 7 additions & 2 deletions examples/isaid_scripts/cache_staff_profiles.py
@@ -25,8 +25,9 @@

def accumulator(url):
profile_info = usgs_web.scrape_profile(url)
profiles.append(profile_info["summary"])
claims.extend(profile_info["claims"])
if profile_info is not None:
profiles.append(profile_info["summary"])
claims.extend(profile_info["claims"])


Parallel(n_jobs=20, prefer="threads")(
@@ -50,6 +51,10 @@ def accumulator(url):
chunksize=1000
)

with pg_engine.connect() as con:
con.execute("DELETE FROM claims WHERE claim_source = 'USGS Profile Page Expertise'")
con.close()

pd.DataFrame(claims).to_sql(
"claims",
pg_engine,
98 changes: 87 additions & 11 deletions pylinkedcmd/pylinkedcmd.py
@@ -15,6 +15,7 @@
import math
from nltk.tokenize import sent_tokenize
from copy import deepcopy
from jsonbender import bend, K, S, F, OptionalS


class Sciencebase:
@@ -29,6 +30,30 @@ def __init__(self):
"ASK USGS -- Water Webserver Team",
"U.S. Geological Survey - ScienceBase"
]
self.cmd_isaid = Isaid()
self.org_mapping = {
'identifier': S('links', 0, 'url'),
'name': S('name'),
'url': OptionalS('url'),
'alternateName': OptionalS('aliases', 0, 'name'),
'addressLocality': OptionalS('primaryLocation', 'streetAddress', 'city'),
'addressRegion': OptionalS('primaryLocation', 'streetAddress', 'state'),
'region': OptionalS('extensions', 'usgsOrganization', 'region'),
'usgsMissionAreas': F(
lambda source:
[k["displayText"] for k
in source["extensions"]["usgsOrganization"]["usgsMissionAreas"]
] if exists(source, ["extensions","usgsOrganization","usgsMissionAreas"])
else None
),
'usgsPrograms': F(
lambda source:
[k["displayText"] for k
in source["extensions"]["usgsOrganization"]["usgsPrograms"]
] if exists(source, ["extensions","usgsOrganization","usgsPrograms"])
else None
)
}

def summarize_sb_person(self, person_doc):
ignore_props = [
@@ -109,6 +134,21 @@ def get_active_usgs_staff(self, return_format="summarized"):

return sb_dir_results

def get_staff_by_email(self, email_list, return_format="summarized"):
sb_dir_results = list()

for email in email_list:
if validators.email(email):
query = f"https://www.sciencebase.gov/directory/people?format=json&email={email}"
r = requests.get(query).json()
if len(r["people"]) > 0:
sb_dir_results.extend(r["people"])

if return_format == "summarized":
return [self.summarize_sb_person(i) for i in sb_dir_results]

return sb_dir_results

def lookup_sb_person_by_email(self, email):
'''
Searches the ScienceBase Directory for a person by their email address. Returns None if either no record or
@@ -456,7 +496,7 @@ def catalog_item_claims(self, sb_catalog_doc=None, sb_catalog_url=None):
and t["type"] not in ["Harvest Set"]
]:
tag_contact = deepcopy(contact_record)
tag_contact["property_label"] = "data release metadata tag"
tag_contact["property_label"] = "keyword"
tag_contact["object_instance_of"] = "data descriptor"
tag_contact["object_label"] = tag["name"]
tag_contact["object_qualifier"] = ":".join([v for k, v in tag.items() if k != "name"])
@@ -465,7 +505,11 @@
for coauthor_name in [i for i in unique_contact_names if i != contact["name"]]:
coauthor = next((i for i in sb_catalog_doc["contacts"] if i["name"] == coauthor_name), None)
coauthor_contact = deepcopy(contact_record)
coauthor_contact["property_label"] = "data release co-developer"
coauthor_contact["property_label"] = "coauthor"
if "type" in coauthor.keys():
coauthor_contact["object_qualifier"] = coauthor["type"]
else:
coauthor_contact["object_qualifier"] = "data item co-listed contact"
coauthor_contact["object_instance_of"] = "person"
coauthor_contact["object_label"] = coauthor["name"]

@@ -487,7 +531,7 @@ def catalog_item_claims(self, sb_catalog_doc=None, sb_catalog_url=None):
and i["name"] not in self.ignore_org_names
]:
organization_contact = deepcopy(contact_record)
organization_contact["property_label"] = "organization data release affiliation"
organization_contact["property_label"] = "organization affiliation"
organization_contact["object_instance_of"] = "organization"
organization_contact["object_label"] = org["name"]
if "oldPartyId" in org.keys():
@@ -609,6 +653,24 @@ def catalog_item_summary(self, sb_catalog_doc=None, sb_catalog_url=None, parse_s
"links": links
}

def get_sb_org(self, identifier, map_it=True):
if not validators.url(identifier):
try:
oldPartyId = int(identifier)
identifier = f"{self.sb_directory_org_api}/{str(oldPartyId)}"
except:
return None

r = requests.get(identifier, headers={"accept": "application/json"})

if r.status_code != 200:
return None

if map_it:
return bend(self.org_mapping, r.json())
else:
return r.json()


class Wikidata:
def __init__(self):
@@ -1301,7 +1363,8 @@ def pub_record_claims(self, url, datepublished, pw_authors, cost_centers):

for cost_center in cost_centers:
cost_center_claim = deepcopy(author_claim)
cost_center_claim["property_label"] = "USGS Cost Center association from publication"
cost_center_claim["property_label"] = "organization affiliation"
cost_center_claim["object_qualifier"] = "USGS Cost Center association from publication"
cost_center_claim["object_instance_of"] = "organization"
cost_center_claim["object_label"] = cost_center["text"]
claims.append(cost_center_claim)
@@ -1588,7 +1651,7 @@ def get_people(self, criteria=None, parameter=None, where_clause=None):

q = '''
{
people %(where_clause)s {
directory %(where_clause)s {
identifier_sbid
identifier_email
identifier_orcid
@@ -1603,7 +1666,7 @@ def get_people(self, criteria=None, parameter=None, where_clause=None):
if "errors" in query_response.keys():
return query_response
else:
return [i["identifier_sbid"] for i in query_response["data"]["people"]]
return query_response["data"]["directory"]

def people_by_org(self, organization_name, response="email_list"):
where_clause = '''
@@ -1690,6 +1753,10 @@ def assemble_person_record(self, criteria, parameter="identifier_email", datatyp
note
organization_name
organization_uri
organization_url
region
usgs_mission_areas
usgs_programs
personaltitle
professionalqualifier
state
@@ -1729,6 +1796,14 @@ def assemble_person_record(self, criteria, parameter="identifier_email", datatyp
reference
subject_instance_of
subject_label
subject_identifier_email
subject_identifier_orcid
subject_identifier_sbid
subject_identifier_wikidata
object_identifier_email
object_identifier_orcid
object_identifier_sbid
object_identifier_wikidata
}
''' % {"where_clause": where_clause.replace("identifier_", "subject_identifier_")}

@@ -1777,16 +1852,12 @@ def assemble_person_record(self, criteria, parameter="identifier_email", datatyp
if "errors" in query_response.keys():
return query_response
else:
dataset = dict()
for k, v in query_response["data"].items():
dataset[self.isaid_data_collections[k]["title"]] = v
#query_response["data"][self.isaid_data_collections[k]["title"]] = query_response["data"].pop(k)
return query_response["data"]

def get_organizations(self):
q_orgs = '''
{
directory (distinct_on: organization_uri) {
directory (distinct_on: organization_uri, where: {organization_uri: {_is_null: false}}) {
organization_name
organization_uri
}
@@ -1827,3 +1898,8 @@ def process_abstract(abstract, source_url, title=None, parse_sentences=False):
"abstract": abstract_text,
"sentences": sentences
}

def exists(obj, chain):
_key = chain.pop(0)
if _key in obj:
return exists(obj[_key], chain) if chain else obj[_key]
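
The new get_sb_org method and its org_mapping lean on jsonbender's declarative benders: S for required paths, OptionalS for paths that may be absent, and F for arbitrary functions over the whole source document, with the exists() helper above guarding the optional USGS extension fields. A minimal sketch of how bend() applies such a mapping, using a made-up directory-style record (the sample_org values and the import path for exists are illustrative assumptions, not real ScienceBase data):

from jsonbender import bend, S, OptionalS, F
from pylinkedcmd.pylinkedcmd import exists  # assumed import of the helper added at the bottom of this module

# Made-up fragment shaped like a ScienceBase Directory organization document
sample_org = {
    "links": [{"url": "https://example.org/directory/organization/12345"}],
    "name": "Example Science Center",
    "primaryLocation": {"streetAddress": {"city": "Anytown", "state": "WA"}},
    "extensions": {
        "usgsOrganization": {
            "region": "Example Region",
            "usgsMissionAreas": [{"displayText": "Natural Hazards"}]
        }
    }
}

mapping = {
    "identifier": S("links", 0, "url"),  # required path; raises if missing
    "name": S("name"),
    "url": OptionalS("url"),  # optional path; yields None when absent
    "addressLocality": OptionalS("primaryLocation", "streetAddress", "city"),
    "usgsMissionAreas": F(  # arbitrary function applied to the whole source document
        lambda source: [k["displayText"] for k in
                        source["extensions"]["usgsOrganization"]["usgsMissionAreas"]]
        if exists(source, ["extensions", "usgsOrganization", "usgsMissionAreas"])
        else None
    ),
}

print(bend(mapping, sample_org))
# -> {'identifier': 'https://example.org/directory/organization/12345', 'name': 'Example Science Center', ...}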
3 changes: 2 additions & 1 deletion setup.py
@@ -18,7 +18,8 @@
'validators',
'pydash',
'beautifulsoup4',
'nltk'
'nltk',
'jsonbender'
]

setup_requirements = [ ]
