#672 Implement parsing of copybooks with spark-cobol options #673

Merged · 5 commits · Apr 22, 2024
84 changes: 81 additions & 3 deletions README.md
@@ -32,7 +32,7 @@ Among the motivations for this project, it is possible to highlight:

- Supports REDEFINES, OCCURS and DEPENDING ON fields (e.g. unchecked unions and variable-size arrays)

- Supports nested structures and arrays (including "flattened" nested names)
- Supports nested structures and arrays

- Supports HDFS as well as local file systems

@@ -319,7 +319,7 @@ The fat jar will have '-bundle' suffix. You can also download pre-built bundles

Then, run `spark-shell` or `spark-submit` adding the fat jar as the option.
```sh
$ spark-shell --jars spark-cobol_2.12_3.3-2.6.12-SNAPSHOT-bundle.jar
$ spark-shell --jars spark-cobol_2.12_3.3-2.7.0-SNAPSHOT-bundle.jar
```

> <b>A note for building and running tests on Windows</b>
@@ -350,8 +350,18 @@ Currently, specifying multiple paths in `load()` is not supported. Use the follo
### Spark SQL schema extraction
This library also provides convenient methods to extract Spark SQL schemas and Cobol layouts from copybooks.

If you want to extract a Spark SQL schema from a copybook:
If you want to extract a Spark SQL schema from a copybook by providing the same options you pass to Spark:
```scala
// Same options that you use for spark.read.format("cobol").option()
val options = Map("schema_retention_policy" -> "keep_original")

val cobolSchema = CobolSchema.fromSparkOptions(Seq(copybook), options)
val sparkSchema = cobolSchema.getSparkSchema.toString()

println(sparkSchema)
```
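
The snippet above assumes a `copybook` string is already in scope. A fuller sketch (the `CobolSchema` import path and the copybook contents here are assumptions for illustration, not taken from the PR) might look like this:
```scala
import za.co.absa.cobrix.spark.cobol.schema.CobolSchema  // assumed import path

// An illustrative copybook supplied as a plain string
val copybook =
  """       01  RECORD.
    |           05  ID         PIC 9(4).
    |           05  COMPANY.
    |               10  NAME   PIC X(15).
    |""".stripMargin

// Same options that you use for spark.read.format("cobol").option()
val options = Map("schema_retention_policy" -> "keep_original")

val cobolSchema = CobolSchema.fromSparkOptions(Seq(copybook), options)

println(cobolSchema.getSparkSchema.toString())
```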

If you want to extract a Spark SQL schema from a copybook using the Cobol parser directly:
```scala
import za.co.absa.cobrix.cobol.parser.CopybookParser
import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy
@@ -1397,6 +1407,74 @@ When using `9` 8 refers to the number of digits the number has. Here, the size o
```
You can have decimals when using COMP-3 as well.

### Flattening schema with GROUPs and OCCURS
Flattening can be helpful when migrating data from mainframe files with fields that have OCCURS (arrays) to relational
databases that do not support nested arrays.

Cobrix has a method that can flatten the schema automatically given a DataFrame produced by `spark-cobol`.

Spark Scala example:
```scala
val dfFlat = SparkUtils.flattenSchema(df, useShortFieldNames = false)
```
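
The one-liner above assumes `df` and `SparkUtils` are already in scope. An end-to-end hedged sketch (the copybook and data paths are illustrative) could look like this:
```scala
import za.co.absa.cobrix.spark.cobol.utils.SparkUtils

// Illustrative locations; replace with a real copybook and data path
val df = spark.read
  .format("cobol")
  .option("copybook", "data/companies.cpy")
  .load("data/companies_data")

// Flatten GROUPs and OCCURS arrays into plain columns (e.g. SUBJECTS -> SUBJECTS_0, SUBJECTS_1, ...)
val dfFlat = SparkUtils.flattenSchema(df, useShortFieldNames = false)

dfFlat.printSchema()
dfFlat.show(truncate = false)
```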

PySpark example:
```python
from pyspark.sql import SparkSession, DataFrame, SQLContext
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType
from py4j.java_gateway import java_import

# Assumes a SparkSession is available; create one if not running in a pyspark shell
spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("subjects", ArrayType(StringType()), True)
])

# Sample data
data = [
    (1, "Alice", ["Math", "Science"]),
    (2, "Bob", ["History", "Geography"]),
    (3, "Charlie", ["English", "Math", "Physics"])
]

# Create a test DataFrame
df = spark.createDataFrame(data, schema)

# Show the Dataframe before flattening
df.show()

# Flatten the schema using Cobrix Scala 'SparkUtils.flattenSchema' method
sc = spark.sparkContext
java_import(sc._gateway.jvm, "za.co.absa.cobrix.spark.cobol.utils.SparkUtils")
dfFlatJvm = spark._jvm.SparkUtils.flattenSchema(df._jdf, False)
dfFlat = DataFrame(dfFlatJvm, SQLContext(sc))

# Show the Dataframe after flattening
dfFlat.show(truncate=False)
dfFlat.printSchema()
```

The output looks like this:
```
# Before flattening
+---+-------+------------------------+
|id |name |subjects |
+---+-------+------------------------+
|1 |Alice |[Math, Science] |
|2 |Bob |[History, Geography] |
|3 |Charlie|[English, Math, Physics]|
+---+-------+------------------------+

# After flattening
+---+-------+----------+----------+----------+
|id |name |subjects_0|subjects_1|subjects_2|
+---+-------+----------+----------+----------+
|1 |Alice |Math |Science |null |
|2 |Bob |History |Geography |null |
|3 |Charlie|English |Math |Physics |
+---+-------+----------+----------+----------+
```

## Summary of all available options

##### File reading options
2 changes: 1 addition & 1 deletion cobol-converters/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>za.co.absa.cobrix</groupId>
<artifactId>cobrix_2.12</artifactId>
<version>2.6.12-SNAPSHOT</version>
<version>2.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion cobol-parser/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>za.co.absa.cobrix</groupId>
<artifactId>cobrix_2.12</artifactId>
<version>2.6.12-SNAPSHOT</version>
<version>2.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

FixedLenNestedReader.scala
@@ -16,44 +16,20 @@

package za.co.absa.cobrix.cobol.reader

import java.nio.charset.{Charset, StandardCharsets}
import za.co.absa.cobrix.cobol.parser.decoders.FloatingPointFormat.FloatingPointFormat
import za.co.absa.cobrix.cobol.parser.encoding.codepage.CodePage
import za.co.absa.cobrix.cobol.parser.encoding.{ASCII, EBCDIC}
import za.co.absa.cobrix.cobol.parser.policies.FillerNamingPolicy
import za.co.absa.cobrix.cobol.parser.policies.StringTrimmingPolicy.StringTrimmingPolicy
import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat.{AsciiText, CobrixAsciiText, FixedLength}
import za.co.absa.cobrix.cobol.parser.{Copybook, CopybookParser}
import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat.{AsciiText, FixedLength}
import za.co.absa.cobrix.cobol.reader.extractors.record.RecordHandler
import za.co.absa.cobrix.cobol.reader.iterator.FixedLenNestedRowIterator
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaRetentionPolicy
import za.co.absa.cobrix.cobol.reader.schema.CobolSchema

import scala.collection.immutable.HashMap
import scala.reflect.ClassTag

/**
* The Cobol data reader that produces nested structure schema
*
* @param copyBookContents A copybook contents.
* @param startOffset Specifies the number of bytes at the beginning of each record that can be ignored.
* @param endOffset Specifies the number of bytes at the end of each record that can be ignored.
* @param schemaRetentionPolicy Specifies a policy to transform the input schema. The default policy is to keep the schema exactly as it is in the copybook.
*/
class FixedLenNestedReader[T: ClassTag](copyBookContents: Seq[String],
isEbcdic: Boolean,
ebcdicCodePage: CodePage,
floatingPointFormat: FloatingPointFormat,
startOffset: Int,
endOffset: Int,
schemaRetentionPolicy: SchemaRetentionPolicy,
stringTrimmingPolicy: StringTrimmingPolicy,
dropGroupFillers: Boolean,
dropValueFillers: Boolean,
fillerNamingPolicy: FillerNamingPolicy,
nonTerminals: Seq[String],
occursMappings: Map[String, Map[String, Int]],
readerProperties: ReaderParameters,
handler: RecordHandler[T]) extends FixedLenReader with Serializable {

@@ -63,22 +39,22 @@ class FixedLenNestedReader[T: ClassTag](copyBookContents: Seq[String],

override def getRecordSize: Int = {
val recordInternalsSize = readerProperties.recordLength.getOrElse(cobolSchema.getRecordSize)
recordInternalsSize + startOffset + endOffset
recordInternalsSize + readerProperties.startOffset + readerProperties.endOffset
}

@throws(classOf[Exception])
override def getRecordIterator(binaryData: Array[Byte]): Iterator[Seq[Any]] = {
checkBinaryDataValidity(binaryData)
val singleRecordIterator = readerProperties.recordFormat == AsciiText || readerProperties.recordFormat == FixedLength
new FixedLenNestedRowIterator(binaryData, cobolSchema, readerProperties, schemaRetentionPolicy, startOffset, endOffset, singleRecordIterator, handler)
new FixedLenNestedRowIterator(binaryData, cobolSchema, readerProperties, readerProperties.startOffset, readerProperties.endOffset, singleRecordIterator, handler)
}

def checkBinaryDataValidity(binaryData: Array[Byte]): Unit = {
if (startOffset < 0) {
throw new IllegalArgumentException(s"Invalid record start offset = $startOffset. A record start offset cannot be negative.")
if (readerProperties.startOffset < 0) {
throw new IllegalArgumentException(s"Invalid record start offset = ${readerProperties.startOffset}. A record start offset cannot be negative.")
}
if (endOffset < 0) {
throw new IllegalArgumentException(s"Invalid record end offset = $endOffset. A record end offset cannot be negative.")
if (readerProperties.endOffset < 0) {
throw new IllegalArgumentException(s"Invalid record end offset = ${readerProperties.endOffset}. A record end offset cannot be negative.")
}
readerProperties.recordLength match {
case Some(len) =>
@@ -96,61 +72,10 @@ class FixedLenNestedReader[T: ClassTag](copyBookContents: Seq[String],
}

private def getExpectedLength: Int = {
cobolSchema.getRecordSize + startOffset + endOffset
cobolSchema.getRecordSize + readerProperties.startOffset + readerProperties.endOffset
}

private def loadCopyBook(copyBookContents: Seq[String]): CobolSchema = {
val encoding = if (isEbcdic) EBCDIC else ASCII
val segmentRedefines = readerProperties.multisegment.map(r => r.segmentIdRedefineMap.values.toList.distinct).getOrElse(Nil)
val fieldParentMap = readerProperties.multisegment.map(r => r.fieldParentMap).getOrElse(HashMap[String, String]())
val asciiCharset = if (readerProperties.asciiCharset.isEmpty) StandardCharsets.UTF_8 else Charset.forName(readerProperties.asciiCharset)

val schema = if (copyBookContents.size == 1)
CopybookParser.parseTree(encoding,
copyBookContents.head,
dropGroupFillers,
dropValueFillers,
fillerNamingPolicy,
segmentRedefines,
fieldParentMap,
stringTrimmingPolicy,
readerProperties.commentPolicy,
readerProperties.strictSignOverpunch,
readerProperties.improvedNullDetection,
readerProperties.decodeBinaryAsHex,
ebcdicCodePage,
asciiCharset,
readerProperties.isUtf16BigEndian,
floatingPointFormat,
nonTerminals,
occursMappings,
readerProperties.debugFieldsPolicy,
readerProperties.fieldCodePage)
else
Copybook.merge(
copyBookContents.map(
CopybookParser.parseTree(encoding,
_,
dropGroupFillers,
dropValueFillers,
fillerNamingPolicy,
segmentRedefines,
fieldParentMap,
stringTrimmingPolicy,
readerProperties.commentPolicy,
readerProperties.strictSignOverpunch,
readerProperties.improvedNullDetection,
readerProperties.decodeBinaryAsHex,
ebcdicCodePage,
asciiCharset,
readerProperties.isUtf16BigEndian,
floatingPointFormat,
nonTerminals,
occursMappings,
readerProperties.debugFieldsPolicy,
readerProperties.fieldCodePage)
)
)
new CobolSchema(schema, schemaRetentionPolicy, "", false, readerProperties.generateRecordBytes, metadataPolicy = readerProperties.metadataPolicy)
CobolSchema.fromReaderParameters(copyBookContents, readerProperties)
}
}
VarLenNestedReader.scala
@@ -199,59 +199,7 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
}

private def loadCopyBook(copyBookContents: Seq[String]): CobolSchema = {
val encoding = if (readerProperties.isEbcdic) EBCDIC else ASCII
val segmentRedefines = readerProperties.multisegment.map(r => r.segmentIdRedefineMap.values.toList.distinct).getOrElse(Nil)
val fieldParentMap = readerProperties.multisegment.map(r => r.fieldParentMap).getOrElse(HashMap[String, String]())
val codePage = getCodePage(readerProperties.ebcdicCodePage, readerProperties.ebcdicCodePageClass)
val asciiCharset = if (readerProperties.asciiCharset.isEmpty) StandardCharsets.US_ASCII else Charset.forName(readerProperties.asciiCharset)

val schema = if (copyBookContents.size == 1)
CopybookParser.parseTree(encoding,
copyBookContents.head,
readerProperties.dropGroupFillers,
readerProperties.dropValueFillers,
readerProperties.fillerNamingPolicy,
segmentRedefines,
fieldParentMap,
readerProperties.stringTrimmingPolicy,
readerProperties.commentPolicy,
readerProperties.strictSignOverpunch,
readerProperties.improvedNullDetection,
readerProperties.decodeBinaryAsHex,
codePage,
asciiCharset,
readerProperties.isUtf16BigEndian,
readerProperties.floatingPointFormat,
readerProperties.nonTerminals,
readerProperties.occursMappings,
readerProperties.debugFieldsPolicy,
readerProperties.fieldCodePage)
else
Copybook.merge(copyBookContents.map(cpb =>
CopybookParser.parseTree(encoding,
cpb,
readerProperties.dropGroupFillers,
readerProperties.dropValueFillers,
readerProperties.fillerNamingPolicy,
segmentRedefines,
fieldParentMap,
readerProperties.stringTrimmingPolicy,
readerProperties.commentPolicy,
readerProperties.strictSignOverpunch,
readerProperties.improvedNullDetection,
readerProperties.decodeBinaryAsHex,
codePage,
asciiCharset,
readerProperties.isUtf16BigEndian,
readerProperties.floatingPointFormat,
nonTerminals = readerProperties.nonTerminals,
readerProperties.occursMappings,
readerProperties.debugFieldsPolicy,
readerProperties.fieldCodePage)
))
val segIdFieldCount = readerProperties.multisegment.map(p => p.segmentLevelIds.size).getOrElse(0)
val segmentIdPrefix = readerProperties.multisegment.map(p => p.segmentIdPrefix).getOrElse("")
new CobolSchema(schema, readerProperties.schemaPolicy, readerProperties.inputFileNameColumn, readerProperties.generateRecordId, readerProperties.generateRecordBytes, segIdFieldCount, segmentIdPrefix)
CobolSchema.fromReaderParameters(copyBookContents, readerProperties)
}

private def checkInputArgumentsValidity(): Unit = {
@@ -271,13 +219,6 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
}
}

private def getCodePage(codePageName: String, codePageClass: Option[String]): CodePage = {
codePageClass match {
case Some(c) => CodePage.getCodePageByClass(c)
case None => CodePage.getCodePageByName(codePageName)
}
}

private def getRecordHeaderParser: RecordHeaderParser = {
val adjustment1 = if (readerProperties.isRdwPartRecLength) -4 else 0
val adjustment2 = readerProperties.rdwAdjustment
FixedLenNestedRowIterator.scala
@@ -36,7 +36,6 @@ class FixedLenNestedRowIterator[T: ClassTag](
val binaryData: Array[Byte],
val cobolSchema: CobolSchema,
readerProperties: ReaderParameters,
policy: SchemaRetentionPolicy,
startOffset: Int,
endOffset: Int,
singleRecordOnly: Boolean,
@@ -89,7 +88,7 @@
cobolSchema.getCobolSchema.ast,
binaryData,
offset,
policy,
readerProperties.schemaPolicy,
readerProperties.variableSizeOccurs,
generateRecordBytes = readerProperties.generateRecordBytes,
activeSegmentRedefine = activeSegmentRedefine,
CobolParameters.scala
@@ -71,7 +71,7 @@ case class CobolParameters(
isEbcdic: Boolean,
ebcdicCodePage: String,
ebcdicCodePageClass: Option[String],
asciiCharset: String,
asciiCharset: Option[String],
fieldCodePage: Map[String, String],
isUtf16BigEndian: Boolean,
floatingPointFormat: FloatingPointFormat,
ReaderParameters.scala
@@ -79,7 +79,7 @@ case class ReaderParameters(
isText: Boolean = false,
ebcdicCodePage: String = "common",
ebcdicCodePageClass: Option[String] = None,
asciiCharset: String = "",
asciiCharset: Option[String] = None,
fieldCodePage: Map[String, String] = Map.empty[String, String],
isUtf16BigEndian: Boolean = true,
floatingPointFormat: FloatingPointFormat = FloatingPointFormat.IBM,
@@ -103,7 +103,7 @@ case class ReaderParameters(
fileEndOffset: Int = 0,
generateRecordId: Boolean = false,
generateRecordBytes: Boolean = false,
schemaPolicy: SchemaRetentionPolicy = SchemaRetentionPolicy.KeepOriginal,
schemaPolicy: SchemaRetentionPolicy = SchemaRetentionPolicy.CollapseRoot,
stringTrimmingPolicy: StringTrimmingPolicy = StringTrimmingPolicy.TrimBoth,
allowPartialRecords: Boolean = false,
multisegment: Option[MultisegmentParameters] = None,
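
Since `asciiCharset` is now an `Option[String]` (both here and in `CobolParameters` above), call sites can no longer treat an empty string as "not set". A hedged sketch of the charset resolution that the deleted reader code used to do inline (the defaults differ per reader: UTF-8 for the fixed-length reader, US-ASCII for the variable-length one) might be:
```scala
import java.nio.charset.{Charset, StandardCharsets}

// Hedged sketch, not the PR's actual code: resolve the optional ASCII charset,
// falling back to a reader-specific default.
def resolveAsciiCharset(asciiCharset: Option[String], default: Charset): Charset =
  asciiCharset.map(Charset.forName).getOrElse(default)

// For example, for the variable-length reader:
// val charset = resolveAsciiCharset(readerProperties.asciiCharset, StandardCharsets.US_ASCII)
```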