From a23fbce5d509219e94998f1b279be8e108b4cae8 Mon Sep 17 00:00:00 2001 From: Bastien Pili Date: Mon, 8 Jul 2024 10:41:42 +0200 Subject: [PATCH 1/5] Added column QuakeML to Event --- stream2segment/io/db/models.py | 1 + .../resources/templates/example.db.sqlite | Bin 204800 -> 204800 bytes tests/download/test_u_download_01_events.py | 8 ++++---- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/stream2segment/io/db/models.py b/stream2segment/io/db/models.py index 5571533c..de87f59c 100644 --- a/stream2segment/io/db/models.py +++ b/stream2segment/io/db/models.py @@ -198,6 +198,7 @@ def webservice_id(cls): mag_author = Column(String) event_location_name = Column(String) event_type = Column(String) + quakeml = Column(LargeBinary, nullable=True) @property def url(self): diff --git a/stream2segment/resources/templates/example.db.sqlite b/stream2segment/resources/templates/example.db.sqlite index ef52170f562058c5332665ba8e0d09b68d583a28..33726b54280ab872098ad5ce77fcb2a6c783f658 100644 GIT binary patch delta 409 zcmZoTz|(MmXM(h#6axc;5fn24X~Bs)#;j5ddj7Q=Qx?>-G4hu%@Rw{BRLJ5NY2;)S zV&K$uWM^j-l@*l~WEZt%nBKw9+|0`;=Vs0( z_GWe`mSrrJEcz_^K!ZXU16hig_KNH4Gj=JLBqrsg78m9u#h0Y!7Q}ZYm2gp+e-p2FUkXWX7(&a~tN8-oA?1Ai$nB(nLZm+~`rPny8ZXb29#$*<>&0ss}U BZ_xk% delta 269 zcmZoTz|(MmXM(h#Bm)D35fF0$F%uB;PSi1Gm1NMfzp*i8K|LFT00RSm*=9k79Dd^oSOv`?(=VcJzs2sB-?iePL>=7{`0(-co%YR z=4@hbW_M!AVHX9OVaQ_3ZpE}lTwkBDi?IaATnJ)zW|t%;<)juD<|M_Jq_%Phv5QMe z!bDTcQu9iRA*A8_+y2 yY~sPXAhGrYbH?on=1faYurczNGVqrI9g)pHy_BE1d(s5%%|Zze_$R-fFA4w($3}?& diff --git a/tests/download/test_u_download_01_events.py b/tests/download/test_u_download_01_events.py index b1c63924..c68c1318 100644 --- a/tests/download/test_u_download_01_events.py +++ b/tests/download/test_u_download_01_events.py @@ -762,10 +762,10 @@ def test_get_events_response_has_one_col_more(self, mock_urljoin, db): Event MODEL. 
TO FIX THIS, EDIT THE RESPONSE BELOW IN ORDER TO HAVE ALWAYS ONE COLUMN MORE THAN IN OUR Event MODEL """ - urlread_sideeffect = ["""#EventID|Time|Latitude|Longitude|Depth/km|Author|Catalog|Contributor|ContributorID|MagType|Magnitude|MagAuthor|EventLocationName|EventType| -gfz2021edty|2021-02-28T23:37:15.211956|-17.565212|167.572067|10.0|||GFZ|gfz2021edty|M|5.787024361||Vanuatu Islands|earthquake| -gfz2021edpn|2021-02-28T21:23:50.840903|-22.500320|172.554474|26.75543594|||GFZ|gfz2021edpn|mb|4.907085435||Southeast of Loyalty Islands|earthquake| -gfz2021edoa|2021-02-28T20:37:40.931643|-22.658522|172.432373|30.70357132|||GFZ|gfz2021edoa|Mw|5.755797284||Southeast of Loyalty Islands|earthquake| + urlread_sideeffect = ["""#EventID|Time|Latitude|Longitude|Depth/km|Author|Catalog|Contributor|ContributorID|MagType|Magnitude|MagAuthor|EventLocationName|EventType|QuakeML| +gfz2021edty|2021-02-28T23:37:15.211956|-17.565212|167.572067|10.0|||GFZ|gfz2021edty|M|5.787024361||Vanuatu Islands|earthquake|| +gfz2021edpn|2021-02-28T21:23:50.840903|-22.500320|172.554474|26.75543594|||GFZ|gfz2021edpn|mb|4.907085435||Southeast of Loyalty Islands|earthquake|| +gfz2021edoa|2021-02-28T20:37:40.931643|-22.658522|172.432373|30.70357132|||GFZ|gfz2021edoa|Mw|5.755797284||Southeast of Loyalty Islands|earthquake|| """] with pytest.raises(FailedDownload) as fdw: data = self.get_events_df(urlread_sideeffect, db.session, "http://eventws", {}, From e898df5f1b80347a61e3d8fc961b28bcf16cbc2e Mon Sep 17 00:00:00 2001 From: Bastien Pili Date: Mon, 8 Jul 2024 15:13:23 +0200 Subject: [PATCH 2/5] Renamed inventory_xml to stationxml --- stream2segment/download/main.py | 2 +- stream2segment/download/modules/channels.py | 2 +- stream2segment/download/modules/stations.py | 4 ++-- stream2segment/io/db/models.py | 6 +++--- stream2segment/process/__init__.py | 2 +- stream2segment/process/db/models.py | 2 +- .../process/gui/webapp/mainapp/db.py | 2 +- ...g-Stream2segment-in-your-Python-code.ipynb | 2 +- .../resources/templates/example.db.sqlite | Bin 204800 -> 299008 bytes tests/conftest.py | 2 +- tests/download/db/test_cli_update_metadata.py | 6 +++--- tests/download/test_download.py | 10 +++++----- tests/process/db/test_db.py | 14 +++++++------- .../db/test_db_segment_obspy_methods.py | 2 +- tests/process/gui/test_webgui.py | 12 ++++++------ 15 files changed, 34 insertions(+), 34 deletions(-) diff --git a/stream2segment/download/main.py b/stream2segment/download/main.py index bc4cce0e..74b2a02e 100644 --- a/stream2segment/download/main.py +++ b/stream2segment/download/main.py @@ -344,7 +344,7 @@ def stepinfo(text, *args, **kwargs): session.close() # query station id, network station, datacenter_url - # for those stations with empty inventory_xml + # for those stations with empty stationxml # AND at least one segment non-empty/null # Download inventories for those stations only sta_df = get_station_df_for_inventory_download(session, update_metadata) diff --git a/stream2segment/download/modules/channels.py b/stream2segment/download/modules/channels.py index cc52b328..d72eec77 100644 --- a/stream2segment/download/modules/channels.py +++ b/stream2segment/download/modules/channels.py @@ -383,7 +383,7 @@ def save_stations_and_channels(session, channels_df, eidavalidator, update, if _update_stations: _update_stations = [_ for _ in shared_colnames(Station, channels_df, pkey=False) - if _ != Station.inventory_xml.key] + if _ != Station.stationxml.key] # Add stations to db (Note: no need to check for `empty(channels_df)`, # `dbsyncdf` raises a 
`FailedDownload` in case). First set columns diff --git a/stream2segment/download/modules/stations.py b/stream2segment/download/modules/stations.py index bcf6c102..6957739f 100644 --- a/stream2segment/download/modules/stations.py +++ b/stream2segment/download/modules/stations.py @@ -111,7 +111,7 @@ def save_inventories(session, stations_df, max_thread_workers, timeout, Station.station.key, Station.start_time.key]) dbmanager = DbManager(session, Station.id, - update=[Station.inventory_xml.key], + update=[Station.stationxml.key], buf_size=db_bufsize, oninsert_err_callback=db_exc_logger.failed_insert, onupdate_err_callback=db_exc_logger.failed_update) @@ -143,7 +143,7 @@ def save_inventories(session, stations_df, max_thread_workers, timeout, else: downloaded += 1 dfr = pd.DataFrame({Station.id.key: [sta_id], - Station.inventory_xml.key: [compress(data)]}) + Station.stationxml.key: [compress(data)]}) dbmanager.add(dfr) dbmanager.close() diff --git a/stream2segment/io/db/models.py b/stream2segment/io/db/models.py index de87f59c..ad24ff72 100644 --- a/stream2segment/io/db/models.py +++ b/stream2segment/io/db/models.py @@ -291,7 +291,7 @@ def datacenter_id(cls): site_name = Column(String) start_time = Column(DateTime, nullable=False) end_time = Column(DateTime) - inventory_xml = Column(LargeBinary) + stationxml = Column(LargeBinary) @property def url(self): @@ -301,11 +301,11 @@ def url(self): @hybrid_property def has_inventory(self): - return bool(self.inventory_xml) + return bool(self.stationxml) @has_inventory.expression def has_inventory(cls): # pylint:disable=no-self-argument - return withdata(cls.inventory_xml) + return withdata(cls.stationxml) # relationships (implement here only those shared by download+process): @declared_attr diff --git a/stream2segment/process/__init__.py b/stream2segment/process/__init__.py index a63cf5cb..43aeee53 100644 --- a/stream2segment/process/__init__.py +++ b/stream2segment/process/__init__.py @@ -353,7 +353,7 @@ def get_segment_help(format='html', maxwidth=79, **print_kwargs): # assigned to the segment"], ["classes.description", None], # "int: the description(s) of the class labels # assigned to the segment"], - ["station.inventory_xml", None], # bytes + ["station.stationxml", None], # bytes ["download.log", None], # str ["download.warnings", None], # int ["download.errors", None], # int diff --git a/stream2segment/process/db/models.py b/stream2segment/process/db/models.py index 2c48e4d3..a994eae6 100644 --- a/stream2segment/process/db/models.py +++ b/stream2segment/process/db/models.py @@ -194,7 +194,7 @@ def get_inventory(station): """Return the inventory object for the given station. 
Raises :class:`SkipSegment` if inventory data is empty """ - data = station.inventory_xml + data = station.stationxml if not data: raise SkipSegment('no data') return get_inventory_from_bytes(data) diff --git a/stream2segment/process/gui/webapp/mainapp/db.py b/stream2segment/process/gui/webapp/mainapp/db.py index d52fb923..a1c7a935 100644 --- a/stream2segment/process/gui/webapp/mainapp/db.py +++ b/stream2segment/process/gui/webapp/mainapp/db.py @@ -166,7 +166,7 @@ def get_metadata(segment_id=None): related_models_attrs = { 'event': lambda attr: attr not in {'contributor', 'contributor_id', 'mag_author', 'event_type', 'author'}, - 'station': lambda attr: attr != Station.inventory_xml.key, + 'station': lambda attr: attr != Station.stationxml.key, 'channel': lambda attr: True, 'datacenter': lambda attr: attr in {'id', 'dataselect_url'}, 'download': lambda attr: attr in {'id', 'run_time'} diff --git a/stream2segment/resources/templates/Using-Stream2segment-in-your-Python-code.ipynb b/stream2segment/resources/templates/Using-Stream2segment-in-your-Python-code.ipynb index 12916a0c..8dbbaa1c 100644 --- a/stream2segment/resources/templates/Using-Stream2segment-in-your-Python-code.ipynb +++ b/stream2segment/resources/templates/Using-Stream2segment-in-your-Python-code.ipynb @@ -421,7 +421,7 @@ " site_name: None (NoneType)\n", " start_time: 2012-03-21 10:00:00 (datetime)\n", " end_time: None (NoneType)\n", - " inventory_xml: b'\\x1f\\x8b\\x08\\x00\\xa4\\x99\\x1b\\\\\\x02\\xff' (bytes, 44710 elements, showing first 10 only)\n", + " stationxml: b'\\x1f\\x8b\\x08\\x00\\xa4\\x99\\x1b\\\\\\x02\\xff' (bytes, 44710 elements, showing first 10 only)\n", " datacenter_id: 1 (int)\n", " id: 2 (int)\n", " related_objects (0 of 3 loaded):\n", diff --git a/stream2segment/resources/templates/example.db.sqlite b/stream2segment/resources/templates/example.db.sqlite index 33726b54280ab872098ad5ce77fcb2a6c783f658..4cf1a1f873265217cb97dc549a9dbe2749b62d6d 100644 GIT binary patch delta 1315 zcmeH{PfQa*6vpQ*Q|lJG3nu&%q3fbjq*{taMXk`51scj0mtul3DHYa8z^ZiBnwUtG z7`>sG>``Kjo{WDg&QUyY*Na{?UX^e%F+}lAftVOC-aY*G+kS8R_M3U_Of8g5&25>f zDXSrb#E=rC8Y#cy+XpqzhwsZEJt&=#+~|=~ej&~BO1?~)cIWRt$p~i(79mxrTM^!8 z8&ZqZAs$3Qwj(N1kL*DF$WCMz(tvo8Mr1eAg!qs>$X=uwDaKd!$@%%$@!{Iyd&K3) z5$T)zt2AHo*!{A2u_Pzj;%P_Dkz;iVoN(B>0`=^D8+>0}yhJrmggdnX$}C z`)3fUof*@*d~R~w_OwEUGi#--v2&Ti2RqaPZP&~R^HRZ6#jbcFobT(?1U;t;)w`) z(S$ybG{P}Gsa#VrUkYO(*g17^L{$d$*s(#)%PTy&>QmQauib5gXv9uk2}6r@>q@US zIq%xw51?;wqd7!Bi0u-hSoaqva1mX z&^zd#mBngX4?_K>7Bn^!b}0xU2PLFvm9aKhDH8mZ_Pci28)})}TH&m$t%X}oD}8Fh zTu+_d&_LACaNm?e-h*s zypmYqJOByLgD-#sBt7}nr%=l8f{f0T>!jxbs(pc}Gpw+eMkRfJC-XlJy8n;neya}6^(%r#;^n-;vxcV74) uLzK}eZgyf0t{Udx+J!l|c4H3VheyPyAjw3M@f>jQg)9T?d$r6TIrVT_y4u~{>*@V#T&GntoDimN4$gI!!wlCjf#`UVq5`OV3kt&D<7sb#5o zCGjPd1*uA$A+8Z2(IjGHNri0kxEe>QCn9l$`Es!l=y31r#vZUS`5* z%R1dpfl+z;Y*ogetdlu84ci&0su=VR)_!q diff --git a/tests/conftest.py b/tests/conftest.py index 7055bfb0..c925f208 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -522,7 +522,7 @@ def create(self, to_file=False): inv_xml = data.read("inventory_GE.APE.xml") s_ok = dbp.Station(datacenter_id=dtc.id, latitude=11, longitude=12, network='ok', station='ok', start_time=datetime.utcnow(), - inventory_xml=compress(inv_xml)) + stationxml=compress(inv_xml)) session.add(s_ok) session.commit() diff --git a/tests/download/db/test_cli_update_metadata.py b/tests/download/db/test_cli_update_metadata.py index d8854c61..3d7e757e 100644 --- a/tests/download/db/test_cli_update_metadata.py +++ 
b/tests/download/db/test_cli_update_metadata.py @@ -97,7 +97,7 @@ def tst_cmdline_inv_only(self, mock_updatedf, mock_insertdf, mock_mseed_unpack, stainvs = db.session.query(Station).filter(Station.has_inventory).all() assert len(stainvs) == 1 ix = \ - db.session.query(Station.id, Station.inventory_xml).filter( + db.session.query(Station.id, Station.stationxml).filter( Station.has_inventory).all() num_downloaded_inventories_first_try = len(ix) assert len(ix) == num_downloaded_inventories_first_try @@ -149,7 +149,7 @@ def tst_cmdline_inv_only(self, mock_updatedf, mock_insertdf, mock_mseed_unpack, # data center returning an already saved station. The saved station has # been downloaded from a different data center - # id datacenter_id inventory_xml + # id datacenter_id stationxml # 1 1 b'...' # 2 1 None # 3 2 b'...' @@ -163,7 +163,7 @@ def get_stadf(): db.session.query( Station.id, Station.datacenter_id, - Station.inventory_xml, + Station.stationxml, Station.network, Station.station, Station.start_time) diff --git a/tests/download/test_download.py b/tests/download/test_download.py index 8ddf6829..7eeb468d 100644 --- a/tests/download/test_download.py +++ b/tests/download/test_download.py @@ -630,7 +630,7 @@ def mgdf(*a, **v): stainvs = db.session.query(Station).filter(Station.has_inventory).all() assert len(stainvs) == 1 assert "Inventory download error" in self.log_msg() - ix = db.session.query(Station.id, Station.inventory_xml).filter(Station.has_inventory).all() + ix = db.session.query(Station.id, Station.stationxml).filter(Station.has_inventory).all() num_downloaded_inventories_first_try = len(ix) assert len(ix) == num_downloaded_inventories_first_try staid, invdata = ix[0][0], ix[0][1] @@ -660,7 +660,7 @@ def mgdf(*a, **v): '--start', '2016-05-08T00:00:00', '--end', '2016-05-08T09:00:00', '--inventory']) assert clirunner.ok(result) - ix = db.session.query(Station.id, Station.inventory_xml).filter(Station.has_inventory).all() + ix = db.session.query(Station.id, Station.stationxml).filter(Station.has_inventory).all() assert len(ix) == num_expected_inventories_to_download # check now that none is downloaded @@ -746,7 +746,7 @@ def mgdf(*a, **v): sta1.elevation = new_elevation sta_inv.site_name = new_sitename cha.sample_rate = new_srate - sta_inv.inventory_xml = new_sta_inv + sta_inv.stationxml = new_sta_inv db.session.commit() # assure some data is returned from inventoriy url: @@ -767,7 +767,7 @@ def mgdf(*a, **v): assert db.session.query(Channel).filter(Channel.sample_rate == new_srate).first() # assert segment without inventory has still No inventory: assert db.session.query(Station).filter(Station.id == - sta_inv_id).first().inventory_xml == new_sta_inv + sta_inv_id).first().stationxml == new_sta_inv # NOW UPDATE METADATA @@ -784,7 +784,7 @@ def mgdf(*a, **v): # assert sta_inv has inventory re-downloaded: # assert segment without inventory has inventory: assert db.session.query(Station).filter(Station.id == - sta_inv_id).first().inventory_xml != new_sta_inv + sta_inv_id).first().stationxml != new_sta_inv # and now this: assert db.session.query(Station).filter(Station.site_name == new_sitename).first() # WHY? 
because site_name has been implemented for compatibility when the query diff --git a/tests/process/db/test_db.py b/tests/process/db/test_db.py index 9a67fc2f..53598292 100644 --- a/tests/process/db/test_db.py +++ b/tests/process/db/test_db.py @@ -59,21 +59,21 @@ def test_inventory_io(self, db, data): dumped_inv = compress(invdata, compression='gzip', compresslevel=9) assert len(dumped_inv) < len(invdata) - e.inventory_xml = dumped_inv + e.stationxml = dumped_inv db.session.add(e) db.session.commit() - inv_xml = get_inventory_from_bytes(e.inventory_xml) + inv_xml = get_inventory_from_bytes(e.stationxml) assert isinstance(inv_xml, Inventory) - inv_count = db.session.query(Station).filter(Station.inventory_xml != None).count() + inv_count = db.session.query(Station).filter(Station.stationxml != None).count() stationsc = db.session.query(Station).count() # test what happens deleting it: ret = db.session.query(Station).\ - filter(Station.inventory_xml!=None).\ - update({Station.inventory_xml: None}) + filter(Station.stationxml!=None).\ + update({Station.stationxml: None}) assert ret == inv_count db.session.commit() @@ -85,7 +85,7 @@ def test_inventory_io(self, db, data): assert ret == stationsc db.session.commit() - # SHIT< WE DELETED ALL STATIONS IN THE COMMAND ABOVE, NOT ONLY inventory_xml!! + # SHIT< WE DELETED ALL STATIONS IN THE COMMAND ABOVE, NOT ONLY stationxml!! # now delete only nonnull, should return zero: assert db.session.query(Station).count() == 0 @@ -710,7 +710,7 @@ def COPY(self): # test has_inventory stas1 = db.session.query(Station.id).filter(Station.has_inventory) - stas2 = db.session.query(Station.id).filter(withdata(Station.inventory_xml)) + stas2 = db.session.query(Station.id).filter(withdata(Station.stationxml)) assert str(stas1) == str(stas2) assert sorted(x[0] for x in stas1.all()) == sorted(x[0] for x in stas2.all()) diff --git a/tests/process/db/test_db_segment_obspy_methods.py b/tests/process/db/test_db_segment_obspy_methods.py index 0980b839..9a13c691 100644 --- a/tests/process/db/test_db_segment_obspy_methods.py +++ b/tests/process/db/test_db_segment_obspy_methods.py @@ -147,7 +147,7 @@ def read_stream(segment, reload=False): assert not mock_getinv.called else: assert mock_getinv.called - assert len(segment.station.inventory_xml) > 0 + assert len(segment.station.stationxml) > 0 # re-call it with reload=True and assert we raise the previous # exception, and that we called get_inv: ccc = mock_getinv.call_count diff --git a/tests/process/gui/test_webgui.py b/tests/process/gui/test_webgui.py index dfe48caa..ad40e3ae 100644 --- a/tests/process/gui/test_webgui.py +++ b/tests/process/gui/test_webgui.py @@ -81,7 +81,7 @@ def init(self, request, db, data): inv_xml = data.read("GE.FLT1.xml") s = Station(network='network', station='station', datacenter_id=dc.id, latitude=90, longitude=-45, - start_time=d, inventory_xml=inv_xml) + start_time=d, stationxml=inv_xml) session.add(s) channels = [ @@ -261,10 +261,10 @@ def test_init(self, a2 = data['metadata'] # a2 = None get_metadata(db.session, None) - # Station.inventory_xml, Segment.data, Download.log, + # Station.stationxml, Segment.data, Download.log, # Download.config, Download.errors, Download.warnings, # Download.program_version, Class.description - for excluded in ['station.inventory_xml', 'data', 'download.log', + for excluded in ['station.stationxml', 'data', 'download.log', 'download.config', 'download.errors', 'download.warnings', 'download.program_version', 'class.description']: assert not any(_['label'] == excluded 
for _ in a2) @@ -515,8 +515,8 @@ def _(*a, **v): # store empty inventory xml in segment sess = db.session() sta = sess.query(Segment).filter(Segment.id == self.segment_id).one().station - inv_xml = sta.inventory_xml - sta.inventory_xml = b'' + inv_xml = sta.stationxml + sta.stationxml = b'' sess.commit() try: with self.app.test_request_context(): @@ -536,7 +536,7 @@ def _(*a, **v): assert isinstance(plots[""], str) \ and "Station inventory (xml) error" in plots[""] finally: - sta.inventory_xml = inv_xml + sta.stationxml = inv_xml sess.commit() @pytest.mark.parametrize('calculate_sn_spectra', [True, False]) From dc423e7fb66289a6f120da9cf7ba7f2f74d0f959 Mon Sep 17 00:00:00 2001 From: Bastien Pili Date: Mon, 8 Jul 2024 15:59:09 +0200 Subject: [PATCH 3/5] Renamed save_inventories to save_stationxml --- stream2segment/download/main.py | 12 ++--- stream2segment/download/modules/stations.py | 4 +- tests/download/db/test_cli_update_metadata.py | 10 ++-- tests/download/test_download.py | 52 +++++++++---------- tests/download/test_download2.py | 12 ++--- tests/download/test_download_auth.py | 30 +++++------ 6 files changed, 60 insertions(+), 60 deletions(-) diff --git a/stream2segment/download/main.py b/stream2segment/download/main.py index 74b2a02e..f2d404a1 100644 --- a/stream2segment/download/main.py +++ b/stream2segment/download/main.py @@ -26,7 +26,7 @@ download_save_segments, DcDataselectManager) from stream2segment.download.modules.stations import \ - (save_inventories, get_station_df_for_inventory_download) + (save_stationxml, get_station_df_for_inventory_download) # make the logger refer to the parent of this package (`rfind` below. For info: @@ -354,11 +354,11 @@ def stepinfo(text, *args, **kwargs): else: stepinfo("Downloading %d station inventories", len(sta_df)) n_downloaded, n_empty, n_errors = \ - save_inventories(session, sta_df, - max_thread_workers, - advanced_settings['i_timeout'], - download_blocksize, - dbbufsize, isterminal) + save_stationxml(session, sta_df, + max_thread_workers, + advanced_settings['i_timeout'], + download_blocksize, + dbbufsize, isterminal) logger.info(("** Station inventories download summary **\n" "- downloaded %7d \n" "- discarded %7d (empty response)\n" diff --git a/stream2segment/download/modules/stations.py b/stream2segment/download/modules/stations.py index 6957739f..d64fc61c 100644 --- a/stream2segment/download/modules/stations.py +++ b/stream2segment/download/modules/stations.py @@ -99,8 +99,8 @@ def _query4inventorydownload(session, force_update): return qry -def save_inventories(session, stations_df, max_thread_workers, timeout, - download_blocksize, db_bufsize, show_progress=False): +def save_stationxml(session, stations_df, max_thread_workers, timeout, + download_blocksize, db_bufsize, show_progress=False): """Save inventories. 
stations_df must not be empty (not checked here)""" inv_logger = InventoryLogger() diff --git a/tests/download/db/test_cli_update_metadata.py b/tests/download/db/test_cli_update_metadata.py index 3d7e757e..971c9ab1 100644 --- a/tests/download/db/test_cli_update_metadata.py +++ b/tests/download/db/test_cli_update_metadata.py @@ -35,8 +35,8 @@ def tst_cmdline_inv_only(self, mock_updatedf, mock_insertdf, mock_mseed_unpack, lambda *a, **v: self.get_datacenters_df(None, *a, **v) mock_get_channels_df.side_effect = lambda *a, **v: self.get_channels_df(None, *a, **v) - mock_save_inventories.side_effect = lambda *a, **v: self.save_inventories(None, *a, - **v) + mock_save_inventories.side_effect = lambda *a, **v: self.save_stationxml(None, *a, + **v) mock_download_save_segments.side_effect = \ lambda *a, **v: self.download_save_segments(None, *a, **v) # mseed unpack is mocked by accepting only first arg (so that time bounds are not @@ -76,7 +76,7 @@ def tst_cmdline_inv_only(self, mock_updatedf, mock_insertdf, mock_mseed_unpack, # and be more safe about the fact that we will have only ONE station inventory saved inv_urlread_ret_val = [self._inv_data, URLError('a')] mock_save_inventories.side_effect = \ - lambda *a, **v: self.save_inventories(inv_urlread_ret_val, *a, **v) + lambda *a, **v: self.save_stationxml(inv_urlread_ret_val, *a, **v) mock_download_save_segments.reset_mock() old_log_msg = self.log_msg() @@ -111,8 +111,8 @@ def tst_cmdline_inv_only(self, mock_updatedf, mock_insertdf, mock_mseed_unpack, # Now write also to the second station inventory (the one # which raised before) - mock_save_inventories.side_effect = lambda *a, **v: self.save_inventories([b"x"], *a, - **v) + mock_save_inventories.side_effect = lambda *a, **v: self.save_stationxml([b"x"], *a, + **v) result = clirunner.invoke(cli, ['download', '-c', self.configfile, '--dburl', db.dburl, diff --git a/tests/download/test_download.py b/tests/download/test_download.py index 7eeb468d..d24eef6c 100644 --- a/tests/download/test_download.py +++ b/tests/download/test_download.py @@ -18,7 +18,7 @@ from stream2segment.cli import cli from stream2segment.download.main import get_events_df, get_datacenters_df, \ - get_channels_df, download_save_segments, save_inventories + get_channels_df, download_save_segments, save_stationxml from stream2segment.download.log import configlog4download from stream2segment.download.db.models import Segment, Download, Station, Channel, \ Event, DataCenter @@ -247,22 +247,22 @@ def download_save_segments(self, url_read_side_effect, *a, **kw): else url_read_side_effect) return download_save_segments(*a, **kw) - def save_inventories(self, url_read_side_effect, *a, **v): + def save_stationxml(self, url_read_side_effect, *a, **v): self.setup_urlopen(self._inv_data if url_read_side_effect is None else url_read_side_effect) - return save_inventories(*a, **v) + return save_stationxml(*a, **v) @patch('stream2segment.io.db.pdsql._get_max') @patch('stream2segment.download.main.get_events_df') @patch('stream2segment.download.main.get_datacenters_df') @patch('stream2segment.download.main.get_channels_df') - @patch('stream2segment.download.main.save_inventories') + @patch('stream2segment.download.main.save_stationxml') @patch('stream2segment.download.main.download_save_segments') @patch('stream2segment.download.modules.segments.mseedunpack') @patch('stream2segment.io.db.pdsql.insertdf') @patch('stream2segment.io.db.pdsql.updatedf') def test_cmdline_dberr(self, mock_updatedf, mock_insertdf, mock_mseed_unpack, - 
mock_download_save_segments, mock_save_inventories, + mock_download_save_segments, mock_save_stationxml, mock_get_channels_df, mock_get_datacenters_df, mock_get_events_df, mock_autoinc_db, # fixtures: @@ -272,7 +272,7 @@ def test_cmdline_dberr(self, mock_updatedf, mock_insertdf, mock_mseed_unpack, mock_get_datacenters_df.side_effect = \ lambda *a, **v: self.get_datacenters_df(None, *a, **v) mock_get_channels_df.side_effect = lambda *a, **v: self.get_channels_df(None, *a, **v) - mock_save_inventories.side_effect = lambda *a, **v: self.save_inventories(None, *a, **v) + mock_save_stationxml.side_effect = lambda *a, **v: self.save_stationxml(None, *a, **v) def dss(*a, **v): """Call self.download_save_segments after setting dbbufsize (a[9]) to 1""" @@ -344,13 +344,13 @@ def insdf(*a, **v): @patch('stream2segment.download.main.get_events_df') @patch('stream2segment.download.main.get_datacenters_df') @patch('stream2segment.download.main.get_channels_df') - @patch('stream2segment.download.main.save_inventories') + @patch('stream2segment.download.main.save_stationxml') @patch('stream2segment.download.main.download_save_segments') @patch('stream2segment.download.modules.segments.mseedunpack') @patch('stream2segment.io.db.pdsql.insertdf') @patch('stream2segment.io.db.pdsql.updatedf') def test_cmdline_outofbounds(self, mock_updatedf, mock_insertdf, mock_mseed_unpack, - mock_download_save_segments, mock_save_inventories, + mock_download_save_segments, mock_save_stationxml, mock_get_channels_df, mock_get_datacenters_df, mock_get_events_df, # fixtures: @@ -360,7 +360,7 @@ def test_cmdline_outofbounds(self, mock_updatedf, mock_insertdf, mock_mseed_unpa mock_get_datacenters_df.side_effect = \ lambda *a, **v: self.get_datacenters_df(None, *a, **v) mock_get_channels_df.side_effect = lambda *a, **v: self.get_channels_df(None, *a, **v) - mock_save_inventories.side_effect = lambda *a, **v: self.save_inventories(None, *a, **v) + mock_save_stationxml.side_effect = lambda *a, **v: self.save_stationxml(None, *a, **v) mock_download_save_segments.side_effect = \ lambda *a, **v: self.download_save_segments(None, *a, **v) mock_mseed_unpack.side_effect = lambda *a, **v: unpack(*a, **v) @@ -409,13 +409,13 @@ def test_cmdline_outofbounds(self, mock_updatedf, mock_insertdf, mock_mseed_unpa @patch('stream2segment.download.main.get_events_df') @patch('stream2segment.download.main.get_datacenters_df') @patch('stream2segment.download.main.get_channels_df') - @patch('stream2segment.download.main.save_inventories') + @patch('stream2segment.download.main.save_stationxml') @patch('stream2segment.download.main.download_save_segments') @patch('stream2segment.download.modules.segments.mseedunpack') @patch('stream2segment.io.db.pdsql.insertdf') @patch('stream2segment.io.db.pdsql.updatedf') def tst_cmdline(self, mock_updatedf, mock_insertdf, mock_mseed_unpack, - mock_download_save_segments, mock_save_inventories, mock_get_channels_df, + mock_download_save_segments, mock_save_stationxml, mock_get_channels_df, mock_get_datacenters_df, mock_get_events_df, # fixtures: db, clirunner, pytestdir): @@ -424,7 +424,7 @@ def tst_cmdline(self, mock_updatedf, mock_insertdf, mock_mseed_unpack, mock_get_datacenters_df.side_effect = \ lambda *a, **v: self.get_datacenters_df(None, *a, **v) mock_get_channels_df.side_effect = lambda *a, **v: self.get_channels_df(None, *a, **v) - mock_save_inventories.side_effect = lambda *a, **v: self.save_inventories(None, *a, **v) + mock_save_stationxml.side_effect = lambda *a, **v: self.save_stationxml(None, *a, **v) 
mock_download_save_segments.side_effect = \ lambda *a, **v: self.download_save_segments(None, *a, **v) # mseed unpack is mocked by accepting only first arg (so that time bounds are not @@ -608,8 +608,8 @@ def mgdf(*a, **v): assert num_expected_inventories_to_download == 2 # just in order to set the value below # and be more safe about the fact that we will have only ONE station inventory saved inv_urlread_ret_val = [self._inv_data, URLError('a')] - mock_save_inventories.side_effect = \ - lambda *a, **v: self.save_inventories(inv_urlread_ret_val, *a, **v) + mock_save_stationxml.side_effect = \ + lambda *a, **v: self.save_stationxml(inv_urlread_ret_val, *a, **v) # SKIP THIS TEST (not valid anymore): # first check that we issue an error if we provide inventory as flag (old behaviour): @@ -636,11 +636,11 @@ def mgdf(*a, **v): staid, invdata = ix[0][0], ix[0][1] expected_invs_to_download_ids.remove(staid) # remove the saved inventory assert not invdata.startswith(b' Date: Wed, 10 Jul 2024 14:18:58 +0200 Subject: [PATCH 4/5] Added save_quakeml function. Refactored InventoryLogger to OneTimeLogger --- stream2segment/download/modules/events.py | 69 +++++++++++++++++- stream2segment/download/modules/stations.py | 80 ++------------------- stream2segment/download/modules/utils.py | 77 +++++++++++++++++++- 3 files changed, 148 insertions(+), 78 deletions(-) diff --git a/stream2segment/download/modules/events.py b/stream2segment/download/modules/events.py index a07920ad..bac5e4b1 100644 --- a/stream2segment/download/modules/events.py +++ b/stream2segment/download/modules/events.py @@ -9,17 +9,24 @@ from datetime import timedelta import logging from io import StringIO +from urllib.request import Request import numpy as np import pandas as pd from stream2segment.io.cli import get_progressbar +from stream2segment.io.db.pdsql import DbManager from stream2segment.download.exc import FailedDownload, NothingToDownload -from stream2segment.download.db.models import WebService, Event -from stream2segment.download.url import urlread, socket, HTTPError +from stream2segment.download.db.models import Event, WebService +from stream2segment.download.url import urlread, socket, HTTPError, read_async from stream2segment.download.modules.utils import (dbsyncdf, get_dataframe_from_fdsn, formatmsg, - EVENTWS_MAPPING, strptime, urljoin) + EVENTWS_MAPPING, + strptime, + urljoin, + DbExcLogger, + OneTimeLogger, + compress) # (https://docs.python.org/2/howto/logging.html#advanced-logging-tutorial): logger = logging.getLogger(__name__) @@ -471,3 +478,59 @@ def isf2text_iter(isf_filep, catalog='', contributor=''): expects += 1 except IndexError: buf = [] + + +def save_quakeml(session, events_df, max_thread_workers, timeout, + download_blocksize, db_bufsize, show_progress=False): + """Save event's quakeML data. 
events_df must not be empty"""
+
+    logger_header = "QuakeML"
+    evt_logger = OneTimeLogger(logger_header)
+
+    downloaded, errors, empty = 0, 0, 0
+
+    db_exc_logger = DbExcLogger([Event.id.key])
+
+    dbmanager = DbManager(session, Event.id,
+                          update=[Event.quakeml.key],
+                          buf_size=db_bufsize,
+                          oninsert_err_callback=db_exc_logger.failed_insert,
+                          onupdate_err_callback=db_exc_logger.failed_update)
+
+    with get_progressbar(show_progress, length=len(events_df)) as pbar:
+
+        iterable = zip(events_df[Event.id.key],
+                       events_df[WebService.url],
+                       events_df[Event.event_id.key])
+
+        reader = read_async(iterable,
+                            urlkey=lambda obj: _get_evt_request(*obj[1:]),
+                            max_workers=max_thread_workers,
+                            blocksize=download_blocksize, timeout=timeout)
+
+        for obj, request, data, exc, status_code in reader:
+            pbar.update(1)
+            evt_id = obj[0]
+            if exc:
+                evt_logger.warn(request, exc)
+                errors += 1
+            else:
+                if not data:
+                    evt_logger.warn(request, "empty response")
+                    empty += 1
+                else:
+                    downloaded += 1
+                    dfr = pd.DataFrame({Event.id.key: [evt_id],
+                                        Event.quakeml.key: [compress(data)]})
+                    dbmanager.add(dfr)
+
+    dbmanager.close()
+
+    return downloaded, empty, errors
+
+
+def _get_evt_request(evt_url, evt_eventid):
+    """Return a Request object from the given event arguments to download the
+    QuakeML
+    """
+    return Request(url=f"{evt_url}?eventid={evt_eventid}")
\ No newline at end of file
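(Illustrative sketch, not part of the patch: how `OneTimeLogger` behaves when
`save_quakeml` above hits repeated failures. Warnings are deduplicated on the
(host URL, error type) pair, so a data center failing thousands of times in a
row yields a single log entry. The event web service URL and event ids are
made up; output goes through the package logger.)

    from stream2segment.download.modules.events import _get_evt_request
    from stream2segment.download.modules.utils import OneTimeLogger

    evt_logger = OneTimeLogger("QuakeML")
    req1 = _get_evt_request("http://eventws.example.org/fdsnws/event/1/query", "id1")
    evt_logger.warn(req1, "empty response")   # first (host, error) pair: logged
    req2 = _get_evt_request("http://eventws.example.org/fdsnws/event/1/query", "id2")
    evt_logger.warn(req2, "empty response")   # same pair again: suppressed

diff --git a/stream2segment/download/modules/stations.py b/stream2segment/download/modules/stations.py
index d64fc61c..96f0bdb0 100644
--- a/stream2segment/download/modules/stations.py
+++ b/stream2segment/download/modules/stations.py
@@ -6,11 +6,6 @@
 .. moduleauthor:: Riccardo Zaccarelli
 """
 from datetime import datetime
-from io import BytesIO
-import gzip
-import zipfile
-import zlib
-import bz2
 import logging
 from datetime import timedelta
 from urllib.request import Request
@@ -20,10 +15,8 @@
 from stream2segment.io.cli import get_progressbar
 from stream2segment.io.db.pdsql import DbManager, dbquery2df
 from stream2segment.download.db.models import DataCenter, Station, Segment
-from stream2segment.download.url import get_host, read_async
-from stream2segment.download.modules.utils import (DbExcLogger, formatmsg,
-                                                   url2str, err2str)
-
+from stream2segment.download.url import read_async
+from stream2segment.download.modules.utils import DbExcLogger, OneTimeLogger, compress
 
 # (https://docs.python.org/2/howto/logging.html#advanced-logging-tutorial):
 logger = logging.getLogger(__name__)
@@ -31,7 +24,7 @@
 
 def _get_sta_request(datacenter_url, network, station, start_time, end_time):
     """Return a Request object from the given station arguments to download the
-    inventory xml
+    StationXML
     """
     # fix bug of ncedc and scedc whereby dates exactly on the start are not returned.
     # Adding 1s to the start time is heavily hacky but it fixes the problem easily:
@@ -101,9 +94,10 @@
 
 def save_stationxml(session, stations_df, max_thread_workers, timeout,
                     download_blocksize, db_bufsize, show_progress=False):
-    """Save inventories. stations_df must not be empty (not checked here)"""
+    """Save StationXML data. 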
stations_df must not be empty (not checked here)""" - inv_logger = InventoryLogger() + logger_header = "StationXML" + inv_logger = OneTimeLogger(logger_header) downloaded, errors, empty = 0, 0, 0 @@ -149,65 +143,3 @@ def save_stationxml(session, stations_df, max_thread_workers, timeout, dbmanager.close() return downloaded, empty, errors - - -def compress(bytestr, compression='gzip', compresslevel=9): - """Compress `bytestr` returning a new compressed byte sequence - - :param bytestr: (string) a sequence of bytes to be compressed - :param compression: String, either ['bz2', 'zlib', 'gzip', 'zip'. Default: 'gzip'] - The compression library to use (after serializing `obj` with the given format) - on the serialized data. If None or empty string, no compression is applied, and - `bytestr` is returned as it is - :param compresslevel: integer (9 by default). Ignored if `compression` is None, - empty or 'zip' (the latter does not accept this argument), this parameter - controls the level of compression; 1 is fastest and produces the least - compression, and 9 is slowest and produces the most compression - """ - if compression == 'bz2': - return bz2.compress(bytestr, compresslevel=compresslevel) - elif compression == 'zlib': - return zlib.compress(bytestr, compresslevel) - elif compression: - sio = BytesIO() - if compression == 'gzip': - with gzip.GzipFile(mode='wb', fileobj=sio, - compresslevel=compresslevel) as gzip_obj: - gzip_obj.write(bytestr) - # Note: DO NOT return sio.getvalue() WITHIN the with statement, - # the gzip file obj needs to be closed first. FIXME: ref? - elif compression == 'zip': - # In this case, use the compress argument to ZipFile to compress the data, - # since writestr() does not take compress as an argument. See: - # https://pymotw.com/2/zipfile/#writing-data-from-sources-other-than-files - with zipfile.ZipFile(sio, 'w', compression=zipfile.ZIP_DEFLATED) as zip_obj: - zip_obj.writestr("x", bytestr) # first arg must be a nonempty str - else: - raise ValueError("compression '%s' not in ('gzip', 'zlib', 'bz2', 'zip')" % - str(compression)) - - return sio.getvalue() - - return bytestr - - -class InventoryLogger(set): - """Class handling inventory errors and logging only once per error type - and datacenter to avoid polluting the log file/stream with hundreds of - megabytes""" - - def warn(self, request, exc): - """Issue a logger.warn if the given error is not already reported - - :param request: the Request object - :pram exc: the reported Exception or string message - """ - url = get_host(request) - item = (url, err2str(exc)) # use err2str to uniquely identify exc - if item not in self: - if not self: - logger.warning('Detailed inventory download errors (showing ' - 'only first of each type per data center):') - self.add(item) - request_str = url2str(request) - logger.warning(formatmsg("Inventory download error", exc, request_str)) diff --git a/stream2segment/download/modules/utils.py b/stream2segment/download/modules/utils.py index 9312651f..307a3c15 100644 --- a/stream2segment/download/modules/utils.py +++ b/stream2segment/download/modules/utils.py @@ -17,6 +17,11 @@ from collections import OrderedDict from functools import cmp_to_key import logging +from io import BytesIO +import gzip +import zipfile +import zlib +import bz2 import pandas as pd @@ -25,7 +30,8 @@ from stream2segment.io.db.inspection import colnames from stream2segment.download.db.models import Event, Station, Channel from stream2segment.download.exc import FailedDownload -from stream2segment.download.url 
import responses
+from stream2segment.download.url import responses, get_host
+
 
 # (https://docs.python.org/2/howto/logging.html#advanced-logging-tutorial):
 logger = logging.getLogger(__name__)
@@ -248,6 +254,75 @@ def dolog(ok, notok, okstr, nookstr):
     dolog(updated, not_updated, "%d %s updated", ", %d discarded")
 
 
+class OneTimeLogger(set):
+    """Class handling error logging: each error is logged only once per
+    error type and host URL, to avoid polluting the log file/stream
+    with hundreds of megabytes"""
+
+    def __init__(self, header):
+        """
+        :param header: str, the header to be shown in the log
+        """
+        super().__init__()
+        self.header = header
+
+    def warn(self, request, exc):
+        """Issue a logger.warn if the given error is not already reported
+
+        :param request: the Request object
+        :param exc: the reported Exception or string message
+        """
+        url = get_host(request)
+        item = (url, err2str(exc))  # use err2str to uniquely identify exc
+        if item not in self:
+            if not self:
+                logger.warning(f"{self.header} download errors (showing only "
+                               f"first of each type per data center):")
+            self.add(item)
+            request_str = url2str(request)
+            logger.warning(formatmsg(f"{self.header} download error", exc, request_str))
+
+
+def compress(bytestr, compression='gzip', compresslevel=9):
+    """Compress `bytestr` returning a new compressed byte sequence
+
+    :param bytestr: (bytes) the byte sequence to be compressed
+    :param compression: string, one of 'bz2', 'zlib', 'gzip', 'zip' (default:
+        'gzip'): the compression algorithm to apply to `bytestr`.
+        If None or empty string, no compression is applied, and
+        `bytestr` is returned as it is
+    :param compresslevel: integer (9 by default). Ignored if `compression` is None,
+        empty or 'zip' (the latter does not accept this argument), this parameter
+        controls the level of compression; 1 is fastest and produces the least
+        compression, and 9 is slowest and produces the most compression
+    """
+    if compression == 'bz2':
+        return bz2.compress(bytestr, compresslevel=compresslevel)
+    elif compression == 'zlib':
+        return zlib.compress(bytestr, compresslevel)
+    elif compression:
+        sio = BytesIO()
+        if compression == 'gzip':
+            with gzip.GzipFile(mode='wb', fileobj=sio,
+                               compresslevel=compresslevel) as gzip_obj:
+                gzip_obj.write(bytestr)
+            # Note: DO NOT return sio.getvalue() WITHIN the with statement,
+            # the gzip file obj needs to be closed first. FIXME: ref?
+        elif compression == 'zip':
+            # In this case, use the compress argument to ZipFile to compress the data,
+            # since writestr() does not take compress as an argument. See:
+            # https://pymotw.com/2/zipfile/#writing-data-from-sources-other-than-files
+            with zipfile.ZipFile(sio, 'w', compression=zipfile.ZIP_DEFLATED) as zip_obj:
+                zip_obj.writestr("x", bytestr)  # first arg must be a nonempty str
+        else:
+            raise ValueError("compression '%s' not in ('gzip', 'zlib', 'bz2', 'zip')" %
+                             str(compression))
+
+        return sio.getvalue()
+
+    return bytestr
+
+
 def get_dataframe_from_fdsn(response_str, query_type, url=''):
     """Return a normalized and harmonized dataframe from raw_data.
     dbmodel_key can be 'event' 'station' or 'channel'. 
Raises ValueError if the resulting From 46947b80a432ec4dd9ec1b5fbe90c5b34f617e10 Mon Sep 17 00:00:00 2001 From: Bastien Pili Date: Thu, 11 Jul 2024 12:14:24 +0200 Subject: [PATCH 5/5] Fix for QuakeML column (a23fbce) --- stream2segment/download/modules/events.py | 1 + stream2segment/io/db/models.py | 2 +- tests/download/test_u_download_01_events.py | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/stream2segment/download/modules/events.py b/stream2segment/download/modules/events.py index bac5e4b1..200e7c14 100644 --- a/stream2segment/download/modules/events.py +++ b/stream2segment/download/modules/events.py @@ -42,6 +42,7 @@ def get_events_df(session, url, evt_query_args, start, end, pd_df_list = events_df_list(url, evt_query_args, start, end, timeout, show_progress) # pd_df_list surely not empty (otherwise we raised FailedDownload) events_df = pd.concat(pd_df_list, axis=0, ignore_index=True, copy=False) + events_df.drop(columns=[Event.quakeml.key], inplace=True) events_df[Event.webservice_id.key] = eventws_id events_df = dbsyncdf(events_df, session, diff --git a/stream2segment/io/db/models.py b/stream2segment/io/db/models.py index ad24ff72..f30895a3 100644 --- a/stream2segment/io/db/models.py +++ b/stream2segment/io/db/models.py @@ -198,7 +198,7 @@ def webservice_id(cls): mag_author = Column(String) event_location_name = Column(String) event_type = Column(String) - quakeml = Column(LargeBinary, nullable=True) + quakeml = Column(LargeBinary) @property def url(self): diff --git a/tests/download/test_u_download_01_events.py b/tests/download/test_u_download_01_events.py index c68c1318..88edc18b 100644 --- a/tests/download/test_u_download_01_events.py +++ b/tests/download/test_u_download_01_events.py @@ -762,7 +762,7 @@ def test_get_events_response_has_one_col_more(self, mock_urljoin, db): Event MODEL. TO FIX THIS, EDIT THE RESPONSE BELOW IN ORDER TO HAVE ALWAYS ONE COLUMN MORE THAN IN OUR Event MODEL """ - urlread_sideeffect = ["""#EventID|Time|Latitude|Longitude|Depth/km|Author|Catalog|Contributor|ContributorID|MagType|Magnitude|MagAuthor|EventLocationName|EventType|QuakeML| + urlread_sideeffect = ["""#EventID|Time|Latitude|Longitude|Depth/km|Author|Catalog|Contributor|ContributorID|MagType|Magnitude|MagAuthor|EventLocationName|EventType|| gfz2021edty|2021-02-28T23:37:15.211956|-17.565212|167.572067|10.0|||GFZ|gfz2021edty|M|5.787024361||Vanuatu Islands|earthquake|| gfz2021edpn|2021-02-28T21:23:50.840903|-22.500320|172.554474|26.75543594|||GFZ|gfz2021edpn|mb|4.907085435||Southeast of Loyalty Islands|earthquake|| gfz2021edoa|2021-02-28T20:37:40.931643|-22.658522|172.432373|30.70357132|||GFZ|gfz2021edoa|Mw|5.755797284||Southeast of Loyalty Islands|earthquake||
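
(Illustrative sketch, not part of the patch series: the round trip for the new
Event.quakeml column. `save_quakeml` stores the downloaded QuakeML through the
`compress` helper, whose default algorithm is gzip, so readers must decompress
before parsing. The decompression step below is an assumption mirroring how
StationXML bytes are read back elsewhere (`get_inventory_from_bytes`); the
QuakeML payload is a minimal placeholder.)

    import gzip

    from stream2segment.download.modules.utils import compress

    quakeml_bytes = b'<q:quakeml xmlns:q="http://quakeml.org/xmlns/quakeml/1.2"/>'
    stored = compress(quakeml_bytes)  # what save_quakeml writes to Event.quakeml
    assert stored[:2] == b'\x1f\x8b'  # gzip magic bytes: the payload is compressed
    assert gzip.decompress(stored) == quakeml_bytes  # back to the raw QuakeML
    # a full document could then be parsed, e.g. with obspy.read_events(BytesIO(...))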