From 434ba17191d2fa1b45403f376b035ebbb9720367 Mon Sep 17 00:00:00 2001 From: Tom Forbes Date: Thu, 27 Jul 2023 00:27:43 +0100 Subject: [PATCH] Oh my god. --- .github/workflows/run.yml | 2 +- sql/combine.prql | 2 -- sql/combine.sql | 2 ++ src/pypi_data/cli.py | 28 ++++++++++++++++++---------- 4 files changed, 21 insertions(+), 13 deletions(-) delete mode 100644 sql/combine.prql create mode 100644 sql/combine.sql diff --git a/.github/workflows/run.yml b/.github/workflows/run.yml index ddf3ba64..63338f99 100644 --- a/.github/workflows/run.yml +++ b/.github/workflows/run.yml @@ -136,7 +136,7 @@ jobs: - name: Combine run: | - poetry run pypi-data run-sql ${{ github.workspace }}/sql/combine.prql output.parquet input/*.parquet + poetry run pypi-data run-sql ${{ github.workspace }}/sql/combine.sql output.parquet input/*.parquet - name: Upload Assets uses: shogo82148/actions-upload-release-asset@v1 diff --git a/sql/combine.prql b/sql/combine.prql deleted file mode 100644 index 12a64784..00000000 --- a/sql/combine.prql +++ /dev/null @@ -1,2 +0,0 @@ -prql target:sql.duckdb -from (read_parquet $1) \ No newline at end of file diff --git a/sql/combine.sql b/sql/combine.sql new file mode 100644 index 00000000..e41a3d7e --- /dev/null +++ b/sql/combine.sql @@ -0,0 +1,2 @@ +CREATE TABLE temp_table AS SELECT * FROM read_parquet($1); +COPY temp_table TO 'output.parquet' (FORMAT PARQUET, COMPRESSION zstd); \ No newline at end of file diff --git a/src/pypi_data/cli.py b/src/pypi_data/cli.py index 673c7f84..73236cd8 100644 --- a/src/pypi_data/cli.py +++ b/src/pypi_data/cli.py @@ -88,10 +88,17 @@ def run_sql( options = prql.CompileOptions( format=True, signature_comment=True, target="sql.duckdb" ) - - sql = prql.compile(prql_file.read_text(), options=options) - print(sql) print(f'{parameter=}') + if prql_file.name.endswith(".sql"): + sql = prql_file.read_text() + # Can't get it to work without doing this. So dumb. + sql = sql.replace('$1', json.dumps(parameter)) + parameter = [] + else: + compiled_sql = prql.compile(prql_file.read_text(), options=options) + sql = f"EXPLAIN ANALYZE COPY ({compiled_sql}) TO '{output_file}' (FORMAT PARQUET, COMPRESSION zstd)" + print(sql) + print("\n\n\n") # x = duckdb.execute(sql, parameters=[parameter] if parameter else []) # import pprint @@ -110,15 +117,16 @@ def print_thread(): t = threading.Thread(target=print_thread, daemon=True) t.start() duckdb.execute("PRAGMA EXPLAIN_OUTPUT='ALL';") - duckdb.execute(f"EXPLAIN COPY ({sql}) TO '{output_file}' (FORMAT PARQUET, COMPRESSION zstd)", parameters=[parameter] if parameter else []) - for name, plan in duckdb.fetchall(): - print(name) - print(plan) - print("\n\n\n") duckdb.executemany(f"PRAGMA threads=2; " f"PRAGMA memory_limit='2GB'; " - f"COPY ({sql}) TO '{output_file}' (FORMAT PARQUET, COMPRESSION zstd)", - parameters=[[parameter]] if parameter else []) + f"{sql}", + parameters=[parameter] if parameter else []) + try: + for name, plan in duckdb.fetchall(): + print(name) + print(plan) + except duckdb.InvalidInputException: + pass if __name__ == "__main__":