diff --git a/.gitignore b/.gitignore index 41184692..43d50585 100644 --- a/.gitignore +++ b/.gitignore @@ -3,11 +3,11 @@ .fleet/ .vscode/ .history/ - # Mac .DS_Store # Project-specific +.env* [Ll]ogs/ test_venv/ grib2_to_cb_test/test/2119312000003 diff --git a/README.md b/README.md index 5f691dac..44b90a07 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,8 @@ cb_scope: "_default" cb_collection: "METAR" ``` +The cb_host value must include a protocol. For example, use "couchbase://adb-cb1.gsd.esrl.noaa.gov" because adb-cb1 is a single-node cluster. For adb-cb2 (which is one node of a multi-node cluster) it would be "couchbases://adb-cb2.gsd.esrl.noaa.gov". Any of the nodes would suffice. + Once that's in place, you can run the ingest with Docker Compose like the example below. Note the `public` and `data` env variables respectively point to where the input data resides and where you'd like the container to write out to. They are the only part of the command you would need to modify. ```bash diff --git a/src/vxingest/builder_common/ingest_manager.py b/src/vxingest/builder_common/ingest_manager.py index 882da60b..260f087c 100644 --- a/src/vxingest/builder_common/ingest_manager.py +++ b/src/vxingest/builder_common/ingest_manager.py @@ -118,9 +118,7 @@ def connect_cb(self): _attempts = 0 while _attempts < 3: try: - self.cluster = Cluster( - "couchbase://" + self.cb_credentials["host"], options - ) + self.cluster = Cluster(self.cb_credentials["host"], options) break except CouchbaseException as _e: time.sleep(5) diff --git a/src/vxingest/builder_common/load_backup_ingest_docs.py b/src/vxingest/builder_common/load_backup_ingest_docs.py index 4c1b058f..f15b548b 100644 --- a/src/vxingest/builder_common/load_backup_ingest_docs.py +++ b/src/vxingest/builder_common/load_backup_ingest_docs.py @@ -101,9 +101,7 @@ def connect_cb(self): self.cb_credentials["user"], self.cb_credentials["password"] ) ) - self.cluster = Cluster( - "couchbase://" + self.cb_credentials["host"], options 
- ) + self.cluster = Cluster(self.cb_credentials["host"], options) self.collection = self.cluster.bucket("mdata").default_collection() except Exception as e: print(f"*** Error in connect_cb *** {e}") diff --git a/src/vxingest/builder_common/vx_ingest.py b/src/vxingest/builder_common/vx_ingest.py index 00fbbad8..f4cfe892 100644 --- a/src/vxingest/builder_common/vx_ingest.py +++ b/src/vxingest/builder_common/vx_ingest.py @@ -148,9 +148,7 @@ def connect_cb(self): _attempts = 0 while _attempts < 3: try: - self.cluster = Cluster( - "couchbase://" + self.cb_credentials["host"], options - ) + self.cluster = Cluster(self.cb_credentials["host"], options) break except CouchbaseException as _e: time.sleep(5) diff --git a/src/vxingest/utilities/backfill_obs_with_rh.py b/src/vxingest/utilities/backfill_obs_with_rh.py index 8495a922..bd5f3c1d 100755 --- a/src/vxingest/utilities/backfill_obs_with_rh.py +++ b/src/vxingest/utilities/backfill_obs_with_rh.py @@ -40,7 +40,7 @@ def setup_connection(): ) options = ClusterOptions(PasswordAuthenticator(_user, _password)) connection = {} - connection["cluster"] = Cluster("couchbase://" + _host, options) + connection["cluster"] = Cluster(_host, options) connection["bucket"] = connection["cluster"].bucket(_bucket) connection["scope"] = connection["bucket"].scope(_scope) connection["collection"] = connection["scope"].collection(_collection) diff --git a/tests/vxingest/builder_common/test_unit_queries.py b/tests/vxingest/builder_common/test_unit_queries.py index 8398152d..193b2554 100644 --- a/tests/vxingest/builder_common/test_unit_queries.py +++ b/tests/vxingest/builder_common/test_unit_queries.py @@ -34,7 +34,7 @@ def connect_cb(): PasswordAuthenticator(cb_connection["user"], cb_connection["password"]), timeout_options=timeout_options, ) - cb_connection["cluster"] = Cluster("couchbase://" + cb_connection["host"], options) + cb_connection["cluster"] = Cluster(cb_connection["host"], options) cb_connection["collection"] = ( 
cb_connection["cluster"] .bucket(cb_connection["bucket"]) @@ -65,7 +65,7 @@ def test_stations_fcst_valid_epoch(request): @pytest.mark.integration() def test_stations_get_file_list_grib2(request): - _expected_time = 10 + _expected_time = 16 _name = request.node.name testdata = Path("tests/vxingest/builder_common/testdata/get_file_list_grib2.n1ql") with testdata.open(mode="r", encoding="utf-8") as file: diff --git a/tests/vxingest/ctc_to_cb/test_int_metar_ctc.py b/tests/vxingest/ctc_to_cb/test_int_metar_ctc.py index 7839a611..c5504a56 100644 --- a/tests/vxingest/ctc_to_cb/test_int_metar_ctc.py +++ b/tests/vxingest/ctc_to_cb/test_int_metar_ctc.py @@ -58,9 +58,9 @@ def test_check_fcst_valid_epoch_fcst_valid_iso(): options = ClusterOptions( PasswordAuthenticator(_user, _password), timeout_options=timeout_options ) - cluster = Cluster("couchbase://" + _host, options) + cluster = Cluster(_host, options) options = ClusterOptions(PasswordAuthenticator(_user, _password)) - cluster = Cluster("couchbase://" + _host, options) + cluster = Cluster(_host, options) stmnt = f"""SELECT m0.fcstValidEpoch fve, fcstValidISO fvi FROM `{_bucket}`.{_scope}.{_collection} m0 WHERE @@ -105,7 +105,7 @@ def test_get_stations_geo_search(): options = ClusterOptions( PasswordAuthenticator(_user, _password), timeout_options=timeout_options ) - cluster = Cluster("couchbase://" + _host, options) + cluster = Cluster(_host, options) collection = cluster.bucket(_bucket).scope(_scope).collection(_collection) load_spec = {} load_spec["cluster"] = cluster @@ -199,7 +199,7 @@ def calculate_cb_ctc( options = ClusterOptions( PasswordAuthenticator(_user, _password), timeout_options=timeout_options ) - cluster = Cluster("couchbase://" + _host, options) + cluster = Cluster(_host, options) collection = cluster.bucket(_bucket).scope(_scope).collection(_collection) load_spec = {} load_spec["cluster"] = cluster @@ -498,7 +498,7 @@ def test_ctc_ceiling_data_hrrr_ops_all_hrrr(): options = ClusterOptions( 
PasswordAuthenticator(_user, _password), timeout_options=timeout_options ) - cluster = Cluster("couchbase://" + _host, options) + cluster = Cluster(_host, options) # get available fcstValidEpochs for couchbase result = cluster.query( @@ -631,7 +631,7 @@ def test_ctc_visibiltiy_data_hrrr_ops_all_hrrr(): options = ClusterOptions( PasswordAuthenticator(_user, _password), timeout_options=timeout_options ) - cluster = Cluster("couchbase://" + _host, options) + cluster = Cluster(_host, options) # get available fcstValidEpochs for couchbase stmnt = f"""SELECT RAW fcstValidEpoch diff --git a/tests/vxingest/ctc_to_cb/test_unit_queries_ctc.py b/tests/vxingest/ctc_to_cb/test_unit_queries_ctc.py index 459a674b..e7b8513d 100644 --- a/tests/vxingest/ctc_to_cb/test_unit_queries_ctc.py +++ b/tests/vxingest/ctc_to_cb/test_unit_queries_ctc.py @@ -35,7 +35,7 @@ def connect_cb(): PasswordAuthenticator(cb_connection["user"], cb_connection["password"]), timeout_options=timeout_options, ) - cb_connection["cluster"] = Cluster("couchbase://" + cb_connection["host"], options) + cb_connection["cluster"] = Cluster(cb_connection["host"], options) cb_connection["collection"] = ( cb_connection["cluster"] .bucket(cb_connection["bucket"]) @@ -137,7 +137,7 @@ def test_get_stations(request): @pytest.mark.integration() def test_get_threshold_descriptions(request): _name = request.node.name - _expected_time = 0.01 + _expected_time = 0.6 testdata = Path( "tests/vxingest/ctc_to_cb/testdata/test_get_threshold_descriptions.n1ql" ) diff --git a/tests/vxingest/grib2_to_cb/test_int_metar_model_grib.py b/tests/vxingest/grib2_to_cb/test_int_metar_model_grib.py index ff0c8c71..05f8685e 100644 --- a/tests/vxingest/grib2_to_cb/test_int_metar_model_grib.py +++ b/tests/vxingest/grib2_to_cb/test_int_metar_model_grib.py @@ -54,9 +54,7 @@ def connect_cb(): PasswordAuthenticator(cb_connection["user"], cb_connection["password"]), timeout_options=timeout_options, ) - cb_connection["cluster"] = Cluster( - "couchbase://" + 
cb_connection["host"], options - ) + cb_connection["cluster"] = Cluster(cb_connection["host"], options) cb_connection["collection"] = ( cb_connection["cluster"] .bucket(cb_connection["bucket"]) diff --git a/tests/vxingest/grib2_to_cb/test_unit_queries_grib.py b/tests/vxingest/grib2_to_cb/test_unit_queries_grib.py index 74bc9a12..d4da2529 100644 --- a/tests/vxingest/grib2_to_cb/test_unit_queries_grib.py +++ b/tests/vxingest/grib2_to_cb/test_unit_queries_grib.py @@ -35,7 +35,7 @@ def connect_cb(): PasswordAuthenticator(cb_connection["user"], cb_connection["password"]), timeout_options=timeout_options, ) - cb_connection["cluster"] = Cluster("couchbase://" + cb_connection["host"], options) + cb_connection["cluster"] = Cluster(cb_connection["host"], options) cb_connection["collection"] = ( cb_connection["cluster"] .bucket(cb_connection["bucket"]) diff --git a/tests/vxingest/grib2_to_cb/testdata/test_ingest_document_fields.n1ql b/tests/vxingest/grib2_to_cb/testdata/test_ingest_document_fields.n1ql index 3f628a5a..d9a0d132 100644 --- a/tests/vxingest/grib2_to_cb/testdata/test_ingest_document_fields.n1ql +++ b/tests/vxingest/grib2_to_cb/testdata/test_ingest_document_fields.n1ql @@ -1,4 +1,4 @@ Select file_mask, input_data_path from `vxdata`._default.METAR -where meta().id = "MD:V01:METAR:RAP_OPS_130:ingest:grib2"; \ No newline at end of file +USE KEYS ["MD:V01:METAR:RAP_OPS_130:ingest:grib2"]; \ No newline at end of file diff --git a/tests/vxingest/grib2_to_cb/testdata/test_ingest_document_id.n1ql b/tests/vxingest/grib2_to_cb/testdata/test_ingest_document_id.n1ql index 0cffde2d..0a2a1d3e 100644 --- a/tests/vxingest/grib2_to_cb/testdata/test_ingest_document_id.n1ql +++ b/tests/vxingest/grib2_to_cb/testdata/test_ingest_document_id.n1ql @@ -1,3 +1,3 @@ Select ingest_document_ids from `vxdata`._default.METAR -where meta().id = "MD:V01:METAR:RAP_OPS_130:ingest:grib2"; \ No newline at end of file +USE KEYS ["MD:V01:METAR:RAP_OPS_130:ingest:grib2"]; \ No newline at end of file 
diff --git a/tests/vxingest/netcdf_to_cb/test_unit_queries_obs.py b/tests/vxingest/netcdf_to_cb/test_unit_queries_obs.py index 71a117de..4b4bab1e 100644 --- a/tests/vxingest/netcdf_to_cb/test_unit_queries_obs.py +++ b/tests/vxingest/netcdf_to_cb/test_unit_queries_obs.py @@ -34,7 +34,7 @@ def connect_cb(): PasswordAuthenticator(cb_connection["user"], cb_connection["password"]), timeout_options=timeout_options, ) - cb_connection["cluster"] = Cluster("couchbase://" + cb_connection["host"], options) + cb_connection["cluster"] = Cluster(cb_connection["host"], options) cb_connection["collection"] = ( cb_connection["cluster"] .bucket(cb_connection["bucket"]) diff --git a/tests/vxingest/netcdf_to_cb/testdata/test_get_stations.n1ql b/tests/vxingest/netcdf_to_cb/testdata/test_get_stations.n1ql index 798e5510..359f0305 100644 --- a/tests/vxingest/netcdf_to_cb/testdata/test_get_stations.n1ql +++ b/tests/vxingest/netcdf_to_cb/testdata/test_get_stations.n1ql @@ -1,3 +1,3 @@ Select ingest_document_ids from `vxdata`._default.METAR -where meta().id = "MD:V01:METAR:obs:ingest:netcdf"; \ No newline at end of file +USE KEYS ["MD:V01:METAR:obs:ingest:netcdf"]; \ No newline at end of file diff --git a/tests/vxingest/netcdf_to_cb/testdata/test_ingest_document_fields.n1ql b/tests/vxingest/netcdf_to_cb/testdata/test_ingest_document_fields.n1ql index 4f46352a..cc73519d 100644 --- a/tests/vxingest/netcdf_to_cb/testdata/test_ingest_document_fields.n1ql +++ b/tests/vxingest/netcdf_to_cb/testdata/test_ingest_document_fields.n1ql @@ -1,4 +1,4 @@ Select file_mask, input_data_path from `vxdata`._default.METAR -where meta().id = "MD:V01:METAR:obs:ingest:netcdf"; \ No newline at end of file +USE KEYS ["MD:V01:METAR:obs:ingest:netcdf"]; \ No newline at end of file diff --git a/tests/vxingest/netcdf_to_cb/testdata/test_ingest_document_id.n1ql b/tests/vxingest/netcdf_to_cb/testdata/test_ingest_document_id.n1ql index 798e5510..359f0305 100644 --- 
a/tests/vxingest/netcdf_to_cb/testdata/test_ingest_document_id.n1ql +++ b/tests/vxingest/netcdf_to_cb/testdata/test_ingest_document_id.n1ql @@ -1,3 +1,3 @@ Select ingest_document_ids from `vxdata`._default.METAR -where meta().id = "MD:V01:METAR:obs:ingest:netcdf"; \ No newline at end of file +USE KEYS ["MD:V01:METAR:obs:ingest:netcdf"]; \ No newline at end of file diff --git a/tests/vxingest/partial_sums_to_cb/test_int_metar_partial_sums.py b/tests/vxingest/partial_sums_to_cb/test_int_metar_partial_sums.py index 1cce0773..494f6d29 100644 --- a/tests/vxingest/partial_sums_to_cb/test_int_metar_partial_sums.py +++ b/tests/vxingest/partial_sums_to_cb/test_int_metar_partial_sums.py @@ -58,9 +58,9 @@ def test_check_fcst_valid_epoch_fcst_valid_iso(): options = ClusterOptions( PasswordAuthenticator(_user, _password), timeout_options=timeout_options ) - cluster = Cluster("couchbase://" + _host, options) + cluster = Cluster(_host, options) options = ClusterOptions(PasswordAuthenticator(_user, _password)) - cluster = Cluster("couchbase://" + _host, options) + cluster = Cluster(_host, options) stmnt = f"""SELECT m0.fcstValidEpoch fve, fcstValidISO fvi FROM `{_bucket}`.{_scope}.{_collection} m0 WHERE @@ -105,7 +105,7 @@ def test_get_stations_geo_search(): options = ClusterOptions( PasswordAuthenticator(_user, _password), timeout_options=timeout_options ) - cluster = Cluster("couchbase://" + _host, options) + cluster = Cluster(_host, options) collection = cluster.bucket(_bucket).scope(_scope).collection(_collection) load_spec = {} load_spec["cluster"] = cluster @@ -273,7 +273,7 @@ def test_ps_surface_data_hrrr_ops_all_hrrr(): options = ClusterOptions( PasswordAuthenticator(_user, _password), timeout_options=timeout_options ) - cluster = Cluster("couchbase://" + _host, options) + cluster = Cluster(_host, options) # get available fcstValidEpochs for couchbase result = cluster.query( diff --git a/tests/vxingest/partial_sums_to_cb/test_unit_queries.py 
b/tests/vxingest/partial_sums_to_cb/test_unit_queries.py index cc435753..ff4ad949 100644 --- a/tests/vxingest/partial_sums_to_cb/test_unit_queries.py +++ b/tests/vxingest/partial_sums_to_cb/test_unit_queries.py @@ -35,7 +35,7 @@ def connect_cb(): PasswordAuthenticator(cb_connection["user"], cb_connection["password"]), timeout_options=timeout_options, ) - cb_connection["cluster"] = Cluster("couchbase://" + cb_connection["host"], options) + cb_connection["cluster"] = Cluster(cb_connection["host"], options) cb_connection["collection"] = ( cb_connection["cluster"] .bucket(cb_connection["bucket"]) @@ -145,7 +145,7 @@ def test_get_stations(request): @pytest.mark.integration() def test_get_threshold_descriptions(request): _name = request.node.name - _expected_time = 0.01 + _expected_time = 0.6 testdata = Path( "tests/vxingest/partial_sums_to_cb/testdata/test_get_threshold_descriptions.n1ql" )