Skip to content

Commit

Permalink
v0.3.0, README updates
Browse files Browse the repository at this point in the history
  • Loading branch information
5amCurfew committed Dec 7, 2024
1 parent 42f4d4a commit c6e8dc2
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 39 deletions.
64 changes: 32 additions & 32 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
![Release on Homebrew](https://github.com/5amCurfew/xtkt/actions/workflows/release.yml/badge.svg)

- [:computer: Installation](#computer-installation)
- [:nut_and_bolt: Using with Singer.io Targets](#nut_and_bolt-using-with-singerio-targets)
- [:floppy_disk: Metadata](#floppy_disk-metadata)
- [:pencil: Catalog](#pencil-catalog)
- [:clipboard: State](#clipboard-state)
- [:nut_and_bolt: Using with Singer.io Targets](#nut_and_bolt-using-with-singerio-targets)
- [:wrench: Config.json](#wrench-configjson)
- [:rocket: Examples](#rocket-examples)
* [Rick & Morty API](#rick-&-morty-api)
Expand All @@ -19,19 +21,19 @@
* [File csv](#file-csv)
* [File jsonl](#file-jsonl)

**v0.2.1**
**v0.3.0**

`xtkt` ("extract") is a data extraction tool that follows the Singer.io specification. Supported sources include RESTful-APIs, csv and jsonl.
`xtkt` ("extract") is a data extraction tool that follows the Singer.io specification. Supported sources include RESTful APIs, csv and jsonl.

`xtkt` can be pipe'd to any target that meets the Singer.io specification but has been designed and tested for databases such as SQLite & Postgres. Each stream is handled independently and deletion-at-source is not detected.

Extracted records are versioned, with new and updated data being treated as distinct records (with resulting keys `_sdc_natural_key` (unique key) and `_sdc_surrogate_key` (version key)). Only new and/or updated records are sent to be processed by your target.
Extracted records are versioned, with new and updated data being treated as distinct records (with resulting keys `_sdc_natural_key` (unique key) and `_sdc_surrogate_key` (version key)). Only new and/or updated records are sent to be processed by your target.

Fields can be dropped from records prior to being sent to your target using the `records.drop_field_paths` field in your JSON configuration file (see examples below). This may be suitable for dropping redundant, large objects within a record.

Fields can be hashed within records prior to being sent to your target using the `records.sensitive_field_paths` field in your JSON configuration file (see examples below). This may be suitable for handling sensitive data.

Both integers and floats are sent as floats. All fields are considered `NULLABLE`. All fields when extracting from CSV are considered strings for now.
Both integers and floats are sent as floats. All fields except the generated `_sdc_surrogate_key` field are considered `NULLABLE`.

Schema detection is naive using the first non-null data type detected per field when generating the catalog.

Expand All @@ -54,6 +56,26 @@ Flags:
-v, --version version for xtkt
```

### :floppy_disk: Metadata

`xtkt` adds the following metadata to records

* `_sdc_surrogate_key`: SHA256 of a record
* `_sdc_natural_key`: the unique key identifier of the record at source
* `_sdc_time_extracted`: a timestamp (R3339) at the time of the data extraction

### :pencil: Catalog

A [catalog](https://github.com/singer-io/getting-started/blob/master/docs/SPEC.md#catalog) is required for the extraction. Discovery of the catalog can be run using the `--discover` flag which creates the `<stream_name>_catalog.json` file. This can then be altered for the definition of the [*schema message*](https://github.com/singer-io/getting-started/blob/master/docs/SPEC.md#schema-message) sent to your target. Running `xtkt` in discovery will overwite the existing catalog.

```bash
$ xtkt config.json --discover
```

### :clipboard: State

`xtkt` uses a state file to track the records processed for each stream. The state file is written to the current working directory and is named `<stream_name>_state.json` where the `bookmark` values are a list of `_sdc_surrogate_key` values already processed. This defines the *[state message](https://github.com/singer-io/getting-started/blob/master/docs/SPEC.md#state-message)* sent to your target.

### :nut_and_bolt: Using with [Singer.io](https://www.singer.io/) Targets

Install targets (Python) in `_targets/` in virtual environments:
Expand All @@ -64,44 +86,19 @@ Install targets (Python) in `_targets/` in virtual environments:
4. `deactivate`

```bash
xtkt config.json | ./_targets/target-name/bin/target-name`
xtkt config.json | ./_targets/target-name/bin/target-name` --config config_target.json
```

For example:
```bash
xtkt config.json | ./_targets/pipelinewise-target-postgres/bin/target-postgres -c config_target_postgres.json
```

I have been using [jq](https://github.com/stedolan/jq) to view `stdout` messages in development. For example:
For debugging I suggest pipe'ing to [jq](https://github.com/stedolan/jq) to view `stdout` messages in development. For example:
```bash
$ xtkt config.json 2>&1 | jq .
```
`xtkt` can be used in a bash script to iterate over a template `config.json` file to create many data extractions. For example
```bash
#!/bin/bash
# Loop from 2009 to 2019
for year in {2009..2019}
do
new_config="config_${year}.json"
sed "s/YYYY/${year}/g" config.json.template > $new_config
echo "Generated ${new_config}"
echo "Running xtkt on ${new_config}"
xtkt $new_config | ./_targets/target-name/bin/target-name -c config_target_name.json
done
rm -f state_* config_*
```

### :floppy_disk: Metadata

`xtkt` adds the following metadata to records

* `_sdc_surrogate_key`: SHA256 of a record
* `_sdc_natural_key`: the unique key identifier of the record at source
* `_sdc_time_extracted`: a timestamp (R3339) at the time of the data extraction

### :wrench: Config.json
#### xtkt
Expand Down Expand Up @@ -303,6 +300,9 @@ Oauth authentication required, records returned immediately in an array, paginat
"url": "_config_json/data.jsonl",
"records": {
"unique_key_path": ["id"],
"drop_field_paths": [
["salary"]
],
"sensitive_field_paths": [
["location", "address"],
["age"]
Expand Down
3 changes: 3 additions & 0 deletions cmd/extract.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ func extract(discover bool) error {
}
}

// /////////////////////////////////////////////////////////
// Extract using existing catalog
// /////////////////////////////////////////////////////////
if !discover {

schema := lib.ParsedCatalog.Streams[0].Schema
Expand Down
8 changes: 1 addition & 7 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ import (
"github.com/spf13/cobra"
)

var version = "0.2.1"
var version = "0.3.0"
var discover bool = false
var defaultConcurrency int = 1000

func Execute() {
rootCmd.Flags().BoolVar(&discover, "discover", false, "run the tap in discovery mode, creating the catalog")
Expand Down Expand Up @@ -69,10 +68,5 @@ func parseConfigJSON(filePath string) (lib.Config, error) {
return cfg, fmt.Errorf("error parseConfigJson unmarshlling config.json: %w", jsonError)
}

// Check if MaxConcurrency field is nil, set it to the default value
if cfg.MaxConcurrency == nil {
cfg.MaxConcurrency = &defaultConcurrency
}

return cfg, nil
}

0 comments on commit c6e8dc2

Please sign in to comment.