Skip to content

Commit

Permalink
#719 Improve the error message about file size not divisible by recor…
Browse files Browse the repository at this point in the history
…d size.
  • Loading branch information
yruslan committed Oct 18, 2024
1 parent 070938d commit cac986e
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark: SparkSession = SparkSession.builder()
.master("local[2]")
.appName("test")
.config("spark.ui.enabled", "false")
.config("spark.driver.bindAddress","127.0.0.1")
.config("spark.driver.host", "127.0.0.1")
.config("spark.sql.session.timeZone", "Africa/Johannesburg")
.getOrCreate()

import spark.implicits._

val df = List(+0.0, -0.0, 0, 1.0, 1.001, 1.000001, 1.99, 1.999, -1.9999999, -1.001).toDF("n")

val df2 = df
.withColumn("to_long", col("n") .cast("long"))
.withColumn("to_decimal", col("n").cast("decimal(38,0)"))
.withColumn("floor", floor(col("n")))
.withColumn("round", when(col("n") >= 0, floor(col("n").cast("decimal(15,10)"))).otherwise(ceil(col("n").cast("decimal(15,10)"))))

df2.printSchema()

df2.show()
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,19 @@ private[source] object CobolScanners extends Logging {
val recordSize = reader.getRecordSize

sourceDirs.foreach(sourceDir => {
if (!debugIgnoreFileSize && areThereNonDivisibleFiles(sourceDir, conf, recordSize)) {
throw new IllegalArgumentException(s"There are some files in $sourceDir that are NOT DIVISIBLE by the RECORD SIZE calculated from the copybook ($recordSize bytes per record). Check the logs for the names of the files.")
if (!debugIgnoreFileSize) {
val nonDivisibleFiles = getNonDivisibleFiles(sourceDir, conf, recordSize)

if (nonDivisibleFiles.nonEmpty) {
nonDivisibleFiles.head match {
case (name, size) =>
if (nonDivisibleFiles.length > 1) {
throw new IllegalArgumentException(s"Multiple file sizes are NOT DIVISIBLE by the RECORD SIZE calculated from the copybook ($recordSize bytes per record). Example file: $name size ($size bytes).")
} else {
throw new IllegalArgumentException(s"File $name size ($size bytes) is NOT DIVISIBLE by the RECORD SIZE calculated from the copybook ($recordSize bytes per record).")
}
}
}
}
})

Expand Down Expand Up @@ -164,15 +175,14 @@ private[source] object CobolScanners extends Logging {
recordParser(reader, records)
}

private def areThereNonDivisibleFiles(sourceDir: String, hadoopConfiguration: Configuration, divisor: Int): Boolean = {

private def getNonDivisibleFiles(sourceDir: String, hadoopConfiguration: Configuration, divisor: Int): Seq[(String, Long)] = {
val fileSystem = new Path(sourceDir).getFileSystem(hadoopConfiguration)

if (FileUtils.getNumberOfFilesInDir(sourceDir, fileSystem) < FileUtils.THRESHOLD_DIR_LENGTH_FOR_SINGLE_FILE_CHECK) {
FileUtils.findAndLogAllNonDivisibleFiles(sourceDir, divisor, fileSystem) > 0
FileUtils.findAndLogAllNonDivisibleFiles(sourceDir, divisor, fileSystem)
}
else {
FileUtils.findAndLogFirstNonDivisibleFile(sourceDir, divisor, fileSystem)
FileUtils.findAndLogFirstNonDivisibleFile(sourceDir, divisor, fileSystem).toSeq.sorted
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ object FileUtils extends Logging {
/**
* Finds the first file that is non-divisible by a given divisor and logs its name.
*/
def findAndLogFirstNonDivisibleFile(sourceDir: String, divisor: Long, fileSystem: FileSystem): Boolean = {
def findAndLogFirstNonDivisibleFile(sourceDir: String, divisor: Long, fileSystem: FileSystem): Option[(String, Long)] = {

val allFiles = expandDirectories(fileSystem, fileSystem.globStatus(new Path(sourceDir), hiddenFileFilter))

Expand All @@ -197,13 +197,13 @@ object FileUtils extends Logging {
logger.error(s"File ${firstNonDivisibleFile.get.getPath} size (${firstNonDivisibleFile.get.getLen}) IS NOT divisible by $divisor.")
}

firstNonDivisibleFile.isDefined
firstNonDivisibleFile.map(status => (status.getPath.toString, status.getLen))
}

/**
* Finds all the files the are not divisible by a given divisor and logs their names.
*/
def findAndLogAllNonDivisibleFiles(sourceDir: String, divisor: Long, fileSystem: FileSystem): Long = {
def findAndLogAllNonDivisibleFiles(sourceDir: String, divisor: Long, fileSystem: FileSystem): Seq[(String, Long)] = {

val allFiles = expandDirectories(fileSystem, fileSystem.globStatus(new Path(sourceDir), hiddenFileFilter))

Expand All @@ -213,7 +213,7 @@ object FileUtils extends Logging {
allNonDivisibleFiles.foreach(file => logger.error(s"File ${file.getPath} size (${file.getLen}) IS NOT divisible by $divisor."))
}

allNonDivisibleFiles.length
allNonDivisibleFiles.map(status => (status.getPath.toString, status.getLen))
}

private def isNonDivisible(fileStatus: FileStatus, divisor: Long) = fileStatus.getLen % divisor != 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class Test27RecordLengthSpec extends AnyWordSpec with SparkTestBase with BinaryF
val ex = intercept[IllegalArgumentException] {
getDataFrame(tmpFileName, Map("record_length" -> "7")).collect()
}
assert(ex.getMessage.contains("are NOT DIVISIBLE by the RECORD SIZE"))
assert(ex.getMessage.contains("NOT DIVISIBLE by the RECORD SIZE"))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,43 +30,70 @@ class Test07IgnoreHiddenFiles extends AnyFunSuite with BinaryFileFixture with Sp

test("Test findAndLogFirstNonDivisibleFile() finds a file") {
withTempDirectory("testHidden1") { tmpDir =>
createFileSize1(Files.createFile(Paths.get(tmpDir, "a")))
assert(FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem))
assert(FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem) == 1)
createFileSize1(Files.createFile(Paths.get(tmpDir, "a-file.dat")))

val nonDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem)
val nonDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem)

assert(nonDivisibleFileOpt.isDefined)
assert(nonDivisibleFileOpt.get._1.endsWith("a-file.dat"))
assert(nonDivisibleFileOpt.get._2 == 1)

assert(nonDivisibleFiles.length == 1)
assert(nonDivisibleFiles.head._2 == 1)
assert(nonDivisibleFiles.head._1.endsWith("a-file.dat"))
}
}

test("Test findAndLogFirstNonDivisibleFile() ignores a hidden file") {
withTempDirectory("testHidden1") { tmpDir =>
createFileSize1(Files.createFile(Paths.get(tmpDir, ".a")))
assert(!FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem))
assert(FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem) == 0)
val nonDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem)
val nonDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem)

assert(nonDivisibleFileOpt.isEmpty)
assert(nonDivisibleFiles.isEmpty)
}
}

test("Test findAndLogFirstNonDivisibleFile() ignores a hidden file in a nested dir") {
withTempDirectory("testHidden3") { tmpDir =>
Files.createDirectory(Paths.get(tmpDir, "dir1"))
createFileSize1(Files.createFile(Paths.get(tmpDir, "dir1", ".b2")))
assert(!FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem))
assert(FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem) == 0)
val nonDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem)
val nonDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem)

assert(nonDivisibleFileOpt.isEmpty)
assert(nonDivisibleFiles.isEmpty)
}
}

test("Test findAndLogFirstNonDivisibleFile() ignores a hidden dir") {
withTempDirectory("testHidden4") { tmpDir =>
Files.createDirectory(Paths.get(tmpDir, ".dir2"))
createFileSize1(Files.createFile(Paths.get(tmpDir, ".dir2", "c1")))
assert(!FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem))
assert(FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem) == 0)
val nonDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem)
val nonDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem)

assert(nonDivisibleFileOpt.isEmpty)
assert(nonDivisibleFiles.isEmpty)
}
}

test("Test findAndLogFirstNonDivisibleFile() works with globbing") {
withTempDirectory("testHidden1") { tmpDir =>
createFileSize1(Files.createFile(Paths.get(tmpDir, "a")))
assert(FileUtils.findAndLogFirstNonDivisibleFile(s"$tmpDir/*", 2, fileSystem))
assert(FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem) == 1)
createFileSize1(Files.createFile(Paths.get(tmpDir, "a.dat")))

val nonDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem)
val nonDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem)

assert(nonDivisibleFileOpt.isDefined)
assert(nonDivisibleFileOpt.get._1.endsWith("a.dat"))
assert(nonDivisibleFileOpt.get._2 == 1)

assert(nonDivisibleFiles.length == 1)
assert(nonDivisibleFiles.head._1.endsWith("a.dat"))
assert(nonDivisibleFiles.head._2 == 1)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ class FileUtilsSpec extends AnyFlatSpec with BeforeAndAfterAll with BeforeAndAft
}

it should "return the number of files inside a directory" in {

val length = 10
val numFiles = 5

Expand All @@ -137,52 +136,54 @@ class FileUtilsSpec extends AnyFlatSpec with BeforeAndAfterAll with BeforeAndAft
}

it should "return 0 if there are no files inside a directory" in {

assertResult(0)(FileUtils.getNumberOfFilesInDir(controlledLengthFilesDir.getAbsolutePath, fileSystem))
}

it should "return 1 if there source is actually a file" in {

val aFile = getRandomFileToBeWritten
produceFileOfLength(aFile, 10)

assertResult(1)(FileUtils.getNumberOfFilesInDir(aFile.getAbsolutePath, fileSystem))
}

it should "return the file itself if non-divisible and if asked for first file" in {

val aFile = getRandomFileToBeWritten

val divisor = 10
produceFileOfLength(aFile, divisor + 1)

assertResult(true)(FileUtils.findAndLogFirstNonDivisibleFile(aFile.getAbsolutePath, divisor, fileSystem))
val notDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(aFile.getAbsolutePath, divisor, fileSystem)

assert(notDivisibleFileOpt.isDefined)
assert(notDivisibleFileOpt.get._2 == 11)
}

it should "return the file itself if non-divisible and if asked for multiple files" in {

val aFile = getRandomFileToBeWritten

val divisor = 10
produceFileOfLength(aFile, divisor + 1)

assertResult(1)(FileUtils.findAndLogAllNonDivisibleFiles(aFile.getAbsolutePath, divisor, fileSystem))
val notDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(aFile.getAbsolutePath, divisor, fileSystem)

assert(notDivisibleFiles.length == 1)
assert(notDivisibleFiles.head._2 == 11)
}

it should "return true if found first non-divisible file" in {

val divisor = 10

produceFileOfLength(getRandomFileToBeWritten, divisor)
produceFileOfLength(getRandomFileToBeWritten, divisor * 2)
produceFileOfLength(getRandomFileToBeWritten, divisor * 3)
produceFileOfLength(getRandomFileToBeWritten, divisor + 1) // non-divisible

assertResult(true)(FileUtils.findAndLogFirstNonDivisibleFile(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem))
val notDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem)

assert(notDivisibleFileOpt.isDefined)
}

it should "return number of non-divisible files" in {

val divisor = 10

produceFileOfLength(getRandomFileToBeWritten, divisor)
Expand All @@ -191,24 +192,25 @@ class FileUtilsSpec extends AnyFlatSpec with BeforeAndAfterAll with BeforeAndAft
produceFileOfLength(getRandomFileToBeWritten, divisor * 4 + 1) // non-divisible
produceFileOfLength(getRandomFileToBeWritten, divisor * 5 + 1) // non-divisible

assertResult(2)(FileUtils.findAndLogAllNonDivisibleFiles(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem))
val notDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem)

assert(notDivisibleFiles.length == 2)
}

it should "return false if no files are non-divisible by expected divisor" in {

val divisor = 10

produceFileOfLength(getRandomFileToBeWritten, divisor)
produceFileOfLength(getRandomFileToBeWritten, divisor * 2)
produceFileOfLength(getRandomFileToBeWritten, divisor * 3)
produceFileOfLength(getRandomFileToBeWritten, divisor * 4)

assertResult(false)(FileUtils.findAndLogFirstNonDivisibleFile(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem))
val notDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem)

assert(notDivisibleFileOpt.isEmpty)
}

it should "return 0 if no files are non-divisible by expected divisor" in {

val divisor = 10

produceFileOfLength(getRandomFileToBeWritten, divisor)
Expand All @@ -217,7 +219,9 @@ class FileUtilsSpec extends AnyFlatSpec with BeforeAndAfterAll with BeforeAndAft
produceFileOfLength(getRandomFileToBeWritten, divisor * 4) // non-divisible
produceFileOfLength(getRandomFileToBeWritten, divisor * 5) // non-divisible

assertResult(0)(FileUtils.findAndLogAllNonDivisibleFiles(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem))
val notDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem)

assert(notDivisibleFiles.isEmpty)
}

private def getRandomFileToBeWritten: File = new File(controlledLengthFilesDir, UUID.randomUUID().toString)
Expand Down

0 comments on commit cac986e

Please sign in to comment.