From e71642b75966a9346e360abc2a2a8c55f9b9bad6 Mon Sep 17 00:00:00 2001 From: Tom Forbes Date: Wed, 23 Aug 2023 14:46:04 +0100 Subject: [PATCH] Try uploading assets --- .github/workflows/unique_python_files.yml | 15 +++++++++++++- sql/only_python_files.sql | 5 +++++ src/main.rs | 25 ++++++++++++++--------- 3 files changed, 34 insertions(+), 11 deletions(-) create mode 100644 sql/only_python_files.sql diff --git a/.github/workflows/unique_python_files.yml b/.github/workflows/unique_python_files.yml index 4dd6bf8b..fec7a5a7 100644 --- a/.github/workflows/unique_python_files.yml +++ b/.github/workflows/unique_python_files.yml @@ -36,8 +36,21 @@ jobs: - name: Run run: | - ./target/optimized/data links/dataset.txt data/ --limit=5 + ./target/optimized/data links/dataset.txt data/ - name: Check run: | ls -la data/ + + - name: Fetch Latest Release + id: get_release + uses: gregziegan/fetch-latest-release@v2.0.0 + with: + github_token: ${{ github.token }} + + - name: Upload Assets + uses: shogo82148/actions-upload-release-asset@v1 + with: + upload_url: ${{ steps.get_release.outputs.upload_url }} + asset_path: ${{ github.workspace }}/data/combined.parquet + asset_name: unique_python_files.parquet \ No newline at end of file diff --git a/sql/only_python_files.sql b/sql/only_python_files.sql new file mode 100644 index 00000000..e86d5feb --- /dev/null +++ b/sql/only_python_files.sql @@ -0,0 +1,5 @@ +SELECT archive_path, hash, lines, project_name, project_version, size, uploaded_on +FROM input_dataset +where skip_reason = '' + and archive_path ILIKE '%.py' and size != 0 +order by project_name, project_version, archive_path; \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index ec7c42bd..add68bf0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -36,12 +36,13 @@ async fn main() -> Result<()> { let download_dir = args.working_directory.join("downloads"); let output_dir = args.working_directory.join("output"); + let only_python_dir = args.working_directory.join("only_python"); let final_output_dir = args.working_directory.join("final"); let combined_parquet_file = args.working_directory.join("combined.parquet"); tokio::fs::create_dir_all(&args.working_directory).await?; tokio::fs::create_dir_all(&download_dir).await?; tokio::fs::create_dir_all(&output_dir).await?; - // tokio::fs::create_dir_all(&final_output_dir).await?; + tokio::fs::create_dir_all(&only_python_dir).await?; let urls_file = BufReader::new(File::open(&args.urls_file).await?); let mut lines = urls_file.lines(); @@ -59,9 +60,11 @@ async fn main() -> Result<()> { for (idx, url) in urls.into_iter().enumerate() { let path = download_dir.join(format!("url-{}.parquet", idx)); let output_dir = output_dir.join(format!("url-{}/", idx)); + let only_python_dir = only_python_dir.join(format!("url-{}/", idx)); download_file(&url, &path).await?; - get_unique_python_files(&path, &output_dir, include_str!("../sql/unique_files.sql")) - .await?; + run_sql(&path, &output_dir, include_str!("../sql/unique_files.sql")).await?; + // run_sql(&path, &only_python_dir, include_str!("../sql/only_python_files.sql")) + // .await?; tokio::fs::remove_file(&path).await?; } @@ -79,7 +82,7 @@ async fn main() -> Result<()> { println!("Reducing combined files to unique records"); - get_unique_python_files( + run_sql( &combined_parquet_file, &final_output_dir, include_str!("../sql/unique_files_combined.sql"), @@ -93,12 +96,14 @@ async fn main() -> Result<()> { .flatten() .collect(); - println!("Finally reducing {} files to {}", all_files.len(), combined_parquet_file.display()); + println!( + "Finally reducing {} files to {}", + all_files.len(), + combined_parquet_file.display() + ); - tokio::task::spawn_blocking(move || { - combine_parquet_files(&all_files, &combined_parquet_file) - }) - .await??; + tokio::task::spawn_blocking(move || combine_parquet_files(&all_files, &combined_parquet_file)) + .await??; Ok(()) } @@ -133,7 +138,7 @@ async fn download_file(url: &str, path: &Path) -> Result<()> { Ok(()) } -async fn get_unique_python_files(path: &Path, output: &Path, sql: &str) -> Result<()> { +async fn run_sql(path: &Path, output: &Path, sql: &str) -> Result<()> { let ctx = SessionContext::new(); let read_options = ParquetReadOptions::default().parquet_pruning(true); ctx.register_parquet("input_dataset", path.to_str().unwrap(), read_options)