Skip to content

Commit

Permalink
Merge pull request #17675 from jackdelv/parquet-dev
Browse files Browse the repository at this point in the history
HPCC-30091 Add Support for the Parquet File Format

Reviewed-By: Dan S. Camper <[email protected]>
Reviewed-by: Gordon Smith <[email protected]>
Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Sep 28, 2023
2 parents 2c10656 + 0850494 commit a9d408d
Show file tree
Hide file tree
Showing 29 changed files with 3,884 additions and 0 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ if ( PLUGIN )
HPCC_ADD_SUBDIRECTORY (plugins/h3 "H3")
HPCC_ADD_SUBDIRECTORY (plugins/nlp "NLP")
HPCC_ADD_SUBDIRECTORY (plugins/mongodb "MONGODBEMBED")
HPCC_ADD_SUBDIRECTORY (plugins/parquet "PARQUETEMBED")  # Parquet file format plugin (HPCC-30091); built when PARQUETEMBED is enabled
elseif ( NOT MAKE_DOCS_ONLY )
HPCC_ADD_SUBDIRECTORY (system)
HPCC_ADD_SUBDIRECTORY (initfiles)
Expand Down
1 change: 1 addition & 0 deletions cmake_modules/plugins.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ set(PLUGINS_LIST
MONGODBEMBED
MYSQLEMBED
NLP
PARQUETEMBED
REDIS
REMBED
SQLITE3EMBED
Expand Down
1 change: 1 addition & 0 deletions plugins/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ add_subdirectory (exampleplugin)
add_subdirectory (couchbase)
add_subdirectory (sqs)
add_subdirectory (mongodb)
add_subdirectory (parquet)  # Parquet file format plugin (HPCC-30091)
IF ( INCLUDE_EE_PLUGINS )
add_subdirectory (eeproxies)
ENDIF()
Expand Down
120 changes: 120 additions & 0 deletions plugins/parquet/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
##############################################################################

# HPCC SYSTEMS software Copyright (C) 2022 HPCC Systems®.

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##############################################################################

# Component: parquetembed

#############################################################
# Description:
# -----------
# Cmake Input File for parquetembed
#############################################################

project(parquetembed)
message(STATUS "CMAKE Version: ${CMAKE_VERSION}")

if(PARQUETEMBED)
    # Register the plugin with the platform packaging (project-provided macro).
    ADD_PLUGIN(parquetembed)
    if(MAKE_PARQUETEMBED)
        # Apache Arrow, Parquet and the Arrow Dataset API each ship a CMake
        # config package (via vcpkg) providing the imported targets that are
        # linked at the bottom of this file.
        find_package(Arrow CONFIG REQUIRED)
        find_package(Parquet CONFIG REQUIRED)
        find_package(ArrowDataset CONFIG REQUIRED)

        set(
            SRCS
            parquetembed.cpp
        )

        # Platform headers needed by the plugin (directory-scoped to match the
        # convention used by the other HPCC plugin CMake files).
        INCLUDE_DIRECTORIES(
            ${HPCC_SOURCE_DIR}/esp/platform
            ${HPCC_SOURCE_DIR}/system/include
            ${HPCC_SOURCE_DIR}/rtl/eclrtl
            ${HPCC_SOURCE_DIR}/rtl/include
            ${HPCC_SOURCE_DIR}/rtl/nbcd
            ${HPCC_SOURCE_DIR}/common/deftype
            ${HPCC_SOURCE_DIR}/system/jlib
            ${HPCC_SOURCE_DIR}/roxie/roxiemem
        )

        HPCC_ADD_LIBRARY(parquetembed SHARED ${SRCS})

        install(
            TARGETS parquetembed
            DESTINATION plugins CALC_DEPS
        )

        # For each Arrow runtime library we install the real shared object
        # with execute permissions, then its soname/linker-name symlinks.
        # NOTE(review): the LIB*/LIB*_REAL/LIB*_ABI variables are defined
        # outside this file — confirm the LIBARRORACERO spelling matches
        # where those variables are set.
        install(
            FILES ${LIBARROW_LIB_REAL}
            DESTINATION ${LIB_DIR} CALC_DEPS
            PERMISSIONS OWNER_WRITE OWNER_READ OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ WORLD_EXECUTE
            COMPONENT Runtime)

        install(
            FILES ${LIBARROW_LIB} ${LIBARROW_LIB_ABI}
            DESTINATION ${LIB_DIR} CALC_DEPS
            COMPONENT Runtime)

        install(
            FILES ${LIBPARQUET_LIB_REAL}
            DESTINATION ${LIB_DIR} CALC_DEPS
            PERMISSIONS OWNER_WRITE OWNER_READ OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ WORLD_EXECUTE
            COMPONENT Runtime)

        install(
            FILES ${LIBPARQUET_LIB} ${LIBPARQUET_LIB_ABI}
            DESTINATION ${LIB_DIR} CALC_DEPS
            COMPONENT Runtime)

        install(
            FILES ${LIBARRORACERO_LIB_REAL}
            DESTINATION ${LIB_DIR} CALC_DEPS
            PERMISSIONS OWNER_WRITE OWNER_READ OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ WORLD_EXECUTE
            COMPONENT Runtime)

        # FIX: previously paired ${LIBARRORACERO_LIB} with
        # ${LIBARROWDATASET_LIB_ABI}; the dataset ABI link is installed below
        # with its own library, so the Acero ABI link was never installed.
        install(
            FILES ${LIBARRORACERO_LIB} ${LIBARRORACERO_LIB_ABI}
            DESTINATION ${LIB_DIR} CALC_DEPS
            COMPONENT Runtime)

        install(
            FILES ${LIBARROWDATASET_LIB_REAL}
            DESTINATION ${LIB_DIR} CALC_DEPS
            PERMISSIONS OWNER_WRITE OWNER_READ OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ WORLD_EXECUTE
            COMPONENT Runtime)

        install(
            FILES ${LIBARROWDATASET_LIB} ${LIBARROWDATASET_LIB_ABI}
            DESTINATION ${LIB_DIR} CALC_DEPS
            COMPONENT Runtime)

        # Keyword-less signature kept deliberately: HPCC_ADD_LIBRARY may
        # already link with the plain signature, and mixing the two forms on
        # one target is a CMake error.
        target_link_libraries(
            parquetembed
            eclrtl
            jlib
            Arrow::arrow_shared
            Parquet::parquet_shared
            ArrowDataset::arrow_dataset_shared
        )
    endif()
endif()

# The ECL wrapper module is shipped even when the native plugin is not built.
if(PLATFORM OR CLIENTTOOLS_ONLY)
    install(
        FILES ${CMAKE_CURRENT_SOURCE_DIR}/parquet.ecllib
        DESTINATION plugins
        COMPONENT Runtime
    )
endif()
59 changes: 59 additions & 0 deletions plugins/parquet/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Parquet Plugin for HPCC-Systems

The Parquet Plugin for HPCC-Systems is a powerful tool designed to facilitate the fast transfer of data stored in a columnar format to the ECL (Enterprise Control Language) data format. This plugin provides seamless integration between Parquet files and HPCC-Systems, enabling efficient data processing and analysis.

## Installation

The plugin uses vcpkg and can be installed by creating a build directory separate from the platform source tree and running the following commands:
```
cd ./parquet-build
cmake -DPARQUETEMBED=ON ../HPCC-Platform
make -j4 package
sudo dpkg -i ./hpccsystems-plugin-parquetembed_<version>.deb
```

## Documentation

[Doxygen](https://www.doxygen.nl/index.html) can be used to create nice HTML documentation for the code. Call/caller graphs are also generated for functions if you have [dot](https://www.graphviz.org/download/) installed and available on your path.

Assuming `doxygen` is on your path, you can build the documentation via:
```
cd plugins/parquet
doxygen Doxyfile
```

## Features

The Parquet Plugin offers the following main functions:

### Regular Files

#### 1. Reading Parquet Files

The Read function allows ECL programmers to create an ECL dataset from both regular and partitioned Parquet files. It leverages the Apache Arrow interface for Parquet to efficiently stream data from ECL to the plugin, ensuring optimized data transfer.

```
dataset := Read(layout, '/source/directory/data.parquet');
```

#### 2. Writing Parquet Files

The Write function empowers ECL programmers to write ECL datasets to Parquet files. By leveraging the Parquet format's columnar storage capabilities, this function provides efficient compression and optimized storage for data.

```
Write(inDataset, '/output/directory/data.parquet');
```

### Partitioned Files (Tabular Datasets)

#### 1. Reading Partitioned Files

The ReadPartition function extends the Read functionality by enabling ECL programmers to read from partitioned Parquet files.

```
github_dataset := ReadPartition(layout, '/source/directory/partitioned_dataset');
```

#### 2. Writing Partitioned Files

To write partitioned Parquet files, run the Write function on Thor rather than hthor; each Thor worker then creates its own Parquet file.
20 changes: 20 additions & 0 deletions plugins/parquet/examples/blob_test.ecl
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Example: round-tripping binary (BLOB) image data through the Parquet plugin.
IMPORT STD;
IMPORT PARQUET;

// Layout of the source logical file holding images as raw bytes.
imageRecord := RECORD
STRING filename;
DATA image;
UNSIGNED8 RecPos{virtual(fileposition)};  // virtual field: record's byte offset in the file
END;

// Disabled branch: would export the image dataset to a Parquet file.
#IF(0)
in_image_data := DATASET('~parquet::image', imageRecord, FLAT);
OUTPUT(in_image_data, NAMED('IN_IMAGE_DATA'));
PARQUET.Write(in_image_data, '/datadrive/dev/test_data/test_image.parquet');

#END;

// Active branch: read only the image column back from the Parquet file.
// NOTE(review): Read is unqualified here while the write above uses
// PARQUET.Write — confirm Read resolves through the PARQUET import.
#IF(1)
out_image_data := Read({DATA image}, '/datadrive/dev/test_data/test_image.parquet');
OUTPUT(out_image_data, NAMED('OUT_IMAGE_DATA'));
#END
29 changes: 29 additions & 0 deletions plugins/parquet/examples/create_partition.ecl
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Example: creating and reading a partitioned Parquet dataset from the
// GHTorrent sample data.
IMPORT STD;
IMPORT Parquet;

#OPTION('outputLimit', 2000);       // raise workunit output size limit (MB)
#OPTION('pickBestEngine', FALSE);   // keep the job on the submitted engine

// Column layout of the GHTorrent comment data.
layout := RECORD
STRING actor_login;
INTEGER actor_id;
INTEGER comment_id;
STRING comment;
STRING repo;
STRING language;
STRING author_login;
INTEGER author_id;
INTEGER pr_id;
INTEGER c_id;
INTEGER commit_date;
END;

// Disabled branch: redistribute the single-file dataset across workers so
// each worker writes its own partition file.
#IF(0)
github_dataset := Read(layout, '/datadrive/dev/test_data/ghtorrent-2019-01-07.parquet');
Write(DISTRIBUTE(github_dataset, SKEW(.05)), '/datadrive/dev/test_data/hpcc_gh_partition/data.parquet');
#END

// Active branch: read the partitioned dataset back and report the row count.
#IF(1)
github_dataset := ReadPartition(layout, '/datadrive/dev/test_data/hpcc_gh_partition');
OUTPUT(COUNT(github_dataset), NAMED('GITHUB_PARTITION'));
#END
17 changes: 17 additions & 0 deletions plugins/parquet/examples/decimal_test.ecl
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Example: round-tripping ECL DECIMAL values through the Parquet plugin.
IMPORT STD;
IMPORT PARQUET;


// Single fixed-precision column: 5 total digits, 2 after the decimal point.
layout := RECORD
DECIMAL5_2 height;
END;

// Two-row inline dataset used as the write fixture.
decimal_data := DATASET([{152.25}, {125.56}], layout);

// Write the decimal data out to a Parquet file.
#IF(1)
Write(decimal_data, '/datadrive/dev/test_data/decimal.parquet');
#END

// Read it back with the same layout.
#IF(1)
Read(layout, '/datadrive/dev/test_data/decimal.parquet');
#END
29 changes: 29 additions & 0 deletions plugins/parquet/examples/large_io.ecl
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Example: converting a large CSV extract to Parquet and reading it back.
IMPORT STD;
IMPORT Parquet;

#OPTION('outputLimit', 2000);       // raise workunit output size limit (MB)
#OPTION('pickBestEngine', FALSE);   // keep the job on the submitted engine

// Column layout of the GHTorrent comment data.
layout := RECORD
STRING actor_login;
INTEGER actor_id;
INTEGER comment_id;
STRING comment;
STRING repo;
STRING language;
STRING author_login;
INTEGER author_id;
INTEGER pr_id;
INTEGER c_id;
INTEGER commit_date;
END;

// Disabled branch: one-off conversion of the CSV logical file to Parquet.
#IF(0)
csv_data := DATASET('~parquet::large::ghtorrent-2019-02-04.csv', layout, CSV(HEADING(1)));
Write(csv_data, '/datadrive/dev/test_data/ghtorrent-2019-02-04.parquet');
#END

// Active branch: read one partition file and report the row count.
#IF(1)
parquet_data := Read(layout, '/datadrive/dev/test_data/hpcc_gh_partition/data.parquet');
OUTPUT(COUNT(parquet_data), NAMED('ghtorrent_2019_01_07'));
#END
30 changes: 30 additions & 0 deletions plugins/parquet/examples/nested_io.ecl
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Example: round-tripping nested RECORD structures (including sets and
// non-ASCII UTF-8 strings) through the Parquet plugin.
IMPORT Parquet;

// Innermost level: friend count plus a variable-length set of names.
friendsRec :=RECORD
UNSIGNED4 age;
INTEGER2 friends;
SET OF STRING friendsList;
END;

// Middle level: embeds friendsRec alongside scalar measurements.
childRec := RECORD
friendsRec friends;
REAL height;
REAL weight;
END;

// Outermost level: two-deep nesting with UTF-8 (German collation) names.
parentRec := RECORD
UTF8_de firstname;
UTF8_de lastname;
childRec details;
END;
// Inline fixture; \353 embeds a non-ASCII character to exercise UTF-8 handling.
nested_dataset := DATASET([{U'J\353ck', U'\353ackson', { {22, 2, ['James', 'Jonathon']}, 5.9, 600}}, {'John', 'Johnson', { {17, 0, []}, 6.3, 18}},
{'Amy', U'Amy\353on', { {59, 1, ['Andy']}, 3.9, 59}}, {'Grace', U'Graceso\353', { {11, 3, ['Grayson', 'Gina', 'George']}, 7.9, 100}}], parentRec);

// Write the nested dataset to Parquet.
#IF(1)
Write(nested_dataset, '/datadrive/dev/test_data/nested.parquet');
#END

// Read it back with the same nested layout and output the rows.
#IF(1)
read_in := Read(parentRec, '/datadrive/dev/test_data/nested.parquet');
OUTPUT(read_in, NAMED('NESTED_PARQUET_IO'));
#END
Loading

0 comments on commit a9d408d

Please sign in to comment.