Merge pull request #1 from jagedn/master
This is an initial upgrade of the plugin using an open source library (Carpet) to read/write Parquet files, avoiding the complexity of using the low-level library.

Carpet can read Parquet files as a Map, but you can also provide a Java record class as a projection, so the library reads only the fields present in the record (see the sketch after this list).

Main changes:
- upgrade to Nextflow 24
- use the Carpet library
- refactor splitParquet
- some validation use cases
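
For illustration, a minimal sketch of the two Carpet read modes described above. The file name and the `SingleRecord` shape are assumptions for the example; `CarpetReader` is iterable, which the refactored `splitParquet` below relies on.

```groovy
import com.jerolba.carpet.CarpetReader

// a record declaring only the fields we want to project
record SingleRecord(long id, String name) {}

// raw mode: every row is emitted as a Map
new CarpetReader(new File('test.parquet'), Map).each { row ->
    println row
}

// projection mode: only the fields declared in the record are read
new CarpetReader(new File('test.parquet'), SingleRecord).each { rec ->
    println rec.name()
}
```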
pditommaso authored Sep 2, 2024
2 parents 4b2a4d5 + 8ab8ee2 commit c87b67b
Showing 32 changed files with 204 additions and 72 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
@@ -17,7 +17,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        java_version: [8, 11]
+        java_version: [17]

     steps:
       - name: Environment
10 changes: 10 additions & 0 deletions README.md
@@ -24,6 +24,16 @@ This plugin provides two new operators:

: Load each Parquet file in a source channel, emitting each row as a separate item.

By default, every row in the Parquet file is emitted as a Map.
If you want to read only a subset of fields (a projection), you can provide a Java record class declaring the fields you want to read:

```groovy
record MyRecord(int id, String name) {}
splitParquet( [record: MyRecord] )
```
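
A fuller usage sketch, following the pattern of the validation scripts in this commit (the glob and record shape are illustrative):

```groovy
include { splitParquet } from 'plugin/nf-parquet'

record MyRecord(int id, String name) {}

channel.fromPath('data/*.parquet').splitParquet( [record: MyRecord] )
    | view
```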

`toParquet( path )`

: Write each item in a source channel to a Parquet file.
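
A minimal sketch, assuming the channel items are record instances matching the file schema (the record and output file name are illustrative):

```groovy
include { toParquet } from 'plugin/nf-parquet'

record MyRecord(int id, String name) {}

channel.of(new MyRecord(1, 'alpha'), new MyRecord(2, 'beta'))
    .toParquet('records.parquet')
```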
@@ -19,12 +19,12 @@ java {
}

 compileJava {
-    options.release.set(11)
+    options.release.set(17)
 }

 tasks.withType(GroovyCompile) {
-    sourceCompatibility = '11'
-    targetCompatibility = '11'
+    sourceCompatibility = '17'
+    targetCompatibility = '17'
 }

tasks.withType(Test) {
19 changes: 6 additions & 13 deletions plugins/nf-parquet/build.gradle
@@ -50,34 +50,27 @@ sourceSets {
}

 ext {
-    nextflowVersion = '23.04.0'
+    nextflowVersion = '24.04.2'
 }

 dependencies {
     // This dependency is exported to consumers, that is to say found on their compile classpath.
     compileOnly "io.nextflow:nextflow:$nextflowVersion"
     compileOnly 'org.slf4j:slf4j-api:1.7.10'
     compileOnly 'org.pf4j:pf4j:3.4.1'

     // add here plugins depepencies
-    api('org.apache.parquet:parquet-avro:1.10.0')
-    api('org.apache.hadoop:hadoop-common:3.1.0') {
-        exclude group: 'org.slf4j', module: 'slf4j-log4j12'
-    }
+    implementation 'com.jerolba:carpet-record:0.1.0'

     // test configuration
-    testImplementation "org.codehaus.groovy:groovy:3.0.17"
-    testImplementation "org.codehaus.groovy:groovy-nio:3.0.17"
     testImplementation "io.nextflow:nextflow:$nextflowVersion"
-    testImplementation ("org.codehaus.groovy:groovy-test:3.0.17") { exclude group: 'org.codehaus.groovy' }
-    testImplementation ("cglib:cglib-nodep:3.3.0")
-    testImplementation ("org.objenesis:objenesis:3.1")
-    testImplementation ("org.spockframework:spock-core:2.2-groovy-3.0") { exclude group: 'org.codehaus.groovy'; exclude group: 'net.bytebuddy' }
-    testImplementation ('org.spockframework:spock-junit4:2.2-groovy-3.0') { exclude group: 'org.codehaus.groovy'; exclude group: 'net.bytebuddy' }
-    testImplementation ('com.google.jimfs:jimfs:1.1')
+    testImplementation ("org.spockframework:spock-core:2.3-groovy-4.0") { exclude group: 'org.apache.groovy'; exclude group: 'net.bytebuddy' }
+    testImplementation ('org.spockframework:spock-junit4:2.3-groovy-4.0') { exclude group: 'org.apache.groovy'; exclude group: 'net.bytebuddy' }

     testImplementation(testFixtures("io.nextflow:nextflow:$nextflowVersion"))
     testImplementation(testFixtures("io.nextflow:nf-commons:$nextflowVersion"))

     // see https://docs.gradle.org/4.1/userguide/dependency_management.html#sec:module_replacement
     modules {
         module("commons-logging:commons-logging") { replacedBy("org.slf4j:jcl-over-slf4j") }
@@ -1,5 +1,8 @@
 package nextflow.parquet

+import nextflow.plugin.extension.Factory
+import nextflow.plugin.extension.Function
+
 import java.nio.file.Path

 import groovy.transform.CompileStatic
@@ -12,18 +15,16 @@ import nextflow.extension.CH
 import nextflow.extension.DataflowHelper
 import nextflow.plugin.extension.Operator
 import nextflow.plugin.extension.PluginExtensionPoint
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path as HadoopPath
-import org.apache.parquet.example.data.Group
-import org.apache.parquet.example.data.simple.convert.GroupRecordConverter
-import org.apache.parquet.hadoop.ParquetFileReader
-import org.apache.parquet.hadoop.util.HadoopInputFile
-import org.apache.parquet.io.ColumnIOFactory
+import com.jerolba.carpet.CarpetReader
+import com.jerolba.carpet.CarpetWriter
+import org.apache.parquet.hadoop.ParquetFileWriter

 /**
  * Implements extensions for reading and writing Parquet files.
  *
  * @author Ben Sherman <[email protected]>
+ * @author Jorge Aguilera <[email protected]>
  */
 @Slf4j
 @CompileStatic
@@ -37,14 +38,14 @@ class ParquetExtension extends PluginExtensionPoint {
}

     /**
-     * Load each Parquet file in a source channel, emitting each row as a separate item.
+     * Load a Parquet file emitting each row as a separate item.
      *
      * @param path
      */
     @Operator
-    DataflowWriteChannel splitParquet(DataflowReadChannel source) {
+    DataflowWriteChannel splitParquet(DataflowReadChannel source, Map params=[:]) {
         final target = CH.create()
-        final splitter = new ParquetSplitter(target)
+        final splitter = new ParquetSplitter(target, params)

         final onNext = { it -> splitter.apply(it) }
         final onComplete = { target << Channel.STOP }
@@ -55,66 +56,40 @@

 class ParquetSplitter {
     private DataflowWriteChannel target
+    private Class<Record> clazz

-    ParquetSplitter(DataflowWriteChannel target) {
+    ParquetSplitter(DataflowWriteChannel target, Map params) {
         this.target = target
+        if( params.record ) {
+            if (!(params.record instanceof Class<Record>)) {
+                throw new IllegalArgumentException("A Record.class is required. Class provided $params.record")
+            }
+            this.clazz = params.record as Class<Record>
+        }
     }

     void apply(Object source) {
         try {
+            log.debug "Start reading $source, with projection ${clazz ?: 'raw'}"
-            // create parquet reader
-            final reader = ParquetFileReader.open(HadoopInputFile.fromPath(toHadoopPath(source), new Configuration()))
-            final schema = reader.getFooter().getFileMetaData().getSchema()
-            final fields = schema.getFields()
-
-            // read each row from parquet file
-            def pages = null
-            try {
-                while( (pages=reader.readNextRowGroup()) != null ) {
-                    final rows = pages.getRowCount()
-                    final columnIO = new ColumnIOFactory().getColumnIO(schema)
-                    final recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema))
-
-                    for( int i = 0; i < rows; i++ )
-                        target << fetchRow(recordReader.read())
-                }
-            }
-            finally {
-                reader.close()
-            }
+            final reader = new CarpetReader(toFile(source), clazz ?: Map)
+            for (def record : reader) {
+                target << record
+            }
         }
         catch( IOException e ) {
             throw new IllegalStateException("Error while reading Parquet file - cause: ${e.message ?: e}", e)
         }
     }

-    private HadoopPath toHadoopPath(Object source) {
-        if( source instanceof String )
-            new HadoopPath((String)source)
-        else if( source instanceof Path )
-            new HadoopPath(((Path)source).toUriString())
-        else
-            throw new IllegalArgumentException("Invalid input for splitParquet operator: ${source}")
-    }
-
-    private Map fetchRow(Group group) {
-        def result = [:]
-
-        final fieldCount = group.getType().getFieldCount()
-        for( int field = 0; field < fieldCount; field++ ) {
-            final valueCount = group.getFieldRepetitionCount(field)
-            final fieldType = group.getType().getType(field)
-            final fieldName = fieldType.getName()
-
-            for( int index = 0; index < valueCount; index++ )
-                if( fieldType.isPrimitive() ) {
-                    println "${fieldName} ${group.getValueToString(field, index)}"
-                    result[fieldName] = group.getValueToString(field, index)
-                }
-        }
-
-        return result
-    }
+    private File toFile(Object source){
+        return switch( source ) {
+            case { it instanceof String } -> Path.of(source as String).toFile()
+            case { it instanceof Path }   -> (source as Path).toFile()
+            default -> throw new IllegalArgumentException("Invalid input for splitParquet operator: ${source}")
+        }
+    }
 }

/**
@@ -17,18 +17,26 @@
 package nextflow.parquet

 import groovy.transform.CompileStatic
+import groovy.util.logging.Slf4j
 import nextflow.plugin.BasePlugin
 import org.pf4j.PluginWrapper

 /**
  * Implements the nf-parquet plugin entry point
  *
  * @author Ben Sherman <[email protected]>
+ * @author Jorge Aguilera <[email protected]>
  */
 @CompileStatic
+@Slf4j
 class ParquetPlugin extends BasePlugin {

     ParquetPlugin(PluginWrapper wrapper) {
         super(wrapper)
+        initPlugin()
     }
+
+    private void initPlugin(){
+        log.debug "${this.class.name} plugin initialized"
+    }
 }
2 changes: 1 addition & 1 deletion plugins/nf-parquet/src/resources/META-INF/MANIFEST.MF
@@ -3,4 +3,4 @@ Plugin-Id: nf-parquet
 Plugin-Version: 0.1.0
 Plugin-Class: nextflow.parquet.ParquetPlugin
 Plugin-Provider: nextflow
-Plugin-Requires: >=22.10.0
+Plugin-Requires: >=24.04.2
98 changes: 98 additions & 0 deletions plugins/nf-parquet/src/test/nextflow/parquet/PluginTest.groovy
@@ -0,0 +1,98 @@
package nextflow.parquet

import nextflow.Channel
import nextflow.plugin.Plugins
import nextflow.plugin.TestPluginDescriptorFinder
import nextflow.plugin.TestPluginManager
import nextflow.plugin.extension.PluginExtensionProvider
import org.pf4j.PluginDescriptorFinder
import spock.lang.Shared
import test.Dsl2Spec
import test.MockScriptRunner

import java.nio.file.Files
import java.nio.file.Path
import java.util.jar.Manifest

class PluginTest extends Dsl2Spec{

@Shared String pluginsMode

def setup() {
// reset previous instances
PluginExtensionProvider.reset()
// this needs to be set *before* the plugin manager class is created
pluginsMode = System.getProperty('pf4j.mode')
System.setProperty('pf4j.mode', 'dev')
// the plugin root should
def root = Path.of('.').toAbsolutePath().normalize()
def manager = new TestPluginManager(root){
@Override
protected PluginDescriptorFinder createPluginDescriptorFinder() {
return new TestPluginDescriptorFinder(){
@Override
protected Manifest readManifestFromDirectory(Path pluginPath) {
def manifestPath= getManifestPath(pluginPath)
final input = Files.newInputStream(manifestPath)
return new Manifest(input)
}
protected Path getManifestPath(Path pluginPath) {
return pluginPath.resolve('build/resources/main/META-INF/MANIFEST.MF')
}
}
}
}
Plugins.init(root, 'dev', manager)
}

def cleanup() {
Plugins.stop()
PluginExtensionProvider.reset()
pluginsMode ? System.setProperty('pf4j.mode',pluginsMode) : System.clearProperty('pf4j.mode')
}

def 'should start' () {
when:
def SCRIPT = '''
channel.of('hi!')
'''
and:
def result = new MockScriptRunner([:]).setScript(SCRIPT).execute()
then:
result.val == 'hi!'
result.val == Channel.STOP
}

def 'should parse a parquet file in raw mode'(){
when:
def path = getClass().getResource('/test.parquet').toURI().path
def SCRIPT = """
include {splitParquet} from 'plugin/nf-parquet'
channel.fromPath("$path").splitParquet()
""".toString()
and:
def result = new MockScriptRunner([:]).setScript(SCRIPT).execute()
then:
result.val == [id:1, name:"test2", sizell:10, value:0.010838246310055144, percentile:0.28001529169191186]
result.val == Channel.STOP
}

def 'should parse a projection'(){
when:
def path = getClass().getResource('/test.parquet').toURI().path
def SCRIPT = """
include {splitParquet} from 'plugin/nf-parquet'
record SingleRecord(long id, String name) {
}
channel.fromPath("$path").splitParquet( [record:SingleRecord] )
""".toString()
and:
def result = new MockScriptRunner([:]).setScript(SCRIPT).execute()
then:
result.val.id == 1
result.val == Channel.STOP
}

}
Binary file added plugins/nf-parquet/src/testResources/test.parquet
Binary file added validation/lib/module-info.class
Binary file added validation/lib/myrecords/Address.class
Binary file added validation/lib/myrecords/CustomRecord.class
Binary file added validation/lib/myrecords/Job.class
Binary file added validation/lib/myrecords/Person.class
Binary file added validation/lib/myrecords/SingleAddress.class
Binary file added validation/lib/myrecords/SinglePerson.class
Binary file added validation/lib/myrecords/SingleRecord.class
4 changes: 4 additions & 0 deletions validation/nextflow.config
@@ -0,0 +1,4 @@
plugins {
id "nf-parquet@${System.getenv("PARQUET_PLUGIN_VERSION") ?: "latest"}"
}
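
With this config, the plugin version is pinned via the PARQUET_PLUGIN_VERSION environment variable, falling back to the latest published release; a validation script can then be run as, for example, `PARQUET_PLUGIN_VERSION=0.1.0 nextflow run read.nf` (the version value here is illustrative).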

Binary file added validation/presidents.parquet
7 changes: 7 additions & 0 deletions validation/read.nf
@@ -0,0 +1,7 @@
include { splitParquet } from 'plugin/nf-parquet'

record SingleRecord(long id, String name) {
}

channel.fromPath("test*.parquet").splitParquet( record: SingleRecord)
| view
6 changes: 6 additions & 0 deletions validation/read_complex.nf
@@ -0,0 +1,6 @@
include { splitParquet } from 'plugin/nf-parquet'

import myrecords.*

channel.fromPath("presidents.parquet").splitParquet()
| view
6 changes: 6 additions & 0 deletions validation/read_raw.nf
@@ -0,0 +1,6 @@
include { splitParquet } from 'plugin/nf-parquet'

import myrecords.*

channel.fromPath("*.parquet").splitParquet()
| view
3 changes: 3 additions & 0 deletions validation/schemas/myrecords/Address.java
@@ -0,0 +1,3 @@
package myrecords;

record Address(String street, String zip, String city) { }
4 changes: 4 additions & 0 deletions validation/schemas/myrecords/CustomRecord.java
@@ -0,0 +1,4 @@
package myrecords;
record CustomRecord(long id, String name, int sizell, double value, double percentile) {

}
3 changes: 3 additions & 0 deletions validation/schemas/myrecords/Job.java
@@ -0,0 +1,3 @@
package myrecords;

record Job(String company, String position, int years){ }