Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Supports macro-aware transcoding of Ion 1.1 streams. #66

Merged
merged 2 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
<groupId>com.amazon.ion</groupId>
<artifactId>ion-java</artifactId>
<!-- Take the latest version after the first one that supports Ion 1.1. -->
<version>[1.11.8-SNAPSHOT,]</version>
<version>[1.11.10-SNAPSHOT,]</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
Expand Down
13 changes: 7 additions & 6 deletions src/com/amazon/ion/benchmark/Format.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.io.IOException;
import java.nio.file.Path;

import static com.amazon.ion.benchmark.IonUtilities.getMinorVersion;
import static com.amazon.ion.benchmark.IonUtilities.isFormatHeaderPresent;

/**
Expand Down Expand Up @@ -71,7 +72,7 @@ MeasurableReadTask createReadTask(Path inputPath, ReadOptionsCombination options

@Override
MeasurableWriteTask createWriteTask(Path inputPath, WriteOptionsCombination options) throws IOException {
return new IonMeasurableWriteTask(inputPath, options);
return IonUtilities.createIonMeasurableWriteTask(inputPath, options);
}

@Override
Expand All @@ -85,8 +86,8 @@ Path convert(Path input, Path output, OptionsCombinationBase options) throws IOE
Format sourceFormat = classify(input);
switch (sourceFormat) {
case ION_TEXT:
if (options.limit == Integer.MAX_VALUE) {
// The input is already text and it is not being limited.
if (options.limit == Integer.MAX_VALUE && (getMinorVersion(ION_TEXT, input.toFile()) == options.ionMinorVersion)) {
// The input is already text in the requested minor version, and it is not being limited.
return input;
}
IonUtilities.rewriteIonFile(ION_TEXT, input, output, options, IonUtilities::newTextWriterSupplier);
Expand Down Expand Up @@ -124,7 +125,7 @@ MeasurableReadTask createReadTask(Path inputPath, ReadOptionsCombination options

@Override
MeasurableWriteTask createWriteTask(Path inputPath, WriteOptionsCombination options) throws IOException {
return new IonMeasurableWriteTask(inputPath, options);
return IonUtilities.createIonMeasurableWriteTask(inputPath, options);
}

@Override
Expand All @@ -140,7 +141,7 @@ Path convert(Path input, Path output, OptionsCombinationBase options) throws IOE
case ION_TEXT:
case ION_BINARY:
// Down-convert to JSON.
IonUtilities.rewriteIonFile(ION_BINARY, input, output, options, IonUtilities::newJsonWriterSupplier);
IonUtilities.rewriteIonFile(sourceFormat, input, output, options, IonUtilities::newJsonWriterSupplier);
break;
case JSON:
if (options.limit == Integer.MAX_VALUE) {
Expand Down Expand Up @@ -188,7 +189,7 @@ Path convert(Path input, Path output, OptionsCombinationBase options) throws IOE
switch (sourceFormat) {
case ION_BINARY:
case ION_TEXT:
IonUtilities.rewriteIonFile(ION_BINARY, input, output, options, IonUtilities::newCborWriterSupplier);
IonUtilities.rewriteIonFile(sourceFormat, input, output, options, IonUtilities::newCborWriterSupplier);
break;
case JSON:
JacksonUtilities.rewriteJsonToCbor(input, output, options);
Expand Down
57 changes: 57 additions & 0 deletions src/com/amazon/ion/benchmark/IonMeasurableWriteTask_1_1.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.amazon.ion.benchmark;

import com.amazon.ion.MacroAwareIonWriter;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Path;
import java.util.function.Consumer;

/**
 * Write-benchmark task used when both the input data and the requested output are Ion 1.1+ (text or binary).
 * Transcoding between two Ion 1.1+ streams keeps encoding directives and macro invocations intact, which gives
 * a more accurate measurement of the write performance of the input. When either the input or the output is
 * Ion 1.0 those constructs cannot be preserved, and {@link IonMeasurableWriteTask} should be used instead.
 */
public class IonMeasurableWriteTask_1_1 extends MeasurableWriteTask<MacroAwareIonWriter> {

    /** Supplies the writers returned by {@link #newWriter(OutputStream)}; chosen from the requested format. */
    private final IonUtilities.IonWriterSupplier writerBuilder;

    /**
     * @param inputPath path to the data to re-write.
     * @param options options to use when writing.
     * @throws IOException if thrown when handling the options.
     */
    IonMeasurableWriteTask_1_1(Path inputPath, WriteOptionsCombination options) throws IOException {
        super(inputPath, options);
        boolean textRequested = options.format == Format.ION_TEXT;
        // Reject every format other than Ion text and Ion binary before building a supplier.
        if (!textRequested && options.format != Format.ION_BINARY) {
            throw new IllegalStateException("IonFormatWriter is compatible only with ION_TEXT and ION_BINARY");
        }
        writerBuilder = textRequested
            ? IonUtilities.newTextWriterSupplier(options)
            : IonUtilities.newBinaryWriterSupplier(options);
    }

    /** DOM-based instruction generation is not implemented for this task; always throws. */
    @Override
    void generateWriteInstructionsDom(Consumer<WriteInstruction<MacroAwareIonWriter>> instructionsSink) {
        throw new UnsupportedOperationException("Write benchmarking of Ion 1.1 from the DOM is not yet supported.");
    }

    /**
     * Transcodes the input file into a recording writer constructed around the given sink.
     * @param instructionsSink the consumer handed to the recording writer.
     * @throws IOException if thrown while re-writing the input.
     */
    @Override
    void generateWriteInstructionsStreaming(
        Consumer<WriteInstruction<MacroAwareIonWriter>> instructionsSink
    ) throws IOException {
        RecordingMacroAwareIonWriter recorder = new RecordingMacroAwareIonWriter(instructionsSink);
        IonUtilities.rewriteIon11File(inputFile, options, recorder);
    }

    /**
     * Builds a writer over the given stream using the supplier selected in the constructor.
     * The supplied writer is cast to MacroAwareIonWriter.
     */
    @Override
    MacroAwareIonWriter newWriter(OutputStream outputStream) throws IOException {
        return (MacroAwareIonWriter) writerBuilder.get(outputStream);
    }

    @Override
    void closeWriter(MacroAwareIonWriter writer) throws IOException {
        // Note: this closes the underlying OutputStream.
        writer.close();
    }
}
113 changes: 90 additions & 23 deletions src/com/amazon/ion/benchmark/IonUtilities.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@
import com.amazon.ion.IonSystem;
import com.amazon.ion.IonType;
import com.amazon.ion.IonWriter;
import com.amazon.ion.MacroAwareIonReader;
import com.amazon.ion.MacroAwareIonWriter;
import com.amazon.ion.OffsetSpan;
import com.amazon.ion.SpanProvider;
import com.amazon.ion.SymbolTable;
import com.amazon.ion.impl._Private_IonConstants;
import com.amazon.ion.impl._Private_IonReaderBuilder;
import com.amazon.ion.impl._Private_IonSystem;
import com.amazon.ion.impl._Private_IonWriter;
import com.amazon.ion.impl.bin.LengthPrefixStrategy;
Expand All @@ -19,6 +22,7 @@
import com.amazon.ion.system.IonReaderBuilder;
import com.amazon.ion.system.IonSystemBuilder;
import com.amazon.ion.system.IonTextWriterBuilder;
import com.amazon.ion.system.IonTextWriterBuilder_1_1;
import com.amazon.ion.system.SimpleCatalog;

import java.io.BufferedInputStream;
Expand All @@ -28,6 +32,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
Expand Down Expand Up @@ -311,7 +316,12 @@ static IonWriterSupplier newCborWriterSupplier(OptionsCombinationBase options) {
* @throws IOException if thrown when parsing shared symbol tables.
*/
static IonWriterSupplier newTextWriterSupplier(OptionsCombinationBase options) throws IOException {
return newTextWriterSupplier(options, IonTextWriterBuilder.standard());
if (options.ionMinorVersion == 0) {
return newTextWriterSupplier(options, IonTextWriterBuilder.standard());
} else if (options.ionMinorVersion == 1) {
return newTextWriterSupplier_1_1(options);
}
throw new IllegalStateException();
}

/**
Expand All @@ -325,7 +335,7 @@ static IonWriterSupplier newJsonWriterSupplier(OptionsCombinationBase options) t
}

/**
* Creates a new IonWriterSupplier for text IonWriters.
* Creates a new IonWriterSupplier for Ion 1.0 text or JSON IonWriters.
* @param options the options to use when creating writers.
* @param builder the builder to use to construct new writers.
* @return a new instance.
Expand All @@ -335,6 +345,18 @@ private static IonWriterSupplier newTextWriterSupplier(OptionsCombinationBase op
return builder.withImports(parseImportsFromFile(options.importsForBenchmarkFile))::build;
}

/**
* Creates a new IonWriterSupplier for Ion 1.1 text IonWriters.
* The builder is obtained from IonEncodingVersion.ION_1_1, given the imports parsed from
* options.importsForBenchmarkFile, and the returned supplier is a reference to that builder's build method.
* @param options the options to use when creating writers.
* @return a new instance.
* @throws IOException if thrown when parsing shared symbol tables.
*/
private static IonWriterSupplier newTextWriterSupplier_1_1(OptionsCombinationBase options) throws IOException {
IonTextWriterBuilder_1_1 builder = IonEncodingVersion.ION_1_1.textWriterBuilder();
// NOTE(review): the value returned by withImports is discarded, so this presumably relies on the builder
// being configured in place -- confirm against the ion-java builder contract.
builder.withImports(parseImportsFromFile(options.importsForBenchmarkFile));
return builder::build;
}

/**
* Create a new IonCatalog populated with SharedSymbolTables read from the given file.
* @param importsFile the file containing the shared symbol tables to import.
Expand Down Expand Up @@ -458,6 +480,33 @@ private static void writeValuesWithOptions(
}
}

/**
 * Transcodes the given Ion 1.1+ file to another Ion 1.1+ stream, stopping at the end of the input or once
 * the limit from the options has been reached. The reader opened here is closed before returning; the
 * writer is flushed periodically when a flush period is configured, and is not closed by this method.
 * @param inputFile an Ion 1.1+ file.
 * @param options the options to use when re-writing.
 * @param writer the writer of the new stream; it is cast to MacroAwareIonWriter.
 * @throws IOException if thrown when reading or writing.
 */
static void rewriteIon11File(File inputFile, OptionsCombinationBase options, IonWriter writer) throws IOException {
    _Private_IonReaderBuilder readerBuilder = (_Private_IonReaderBuilder) newReaderBuilderForInput(options);
    try (MacroAwareIonReader source = readerBuilder.buildMacroAware(options.newInputStream(inputFile))) {
        source.prepareTranscodeTo((MacroAwareIonWriter) writer);
        // A limit of Integer.MAX_VALUE means "no limit", so the counter comparison is bypassed entirely.
        final boolean unbounded = options.limit == Integer.MAX_VALUE;
        for (
            int transcoded = 0;
            (unbounded || transcoded < options.limit) && source.transcodeNext();
            transcoded++
        ) {
            if (options.flushPeriod != null && transcoded % options.flushPeriod == 0) {
                writer.flush();
            }
        }
    }
}

/**
* Rewrite the given Ion file using the given options.
* @param inputFormat the format of 'input'; must be ION_BINARY, ION_TEXT, or JSON.
Expand All @@ -475,29 +524,33 @@ static void rewriteIonFile(Format inputFormat, Path input, Path output, OptionsC
IonReader reader = null;
TranscodeFunction transcodeFunction = STANDARD_TRANSCODE;
try {
if (
options.flushPeriod == null &&
options.importsForInputFile == null &&
options.importsForBenchmarkFile == null &&
options.format == Format.ION_BINARY &&
// Minor versions may add new kinds of system values, so it is not possible to maintain system value
// boundaries when downgrading to a previous minor version.
getMinorVersion(inputFormat, input.toFile()) <= options.ionMinorVersion
) {
// Use system-level reader to preserve the same symbol tables from the input.
writer = writerSupplier.get(options.newOutputStream(outputFile));
reader = ((_Private_IonSystem) ION_SYSTEM).newSystemReader(options.newInputStream(inputFile));
if (options.ionMinorVersion > getMinorVersion(inputFormat, input.toFile())) {
// Because symbol IDs will be transferred during the system transcode, *and* this is a format
// upgrade, the symbol IDs need to be transformed to point to the same text in the new format.
transcodeFunction = TRANSCODE_1_0_TO_1_1;
}
int inputMinorVersion = getMinorVersion(inputFormat, input.toFile());
writer = writerSupplier.get(options.newOutputStream(outputFile));
if (inputMinorVersion > 0 && options.ionMinorVersion > 0) {
rewriteIon11File(inputFile, options, writer);
} else {
// Do not preserve the existing symbol table boundaries.
writer = writerSupplier.get(options.newOutputStream(outputFile));
reader = newReaderBuilderForInput(options).build(options.newInputStream(inputFile));
if (
options.flushPeriod == null &&
options.importsForInputFile == null &&
options.importsForBenchmarkFile == null &&
options.format == Format.ION_BINARY &&
// Minor versions may add new kinds of system values, so it is not possible to maintain system value
// boundaries when downgrading to a previous minor version.
inputMinorVersion <= options.ionMinorVersion
) {
// Use system-level reader to preserve the same symbol tables from the input.
reader = ((_Private_IonSystem) ION_SYSTEM).newSystemReader(options.newInputStream(inputFile));
if (options.ionMinorVersion > inputMinorVersion) {
// Because symbol IDs will be transferred during the system transcode, *and* this is a format
// upgrade, the symbol IDs need to be transformed to point to the same text in the new format.
transcodeFunction = TRANSCODE_1_0_TO_1_1;
}
} else {
// Do not preserve the existing symbol table boundaries.
reader = newReaderBuilderForInput(options).build(options.newInputStream(inputFile));
}
writeValuesWithOptions(reader, writer, options, transcodeFunction);
}
writeValuesWithOptions(reader, writer, options, transcodeFunction);
} finally {
if (writer != null) {
writer.close();
Expand Down Expand Up @@ -557,4 +610,18 @@ static IonSystem ionSystemForInput(OptionsCombinationBase options) throws IOExce
}
return IonSystemBuilder.standard().withCatalog(newCatalog(options.importsForInputFile)).build();
}

/**
 * Selects the MeasurableWriteTask implementation for the given input and options. The Ion 1.1 task is used
 * only when both the requested output minor version and the input's minor version are greater than zero;
 * every other combination uses the standard Ion task.
 * @param inputPath the input data.
 * @param options the benchmark options.
 * @return a new MeasurableWriteTask.
 * @throws IOException if thrown when trying to classify the input file.
 */
static MeasurableWriteTask<?> createIonMeasurableWriteTask(Path inputPath, WriteOptionsCombination options) throws IOException {
    // Short-circuit: the input file is only inspected when the requested output is newer than Ion 1.0.
    boolean bothSidesNewerThan10 = options.ionMinorVersion > 0
        && getMinorVersion(Format.classify(inputPath), inputPath.toFile()) > 0;
    if (!bothSidesNewerThan10) {
        return new IonMeasurableWriteTask(inputPath, options);
    }
    return new IonMeasurableWriteTask_1_1(inputPath, options);
}
}
Loading