Better naming #45

Closed · wants to merge 5 commits

14 changes: 7 additions & 7 deletions stream2segment/download/main.py
@@ -26,7 +26,7 @@
download_save_segments,
DcDataselectManager)
from stream2segment.download.modules.stations import \
(save_inventories, get_station_df_for_inventory_download)
(save_stationxml, get_station_df_for_inventory_download)


# make the logger refer to the parent of this package (`rfind` below. For info:
@@ -344,7 +344,7 @@ def stepinfo(text, *args, **kwargs):
session.close()

# query station id, network station, datacenter_url
# for those stations with empty inventory_xml
# for those stations with empty stationxml
# AND at least one segment non-empty/null
# Download inventories for those stations only
sta_df = get_station_df_for_inventory_download(session, update_metadata)
@@ -354,11 +354,11 @@ def stepinfo(text, *args, **kwargs):
else:
stepinfo("Downloading %d station inventories", len(sta_df))
n_downloaded, n_empty, n_errors = \
save_inventories(session, sta_df,
max_thread_workers,
advanced_settings['i_timeout'],
download_blocksize,
dbbufsize, isterminal)
save_stationxml(session, sta_df,
max_thread_workers,
advanced_settings['i_timeout'],
download_blocksize,
dbbufsize, isterminal)
logger.info(("** Station inventories download summary **\n"
"- downloaded %7d \n"
"- discarded %7d (empty response)\n"
2 changes: 1 addition & 1 deletion stream2segment/download/modules/channels.py
@@ -383,7 +383,7 @@ def save_stations_and_channels(session, channels_df, eidavalidator, update,
if _update_stations:
_update_stations = [_ for _ in shared_colnames(Station, channels_df,
pkey=False)
if _ != Station.inventory_xml.key]
if _ != Station.stationxml.key]

# Add stations to db (Note: no need to check for `empty(channels_df)`,
# `dbsyncdf` raises a `FailedDownload` in case). First set columns
70 changes: 67 additions & 3 deletions stream2segment/download/modules/events.py
@@ -9,17 +9,24 @@
from datetime import timedelta
import logging
from io import StringIO
from urllib.request import Request

import numpy as np
import pandas as pd

from stream2segment.io.cli import get_progressbar
from stream2segment.io.db.pdsql import DbManager
from stream2segment.download.exc import FailedDownload, NothingToDownload
from stream2segment.download.db.models import WebService, Event
from stream2segment.download.url import urlread, socket, HTTPError
from stream2segment.download.db.models import Event, WebService
from stream2segment.download.url import urlread, socket, HTTPError, read_async
from stream2segment.download.modules.utils import (dbsyncdf, get_dataframe_from_fdsn,
formatmsg,
EVENTWS_MAPPING, strptime, urljoin)
EVENTWS_MAPPING,
strptime,
urljoin,
DbExcLogger,
OneTimeLogger,
compress)

# (https://docs.python.org/2/howto/logging.html#advanced-logging-tutorial):
logger = logging.getLogger(__name__)
Expand All @@ -35,6 +42,7 @@ def get_events_df(session, url, evt_query_args, start, end,
pd_df_list = events_df_list(url, evt_query_args, start, end, timeout, show_progress)
# pd_df_list surely not empty (otherwise we raised FailedDownload)
events_df = pd.concat(pd_df_list, axis=0, ignore_index=True, copy=False)
events_df.drop(columns=[Event.quakeml.key], inplace=True)

events_df[Event.webservice_id.key] = eventws_id
events_df = dbsyncdf(events_df, session,
@@ -471,3 +479,59 @@ def isf2text_iter(isf_filep, catalog='', contributor=''):
expects += 1
except IndexError:
buf = []


def save_quakeml(session, events_df, max_thread_workers, timeout,
download_blocksize, db_bufsize, show_progress=False):
"""Save event's quakeML data. envents_df must not be empty"""

logger_header = "QuakeML"
evt_logger = OneTimeLogger(logger_header)

downloaded, errors, empty = 0, 0, 0

db_exc_logger = DbExcLogger([Event.id.key])

dbmanager = DbManager(session, Event.id,
update=[Event.quakeml.key],
buf_size=db_bufsize,
oninsert_err_callback=db_exc_logger.failed_insert,
onupdate_err_callback=db_exc_logger.failed_update)

with get_progressbar(show_progress, length=len(events_df)) as pbar:

iterable = zip(events_df[Event.id.key],
events_df[WebService.url.key],
events_df[Event.event_id.key])

reader = read_async(iterable,
urlkey=lambda obj: _get_evt_request(*obj[1:]),
max_workers=max_thread_workers,
blocksize=download_blocksize, timeout=timeout)

for obj, request, data, exc, status_code in reader:
pbar.update(1)
evt_id = obj[0]
if exc:
evt_logger.warn(request, exc)
errors += 1
else:
if not data:
evt_logger.warn(request, "empty response")
empty += 1
else:
downloaded += 1
dfr = pd.DataFrame({Event.id.key: [evt_id],
Event.quakeml.key: [compress(data)]})
dbmanager.add(dfr)

dbmanager.close()

return downloaded, empty, errors


def _get_evt_request(evt_url, evt_eventid):
"""Return a Request object from the given event arguments to download the
QuakeML
"""
return Request(url=f"{evt_url}?eventid={evt_eventid}")
88 changes: 10 additions & 78 deletions stream2segment/download/modules/stations.py
@@ -6,11 +6,6 @@
.. moduleauthor:: Riccardo Zaccarelli <[email protected]>
"""
from datetime import datetime
from io import BytesIO
import gzip
import zipfile
import zlib
import bz2
import logging
from datetime import timedelta
from urllib.request import Request
@@ -20,18 +15,16 @@
from stream2segment.io.cli import get_progressbar
from stream2segment.io.db.pdsql import DbManager, dbquery2df
from stream2segment.download.db.models import DataCenter, Station, Segment
from stream2segment.download.url import get_host, read_async
from stream2segment.download.modules.utils import (DbExcLogger, formatmsg,
url2str, err2str)

from stream2segment.download.url import read_async
from stream2segment.download.modules.utils import DbExcLogger, OneTimeLogger, compress

# (https://docs.python.org/2/howto/logging.html#advanced-logging-tutorial):
logger = logging.getLogger(__name__)


def _get_sta_request(datacenter_url, network, station, start_time, end_time):
"""Return a Request object from the given station arguments to download the
inventory xml
StationXML
"""
# fix bug of ncedc and scedc whereby dates exactly on the start are not returned.
# Adding 1s to the start time is heavily hacky but it fixes the problem easily:
@@ -99,19 +92,20 @@ def _query4inventorydownload(session, force_update):
return qry


def save_inventories(session, stations_df, max_thread_workers, timeout,
download_blocksize, db_bufsize, show_progress=False):
"""Save inventories. stations_df must not be empty (not checked here)"""
def save_stationxml(session, stations_df, max_thread_workers, timeout,
download_blocksize, db_bufsize, show_progress=False):
"""Save StationXML data. stations_df must not be empty (not checked here)"""

inv_logger = InventoryLogger()
logger_header = "StationXML"
inv_logger = OneTimeLogger(logger_header)

downloaded, errors, empty = 0, 0, 0

db_exc_logger = DbExcLogger([Station.id.key, Station.network.key,
Station.station.key, Station.start_time.key])

dbmanager = DbManager(session, Station.id,
update=[Station.inventory_xml.key],
update=[Station.stationxml.key],
buf_size=db_bufsize,
oninsert_err_callback=db_exc_logger.failed_insert,
onupdate_err_callback=db_exc_logger.failed_update)
@@ -143,71 +137,9 @@ def save_inventories(session, stations_df, max_thread_workers, timeout,
else:
downloaded += 1
dfr = pd.DataFrame({Station.id.key: [sta_id],
Station.inventory_xml.key: [compress(data)]})
Station.stationxml.key: [compress(data)]})
dbmanager.add(dfr)

dbmanager.close()

return downloaded, empty, errors


def compress(bytestr, compression='gzip', compresslevel=9):
"""Compress `bytestr` returning a new compressed byte sequence

:param bytestr: (string) a sequence of bytes to be compressed
:param compression: String, either ['bz2', 'zlib', 'gzip', 'zip'. Default: 'gzip']
The compression library to use (after serializing `obj` with the given format)
on the serialized data. If None or empty string, no compression is applied, and
`bytestr` is returned as it is
:param compresslevel: integer (9 by default). Ignored if `compression` is None,
empty or 'zip' (the latter does not accept this argument), this parameter
controls the level of compression; 1 is fastest and produces the least
compression, and 9 is slowest and produces the most compression
"""
if compression == 'bz2':
return bz2.compress(bytestr, compresslevel=compresslevel)
elif compression == 'zlib':
return zlib.compress(bytestr, compresslevel)
elif compression:
sio = BytesIO()
if compression == 'gzip':
with gzip.GzipFile(mode='wb', fileobj=sio,
compresslevel=compresslevel) as gzip_obj:
gzip_obj.write(bytestr)
# Note: DO NOT return sio.getvalue() WITHIN the with statement,
# the gzip file obj needs to be closed first. FIXME: ref?
elif compression == 'zip':
# In this case, use the compress argument to ZipFile to compress the data,
# since writestr() does not take compress as an argument. See:
# https://pymotw.com/2/zipfile/#writing-data-from-sources-other-than-files
with zipfile.ZipFile(sio, 'w', compression=zipfile.ZIP_DEFLATED) as zip_obj:
zip_obj.writestr("x", bytestr) # first arg must be a nonempty str
else:
raise ValueError("compression '%s' not in ('gzip', 'zlib', 'bz2', 'zip')" %
str(compression))

return sio.getvalue()

return bytestr


class InventoryLogger(set):
"""Class handling inventory errors and logging only once per error type
and datacenter to avoid polluting the log file/stream with hundreds of
megabytes"""

def warn(self, request, exc):
"""Issue a logger.warn if the given error is not already reported

:param request: the Request object
:pram exc: the reported Exception or string message
"""
url = get_host(request)
item = (url, err2str(exc)) # use err2str to uniquely identify exc
if item not in self:
if not self:
logger.warning('Detailed inventory download errors (showing '
'only first of each type per data center):')
self.add(item)
request_str = url2str(request)
logger.warning(formatmsg("Inventory download error", exc, request_str))
77 changes: 76 additions & 1 deletion stream2segment/download/modules/utils.py
@@ -17,6 +17,11 @@
from collections import OrderedDict
from functools import cmp_to_key
import logging
from io import BytesIO
import gzip
import zipfile
import zlib
import bz2

import pandas as pd

@@ -25,7 +30,8 @@
from stream2segment.io.db.inspection import colnames
from stream2segment.download.db.models import Event, Station, Channel
from stream2segment.download.exc import FailedDownload
from stream2segment.download.url import responses
from stream2segment.download.url import responses, get_host


# (https://docs.python.org/2/howto/logging.html#advanced-logging-tutorial):
logger = logging.getLogger(__name__)
@@ -248,6 +254,75 @@ def dolog(ok, notok, okstr, nookstr):
dolog(updated, not_updated, "%d %s updated", ", %d discarded")


class OneTimeLogger(set):
"""Class handling errors logging only once per error type
and host URL in order to avoid polluting the log file/stream
with hundreds of megabytes"""

def __init__(self, header):
"""
:param header: str, the header to be shown in the log
"""
super().__init__()
self.header = header

def warn(self, request, exc):
"""Issue a logger.warn if the given error is not already reported

:param request: the Request object
:param exc: the reported Exception or string message
"""
url = get_host(request)
item = (url, err2str(exc)) # use err2str to uniquely identify exc
if item not in self:
if not self:
logger.warning(f"{self.header} download errors")
logger.warning('(showing only first of each type per data center):')
self.add(item)
request_str = url2str(request)
logger.warning(formatmsg(f"{self.header} Download error", exc, request_str))


def compress(bytestr, compression='gzip', compresslevel=9):
"""Compress `bytestr` returning a new compressed byte sequence

:param bytestr: the sequence of bytes to be compressed
:param compression: string, one of 'bz2', 'zlib', 'gzip', 'zip' (default: 'gzip'):
the compression library to apply to `bytestr`. If None or empty string, no
compression is applied, and `bytestr` is returned as it is
:param compresslevel: integer (9 by default). Ignored if `compression` is None,
empty or 'zip' (the latter does not accept this argument), this parameter
controls the level of compression; 1 is fastest and produces the least
compression, and 9 is slowest and produces the most compression
"""
if compression == 'bz2':
return bz2.compress(bytestr, compresslevel=compresslevel)
elif compression == 'zlib':
return zlib.compress(bytestr, compresslevel)
elif compression:
sio = BytesIO()
if compression == 'gzip':
with gzip.GzipFile(mode='wb', fileobj=sio,
compresslevel=compresslevel) as gzip_obj:
gzip_obj.write(bytestr)
# Note: DO NOT return sio.getvalue() WITHIN the with statement,
# the gzip file obj needs to be closed first. FIXME: ref?
elif compression == 'zip':
# In this case, use the compress argument to ZipFile to compress the data,
# since writestr() does not take compress as an argument. See:
# https://pymotw.com/2/zipfile/#writing-data-from-sources-other-than-files
with zipfile.ZipFile(sio, 'w', compression=zipfile.ZIP_DEFLATED) as zip_obj:
zip_obj.writestr("x", bytestr) # first arg must be a nonempty str
else:
raise ValueError("compression '%s' not in ('gzip', 'zlib', 'bz2', 'zip')" %
str(compression))

return sio.getvalue()

return bytestr
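
Since compress() defaults to gzip, the bytes stored in Station.stationxml (or Event.quakeml) can be restored with the standard library. A minimal round-trip sketch under that assumption (not part of the diff; the payload is a placeholder):

import gzip
from stream2segment.download.modules.utils import compress

xml = b"<FDSNStationXML>...</FDSNStationXML>"  # placeholder payload
blob = compress(xml)                           # gzip-compressed bytes (default settings)
assert gzip.decompress(blob) == xml            # stdlib restores the original bytes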


def get_dataframe_from_fdsn(response_str, query_type, url=''):
"""Return a normalized and harmonized dataframe from raw_data. dbmodel_key
can be 'event' 'station' or 'channel'. Raises ValueError if the resulting