From cac986ec688496761fe242160702c7c57a532d0d Mon Sep 17 00:00:00 2001
From: Ruslan Iushchenko
Date: Fri, 18 Oct 2024 09:05:43 +0200
Subject: [PATCH] #719 Improve the error message about file size not divisible by record size.

---
 .../spark/cobol/parameters/spark_test.sc      | 25 +++++++++
 .../cobol/source/scanners/CobolScanners.scala | 22 +++++---
 .../cobrix/spark/cobol/utils/FileUtils.scala  |  8 +--
 .../integration/Test27RecordLengthSpec.scala  |  2 +-
 .../regression/Test07IgnoreHiddenFiles.scala  | 51 ++++++++++++++-----
 .../spark/cobol/utils/FileUtilsSpec.scala     | 34 +++++++------
 6 files changed, 104 insertions(+), 38 deletions(-)
 create mode 100644 spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/spark_test.sc

diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/spark_test.sc b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/spark_test.sc
new file mode 100644
index 00000000..10df33db
--- /dev/null
+++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/spark_test.sc
@@ -0,0 +1,25 @@
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions._
+
+val spark: SparkSession = SparkSession.builder()
+  .master("local[2]")
+  .appName("test")
+  .config("spark.ui.enabled", "false")
+  .config("spark.driver.bindAddress","127.0.0.1")
+  .config("spark.driver.host", "127.0.0.1")
+  .config("spark.sql.session.timeZone", "Africa/Johannesburg")
+  .getOrCreate()
+
+import spark.implicits._
+
+val df = List(+0.0, -0.0, 0, 1.0, 1.001, 1.000001, 1.99, 1.999, -1.9999999, -1.001).toDF("n")
+
+val df2 = df
+  .withColumn("to_long", col("n") .cast("long"))
+  .withColumn("to_decimal", col("n").cast("decimal(38,0)"))
+  .withColumn("floor", floor(col("n")))
+  .withColumn("round", when(col("n") >= 0, floor(col("n").cast("decimal(15,10)"))).otherwise(ceil(col("n").cast("decimal(15,10)"))))
+
+df2.printSchema()
+
+df2.show()
diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/scanners/CobolScanners.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/scanners/CobolScanners.scala
index d062da13..570571b7 100644
--- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/scanners/CobolScanners.scala
+++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/scanners/CobolScanners.scala
@@ -95,8 +95,19 @@ private[source] object CobolScanners extends Logging {
     val recordSize = reader.getRecordSize
 
     sourceDirs.foreach(sourceDir => {
-      if (!debugIgnoreFileSize && areThereNonDivisibleFiles(sourceDir, conf, recordSize)) {
-        throw new IllegalArgumentException(s"There are some files in $sourceDir that are NOT DIVISIBLE by the RECORD SIZE calculated from the copybook ($recordSize bytes per record). Check the logs for the names of the files.")
+      if (!debugIgnoreFileSize) {
+        val nonDivisibleFiles = getNonDivisibleFiles(sourceDir, conf, recordSize)
+
+        if (nonDivisibleFiles.nonEmpty) {
+          nonDivisibleFiles.head match {
+            case (name, size) =>
+              if (nonDivisibleFiles.length > 1) {
+                throw new IllegalArgumentException(s"Multiple file sizes are NOT DIVISIBLE by the RECORD SIZE calculated from the copybook ($recordSize bytes per record). Example file: $name size ($size bytes).")
+              } else {
+                throw new IllegalArgumentException(s"File $name size ($size bytes) is NOT DIVISIBLE by the RECORD SIZE calculated from the copybook ($recordSize bytes per record).")
+              }
+          }
+        }
       }
     })
 
@@ -164,15 +175,14 @@ private[source] object CobolScanners extends Logging {
     recordParser(reader, records)
   }
 
-  private def areThereNonDivisibleFiles(sourceDir: String, hadoopConfiguration: Configuration, divisor: Int): Boolean = {
-
+  private def getNonDivisibleFiles(sourceDir: String, hadoopConfiguration: Configuration, divisor: Int): Seq[(String, Long)] = {
     val fileSystem = new Path(sourceDir).getFileSystem(hadoopConfiguration)
 
     if (FileUtils.getNumberOfFilesInDir(sourceDir, fileSystem) < FileUtils.THRESHOLD_DIR_LENGTH_FOR_SINGLE_FILE_CHECK) {
-      FileUtils.findAndLogAllNonDivisibleFiles(sourceDir, divisor, fileSystem) > 0
+      FileUtils.findAndLogAllNonDivisibleFiles(sourceDir, divisor, fileSystem)
     } else {
-      FileUtils.findAndLogFirstNonDivisibleFile(sourceDir, divisor, fileSystem)
+      FileUtils.findAndLogFirstNonDivisibleFile(sourceDir, divisor, fileSystem).toSeq.sorted
     }
   }
 }
\ No newline at end of file
diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala
index a213df04..35ec188f 100644
--- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala
+++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala
@@ -186,7 +186,7 @@ object FileUtils extends Logging {
   /**
     * Finds the first file that is non-divisible by a given divisor and logs its name.
     */
-  def findAndLogFirstNonDivisibleFile(sourceDir: String, divisor: Long, fileSystem: FileSystem): Boolean = {
+  def findAndLogFirstNonDivisibleFile(sourceDir: String, divisor: Long, fileSystem: FileSystem): Option[(String, Long)] = {
     val allFiles = expandDirectories(fileSystem, fileSystem.globStatus(new Path(sourceDir), hiddenFileFilter))
 
@@ -197,13 +197,13 @@ object FileUtils extends Logging {
       logger.error(s"File ${firstNonDivisibleFile.get.getPath} size (${firstNonDivisibleFile.get.getLen}) IS NOT divisible by $divisor.")
     }
 
-    firstNonDivisibleFile.isDefined
+    firstNonDivisibleFile.map(status => (status.getPath.toString, status.getLen))
   }
 
   /**
     * Finds all the files the are not divisible by a given divisor and logs their names.
     */
-  def findAndLogAllNonDivisibleFiles(sourceDir: String, divisor: Long, fileSystem: FileSystem): Long = {
+  def findAndLogAllNonDivisibleFiles(sourceDir: String, divisor: Long, fileSystem: FileSystem): Seq[(String, Long)] = {
     val allFiles = expandDirectories(fileSystem, fileSystem.globStatus(new Path(sourceDir), hiddenFileFilter))
 
@@ -213,7 +213,7 @@ object FileUtils extends Logging {
       allNonDivisibleFiles.foreach(file => logger.error(s"File ${file.getPath} size (${file.getLen}) IS NOT divisible by $divisor."))
     }
 
-    allNonDivisibleFiles.length
+    allNonDivisibleFiles.map(status => (status.getPath.toString, status.getLen))
   }
 
   private def isNonDivisible(fileStatus: FileStatus, divisor: Long) = fileStatus.getLen % divisor != 0
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test27RecordLengthSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test27RecordLengthSpec.scala
index 21307117..d2671860 100644
--- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test27RecordLengthSpec.scala
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test27RecordLengthSpec.scala
@@ -78,7 +78,7 @@ class Test27RecordLengthSpec extends AnyWordSpec with SparkTestBase with BinaryF
       val ex = intercept[IllegalArgumentException] {
         getDataFrame(tmpFileName, Map("record_length" -> "7")).collect()
       }
-      assert(ex.getMessage.contains("are NOT DIVISIBLE by the RECORD SIZE"))
+      assert(ex.getMessage.contains("NOT DIVISIBLE by the RECORD SIZE"))
     }
   }
 }
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test07IgnoreHiddenFiles.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test07IgnoreHiddenFiles.scala
index 6e4f4c75..b771d9e0 100644
--- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test07IgnoreHiddenFiles.scala
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test07IgnoreHiddenFiles.scala
@@ -30,17 +30,29 @@ class Test07IgnoreHiddenFiles extends AnyFunSuite with BinaryFileFixture with Sp
   test("Test findAndLogFirstNonDivisibleFile() finds a file") {
     withTempDirectory("testHidden1") { tmpDir =>
-      createFileSize1(Files.createFile(Paths.get(tmpDir, "a")))
-      assert(FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem))
-      assert(FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem) == 1)
+      createFileSize1(Files.createFile(Paths.get(tmpDir, "a-file.dat")))
+
+      val nonDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem)
+      val nonDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem)
+
+      assert(nonDivisibleFileOpt.isDefined)
+      assert(nonDivisibleFileOpt.get._1.endsWith("a-file.dat"))
+      assert(nonDivisibleFileOpt.get._2 == 1)
+
+      assert(nonDivisibleFiles.length == 1)
+      assert(nonDivisibleFiles.head._2 == 1)
+      assert(nonDivisibleFiles.head._1.endsWith("a-file.dat"))
     }
   }
 
   test("Test findAndLogFirstNonDivisibleFile() ignores a hidden file") {
     withTempDirectory("testHidden1") { tmpDir =>
       createFileSize1(Files.createFile(Paths.get(tmpDir, ".a")))
-      assert(!FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem))
-      assert(FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem) == 0)
+      val nonDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem)
+      val nonDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem)
+
+      assert(nonDivisibleFileOpt.isEmpty)
+      assert(nonDivisibleFiles.isEmpty)
     }
   }
 
@@ -48,8 +60,11 @@ class Test07IgnoreHiddenFiles extends AnyFunSuite with BinaryFileFixture with Sp
     withTempDirectory("testHidden3") { tmpDir =>
       Files.createDirectory(Paths.get(tmpDir, "dir1"))
       createFileSize1(Files.createFile(Paths.get(tmpDir, "dir1", ".b2")))
-      assert(!FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem))
-      assert(FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem) == 0)
+      val nonDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem)
+      val nonDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem)
+
+      assert(nonDivisibleFileOpt.isEmpty)
+      assert(nonDivisibleFiles.isEmpty)
     }
   }
 
@@ -57,16 +72,28 @@ class Test07IgnoreHiddenFiles extends AnyFunSuite with BinaryFileFixture with Sp
     withTempDirectory("testHidden4") { tmpDir =>
       Files.createDirectory(Paths.get(tmpDir, ".dir2"))
       createFileSize1(Files.createFile(Paths.get(tmpDir, ".dir2", "c1")))
-      assert(!FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem))
-      assert(FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem) == 0)
+      val nonDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem)
+      val nonDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem)
+
+      assert(nonDivisibleFileOpt.isEmpty)
+      assert(nonDivisibleFiles.isEmpty)
     }
   }
 
   test("Test findAndLogFirstNonDivisibleFile() works with globbing") {
     withTempDirectory("testHidden1") { tmpDir =>
-      createFileSize1(Files.createFile(Paths.get(tmpDir, "a")))
-      assert(FileUtils.findAndLogFirstNonDivisibleFile(s"$tmpDir/*", 2, fileSystem))
-      assert(FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem) == 1)
+      createFileSize1(Files.createFile(Paths.get(tmpDir, "a.dat")))
+
+      val nonDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem)
+      val nonDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem)
+
+      assert(nonDivisibleFileOpt.isDefined)
+      assert(nonDivisibleFileOpt.get._1.endsWith("a.dat"))
+      assert(nonDivisibleFileOpt.get._2 == 1)
+
+      assert(nonDivisibleFiles.length == 1)
+      assert(nonDivisibleFiles.head._1.endsWith("a.dat"))
+      assert(nonDivisibleFiles.head._2 == 1)
     }
   }
 
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtilsSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtilsSpec.scala
index 1a9ccd1d..3aedd255 100644
--- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtilsSpec.scala
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtilsSpec.scala
@@ -127,7 +127,6 @@ class FileUtilsSpec extends AnyFlatSpec with BeforeAndAfterAll with BeforeAndAft
   }
 
   it should "return the number of files inside a directory" in {
-
     val length = 10
     val numFiles = 5
 
@@ -137,12 +136,10 @@ class FileUtilsSpec extends AnyFlatSpec with BeforeAndAfterAll with BeforeAndAft
   }
 
   it should "return 0 if there are no files inside a directory" in {
-
     assertResult(0)(FileUtils.getNumberOfFilesInDir(controlledLengthFilesDir.getAbsolutePath, fileSystem))
   }
 
   it should "return 1 if there source is actually a file" in {
-
     val aFile = getRandomFileToBeWritten
 
     produceFileOfLength(aFile, 10)
@@ -150,27 +147,30 @@ class FileUtilsSpec extends AnyFlatSpec with BeforeAndAfterAll with BeforeAndAft
   }
 
   it should "return the file itself if non-divisible and if asked for first file" in {
-
     val aFile = getRandomFileToBeWritten
     val divisor = 10
 
     produceFileOfLength(aFile, divisor + 1)
 
-    assertResult(true)(FileUtils.findAndLogFirstNonDivisibleFile(aFile.getAbsolutePath, divisor, fileSystem))
+    val notDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(aFile.getAbsolutePath, divisor, fileSystem)
+
+    assert(notDivisibleFileOpt.isDefined)
+    assert(notDivisibleFileOpt.get._2 == 11)
   }
 
   it should "return the file itself if non-divisible and if asked for multiple files" in {
-
     val aFile = getRandomFileToBeWritten
     val divisor = 10
 
     produceFileOfLength(aFile, divisor + 1)
 
-    assertResult(1)(FileUtils.findAndLogAllNonDivisibleFiles(aFile.getAbsolutePath, divisor, fileSystem))
+    val notDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(aFile.getAbsolutePath, divisor, fileSystem)
+
+    assert(notDivisibleFiles.length == 1)
+    assert(notDivisibleFiles.head._2 == 11)
   }
 
   it should "return true if found first non-divisible file" in {
-
     val divisor = 10
 
     produceFileOfLength(getRandomFileToBeWritten, divisor)
@@ -178,11 +178,12 @@ class FileUtilsSpec extends AnyFlatSpec with BeforeAndAfterAll with BeforeAndAft
     produceFileOfLength(getRandomFileToBeWritten, divisor * 3)
     produceFileOfLength(getRandomFileToBeWritten, divisor + 1) // non-divisible
 
-    assertResult(true)(FileUtils.findAndLogFirstNonDivisibleFile(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem))
+    val notDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem)
+
+    assert(notDivisibleFileOpt.isDefined)
   }
 
   it should "return number of non-divisible files" in {
-
     val divisor = 10
 
     produceFileOfLength(getRandomFileToBeWritten, divisor)
@@ -191,11 +192,12 @@ class FileUtilsSpec extends AnyFlatSpec with BeforeAndAfterAll with BeforeAndAft
     produceFileOfLength(getRandomFileToBeWritten, divisor * 4 + 1) // non-divisible
     produceFileOfLength(getRandomFileToBeWritten, divisor * 5 + 1) // non-divisible
 
-    assertResult(2)(FileUtils.findAndLogAllNonDivisibleFiles(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem))
+    val notDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem)
+
+    assert(notDivisibleFiles.length == 2)
   }
 
   it should "return false if no files are non-divisible by expected divisor" in {
-
     val divisor = 10
 
     produceFileOfLength(getRandomFileToBeWritten, divisor)
@@ -203,12 +205,12 @@ class FileUtilsSpec extends AnyFlatSpec with BeforeAndAfterAll with BeforeAndAft
     produceFileOfLength(getRandomFileToBeWritten, divisor * 3)
     produceFileOfLength(getRandomFileToBeWritten, divisor * 4)
 
-    assertResult(false)(FileUtils.findAndLogFirstNonDivisibleFile(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem))
+    val notDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem)
+    assert(notDivisibleFileOpt.isEmpty)
   }
 
   it should "return 0 if no files are non-divisible by expected divisor" in {
-
     val divisor = 10
 
     produceFileOfLength(getRandomFileToBeWritten, divisor)
@@ -217,7 +219,9 @@ class FileUtilsSpec extends AnyFlatSpec with BeforeAndAfterAll with BeforeAndAft
     produceFileOfLength(getRandomFileToBeWritten, divisor * 4) // non-divisible
     produceFileOfLength(getRandomFileToBeWritten, divisor * 5) // non-divisible
 
-    assertResult(0)(FileUtils.findAndLogAllNonDivisibleFiles(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem))
+    val notDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem)
+
+    assert(notDivisibleFiles.isEmpty)
   }
 
   private def getRandomFileToBeWritten: File = new File(controlledLengthFilesDir, UUID.randomUUID().toString)
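
Reviewer note (not part of the patch): a minimal sketch of how the improved message surfaces through the public API. The "cobol" short format name and the "copybook_contents" option are standard Cobrix usage; the copybook below and the 7-byte input file path are hypothetical, chosen only so that the file size is not a multiple of the 2-byte record size.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("non-divisible-file-demo")
      .getOrCreate()

    // A copybook that defines a fixed 2-byte record.
    val copybook =
      """       01  RECORD.
        |          05  A  PIC X(2).
        |""".stripMargin

    spark.read
      .format("cobol")                        // Cobrix data source
      .option("copybook_contents", copybook)  // record size = 2 bytes
      .load("/tmp/file_7_bytes.dat")          // hypothetical 7-byte file
      .count()                                // expected to fail with the new message:
                                              // "File ... size (7 bytes) is NOT DIVISIBLE by the RECORD SIZE
                                              //  calculated from the copybook (2 bytes per record)."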