Commit f41cfad
Merge pull request #1 from zhiweio/feature/release-0.0.2

zhiweio authored Jul 11, 2024
2 parents afb7d2c + a6c7a78

Showing 16 changed files with 332 additions and 239 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -127,3 +127,5 @@ dmypy.json
 
 # Pyre type checker
 .pyre/
+
+.idea/
36 changes: 18 additions & 18 deletions README.md
@@ -1,8 +1,11 @@
 # StreamXfer
 
-StreamXfer is a powerful tool for streaming data from SQL Server to object storage for seamless transfer using UNIX
+StreamXfer is a powerful tool for streaming data from SQL Server to local or object storage(S3) for seamless transfer using UNIX
 pipe, supporting various general data formats(CSV, TSV, JSON).
 
 **Supported OS:** Linux, macOS
 
+_I've migrated 10TB data from SQL Server into Amazon Redshift using this tool._
+
 ## Demo

@@ -14,7 +17,7 @@ pipe, supporting various general data formats(CSV, TSV, JSON).
 Before installing StreamXfer, you need to install the following dependencies:
 
 * mssql-tools: [SQL Docs - bcp Utility](https://learn.microsoft.com/en-us/sql/tools/bcp-utility?view=sql-server-ver16)
-* lzop: `yum install lzop`
+* lzop: [Download](https://www.lzop.org/)
 
 Then, install StreamXfer from PyPI:
@@ -50,48 +53,45 @@ You can also use the following options:

 * `-F, --format`: The data format (CSV, TSV, or JSON).
 * `--compress-type`: The compression type (LZOP or GZIP).
-* `--no-compress`: Disables compression.
 
 For more information on the options, run stx --help.
 
 ```shell
 $ stx --help
 Usage: stx [OPTIONS] PYMSSQL_URL TABLE OUTPUT_PATH
 
-  StreamXfer is a powerful tool for streaming data from SQL Server to object
-  storage for seamless transfer using UNIX pipe, supporting various general
-  data formats(CSV, TSV, JSON).
+  StreamXfer is a powerful tool for streaming data from SQL Server to local or
+  object storage(S3) for seamless transfer using UNIX pipe, supporting various
+  general data formats(CSV, TSV, JSON).
 
   Examples:
     stx 'mssql+pymssql:://user:pass@host:port/db' '[dbo].[test]' /local/path/to/dir/
     stx 'mssql+pymssql:://user:pass@host:port/db' '[dbo].[test]' s3://bucket/path/to/dir/
 
 Options:
-  -F, --format [CSV|TSV|JSON]  [default: JSON]
-  --compress-type [LZOP|GZIP]  [default: LZOP]
-  --no-compress
-  --redshift-escape
-  --help                       Show this message and exit.
+  -F, --format [CSV|TSV|JSON]     [default: JSON]
+  -C, --compress-type [LZOP|GZIP]
+  -h, --help                      Show this message and exit.
 ```

 ### Library Usage
 
-To use StreamXfer as a library in Python, you can import the StreamXfer class and the sink classes (such as S3Sink), and use them to build and pump the data stream.
+To use StreamXfer as a library in Python, you can import the StreamXfer class, and use them to build and pump the data stream.
 
 Here is an example code snippet:
 
 ```python
 from streamxfer import StreamXfer
-from streamxfer.cmd import S3Sink
+from streamxfer.format import Format
+from streamxfer.compress import CompressType
 
 sx = StreamXfer(
     "mssql+pymssql:://user:pass@host:port/db",
-    format="TSV",
-    enable_compress=True,
-    compress_type="LZOP",
+    format=Format.CSV,
+    compress_type=CompressType.LZOP,
     chunk_size=1000000,
 )
-sx.build("[dbo].[test]", path="s3://bucket/path/to/dir/", sink="s3")
+sx.build("[dbo].[test]", path="s3://bucket/path/to/dir/")
 sx.pump()
 ```
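The hunk above changes the public library API: the explicit `sink=` argument and `enable_compress` flag are gone, the sink is now inferred from the output path, and compression is enabled only when a `compress_type` is given. A minimal sketch of the resulting 0.0.2 usage, inferred from this diff alone (the local-path behavior and defaults are assumptions, not documented output):

```python
# Sketch based on the 0.0.2 diff above, not repository documentation:
# omitting compress_type should leave compression off, and a plain
# filesystem path should select the local sink via uri2sink().
from streamxfer import StreamXfer
from streamxfer.format import Format

sx = StreamXfer(
    "mssql+pymssql:://user:pass@host:port/db",  # URL form copied from the README examples
    format=Format.JSON,                         # the CLI default per main.py
)
sx.build("[dbo].[test]", path="/local/path/to/dir/")  # local sink, no compression
sx.pump()
```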
4 changes: 1 addition & 3 deletions requirements.txt
@@ -1,6 +1,4 @@
-sqlalchemy~=2.0.10
-pymssql~=2.2.7
-boto3~=1.26.118
+pymssql~=2.3.0
 loguru~=0.7.0
 click~=8.1.3
 psutil~=5.9.5
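A side note on the `~=` pins in this file: `~=` is the compatible-release operator, so bumping `pymssql~=2.2.7` to `pymssql~=2.3.0` allows any 2.3.x release but not 2.4.0. A quick demonstration with the third-party `packaging` library (pip's own specifier implementation, not a dependency of this repo):

```python
# Compatible-release semantics of the pins above: ~=2.3.0 means >=2.3.0, ==2.3.*.
from packaging.specifiers import SpecifierSet

spec = SpecifierSet("~=2.3.0")
print("2.3.9" in spec)  # True: patch/micro upgrades within 2.3 are allowed
print("2.4.0" in spec)  # False: the next minor release is excluded
```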
8 changes: 3 additions & 5 deletions setup.py
@@ -2,15 +2,15 @@
 
 from setuptools import setup, find_packages
 
-__version__ = "0.0.1"
+__version__ = "0.0.2"
 
 with open("requirements.txt", encoding="utf-8") as requirements_file:
     requirements = [_.strip() for _ in requirements_file.readlines()]
 
 setup(
     name="streamxfer",
     version=__version__,
-    description="StreamXfer is a powerful tool for streaming data from SQL Server to object storage for seamless transfer using UNIX pipe, supporting various general data formats(CSV, TSV, JSON).",
+    description="StreamXfer is a powerful tool for streaming data from SQL Server to local or object storage(S3) for seamless transfer using UNIX pipe, supporting various general data formats(CSV, TSV, JSON).",
     long_description=io.open("README.md", encoding="utf-8").read(),
     long_description_content_type="text/markdown",
     packages=find_packages(),
@@ -20,9 +20,7 @@
     entry_points={
         "console_scripts": [
             "stx = streamxfer.cli.main:cli",
-            "stx-mssql-csv-escape = streamxfer.cli.mssql_csv_escape:cli",
-            "stx-mssql-json-escape = streamxfer.cli.mssql_json_escape:cli",
-            "stx-redshift-escape = streamxfer.cli.redshift_escape:cli",
+            "stx-escape = streamxfer.cli.mssql_escape:cli",
         ],
     },
 )
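The `console_scripts` change replaces the three per-format escape commands with a single `stx-escape` entry point; each `name = module:function` pair becomes an executable shim at install time. One way to verify the mapping after installing the package, using the standard-library `importlib.metadata` (Python 3.10+ `group=` signature):

```python
# Prints the console scripts an installed streamxfer exposes; after this
# commit, only the `stx` and `stx-escape` entries should remain.
from importlib.metadata import entry_points

for ep in entry_points(group="console_scripts"):
    if ep.name.startswith("stx"):
        print(f"{ep.name} -> {ep.value}")  # e.g. stx -> streamxfer.cli.main:cli
```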
79 changes: 38 additions & 41 deletions streamxfer/__init__.py
@@ -1,23 +1,22 @@
 import os
 from subprocess import Popen
-from typing import Literal
 
 import psutil
-from sqlalchemy import create_engine
 
 from streamxfer import mssql as ms
+from streamxfer.typing import *
 from streamxfer.cmd import (
-    Compress,
     BCP,
     Split,
     Cat,
-    RedshiftEscape,
     MssqlTsvEscape,
     MssqlCsvEscape,
     MssqlJsonEscape,
-    LocalSink,
-    S3Sink,
+    LZOPCompress,
+    GZIPCompress,
 )
-from streamxfer.compress import COMPRESS_LEVEL
+from streamxfer.sink import uri2sink
+from streamxfer.compress import CompressType
 from streamxfer.format import Format, sc
 from streamxfer.log import LOG
 from streamxfer.utils import (
@@ -31,32 +30,34 @@
 __module__ = ["StreamXfer"]
 __all__ = ["StreamXfer"]
 
-_sinks = {
-    "s3": S3Sink(),
-    "local": LocalSink(),
-}
-
 
 class StreamXfer:
     def __init__(
         self,
         url,
         format: str,
-        enable_compress=False,
-        compress_type: str = Compress.lzop.name,
-        compress_level=COMPRESS_LEVEL,
+        compress_type: str = None,
+        compress_level=6,
         chunk_size=1000000,
     ):
         self.url = url
         self.format = format
-        self.compress_type = compress_type
-        self.compress_level = compress_level
-        self.enable_compress = enable_compress
         self.chunk_size = chunk_size
-        self.sink = None
         self.columns = None
+        self.compress = None
         self._bcp = None
         self._pipe = None
         self._fifo = None
+        self.sink = None
+
+        if compress_type == CompressType.LZOP:
+            self.compress = LZOPCompress
+        elif compress_type == CompressType.GZIP:
+            self.compress = GZIPCompress
+        else:
+            compress_type = None
+        self.enable_compress = compress_type is not None
+        self.compress_level = compress_level
 
     @property
     def bcp(self):
@@ -66,24 +67,27 @@ def bcp(self):
     def pipe(self):
         return self._pipe
 
+    def add_escape(self, cmds: List[str]):
+        if self.format == Format.TSV:
+            cmds.insert(1, MssqlTsvEscape.cmd())
+        elif self.format == Format.CSV:
+            cmds.insert(1, MssqlCsvEscape.cmd())
+        elif self.format == Format.JSON:
+            if contains_dot(self.columns):
+                cmds.insert(1, MssqlJsonEscape.cmd())
+
     def build(
         self,
         table,
         path: str,
-        sink: Literal["s3", "local"],
-        redshift_escape=False,
     ):
-        try:
-            self.sink = _sinks[sink]
-        except KeyError:
-            raise ValueError(f"Unsupported sink: {sink!r}")
-
-        compress = Compress(self.compress_type)
         file_ext = "." + self.format.lower()
         if self.enable_compress:
-            file_ext = file_ext + compress.ext()
+            file_ext = file_ext + self.compress.ext
 
+        self.sink = uri2sink(path)
+        self.sink.set_file_extension(file_ext)
         self._fifo = mktempfifo(suffix=file_ext)
-        uri = os.path.join(path, "$FILE" + file_ext)
 
         if self.format == Format.CSV:
             ft = ms.csv_in_ft
@@ -92,7 +96,7 @@ def build(
             ft = sc.TAB
             rt = sc.LN

-        engine = create_engine(self.url)
+        engine = ms.SqlCreds.from_url(self.url)
         conn = engine.connect()
         try:
             tbl_size = ms.table_data_size(table, conn)
@@ -114,27 +118,20 @@ def build(
                 shell=True,
                 conn=conn,
             )
-            columns = ms.table_columns(table, conn)
-            dot_in_cols = contains_dot(columns)
+            self.columns = ms.table_columns(table, conn)
         finally:
             conn.close()
 
-        upload_cmd = self.sink.cmd(uri)
-        compress_cmd = compress.cmd(level=self.compress_level)
+        upload_cmd = self.sink.cmd()
         if self.enable_compress:
+            compress_cmd = self.compress.cmd(level=self.compress_level)
             split_filter = cmd2pipe(compress_cmd, upload_cmd)
         else:
             split_filter = upload_cmd
         split_cmd = Split.cmd(filter=split_filter, lines=self.chunk_size)
         cat_cmd = Cat.cmd(self._fifo)
         cmds = [cat_cmd, split_cmd]
-        if self.format == Format.TSV and redshift_escape:
-            cmds.insert(1, RedshiftEscape.cmd(shell=True))
-        elif self.format == Format.CSV:
-            cmds.insert(1, MssqlCsvEscape.cmd(shell=True))
-        elif self.format == Format.JSON and dot_in_cols:
-            cmds.insert(1, MssqlJsonEscape.cmd(shell=True))
-
+        self.add_escape(cmds)
         self._pipe = cmd2pipe(*cmds)
 
     def pump(self):
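Despite the refactor, `build()` still assembles a single UNIX pipeline: `cat` the FIFO that bcp writes into, optionally insert a format-specific escape filter via `add_escape()`, then `split` the stream into chunks, with each chunk compressed and handed to the sink command through split's `--filter`. A rough sketch of the kind of command line this produces; the concrete escape and upload commands below are hypothetical stand-ins, not taken from the diff:

```python
# Illustrative only: approximates the shell pipeline that cmd2pipe() joins up.
# GNU split's --filter runs the given command once per chunk, with $FILE set
# to that chunk's output name.
fifo = "/tmp/streamxfer.json.lzo"          # hypothetical mktempfifo() path
cat_cmd = f"cat {fifo}"
escape_cmd = "stx-escape"                  # stand-in for the filter add_escape() inserts
upload_cmd = "aws s3 cp - s3://bucket/path/to/dir/$FILE.json.lzo"  # stand-in sink command
split_filter = f"lzop -6 | {upload_cmd}"   # compress each chunk, then upload it
split_cmd = f"split -l 1000000 --filter='{split_filter}' -"
pipeline = " | ".join([cat_cmd, escape_cmd, split_cmd])
print(pipeline)
```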
30 changes: 11 additions & 19 deletions streamxfer/cli/main.py
@@ -1,42 +1,39 @@
 import click
 
 from streamxfer import StreamXfer
-from streamxfer import compress
-from streamxfer import format
-from streamxfer.cmd import Compress
-from streamxfer.cmd import S3Sink, LocalSink
+from streamxfer.compress import supported as supported_compress
+from streamxfer.format import supported as supported_format
 from streamxfer.format import Format
 
+CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
+
 
-@click.command()
+@click.command(context_settings=CONTEXT_SETTINGS)
 @click.argument("pymssql-url")
 @click.argument("table")
 @click.argument("output-path")
 @click.option(
     "-F",
     "--format",
     default=Format.JSON,
-    type=click.Choice(format.supported, case_sensitive=False),
+    type=click.Choice(supported_format, case_sensitive=False),
     show_default=True,
 )
 @click.option(
+    "-C",
     "--compress-type",
-    default=Compress.lzop.name,
-    type=click.Choice(compress.supported, case_sensitive=False),
+    default=None,
+    type=click.Choice(supported_compress, case_sensitive=False),
     show_default=True,
 )
-@click.option("--no-compress", "disable_compress", is_flag=True)
-@click.option("--redshift-escape", "enable_redshift_escape", is_flag=True)
 def cli(
     pymssql_url,
     table,
     output_path,
     format,
     compress_type,
-    disable_compress,
-    enable_redshift_escape,
 ):
-    """StreamXfer is a powerful tool for streaming data from SQL Server to object storage for seamless transfer
+    """StreamXfer is a powerful tool for streaming data from SQL Server to local or object storage(S3) for seamless transfer
     using UNIX pipe, supporting various general data formats(CSV, TSV, JSON).
 
     \b
@@ -48,14 +45,9 @@ def cli(
     sx = StreamXfer(
         pymssql_url,
         format,
-        enable_compress=not disable_compress,
         compress_type=compress_type,
     )
-    if output_path.startswith("s3://"):
-        sink = "s3"
-    else:
-        sink = "local"
-    sx.build(table, output_path, sink, enable_redshift_escape)
+    sx.build(table, output_path)
     sx.pump()


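The `CONTEXT_SETTINGS` line added above is a stock click idiom: `help_option_names` registers `-h` as an alias for `--help`, which is why the new README help output lists `-h, --help`. A self-contained demonstration of the idiom (standalone script, not repository code):

```python
# demo.py — the same context_settings idiom on a toy command.
import click

CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])

@click.command(context_settings=CONTEXT_SETTINGS)
@click.option("-n", "--name", default="world", show_default=True)
def hello(name):
    """Greet someone."""
    click.echo(f"hello {name}")

if __name__ == "__main__":
    hello()  # both `python demo.py -h` and `python demo.py --help` print usage
```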
23 changes: 0 additions & 23 deletions streamxfer/cli/mssql_csv_escape.py

This file was deleted.
