diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala
index aab089d9..91aa77f0 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala
@@ -17,12 +17,10 @@ package za.co.absa.cobrix.cobol.reader
 import za.co.absa.cobrix.cobol.internal.Logging
+import za.co.absa.cobrix.cobol.parser.Copybook
 import za.co.absa.cobrix.cobol.parser.common.Constants
-import za.co.absa.cobrix.cobol.parser.encoding.codepage.CodePage
-import za.co.absa.cobrix.cobol.parser.encoding.{ASCII, EBCDIC}
 import za.co.absa.cobrix.cobol.parser.headerparsers.{RecordHeaderParser, RecordHeaderParserFactory}
-import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat.{FixedBlock, VariableBlock}
-import za.co.absa.cobrix.cobol.parser.{Copybook, CopybookParser}
+import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat.{FixedBlock, FixedLength, VariableBlock}
 import za.co.absa.cobrix.cobol.reader.extractors.raw._
 import za.co.absa.cobrix.cobol.reader.extractors.record.RecordHandler
 import za.co.absa.cobrix.cobol.reader.index.IndexGenerator
@@ -34,8 +32,6 @@ import za.co.absa.cobrix.cobol.reader.schema.CobolSchema
 import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
 import za.co.absa.cobrix.cobol.reader.validator.ReaderParametersValidator
 
-import java.nio.charset.{Charset, StandardCharsets}
-import scala.collection.immutable.HashMap
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
@@ -79,6 +75,8 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
           Some(new TextRecordExtractor(reParams))
         case None if readerProperties.isText =>
           Some(new TextFullRecordExtractor(reParams))
+        case None if readerProperties.recordFormat == FixedLength && (readerProperties.lengthFieldExpression.nonEmpty || readerProperties.lengthFieldMap.nonEmpty) =>
+          Some(new FixedWithRecordLengthExprRawRecordExtractor(reParams, readerProperties))
        case None if readerProperties.recordFormat == FixedBlock =>
          val fbParams = FixedBlockParameters(readerProperties.recordLength, bdwOpt.get.blockLength, bdwOpt.get.recordsPerBlock)
          FixedBlockParameters.validate(fbParams)
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedWithRecordLengthExprRawRecordExtractor.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedWithRecordLengthExprRawRecordExtractor.scala
new file mode 100644
index 00000000..ba57a671
--- /dev/null
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedWithRecordLengthExprRawRecordExtractor.scala
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.cobrix.cobol.reader.extractors.raw
+
+import org.slf4j.LoggerFactory
+import za.co.absa.cobrix.cobol.parser.ast.Primitive
+import za.co.absa.cobrix.cobol.reader.iterator.RecordLengthExpression
+import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
+import za.co.absa.cobrix.cobol.reader.validator.ReaderParametersValidator
+
+class FixedWithRecordLengthExprRawRecordExtractor(ctx: RawRecordContext,
+                                                  readerProperties: ReaderParameters) extends Serializable with RawRecordExtractor {
+  private val log = LoggerFactory.getLogger(this.getClass)
+  ctx.headerStream.close()
+
+  final private val copyBookRecordSize = ctx.copybook.getRecordSize
+  final private val (recordLengthField, lengthFieldExpr) = ReaderParametersValidator.getEitherFieldAndExpression(readerProperties.lengthFieldExpression, readerProperties.lengthFieldMap, ctx.copybook)
+
+  final private val lengthField = recordLengthField.map(_.field)
+  final private val lengthMap = recordLengthField.map(_.valueMap).getOrElse(Map.empty)
+  final private val isLengthMapEmpty = lengthMap.isEmpty
+
+  type RawRecord = (String, Array[Byte])
+
+  private var cachedValue: Option[RawRecord] = _
+  private var byteIndex = readerProperties.fileStartOffset
+
+  final private val segmentIdField = ReaderParametersValidator.getSegmentIdField(readerProperties.multisegment, ctx.copybook)
+  final private val recordLengthAdjustment = readerProperties.rdwAdjustment
+
+  fetchNext()
+
+  override def offset: Long = cachedValue match {
+    case Some(v) => ctx.inputStream.offset - v._2.length
+    case None => ctx.inputStream.offset
+  }
+
+  override def hasNext: Boolean = cachedValue.nonEmpty
+
+  @throws[NoSuchElementException]
+  override def next(): Array[Byte] = {
+    cachedValue match {
+      case None => throw new NoSuchElementException
+      case Some(value) =>
+        fetchNext()
+        value._2
+    }
+  }
+
+  private def fetchNext(): Unit = {
+    var recordFetched = false
+    while (!recordFetched) {
+      val binaryData = if (lengthField.nonEmpty) {
+        fetchRecordUsingRecordLengthField()
+      } else {
+        fetchRecordUsingRecordLengthFieldExpression(lengthFieldExpr.get)
+      }
+
+      binaryData match {
+        case None =>
+          cachedValue = None
+          recordFetched = true
+        case Some(data) if data.length < readerProperties.minimumRecordLength || data.length > readerProperties.maximumRecordLength =>
+          recordFetched = false
+        case Some(data) =>
+          val segmentId = getSegmentId(data)
+          val segmentIdStr = segmentId.getOrElse("")
+
+          cachedValue = Some(segmentIdStr, data)
+          recordFetched = true
+      }
+    }
+  }
+
+  private def fetchRecordUsingRecordLengthField(): Option[Array[Byte]] = {
+    if (lengthField.isEmpty) {
+      throw new IllegalStateException(s"For variable length reader either RDW record headers or record length field should be provided.")
+    }
+
+    val lengthFieldBlock = lengthField.get.binaryProperties.offset + lengthField.get.binaryProperties.actualSize
+
+    val binaryDataStart = ctx.inputStream.next(readerProperties.startOffset + lengthFieldBlock)
+
+    byteIndex += readerProperties.startOffset + lengthFieldBlock
+
+    if (binaryDataStart.length < readerProperties.startOffset + lengthFieldBlock) {
+      return None
+    }
+
+    val recordLength = lengthField match {
+      case Some(lengthAST) => getRecordLengthFromField(lengthAST, binaryDataStart)
+      case None => copyBookRecordSize
+    }
+
+    val restOfDataLength = recordLength - lengthFieldBlock + readerProperties.endOffset
+
+    byteIndex += restOfDataLength
+
+    if (restOfDataLength > 0) {
+      Some(binaryDataStart ++ ctx.inputStream.next(restOfDataLength))
+    } else {
+      Some(binaryDataStart)
+    }
+  }
+
+  final private def getRecordLengthFromField(lengthAST: Primitive, binaryDataStart: Array[Byte]): Int = {
+    val length = if (isLengthMapEmpty) {
+      ctx.copybook.extractPrimitiveField(lengthAST, binaryDataStart, readerProperties.startOffset) match {
+        case i: Int => i
+        case l: Long => l.toInt
+        case s: String => s.toInt
+        case null => throw new IllegalStateException(s"Null encountered as a record length field (offset: $byteIndex, raw value: ${getBytesAsHexString(binaryDataStart)}).")
+        case _ => throw new IllegalStateException(s"Record length value of the field ${lengthAST.name} must be an integral type.")
+      }
+    } else {
+      ctx.copybook.extractPrimitiveField(lengthAST, binaryDataStart, readerProperties.startOffset) match {
+        case i: Int => getRecordLengthFromMapping(i.toString)
+        case l: Long => getRecordLengthFromMapping(l.toString)
+        case s: String => getRecordLengthFromMapping(s)
+        case null => throw new IllegalStateException(s"Null encountered as a record length field (offset: $byteIndex, raw value: ${getBytesAsHexString(binaryDataStart)}).")
+        case _ => throw new IllegalStateException(s"Record length value of the field ${lengthAST.name} must be an integral type.")
+      }
+    }
+    length + recordLengthAdjustment
+  }
+
+  final private def getRecordLengthFromMapping(v: String): Int = {
+    lengthMap.get(v) match {
+      case Some(len) => len
+      case None => throw new IllegalStateException(s"Record length value '$v' is not mapped to a record length.")
+    }
+  }
+
+  final private def getBytesAsHexString(bytes: Array[Byte]): String = {
+    bytes.map("%02X" format _).mkString
+  }
+
+  private def fetchRecordUsingRecordLengthFieldExpression(expr: RecordLengthExpression): Option[Array[Byte]] = {
+    val lengthFieldBlock = expr.requiredBytesToread
+    val evaluator = expr.evaluator
+
+    val binaryDataStart = ctx.inputStream.next(readerProperties.startOffset + lengthFieldBlock)
+
+    byteIndex += readerProperties.startOffset + lengthFieldBlock
+
+    if (binaryDataStart.length < readerProperties.startOffset + lengthFieldBlock) {
+      return None
+    }
+
+    expr.fields.foreach {
+      case (name, field) =>
+        val obj = ctx.copybook.extractPrimitiveField(field, binaryDataStart, readerProperties.startOffset)
+        try {
+          obj match {
+            case i: Int => evaluator.setValue(name, i)
+            case l: Long => evaluator.setValue(name, l.toInt)
+            case s: String => evaluator.setValue(name, s.toInt)
+            case _ => throw new IllegalStateException(s"Record length value of the field ${field.name} must be an integral type.")
+          }
+        } catch {
+          case ex: NumberFormatException =>
+            throw new IllegalStateException(s"Encountered an invalid value of the record length field. Cannot parse '$obj' as an integer in: ${field.name} = '$obj'.", ex)
+        }
+    }
+
+    val recordLength = evaluator.eval()
+
+    val restOfDataLength = recordLength - lengthFieldBlock + readerProperties.endOffset
+
+    byteIndex += restOfDataLength
+
+    if (restOfDataLength > 0) {
+      Some(binaryDataStart ++ ctx.inputStream.next(restOfDataLength))
+    } else {
+      Some(binaryDataStart)
+    }
+  }
+
+  private def getSegmentId(data: Array[Byte]): Option[String] = {
+    segmentIdField.map(field => {
+      val fieldValue = ctx.copybook.extractPrimitiveField(field, data, readerProperties.startOffset)
+      if (fieldValue == null) {
+        log.error(s"An unexpected null encountered for segment id at $byteIndex")
+        ""
+      } else {
+        fieldValue.toString.trim
+      }
+    })
+  }
+}
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test26FixLengthWithIdGeneration.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test26FixLengthWithIdGeneration.scala
new file mode 100644
index 00000000..c0f1212f
--- /dev/null
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test26FixLengthWithIdGeneration.scala
@@ -0,0 +1,195 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.cobrix.spark.cobol.source.regression
+
+import org.scalatest.wordspec.AnyWordSpec
+import org.slf4j.{Logger, LoggerFactory}
+import za.co.absa.cobrix.spark.cobol.source.base.{SimpleComparisonBase, SparkTestBase}
+import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture
+import za.co.absa.cobrix.spark.cobol.utils.SparkUtils
+
+class Test26FixLengthWithIdGeneration extends AnyWordSpec with SparkTestBase with BinaryFileFixture with SimpleComparisonBase {
+
+  private implicit val logger: Logger = LoggerFactory.getLogger(this.getClass)
+
+  private val copybook =
+    """         01  R.
+                05  IND           PIC X(1).
+                05  LEN           PIC 9(1).
+                05  SEGMENT1.
+                   10  FIELD1     PIC X(1).
+                05  SEGMENT2 REDEFINES SEGMENT1.
+                   10  FIELD2     PIC X(2).
+                05  SEGMENT3 REDEFINES SEGMENT1.
+                   10  FIELD3     PIC X(3).
+    """
+
+  val binFileContentsLengthField: Array[Byte] = Array[Byte](
+    // A1
+    0xC1.toByte, 0xF3.toByte, 0xF1.toByte,
+    // B22
+    0xC2.toByte, 0xF4.toByte, 0xF2.toByte, 0xF2.toByte,
+    // C333
+    0xC3.toByte, 0xF5.toByte, 0xF3.toByte, 0xF3.toByte, 0xF3.toByte,
+    // A2
+    0xC1.toByte, 0xF3.toByte, 0xF2.toByte,
+    // B23
+    0xC2.toByte, 0xF4.toByte, 0xF2.toByte, 0xF3.toByte,
+    // A3
+    0xC1.toByte, 0xF3.toByte, 0xF3.toByte,
+    // C345
+    0xC3.toByte, 0xF5.toByte, 0xF3.toByte, 0xF4.toByte, 0xF5.toByte,
+    // A4
+    0xC1.toByte, 0xF3.toByte, 0xF4.toByte,
+    // A5
+    0xC1.toByte, 0xF3.toByte, 0xF5.toByte,
+  )
+
+  val binFileContentsLengthExpr: Array[Byte] = Array[Byte](
+    // A1
+    0xC1.toByte, 0xF2.toByte, 0xF1.toByte,
+    // B22
+    0xC2.toByte, 0xF3.toByte, 0xF2.toByte, 0xF2.toByte,
+    // C333
+    0xC3.toByte, 0xF4.toByte, 0xF3.toByte, 0xF3.toByte, 0xF3.toByte,
+    // A2
+    0xC1.toByte, 0xF2.toByte, 0xF2.toByte,
+    // B23
+    0xC2.toByte, 0xF3.toByte, 0xF2.toByte, 0xF3.toByte,
+    // A3
+    0xC1.toByte, 0xF2.toByte, 0xF3.toByte,
+    // C345
+    0xC3.toByte, 0xF4.toByte, 0xF3.toByte, 0xF4.toByte, 0xF5.toByte,
+    // A4
+    0xC1.toByte, 0xF2.toByte, 0xF4.toByte,
+    // A5
+    0xC1.toByte, 0xF2.toByte, 0xF5.toByte,
+  )
+
+  val expected: String =
+    """[ {
+      |  "Seg_Id0" : "ID_0_0",
+      |  "IND" : "A",
+      |  "SEGMENT1" : {
+      |    "FIELD1" : "1"
+      |  }
+      |}, {
+      |  "Seg_Id0" : "ID_0_0",
+      |  "Seg_Id1" : "ID_0_0_L1_1",
+      |  "IND" : "B",
+      |  "SEGMENT2" : {
+      |    "FIELD2" : "22"
+      |  }
+      |}, {
+      |  "Seg_Id0" : "ID_0_0",
+      |  "Seg_Id1" : "ID_0_0_L1_2",
+      |  "IND" : "C",
+      |  "SEGMENT3" : {
+      |    "FIELD3" : "333"
+      |  }
+      |}, {
+      |  "Seg_Id0" : "ID_0_3",
+      |  "IND" : "A",
+      |  "SEGMENT1" : {
+      |    "FIELD1" : "2"
+      |  }
+      |}, {
+      |  "Seg_Id0" : "ID_0_3",
+      |  "Seg_Id1" : "ID_0_3_L1_1",
+      |  "IND" : "B",
+      |  "SEGMENT2" : {
+      |    "FIELD2" : "23"
+      |  }
+      |}, {
+      |  "Seg_Id0" : "ID_0_5",
+      |  "IND" : "A",
+      |  "SEGMENT1" : {
+      |    "FIELD1" : "3"
+      |  }
+      |}, {
+      |  "Seg_Id0" : "ID_0_5",
+      |  "Seg_Id1" : "ID_0_5_L1_1",
+      |  "IND" : "C",
+      |  "SEGMENT3" : {
+      |    "FIELD3" : "345"
+      |  }
+      |}, {
+      |  "Seg_Id0" : "ID_0_7",
+      |  "IND" : "A",
+      |  "SEGMENT1" : {
+      |    "FIELD1" : "4"
+      |  }
+      |}, {
+      |  "Seg_Id0" : "ID_0_8",
+      |  "IND" : "A",
+      |  "SEGMENT1" : {
+      |    "FIELD1" : "5"
+      |  }
+      |} ]""".stripMargin
+
+  "EBCDIC files" should {
+    "correctly work with segment id generation option with length field" in {
+      withTempBinFile("fix_length_reg", ".dat", binFileContentsLengthField) { tmpFileName =>
+        val df = spark
+          .read
+          .format("cobol")
+          .option("copybook_contents", copybook)
+          .option("record_format", "F")
+          .option("record_length_field", "LEN")
+          .option("segment_field", "IND")
+          .option("segment_id_prefix", "ID")
+          .option("segment_id_level0", "A")
+          .option("segment_id_level1", "_")
+          .option("redefine-segment-id-map:0", "SEGMENT1 => A")
+          .option("redefine-segment-id-map:1", "SEGMENT2 => B")
+          .option("redefine-segment-id-map:2", "SEGMENT3 => C")
+          .option("input_split_records", 1)
+          .option("pedantic", "true")
+          .load(tmpFileName)
+
+        val actual = SparkUtils.convertDataFrameToPrettyJSON(df.drop("LEN").orderBy("Seg_Id0", "Seg_Id1"))
+
+        assertEqualsMultiline(actual, expected)
+      }
+    }
+
+    "correctly work with segment id generation option with length expression" in {
+      withTempBinFile("fix_length_reg", ".dat", binFileContentsLengthExpr) { tmpFileName =>
+        val df = spark
+          .read
+          .format("cobol")
+          .option("copybook_contents", copybook)
+          .option("record_format", "F")
+          .option("record_length_field", "LEN + 1")
+          .option("segment_field", "IND")
+          .option("segment_id_prefix", "ID")
+          .option("segment_id_level0", "A")
+          .option("segment_id_level1", "_")
+          .option("redefine-segment-id-map:0", "SEGMENT1 => A")
+          .option("redefine-segment-id-map:1", "SEGMENT2 => B")
+          .option("redefine-segment-id-map:2", "SEGMENT3 => C")
+          .option("input_split_records", 1)
+          .option("pedantic", "true")
+          .load(tmpFileName)
+
+        val actual = SparkUtils.convertDataFrameToPrettyJSON(df.drop("LEN").orderBy("Seg_Id0", "Seg_Id1"))
+
+        assertEqualsMultiline(actual, expected)
+      }
+    }
+  }
+}