diff --git a/CMakeLists.txt b/CMakeLists.txt index e226b94631f..b26f33ec7e0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -181,6 +181,7 @@ if ( PLUGIN ) HPCC_ADD_SUBDIRECTORY (plugins/h3 "H3") HPCC_ADD_SUBDIRECTORY (plugins/nlp "NLP") HPCC_ADD_SUBDIRECTORY (plugins/mongodb "MONGODBEMBED") + HPCC_ADD_SUBDIRECTORY (plugins/parquet "PARQUETEMBED") elseif ( NOT MAKE_DOCS_ONLY ) HPCC_ADD_SUBDIRECTORY (system) HPCC_ADD_SUBDIRECTORY (initfiles) diff --git a/cmake_modules/plugins.cmake b/cmake_modules/plugins.cmake index 7b12f5ae48c..b96a7e7cd09 100644 --- a/cmake_modules/plugins.cmake +++ b/cmake_modules/plugins.cmake @@ -36,6 +36,7 @@ set(PLUGINS_LIST MONGODBEMBED MYSQLEMBED NLP + PARQUETEMBED REDIS REMBED SQLITE3EMBED diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index 98fad1db96f..109e74b7abb 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -42,6 +42,7 @@ add_subdirectory (exampleplugin) add_subdirectory (couchbase) add_subdirectory (sqs) add_subdirectory (mongodb) +add_subdirectory (parquet) IF ( INCLUDE_EE_PLUGINS ) add_subdirectory (eeproxies) ENDIF() diff --git a/plugins/parquet/CMakeLists.txt b/plugins/parquet/CMakeLists.txt new file mode 100644 index 00000000000..5d20ca5e1cf --- /dev/null +++ b/plugins/parquet/CMakeLists.txt @@ -0,0 +1,120 @@ +############################################################################## + +# HPCC SYSTEMS software Copyright (C) 2022 HPCC Systems®. + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +############################################################################## + +# Component: parquetembed + +############################################################# +# Description: +# ----------- +# Cmake Input File for parquetembed +############################################################# + +project(parquetembed) +message("CMAKE Version: ${CMAKE_VERSION}") + +if(PARQUETEMBED) + ADD_PLUGIN(parquetembed) + if(MAKE_PARQUETEMBED) + find_package(Arrow CONFIG REQUIRED) + find_package(Parquet CONFIG REQUIRED) + find_package(ArrowDataset CONFIG REQUIRED) + + set( + SRCS + parquetembed.cpp + ) + + INCLUDE_DIRECTORIES( + ${HPCC_SOURCE_DIR}/esp/platform + ${HPCC_SOURCE_DIR}/system/include + ${HPCC_SOURCE_DIR}/rtl/eclrtl + ${HPCC_SOURCE_DIR}/rtl/include + ${HPCC_SOURCE_DIR}/rtl/nbcd + ${HPCC_SOURCE_DIR}/common/deftype + ${HPCC_SOURCE_DIR}/system/jlib + ${HPCC_SOURCE_DIR}/roxie/roxiemem + ) + + HPCC_ADD_LIBRARY(parquetembed SHARED ${SRCS}) + + install( + TARGETS parquetembed + DESTINATION plugins CALC_DEPS + ) + + install( + FILES ${LIBARROW_LIB_REAL} + DESTINATION ${LIB_DIR} CALC_DEPS + PERMISSIONS OWNER_WRITE OWNER_READ OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ WORLD_EXECUTE + COMPONENT Runtime) + + install( + FILES ${LIBARROW_LIB} ${LIBARROW_LIB_ABI} + DESTINATION ${LIB_DIR} CALC_DEPS + COMPONENT Runtime) + + install( + FILES ${LIBPARQUET_LIB_REAL} + DESTINATION ${LIB_DIR} CALC_DEPS + PERMISSIONS OWNER_WRITE OWNER_READ OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ WORLD_EXECUTE + COMPONENT Runtime) + + install( + FILES ${LIBPARQUET_LIB} ${LIBPARQUET_LIB_ABI} + DESTINATION ${LIB_DIR} CALC_DEPS + COMPONENT Runtime) + + install( + FILES ${LIBARRORACERO_LIB_REAL} + DESTINATION ${LIB_DIR} CALC_DEPS + PERMISSIONS OWNER_WRITE OWNER_READ OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ WORLD_EXECUTE + COMPONENT Runtime) + + install( + FILES ${LIBARRORACERO_LIB} ${LIBARROWDATASET_LIB_ABI} + DESTINATION ${LIB_DIR} CALC_DEPS + COMPONENT Runtime) + + install( + FILES ${LIBARROWDATASET_LIB_REAL} + DESTINATION ${LIB_DIR} CALC_DEPS + PERMISSIONS OWNER_WRITE OWNER_READ OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ WORLD_EXECUTE + COMPONENT Runtime) + + install( + FILES ${LIBARROWDATASET_LIB} ${LIBARROWDATASET_LIB_ABI} + DESTINATION ${LIB_DIR} CALC_DEPS + COMPONENT Runtime) + + target_link_libraries( + parquetembed + eclrtl + jlib + Arrow::arrow_shared + Parquet::parquet_shared + ArrowDataset::arrow_dataset_shared + ) + endif() +endif() + +if(PLATFORM OR CLIENTTOOLS_ONLY) + install( + FILES ${CMAKE_CURRENT_SOURCE_DIR}/parquet.ecllib + DESTINATION plugins + COMPONENT Runtime + ) +endif() diff --git a/plugins/parquet/README.md b/plugins/parquet/README.md new file mode 100644 index 00000000000..d3b3802d195 --- /dev/null +++ b/plugins/parquet/README.md @@ -0,0 +1,59 @@ +# Parquet Plugin for HPCC-Systems + +The Parquet Plugin for HPCC-Systems is a powerful tool designed to facilitate the fast transfer of data stored in a columnar format to the ECL (Enterprise Control Language) data format. This plugin provides seamless integration between Parquet files and HPCC-Systems, enabling efficient data processing and analysis. + +## Installation + +The plugin uses vcpkg and can be installed by creating a separate build directory from the platform and running the following commands: +``` +cd ./parquet-build +cmake -DPARQUETEMBED=ON ../HPCC-Platform +make -j4 package +sudo dpkg -i ./hpccsystems-plugin-parquetembed_.deb +``` + +## Documentation + +[Doxygen](https://www.doxygen.nl/index.html) can be used to create nice HTML documentation for the code. Call/caller graphs are also generated for functions if you have [dot](https://www.graphviz.org/download/) installed and available on your path. + +Assuming `doxygen` is on your path, you can build the documentation via: +``` +cd plugins/parquet +doxygen Doxyfile +``` + +## Features + +The Parquet Plugin offers the following main functions: + +### Regular Files + +#### 1. Reading Parquet Files + +The Read function allows ECL programmers to create an ECL dataset from both regular and partitioned Parquet files. It leverages the Apache Arrow interface for Parquet to efficiently stream data from ECL to the plugin, ensuring optimized data transfer. + +``` +dataset := Read(layout, '/source/directory/data.parquet'); +``` + +#### 2. Writing Parquet Files + +The Write function empowers ECL programmers to write ECL datasets to Parquet files. By leveraging the Parquet format's columnar storage capabilities, this function provides efficient compression and optimized storage for data. + +``` +Write(inDataset, '/output/directory/data.parquet'); +``` + +### Partitioned Files (Tabular Datasets) + +#### 1. Reading Partitioned Files + +The Read Partition function extends the Read functionality by enabling ECL programmers to read from partitioned Parquet files. + +``` +github_dataset := ReadPartition(layout, '/source/directory/partioned_dataset'); +``` + +#### 2. Writing Partitioned Files + +For partitioning parquet files all you need to do is run the Write function on Thor rather than hthor and each worker will create its own parquet file. diff --git a/plugins/parquet/examples/blob_test.ecl b/plugins/parquet/examples/blob_test.ecl new file mode 100644 index 00000000000..04fde97f2e3 --- /dev/null +++ b/plugins/parquet/examples/blob_test.ecl @@ -0,0 +1,20 @@ +IMPORT STD; +IMPORT PARQUET; + +imageRecord := RECORD + STRING filename; + DATA image; + UNSIGNED8 RecPos{virtual(fileposition)}; +END; + +#IF(0) +in_image_data := DATASET('~parquet::image', imageRecord, FLAT); +OUTPUT(in_image_data, NAMED('IN_IMAGE_DATA')); +PARQUET.Write(in_image_data, '/datadrive/dev/test_data/test_image.parquet'); + +#END; + +#IF(1) +out_image_data := Read({DATA image}, '/datadrive/dev/test_data/test_image.parquet'); +OUTPUT(out_image_data, NAMED('OUT_IMAGE_DATA')); +#END \ No newline at end of file diff --git a/plugins/parquet/examples/create_partition.ecl b/plugins/parquet/examples/create_partition.ecl new file mode 100755 index 00000000000..509bac03cbd --- /dev/null +++ b/plugins/parquet/examples/create_partition.ecl @@ -0,0 +1,29 @@ +IMPORT STD; +IMPORT Parquet; + +#OPTION('outputLimit', 2000); +#OPTION('pickBestEngine', FALSE); + +layout := RECORD + STRING actor_login; + INTEGER actor_id; + INTEGER comment_id; + STRING comment; + STRING repo; + STRING language; + STRING author_login; + INTEGER author_id; + INTEGER pr_id; + INTEGER c_id; + INTEGER commit_date; +END; + +#IF(0) +github_dataset := Read(layout, '/datadrive/dev/test_data/ghtorrent-2019-01-07.parquet'); +Write(DISTRIBUTE(github_dataset, SKEW(.05)), '/datadrive/dev/test_data/hpcc_gh_partition/data.parquet'); +#END + +#IF(1) +github_dataset := ReadPartition(layout, '/datadrive/dev/test_data/hpcc_gh_partition'); +OUTPUT(COUNT(github_dataset), NAMED('GITHUB_PARTITION')); +#END \ No newline at end of file diff --git a/plugins/parquet/examples/decimal_test.ecl b/plugins/parquet/examples/decimal_test.ecl new file mode 100644 index 00000000000..507a732a3a3 --- /dev/null +++ b/plugins/parquet/examples/decimal_test.ecl @@ -0,0 +1,17 @@ +IMPORT STD; +IMPORT PARQUET; + + +layout := RECORD + DECIMAL5_2 height; +END; + +decimal_data := DATASET([{152.25}, {125.56}], layout); + +#IF(1) +Write(decimal_data, '/datadrive/dev/test_data/decimal.parquet'); +#END + +#IF(1) +Read(layout, '/datadrive/dev/test_data/decimal.parquet'); +#END diff --git a/plugins/parquet/examples/large_io.ecl b/plugins/parquet/examples/large_io.ecl new file mode 100644 index 00000000000..d08fbadec53 --- /dev/null +++ b/plugins/parquet/examples/large_io.ecl @@ -0,0 +1,29 @@ +IMPORT STD; +IMPORT Parquet; + +#OPTION('outputLimit', 2000); +#OPTION('pickBestEngine', FALSE); + +layout := RECORD + STRING actor_login; + INTEGER actor_id; + INTEGER comment_id; + STRING comment; + STRING repo; + STRING language; + STRING author_login; + INTEGER author_id; + INTEGER pr_id; + INTEGER c_id; + INTEGER commit_date; +END; + +#IF(0) +csv_data := DATASET('~parquet::large::ghtorrent-2019-02-04.csv', layout, CSV(HEADING(1))); +Write(csv_data, '/datadrive/dev/test_data/ghtorrent-2019-02-04.parquet'); +#END + +#IF(1) +parquet_data := Read(layout, '/datadrive/dev/test_data/hpcc_gh_partition/data.parquet'); +OUTPUT(COUNT(parquet_data), NAMED('ghtorrent_2019_01_07')); +#END \ No newline at end of file diff --git a/plugins/parquet/examples/nested_io.ecl b/plugins/parquet/examples/nested_io.ecl new file mode 100644 index 00000000000..411be93e44a --- /dev/null +++ b/plugins/parquet/examples/nested_io.ecl @@ -0,0 +1,30 @@ +IMPORT Parquet; + +friendsRec :=RECORD + UNSIGNED4 age; + INTEGER2 friends; + SET OF STRING friendsList; +END; + +childRec := RECORD + friendsRec friends; + REAL height; + REAL weight; +END; + +parentRec := RECORD + UTF8_de firstname; + UTF8_de lastname; + childRec details; +END; +nested_dataset := DATASET([{U'J\353ck', U'\353ackson', { {22, 2, ['James', 'Jonathon']}, 5.9, 600}}, {'John', 'Johnson', { {17, 0, []}, 6.3, 18}}, + {'Amy', U'Amy\353on', { {59, 1, ['Andy']}, 3.9, 59}}, {'Grace', U'Graceso\353', { {11, 3, ['Grayson', 'Gina', 'George']}, 7.9, 100}}], parentRec); + +#IF(1) +Write(nested_dataset, '/datadrive/dev/test_data/nested.parquet'); +#END + +#IF(1) +read_in := Read(parentRec, '/datadrive/dev/test_data/nested.parquet'); +OUTPUT(read_in, NAMED('NESTED_PARQUET_IO')); +#END \ No newline at end of file diff --git a/plugins/parquet/examples/null_test.ecl b/plugins/parquet/examples/null_test.ecl new file mode 100644 index 00000000000..0357547d84d --- /dev/null +++ b/plugins/parquet/examples/null_test.ecl @@ -0,0 +1,131 @@ +IMPORT PARQUET; + +Set_date_sk:=[-2]; +Set_date_dt:=[' ']; +Set_date_sd:=['-2 ']; +Set_date_ld:=['NA ']; +Set_yr_num:=[0]; +Set_half_yr_num:=[0]; +Set_half_yr_nam_sd:=[' ']; +Set_half_yr_nam_ld:=[' ']; +Set_qtr_num:=[0]; +Set_qtr_nam_sd:=[' ']; +Set_qtr_nam_ld:=[' ']; +Set_qtr_start_dt:=[0]; +Set_qtr_end_dt:=[0]; +Set_yr_qtr_num:=[0]; +Set_yr_qtr_nam_sd:=[' ']; +Set_yr_qtr_nam_ld:=[' ']; +Set_mth_num:=[0]; +Set_mth_nam_sd:=[' ']; +Set_mth_nam_ld:=[' ']; +Set_mth_end_dt:=[0]; +Set_days_in_mth_num:=[0]; +Set_yr_mth_num:=[0]; +Set_yr_mth_nam_sd:=[' ']; +Set_yr_mth_nam_ld:=[' ']; +Set_days_in_yr_num:=[0]; +Set_day_of_mth_num:=[0]; +Set_day_of_wk_num:=[0]; +Set_day_of_wk_nam_sd:=[' ']; +Set_day_of_wk_nam_ld:=[' ']; +Set_day_of_yr_num:=[0]; +Set_wk_of_yr_num:=[0]; +Set_wk_start_dt:=[0]; +Set_wk_end_dt:=[0]; +Set_wkday_fl:=[' ']; +Set_holiday_fl:=['N']; +Set_major_event_fl:=[' ']; +Set_first_seen_dt:=[20151105]; +Set_last_seen_dt:=[20151105]; +Set_iso_wk_start_dt:=[' ']; +Set_iso_wk_end_dt:=[' ']; +Set_iso_wk_num_of_year:=[0]; +Set_ins_wk_num_of_month:=[5]; +Set_ins_like_day:=[0]; +Set_ins_rev_day:=['0']; +Set_bsv_rev_day:=['0']; +Set_bsv_fcst_day_factor:=['0']; +Set_iso_yr_num:=[0]; +Set_ln_holiday_fl:=['N']; +Set_work_hours:=[0]; +Set_qtr_start_date:=[' ']; +Set_qtr_end_date:=[' ']; +Set_mth_start_date:=[' ']; +Set_mth_end_date:=[' ']; +Set_year_start_date:=[' ']; +Set_year_end_date:=[' ']; +Set_iso_year_week_num:=[0]; +Set_ins_lag_day:=[0]; +Set_ins_like_date:=[' ']; +Set_hc_rev_day:=['0']; +Row_1:=['-2',' ','-2 ','NA ','0','0',' ',' ','0',' ',' ','0','0','0',' ',' ','0',' ',' ','0','0','0',' ',' ','0','0','0',' ',' ','0','0','0','0',' ','N',' ','20151105','20151105',' ',' ','0','5','0','0','0','0','0','N','0',' ',' ',' ',' ',' ',' ','0','0',' ','0']; +empty_record := + record + integer4 date_sk; + string10 date_dt; + string11 date_sd; + string32 date_ld; + unsigned2 yr_num; + unsigned1 half_yr_num; + string9 half_yr_nam_sd; + string32 half_yr_nam_ld; + unsigned1 qtr_num; + string2 qtr_nam_sd; + string9 qtr_nam_ld; + unsigned4 qtr_start_dt; + unsigned4 qtr_end_dt; + unsigned3 yr_qtr_num; + string10 yr_qtr_nam_sd; + string18 yr_qtr_nam_ld; + unsigned1 mth_num; + string5 mth_nam_sd; + string9 mth_nam_ld; + unsigned4 mth_end_dt; + unsigned1 days_in_mth_num; + unsigned3 yr_mth_num; + string10 yr_mth_nam_sd; + string24 yr_mth_nam_ld; + unsigned2 days_in_yr_num; + unsigned1 day_of_mth_num; + unsigned1 day_of_wk_num; + string5 day_of_wk_nam_sd; + string10 day_of_wk_nam_ld; + unsigned2 day_of_yr_num; + unsigned1 wk_of_yr_num; + unsigned4 wk_start_dt; + unsigned4 wk_end_dt; + string1 wkday_fl; + string1 holiday_fl; + string1 major_event_fl; + unsigned4 first_seen_dt; + unsigned4 last_seen_dt; + string10 iso_wk_start_dt; + string10 iso_wk_end_dt; + integer8 iso_wk_num_of_year; + integer8 ins_wk_num_of_month; + integer8 ins_like_day; + decimal3_2 ins_rev_day; + decimal3_2 bsv_rev_day; + decimal15_3 bsv_fcst_day_factor; + integer4 iso_yr_num; + string1 ln_holiday_fl; + integer2 work_hours; + string10 qtr_start_date; + string10 qtr_end_date; + string10 mth_start_date; + string10 mth_end_date; + string10 year_start_date; + string10 year_end_date; + integer8 iso_year_week_num; + integer8 ins_lag_day; + string10 ins_like_date; + decimal3_2 hc_rev_day; + UNICODE null_data; + end; +empty_dataset := dataset([{-2, ' ', '-2 ', 'NA ', 0, 0, ' ', ' ', 0, ' ', ' ', 0, 0, 0, ' ', ' ', 0, ' ', ' ', 0, 0, 0, ' ', ' ', 0, 0, 0, ' ', ' ', 0, 0, 0, 0, ' ', 'N', ' ', 20151105, 20151105, ' ', ' ', 0, 5, 0, '0', '0', '0', 0, 'N', 0, ' ', ' ', ' ', ' ', ' ', ' ', 0, 0, ' ', '0', '\123\000\000\000\123'}], empty_record); + +Write(empty_dataset, '/datadrive/dev/test_data/empty_parquet.parquet'); + +empty_data_in := Read(empty_record, '/datadrive/dev/test_data/empty_parquet.parquet'); +OUTPUT(empty_data_in); \ No newline at end of file diff --git a/plugins/parquet/examples/parquet_read_write.ecl b/plugins/parquet/examples/parquet_read_write.ecl new file mode 100644 index 00000000000..b9fbe9c5f7f --- /dev/null +++ b/plugins/parquet/examples/parquet_read_write.ecl @@ -0,0 +1,26 @@ +IMPORT STD; +IMPORT Parquet; + +#OPTION('outputLimit', 2000); +#OPTION('pickBestEngine', FALSE); + +layout := RECORD + STRING actor_login; + INTEGER actor_id; + INTEGER comment_id; + STRING comment; + STRING repo; + STRING language; + STRING author_login; + INTEGER author_id; + INTEGER pr_id; + INTEGER c_id; + INTEGER commit_date; +END; + +parquet_data := Read(layout, '/datadrive/dev/test_data/ghtorrent-2019-01-07.parquet'); +part_a := CHOOSEN(parquet_data, 1000); +OUTPUT(CHOOSEN(DATASET('~parquet::large::ghtorrent-2019-02-04.csv', layout, CSV(HEADING(1))), 1000), NAMED('CSV_GHTORRENT')); +OUTPUT(part_a, NAMED('GHTORRENT')); + +Write(part_a, '/datadrive/dev/test_data/github_partition/part_a.parquet'); \ No newline at end of file diff --git a/plugins/parquet/examples/read_file.ecl b/plugins/parquet/examples/read_file.ecl new file mode 100644 index 00000000000..f14849b7e9a --- /dev/null +++ b/plugins/parquet/examples/read_file.ecl @@ -0,0 +1,3 @@ +IMPORT Parquet; + +Parquet.Read({INTEGER n}, '/var/lib/HPCCSystems/mydropzone/sample.parquet'); diff --git a/plugins/parquet/examples/simple_io.ecl b/plugins/parquet/examples/simple_io.ecl new file mode 100644 index 00000000000..e54dd08c457 --- /dev/null +++ b/plugins/parquet/examples/simple_io.ecl @@ -0,0 +1,14 @@ +IMPORT Parquet; + +simpleRec := RECORD + UNSIGNED4 num; + REAL balance; + UTF8_de lastname; + STRING name; +END; +simpleDataset := DATASET([{1, 2.4356, U'de\3531', 'Jack'}, {1, 2.4356, U'de\3531', 'Jack'}, {2, 4.8937, U'as\352df', 'John'}, {3, 1.8573, 'nj\351vf', 'Jane'}, {4, 9.1235, U'ds\354fg', 'Jill'}, {5, 6.3297, U'po\355gm', 'Jim'}], simpleRec); + +Write(simpleDataset, '/datadrive/dev/test_data/simple.parquet'); + +read_in := Read(simpleRec, '/datadrive/dev/test_data/simple.parquet'); +OUTPUT(read_in, NAMED('SIMPLE_PARQUET_IO')); \ No newline at end of file diff --git a/plugins/parquet/examples/write_file.ecl b/plugins/parquet/examples/write_file.ecl new file mode 100644 index 00000000000..f64e7347ea0 --- /dev/null +++ b/plugins/parquet/examples/write_file.ecl @@ -0,0 +1,14 @@ +IMPORT Parquet; + +ds := DATASET + ( + 1000, + TRANSFORM + ( + {INTEGER n}, + SELF.n := RANDOM() + ), + DISTRIBUTED + ); + +Parquet.Write(ds, '/var/lib/HPCCSystems/mydropzone/sample.parquet'); diff --git a/plugins/parquet/parquet.ecllib b/plugins/parquet/parquet.ecllib new file mode 100644 index 00000000000..b2c9f688bc8 --- /dev/null +++ b/plugins/parquet/parquet.ecllib @@ -0,0 +1,48 @@ +/*############################################################################## + + HPCC SYSTEMS software Copyright (C) 2023 HPCC Systems®. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +############################################################################## */ + +SHARED ParquetService := SERVICE : plugin('parquetembed') + BOOLEAN getEmbedContext():cpp, pure, namespace='parquetembed', entrypoint='getEmbedContext', prototype='IEmbedContext* getEmbedContext()'; +END; + +EXPORT Read(resultLayout, filePath) := FUNCTIONMACRO + STREAMED DATASET(resultLayout) _DoParquetRead() := EMBED(parquet : activity, option('read'), location(filePath)) + ENDEMBED; + RETURN _DoParquetRead(); +ENDMACRO; + +EXPORT ReadPartition(resultLayout, basePath) := FUNCTIONMACRO + STREAMED DATASET(resultLayout) _DoParquetReadPartition() := EMBED(parquet: activity, option('readpartition'), location(basePath)) + ENDEMBED; + RETURN _DoParquetReadPartition(); +ENDMACRO; + +EXPORT Write(outDS, filePath) := MACRO + _DoParquetWrite(STREAMED DATASET(RECORDOF(outDS)) _ds) := EMBED(parquet : activity, option('write'), destination(filePath)) + ENDEMBED; + _doParquetWrite(outDS); +ENDMACRO; + +EXPORT WritePartition(outDS, outRows = 100000, basePath) := MACRO + _DoParquetWritePartition(STREAMED DATASET(RECORDOF(outDS)) _ds) := EMBED(parquet : activity, option('writepartition'), destination(basePath), MaxRowSize(outRows)) + ENDEMBED; + _DoParquetWritePartition(outDS) +ENDMACRO; + +EXPORT getEmbedContext := ParquetService.getEmbedContext; +EXPORT BOOLEAN supportsImport := FALSE; +EXPORT BOOLEAN supportsScript := TRUE; diff --git a/plugins/parquet/parquetembed.cpp b/plugins/parquet/parquetembed.cpp new file mode 100644 index 00000000000..366b76279c3 --- /dev/null +++ b/plugins/parquet/parquetembed.cpp @@ -0,0 +1,1974 @@ +/*############################################################################## + HPCC SYSTEMS software Copyright (C) 2022 HPCC Systems®. + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +############################################################################## */ + +#include "parquetembed.hpp" +#include "arrow/result.h" +#include "parquet/arrow/schema.h" +#include "arrow/io/api.h" +#include + +#include "rtlembed.hpp" +#include "rtlds_imp.hpp" +#include "jfile.hpp" + +static constexpr const char *MODULE_NAME = "parquet"; +static constexpr const char *MODULE_DESCRIPTION = "Parquet Embed Helper"; +static constexpr const char *VERSION = "Parquet Embed Helper 1.0.0"; +static const char *COMPATIBLE_VERSIONS[] = {VERSION, nullptr}; +static const NullFieldProcessor NULLFIELD(NULL); + +/** + * @brief Takes a pointer to an ECLPluginDefinitionBlock and passes in all the important info + * about the plugin. + */ +extern "C" PARQUETEMBED_PLUGIN_API bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb) +{ + if (pb->size == sizeof(ECLPluginDefinitionBlockEx)) + { + ECLPluginDefinitionBlockEx *pbx = (ECLPluginDefinitionBlockEx *)pb; + pbx->compatibleVersions = COMPATIBLE_VERSIONS; + } + else if (pb->size != sizeof(ECLPluginDefinitionBlock)) + return false; + + pb->magicVersion = PLUGIN_VERSION; + pb->version = VERSION; + pb->moduleName = MODULE_NAME; + pb->ECL = nullptr; + pb->flags = PLUGIN_IMPLICIT_MODULE; + pb->description = MODULE_DESCRIPTION; + return true; +} + +namespace parquetembed +{ +static thread_local rapidjson::MemoryPoolAllocator<> jsonAlloc; + +// //-------------------------------------------------------------------------- +// Plugin Classes +//-------------------------------------------------------------------------- + +/** + * @brief Throws an exception and gets called when an operation that is unsupported is attempted. + * + * @param feature Name of the feature that is currently unsupported. + */ +extern void UNSUPPORTED(const char *feature) +{ + throw MakeStringException(-1, "%s UNSUPPORTED feature: %s not supported in %s", MODULE_NAME, feature, VERSION); +} + +/** + * @brief Exits the program with a failure code and a message to display. + * + * @param message Message to display. + * @param ... Takes any number of arguments that can be inserted into the string using %. + */ +extern void failx(const char *message, ...) +{ + va_list args; + va_start(args, message); + StringBuffer msg; + msg.appendf("%s: ", MODULE_NAME).valist_appendf(message, args); + va_end(args); + rtlFail(0, msg.str()); +} + +/** + * @brief Exits the program with a failure code and a message to display. + * + * @param message Message to display. + */ +extern void fail(const char *message) +{ + StringBuffer msg; + msg.appendf("%s: ", MODULE_NAME).append(message); + rtlFail(0, msg.str()); +} + +/** + * @brief Simple constructor that stores the inputs from the user. + * + * @param option The read or write option. + * + * @param location The location to read a parquet file. + * + * @param destination The destination to write a parquet file. + * + * @param rowsize The max row group size when reading parquet files. + * + * @param _batchSize The size of the batches when converting parquet columns to rows. + */ +ParquetHelper::ParquetHelper(const char *option, const char *_location, const char *destination, + int _rowSize, int _batchSize, const IThorActivityContext *_activityCtx) + : partOption(option), location(_location), destination(destination) +{ + rowSize = _rowSize; + batchSize = _batchSize; + activityCtx = _activityCtx; + + pool = arrow::default_memory_pool(); + + parquetDoc = std::vector(rowSize); + + partition = String(option).endsWith("partition"); +} + +ParquetHelper::~ParquetHelper() +{ + pool->ReleaseUnused(); + jsonAlloc.Clear(); +} + +/** + * @brief Get the Schema shared pointer + * + * @return std::shared_ptr Shared_ptr of schema object for building the write stream. + */ +std::shared_ptr ParquetHelper::getSchema() +{ + return schema; +} + +/** + * @brief Opens the write stream with the schema and destination. T + * + */ +arrow::Status ParquetHelper::openWriteFile() +{ + if (destination == "") + failx("Invalid option: The destination was not supplied."); + + if (partition) + { + ARROW_ASSIGN_OR_RAISE(auto filesystem, arrow::fs::FileSystemFromUriOrPath(destination)); + reportIfFailure(filesystem->DeleteDirContents(destination)); + auto partition_schema = arrow::schema({schema->field(5)}); + + auto format = std::make_shared(); + auto partitioning = std::make_shared(partition_schema); + + writeOptions.file_write_options = format->DefaultWriteOptions(); + writeOptions.filesystem = filesystem; + writeOptions.base_dir = destination; + writeOptions.partitioning = partitioning; + writeOptions.existing_data_behavior = arrow::dataset::ExistingDataBehavior::kOverwriteOrIgnore; + } + else + { + // Currently under the assumption that all channels and workers are given a worker id and no matter + // the configuration will show up in activityCtx->numSlaves() + if (activityCtx->numSlaves() > 1) + { + destination.insert(destination.find(".parquet"), std::to_string(activityCtx->querySlave())); + } + + std::shared_ptr outfile; + + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(destination)); + + // Choose compression + // TO DO let the user choose a compression + std::shared_ptr props = parquet::WriterProperties::Builder().compression(arrow::Compression::UNCOMPRESSED)->build(); + + // Opt to store Arrow schema for easier reads back into Arrow + std::shared_ptr arrow_props = parquet::ArrowWriterProperties::Builder().store_schema()->build(); + + // Create a writer + ARROW_ASSIGN_OR_RAISE(writer, parquet::arrow::FileWriter::Open(*schema.get(), pool, outfile, props, arrow_props)); + } + return arrow::Status::OK(); +} + +/** + * @brief Opens the read stream with the schema and location. + * + */ +arrow::Status ParquetHelper::openReadFile() +{ + if (partition) + { + // Create a filesystem + std::shared_ptr fs; + ARROW_ASSIGN_OR_RAISE(fs, arrow::fs::FileSystemFromUriOrPath(location)); + + // FileSelector allows traversal of multi-file dataset + arrow::fs::FileSelector selector; + selector.base_dir = location; // The base directory to be searched is provided by the user in the location option. + selector.recursive = true; // Selector will search the base path recursively for partitioned files. + + // Create a file format + std::shared_ptr format = std::make_shared(); + + arrow::dataset::FileSystemFactoryOptions options; + options.partitioning = arrow::dataset::HivePartitioning::MakeFactory(); // TODO set other partitioning types + + // Create the dataset factory + PARQUET_ASSIGN_OR_THROW(auto dataset_factory, arrow::dataset::FileSystemDatasetFactory::Make(fs, selector, format, options)); + + // Get scanner + PARQUET_ASSIGN_OR_THROW(auto dataset, dataset_factory->Finish()); + ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan()); + reportIfFailure(scan_builder->Pool(pool)); + ARROW_ASSIGN_OR_RAISE(scanner, scan_builder->Finish()); + } + else + { + StringBuffer filename; + StringBuffer path; + splitFilename(location.c_str(), nullptr, &path, &filename, nullptr, false); + Owned itr = createDirectoryIterator(path.str(), filename.append("*.parquet")); + + auto reader_properties = parquet::ReaderProperties(pool); + auto arrow_reader_props = parquet::ArrowReaderProperties(); + ForEach (*itr) + { + IFile &file = itr->query(); + parquet::arrow::FileReaderBuilder reader_builder; + reportIfFailure(reader_builder.OpenFile(file.queryFilename(), false, reader_properties)); + reader_builder.memory_pool(pool); + reader_builder.properties(arrow_reader_props); + std::unique_ptr parquetFileReader; + reportIfFailure(reader_builder.Build(&parquetFileReader)); + parquetFileReaders.push_back(std::move(parquetFileReader)); + } + } + return arrow::Status::OK(); +} + +arrow::Status ParquetHelper::writePartition(std::shared_ptr table) +{ + // Create dataset for writing partitioned files. + auto dataset = std::make_shared(table); + + StringBuffer basename_template; + basename_template.appendf("part{i}_%lld.parquet", tablesProcessed++); + writeOptions.basename_template = basename_template.str(); + + ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan()); + reportIfFailure(scanner_builder->Pool(pool)); + ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish()); + + // Write partitioned files. + reportIfFailure(arrow::dataset::FileSystemDataset::Write(writeOptions, scanner)); + + return arrow::Status::OK(); +} + +/** + * @brief Returns a pointer to the stream writer for writing to the destination. + * + * @return + */ +parquet::arrow::FileWriter *ParquetHelper::queryWriter() +{ + return writer.get(); +} + +/** + * @brief Returns a pointer to the top of the stack for the current row being built. + * + * @return A rapidjson::Value containing the row + */ +rapidjson::Value *ParquetHelper::queryCurrentRow() +{ + return &rowStack[rowStack.size() - 1]; +} + +/** + * @brief A helper method for updating the current row on writes and keeping + * it within the boundary of the rowSize set by the user when creating RowGroups. + */ +void ParquetHelper::updateRow() +{ + if (++currentRow == rowSize) + currentRow = 0; +} + +std::vector &ParquetHelper::queryRecordBatch() +{ + return parquetDoc; +} + +/** + * @brief Divide row groups being read from a parquet file among any number of thor workers. If running hthor all row groups are assigned to it. This function + * will handle all cases where the number of groups is greater than, less than or divisible by the number of thor workers. + */ +void divide_row_groups(const IThorActivityContext *activityCtx, __int64 totalRowGroups, __int64 &numRowGroups, __int64 &startRowGroup) +{ + int workers = activityCtx->numSlaves(); + int strands = activityCtx->numStrands(); + int worker_id = activityCtx->querySlave(); + + // Currently under the assumption that all channels and workers are given a worker id and no matter + // the configuration will show up in activityCtx->numSlaves() + if (workers > 1) + { + // If the number of workers goes into totalRowGroups evenly then every worker gets the same amount + // of rows to read + if (totalRowGroups % workers == 0) + { + numRowGroups = totalRowGroups / workers; + startRowGroup = numRowGroups * worker_id; + } + // If the totalRowGroups is not evenly divisible by the number of workers then we divide them up + // with the first n-1 workers getting slightly more and the nth worker gets the remainder + else if (totalRowGroups > workers) + { + __int64 groupsPerWorker = totalRowGroups / workers; + __int64 remainder = totalRowGroups % workers; + + if (worker_id < remainder) + { + numRowGroups = groupsPerWorker + 1; + startRowGroup = numRowGroups * worker_id; + } + else + { + numRowGroups = groupsPerWorker; + startRowGroup = (remainder * (numRowGroups + 1)) + ((worker_id - remainder) * numRowGroups); + } + } + // If the number of totalRowGroups is less than the number of workers we give as many as possible + // a single row group to read. + else + { + if (worker_id < totalRowGroups) + { + numRowGroups = 1; + startRowGroup = worker_id; + } + else + { + numRowGroups = 0; + startRowGroup = 0; + } + } + } + else + { + // There is only one worker + numRowGroups = totalRowGroups; + startRowGroup = 0; + } +} + +void ParquetHelper::chunkTable(std::shared_ptr &table) +{ + auto columns = table->columns(); + parquetTable.clear(); + for (int i = 0; i < columns.size(); i++) + { + parquetTable.insert(std::make_pair(table->field(i)->name(), columns[i]->chunk(0))); + } +} + +std::shared_ptr ParquetHelper::queryCurrentTable(__int64 currTable) +{ + __int64 tables = 0; + __int64 offset = 0; + for (int i = 0; i < parquetFileReaders.size(); i++) + { + tables += fileTableCounts[i]; + if (currTable < tables) + { + return parquetFileReaders[i]->RowGroup(currTable - offset); + } + offset = tables; + } + failx("Failed getting RowGroupReader. Index %lli is out of bounds.", currTable); +} + +/** + * @brief Sets the parquetTable member to the output of what is read from the given + * parquet file. + */ +arrow::Status ParquetHelper::processReadFile() +{ + if (partition) + { + // rowsProcessed starts at zero and we read in batches until it is equal to rowsCount + rowsProcessed = 0; + PARQUET_ASSIGN_OR_THROW(rbatchReader, scanner->ToRecordBatchReader()); + rbatchItr = arrow::RecordBatchReader::RecordBatchReaderIterator(rbatchReader.get()); + // Divide the work among any number of workers + PARQUET_ASSIGN_OR_THROW(auto batch, *rbatchItr); + PARQUET_ASSIGN_OR_THROW(float total_rows, scanner->CountRows()); + batchSize = batch->num_rows(); + divide_row_groups(activityCtx, std::ceil(total_rows / batchSize), tableCount, startRowGroup); + if (tableCount != 0) + { + std::shared_ptr table; + PARQUET_ASSIGN_OR_THROW(table, queryRows()); + rowsCount = table->num_rows(); + chunkTable(table); + tablesProcessed++; + } + else + { + rowsCount = 0; + } + } + else + { + __int64 totalTables = 0; + + for (int i = 0; i < parquetFileReaders.size(); i++) + { + __int64 tables = parquetFileReaders[i]->num_row_groups(); + fileTableCounts.push_back(tables); + totalTables += tables; + } + + divide_row_groups(activityCtx, totalTables, tableCount, startRowGroup); + rowsProcessed = 0; + if (tableCount != 0) + { + std::shared_ptr table; + reportIfFailure(queryCurrentTable(tablesProcessed + startRowGroup)->ReadTable(&table)); + rowsCount = table->num_rows(); + chunkTable(table); + tablesProcessed++; + } + else + { + rowsCount = 0; + } + } + return arrow::Status::OK(); +} + +/** + * @brief Returns a boolean so we know if we are writing partitioned files. + * + * @return true If we are partitioning. + * @return false If we are writing a single file. + */ +bool ParquetHelper::partSetting() +{ + return partition; +} + +/** + * @brief Returns the maximum size of the row group set by the user. Default is 1000. + * + * @return int Maximum size of the row group. + */ +__int64 ParquetHelper::getMaxRowSize() +{ + return rowSize; +} + +char ParquetHelper::queryPartOptions() +{ + if (partOption[0] == 'W' || partOption[0] == 'w') + { + return 'w'; + } + else if (partOption[0] == 'R' || partOption[0] == 'r') + { + return 'r'; + } + else + { + failx("Invalid options parameter."); + } +} + +/** + * @brief Checks if all the rows have been read and if reading a single file all of the + * RowGroups as well. + * + * @return True if there are more rows to be read and false if else. + */ +bool ParquetHelper::shouldRead() +{ + return !(tablesProcessed >= tableCount && rowsProcessed >= rowsCount); +} + +__int64 &ParquetHelper::getRowsProcessed() +{ + return rowsProcessed; +} + +arrow::Result> ParquetHelper::convertToRecordBatch( + const std::vector &rows, std::shared_ptr schema) +{ + // RecordBatchBuilder will create array builders for us for each field in our + // schema. By passing the number of output rows (`rows.size()`) we can + // pre-allocate the correct size of arrays, except of course in the case of + // string, byte, and list arrays, which have dynamic lengths. + std::unique_ptr batch_builder; + ARROW_ASSIGN_OR_RAISE( + batch_builder, + arrow::RecordBatchBuilder::Make(schema, pool, rows.size())); + + // Inner converter will take rows and be responsible for appending values + // to provided array builders. + JsonValueConverter converter(rows); + for (int i = 0; i < batch_builder->num_fields(); ++i) + { + std::shared_ptr field = schema->field(i); + arrow::ArrayBuilder *builder = batch_builder->GetField(i); + ARROW_RETURN_NOT_OK(converter.Convert(*field.get(), builder)); + } + + std::shared_ptr batch; + ARROW_ASSIGN_OR_RAISE(batch, batch_builder->Flush()); + + // Use RecordBatch::ValidateFull() to make sure arrays were correctly constructed. + reportIfFailure(batch->ValidateFull()); + return batch; +} + +arrow::Result> ParquetHelper::queryRows() +{ + if (tablesProcessed == 0) + { + __int64 offset = 0; + while (offset < startRowGroup) + { + rbatchItr++; + offset++; + } + } + PARQUET_ASSIGN_OR_THROW(auto batch, *rbatchItr); + rbatchItr++; + std::vector> to_table = {batch}; + return std::move(arrow::Table::FromRecordBatches(std::move(to_table))); +} + +std::unordered_map> &ParquetHelper::next() +{ + if (rowsProcessed == rowsCount) + { + if (partition) + { + // rowsProcessed starts at zero and we read in batches until it is equal to rowsCount + rowsProcessed = 0; + tablesProcessed++; + std::shared_ptr table; + PARQUET_ASSIGN_OR_THROW(table, queryRows()); + rowsCount = table->num_rows(); + chunkTable(table); + } + else + { + std::shared_ptr table; + reportIfFailure(queryCurrentTable(tablesProcessed + startRowGroup)->ReadTable(&table)); + rowsProcessed = 0; + tablesProcessed++; + rowsCount = table->num_rows(); + chunkTable(table); + } + } + return parquetTable; +} + +__int64 ParquetHelper::queryRowsCount() +{ + return rowsCount; +} + +/** + * @brief Creates the child record for an array or dataset type. This method is used for converting + * the ECL RtlFieldInfo object into arrow::Fields for creating a rapidjson document object. + * + * @param field The field containing metadata for the record. + * + * @returns An arrow::Structype holding the schema and fields of the child records. + */ +std::shared_ptr ParquetHelper::makeChildRecord(const RtlFieldInfo *field) +{ + const RtlTypeInfo *typeInfo = field->type; + const RtlFieldInfo *const *fields = typeInfo->queryFields(); + // Create child fields + if (fields) + { + int count = getNumFields(typeInfo); + + std::vector> child_fields; + + for (int i = 0; i < count; i++, fields++) + { + reportIfFailure(fieldToNode((*fields)->name, *fields, child_fields)); + } + + return std::make_shared(child_fields); + } + else + { + // Create set + const RtlTypeInfo *child = typeInfo->queryChildType(); + const RtlFieldInfo childField = RtlFieldInfo("", "", child); + std::vector> child_field; + reportIfFailure(fieldToNode(childField.name, &childField, child_field)); + return std::make_shared(child_field[0]); + } +} + +/** + * @brief Converts an RtlFieldInfo object into an arrow field and adds it to the output vector. + * + * @param name The name of the field + * + * @param field The field containing metadata for the record. + * + * @param arrow_fields Output vector for pushing new nodes to. + * + * @return Status of the operation + */ +arrow::Status ParquetHelper::fieldToNode(const std::string &name, const RtlFieldInfo *field, std::vector> &arrow_fields) +{ + unsigned len = field->type->length; + + switch (field->type->getType()) + { + case type_boolean: + arrow_fields.push_back(std::make_shared(name, arrow::boolean())); + break; + case type_int: + if (field->type->isSigned()) + { + if (len > 4) + { + arrow_fields.push_back(std::make_shared(name, arrow::int64())); + } + else + { + arrow_fields.push_back(std::make_shared(name, arrow::int32())); + } + } + else + { + if (len > 4) + { + arrow_fields.push_back(std::make_shared(name, arrow::uint64())); + } + else + { + arrow_fields.push_back(std::make_shared(name, arrow::uint32())); + } + } + break; + case type_real: + arrow_fields.push_back(std::make_shared(name, arrow::float64())); + break; + case type_string: + arrow_fields.push_back(std::make_shared(name, arrow::utf8())); + break; + case type_char: + arrow_fields.push_back(std::make_shared(name, arrow::utf8())); + break; + case type_varstring: + arrow_fields.push_back(std::make_shared(name, arrow::utf8())); + break; + case type_qstring: + arrow_fields.push_back(std::make_shared(name, arrow::utf8())); + break; + case type_unicode: + arrow_fields.push_back(std::make_shared(name, arrow::utf8())); + break; + case type_utf8: + arrow_fields.push_back(std::make_shared(name, arrow::utf8())); + break; + case type_decimal: + arrow_fields.push_back(std::make_shared(name, arrow::utf8())); + break; + case type_data: + arrow_fields.push_back(std::make_shared(name, arrow::large_binary())); + break; + case type_record: + arrow_fields.push_back(std::make_shared(name, makeChildRecord(field))); + break; + case type_set: + arrow_fields.push_back(std::make_shared(name, makeChildRecord(field))); + break; + default: + failx("Datatype %i is not compatible with this plugin.", field->type->getType()); + } + + return arrow::Status::OK(); +} + +/** + * @brief Creates an arrow::Schema from the field info of the row. + * @param typeInfo An RtlTypeInfo object that we iterate through to get all + * the information for the row. + */ +arrow::Status ParquetHelper::fieldsToSchema(const RtlTypeInfo *typeInfo) +{ + const RtlFieldInfo *const *fields = typeInfo->queryFields(); + int count = getNumFields(typeInfo); + + std::vector> arrow_fields; + + for (int i = 0; i < count; i++, fields++) + { + ARROW_RETURN_NOT_OK(fieldToNode((*fields)->name, *fields, arrow_fields)); + } + + schema = std::make_shared(arrow_fields); + return arrow::Status::OK(); +} + +/** + * @brief Creates a rapidjson::Value and adds it to the stack + */ +void ParquetHelper::beginSet() +{ + rapidjson::Value row(rapidjson::kArrayType); + rowStack.push_back(std::move(row)); +} + +/** + * @brief Creates a rapidjson::Value and adds it to the stack + */ +void ParquetHelper::beginRow() +{ + rapidjson::Value row(rapidjson::kObjectType); + rowStack.push_back(std::move(row)); +} + +/** + * @brief Removes the value from the top of the stack and adds it the parent row. + * If there is only one value on the stack then it converts it to a rapidjson::Document. + */ +void ParquetHelper::endRow(const char *name) +{ + if (rowStack.size() > 1) + { + rapidjson::Value child = std::move(rowStack[rowStack.size() - 1]); + rowStack.pop_back(); + rowStack[rowStack.size() - 1].AddMember(rapidjson::StringRef(name), child, jsonAlloc); + } + else + { + parquetDoc[currentRow].SetObject(); + + rapidjson::Value parent = std::move(rowStack[rowStack.size() - 1]); + rowStack.pop_back(); + + for (auto itr = parent.MemberBegin(); itr != parent.MemberEnd(); ++itr) + { + parquetDoc[currentRow].AddMember(itr->name, itr->value, jsonAlloc); + } + } +} + +ParquetRowStream::ParquetRowStream(IEngineRowAllocator *_resultAllocator, std::shared_ptr _parquet) + : m_resultAllocator(_resultAllocator), s_parquet(_parquet) +{ + rowsCount = _parquet->queryRowsCount(); + array_visitor = std::make_shared(); +} + +const void *ParquetRowStream::nextRow() +{ + if (m_shouldRead && s_parquet->shouldRead()) + { + auto table = s_parquet->next(); + m_currentRow++; + + if (!table.empty()) + { + ParquetRowBuilder pRowBuilder(&table, s_parquet->getRowsProcessed()++, &array_visitor); + + RtlDynamicRowBuilder rowBuilder(m_resultAllocator); + const RtlTypeInfo *typeInfo = m_resultAllocator->queryOutputMeta()->queryTypeInfo(); + assertex(typeInfo); + RtlFieldStrInfo dummyField("", NULL, typeInfo); + size32_t len = typeInfo->build(rowBuilder, 0, &dummyField, pRowBuilder); + return rowBuilder.finalizeRowClear(len); + } + else + failx("Error processing result row"); + } + return nullptr; +} + +void ParquetRowStream::stop() +{ + m_resultAllocator.clear(); + m_shouldRead = false; +} + +void ParquetRowBuilder::xpathOrName(StringBuffer &outXPath, const RtlFieldInfo *field) const +{ + outXPath.clear(); + + if (field->xpath) + { + if (field->xpath[0] == xpathCompoundSeparatorChar) + { + outXPath.append(field->xpath + 1); + } + else + { + const char *sep = strchr(field->xpath, xpathCompoundSeparatorChar); + + if (!sep) + { + outXPath.append(field->xpath); + } + else + { + outXPath.append(field->xpath, 0, static_cast(sep - field->xpath)); + } + } + } + else + { + outXPath.append(field->name); + } +} + +int64_t ParquetRowBuilder::currArrayIndex() +{ + return !m_pathStack.empty() && m_pathStack.back().nodeType == CPNTSet ? m_pathStack.back().childrenProcessed++ : currentRow; +} + +/** + * @brief Gets a Boolean result for an ECL Row + * + * @param field Holds the value of the field. + * @return bool Returns the boolean value from the result row. + */ +bool ParquetRowBuilder::getBooleanResult(const RtlFieldInfo *field) +{ + nextField(field); + + if ((*array_visitor)->type == NullType) + { + NullFieldProcessor p(field); + return p.boolResult; + } + if ((*array_visitor)->type != BoolType) + { + failx("Incorrect type for field %s.", field->name); + } + return (*array_visitor)->bool_arr->Value(currArrayIndex()); +} + +/** + * @brief Gets a data result from the result row and passes it back to engine through result. + * + * @param field Holds the value of the field. + * @param len Length of the Data value. + * @param result Used for returning the result to the caller. + */ +void ParquetRowBuilder::getDataResult(const RtlFieldInfo *field, size32_t &len, void *&result) +{ + nextField(field); + + if ((*array_visitor)->type == NullType) + { + NullFieldProcessor p(field); + rtlUtf8ToDataX(len, result, p.resultChars, p.stringResult); + return; + } + if ((*array_visitor)->type == BinaryType) + { + auto view = (*array_visitor)->large_bin_arr->GetView(currArrayIndex()); + rtlStrToDataX(len, result, view.size(), view.data()); + return; + } + else + { + failx("Incorrect type for field %s.", field->name); + } +} + +/** + * @brief Gets a real result from the result row. + * + * @param field Holds the value of the field. + * @return double Double value to return. + */ +double ParquetRowBuilder::getRealResult(const RtlFieldInfo *field) +{ + nextField(field); + + if ((*array_visitor)->type == NullType) + { + NullFieldProcessor p(field); + return p.doubleResult; + } + if ((*array_visitor)->type != DoubleType) + { + failx("Incorrect type for field %s.", field->name); + } + return (*array_visitor)->double_arr->Value(currArrayIndex()); +} + +__int64 getSigned(std::shared_ptr *array_visitor, int index) +{ + switch ((*array_visitor)->size) + { + case 8: + return (*array_visitor)->int8_arr->Value(index); + case 16: + return (*array_visitor)->int16_arr->Value(index); + case 32: + return (*array_visitor)->int32_arr->Value(index); + case 64: + return (*array_visitor)->int64_arr->Value(index); + default: + failx("getSigned: Invalid size %i", (*array_visitor)->size); + } +} + +unsigned __int64 getUnsigned(std::shared_ptr *array_visitor, int index) +{ + switch ((*array_visitor)->size) + { + case 8: + return (*array_visitor)->uint8_arr->Value(index); + case 16: + return (*array_visitor)->uint16_arr->Value(index); + case 32: + return (*array_visitor)->uint32_arr->Value(index); + case 64: + return (*array_visitor)->uint64_arr->Value(index); + default: + failx("getUnsigned: Invalid size %i", (*array_visitor)->size); + } +} + +/** + * @brief Gets the Signed Integer result from the result row. + * + * @param field Holds the value of the field. + * @return __int64 Value to return. + */ +__int64 ParquetRowBuilder::getSignedResult(const RtlFieldInfo *field) +{ + nextField(field); + + if ((*array_visitor)->type == NullType) + { + NullFieldProcessor p(field); + return p.uintResult; + } + if ((*array_visitor)->type != IntType) + { + failx("Incorrect type for field %s.", field->name); + } + return getSigned(array_visitor, currArrayIndex()); +} + +/** + * @brief Gets the Unsigned Integer result from the result row. + * + * @param field Holds the value of the field. + * @return unsigned Value to return. + */ +unsigned __int64 ParquetRowBuilder::getUnsignedResult(const RtlFieldInfo *field) +{ + nextField(field); + + if ((*array_visitor)->type == NullType) + { + + NullFieldProcessor p(field); + return p.uintResult; + } + if ((*array_visitor)->type != UIntType) + { + failx("Incorrect type for field %s.", field->name); + } + return getUnsigned(array_visitor, currArrayIndex()); +} + +/** + * @brief Gets a String from the result row. + * + * @param field Holds the value of the field. + * @param chars Number of chars in the String. + * @param result Variable used for returning string back to the caller. + */ +void ParquetRowBuilder::getStringResult(const RtlFieldInfo *field, size32_t &chars, char *&result) +{ + nextField(field); + + if ((*array_visitor)->type == NullType) + { + NullFieldProcessor p(field); + rtlUtf8ToStrX(chars, result, p.resultChars, p.stringResult); + return; + } + if ((*array_visitor)->type == StringType) + { + auto view = (*array_visitor)->string_arr->GetView(currArrayIndex()); + unsigned numchars = rtlUtf8Length(view.size(), view.data()); + rtlUtf8ToStrX(chars, result, numchars, view.data()); + return; + } + else + { + failx("Incorrect type for field %s.", field->name); + } +} + +/** + * @brief Gets a UTF8 from the result row. + * + * @param field Holds the value of the field. + * @param chars Number of chars in the UTF8. + * @param result Variable used for returning UTF8 back to the caller. + */ +void ParquetRowBuilder::getUTF8Result(const RtlFieldInfo *field, size32_t &chars, char *&result) +{ + nextField(field); + + if ((*array_visitor)->type == NullType) + { + NullFieldProcessor p(field); + rtlUtf8ToUtf8X(chars, result, p.resultChars, p.stringResult); + return; + } + if ((*array_visitor)->type == StringType) + { + auto view = (*array_visitor)->string_arr->GetView(currArrayIndex()); + unsigned numchars = rtlUtf8Length(view.size(), view.data()); + rtlUtf8ToUtf8X(chars, result, numchars, view.data()); + return; + } + else + { + failx("Incorrect type for field %s.", field->name); + } +} + +/** + * @brief Gets a Unicode from the result row. + * + * @param field Holds the value of the field. + * @param chars Number of chars in the Unicode. + * @param result Variable used for returning Unicode back to the caller. + */ +void ParquetRowBuilder::getUnicodeResult(const RtlFieldInfo *field, size32_t &chars, UChar *&result) +{ + nextField(field); + + if ((*array_visitor)->type == NullType) + { + NullFieldProcessor p(field); + rtlUnicodeToUnicodeX(chars, result, p.resultChars, p.unicodeResult); + return; + } + if ((*array_visitor)->type == StringType) + { + auto view = (*array_visitor)->string_arr->GetView(currArrayIndex()); + unsigned numchars = rtlUtf8Length(view.size(), view.data()); + rtlUtf8ToUnicodeX(chars, result, numchars, view.data()); + return; + } + else + { + failx("Incorrect type for field %s.", field->name); + } +} + +/** + * @brief Gets a decimal from the result row. + * + * @param field Holds the value of the field. + * @param value Variable used for returning decimal to caller. + */ +void ParquetRowBuilder::getDecimalResult(const RtlFieldInfo *field, Decimal &value) +{ + nextField(field); + + if ((*array_visitor)->type == NullType) + { + NullFieldProcessor p(field); + value.set(p.decimalResult); + return; + } + if ((*array_visitor)->type == StringType) + { + auto dvalue = (*array_visitor)->string_arr->GetView(currArrayIndex()); + value.setString(dvalue.size(), dvalue.data()); + RtlDecimalTypeInfo *dtype = (RtlDecimalTypeInfo *)field->type; + value.setPrecision(dtype->getDecimalDigits(), dtype->getDecimalPrecision()); + return; + } + else + { + failx("Incorrect type for field %s.", field->name); + } +} + +/** + * @brief Starts a new Set. + * + * @param field Field with information about the context of the set. + * @param isAll Not Supported. + */ +void ParquetRowBuilder::processBeginSet(const RtlFieldInfo *field, bool &isAll) +{ + isAll = false; // ALL not supported + nextField(field); + + if ((*array_visitor)->type == ListType) + { + PathTracker newPathNode(field->name, (*array_visitor)->list_arr, CPNTSet); + newPathNode.childCount = (*array_visitor)->list_arr->value_slice(currentRow)->length(); + m_pathStack.push_back(newPathNode); + } + else + { + failx("Incorrect type for field %s.", field->name); + } +} + +/** + * @brief Checks if we should process another set. + * + * @param field Context information about the set. + * @return true If the children that we have process is less than the total child count. + * @return false If all the children sets have been processed. + */ +bool ParquetRowBuilder::processNextSet(const RtlFieldInfo *field) +{ + return m_pathStack.back().finishedChildren(); +} + +/** + * @brief Starts a new Dataset. + * + * @param field Information about the context of the dataset. + */ +void ParquetRowBuilder::processBeginDataset(const RtlFieldInfo *field) +{ + UNSUPPORTED("Nested Dataset type is unsupported."); +} + +/** + * @brief Starts a new Row. + * + * @param field Information about the context of the row. + */ +void ParquetRowBuilder::processBeginRow(const RtlFieldInfo *field) +{ + StringBuffer xpath; + xpathOrName(xpath, field); + + if (!xpath.isEmpty()) + { + if (strncmp(xpath, "", 5) != 0) + { + nextField(field); + if ((*array_visitor)->type == StructType) + { + m_pathStack.push_back(PathTracker(field->name, (*array_visitor)->struct_arr, CPNTScalar)); + } + else + { + failx("proccessBeginRow: Incorrect type for row."); + } + } + } + else + { + failx("processBeginRow: Field name or xpath missing"); + } +} + +/** + * @brief Checks whether we should process the next row. + * + * @param field Information about the context of the row. + * @return true If the number of child rows process is less than the total count of children. + * @return false If all of the child rows have been processed. + */ +bool ParquetRowBuilder::processNextRow(const RtlFieldInfo *field) +{ + return m_pathStack.back().childrenProcessed < m_pathStack.back().childCount; +} + +/** + * @brief Ends a set. + * + * @param field Information about the context of the set. + */ +void ParquetRowBuilder::processEndSet(const RtlFieldInfo *field) +{ + StringBuffer xpath; + xpathOrName(xpath, field); + + if (!xpath.isEmpty() && !m_pathStack.empty() && strcmp(xpath.str(), m_pathStack.back().nodeName) == 0) + { + m_pathStack.pop_back(); + } +} + +/** + * @brief Ends a dataset. + * + * @param field Information about the context of the dataset. + */ +void ParquetRowBuilder::processEndDataset(const RtlFieldInfo *field) +{ + UNSUPPORTED("Nested Dataset type is unsupported."); +} + +/** + * @brief Ends a row. + * + * @param field Information about the context of the row. + */ +void ParquetRowBuilder::processEndRow(const RtlFieldInfo *field) +{ + StringBuffer xpath; + xpathOrName(xpath, field); + + if (!xpath.isEmpty()) + { + if (!m_pathStack.empty()) + { + if (m_pathStack.back().nodeType == CPNTDataset) + { + m_pathStack.back().childrenProcessed++; + } + else if (strcmp(xpath.str(), m_pathStack.back().nodeName) == 0) + { + m_pathStack.pop_back(); + } + } + } + else + { + failx("processEndRow: Field name or xpath missing"); + } +} + +void ParquetRowBuilder::nextFromStruct(const RtlFieldInfo *field) +{ + auto structPtr = m_pathStack.back().structPtr; + reportIfFailure(structPtr->Accept((*array_visitor).get())); + if (m_pathStack.back().nodeType == CPNTScalar) + { + auto child = (*array_visitor)->struct_arr->GetFieldByName(field->name); + reportIfFailure(child->Accept((*array_visitor).get())); + } + else if (m_pathStack.back().nodeType == CPNTSet) + { + auto child = (*array_visitor)->list_arr->value_slice(currentRow); + reportIfFailure(child->Accept((*array_visitor).get())); + } +} + +/** + * @brief Gets the next field and processes it. + * + * @param field Information about the context of the next field. + * @return const char* Result of building field. + */ +void ParquetRowBuilder::nextField(const RtlFieldInfo *field) +{ + if (!field->name) + { + failx("Field name is empty."); + } + if (m_pathStack.size() > 0) + { + nextFromStruct(field); + return; + } + auto column = result_rows->find(field->name); + if (column != result_rows->end()) + { + reportIfFailure(column->second->Accept((*array_visitor).get())); + return; + } +} + +unsigned ParquetRecordBinder::checkNextParam(const RtlFieldInfo *field) +{ + if (logctx.queryTraceLevel() > 4) + logctx.CTXLOG("Binding %s to %d", field->name, thisParam); + return thisParam++; +} + +int ParquetRecordBinder::numFields() +{ + int count = 0; + const RtlFieldInfo *const *fields = typeInfo->queryFields(); + assertex(fields); + while (*fields++) + count++; + return count; +} + +static void addMember(std::shared_ptr r_parquet, rapidjson::Value &key, rapidjson::Value &value) +{ + rapidjson::Value *row = r_parquet->queryCurrentRow(); + if(!row) + failx("Failed to add member to rapidjson row"); + if (row->GetType() == rapidjson::kObjectType) + row->AddMember(key, value, jsonAlloc); + else + row->PushBack(value, jsonAlloc); +} + +/** + * @brief Writes the value to the parquet file using the StreamWriter from the ParquetHelper class. + * + * @param len Number of chars in value. + * @param value pointer to value of parameter. + * @param field RtlFieldInfo holds meta information about the embed context. + * @param r_parquet Shared pointer to helper class that operates the parquet functions for us. + */ +void bindUtf8Param(unsigned len, const char *value, const RtlFieldInfo *field, std::shared_ptr r_parquet) +{ + rapidjson::Value key = rapidjson::Value(field->name, jsonAlloc); + rapidjson::Value val = rapidjson::Value(value, rtlUtf8Size(len, value), jsonAlloc); + + addMember(r_parquet, key, val); +} + +/** + * @brief Writes the value to the parquet file using the StreamWriter from the ParquetHelper class. + * + * @param len Number of chars in value. + * @param value pointer to value of parameter. + * @param field RtlFieldInfo holds meta information about the embed context. + * @param r_parquet Shared pointer to helper class that operates the parquet functions for us. + */ +void bindStringParam(unsigned len, const char *value, const RtlFieldInfo *field, std::shared_ptr r_parquet) +{ + size32_t utf8chars; + rtlDataAttr utf8; + rtlStrToUtf8X(utf8chars, utf8.refstr(), len, value); + + rapidjson::Value key = rapidjson::Value(field->name, jsonAlloc); + rapidjson::Value val = rapidjson::Value(std::string(utf8.getstr(), rtlUtf8Size(utf8chars, utf8.getdata())), jsonAlloc); + + addMember(r_parquet, key, val); +} + +/** + * @brief Writes the value to the parquet file using the StreamWriter from the ParquetHelper class. + * + * @param value pointer to value of parameter. + * @param field RtlFieldInfo holds meta information about the embed context. + * @param r_parquet Shared pointer to helper class that operates the parquet functions for us. + */ +void bindBoolParam(bool value, const RtlFieldInfo *field, std::shared_ptr r_parquet) +{ + rapidjson::Value key = rapidjson::Value(field->name, jsonAlloc); + rapidjson::Value val = rapidjson::Value(value); + + addMember(r_parquet, key, val); +} + +/** + * @brief Writes the value to the parquet file using the StreamWriter from the ParquetHelper class. + * + * @param len Number of chars in value. + * @param value pointer to value of parameter. + * @param field RtlFieldInfo holds meta information about the embed context. + * @param r_parquet Shared pointer to helper class that operates the parquet functions for us. + */ +void bindDataParam(unsigned len, const char *value, const RtlFieldInfo *field, std::shared_ptr r_parquet) +{ + rapidjson::Value key; + key.SetString(field->name, jsonAlloc); + rapidjson::Value val; + size32_t utf8size = rtlUtf8Size(len, value); + val.SetString(value, utf8size, jsonAlloc); + + addMember(r_parquet, key, val); +} + +/** + * @brief Writes the value to the parquet file using the StreamWriter from the ParquetHelper class. + * + * @param value pointer to value of parameter. + * @param field RtlFieldInfo holds meta information about the embed context. + * @param r_parquet Shared pointer to helper class that operates the parquet functions for us. + */ +void bindIntParam(__int64 value, const RtlFieldInfo *field, std::shared_ptr r_parquet) +{ + int64_t val = value; + + rapidjson::Value key = rapidjson::Value(field->name, jsonAlloc); + rapidjson::Value num(val); + + addMember(r_parquet, key, num); +} + +/** + * @brief Writes the value to the parquet file using the StreamWriter from the ParquetHelper class. + * + * @param value pointer to value of parameter. + * @param field RtlFieldInfo holds meta information about the embed context. + * @param r_parquet Shared pointer to helper class that operates the parquet functions for us. + */ +void bindUIntParam(unsigned __int64 value, const RtlFieldInfo *field, std::shared_ptr r_parquet) +{ + uint64_t val = value; + + rapidjson::Value key = rapidjson::Value(field->name, jsonAlloc); + rapidjson::Value num(val); + + addMember(r_parquet, key, num); +} + +/** + * @brief Writes the value to the parquet file using the StreamWriter from the ParquetHelper class. + * + * @param value pointer to value of parameter. + * @param field RtlFieldInfo holds meta information about the embed context. + * @param r_parquet Shared pointer to helper class that operates the parquet functions for us. + */ +void bindRealParam(double value, const RtlFieldInfo *field, std::shared_ptr r_parquet) +{ + rapidjson::Value key = rapidjson::Value(field->name, jsonAlloc); + rapidjson::Value val = rapidjson::Value(value); + + addMember(r_parquet, key, val); +} + +/** + * @brief Writes the value to the parquet file using the StreamWriter from the ParquetHelper class. + * + * @param chars Number of chars in value. + * @param value pointer to value of parameter. + * @param field RtlFieldInfo holds meta information about the embed context. + * @param r_parquet Shared pointer to helper class that operates the parquet functions for us. + */ +void bindUnicodeParam(unsigned chars, const UChar *value, const RtlFieldInfo *field, std::shared_ptr r_parquet) +{ + size32_t utf8chars; + char *utf8; + rtlUnicodeToUtf8X(utf8chars, utf8, chars, value); + + rapidjson::Value key = rapidjson::Value(field->name, jsonAlloc); + rapidjson::Value val = rapidjson::Value(utf8, rtlUtf8Size(utf8chars, utf8), jsonAlloc); + + addMember(r_parquet, key, val); +} + +/** + * @brief Writes the value to the parquet file using the StreamWriter from the ParquetHelper class. + * + * @param value Decimal value represented as a string. + * @param field RtlFieldInfo holds meta information about the embed context. + * @param r_parquet Shared pointer to helper class that operates the parquet functions for us. + */ +void bindDecimalParam(const char *value, size32_t bytes, const RtlFieldInfo *field, std::shared_ptr r_parquet) +{ + rapidjson::Value key = rapidjson::Value(field->name, jsonAlloc); + rapidjson::Value val = rapidjson::Value(std::string(value, bytes), jsonAlloc); + + addMember(r_parquet, key, val); +} + +/** + * @brief Calls the typeInfo member function process to write an ECL row to parquet. + * + * @param row Pointer to ECL row. + */ +void ParquetRecordBinder::processRow(const byte *row) +{ + thisParam = firstParam; + typeInfo->process(row, row, &dummyField, *this); +} + +/** + * @brief Calls the bind function for the data type of the value. + * + * @param len Number of chars in value. + * @param value Data to be written to the parquet file. + * @param field Object with information about the current field. + */ +void ParquetRecordBinder::processString(unsigned len, const char *value, const RtlFieldInfo *field) +{ + checkNextParam(field); + + bindStringParam(len, value, field, r_parquet); +} + +/** + * @brief Calls the bind function for the data type of the value. + * + * @param value Data to be written to the parquet file. + * @param field Object with information about the current field. + */ +void ParquetRecordBinder::processBool(bool value, const RtlFieldInfo *field) +{ + bindBoolParam(value, field, r_parquet); +} + +/** + * @brief Calls the bind function for the data type of the value. + * + * @param len Number of chars in value. + * @param value Data to be written to the parquet file. + * @param field Object with information about the current field. + */ +void ParquetRecordBinder::processData(unsigned len, const void *value, const RtlFieldInfo *field) +{ + bindDataParam(len, (const char *) value, field, r_parquet); +} + +/** + * @brief Calls the bind function for the data type of the value. + * + * @param value Data to be written to the parquet file. + * @param field Object with information about the current field. + */ +void ParquetRecordBinder::processInt(__int64 value, const RtlFieldInfo *field) +{ + bindIntParam(value, field, r_parquet); +} + +/** + * @brief Calls the bind function for the data type of the value. + * + * @param value Data to be written to the parquet file. + * @param field Object with information about the current field. + */ +void ParquetRecordBinder::processUInt(unsigned __int64 value, const RtlFieldInfo *field) +{ + bindUIntParam(value, field, r_parquet); +} + +/** + * @brief Calls the bind function for the data type of the value. + * + * @param value Data to be written to the parquet file. + * @param field Object with information about the current field. + */ +void ParquetRecordBinder::processReal(double value, const RtlFieldInfo *field) +{ + bindRealParam(value, field, r_parquet); +} + +/** + * @brief Calls the bind function for the data type of the value. + * + * @param value Data to be written to the parquet file. + * @param digits Number of digits in decimal. + * @param precision Number of digits of precision. + * @param field Object with information about the current field. + */ +void ParquetRecordBinder::processDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo *field) +{ + Decimal val; + size32_t bytes; + rtlDataAttr decText; + val.setDecimal(digits, precision, value); + val.getStringX(bytes, decText.refstr()); + + bindDecimalParam(decText.getstr(), bytes, field, r_parquet); +} + +/** + * @brief Calls the bind function for the data type of the value. + * + * @param chars Number of chars in the value. + * @param value Data to be written to the parquet file. + * @param field Object with information about the current field. + */ +void ParquetRecordBinder::processUnicode(unsigned chars, const UChar *value, const RtlFieldInfo *field) +{ + bindUnicodeParam(chars, value, field, r_parquet); +} + +/** + * @brief Calls the bind function for the data type of the value. + * + * @param len Length of QString + * @param value Data to be written to the parquet file. + * @param field Object with information about the current field. + */ +void ParquetRecordBinder::processQString(unsigned len, const char *value, const RtlFieldInfo *field) +{ + size32_t charCount; + rtlDataAttr text; + rtlQStrToStrX(charCount, text.refstr(), len, value); + + bindStringParam(charCount, text.getstr(), field, r_parquet); +} + +/** + * @brief Calls the bind function for the data type of the value. + * + * @param chars Number of chars in the value. + * @param value Data to be written to the parquet file. + * @param field Object with information about the current field. + */ +void ParquetRecordBinder::processUtf8(unsigned chars, const char *value, const RtlFieldInfo *field) +{ + bindUtf8Param(chars, value, field, r_parquet); +} + +/** + * @brief Construct a new ParquetEmbedFunctionContext object + * + * @param _logctx Context logger for use with the ParquetRecordBinder ParquetDatasetBinder classes. + * @param options Pointer to the list of options that are passed into the Embed function. + * @param _flags Should be zero if the embedded script is ok. + */ +ParquetEmbedFunctionContext::ParquetEmbedFunctionContext(const IContextLogger &_logctx, const IThorActivityContext *activityCtx, const char *options, unsigned _flags) + : logctx(_logctx), m_scriptFlags(_flags) +{ + // Option Variables + const char *option = ""; // Read(read), Read Parition(readpartition), Write(write), Write Partition(writepartition) + const char *location = ""; // file name and location of where to write parquet file + const char *destination = ""; // file name and location of where to read parquet file from + __int64 rowsize = 40000; // Size of the row groups when writing to parquet files + __int64 batchSize = 40000; // Size of the batches when converting parquet columns to rows + // Iterate through user options and save them + StringArray inputOptions; + inputOptions.appendList(options, ","); + ForEachItemIn(idx, inputOptions) + { + const char *opt = inputOptions.item(idx); + const char *val = strchr(opt, '='); + if (val) + { + StringBuffer optName(val - opt, opt); + val++; + if (stricmp(optName, "option") == 0) + option = val; + else if (stricmp(optName, "location") == 0) + location = val; + else if (stricmp(optName, "destination") == 0) + destination = val; + else if (stricmp(optName, "MaxRowSize") == 0) + rowsize = atoi(val); + else if (stricmp(optName, "BatchSize") == 0) + batchSize = atoi(val); + else + failx("Unknown option %s", optName.str()); + } + } + if (option == "" || (location == "" && destination == "")) + { + failx("Invalid options must specify read or write settings and a location to perform such actions."); + } + else + { + m_parquet = std::make_shared(option, location, destination, rowsize, batchSize, activityCtx); + } +} + +bool ParquetEmbedFunctionContext::getBooleanResult() +{ + UNIMPLEMENTED_X("Parquet Scalar Return Type BOOLEAN"); + return false; +} + +void ParquetEmbedFunctionContext::getDataResult(size32_t &len, void *&result) +{ + UNIMPLEMENTED_X("Parquet Scalar Return Type DATA"); +} + +double ParquetEmbedFunctionContext::getRealResult() +{ + UNIMPLEMENTED_X("Parquet Scalar Return Type REAL"); + return 0.0; +} + +__int64 ParquetEmbedFunctionContext::getSignedResult() +{ + UNIMPLEMENTED_X("Parquet Scalar Return Type SIGNED"); + return 0; +} + +unsigned __int64 ParquetEmbedFunctionContext::getUnsignedResult() +{ + UNIMPLEMENTED_X("Parquet Scalar Return Type UNSIGNED"); + return 0; +} + +void ParquetEmbedFunctionContext::getStringResult(size32_t &chars, char *&result) +{ + UNIMPLEMENTED_X("Parquet Scalar Return Type STRING"); +} + +void ParquetEmbedFunctionContext::getUTF8Result(size32_t &chars, char *&result) +{ + UNIMPLEMENTED_X("Parquet Scalar Return Type UTF8"); +} + +void ParquetEmbedFunctionContext::getUnicodeResult(size32_t &chars, UChar *&result) +{ + UNIMPLEMENTED_X("Parquet Scalar Return Type UNICODE"); +} + +void ParquetEmbedFunctionContext::getDecimalResult(Decimal &value) +{ + UNIMPLEMENTED_X("Parquet Scalar Return Type DECIMAL"); +} + +IRowStream *ParquetEmbedFunctionContext::getDatasetResult(IEngineRowAllocator *_resultAllocator) +{ + Owned parquetRowStream; + parquetRowStream.setown(new ParquetRowStream(_resultAllocator, m_parquet)); + return parquetRowStream.getLink(); +} + +byte *ParquetEmbedFunctionContext::getRowResult(IEngineRowAllocator *_resultAllocator) +{ + Owned parquetRowStream; + parquetRowStream.setown(new ParquetRowStream(_resultAllocator, m_parquet)); + return (byte *)parquetRowStream->nextRow(); +} + +size32_t ParquetEmbedFunctionContext::getTransformResult(ARowBuilder &rowBuilder) +{ + UNIMPLEMENTED_X("Parquet Transform Result"); + return 0; +} + +void ParquetEmbedFunctionContext::bindRowParam(const char *name, IOutputMetaData &metaVal, const byte *val) +{ + ParquetRecordBinder binder(logctx, metaVal.queryTypeInfo(), m_nextParam, m_parquet); + binder.processRow(val); + m_nextParam += binder.numFields(); +} + +void ParquetEmbedFunctionContext::bindDatasetParam(const char *name, IOutputMetaData &metaVal, IRowStream *val) +{ + if (m_oInputStream) + { + fail("At most one dataset parameter supported"); + } + m_oInputStream.setown(new ParquetDatasetBinder(logctx, LINK(val), metaVal.queryTypeInfo(), m_parquet, m_nextParam)); + m_nextParam += m_oInputStream->numFields(); +} + +void ParquetEmbedFunctionContext::bindBooleanParam(const char *name, bool val) +{ + UNIMPLEMENTED_X("Parquet Scalar Parameter type BOOLEAN"); +} + +void ParquetEmbedFunctionContext::bindDataParam(const char *name, size32_t len, const void *val) +{ + UNIMPLEMENTED_X("Parquet Scalar Parameter type DATA"); +} + +void ParquetEmbedFunctionContext::bindFloatParam(const char *name, float val) +{ + UNIMPLEMENTED_X("Parquet Scalar Parameter type FLOAT"); +} + +void ParquetEmbedFunctionContext::bindRealParam(const char *name, double val) +{ + UNIMPLEMENTED_X("Parquet Scalar Parameter type REAL"); +} + +void ParquetEmbedFunctionContext::bindSignedSizeParam(const char *name, int size, __int64 val) +{ + UNIMPLEMENTED_X("Parquet Scalar Parameter type SIGNED SIZE"); +} + +void ParquetEmbedFunctionContext::bindSignedParam(const char *name, __int64 val) +{ + UNIMPLEMENTED_X("Parquet Scalar Parameter type SIGNED"); +} + +void ParquetEmbedFunctionContext::bindUnsignedSizeParam(const char *name, int size, unsigned __int64 val) +{ + UNIMPLEMENTED_X("Parquet Scalar Parameter type UNSIGNED SIZE"); +} + +void ParquetEmbedFunctionContext::bindUnsignedParam(const char *name, unsigned __int64 val) +{ + UNIMPLEMENTED_X("Parquet Scalar Parameter type UNSIGNED"); +} + +void ParquetEmbedFunctionContext::bindStringParam(const char *name, size32_t len, const char *val) +{ + UNIMPLEMENTED_X("Parquet Scalar Parameter type STRING"); +} + +void ParquetEmbedFunctionContext::bindVStringParam(const char *name, const char *val) +{ + UNIMPLEMENTED_X("Parquet Scalar Parameter type VSTRING"); +} + +void ParquetEmbedFunctionContext::bindUTF8Param(const char *name, size32_t chars, const char *val) +{ + UNIMPLEMENTED_X("Parquet Scalar Parameter type UTF8"); +} + +void ParquetEmbedFunctionContext::bindUnicodeParam(const char *name, size32_t chars, const UChar *val) +{ + UNIMPLEMENTED_X("Parquet Scalar Parameter type UNICODE"); +} + +/** + * @brief Compiles the embedded script passed in by the user. The script is placed inside the EMBED + * and ENDEMBED block. + * + * @param chars The number of chars in the script. + * + * @param script The embedded script for compilation. + */ +void ParquetEmbedFunctionContext::compileEmbeddedScript(size32_t chars, const char *script) +{ +} + +void ParquetEmbedFunctionContext::execute() +{ + if (m_oInputStream) + { + m_oInputStream->executeAll(); + } + else + { + if (m_parquet->queryPartOptions() == 'r') + { + reportIfFailure(m_parquet->openReadFile()); + reportIfFailure(m_parquet->processReadFile()); + } + else + { + failx("Invalid read/write option."); + } + } +} + +void ParquetEmbedFunctionContext::callFunction() +{ + execute(); +} + +unsigned ParquetEmbedFunctionContext::checkNextParam(const char *name) +{ + if (m_nextParam == m_numParams) + failx("Too many parameters supplied: No matching $ placeholder for parameter %s", name); + return m_nextParam++; +} + +/** + * @brief Gets the next ECL row. + * + * @return true If there is a row to process. + * @return false If there are no rows left. + */ +bool ParquetDatasetBinder::bindNext() +{ + roxiemem::OwnedConstRoxieRow nextRow = (const byte *)input->ungroupedNextRow(); + if (!nextRow) + return false; + processRow((const byte *)nextRow.get()); // Bind the variables for the current row + return true; +} + +void ParquetDatasetBinder::writeRecordBatch() +{ + // convert row_batch vector to RecordBatch and write to file. + PARQUET_ASSIGN_OR_THROW(auto recordBatch, d_parquet->convertToRecordBatch(d_parquet->queryRecordBatch(), d_parquet->getSchema())); + // Write each batch as a row_groups + PARQUET_ASSIGN_OR_THROW(auto table, arrow::Table::FromRecordBatches(d_parquet->getSchema(), {recordBatch})); + + if (partition) + { + reportIfFailure(d_parquet->writePartition(table)); + } + else + { + reportIfFailure(d_parquet->queryWriter()->WriteTable(*(table.get()), recordBatch->num_rows())); + } +} + +/** + * @brief Binds all the rows of the dataset and executes the function. + */ +void ParquetDatasetBinder::executeAll() +{ + if (bindNext()) + { + reportIfFailure(d_parquet->openWriteFile()); + + int i = 1; + int rowSize = d_parquet->getMaxRowSize(); + do + { + if (i % rowSize == 0) + { + writeRecordBatch(); + jsonAlloc.Clear(); + } + d_parquet->updateRow(); + i++; + } + while (bindNext()); + + i--; + if (i % rowSize != 0) + { + d_parquet->queryRecordBatch().resize(i % rowSize); + writeRecordBatch(); + jsonAlloc.Clear(); + } + } +} +/** + * @brief Serves as the entry point for the HPCC Engine into the plugin and is how it obtains a + * ParquetEmbedFunctionContext object for creating the query and executing it. + * + */ +class ParquetEmbedContext : public CInterfaceOf +{ +public: + virtual IEmbedFunctionContext *createFunctionContext(unsigned flags, const char *options) override + { + return createFunctionContextEx(nullptr, nullptr, flags, options); + } + + virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext *ctx, const IThorActivityContext *activityCtx, unsigned flags, const char *options) override + { + if (flags & EFimport) + { + UNSUPPORTED("IMPORT"); + return nullptr; + } + else + return new ParquetEmbedFunctionContext(ctx ? ctx->queryContextLogger() : queryDummyContextLogger(), activityCtx, options, flags); + } + + virtual IEmbedServiceContext *createServiceContext(const char *service, unsigned flags, const char *options) override + { + throwUnexpected(); + return nullptr; + } +}; + +extern DECL_EXPORT IEmbedContext *getEmbedContext() +{ + return new ParquetEmbedContext(); +} + +extern DECL_EXPORT bool syntaxCheck(const char *script) +{ + return true; +} +} + +MODULE_INIT(INIT_PRIORITY_STANDARD) +{ + return true; +} + +MODULE_EXIT() +{ +} diff --git a/plugins/parquet/parquetembed.hpp b/plugins/parquet/parquetembed.hpp new file mode 100644 index 00000000000..8da6377116b --- /dev/null +++ b/plugins/parquet/parquetembed.hpp @@ -0,0 +1,1034 @@ +/*############################################################################## + HPCC SYSTEMS software Copyright (C) 2022 HPCC Systems®. + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +############################################################################## */ + +#ifndef _PARQUETEMBED_INCL +#define _PARQUETEMBED_INCL + +#ifdef PARQUETEMBED_PLUGIN_EXPORTS +#define PARQUETEMBED_PLUGIN_API DECL_EXPORT +#else +#define PARQUETEMBED_PLUGIN_API DECL_IMPORT +#endif + +#define RAPIDJSON_HAS_STDSTRING 1 + +#include "arrow/api.h" +#include "arrow/dataset/api.h" +#include "arrow/filesystem/api.h" +#include "arrow/io/file.h" +#include "arrow/util/logging.h" +#include "arrow/ipc/api.h" +#include "parquet/arrow/reader.h" +#include "parquet/arrow/writer.h" +#include "rapidjson/stringbuffer.h" +#include "rapidjson/writer.h" +#include "rapidjson/document.h" + +// Platform includes +#include "hqlplugins.hpp" +#include "eclrtl_imp.hpp" +#include "eclhelper.hpp" +#include "tokenserialization.hpp" +#include "rtlfield.hpp" +#include "roxiemem.hpp" + +#include +#include + +namespace parquetembed +{ +extern void UNSUPPORTED(const char *feature) __attribute__((noreturn)); +extern void failx(const char *msg, ...) __attribute__((noreturn)) __attribute__((format(printf, 1, 2))); +extern void fail(const char *msg) __attribute__((noreturn)); + +#define reportIfFailure(st) \ + if (!st.ok()) \ + { \ + failx("%s: %s.", st.CodeAsString().c_str(), st.message().c_str()); \ + } + +static void typeError(const char *expected, const char *fieldname) +{ + VStringBuffer msg("MongoDBembed: type mismatch - %s expected", expected); + if (!isEmptyString(fieldname)) + msg.appendf(" for field %s", fieldname); + rtlFail(0, msg.str()); +} + +static void typeError(const char *expected, const RtlFieldInfo *field) +{ + typeError(expected, field ? field->name : nullptr); +} + +static int getNumFields(const RtlTypeInfo *record) +{ + int count = 0; + const RtlFieldInfo *const *fields = record->queryFields(); + assertex(fields); + while (*fields++) + count++; + return count; +} + +static void handleDeserializeOutcome(DeserializationResult resultcode, const char *targetype, const char *culpritvalue) +{ + switch (resultcode) + { + case Deserialization_SUCCESS: + break; + case Deserialization_BAD_TYPE: + failx("Deserialization error (%s): value cannot be const", targetype); + break; + case Deserialization_UNSUPPORTED: + failx("Deserialization error (%s): encountered value type not supported", targetype); + break; + case Deserialization_INVALID_TOKEN: + failx("Deserialization error (%s): token cannot be NULL, empty, or all whitespace", targetype); + break; + case Deserialization_NOT_A_NUMBER: + failx("Deserialization error (%s): non-numeric characters found in numeric conversion: '%s'", targetype, culpritvalue); + break; + case Deserialization_OVERFLOW: + failx("Deserialization error (%s): number too large to be represented by receiving value", targetype); + break; + case Deserialization_UNDERFLOW: + failx("Deserialization error (%s): number too small to be represented by receiving value", targetype); + break; + default: + typeError(targetype, culpritvalue); + break; + } +} + +enum PathNodeType {CPNTScalar, CPNTDataset, CPNTSet}; + +struct PathTracker +{ + const char *nodeName; + PathNodeType nodeType; + const arrow::Array *structPtr; + unsigned int childCount = 0; + unsigned int childrenProcessed = 0; + + PathTracker(const char *_nodeName, const arrow::Array *_struct, PathNodeType _nodeType) + : nodeName(_nodeName), nodeType(_nodeType), structPtr(_struct) {} + + bool finishedChildren() { return childrenProcessed < childCount; } +}; + +enum ParquetArrayType +{ + NullType, + BoolType, + IntType, + UIntType, + FloatType, + StringType, + BinaryType, + Decimal128Type, + ListType, + StructType, + DoubleType +}; + +class ParquetArrayVisitor : public arrow::ArrayVisitor +{ +public: + arrow::Status Visit(const arrow::NullArray &array) + { + type = NullType; + return arrow::Status::OK(); + } + arrow::Status Visit(const arrow::BooleanArray &array) + { + bool_arr = &array; + type = BoolType; + return arrow::Status::OK(); + } + arrow::Status Visit(const arrow::Int8Array &array) + { + int8_arr = &array; + type = IntType; + size = 8; + return arrow::Status::OK(); + } + arrow::Status Visit(const arrow::Int16Array &array) + { + int16_arr = &array; + type = IntType; + size = 16; + return arrow::Status::OK(); + } + arrow::Status Visit(const arrow::Int32Array &array) + { + int32_arr = &array; + type = IntType; + size = 32; + return arrow::Status::OK(); + } + arrow::Status Visit(const arrow::Int64Array &array) + { + int64_arr = &array; + type = IntType; + size = 64; + return arrow::Status::OK(); + } + arrow::Status Visit(const arrow::UInt8Array &array) + { + uint8_arr = &array; + type = UIntType; + size = 8; + return arrow::Status::OK(); + } + arrow::Status Visit(const arrow::UInt16Array &array) + { + uint16_arr = &array; + type = UIntType; + size = 16; + return arrow::Status::OK(); + } + arrow::Status Visit(const arrow::UInt32Array &array) + { + uint32_arr = &array; + type = UIntType; + size = 32; + return arrow::Status::OK(); + } + arrow::Status Visit(const arrow::UInt64Array &array) + { + uint64_arr = &array; + type = UIntType; + size = 64; + return arrow::Status::OK(); + } + arrow::Status Visit(const arrow::FloatArray &array) + { + float_arr = &array; + type = FloatType; + return arrow::Status::OK(); + } + arrow::Status Visit(const arrow::DoubleArray &array) + { + double_arr = &array; + type = DoubleType; + return arrow::Status::OK(); + } + arrow::Status Visit(const arrow::StringArray &array) + { + string_arr = &array; + type = StringType; + return arrow::Status::OK(); + } + arrow::Status Visit(const arrow::BinaryArray &array) + { + bin_arr = &array; + type = BinaryType; + return arrow::Status::OK(); + } + arrow::Status Visit(const arrow::LargeBinaryArray &array) + { + large_bin_arr = &array; + type = BinaryType; + return arrow::Status::OK(); + } + arrow::Status Visit(const arrow::Decimal128Array &array) + { + dec_arr = &array; + type = Decimal128Type; + return arrow::Status::OK(); + } + arrow::Status Visit(const arrow::ListArray &array) + { + list_arr = &array; + type = ListType; + return arrow::Status::OK(); + } + arrow::Status Visit(const arrow::StructArray &array) + { + struct_arr = &array; + type = StructType; + return arrow::Status::OK(); + } + + ParquetArrayType type = NullType; + int size = 0; + const arrow::BooleanArray *bool_arr = nullptr; + const arrow::Int8Array *int8_arr = nullptr; + const arrow::Int16Array *int16_arr = nullptr; + const arrow::Int32Array *int32_arr = nullptr; + const arrow::Int64Array *int64_arr = nullptr; + const arrow::UInt8Array *uint8_arr = nullptr; + const arrow::UInt16Array *uint16_arr = nullptr; + const arrow::UInt32Array *uint32_arr = nullptr; + const arrow::UInt64Array *uint64_arr = nullptr; + const arrow::FloatArray *float_arr = nullptr; + const arrow::DoubleArray *double_arr = nullptr; + const arrow::StringArray *string_arr = nullptr; + const arrow::BinaryArray *bin_arr = nullptr; + const arrow::LargeBinaryArray *large_bin_arr = nullptr; + const arrow::Decimal128Array *dec_arr = nullptr; + const arrow::ListArray *list_arr = nullptr; + const arrow::StructArray *struct_arr = nullptr; +}; + +const rapidjson::Value kNullJsonSingleton = rapidjson::Value(); + +class DocValuesIterator +{ +public: + /// \param rows vector of rows + /// \param path field names to enter + /// \param array_levels number of arrays to enter + DocValuesIterator(const std::vector &_rows, + std::vector &&_path, int64_t _array_levels) + : rows(_rows), path(std::move(_path)), array_levels(_array_levels) {} + + ~DocValuesIterator() = default; + + const rapidjson::Value *NextArrayOrRow(const rapidjson::Value *value, size_t *path_i, + int64_t *arr_i) + { + while (array_stack.size() > 0) + { + ArrayPosition &pos = array_stack.back(); + // Try to get next position in Array + if (pos.index + 1 < pos.array_node->Size()) + { + ++pos.index; + value = &(*pos.array_node)[pos.index]; + *path_i = pos.path_index; + *arr_i = array_stack.size(); + return value; + } + else + { + array_stack.pop_back(); + } + } + ++row_i; + if (row_i < rows.size()) + { + value = static_cast(&rows[row_i]); + } + else + { + value = nullptr; + } + *path_i = 0; + *arr_i = 0; + return value; + } + + arrow::Result Next() + { + const rapidjson::Value *value = nullptr; + size_t path_i; + int64_t arr_i; + // Can either start at document or at last array level + if (array_stack.size() > 0) + { + auto &pos = array_stack.back(); + value = pos.array_node; + path_i = pos.path_index; + arr_i = array_stack.size() - 1; + } + + value = NextArrayOrRow(value, &path_i, &arr_i); + + // Traverse to desired level (with possible backtracking as needed) + while (path_i < path.size() || arr_i < array_levels) + { + if (value == nullptr) + { + return value; + } + else if (value->IsArray() && value->Size() > 0) + { + ArrayPosition pos; + pos.array_node = value; + pos.path_index = path_i; + pos.index = 0; + array_stack.push_back(pos); + + value = &(*value)[0]; + ++arr_i; + } + else if (value->IsArray()) + { + // Empty array means we need to backtrack and go to next array or row + value = NextArrayOrRow(value, &path_i, &arr_i); + } + else if (value->HasMember(path[path_i])) + { + value = &(*value)[path[path_i]]; + ++path_i; + } + else + { + return &kNullJsonSingleton; + } + } + + // Return value + return value; + } + +private: + const std::vector &rows; + std::vector path; + int64_t array_levels; + size_t row_i = -1; // index of current row + + // Info about array position for one array level in array stack + struct ArrayPosition + { + const rapidjson::Value *array_node; + int64_t path_index; + rapidjson::SizeType index; + }; + std::vector array_stack; +}; + +class JsonValueConverter +{ +public: + explicit JsonValueConverter(const std::vector &rows) + : rows_(rows) {} + + JsonValueConverter(const std::vector &rows, const std::vector &root_path, int64_t array_levels) + : rows_(rows), root_path_(root_path), array_levels_(array_levels) {} + + ~JsonValueConverter() = default; + + /// \brief For field passed in, append corresponding values to builder + arrow::Status Convert(const arrow::Field &field, arrow::ArrayBuilder *builder) + { + return Convert(field, field.name(), builder); + } + + /// \brief For field passed in, append corresponding values to builder + arrow::Status Convert(const arrow::Field &field, const std::string &field_name, arrow::ArrayBuilder *builder) + { + field_name_ = field_name; + builder_ = builder; + ARROW_RETURN_NOT_OK(arrow::VisitTypeInline(*field.type().get(), this)); + return arrow::Status::OK(); + } + + // Default implementation + arrow::Status Visit(const arrow::DataType &type) + { + return arrow::Status::NotImplemented("Can not convert json value to Arrow array of type ", type.ToString()); + } + + arrow::Status Visit(const arrow::LargeBinaryType &type) + { + arrow::LargeBinaryBuilder *builder = static_cast(builder_); + for (const auto &maybe_value : FieldValues()) + { + ARROW_ASSIGN_OR_RAISE(auto value, maybe_value); + if (value->IsNull()) + { + ARROW_RETURN_NOT_OK(builder->AppendNull()); + } + else + { + ARROW_RETURN_NOT_OK(builder->Append(value->GetString(), value->GetStringLength())); + } + } + return arrow::Status::OK(); + } + + arrow::Status Visit(const arrow::Int64Type &type) + { + arrow::Int64Builder *builder = static_cast(builder_); + for (const auto &maybe_value : FieldValues()) + { + ARROW_ASSIGN_OR_RAISE(auto value, maybe_value); + if (value->IsNull()) + { + ARROW_RETURN_NOT_OK(builder->AppendNull()); + } + else + { + if (value->IsInt()) + { + ARROW_RETURN_NOT_OK(builder->Append(value->GetInt())); + } + else if (value->IsInt64()) + { + ARROW_RETURN_NOT_OK(builder->Append(value->GetInt64())); + } + else + { + return arrow::Status::Invalid("Value is not an integer"); + } + } + } + return arrow::Status::OK(); + } + + arrow::Status Visit(const arrow::Int32Type &type) + { + arrow::Int32Builder *builder = static_cast(builder_); + for (const auto &maybe_value : FieldValues()) + { + ARROW_ASSIGN_OR_RAISE(auto value, maybe_value); + if (value->IsNull()) + { + ARROW_RETURN_NOT_OK(builder->AppendNull()); + } + else + { + ARROW_RETURN_NOT_OK(builder->Append(value->GetInt())); + } + } + return arrow::Status::OK(); + } + + arrow::Status Visit(const arrow::UInt64Type &type) + { + arrow::Int64Builder *builder = static_cast(builder_); + for (const auto &maybe_value : FieldValues()) + { + ARROW_ASSIGN_OR_RAISE(auto value, maybe_value); + if (value->IsNull()) + { + ARROW_RETURN_NOT_OK(builder->AppendNull()); + } + else + { + if (value->IsUint()) + { + ARROW_RETURN_NOT_OK(builder->Append(value->GetUint())); + } + else if (value->IsUint64()) + { + ARROW_RETURN_NOT_OK(builder->Append(value->GetUint64())); + } + else + { + return arrow::Status::Invalid("Value is not an integer"); + } + } + } + return arrow::Status::OK(); + } + + arrow::Status Visit(const arrow::UInt32Type &type) + { + arrow::UInt32Builder *builder = static_cast(builder_); + for (const auto &maybe_value : FieldValues()) + { + ARROW_ASSIGN_OR_RAISE(auto value, maybe_value); + if (value->IsNull()) + { + ARROW_RETURN_NOT_OK(builder->AppendNull()); + } + else + { + ARROW_RETURN_NOT_OK(builder->Append(value->GetUint())); + } + } + return arrow::Status::OK(); + } + + arrow::Status Visit(const arrow::FloatType &type) + { + arrow::FloatBuilder *builder = static_cast(builder_); + for (const auto &maybe_value : FieldValues()) + { + ARROW_ASSIGN_OR_RAISE(auto value, maybe_value); + if (value->IsNull()) + { + ARROW_RETURN_NOT_OK(builder->AppendNull()); + } + else + { + ARROW_RETURN_NOT_OK(builder->Append(value->GetFloat())); + } + } + return arrow::Status::OK(); + } + + arrow::Status Visit(const arrow::DoubleType &type) + { + arrow::DoubleBuilder *builder = static_cast(builder_); + for (const auto &maybe_value : FieldValues()) + { + ARROW_ASSIGN_OR_RAISE(auto value, maybe_value); + if (value->IsNull()) + { + ARROW_RETURN_NOT_OK(builder->AppendNull()); + } + else + { + ARROW_RETURN_NOT_OK(builder->Append(value->GetDouble())); + } + } + return arrow::Status::OK(); + } + + arrow::Status Visit(const arrow::StringType &type) + { + arrow::StringBuilder *builder = static_cast(builder_); + for (const auto &maybe_value : FieldValues()) + { + ARROW_ASSIGN_OR_RAISE(auto value, maybe_value); + if (value->IsNull()) + { + ARROW_RETURN_NOT_OK(builder->AppendNull()); + } + else + { + ARROW_RETURN_NOT_OK(builder->Append(value->GetString(), value->GetStringLength())); + } + } + return arrow::Status::OK(); + } + + arrow::Status Visit(const arrow::BooleanType &type) + { + arrow::BooleanBuilder *builder = static_cast(builder_); + for (const auto &maybe_value : FieldValues()) + { + ARROW_ASSIGN_OR_RAISE(auto value, maybe_value); + if (value->IsNull()) + { + ARROW_RETURN_NOT_OK(builder->AppendNull()); + } + else + { + ARROW_RETURN_NOT_OK(builder->Append(value->GetBool())); + } + } + return arrow::Status::OK(); + } + + arrow::Status Visit(const arrow::StructType &type) + { + arrow::StructBuilder *builder = static_cast(builder_); + + std::vector child_path(root_path_); + if (field_name_.size() > 0) + { + child_path.push_back(field_name_); + } + auto child_converter = JsonValueConverter(rows_, child_path, array_levels_); + + for (int i = 0; i < type.num_fields(); ++i) + { + std::shared_ptr child_field = type.field(i); + std::shared_ptr child_builder = builder->child_builder(i); + + ARROW_RETURN_NOT_OK(child_converter.Convert(*child_field.get(), child_builder.get())); + } + + // Make null bitunordered_map + for (const auto &maybe_value : FieldValues()) + { + ARROW_ASSIGN_OR_RAISE(auto value, maybe_value); + ARROW_RETURN_NOT_OK(builder->Append(!value->IsNull())); + } + + return arrow::Status::OK(); + } + + arrow::Status Visit(const arrow::ListType &type) + { + arrow::ListBuilder *builder = static_cast(builder_); + + // Values and offsets needs to be interleaved in ListBuilder, so first collect the + // values + std::unique_ptr tmp_value_builder; + ARROW_ASSIGN_OR_RAISE(tmp_value_builder, arrow::MakeBuilder(builder->value_builder()->type())); + std::vector child_path(root_path_); + child_path.push_back(field_name_); + auto child_converter = JsonValueConverter(rows_, child_path, array_levels_ + 1); + ARROW_RETURN_NOT_OK(child_converter.Convert(*type.value_field().get(), "", tmp_value_builder.get())); + + std::shared_ptr values_array; + ARROW_RETURN_NOT_OK(tmp_value_builder->Finish(&values_array)); + std::shared_ptr values_data = values_array->data(); + + arrow::ArrayBuilder *value_builder = builder->value_builder(); + int64_t offset = 0; + for (const auto &maybe_value : FieldValues()) + { + ARROW_ASSIGN_OR_RAISE(auto value, maybe_value); + ARROW_RETURN_NOT_OK(builder->Append(!value->IsNull())); + if (!value->IsNull() && value->Size() > 0) + { + ARROW_RETURN_NOT_OK(value_builder->AppendArraySlice(*values_data.get(), offset, value->Size())); + offset += value->Size(); + } + } + + return arrow::Status::OK(); + } + +private: + std::string field_name_; + arrow::ArrayBuilder *builder_; + const std::vector &rows_; + std::vector root_path_; + int64_t array_levels_ = 0; + + /// Return a flattened iterator over values at nested location + arrow::Iterator FieldValues() + { + std::vector path(root_path_); + if (field_name_.size() > 0) + { + path.push_back(field_name_); + } + + auto iter = DocValuesIterator(rows_, std::move(path), array_levels_); + auto fn = [iter]() mutable -> arrow::Result + { return iter.Next(); }; + + return arrow::MakeFunctionIterator(fn); + } +}; + +/** + * @brief ParquetHelper holds the inputs from the user, the file stream objects, function for setting the schema, and functions + * for opening parquet files. + */ +class ParquetHelper +{ +public: + ParquetHelper(const char *option, const char *location, const char *destination, int rowsize, int _batchSize, const IThorActivityContext *_activityCtx); + ~ParquetHelper(); + std::shared_ptr getSchema(); + arrow::Status openWriteFile(); + arrow::Status openReadFile(); + arrow::Status processReadFile(); + arrow::Status writePartition(std::shared_ptr table); + parquet::arrow::FileWriter *queryWriter(); + void chunkTable(std::shared_ptr &table); + rapidjson::Value *queryCurrentRow(); + void updateRow(); + std::vector &queryRecordBatch(); + bool partSetting(); + __int64 getMaxRowSize(); + char queryPartOptions(); + bool shouldRead(); + __int64 &getRowsProcessed(); + arrow::Result> convertToRecordBatch(const std::vector &rows, std::shared_ptr schema); + std::unordered_map> &next(); + std::shared_ptr queryCurrentTable(__int64 currTable); + arrow::Result> queryRows(); + __int64 queryRowsCount(); + std::shared_ptr makeChildRecord(const RtlFieldInfo *field); + arrow::Status fieldToNode(const std::string &name, const RtlFieldInfo *field, std::vector> &arrow_fields); + arrow::Status fieldsToSchema(const RtlTypeInfo *typeInfo); + void beginSet(); + void beginRow(); + void endRow(const char *name); + +private: + __int64 currentRow = 0; + __int64 rowSize = 0; // The maximum size of each parquet row group. + __int64 tablesProcessed = 0; // Current RowGroup that has been read from the input file. + __int64 rowsProcessed = 0; // Current Row that has been read from the RowGroup + __int64 startRowGroup = 0; // The beginning RowGroup that is read by a worker + __int64 tableCount = 0; // The number of RowGroups to be read by the worker from the file that was opened for reading. + __int64 rowsCount = 0; // The number of result rows in a given RowGroup read from the parquet file. + size_t batchSize = 0; // batchSize for converting Parquet Columns to ECL rows. It is more efficient to break the data into small batches for converting to rows than to convert all at once. + bool partition; // Boolean variable to track whether we are writing partitioned files or not. + std::string partOption; // Read, r, Write, w, option for specifying parquet operation. + std::string location; // Location to read parquet file from. + std::string destination; // Destination to write parquet file to. + const IThorActivityContext *activityCtx; // Additional local context information + std::shared_ptr schema = nullptr; // Schema object that holds the schema of the file for reading and writing + std::unique_ptr writer = nullptr; // FileWriter for writing to parquet files. + std::vector parquetDoc; // Document vector for converting rows to columns for writing to parquet files. + std::vector rowStack; // Stack for keeping track of the context when building a nested row. + std::shared_ptr scanner = nullptr; // Scanner for reading through partitioned files. PARTITION + arrow::dataset::FileSystemDatasetWriteOptions writeOptions; // Write options for writing partitioned files. PARTITION + std::shared_ptr rbatchReader = nullptr; + arrow::RecordBatchReader::RecordBatchReaderIterator rbatchItr; + std::vector<__int64> fileTableCounts; + std::vector> parquetFileReaders; + std::unordered_map> parquetTable; + arrow::MemoryPool *pool = nullptr; +}; + +/** + * @brief Builds ECL Records from Parquet result rows. + * + */ +class ParquetRowStream : public RtlCInterface, implements IRowStream +{ +public: + ParquetRowStream(IEngineRowAllocator *_resultAllocator, std::shared_ptr _parquet); + virtual ~ParquetRowStream() = default; + + RTLIMPLEMENT_IINTERFACE + virtual const void *nextRow() override; + virtual void stop() override; + +private: + Linked m_resultAllocator; //! Pointer to allocator used when building result rows. + bool m_shouldRead = true; //! If true, we should continue trying to read more messages. + __int64 m_currentRow = 0; //! Current result row. + __int64 rowsCount; //! Number of result rows read from parquet file. + std::shared_ptr array_visitor = nullptr; + std::shared_ptr s_parquet = nullptr; //! Shared pointer to ParquetHelper class for the stream class. +}; + +/** + * @brief Builds ECL records for ParquetRowStream. + * + */ +class ParquetRowBuilder : public CInterfaceOf +{ +public: + ParquetRowBuilder(std::unordered_map> *_result_rows, int64_t _currentRow, std::shared_ptr *_array_visitor) + : result_rows(_result_rows), currentRow(_currentRow), array_visitor(_array_visitor) {} + + virtual ~ParquetRowBuilder() = default; + + virtual bool getBooleanResult(const RtlFieldInfo *field) override; + virtual void getDataResult(const RtlFieldInfo *field, size32_t &len, void *&result) override; + virtual double getRealResult(const RtlFieldInfo *field) override; + virtual __int64 getSignedResult(const RtlFieldInfo *field) override; + virtual unsigned __int64 getUnsignedResult(const RtlFieldInfo *field) override; + virtual void getStringResult(const RtlFieldInfo *field, size32_t &chars, char *&result) override; + virtual void getUTF8Result(const RtlFieldInfo *field, size32_t &chars, char *&result) override; + virtual void getUnicodeResult(const RtlFieldInfo *field, size32_t &chars, UChar *&result) override; + virtual void getDecimalResult(const RtlFieldInfo *field, Decimal &value) override; + virtual void processBeginSet(const RtlFieldInfo *field, bool &isAll) override; + virtual bool processNextSet(const RtlFieldInfo *field) override; + virtual void processBeginDataset(const RtlFieldInfo *field) override; + virtual void processBeginRow(const RtlFieldInfo *field) override; + virtual bool processNextRow(const RtlFieldInfo *field) override; + virtual void processEndSet(const RtlFieldInfo *field) override; + virtual void processEndDataset(const RtlFieldInfo *field) override; + virtual void processEndRow(const RtlFieldInfo *field) override; + +protected: + const std::shared_ptr &getChunk(std::shared_ptr *column); + void nextField(const RtlFieldInfo *field); + void nextFromStruct(const RtlFieldInfo *field); + void xpathOrName(StringBuffer &outXPath, const RtlFieldInfo *field) const; + int64_t currArrayIndex(); + +private: + __int64 currentRow; + TokenDeserializer m_tokenDeserializer; + std::unordered_map> *result_rows; + std::vector m_pathStack; + std::shared_ptr *array_visitor; +}; + +/** + * @brief Binds ECL records to parquet objects + * + */ +class ParquetRecordBinder : public CInterfaceOf +{ +public: + ParquetRecordBinder(const IContextLogger &_logctx, const RtlTypeInfo *_typeInfo, int _firstParam, std::shared_ptr _parquet) + : logctx(_logctx), typeInfo(_typeInfo), firstParam(_firstParam), dummyField("", NULL, typeInfo), thisParam(_firstParam) + { + r_parquet = _parquet; + partition = _parquet->partSetting(); + } + + virtual ~ParquetRecordBinder() = default; + + int numFields(); + void processRow(const byte *row); + virtual void processString(unsigned len, const char *value, const RtlFieldInfo *field); + virtual void processBool(bool value, const RtlFieldInfo *field); + virtual void processData(unsigned len, const void *value, const RtlFieldInfo *field); + virtual void processInt(__int64 value, const RtlFieldInfo *field); + virtual void processUInt(unsigned __int64 value, const RtlFieldInfo *field); + virtual void processReal(double value, const RtlFieldInfo *field); + virtual void processDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo *field); + virtual void processUDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo *field) + { + UNSUPPORTED("UNSIGNED decimals"); + } + + virtual void processUnicode(unsigned chars, const UChar *value, const RtlFieldInfo *field); + virtual void processQString(unsigned len, const char *value, const RtlFieldInfo *field); + virtual void processUtf8(unsigned chars, const char *value, const RtlFieldInfo *field); + virtual bool processBeginSet(const RtlFieldInfo *field, unsigned numElements, bool isAll, const byte *data) + { + r_parquet->beginSet(); + return true; + } + virtual bool processBeginDataset(const RtlFieldInfo *field, unsigned rowsCount) + { + UNSUPPORTED("DATASET"); + return false; + } + virtual bool processBeginRow(const RtlFieldInfo *field) + { + // There is a better way to do this than creating a stack and having to iterate back through to + // copy over the members of the rapidjson value. + // TO DO + // Create a json string of all the fields which will be much more performant. + r_parquet->beginRow(); + return true; + } + virtual void processEndSet(const RtlFieldInfo *field) + { + r_parquet->endRow(field->name); + } + virtual void processEndDataset(const RtlFieldInfo *field) + { + UNSUPPORTED("DATASET"); + } + virtual void processEndRow(const RtlFieldInfo *field) + { + r_parquet->endRow(field->name); + } + +protected: + inline unsigned checkNextParam(const RtlFieldInfo *field); + + const RtlTypeInfo *typeInfo = nullptr; + const IContextLogger &logctx; + int firstParam; + RtlFieldStrInfo dummyField; + int thisParam; + TokenSerializer m_tokenSerializer; + + std::shared_ptr r_parquet; + bool partition; //! Local copy of a boolean so we can know if we are writing partitioned files or not. +}; + +/** + * @brief Binds an ECL dataset to a vector of parquet objects. + * + */ +class ParquetDatasetBinder : public ParquetRecordBinder +{ +public: + /** + * @brief Construct a new ParquetDataset Binder object + * + * @param _logctx logger for building the dataset. + * @param _input Stream of input of dataset. + * @param _typeInfo Field type info. + * @param _query Holds the builder object for creating the documents. + * @param _firstParam Index of the first param. + */ + ParquetDatasetBinder(const IContextLogger &_logctx, IRowStream *_input, const RtlTypeInfo *_typeInfo, std::shared_ptr _parquet, int _firstParam) + : input(_input), ParquetRecordBinder(_logctx, _typeInfo, _firstParam, _parquet) + { + d_parquet = _parquet; + reportIfFailure(d_parquet->fieldsToSchema(_typeInfo)); + } + virtual ~ParquetDatasetBinder() = default; + void getFieldTypes(const RtlTypeInfo *typeInfo); + bool bindNext(); + void writeRecordBatch(); + void executeAll(); + +protected: + Owned input; + std::shared_ptr d_parquet; //! Helper object for keeping track of read and write options, schema, and file names. +}; + +/** + * @brief Main interface for the engine to interact with the plugin. The get functions return results to the engine and the Rowstream and + * + */ +class ParquetEmbedFunctionContext : public CInterfaceOf +{ +public: + ParquetEmbedFunctionContext(const IContextLogger &_logctx, const IThorActivityContext *activityCtx, const char *options, unsigned _flags); + virtual ~ParquetEmbedFunctionContext() = default; + virtual bool getBooleanResult(); + virtual void getDataResult(size32_t &len, void *&result); + virtual double getRealResult(); + virtual __int64 getSignedResult(); + virtual unsigned __int64 getUnsignedResult(); + virtual void getStringResult(size32_t &chars, char *&result); + virtual void getUTF8Result(size32_t &chars, char *&result); + virtual void getUnicodeResult(size32_t &chars, UChar *&result); + virtual void getDecimalResult(Decimal &value); + virtual void getSetResult(bool &__isAllResult, size32_t &__resultBytes, void *&__result, int elemType, size32_t elemSize) + { + UNSUPPORTED("SET results"); + } + virtual IRowStream *getDatasetResult(IEngineRowAllocator *_resultAllocator); + virtual byte *getRowResult(IEngineRowAllocator *_resultAllocator); + virtual size32_t getTransformResult(ARowBuilder &rowBuilder); + virtual void bindRowParam(const char *name, IOutputMetaData &metaVal, const byte *val) override; + virtual void bindDatasetParam(const char *name, IOutputMetaData &metaVal, IRowStream *val); + virtual void bindBooleanParam(const char *name, bool val); + virtual void bindDataParam(const char *name, size32_t len, const void *val); + virtual void bindFloatParam(const char *name, float val); + virtual void bindRealParam(const char *name, double val); + virtual void bindSignedSizeParam(const char *name, int size, __int64 val); + virtual void bindSignedParam(const char *name, __int64 val); + virtual void bindUnsignedSizeParam(const char *name, int size, unsigned __int64 val); + virtual void bindUnsignedParam(const char *name, unsigned __int64 val); + virtual void bindStringParam(const char *name, size32_t len, const char *val); + virtual void bindVStringParam(const char *name, const char *val); + virtual void bindUTF8Param(const char *name, size32_t chars, const char *val); + virtual void bindUnicodeParam(const char *name, size32_t chars, const UChar *val); + virtual void bindSetParam(const char *name, int elemType, size32_t elemSize, bool isAll, size32_t totalBytes, const void *setData) + { + UNSUPPORTED("SET parameters"); + } + virtual IInterface *bindParamWriter(IInterface *esdl, const char *esdlservice, const char *esdltype, const char *name) + { + return NULL; + } + virtual void paramWriterCommit(IInterface *writer) + { + UNSUPPORTED("paramWriterCommit"); + } + virtual void writeResult(IInterface *esdl, const char *esdlservice, const char *esdltype, IInterface *writer) + { + UNSUPPORTED("writeResult"); + } + virtual void importFunction(size32_t lenChars, const char *text) + { + UNSUPPORTED("importFunction"); + } + virtual void compileEmbeddedScript(size32_t chars, const char *script); + virtual void callFunction(); + virtual void loadCompiledScript(size32_t chars, const void *_script) override + { + UNSUPPORTED("loadCompiledScript"); + } + virtual void enter() override {} + virtual void reenter(ICodeContext *codeCtx) override {} + virtual void exit() override {} + +protected: + void execute(); + unsigned checkNextParam(const char *name); + const IContextLogger &logctx; + Owned m_resultrow; + + Owned m_oInputStream; //! Input Stream used for building a dataset. + + TokenDeserializer m_tokenDeserializer; + TokenSerializer m_tokenSerializer; + unsigned m_nextParam = 0; //! Index of the next parameter to process. + unsigned m_numParams = 0; //! Number of parameters in the function definition. + unsigned m_scriptFlags; //! Count of flags raised by embedded script. + + std::shared_ptr m_parquet; //! Helper object for keeping track of read and write options, schema, and file names. +}; +} +#endif diff --git a/vcpkg.json.in b/vcpkg.json.in index 334ae5aab9d..7396715d0af 100644 --- a/vcpkg.json.in +++ b/vcpkg.json.in @@ -11,6 +11,17 @@ "name": "apr-util", "platform": "@VCPKG_APR@" }, + { + "name": "arrow", + "default-features": false, + "features": [ + "acero", + "dataset", + "filesystem", + "parquet" + ], + "platform": "@VCPKG_PARQUETEMBED@" + }, { "name": "aws-sdk-cpp", "default-features": false, diff --git a/vcpkg_overlays/arrow/fix-ci-error.patch b/vcpkg_overlays/arrow/fix-ci-error.patch new file mode 100644 index 00000000000..adbe0f86683 --- /dev/null +++ b/vcpkg_overlays/arrow/fix-ci-error.patch @@ -0,0 +1,13 @@ +diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt +index eeda520..f55080e 100644 +--- a/cpp/CMakeLists.txt ++++ b/cpp/CMakeLists.txt +@@ -921,7 +921,7 @@ if(WIN32) + list(APPEND ARROW_SYSTEM_LINK_LIBS "ws2_32.dll") + endif() + +-if(NOT WIN32 AND NOT APPLE) ++if(NOT WIN32 AND NOT APPLE AND NOT ANDROID) + # Pass -lrt on Linux only + list(APPEND ARROW_SYSTEM_LINK_LIBS rt) + endif() diff --git a/vcpkg_overlays/arrow/msvc-static-name.patch b/vcpkg_overlays/arrow/msvc-static-name.patch new file mode 100644 index 00000000000..45624f16898 --- /dev/null +++ b/vcpkg_overlays/arrow/msvc-static-name.patch @@ -0,0 +1,13 @@ +diff --git a/cpp/cmake_modules/BuildUtils.cmake b/cpp/cmake_modules/BuildUtils.cmake +index 391c43e0a..50f6d3d3c 100644 +--- a/cpp/cmake_modules/BuildUtils.cmake ++++ b/cpp/cmake_modules/BuildUtils.cmake +@@ -427,7 +427,7 @@ function(ADD_ARROW_LIB LIB_NAME) + target_include_directories(${LIB_NAME}_static PRIVATE ${ARG_PRIVATE_INCLUDES}) + endif() + +- if(MSVC_TOOLCHAIN) ++ if(MSVC_TOOLCHAIN AND 0) + set(LIB_NAME_STATIC ${LIB_NAME}_static) + else() + set(LIB_NAME_STATIC ${LIB_NAME}) diff --git a/vcpkg_overlays/arrow/portfile.cmake b/vcpkg_overlays/arrow/portfile.cmake new file mode 100644 index 00000000000..1ee7a81efa8 --- /dev/null +++ b/vcpkg_overlays/arrow/portfile.cmake @@ -0,0 +1,122 @@ +vcpkg_download_distfile( + ARCHIVE_PATH + URLS "https://archive.apache.org/dist/arrow/arrow-${VERSION}/apache-arrow-${VERSION}.tar.gz" + FILENAME apache-arrow-${VERSION}.tar.gz + SHA512 3314d79ef20ac2cfc63f2c16fafb30c3f6187c10c6f5ea6ff036f6db766621d7c65401d85bf1e979bd0ecf831fbb0a785467642792d6bf77016f9807243c064e +) +vcpkg_extract_source_archive( + SOURCE_PATH + ARCHIVE ${ARCHIVE_PATH} + PATCHES + msvc-static-name.patch + utf8proc.patch + thrift.patch + fix-ci-error.patch +) + +vcpkg_check_features(OUT_FEATURE_OPTIONS FEATURE_OPTIONS + FEATURES + acero ARROW_ACERO + csv ARROW_CSV + cuda ARROW_CUDA + dataset ARROW_DATASET + filesystem ARROW_FILESYSTEM + flight ARROW_FLIGHT + gcs ARROW_GCS + jemalloc ARROW_JEMALLOC + json ARROW_JSON + mimalloc ARROW_MIMALLOC + orc ARROW_ORC + parquet ARROW_PARQUET + parquet PARQUET_REQUIRE_ENCRYPTION + s3 ARROW_S3 +) + +if(VCPKG_TARGET_IS_WINDOWS AND NOT VCPKG_TARGET_IS_MINGW) + list(APPEND FEATURE_OPTIONS "-DARROW_USE_NATIVE_INT128=OFF") +endif() + +string(COMPARE EQUAL ${VCPKG_LIBRARY_LINKAGE} "dynamic" ARROW_BUILD_SHARED) +string(COMPARE EQUAL ${VCPKG_LIBRARY_LINKAGE} "static" ARROW_BUILD_STATIC) +string(COMPARE EQUAL ${VCPKG_LIBRARY_LINKAGE} "dynamic" ARROW_DEPENDENCY_USE_SHARED) + +vcpkg_cmake_configure( + SOURCE_PATH "${SOURCE_PATH}/cpp" + OPTIONS + ${FEATURE_OPTIONS} + -DARROW_BUILD_SHARED=${ARROW_BUILD_SHARED} + -DARROW_BUILD_STATIC=${ARROW_BUILD_STATIC} + -DARROW_BUILD_TESTS=OFF + -DARROW_DEPENDENCY_SOURCE=SYSTEM + -DARROW_DEPENDENCY_USE_SHARED=${ARROW_DEPENDENCY_USE_SHARED} + -DARROW_PACKAGE_KIND=vcpkg + -DARROW_WITH_BROTLI=ON + -DARROW_WITH_BZ2=ON + -DARROW_WITH_LZ4=ON + -DARROW_WITH_SNAPPY=ON + -DARROW_WITH_ZLIB=ON + -DARROW_WITH_ZSTD=ON + -DBUILD_WARNING_LEVEL=PRODUCTION + -DCMAKE_SYSTEM_PROCESSOR=${VCPKG_TARGET_ARCHITECTURE} + -DZSTD_MSVC_LIB_PREFIX= + MAYBE_UNUSED_VARIABLES + ZSTD_MSVC_LIB_PREFIX +) + +vcpkg_cmake_install() +vcpkg_copy_pdbs() + +vcpkg_fixup_pkgconfig() + +if(EXISTS "${CURRENT_PACKAGES_DIR}/lib/arrow_static.lib") + message(FATAL_ERROR "Installed lib file should be named 'arrow.lib' via patching the upstream build.") +endif() + +if("dataset" IN_LIST FEATURES) + vcpkg_cmake_config_fixup( + PACKAGE_NAME arrowdataset + CONFIG_PATH lib/cmake/ArrowDataset + DO_NOT_DELETE_PARENT_CONFIG_PATH + ) +endif() + +if("acero" IN_LIST FEATURES) + vcpkg_cmake_config_fixup( + PACKAGE_NAME arrowacero + CONFIG_PATH lib/cmake/ArrowAcero + DO_NOT_DELETE_PARENT_CONFIG_PATH + ) +endif() + +if("parquet" IN_LIST FEATURES) + vcpkg_cmake_config_fixup( + PACKAGE_NAME parquet + CONFIG_PATH lib/cmake/Parquet + DO_NOT_DELETE_PARENT_CONFIG_PATH + ) +endif() +vcpkg_cmake_config_fixup(CONFIG_PATH lib/cmake/Arrow) + +file(INSTALL "${CMAKE_CURRENT_LIST_DIR}/usage" DESTINATION "${CURRENT_PACKAGES_DIR}/share/${PORT}") +if("parquet" IN_LIST FEATURES) + file(READ "${CMAKE_CURRENT_LIST_DIR}/usage-parquet" usage-parquet) + file(APPEND "${CURRENT_PACKAGES_DIR}/share/${PORT}/usage" "${usage-parquet}") +endif() +if("dataset" IN_LIST FEATURES) + file(READ "${CMAKE_CURRENT_LIST_DIR}/usage-dataset" usage-dataset) + file(APPEND "${CURRENT_PACKAGES_DIR}/share/${PORT}/usage" "${usage-dataset}") +endif() +if("acero" IN_LIST FEATURES) + file(READ "${CMAKE_CURRENT_LIST_DIR}/usage-acero" usage-acero) + file(APPEND "${CURRENT_PACKAGES_DIR}/share/${PORT}/usage" "${usage-acero}") +endif() + +if("example" IN_LIST FEATURES) + file(INSTALL "${SOURCE_PATH}/cpp/examples/minimal_build/" DESTINATION "${CURRENT_PACKAGES_DIR}/share/${PORT}/example") +endif() + +file(REMOVE_RECURSE "${CURRENT_PACKAGES_DIR}/debug/include") +file(REMOVE_RECURSE "${CURRENT_PACKAGES_DIR}/debug/share") +file(REMOVE_RECURSE "${CURRENT_PACKAGES_DIR}/share/doc") + +vcpkg_install_copyright(FILE_LIST "${SOURCE_PATH}/LICENSE.txt" "${SOURCE_PATH}/NOTICE.txt") diff --git a/vcpkg_overlays/arrow/thrift.patch b/vcpkg_overlays/arrow/thrift.patch new file mode 100644 index 00000000000..7c81e23dba7 --- /dev/null +++ b/vcpkg_overlays/arrow/thrift.patch @@ -0,0 +1,22 @@ +diff --git a/cpp/cmake_modules/FindThriftAlt.cmake b/cpp/cmake_modules/FindThriftAlt.cmake +index f3e4902..65ceac8 100644 +--- a/cpp/cmake_modules/FindThriftAlt.cmake ++++ b/cpp/cmake_modules/FindThriftAlt.cmake +@@ -45,7 +45,7 @@ endif() + # * https://github.com/apache/thrift/pull/2725 + # * https://github.com/apache/thrift/pull/2726 + # * https://github.com/conda-forge/thrift-cpp-feedstock/issues/68 +-if(NOT WIN32) ++ + set(find_package_args "") + if(ThriftAlt_FIND_VERSION) + list(APPEND find_package_args ${ThriftAlt_FIND_VERSION}) +@@ -61,7 +61,7 @@ if(NOT WIN32) + "${THRIFT_COMPILER}") + return() + endif() +-endif() ++ + + function(extract_thrift_version) + if(ThriftAlt_INCLUDE_DIR) diff --git a/vcpkg_overlays/arrow/usage b/vcpkg_overlays/arrow/usage new file mode 100644 index 00000000000..b07f1c9c32b --- /dev/null +++ b/vcpkg_overlays/arrow/usage @@ -0,0 +1,4 @@ +The package arrow provides CMake targets: + + find_package(Arrow CONFIG REQUIRED) + target_link_libraries(main PRIVATE "$,Arrow::arrow_static,Arrow::arrow_shared>") diff --git a/vcpkg_overlays/arrow/usage-acero b/vcpkg_overlays/arrow/usage-acero new file mode 100644 index 00000000000..05c4776b797 --- /dev/null +++ b/vcpkg_overlays/arrow/usage-acero @@ -0,0 +1,3 @@ + + find_package(ArrowAcero CONFIG REQUIRED) + target_link_libraries(main PRIVATE "$,ArrowAcero::arrow_acero_static,ArrowAcero::arrow_acero_shared>") diff --git a/vcpkg_overlays/arrow/usage-dataset b/vcpkg_overlays/arrow/usage-dataset new file mode 100644 index 00000000000..8bedfb83246 --- /dev/null +++ b/vcpkg_overlays/arrow/usage-dataset @@ -0,0 +1,3 @@ + + find_package(ArrowDataset CONFIG REQUIRED) + target_link_libraries(main PRIVATE "$,ArrowDataset::arrow_dataset_static,ArrowDataset::arrow_dataset_shared>") diff --git a/vcpkg_overlays/arrow/usage-parquet b/vcpkg_overlays/arrow/usage-parquet new file mode 100644 index 00000000000..ce04039a64e --- /dev/null +++ b/vcpkg_overlays/arrow/usage-parquet @@ -0,0 +1,3 @@ + + find_package(Parquet CONFIG REQUIRED) + target_link_libraries(main PRIVATE "$,Parquet::parquet_static,Parquet::parquet_shared>") diff --git a/vcpkg_overlays/arrow/utf8proc.patch b/vcpkg_overlays/arrow/utf8proc.patch new file mode 100644 index 00000000000..eea9ca59e2e --- /dev/null +++ b/vcpkg_overlays/arrow/utf8proc.patch @@ -0,0 +1,13 @@ +diff --git a/cpp/cmake_modules/Findutf8proc.cmake b/cpp/cmake_modules/Findutf8proc.cmake +index e347414..83f2aa1 100644 +--- a/cpp/cmake_modules/Findutf8proc.cmake ++++ b/cpp/cmake_modules/Findutf8proc.cmake +@@ -30,7 +30,7 @@ if(ARROW_PACKAGE_KIND STREQUAL "vcpkg") + if(utf8proc_FIND_REQUIRED) + list(APPEND find_package_args REQUIRED) + endif() +- find_package(utf8proc NAMES unofficial-utf8proc ${find_package_args}) ++ find_package(utf8proc NAMES unofficial-utf8proc) + if(utf8proc_FOUND) + add_library(utf8proc::utf8proc ALIAS utf8proc) + return() diff --git a/vcpkg_overlays/arrow/vcpkg.json b/vcpkg_overlays/arrow/vcpkg.json new file mode 100644 index 00000000000..f467e377153 --- /dev/null +++ b/vcpkg_overlays/arrow/vcpkg.json @@ -0,0 +1,126 @@ +{ + "name": "arrow", + "version": "13.0.0", + "description": "Cross-language development platform for in-memory analytics", + "homepage": "https://arrow.apache.org", + "license": "Apache-2.0", + "supports": "x64 | (arm64 & !windows)", + "dependencies": [ + "boost-filesystem", + "boost-multiprecision", + "boost-system", + "brotli", + "bzip2", + "gflags", + "lz4", + "openssl", + "re2", + "snappy", + "thrift", + "utf8proc", + { + "name": "vcpkg-cmake", + "host": true + }, + { + "name": "vcpkg-cmake-config", + "host": true + }, + "xsimd", + "zlib", + "zstd" + ], + "default-features": [ + "csv", + "filesystem", + "json", + "parquet" + ], + "features": { + "acero": { + "description": "Acero support" + }, + "csv": { + "description": "CSV support" + }, + "cuda": { + "description": "cuda support", + "dependencies": [ + "cuda" + ] + }, + "dataset": { + "description": "Dataset support" + }, + "example": { + "description": "Install the minimal example (source code)" + }, + "filesystem": { + "description": "Filesystem support" + }, + "flight": { + "description": "Arrow Flight RPC support", + "dependencies": [ + "abseil", + "c-ares", + "grpc", + "protobuf" + ] + }, + "gcs": { + "description": "GCS support", + "dependencies": [ + { + "name": "google-cloud-cpp", + "default-features": false, + "features": [ + "storage" + ] + } + ] + }, + "jemalloc": { + "description": "jemalloc allocator", + "supports": "!windows" + }, + "json": { + "description": "JSON support", + "dependencies": [ + "rapidjson" + ] + }, + "mimalloc": { + "description": "mimalloc allocator", + "supports": "windows" + }, + "orc": { + "description": "ORC support", + "dependencies": [ + "orc" + ] + }, + "parquet": { + "description": "Parquet support", + "dependencies": [ + "rapidjson" + ] + }, + "s3": { + "description": "S3 support", + "dependencies": [ + { + "name": "aws-sdk-cpp", + "default-features": false, + "features": [ + "cognito-identity", + "config", + "identity-management", + "s3", + "sts", + "transfer" + ] + } + ] + } + } +}