diff --git a/README.md b/README.md
index 974f173..fd4d9b0 100644
--- a/README.md
+++ b/README.md
@@ -9,8 +9,10 @@
 ![Release on Homebrew](https://github.com/5amCurfew/xtkt/actions/workflows/release.yml/badge.svg)
 - [:computer: Installation](#computer-installation)
-- [:nut_and_bolt: Using with Singer.io Targets](#nut_and_bolt-using-with-singerio-targets)
 - [:floppy_disk: Metadata](#floppy_disk-metadata)
+- [:pencil: Catalog](#pencil-catalog)
+- [:clipboard: State](#clipboard-state)
+- [:nut_and_bolt: Using with Singer.io Targets](#nut_and_bolt-using-with-singerio-targets)
 - [:wrench: Config.json](#wrench-configjson)
 - [:rocket: Examples](#rocket-examples)
   * [Rick & Morty API](#rick-&-morty-api)
@@ -19,19 +21,19 @@
   * [File csv](#file-csv)
   * [File jsonl](#file-jsonl)
-**v0.2.1**
+**v0.3.0**
-`xtkt` ("extract") is a data extraction tool that follows the Singer.io specification. Supported sources include RESTful-APIs, csv and jsonl.
+`xtkt` ("extract") is a data extraction tool that follows the Singer.io specification. Supported sources include RESTful APIs, csv and jsonl.
 `xtkt` can be pipe'd to any target that meets the Singer.io specification but has been designed and tested for databases such as SQLite & Postgres. Each stream is handled independently and deletion-at-source is not detected.
-Extracted records are versioned, with new and updated data being treated as distinct records (with resulting keys `_sdc_natural_key` (unique key) and `_sdc_surrogate_key` (version key)). Only new and/or updated records are sent to be processed by your target.
+Extracted records are versioned, with new and updated data being treated as distinct records (with resulting keys `_sdc_natural_key` (unique key) and `_sdc_surrogate_key` (version key)). Only new and/or updated records are sent to be processed by your target.
 Fields can be dropped from records prior to being sent to your target using the `records.drop_field_paths` field in your JSON configuration file (see examples below). This may be suitable for dropping redundant, large objects within a record.
 Fields can be hashed within records prior to being sent to your target using the `records.sensitive_field_paths` field in your JSON configuration file (see examples below). This may be suitable for handling sensitive data.
-Both integers and floats are sent as floats. All fields are considered `NULLABLE`. All fields when extracting from CSV are considered strings for now.
+Both integers and floats are sent as floats. All fields except the generated `_sdc_surrogate_key` field are considered `NULLABLE`.
 Schema detection is naive using the first non-null data type detected per field when generating the catalog.
@@ -54,6 +56,26 @@ Flags:
   -v, --version version for xtkt
 ```
+### :floppy_disk: Metadata
+
+`xtkt` adds the following metadata to records
+
+* `_sdc_surrogate_key`: SHA256 of a record
+* `_sdc_natural_key`: the unique key identifier of the record at source
+* `_sdc_time_extracted`: a timestamp (R3339) at the time of the data extraction
+
+### :pencil: Catalog
+
+A [catalog](https://github.com/singer-io/getting-started/blob/master/docs/SPEC.md#catalog) is required for the extraction. Discovery of the catalog can be run using the `--discover` flag which creates the `_catalog.json` file. This can then be altered for the definition of the [*schema message*](https://github.com/singer-io/getting-started/blob/master/docs/SPEC.md#schema-message) sent to your target. Running `xtkt` in discovery will overwite the existing catalog.
+
+```bash
+$ xtkt config.json --discover
+```
+
+### :clipboard: State
+
+`xtkt` uses a state file to track the records processed for each stream. The state file is written to the current working directory and is named `_state.json` where the `bookmark` values are a list of `_sdc_surrogate_key` values already processed. This defines the *[state message](https://github.com/singer-io/getting-started/blob/master/docs/SPEC.md#state-message)* sent to your target.
+
 ### :nut_and_bolt: Using with [Singer.io](https://www.singer.io/) Targets
 Install targets (Python) in `_targets/` in virtual environments:
@@ -64,7 +86,7 @@ Install targets (Python) in `_targets/` in virtual environments:
 4. `deactivate`
 ```bash
-xtkt config.json | ./_targets/target-name/bin/target-name`
+xtkt config.json | ./_targets/target-name/bin/target-name` --config config_target.json
 ```
 For example:
@@ -72,36 +94,11 @@ For example:
 xtkt config.json | ./_targets/pipelinewise-target-postgres/bin/target-postgres -c config_target_postgres.json
 ```
-I have been using [jq](https://github.com/stedolan/jq) to view `stdout` messages in development. For example:
+For debugging I suggest pipe'ing to [jq](https://github.com/stedolan/jq) to view `stdout` messages in development. For example:
 ```bash
 $ xtkt config.json 2>&1 | jq .
 ```
-`xtkt` can be used in a bash script to iterate over a template `config.json` file to create many data extractions. For example
-```bash
-#!/bin/bash
-
-# Loop from 2009 to 2019
-for year in {2009..2019}
-do
-    new_config="config_${year}.json"
-    sed "s/YYYY/${year}/g" config.json.template > $new_config
-    echo "Generated ${new_config}"
-    echo "Running xtkt on ${new_config}"
-    xtkt $new_config | ./_targets/target-name/bin/target-name -c config_target_name.json
-done
-
-rm -f state_* config_*
-```
-
-### :floppy_disk: Metadata
-
-`xtkt` adds the following metadata to records
-
-* `_sdc_surrogate_key`: SHA256 of a record
-* `_sdc_natural_key`: the unique key identifier of the record at source
-* `_sdc_time_extracted`: a timestamp (R3339) at the time of the data extraction
-
 ### :wrench: Config.json
 #### xtkt
@@ -303,6 +300,9 @@ Oauth authentication required, records returned immediately in an array, paginat
     "url": "_config_json/data.jsonl",
     "records": {
         "unique_key_path": ["id"],
+        "drop_field_paths": [
+            ["salary"]
+        ],
         "sensitive_field_paths": [
             ["location", "address"],
             ["age"]
diff --git a/cmd/extract.go b/cmd/extract.go
index f559036..440a06a 100644
--- a/cmd/extract.go
+++ b/cmd/extract.go
@@ -94,6 +94,9 @@ func extract(discover bool) error {
 		}
 	}
+	// /////////////////////////////////////////////////////////
+	// Extract using existing catalog
+	// /////////////////////////////////////////////////////////
 	if !discover {
 		schema := lib.ParsedCatalog.Streams[0].Schema
diff --git a/cmd/root.go b/cmd/root.go
index 893304b..0d65df0 100644
--- a/cmd/root.go
+++ b/cmd/root.go
@@ -10,9 +10,8 @@ import (
 	"github.com/spf13/cobra"
 )
-var version = "0.2.1"
+var version = "0.3.0"
 var discover bool = false
-var defaultConcurrency int = 1000
 func Execute() {
 	rootCmd.Flags().BoolVar(&discover, "discover", false, "run the tap in discovery mode, creating the catalog")
@@ -69,10 +68,5 @@ func parseConfigJSON(filePath string) (lib.Config, error) {
 		return cfg, fmt.Errorf("error parseConfigJson unmarshlling config.json: %w", jsonError)
 	}
-	// Check if MaxConcurrency field is nil, set it to the default value
-	if cfg.MaxConcurrency == nil {
-		cfg.MaxConcurrency = &defaultConcurrency
-	}
-
 	return cfg, nil
 }
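
The README text added in this diff describes `_sdc_surrogate_key` as the SHA256 of a record and the state `bookmark` as a list of surrogate keys already processed. A minimal Go sketch of that idea follows. It is not taken from the `xtkt` source: the function names `surrogateKey` and `filterUnseen` are hypothetical, and hashing the record's JSON encoding is an assumption about how a record might be serialised before hashing.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// surrogateKey (hypothetical name) returns the hex-encoded SHA256 of a
// record's JSON encoding. encoding/json writes map keys in sorted order,
// so two records with equal contents produce the same key.
func surrogateKey(record map[string]interface{}) (string, error) {
	b, err := json.Marshal(record)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(b)
	return hex.EncodeToString(sum[:]), nil
}

// filterUnseen (hypothetical name) returns the records whose surrogate key
// is not yet in bookmark, together with the bookmark extended by their keys.
func filterUnseen(records []map[string]interface{}, bookmark []string) ([]map[string]interface{}, []string, error) {
	seen := make(map[string]struct{}, len(bookmark))
	for _, k := range bookmark {
		seen[k] = struct{}{}
	}
	var fresh []map[string]interface{}
	for _, r := range records {
		key, err := surrogateKey(r)
		if err != nil {
			return nil, nil, err
		}
		if _, ok := seen[key]; ok {
			continue // already processed in an earlier run
		}
		seen[key] = struct{}{}
		bookmark = append(bookmark, key)
		fresh = append(fresh, r)
	}
	return fresh, bookmark, nil
}

func main() {
	records := []map[string]interface{}{
		{"id": 1.0, "name": "Rick"},
		{"id": 2.0, "name": "Morty"},
	}
	// First run: nothing is bookmarked, so both records are new.
	first, bookmark, _ := filterUnseen(records, nil)
	// Update one record: its hash changes, so only that version is new.
	records[1]["name"] = "Morty Smith"
	second, bookmark, _ := filterUnseen(records, bookmark)
	fmt.Println(len(first), len(second), len(bookmark))
}
```

Running the sketch prints `2 1 3`: two records on the first pass, one updated record on the second, and three surrogate keys in the bookmark. This mirrors the README's statement that new and updated data are treated as distinct records and that only those are sent to the target.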