From 93675cd4fb863485284c3aafd44cf30d74c8dfa4 Mon Sep 17 00:00:00 2001 From: Jack Del Vecchio Date: Wed, 11 Oct 2023 16:37:16 +0000 Subject: [PATCH 1/2] WIP --- plugins/parquet/parquetembed.cpp | 42 +++++++++++++++++++++++++++++--- 1 file changed, 39 insertions(+), 3 deletions(-) diff --git a/plugins/parquet/parquetembed.cpp b/plugins/parquet/parquetembed.cpp index 366b76279c3..28bb5907137 100644 --- a/plugins/parquet/parquetembed.cpp +++ b/plugins/parquet/parquetembed.cpp @@ -227,13 +227,49 @@ arrow::Status ParquetHelper::openReadFile() StringBuffer filename; StringBuffer path; splitFilename(location.c_str(), nullptr, &path, &filename, nullptr, false); - Owned itr = createDirectoryIterator(path.str(), filename.append("*.parquet")); + Owned fileItr; + + // hThor and Thor files that were written with parquet have different naming schemes + // Thor files are tagged with a worker ID while hThor files are single non-tagged files + if (activityCtx->numSlaves() > 1) + { + // Executing on Thor: Check for single files that were written by hThor or from external source + Owned singleItr = createDirectoryIterator(path.str(), filename.append(".parquet")); + if (!singleItr || !singleItr->first()) + { + // Check for partitioned files which will have indexes prefixed to the filename + Owned multItr = createDirectoryIterator(path.str(), filename.insert(filename.length() - 8, '*')); + fileItr = multItr; + } + else + { + // There was a single file so check for partitioned files + Owned multItr = createDirectoryIterator(path.str(), filename.insert(filename.length() - 8, '*')); + multItr->next(); + if (!multItr || !multItr->isValid()) + fileItr = singleItr; // If there aren't any partitioned files read the single file with Thor + else + fileItr = multItr; + } + } + else + { + // When reading on hThor check for single file and only if there isn't a single file check for partitioned files to read + Owned singleItr = createDirectoryIterator(path.str(), filename.append(".parquet")); + if 
(!singleItr || !singleItr->first()) + { + Owned multItr = createDirectoryIterator(path.str(), filename.insert(filename.length() - 8, '*')); + fileItr = multItr; + } + else + fileItr = singleItr; + } auto reader_properties = parquet::ReaderProperties(pool); auto arrow_reader_props = parquet::ArrowReaderProperties(); - ForEach (*itr) + ForEach (*fileItr) { - IFile &file = itr->query(); + IFile &file = fileItr->query(); parquet::arrow::FileReaderBuilder reader_builder; reportIfFailure(reader_builder.OpenFile(file.queryFilename(), false, reader_properties)); reader_builder.memory_pool(pool); From 9d199ca874cc042a9b10c07af3958d866207b986 Mon Sep 17 00:00:00 2001 From: Jack Del Vecchio Date: Wed, 11 Oct 2023 18:58:52 +0000 Subject: [PATCH 2/2] Thor no longer reads hthor files if there are partitioned files and vice versa. --- plugins/parquet/parquetembed.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/plugins/parquet/parquetembed.cpp b/plugins/parquet/parquetembed.cpp index 28bb5907137..0f8e5e13e8f 100644 --- a/plugins/parquet/parquetembed.cpp +++ b/plugins/parquet/parquetembed.cpp @@ -244,8 +244,7 @@ arrow::Status ParquetHelper::openReadFile() else { // There was a single file so check for partitioned files - Owned multItr = createDirectoryIterator(path.str(), filename.insert(filename.length() - 8, '*')); - multItr->next(); + Owned multItr = createDirectoryIterator(path.str(), filename.insert(filename.length() - 8, "[0-9]*")); if (!multItr || !multItr->isValid()) fileItr = singleItr; // If there aren't any partitioned files read the single file with Thor else