From 628a6029f5b247a65d5de02bdddf3715e152c036 Mon Sep 17 00:00:00 2001 From: jorge aguilera Date: Fri, 30 Aug 2024 16:38:35 +0200 Subject: [PATCH 1/4] refactor of initial plugin - upgrade to nextflow 24 - use carpet lib - refactor splitParquet - some validation use cases --- .github/workflows/build.yml | 2 +- ....nextflow.groovy-common-conventions.gradle | 6 +- plugins/nf-parquet/build.gradle | 20 ++--- .../nextflow/parquet/ParquetExtension.groovy | 83 ++++++------------ .../nextflow/parquet/ParquetPlugin.groovy | 8 ++ .../src/resources/META-INF/MANIFEST.MF | 2 +- .../test/nextflow/parquet/PluginTest.groovy | 66 ++++++++++++++ validation/lib/module-info.class | Bin 0 -> 157 bytes validation/lib/myrecords/Address.class | Bin 0 -> 1368 bytes validation/lib/myrecords/CustomRecord.class | Bin 0 -> 1588 bytes validation/lib/myrecords/Job.class | Bin 0 -> 1349 bytes validation/lib/myrecords/Person.class | Bin 0 -> 1416 bytes validation/lib/myrecords/SingleAddress.class | Bin 0 -> 1167 bytes validation/lib/myrecords/SinglePerson.class | Bin 0 -> 1318 bytes validation/lib/myrecords/SingleRecord.class | Bin 0 -> 1268 bytes validation/nextflow.config | 4 + validation/presidents.parquet | Bin 0 -> 1674 bytes validation/read.nf | 6 ++ validation/read_complex.nf | 6 ++ validation/read_raw.nf | 6 ++ validation/schemas/myrecords/Address.java | 3 + .../schemas/myrecords/CustomRecord.java | 4 + validation/schemas/myrecords/Job.java | 3 + validation/schemas/myrecords/Person.java | 3 + .../schemas/myrecords/SingleAddress.java | 3 + .../schemas/myrecords/SinglePerson.java | 3 + .../schemas/myrecords/SingleRecord.java | 3 + validation/schemas/myrecords/module-info.java | 3 + validation/test.parquet | Bin 0 -> 1063 bytes validation/test2.parquet | Bin 0 -> 1114 bytes 30 files changed, 161 insertions(+), 73 deletions(-) create mode 100644 plugins/nf-parquet/src/test/nextflow/parquet/PluginTest.groovy create mode 100644 validation/lib/module-info.class create mode 100644 validation/lib/myrecords/Address.class create mode 100644 validation/lib/myrecords/CustomRecord.class create mode 100644 validation/lib/myrecords/Job.class create mode 100644 validation/lib/myrecords/Person.class create mode 100644 validation/lib/myrecords/SingleAddress.class create mode 100644 validation/lib/myrecords/SinglePerson.class create mode 100644 validation/lib/myrecords/SingleRecord.class create mode 100644 validation/nextflow.config create mode 100644 validation/presidents.parquet create mode 100644 validation/read.nf create mode 100644 validation/read_complex.nf create mode 100644 validation/read_raw.nf create mode 100644 validation/schemas/myrecords/Address.java create mode 100644 validation/schemas/myrecords/CustomRecord.java create mode 100644 validation/schemas/myrecords/Job.java create mode 100644 validation/schemas/myrecords/Person.java create mode 100644 validation/schemas/myrecords/SingleAddress.java create mode 100644 validation/schemas/myrecords/SinglePerson.java create mode 100644 validation/schemas/myrecords/SingleRecord.java create mode 100644 validation/schemas/myrecords/module-info.java create mode 100644 validation/test.parquet create mode 100644 validation/test2.parquet diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index ef3fe4e..cbf11ec 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -17,7 +17,7 @@ jobs: strategy: fail-fast: false matrix: - java_version: [8, 11] + java_version: [17] steps: - name: Environment diff --git a/buildSrc/src/main/groovy/io.nextflow.groovy-common-conventions.gradle b/buildSrc/src/main/groovy/io.nextflow.groovy-common-conventions.gradle index 9854a8d..85944fd 100644 --- a/buildSrc/src/main/groovy/io.nextflow.groovy-common-conventions.gradle +++ b/buildSrc/src/main/groovy/io.nextflow.groovy-common-conventions.gradle @@ -19,12 +19,12 @@ java { } compileJava { - options.release.set(11) + options.release.set(17) } tasks.withType(GroovyCompile) { - sourceCompatibility = '11' - targetCompatibility = '11' + sourceCompatibility = '17' + targetCompatibility = '17' } tasks.withType(Test) { diff --git a/plugins/nf-parquet/build.gradle b/plugins/nf-parquet/build.gradle index c2af502..31a11ad 100644 --- a/plugins/nf-parquet/build.gradle +++ b/plugins/nf-parquet/build.gradle @@ -50,7 +50,7 @@ sourceSets { } ext { - nextflowVersion = '23.04.0' + nextflowVersion = '24.04.2' } dependencies { @@ -58,26 +58,18 @@ dependencies { compileOnly "io.nextflow:nextflow:$nextflowVersion" compileOnly 'org.slf4j:slf4j-api:1.7.10' compileOnly 'org.pf4j:pf4j:3.4.1' + // add here plugins depepencies - api('org.apache.parquet:parquet-avro:1.10.0') - api('org.apache.hadoop:hadoop-common:3.1.0') { - exclude group: 'org.slf4j', module: 'slf4j-log4j12' - } + implementation 'com.jerolba:carpet-record:0.1.0' // test configuration - testImplementation "org.codehaus.groovy:groovy:3.0.17" - testImplementation "org.codehaus.groovy:groovy-nio:3.0.17" testImplementation "io.nextflow:nextflow:$nextflowVersion" - testImplementation ("org.codehaus.groovy:groovy-test:3.0.17") { exclude group: 'org.codehaus.groovy' } - testImplementation ("cglib:cglib-nodep:3.3.0") - testImplementation ("org.objenesis:objenesis:3.1") - testImplementation ("org.spockframework:spock-core:2.2-groovy-3.0") { exclude group: 'org.codehaus.groovy'; exclude group: 'net.bytebuddy' } - testImplementation ('org.spockframework:spock-junit4:2.2-groovy-3.0') { exclude group: 'org.codehaus.groovy'; exclude group: 'net.bytebuddy' } - testImplementation ('com.google.jimfs:jimfs:1.1') - + testImplementation ("org.spockframework:spock-core:2.2-groovy-4.0") { exclude group: 'org.codehaus.groovy'; exclude group: 'net.bytebuddy' } + testImplementation ('org.spockframework:spock-junit4:2.2-groovy-4.0') { exclude group: 'org.codehaus.groovy'; exclude group: 'net.bytebuddy' } testImplementation(testFixtures("io.nextflow:nextflow:$nextflowVersion")) testImplementation(testFixtures("io.nextflow:nf-commons:$nextflowVersion")) + // see https://docs.gradle.org/4.1/userguide/dependency_management.html#sec:module_replacement modules { module("commons-logging:commons-logging") { replacedBy("org.slf4j:jcl-over-slf4j") } diff --git a/plugins/nf-parquet/src/main/nextflow/parquet/ParquetExtension.groovy b/plugins/nf-parquet/src/main/nextflow/parquet/ParquetExtension.groovy index 3ac80ed..1e041ca 100644 --- a/plugins/nf-parquet/src/main/nextflow/parquet/ParquetExtension.groovy +++ b/plugins/nf-parquet/src/main/nextflow/parquet/ParquetExtension.groovy @@ -1,5 +1,8 @@ package nextflow.parquet +import nextflow.plugin.extension.Factory +import nextflow.plugin.extension.Function + import java.nio.file.Path import groovy.transform.CompileStatic @@ -12,18 +15,16 @@ import nextflow.extension.CH import nextflow.extension.DataflowHelper import nextflow.plugin.extension.Operator import nextflow.plugin.extension.PluginExtensionPoint -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path as HadoopPath -import org.apache.parquet.example.data.Group -import org.apache.parquet.example.data.simple.convert.GroupRecordConverter -import org.apache.parquet.hadoop.ParquetFileReader -import org.apache.parquet.hadoop.util.HadoopInputFile -import org.apache.parquet.io.ColumnIOFactory + +import com.jerolba.carpet.CarpetReader +import com.jerolba.carpet.CarpetWriter +import org.apache.parquet.hadoop.ParquetFileWriter /** * Implements extensions for reading and writing Parquet files. * * @author Ben Sherman + * @author Jorge Aguilera */ @Slf4j @CompileStatic @@ -37,14 +38,14 @@ class ParquetExtension extends PluginExtensionPoint { } /** - * Load each Parquet file in a source channel, emitting each row as a separate item. + * Load a Parquet file emitting each row as a separate item. * * @param path */ @Operator - DataflowWriteChannel splitParquet(DataflowReadChannel source) { + DataflowWriteChannel splitParquet(DataflowReadChannel source, Map params=[:]) { final target = CH.create() - final splitter = new ParquetSplitter(target) + final splitter = new ParquetSplitter(target, params) final onNext = { it -> splitter.apply(it) } final onComplete = { target << Channel.STOP } @@ -55,32 +56,25 @@ class ParquetExtension extends PluginExtensionPoint { class ParquetSplitter { private DataflowWriteChannel target + private Class clazz - ParquetSplitter(DataflowWriteChannel target) { + ParquetSplitter(DataflowWriteChannel target, Map params) { this.target = target + if( params.record ) { + if (!(params.record instanceof Class)) { + throw new IllegalArgumentException("A Record.class is required. Class provided $params.record") + } + this.clazz = params.record as Class + } } void apply(Object source) { try { + log.debug "Start reading $source, with projection ${clazz ?: 'raw'}" // create parquet reader - final reader = ParquetFileReader.open(HadoopInputFile.fromPath(toHadoopPath(source), new Configuration())) - final schema = reader.getFooter().getFileMetaData().getSchema() - final fields = schema.getFields() - - // read each row from parquet file - def pages = null - try { - while( (pages=reader.readNextRowGroup()) != null ) { - final rows = pages.getRowCount() - final columnIO = new ColumnIOFactory().getColumnIO(schema) - final recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema)) - - for( int i = 0; i < rows; i++ ) - target << fetchRow(recordReader.read()) - } - } - finally { - reader.close() + final reader = new CarpetReader(toFile(source), clazz ?: Map) + for (def record : reader) { + target << record } } catch( IOException e ) { @@ -88,33 +82,14 @@ class ParquetExtension extends PluginExtensionPoint { } } - private HadoopPath toHadoopPath(Object source) { - if( source instanceof String ) - new HadoopPath((String)source) - else if( source instanceof Path ) - new HadoopPath(((Path)source).toUriString()) - else - throw new IllegalArgumentException("Invalid input for splitParquet operator: ${source}") - } - - private Map fetchRow(Group group) { - def result = [:] - - final fieldCount = group.getType().getFieldCount() - for( int field = 0; field < fieldCount; field++ ) { - final valueCount = group.getFieldRepetitionCount(field) - final fieldType = group.getType().getType(field) - final fieldName = fieldType.getName() - - for( int index = 0; index < valueCount; index++ ) - if( fieldType.isPrimitive() ) { - println "${fieldName} ${group.getValueToString(field, index)}" - result[fieldName] = group.getValueToString(field, index) - } + private File toFile(Object source){ + return switch( source ){ + case {it instanceof String}->Path.of(source as String).toFile() + case {it instanceof Path} -> (source as Path).toFile() + default->throw new IllegalArgumentException("Invalid input for splitParquet operator: ${source}") } - - return result } + } /** diff --git a/plugins/nf-parquet/src/main/nextflow/parquet/ParquetPlugin.groovy b/plugins/nf-parquet/src/main/nextflow/parquet/ParquetPlugin.groovy index 1d56070..bd0d452 100644 --- a/plugins/nf-parquet/src/main/nextflow/parquet/ParquetPlugin.groovy +++ b/plugins/nf-parquet/src/main/nextflow/parquet/ParquetPlugin.groovy @@ -17,6 +17,7 @@ package nextflow.parquet import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j import nextflow.plugin.BasePlugin import org.pf4j.PluginWrapper @@ -24,11 +25,18 @@ import org.pf4j.PluginWrapper * Implements the nf-parquet plugin entry point * * @author Ben Sherman + * @author Jorge Aguilera */ @CompileStatic +@Slf4j class ParquetPlugin extends BasePlugin { ParquetPlugin(PluginWrapper wrapper) { super(wrapper) + initPlugin() + } + + private void initPlugin(){ + log.debug "${this.class.name} plugin initialized" } } diff --git a/plugins/nf-parquet/src/resources/META-INF/MANIFEST.MF b/plugins/nf-parquet/src/resources/META-INF/MANIFEST.MF index fa06d94..7919bf2 100644 --- a/plugins/nf-parquet/src/resources/META-INF/MANIFEST.MF +++ b/plugins/nf-parquet/src/resources/META-INF/MANIFEST.MF @@ -3,4 +3,4 @@ Plugin-Id: nf-parquet Plugin-Version: 0.1.0 Plugin-Class: nextflow.parquet.ParquetPlugin Plugin-Provider: nextflow -Plugin-Requires: >=22.10.0 +Plugin-Requires: >=24.04.2 diff --git a/plugins/nf-parquet/src/test/nextflow/parquet/PluginTest.groovy b/plugins/nf-parquet/src/test/nextflow/parquet/PluginTest.groovy new file mode 100644 index 0000000..d1c7049 --- /dev/null +++ b/plugins/nf-parquet/src/test/nextflow/parquet/PluginTest.groovy @@ -0,0 +1,66 @@ +package nextflow.parquet + +import nextflow.Channel +import nextflow.plugin.Plugins +import nextflow.plugin.TestPluginDescriptorFinder +import nextflow.plugin.TestPluginManager +import nextflow.plugin.extension.PluginExtensionProvider +import org.pf4j.PluginDescriptorFinder +import spock.lang.Shared +import test.Dsl2Spec +import test.MockScriptRunner + +import java.nio.file.Files +import java.nio.file.Path +import java.util.jar.Manifest + +class PluginTest extends Dsl2Spec{ + + @Shared String pluginsMode + + def setup() { +// reset previous instances + PluginExtensionProvider.reset() + // this need to be set *before* the plugin manager class is created + pluginsMode = System.getProperty('pf4j.mode') + System.setProperty('pf4j.mode', 'dev') + // the plugin root should + def root = Path.of('.').toAbsolutePath().normalize() + def manager = new TestPluginManager(root){ + @Override + protected PluginDescriptorFinder createPluginDescriptorFinder() { + return new TestPluginDescriptorFinder(){ + @Override + protected Manifest readManifestFromDirectory(Path pluginPath) { + def manifestPath= getManifestPath(pluginPath) + final input = Files.newInputStream(manifestPath) + return new Manifest(input) + } + protected Path getManifestPath(Path pluginPath) { + return pluginPath.resolve('build/tmp/jar/MANIFEST.MF') + } + } + } + } + Plugins.init(root, 'dev', manager) + } + + def cleanup() { + Plugins.stop() + PluginExtensionProvider.reset() + pluginsMode ? System.setProperty('pf4j.mode',pluginsMode) : System.clearProperty('pf4j.mode') + } + + def 'should starts' () { + when: + def SCRIPT = ''' + channel.of('hi!') + ''' + and: + def result = new MockScriptRunner([:]).setScript(SCRIPT).execute() + then: + result.val == 'hi!' + result.val == Channel.STOP + } + +} diff --git a/validation/lib/module-info.class b/validation/lib/module-info.class new file mode 100644 index 0000000000000000000000000000000000000000..2bbfa89a37c7228552a9628f6ba3feb0c791a576 GIT binary patch literal 157 zcmX^0Z`VEs1_oOOZgvJHMh5QO{FKt1RNc(Hw0uSeuHgLAqU2P!%$!t41_2}~y{yEt zL`DWSU$AOn26jdU&fLnP)a3l4lwx59P9PVgLN6(?I8}s!y@7#&5oienFflL#8B7c; Z46Hy}hJg(%%*epe0A>JHazPjj3;;W39hCq8 literal 0 HcmV?d00001 diff --git a/validation/lib/myrecords/Address.class b/validation/lib/myrecords/Address.class new file mode 100644 index 0000000000000000000000000000000000000000..ec18d743995a48be02d344abec0ba6b441cfc715 GIT binary patch literal 1368 zcma)5*>2NN5Iwg^<1{$L32C7~7oeqyyQJ(C7bGY`R0@k|)dzTTlB?EjVh7u)RKAG^ zDp4Wzfe+xL5a(u5UB#++xSs3rnVBlut7p&_Yb6e$5?-#)Y}P21hA ze3Es~ZwRE;9oGq82qa47oiU^_rXi!_5_Exl>&WlIg38NA!G(jwwu&wfx$#o1|Zg!$vB7sO`&oXq7(;td%1Eeuw|#Deth` z9*c2F#l8BJYYZv`rVW=(C}1X{;cPiEQ202JQJ9{ z7)mvI4P%wdFv4Fl!F}L%@m5fq(zVg7f*2`4lT%Pz+;M0ipQw=6WTBf{+zMUY#g)@3pIwJN!(@05iFoYTjaEQ h^B+uK&A!ZIkXnTbq+xRU%UHo;E!@VN!lSy3m)bs5;f*eySuHy!C>>A!~(W$WJ*x5PCLC7r? z)3{CQ@=M*YJb}4017bDbZ|%u#U(bIM$mUjKKbk`+mrD6fPA%dvxMte&UA?*`-3?>Q zBCDad(%*F|9}Jgc^`*5B$qRAoRJSS%5#eSnf&Q&ld4Z#NW2SxJ?8`y~d}G)ZOL{lg9B02?(_{TNXB_i1 z12+z9vLwB2*R1)D8xJX3hUdi{%^aWGlfohKPG~`Pg(iBO*nmeu`vv8D zTVlZ`T9n5LY(WRw2@CexA{>XN+UW|8WT69r%#RKP#yUc??a&_FVR#qr;vVfJ<&30{ ngpXv8M2{rDPt-73EJERPd4NZFOyQqGN0EP${8Pd!yg~Lq^L#)e literal 0 HcmV?d00001 diff --git a/validation/lib/myrecords/Job.class b/validation/lib/myrecords/Job.class new file mode 100644 index 0000000000000000000000000000000000000000..1fce4a615fbb993cd8da4df0bdfd0db65c461a4a GIT binary patch literal 1349 zcmaJ=+iuf95Ix&moCJrsAuSYW3$%sAEls&!-10yXA}Cx$t3JTfCRsIZ6K}AcO8F-q zs6>R+2R?w0Ld?bvb#c^(^{mEc&YYRq`TghQ7l18nqBylr3W^tLIz6=N$mdf=r39U*ziwd37 zaTV7nw9UQtwrmN~6AK+Bnb)y^MJiH!?eh-hUj82!$?&0ZAez2a`OHu*NBm)ZaZjzX z$B+yJ6zaAk-uJr=;qLNAha?&Agx{8}kKE;5;R{uHsXU>gQRiz6Gtrz8Ex8@pcbnq1 ztw=L8hjrC@s%bnyJj_{W2QXckuR=jr+PZ+p6 z=!sXtYr1yNmu@sIf(DB~B8}e&+dv4Q0@4jw9R=rSLR7V^}$@`J5UwSPTp2 zLun{M!Pua8KSfV9K~JAniEc%;86!)}KvtuZ!VN;*q}v+VB-xVr6U4y_j2|%o literal 0 HcmV?d00001 diff --git a/validation/lib/myrecords/Person.class b/validation/lib/myrecords/Person.class new file mode 100644 index 0000000000000000000000000000000000000000..ccbe48bbb3c0f91324dd9f880783bb0ee136ef8b GIT binary patch literal 1416 zcmaKsTT|0O6vzMDTS`go1yE6;s9f406z`V+ia3sRsxrdxpigPD1A#VMlav|0lg`MD zj59p=0sK&oXPeZ9X@-Z*ZZ^Mj_J7XVlRtmYe*;*&iK2=zj58>PX*%3>8OB1?hQFy7N$OP<*KnO7?TH=FvFv?@`ErQ$kJl^p zK>$A2v%aC?CT@}Sf#Du(3X?NTL|Jdiqo!4qFhha!uU(@>nie9^&^aF)hrH?OmCsUA zEu=|C8G7x9Q`uuk_#$$3%jO@t?FM&tjYf-P<==DfK$zQxW3;(PIb|3xM?}>t$5tgrI$hhd+Pv!P4qP&fH2SA7oR&kG zw(MhZ#H&8|&allEcW3KD9CbT-2nz}y+6fGIPdfY!cbksY@q`mL*=!lE8{PM8U zKK;Ud3$o9kZrL_>dM>%EV2xpJ5c4H5D0s;*eK|_S@B_vQ^-qkhYaB({jI=7scI(a5 zR5U9zlDJLO9a=3BnWT9_`vsx-EdZuWWPG{+vjgx1PS5TV* z1!}=L`YgSbXqKR&onr#{LCcHw1?sr@2zV0s76F&A5Wq8}SH_jr7SAyiMbq^}IiRPA zl{%8zPblAq9Mf+HmH((OS)$ot!SwHe7J(Lq(F7Lhq!{i|QbDPfX_5u-)9)vLncnl{ Z?hz$DjVkWr0Uko9-3;xnVjZtA`41;`Gw1*S literal 0 HcmV?d00001 diff --git a/validation/lib/myrecords/SingleAddress.class b/validation/lib/myrecords/SingleAddress.class new file mode 100644 index 0000000000000000000000000000000000000000..0dd6cb949bfc3cc6dcee0bad34958761b4773830 GIT binary patch literal 1167 zcmaJ=%Wl&^6g|^CoVbQOXbFYV@@OG(Xv||7K!PGfq!bXTx`34vj~cg*9maO0{u2u- zfsnG`1NbP!o!C(vl4jv~T;Fr^js_@qhGsOVaCfuym;t^W!n3acG26g~ddmZaqw!yD3M5;Zxr;%mh{V9d^C5r3{hqLgfhUrXeaH0uWFYqW%REW4`piX%onz2iuFJ)klKTn_hTW;= zB^newVc1%pN}fl8@tB^*3SBTd8u>VAl^K5<6un8Sdo;`cs{RX1um6N{K|6+9G-hcA zQrM(1!w3$P=zZL#u!L^YEE5!cge$;zTF&B1^bWQbB$t413z92iM#+39ZYrnOM_6B! zExVcYA!pe~M*jh2zKb-`fe7!CR*)MKDt)+5pGLa}q%ema>|&21Dn%OP+EZ-&1A4#` As{jB1 literal 0 HcmV?d00001 diff --git a/validation/lib/myrecords/SinglePerson.class b/validation/lib/myrecords/SinglePerson.class new file mode 100644 index 0000000000000000000000000000000000000000..8f81bcd17c63f004f1e4db9cd0ff4ccdd0ded838 GIT binary patch literal 1318 zcmaJ=T~8B16g{`J-L`INK|m3qsG#lgQNO>cCW2B>}TOMX-a_8JL=iIyZ_n*^W0G^|gK@1}r;yMyY67u`zp;>O2&Tjdm zsJqf4B%j%i?Y|(56iYj4q>$E-(QyGfVXAo~J20=jWjnhKu_>hII)s?bnY5wcqdIcP z6ExGZr0_h#boKwVm%&wKoJEc6xQGct%6GSX$*hFsVqZq^P^DDu2@cR5jwv0NaG5#w zOmA<^wFF^iILEqr_KJ=fTxGlBOWSO)%9WuBkvBio_C?*Vls*w=iyM6^yS_?0gm|DM zSG67SzTK<|xoy@OtStXd_LxE+@B5Qvour&A+zPS zWnH|n)x%@ao~^1vvi)w`^RHaj_k3x#0(o8vOPp_8m9URFZYi&9E{Spmw{b_qT^;vu zpHS#EMYbK^Zi;eHsE~?~u65Z7$5B7FbLbw3asYg5I#xq?3su)WXtyd6+hMOGkuY)l zs3l$tuP*JD@5*S(TEq0bXdvYH+)omH{md^Gwws`@JC2YYO5tgEL|E*3KF5QGCxp54 zq1=SPFc&q&;|X5lYTndvs9EAY$t!_bzM13GEsP+}d(QX?G72ZqjFl5)k2%E6;BS(5 zFle5?=@@e{me&|h+BLpX90lGLNHb13j_-Utn>)rMb$tMM68Jg*JcAn{c%D@%7b#=u z6w`yy#EKR|wDP2MB#a->zV#6k3t#*P4J0dQwqI}@H#t4V)hY5m%BP@)K?O^l7-5ww Vmx6(1tl$C1GEDvukMR`Ke*rw6EmQyi literal 0 HcmV?d00001 diff --git a/validation/lib/myrecords/SingleRecord.class b/validation/lib/myrecords/SingleRecord.class new file mode 100644 index 0000000000000000000000000000000000000000..5a5a60f99cfb4b721ae9c5b545283580c8713410 GIT binary patch literal 1268 zcmaJ=*>2NN5IxsfoCK#?XbYvZDbRxBE-w2L;Gso`L}3xB`T$Q(a@Dv^>|i^U`cEKH zi3%wXd;lMXm>WBatGIo*i{_p=GiT<`-+#kD0AArm1`=X2;tCQ-GUN}9V?*y6)`9++ zw``})kbG%cruUj5rk3~9NFgmFqu>e@hWXx!GlIE#!?X^%eC#Bd;dXZIHbq;!n4>Df z-cYcJn+VP|T0bL*DW#)7p2hC}u6xI;l_)d)qSf>#;l)KGv>WNJr^A=9u2PK&=a1$80Zge_5LQZ{Z$ ze#f>w*K>@%pMaZ!#vq&RifBxZuC%hbB(^fpu`T1if(LlWP?|K%8Cag#{0RK7YgAmSgrk+X=hub`94J z8v>56(>a#zL^hP8?2OEk!M#;|-jlse!G#x@OuM1M{~QB05c z^kOpTOwvkViC&iJb`LSc>C9=r!L-sDWNq^d*;6u6GiXoJDQH$`PfNr_SXu>|pj-4L zI7)O1kgN^Kj-PZr?^~czxg7zX1HMOq7jY*5uTWs2BBiYlu@Hr3l+kK{7MesJ3GEl; zA5+AP5+A3$M8GjLJDqSGYveBBF4YH^p_^Yrzk=$>M&nDBOQDQ)Y~T^uGDP_pPw^ZJ F{{a*58P5O! literal 0 HcmV?d00001 diff --git a/validation/nextflow.config b/validation/nextflow.config new file mode 100644 index 0000000..7f1cad2 --- /dev/null +++ b/validation/nextflow.config @@ -0,0 +1,4 @@ +plugins { + id "nf-parquet@${System.getenv("PARQUET_PLUGIN_VERSION") ?: "latest"}" +} + diff --git a/validation/presidents.parquet b/validation/presidents.parquet new file mode 100644 index 0000000000000000000000000000000000000000..6a4e975eb58138ab4e6f31e9dc587739f7cce282 GIT binary patch literal 1674 zcmZuyPiWLw7=M$0la6Jr(C?WnL37xK3ht2Uw5cP5P)|~{4z1!+L`<8oW|>LGWUN*^ zc@Py=rRUy+p1g=C9z}|HlHEP5vZn=a-WGcB;KA>`NoJlq4jCq2-tYT;-}}AamuW35 z*MNWxKf%3^uWwf@P%zX%CuE}eJlap#=SBXDD^+me{qJ{o%6*{5fCmj3L2`wZ%16(y zET|Z)bp0qs_zZaP;K85&sRG=*oWmKgb@tr(6`sLvFvG%LKHvND_kJPchgpdgkXhb1 zei8NyT!e>zJ$@|`AAOP&17d2P>)Nd_j?)KI` zq1HFu&A4-}%etF6=DIFsoPd+??A5!cxlVlhhi~L%D+s*cXIG;5*Ip89td<|7QD7dB zOhYfk#M^}McJks_4j5@5-`0;2#e-=k3(ZvB6)TU3W-$~v>?DqO-llTNVUWeEZvO|ocPf~6ew zNrL+&zH*$c?8zeTg92T?W)m#*SpMt8A~twL6C-7R#0eJFAc+90cNc7&8REJ%6il* zO5mAV<}F7@W0_Mr9tm~@5>@CDU3T>5&;mnWP%=F*#_%a_A8m&lCCwv!qY(OL3?}JY z6&8uJYQeZUr(uO@w&7xz@mR>aFRINl7+`L?6t2oCtajVxZLdf*5D>a zHMooWkwDK>V2&pAQ3WX7DnpYvpOCxIp*Xqx#CU-_C3?SvDAxDD?Z)@#L8wKK3tQQ# zz#;ms3|EM=Cy$n6ZWt)ECv%1P%;4y-w}?n9DzsZcgRFJu3#!xEu8DpeevzEUUIAQ}g*r%^9VtDPhW+tt|bgvLZ?GY!|1G{~y` zks`)o*H5or3wu9zQhNi(4ClqJRco`=nmxUCEoujLV?GSRhIgz!KR@fbK`oeXg!Nj( f^XlH3Uk}21V+Nx&|A=`DXK(Z(yZFv(;lI#VqC6y7 literal 0 HcmV?d00001 diff --git a/validation/read.nf b/validation/read.nf new file mode 100644 index 0000000..47fc479 --- /dev/null +++ b/validation/read.nf @@ -0,0 +1,6 @@ +include { splitParquet } from 'plugin/nf-parquet' + +import myrecords.* + +channel.fromPath("test*.parquet").splitParquet( record: SingleRecord) + | view \ No newline at end of file diff --git a/validation/read_complex.nf b/validation/read_complex.nf new file mode 100644 index 0000000..f9fa9bf --- /dev/null +++ b/validation/read_complex.nf @@ -0,0 +1,6 @@ +include { splitParquet } from 'plugin/nf-parquet' + +import myrecords.* + +channel.fromPath("presidents.parquet").splitParquet() + | view \ No newline at end of file diff --git a/validation/read_raw.nf b/validation/read_raw.nf new file mode 100644 index 0000000..3a0f80e --- /dev/null +++ b/validation/read_raw.nf @@ -0,0 +1,6 @@ +include { splitParquet } from 'plugin/nf-parquet' + +import myrecords.* + +channel.fromPath("*.parquet").splitParquet() + | view \ No newline at end of file diff --git a/validation/schemas/myrecords/Address.java b/validation/schemas/myrecords/Address.java new file mode 100644 index 0000000..a0a6fa5 --- /dev/null +++ b/validation/schemas/myrecords/Address.java @@ -0,0 +1,3 @@ +package myrecords; + +record Address(String street, String zip, String city) { } \ No newline at end of file diff --git a/validation/schemas/myrecords/CustomRecord.java b/validation/schemas/myrecords/CustomRecord.java new file mode 100644 index 0000000..bf42010 --- /dev/null +++ b/validation/schemas/myrecords/CustomRecord.java @@ -0,0 +1,4 @@ +package myrecords; +record CustomRecord(long id, String name, int sizell, double value, double percentile) { + +} diff --git a/validation/schemas/myrecords/Job.java b/validation/schemas/myrecords/Job.java new file mode 100644 index 0000000..fb26bbd --- /dev/null +++ b/validation/schemas/myrecords/Job.java @@ -0,0 +1,3 @@ +package myrecords; + +record Job(String company, String position, int years){ } \ No newline at end of file diff --git a/validation/schemas/myrecords/Person.java b/validation/schemas/myrecords/Person.java new file mode 100644 index 0000000..be3d6d3 --- /dev/null +++ b/validation/schemas/myrecords/Person.java @@ -0,0 +1,3 @@ +package myrecords; + +record Person(long id, Job job, Address address) { } \ No newline at end of file diff --git a/validation/schemas/myrecords/SingleAddress.java b/validation/schemas/myrecords/SingleAddress.java new file mode 100644 index 0000000..cd995c1 --- /dev/null +++ b/validation/schemas/myrecords/SingleAddress.java @@ -0,0 +1,3 @@ +package myrecords; + +record SingleAddress(String street) { } \ No newline at end of file diff --git a/validation/schemas/myrecords/SinglePerson.java b/validation/schemas/myrecords/SinglePerson.java new file mode 100644 index 0000000..f1069ed --- /dev/null +++ b/validation/schemas/myrecords/SinglePerson.java @@ -0,0 +1,3 @@ +package myrecords; + +record SinglePerson(long id, SingleAddress address) { } \ No newline at end of file diff --git a/validation/schemas/myrecords/SingleRecord.java b/validation/schemas/myrecords/SingleRecord.java new file mode 100644 index 0000000..6c48134 --- /dev/null +++ b/validation/schemas/myrecords/SingleRecord.java @@ -0,0 +1,3 @@ +package myrecords; +record SingleRecord(long id, String name) { +} diff --git a/validation/schemas/myrecords/module-info.java b/validation/schemas/myrecords/module-info.java new file mode 100644 index 0000000..83d5d2c --- /dev/null +++ b/validation/schemas/myrecords/module-info.java @@ -0,0 +1,3 @@ +module myrecords { + opens myrecords; +} \ No newline at end of file diff --git a/validation/test.parquet b/validation/test.parquet new file mode 100644 index 0000000000000000000000000000000000000000..2adc0efefb2300fc41b9e88ca6937e3ac79c8d49 GIT binary patch literal 1063 zcmZWp&ubGw6rM~rVF-)0kT>kYB0o0@tAoS3jTs zxMl#wEdxdfGjb$;&~gKA!n>zuuZ!`4Vt8Qf4n==HSHN!|rhy&ZRN?#kCx6!@B1J9X ziA3e>_lwKCVV9U) z15|yRup(A43^3x}=RLFAPD-6D$QqYE*~@&V1>~FO!6xP+ha(V2n&cNTHuw&@+HsfS zFA_#Qi+3{Jq5zzq=b=W-CDN5*y0}7F?n`9L8oNSa@@czQR+;@}$o?xICO@YcyHxW` zVop&a9gYOMTEelK^--fS+T3mq?N&oH h>O<>rSht*Jb7Z%+MqAb`y^m8TzvLCpK@Wcs{{WDl%o_jz literal 0 HcmV?d00001 diff --git a/validation/test2.parquet b/validation/test2.parquet new file mode 100644 index 0000000000000000000000000000000000000000..c87f1c782ca9d37777e905e0686063677d39b4f2 GIT binary patch literal 1114 zcmZWpO=#0#82*~3G3)AKu=nE&69`Oz z0Z)HCcq{9m;86vY5GKdSP|TJ{aK*zfnva7GmmlAM6brl+;l1y$aE$Ls;JeLRZF6L3 zf3A11Fo=N;Ki};%A`GeVVf0SnvlkyfjY(Mlz5QH`II+_E^5)vDKi`V>$4eLHH@_5L z9Gt&9{`Ye+4CvmteP#H+fFm#M^_!KQ;>xPd&a8bczWScKdhOz;qBbIFOpPIzxKQlZ zBxw{C!kqAv55E=+46?{oW3r#>K&|$&LAYL`ktns#&D366p;@OYl#rHaI(1+nl(u3p z6N(!WR6?;EiWut7q)v2N?Pg<+SDT&#Ndj_)FsbekNM$4@FP0lTLr&|63=&$DR{cD8 zT>?owB$mr=hX-fD3vOR$D;}mfIU8>(5t| zyIUJYe(#y9T60}wNZ-Kg5p^MDd<+5Dp{3&h>wa5Vk0YF zz0EzN(R8?Lh#g?GvFiCd(lVkOL)fXMvUje-+lLxndYOBzdUG*t8m47Tq{k|qy6dEK z+{#aL$1GU2e7-g@RjB5s^E_`=%}UiWokF3Oo1Uyqng_HJ?pgFE4{`Fd_*eM{^v>zq literal 0 HcmV?d00001 From 27a1f3c2ecd4dd8bc39b4dd2d5801c6e865b0f7a Mon Sep 17 00:00:00 2001 From: jorge aguilera Date: Sun, 1 Sep 2024 22:42:51 +0200 Subject: [PATCH 2/4] refactor of initial plugin - add tests --- .../test/nextflow/parquet/PluginTest.groovy | 34 +++++++++++++++++- .../nf-parquet/src/testResources/test.parquet | Bin 0 -> 1063 bytes validation/read.nf | 3 +- 3 files changed, 35 insertions(+), 2 deletions(-) create mode 100644 plugins/nf-parquet/src/testResources/test.parquet diff --git a/plugins/nf-parquet/src/test/nextflow/parquet/PluginTest.groovy b/plugins/nf-parquet/src/test/nextflow/parquet/PluginTest.groovy index d1c7049..165e71c 100644 --- a/plugins/nf-parquet/src/test/nextflow/parquet/PluginTest.groovy +++ b/plugins/nf-parquet/src/test/nextflow/parquet/PluginTest.groovy @@ -37,7 +37,7 @@ class PluginTest extends Dsl2Spec{ return new Manifest(input) } protected Path getManifestPath(Path pluginPath) { - return pluginPath.resolve('build/tmp/jar/MANIFEST.MF') + return pluginPath.resolve('build/resources/main/META-INF/MANIFEST.MF') } } } @@ -63,4 +63,36 @@ class PluginTest extends Dsl2Spec{ result.val == Channel.STOP } + def 'should parse a parquet file in raw mode'(){ + when: + def path = getClass().getResource('/test.parquet').toURI().path + def SCRIPT = """ + include {splitParquet} from 'plugin/nf-parquet' + channel.fromPath("$path").splitParquet() + """.toString() + and: + def result = new MockScriptRunner([:]).setScript(SCRIPT).execute() + then: + result.val == [id:1, name:"test2", sizell:10, value:0.010838246310055144, percentile:0.28001529169191186] + result.val == Channel.STOP + } + + def 'should parse a projection'(){ + when: + def path = getClass().getResource('/test.parquet').toURI().path + def SCRIPT = """ + include {splitParquet} from 'plugin/nf-parquet' + + record SingleRecord(long id, String name) { + } + + channel.fromPath("$path").splitParquet( [record:SingleRecord] ) + """.toString() + and: + def result = new MockScriptRunner([:]).setScript(SCRIPT).execute() + then: + result.val.id == 1 + result.val == Channel.STOP + } + } diff --git a/plugins/nf-parquet/src/testResources/test.parquet b/plugins/nf-parquet/src/testResources/test.parquet new file mode 100644 index 0000000000000000000000000000000000000000..2adc0efefb2300fc41b9e88ca6937e3ac79c8d49 GIT binary patch literal 1063 zcmZWp&ubGw6rM~rVF-)0kT>kYB0o0@tAoS3jTs zxMl#wEdxdfGjb$;&~gKA!n>zuuZ!`4Vt8Qf4n==HSHN!|rhy&ZRN?#kCx6!@B1J9X ziA3e>_lwKCVV9U) z15|yRup(A43^3x}=RLFAPD-6D$QqYE*~@&V1>~FO!6xP+ha(V2n&cNTHuw&@+HsfS zFA_#Qi+3{Jq5zzq=b=W-CDN5*y0}7F?n`9L8oNSa@@czQR+;@}$o?xICO@YcyHxW` zVop&a9gYOMTEelK^--fS+T3mq?N&oH h>O<>rSht*Jb7Z%+MqAb`y^m8TzvLCpK@Wcs{{WDl%o_jz literal 0 HcmV?d00001 diff --git a/validation/read.nf b/validation/read.nf index 47fc479..47e0ef1 100644 --- a/validation/read.nf +++ b/validation/read.nf @@ -1,6 +1,7 @@ include { splitParquet } from 'plugin/nf-parquet' -import myrecords.* +record SingleRecord(long id, String name) { +} channel.fromPath("test*.parquet").splitParquet( record: SingleRecord) | view \ No newline at end of file From 851d9c16ec469a39dc90bfab0fb590d7f0162b9f Mon Sep 17 00:00:00 2001 From: jorge aguilera Date: Mon, 2 Sep 2024 16:41:02 +0200 Subject: [PATCH 3/4] upgrade tests to spock 2.3 - add tests --- README.md | 10 ++++++++++ plugins/nf-parquet/build.gradle | 5 +++-- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 11264c7..fe510a7 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,16 @@ This plugin provides two new operators: : Load each Parquet file in a source channel, emitting each row as a separate item. +Default method emits every row in the parquet file as a Map. +In case you want to read only a subset +of fields (a projection) you can provide a java Record class with the fields you want to read + +```groovy +record MyRecord( int id, String name) + +splitParquet( [record: MyRecord] ) +``` + `toParquet( path )` : Write each item in a source channel to a Parquet file. diff --git a/plugins/nf-parquet/build.gradle b/plugins/nf-parquet/build.gradle index 31a11ad..996fb31 100644 --- a/plugins/nf-parquet/build.gradle +++ b/plugins/nf-parquet/build.gradle @@ -64,8 +64,9 @@ dependencies { // test configuration testImplementation "io.nextflow:nextflow:$nextflowVersion" - testImplementation ("org.spockframework:spock-core:2.2-groovy-4.0") { exclude group: 'org.codehaus.groovy'; exclude group: 'net.bytebuddy' } - testImplementation ('org.spockframework:spock-junit4:2.2-groovy-4.0') { exclude group: 'org.codehaus.groovy'; exclude group: 'net.bytebuddy' } + testImplementation ("org.spockframework:spock-core:2.3-groovy-4.0") { exclude group: 'org.apache.groovy'; exclude group: 'net.bytebuddy' } + testImplementation ('org.spockframework:spock-junit4:2.3-groovy-4.0') { exclude group: 'org.apache.groovy'; exclude group: 'net.bytebuddy' } + testImplementation(testFixtures("io.nextflow:nextflow:$nextflowVersion")) testImplementation(testFixtures("io.nextflow:nf-commons:$nextflowVersion")) From 8ab8ee2a0bfa0fb0dc05e6ea52874fe2b07c58cb Mon Sep 17 00:00:00 2001 From: jorge aguilera Date: Mon, 2 Sep 2024 16:41:35 +0200 Subject: [PATCH 4/4] fix typo in README - add tests --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index fe510a7..019a093 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ In case you want to read only a subset of fields (a projection) you can provide a java Record class with the fields you want to read ```groovy -record MyRecord( int id, String name) +record MyRecord( int id, String name){} splitParquet( [record: MyRecord] ) ```