Skip to content

Commit

Permalink
Supports macro-aware transcoding of Ion 1.1 streams.
Browse files Browse the repository at this point in the history
  • Loading branch information
tgregg committed Dec 12, 2024
1 parent 87cad9f commit d9d852e
Show file tree
Hide file tree
Showing 9 changed files with 572 additions and 33 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
<groupId>com.amazon.ion</groupId>
<artifactId>ion-java</artifactId>
<!-- Take the latest version after the first one that supports Ion 1.1. -->
<version>[1.11.8-SNAPSHOT,]</version>
<version>[1.11.10-SNAPSHOT,]</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
Expand Down
13 changes: 7 additions & 6 deletions src/com/amazon/ion/benchmark/Format.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.io.IOException;
import java.nio.file.Path;

import static com.amazon.ion.benchmark.IonUtilities.getMinorVersion;
import static com.amazon.ion.benchmark.IonUtilities.isFormatHeaderPresent;

/**
Expand Down Expand Up @@ -71,7 +72,7 @@ MeasurableReadTask createReadTask(Path inputPath, ReadOptionsCombination options

@Override
MeasurableWriteTask createWriteTask(Path inputPath, WriteOptionsCombination options) throws IOException {
return new IonMeasurableWriteTask(inputPath, options);
return IonUtilities.createIonMeasurableWriteTask(inputPath, options);
}

@Override
Expand All @@ -85,8 +86,8 @@ Path convert(Path input, Path output, OptionsCombinationBase options) throws IOE
Format sourceFormat = classify(input);
switch (sourceFormat) {
case ION_TEXT:
if (options.limit == Integer.MAX_VALUE) {
// The input is already text and it is not being limited.
if (options.limit == Integer.MAX_VALUE && (getMinorVersion(ION_TEXT, input.toFile()) == options.ionMinorVersion)) {
// The input is already text in the requested minor version, and it is not being limited.
return input;
}
IonUtilities.rewriteIonFile(ION_TEXT, input, output, options, IonUtilities::newTextWriterSupplier);
Expand Down Expand Up @@ -124,7 +125,7 @@ MeasurableReadTask createReadTask(Path inputPath, ReadOptionsCombination options

@Override
MeasurableWriteTask createWriteTask(Path inputPath, WriteOptionsCombination options) throws IOException {
return new IonMeasurableWriteTask(inputPath, options);
return IonUtilities.createIonMeasurableWriteTask(inputPath, options);
}

@Override
Expand All @@ -140,7 +141,7 @@ Path convert(Path input, Path output, OptionsCombinationBase options) throws IOE
case ION_TEXT:
case ION_BINARY:
// Down-convert to JSON.
IonUtilities.rewriteIonFile(ION_BINARY, input, output, options, IonUtilities::newJsonWriterSupplier);
IonUtilities.rewriteIonFile(sourceFormat, input, output, options, IonUtilities::newJsonWriterSupplier);
break;
case JSON:
if (options.limit == Integer.MAX_VALUE) {
Expand Down Expand Up @@ -188,7 +189,7 @@ Path convert(Path input, Path output, OptionsCombinationBase options) throws IOE
switch (sourceFormat) {
case ION_BINARY:
case ION_TEXT:
IonUtilities.rewriteIonFile(ION_BINARY, input, output, options, IonUtilities::newCborWriterSupplier);
IonUtilities.rewriteIonFile(sourceFormat, input, output, options, IonUtilities::newCborWriterSupplier);
break;
case JSON:
JacksonUtilities.rewriteJsonToCbor(input, output, options);
Expand Down
69 changes: 69 additions & 0 deletions src/com/amazon/ion/benchmark/IonMeasurableWriteTask_1_1.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.amazon.ion.benchmark;

import com.amazon.ion.MacroAwareIonReader;
import com.amazon.ion.MacroAwareIonWriter;
import com.amazon.ion.impl._Private_IonReaderBuilder;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Consumer;

/**
* A MeasurableWriteTask for writing data in the Ion 1.1+ format (either text or binary) using input data that is
* also in the Ion 1.1+ format. This ensures encoding directives and macro invocations are preserved, providing
* a more accurate measurement of the write performance of the input. In cases where either the input or the output
* is Ion 1.0, {@link IonMeasurableWriteTask} should be used instead, as encoding directives and macro invocations
* cannot be preserved in that case.
*/
public class IonMeasurableWriteTask_1_1 extends MeasurableWriteTask<MacroAwareIonWriter> {

private final IonUtilities.IonWriterSupplier writerBuilder;

/**
* @param inputPath path to the data to re-write.
* @param options options to use when writing.
* @throws IOException if thrown when handling the options.
*/
IonMeasurableWriteTask_1_1(Path inputPath, WriteOptionsCombination options) throws IOException {
super(inputPath, options);
if (options.format == Format.ION_TEXT) {
writerBuilder = IonUtilities.newTextWriterSupplier(options);
} else if (options.format == Format.ION_BINARY) {
writerBuilder = IonUtilities.newBinaryWriterSupplier(options);
} else {
throw new IllegalStateException("IonFormatWriter is compatible only with ION_TEXT and ION_BINARY");
}
}

@Override
void generateWriteInstructionsDom(Consumer<WriteInstruction<MacroAwareIonWriter>> instructionsSink) {
throw new UnsupportedOperationException("Write benchmarking of Ion 1.1 from the DOM is not yet supported.");
}

@Override
void generateWriteInstructionsStreaming(Consumer<WriteInstruction<MacroAwareIonWriter>> instructionsSink) throws IOException {
if (options.limit != Integer.MAX_VALUE || options.flushPeriod != null) {
throw new UnsupportedOperationException("Benchmarking Ion 1.1 write using --limit or --ion-flush-period is not yet supported.");
}
// TODO support buildMacroAware from InputStream to avoid having to buffer all bytes.
try (
MacroAwareIonReader reader = ((_Private_IonReaderBuilder) IonUtilities.newReaderBuilderForInput(options))
.buildMacroAware(Files.readAllBytes(inputFile.toPath()))
) {
reader.transcodeTo(new RecordingMacroAwareIonWriter(instructionsSink));
}
}

@Override
MacroAwareIonWriter newWriter(OutputStream outputStream) throws IOException {
return (MacroAwareIonWriter) writerBuilder.get(outputStream);
}

@Override
void closeWriter(MacroAwareIonWriter writer) throws IOException {
// Note: this closes the underlying OutputStream.
writer.close();
}
}
103 changes: 80 additions & 23 deletions src/com/amazon/ion/benchmark/IonUtilities.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@
import com.amazon.ion.IonSystem;
import com.amazon.ion.IonType;
import com.amazon.ion.IonWriter;
import com.amazon.ion.MacroAwareIonReader;
import com.amazon.ion.MacroAwareIonWriter;
import com.amazon.ion.OffsetSpan;
import com.amazon.ion.SpanProvider;
import com.amazon.ion.SymbolTable;
import com.amazon.ion.impl._Private_IonConstants;
import com.amazon.ion.impl._Private_IonReaderBuilder;
import com.amazon.ion.impl._Private_IonSystem;
import com.amazon.ion.impl._Private_IonWriter;
import com.amazon.ion.impl.bin.LengthPrefixStrategy;
Expand All @@ -19,6 +22,7 @@
import com.amazon.ion.system.IonReaderBuilder;
import com.amazon.ion.system.IonSystemBuilder;
import com.amazon.ion.system.IonTextWriterBuilder;
import com.amazon.ion.system.IonTextWriterBuilder_1_1;
import com.amazon.ion.system.SimpleCatalog;

import java.io.BufferedInputStream;
Expand All @@ -28,6 +32,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
Expand Down Expand Up @@ -311,7 +316,12 @@ static IonWriterSupplier newCborWriterSupplier(OptionsCombinationBase options) {
* @throws IOException if thrown when parsing shared symbol tables.
*/
static IonWriterSupplier newTextWriterSupplier(OptionsCombinationBase options) throws IOException {
return newTextWriterSupplier(options, IonTextWriterBuilder.standard());
if (options.ionMinorVersion == 0) {
return newTextWriterSupplier(options, IonTextWriterBuilder.standard());
} else if (options.ionMinorVersion == 1) {
return newTextWriterSupplier_1_1(options);
}
throw new IllegalStateException();
}

/**
Expand All @@ -325,7 +335,7 @@ static IonWriterSupplier newJsonWriterSupplier(OptionsCombinationBase options) t
}

/**
* Creates a new IonWriterSupplier for text IonWriters.
* Creates a new IonWriterSupplier for Ion 1.0 text or JSON IonWriters.
* @param options the options to use when creating writers.
* @param builder the builder to use to construct new writers.
* @return a new instance.
Expand All @@ -335,6 +345,18 @@ private static IonWriterSupplier newTextWriterSupplier(OptionsCombinationBase op
return builder.withImports(parseImportsFromFile(options.importsForBenchmarkFile))::build;
}

/**
* Creates a new IonWriterSupplier for Ion 1.1 text IonWriters.
* @param options the options to use when creating writers.
* @return a new instance.
* @throws IOException if thrown when parsing shared symbol tables.
*/
private static IonWriterSupplier newTextWriterSupplier_1_1(OptionsCombinationBase options) throws IOException {
IonTextWriterBuilder_1_1 builder = IonEncodingVersion.ION_1_1.textWriterBuilder();
builder.withImports(parseImportsFromFile(options.importsForBenchmarkFile));
return builder::build;
}

/**
* Create a new IonCatalog populated with SharedSymbolTables read from the given file.
* @param importsFile the file containing the shared symbol tables to import.
Expand Down Expand Up @@ -458,6 +480,23 @@ private static void writeValuesWithOptions(
}
}

/**
* Rewrite the given Ion 1.1+ file to another Ion 1.1+ stream using the given options.
* @param input an ion 1.1+ file.
* @param options the options to use when re-writing.
* @param writer the writer of the new stream.
* @throws IOException if thrown when reading or writing.
*/
private static void rewriteIon11File(Path input, OptionsCombinationBase options, IonWriter writer) throws IOException{
if (options.limit != Integer.MAX_VALUE) {
throw new UnsupportedOperationException("Macro-aware transcoding of Ion 1.1 with the --limit option not yet supported.");
}
// TODO add a method to MacroAwareIonReader to write one value at a time so that 'limit' can be used
try (MacroAwareIonReader macroAwareIonReader = ((_Private_IonReaderBuilder) newReaderBuilderForInput(options)).buildMacroAware(Files.readAllBytes(input))) {
macroAwareIonReader.transcodeTo((MacroAwareIonWriter) writer);
}
}

/**
* Rewrite the given Ion file using the given options.
* @param inputFormat the format of 'input'; must be ION_BINARY, ION_TEXT, or JSON.
Expand All @@ -475,29 +514,33 @@ static void rewriteIonFile(Format inputFormat, Path input, Path output, OptionsC
IonReader reader = null;
TranscodeFunction transcodeFunction = STANDARD_TRANSCODE;
try {
if (
options.flushPeriod == null &&
options.importsForInputFile == null &&
options.importsForBenchmarkFile == null &&
options.format == Format.ION_BINARY &&
// Minor versions may add new kinds of system values, so it is not possible to maintain system value
// boundaries when downgrading to a previous minor version.
getMinorVersion(inputFormat, input.toFile()) <= options.ionMinorVersion
) {
// Use system-level reader to preserve the same symbol tables from the input.
writer = writerSupplier.get(options.newOutputStream(outputFile));
reader = ((_Private_IonSystem) ION_SYSTEM).newSystemReader(options.newInputStream(inputFile));
if (options.ionMinorVersion > getMinorVersion(inputFormat, input.toFile())) {
// Because symbol IDs will be transferred during the system transcode, *and* this is a format
// upgrade, the symbol IDs need to be transformed to point to the same text in the new format.
transcodeFunction = TRANSCODE_1_0_TO_1_1;
}
int inputMinorVersion = getMinorVersion(inputFormat, input.toFile());
writer = writerSupplier.get(options.newOutputStream(outputFile));
if (inputMinorVersion > 0 && options.ionMinorVersion > 0) {
rewriteIon11File(input, options, writer);
} else {
// Do not preserve the existing symbol table boundaries.
writer = writerSupplier.get(options.newOutputStream(outputFile));
reader = newReaderBuilderForInput(options).build(options.newInputStream(inputFile));
if (
options.flushPeriod == null &&
options.importsForInputFile == null &&
options.importsForBenchmarkFile == null &&
options.format == Format.ION_BINARY &&
// Minor versions may add new kinds of system values, so it is not possible to maintain system value
// boundaries when downgrading to a previous minor version.
inputMinorVersion <= options.ionMinorVersion
) {
// Use system-level reader to preserve the same symbol tables from the input.
reader = ((_Private_IonSystem) ION_SYSTEM).newSystemReader(options.newInputStream(inputFile));
if (options.ionMinorVersion > inputMinorVersion) {
// Because symbol IDs will be transferred during the system transcode, *and* this is a format
// upgrade, the symbol IDs need to be transformed to point to the same text in the new format.
transcodeFunction = TRANSCODE_1_0_TO_1_1;
}
} else {
// Do not preserve the existing symbol table boundaries.
reader = newReaderBuilderForInput(options).build(options.newInputStream(inputFile));
}
writeValuesWithOptions(reader, writer, options, transcodeFunction);
}
writeValuesWithOptions(reader, writer, options, transcodeFunction);
} finally {
if (writer != null) {
writer.close();
Expand Down Expand Up @@ -557,4 +600,18 @@ static IonSystem ionSystemForInput(OptionsCombinationBase options) throws IOExce
}
return IonSystemBuilder.standard().withCatalog(newCatalog(options.importsForInputFile)).build();
}

/**
* Create a MeasurableWriteTask of the appropriate type for the given input and options.
* @param inputPath the input data.
* @param options the benchmark options.
* @return a new MeasurableWriteTask.
* @throws IOException if thrown when trying to classify the input file.
*/
static MeasurableWriteTask<?> createIonMeasurableWriteTask(Path inputPath, WriteOptionsCombination options) throws IOException {
if (options.ionMinorVersion > 0 && getMinorVersion(Format.classify(inputPath), inputPath.toFile()) > 0) {
return new IonMeasurableWriteTask_1_1(inputPath, options);
}
return new IonMeasurableWriteTask(inputPath, options);
}
}
Loading

0 comments on commit d9d852e

Please sign in to comment.