diff --git a/plugins/parquet/README.md b/plugins/parquet/README.md
index d3b3802d195..241d89d7a51 100644
--- a/plugins/parquet/README.md
+++ b/plugins/parquet/README.md
@@ -33,7 +33,7 @@ The Parquet Plugin offers the following main functions:
 The Read function allows ECL programmers to create an ECL dataset from both regular and partitioned Parquet files. It leverages the Apache Arrow interface for Parquet to efficiently stream data from ECL to the plugin, ensuring optimized data transfer.
 
 ```
-dataset := Read(layout, '/source/directory/data.parquet');
+dataset := ParquetIO.Read(layout, '/source/directory/data.parquet');
 ```
 
 #### 2. Writing Parquet Files
@@ -41,7 +41,7 @@ dataset := Read(layout, '/source/directory/data.parquet');
 The Write function empowers ECL programmers to write ECL datasets to Parquet files. By leveraging the Parquet format's columnar storage capabilities, this function provides efficient compression and optimized storage for data.
 
 ```
-Write(inDataset, '/output/directory/data.parquet');
+ParquetIO.Write(inDataset, '/output/directory/data.parquet');
 ```
 
 ### Partitioned Files (Tabular Datasets)
@@ -51,7 +51,7 @@ Write(inDataset, '/output/directory/data.parquet');
 #### 1. Reading Partitioned Files
 The Read Partition function extends the Read functionality by enabling ECL programmers to read from partitioned Parquet files.
 
 ```
-github_dataset := ReadPartition(layout, '/source/directory/partioned_dataset');
+github_dataset := ParquetIO.ReadPartition(layout, '/source/directory/partitioned_dataset');
 ```
 #### 2. Writing Partitioned Files
diff --git a/plugins/parquet/examples/blob_test.ecl b/plugins/parquet/examples/blob_test.ecl
index 04fde97f2e3..3d6abe70480 100644
--- a/plugins/parquet/examples/blob_test.ecl
+++ b/plugins/parquet/examples/blob_test.ecl
@@ -10,11 +10,11 @@ END;
 
 #IF(0)
 in_image_data := DATASET('~parquet::image', imageRecord, FLAT);
 OUTPUT(in_image_data, NAMED('IN_IMAGE_DATA'));
-PARQUET.Write(in_image_data, '/datadrive/dev/test_data/test_image.parquet');
+ParquetIO.Write(in_image_data, '/datadrive/dev/test_data/test_image.parquet');
 #END;
 
 #IF(1)
-out_image_data := Read({DATA image}, '/datadrive/dev/test_data/test_image.parquet');
+out_image_data := ParquetIO.Read({DATA image}, '/datadrive/dev/test_data/test_image.parquet');
 OUTPUT(out_image_data, NAMED('OUT_IMAGE_DATA'));
 #END
\ No newline at end of file
diff --git a/plugins/parquet/examples/create_partition.ecl b/plugins/parquet/examples/create_partition.ecl
index 509bac03cbd..00b849c21bc 100755
--- a/plugins/parquet/examples/create_partition.ecl
+++ b/plugins/parquet/examples/create_partition.ecl
@@ -19,11 +19,11 @@ layout := RECORD
 END;
 
 #IF(0)
-github_dataset := Read(layout, '/datadrive/dev/test_data/ghtorrent-2019-01-07.parquet');
-Write(DISTRIBUTE(github_dataset, SKEW(.05)), '/datadrive/dev/test_data/hpcc_gh_partition/data.parquet');
+github_dataset := ParquetIO.Read(layout, '/datadrive/dev/test_data/ghtorrent-2019-01-07.parquet');
+ParquetIO.Write(DISTRIBUTE(github_dataset, SKEW(.05)), '/datadrive/dev/test_data/hpcc_gh_partition/data.parquet');
 #END
 
 #IF(1)
-github_dataset := ReadPartition(layout, '/datadrive/dev/test_data/hpcc_gh_partition');
-OUTPUT(COUNT(github_dataset), NAMED('GITHUB_PARTITION'));
+github_dataset := ParquetIO.ReadPartition(layout, '/datadrive/dev/test_data/hpcc_gh_partition');
+OUTPUT(CHOOSEN(github_dataset, 10000), NAMED('GITHUB_PARTITION'));
 #END
\ No newline at end of file
diff --git a/plugins/parquet/examples/decimal_test.ecl b/plugins/parquet/examples/decimal_test.ecl
index 507a732a3a3..39a7cb0cf84 100644
--- a/plugins/parquet/examples/decimal_test.ecl
+++ b/plugins/parquet/examples/decimal_test.ecl
@@ -9,9 +9,9 @@ END;
 decimal_data := DATASET([{152.25}, {125.56}], layout);
 
 #IF(1)
-Write(decimal_data, '/datadrive/dev/test_data/decimal.parquet');
+ParquetIO.Write(decimal_data, '/datadrive/dev/test_data/decimal.parquet');
 #END
 
 #IF(1)
-Read(layout, '/datadrive/dev/test_data/decimal.parquet');
+ParquetIO.Read(layout, '/datadrive/dev/test_data/decimal.parquet');
 #END
diff --git a/plugins/parquet/examples/large_io.ecl b/plugins/parquet/examples/large_io.ecl
index d08fbadec53..33a9054f8f5 100644
--- a/plugins/parquet/examples/large_io.ecl
+++ b/plugins/parquet/examples/large_io.ecl
@@ -20,10 +20,10 @@ END;
 
 #IF(0)
 csv_data := DATASET('~parquet::large::ghtorrent-2019-02-04.csv', layout, CSV(HEADING(1)));
-Write(csv_data, '/datadrive/dev/test_data/ghtorrent-2019-02-04.parquet');
+ParquetIO.Write(csv_data, '/datadrive/dev/test_data/ghtorrent-2019-02-04.parquet');
 #END
 
 #IF(1)
-parquet_data := Read(layout, '/datadrive/dev/test_data/hpcc_gh_partition/data.parquet');
+parquet_data := ParquetIO.Read(layout, '/datadrive/dev/test_data/hpcc_gh_partition/data.parquet');
 OUTPUT(COUNT(parquet_data), NAMED('ghtorrent_2019_01_07'));
 #END
\ No newline at end of file
diff --git a/plugins/parquet/examples/nested_io.ecl b/plugins/parquet/examples/nested_io.ecl
index 411be93e44a..fe0a0785cc6 100644
--- a/plugins/parquet/examples/nested_io.ecl
+++ b/plugins/parquet/examples/nested_io.ecl
@@ -21,10 +21,10 @@ nested_dataset := DATASET([{U'J\353ck', U'\353ackson', { {22, 2, ['James', 'Jona
 {'Amy', U'Amy\353on', { {59, 1, ['Andy']}, 3.9, 59}},
 {'Grace', U'Graceso\353', { {11, 3, ['Grayson', 'Gina', 'George']}, 7.9, 100}}], parentRec);
 #IF(1)
-Write(nested_dataset, '/datadrive/dev/test_data/nested.parquet');
+ParquetIO.Write(nested_dataset, '/datadrive/dev/test_data/nested.parquet');
 #END
 
 #IF(1)
-read_in := Read(parentRec, '/datadrive/dev/test_data/nested.parquet');
+read_in := ParquetIO.Read(parentRec, '/datadrive/dev/test_data/nested.parquet');
 OUTPUT(read_in, NAMED('NESTED_PARQUET_IO'));
 #END
\ No newline at end of file
diff --git a/plugins/parquet/examples/null_test.ecl b/plugins/parquet/examples/null_test.ecl
index 0357547d84d..79853c32fc6 100644
--- a/plugins/parquet/examples/null_test.ecl
+++ b/plugins/parquet/examples/null_test.ecl
@@ -125,7 +125,7 @@ empty_record :=
 end;
 
 empty_dataset := dataset([{-2, ' ', '-2 ', 'NA ', 0, 0, ' ', ' ', 0, ' ', ' ', 0, 0, 0, ' ', ' ', 0, ' ', ' ', 0, 0, 0, ' ', ' ', 0, 0, 0, ' ', ' ', 0, 0, 0, 0, ' ', 'N', ' ', 20151105, 20151105, ' ', ' ', 0, 5, 0, '0', '0', '0', 0, 'N', 0, ' ', ' ', ' ', ' ', ' ', ' ', 0, 0, ' ', '0', '\123\000\000\000\123'}], empty_record);
-Write(empty_dataset, '/datadrive/dev/test_data/empty_parquet.parquet');
+ParquetIO.Write(empty_dataset, '/datadrive/dev/test_data/empty_parquet.parquet');
 
-empty_data_in := Read(empty_record, '/datadrive/dev/test_data/empty_parquet.parquet');
+empty_data_in := ParquetIO.Read(empty_record, '/datadrive/dev/test_data/empty_parquet.parquet');
 OUTPUT(empty_data_in);
\ No newline at end of file
diff --git a/plugins/parquet/examples/parquet_read_write.ecl b/plugins/parquet/examples/parquet_read_write.ecl
index b9fbe9c5f7f..3b3dcdc84b7 100644
--- a/plugins/parquet/examples/parquet_read_write.ecl
+++ b/plugins/parquet/examples/parquet_read_write.ecl
@@ -18,9 +18,9 @@ layout := RECORD
 INTEGER commit_date;
 END;
 
-parquet_data := Read(layout, '/datadrive/dev/test_data/ghtorrent-2019-01-07.parquet');
+parquet_data := ParquetIO.Read(layout, '/datadrive/dev/test_data/ghtorrent-2019-01-07.parquet');
 part_a := CHOOSEN(parquet_data, 1000);
 
 OUTPUT(CHOOSEN(DATASET('~parquet::large::ghtorrent-2019-02-04.csv', layout, CSV(HEADING(1))), 1000), NAMED('CSV_GHTORRENT'));
 OUTPUT(part_a, NAMED('GHTORRENT'));
-Write(part_a, '/datadrive/dev/test_data/github_partition/part_a.parquet');
\ No newline at end of file
+ParquetIO.Write(part_a, '/datadrive/dev/test_data/github_partition/part_a.parquet');
\ No newline at end of file
diff --git a/plugins/parquet/examples/read_file.ecl b/plugins/parquet/examples/read_file.ecl
index f14849b7e9a..8d3d157af4f 100644
--- a/plugins/parquet/examples/read_file.ecl
+++ b/plugins/parquet/examples/read_file.ecl
@@ -1,3 +1,3 @@
 IMPORT Parquet;
 
-Parquet.Read({INTEGER n}, '/var/lib/HPCCSystems/mydropzone/sample.parquet');
+ParquetIO.Read({INTEGER n}, '/var/lib/HPCCSystems/mydropzone/sample.parquet');
diff --git a/plugins/parquet/examples/simple_io.ecl b/plugins/parquet/examples/simple_io.ecl
index e54dd08c457..042892fa837 100644
--- a/plugins/parquet/examples/simple_io.ecl
+++ b/plugins/parquet/examples/simple_io.ecl
@@ -8,7 +8,7 @@ simpleRec := RECORD
 END;
 
 simpleDataset := DATASET([{1, 2.4356, U'de\3531', 'Jack'}, {1, 2.4356, U'de\3531', 'Jack'}, {2, 4.8937, U'as\352df', 'John'}, {3, 1.8573, 'nj\351vf', 'Jane'}, {4, 9.1235, U'ds\354fg', 'Jill'}, {5, 6.3297, U'po\355gm', 'Jim'}], simpleRec);
-Write(simpleDataset, '/datadrive/dev/test_data/simple.parquet');
+ParquetIO.Write(simpleDataset, '/datadrive/dev/test_data/simple.parquet');
 
-read_in := Read(simpleRec, '/datadrive/dev/test_data/simple.parquet');
+read_in := ParquetIO.Read(simpleRec, '/datadrive/dev/test_data/simple.parquet');
 OUTPUT(read_in, NAMED('SIMPLE_PARQUET_IO'));
\ No newline at end of file
diff --git a/plugins/parquet/examples/write_file.ecl b/plugins/parquet/examples/write_file.ecl
index f64e7347ea0..e012a6b679b 100644
--- a/plugins/parquet/examples/write_file.ecl
+++ b/plugins/parquet/examples/write_file.ecl
@@ -11,4 +11,4 @@ ds := DATASET
 DISTRIBUTED
 );
 
-Parquet.Write(ds, '/var/lib/HPCCSystems/mydropzone/sample.parquet');
+ParquetIO.Write(ds, '/var/lib/HPCCSystems/mydropzone/sample.parquet');
diff --git a/plugins/parquet/parquet.ecllib b/plugins/parquet/parquet.ecllib
index b2c9f688bc8..07529024153 100644
--- a/plugins/parquet/parquet.ecllib
+++ b/plugins/parquet/parquet.ecllib
@@ -19,29 +19,32 @@ SHARED ParquetService := SERVICE : plugin('parquetembed')
     BOOLEAN getEmbedContext():cpp, pure, namespace='parquetembed', entrypoint='getEmbedContext', prototype='IEmbedContext* getEmbedContext()';
 END;
 
-EXPORT Read(resultLayout, filePath) := FUNCTIONMACRO
-    STREAMED DATASET(resultLayout) _DoParquetRead() := EMBED(parquet : activity, option('read'), location(filePath))
-    ENDEMBED;
-    RETURN _DoParquetRead();
-ENDMACRO;
-
-EXPORT ReadPartition(resultLayout, basePath) := FUNCTIONMACRO
-    STREAMED DATASET(resultLayout) _DoParquetReadPartition() := EMBED(parquet: activity, option('readpartition'), location(basePath))
-    ENDEMBED;
-    RETURN _DoParquetReadPartition();
-ENDMACRO;
-
-EXPORT Write(outDS, filePath) := MACRO
-    _DoParquetWrite(STREAMED DATASET(RECORDOF(outDS)) _ds) := EMBED(parquet : activity, option('write'), destination(filePath))
-    ENDEMBED;
-    _doParquetWrite(outDS);
-ENDMACRO;
-
-EXPORT WritePartition(outDS, outRows = 100000, basePath) := MACRO
-    _DoParquetWritePartition(STREAMED DATASET(RECORDOF(outDS)) _ds) := EMBED(parquet : activity, option('writepartition'), destination(basePath), MaxRowSize(outRows))
-    ENDEMBED;
-    _DoParquetWritePartition(outDS)
-ENDMACRO;
+EXPORT ParquetIO := MODULE
+
+    EXPORT Read(resultLayout, filePath) := FUNCTIONMACRO
+        LOCAL STREAMED DATASET(resultLayout) _DoParquetRead() := EMBED(parquet : activity, option('read'), location(filePath))
+        ENDEMBED;
+        RETURN _DoParquetRead();
+    ENDMACRO;
+
+    EXPORT ReadPartition(resultLayout, basePath) := FUNCTIONMACRO
+        LOCAL STREAMED DATASET(resultLayout) _DoParquetReadPartition() := EMBED(parquet : activity, option('readpartition'), location(basePath))
+        ENDEMBED;
+        RETURN _DoParquetReadPartition();
+    ENDMACRO;
+
+    EXPORT Write(outDS, filePath) := MACRO
+        LOCAL _DoParquetWrite(STREAMED DATASET(RECORDOF(outDS)) _ds) := EMBED(parquet : activity, option('write'), destination(filePath))
+        ENDEMBED;
+        _DoParquetWrite(outDS);
+    ENDMACRO;
+
+    EXPORT WritePartition(outDS, outRows = 100000, basePath) := MACRO
+        LOCAL _DoParquetWritePartition(STREAMED DATASET(RECORDOF(outDS)) _ds) := EMBED(parquet : activity, option('writepartition'), destination(basePath), MaxRowSize(outRows))
+        ENDEMBED;
+        _DoParquetWritePartition(outDS);
+    ENDMACRO;
+END;
 
 EXPORT getEmbedContext := ParquetService.getEmbedContext;
 EXPORT BOOLEAN supportsImport := FALSE;
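
For reference, a minimal round-trip sketch of the module-qualified API introduced by this change. The record layout, inline dataset, file path, and output name below are illustrative only; `ParquetIO.Write` and `ParquetIO.Read` are the entry points defined in the module above.

```
IMPORT Parquet;

// Illustrative flat layout; any record structure supported by the plugin works.
layout := RECORD
    INTEGER id;
    STRING name;
END;

ds := DATASET([{1, 'Alice'}, {2, 'Bob'}], layout);

// Write the dataset out, then read it back through the new module.
ParquetIO.Write(ds, '/var/lib/HPCCSystems/mydropzone/round_trip.parquet');
read_back := ParquetIO.Read(layout, '/var/lib/HPCCSystems/mydropzone/round_trip.parquet');
OUTPUT(read_back, NAMED('ROUND_TRIP'));
```

As in simple_io.ecl, the write and the read-back are shown in a single job for brevity; in practice the read would typically run in a separate workunit once the file exists.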