Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/466 standardization support for second fractions #679

Merged
16 changes: 12 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ To enable processing of time entries from other systems **Standardization** offe
string and even numeric values to timestamp or date types. It's done using Spark's ability to convert strings to
timestamp/date with some enhancements. The pattern placeholders and usage is described in Java's
[`SimpleDateFormat` class description](https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html) with
the addition of recognizing two keywords `epoch` and `milliepoch` (case insensitive) to denote the number of
seconds/milliseconds since epoch (1970/01/01 00:00:00.000 UTC).
the addition of recognizing some keywords (like `epoch` and `milliepoch` (case insensitive)) to denote the number of
seconds/milliseconds since epoch (1970/01/01 00:00:00.000 UTC) and some additional placeholders.
It should be noted explicitly that `epoch` and `milliepoch` are considered a pattern including time zone.

Summary:
Expand Down Expand Up @@ -254,13 +254,21 @@ Summary:
| z | General time zone | Pacific Standard Time; PST; GMT-08:00 |
| Z | RFC 822 time zone | -0800 |
| X | ISO 8601 time zone | -08; -0800; -08:00 |
| _epoch_ | Seconds since 1970/01/01 00:00:00 | 1557136493|
| _milliepoch_ | Milliseconds since 1970/01/01 00:00:00.0000| 15571364938124 |
| _epoch_ | Seconds since 1970/01/01 00:00:00 | 1557136493, 1557136493.136|
Zejnilovic marked this conversation as resolved.
Show resolved Hide resolved
| _epochmilli_ | Milliseconds since 1970/01/01 00:00:00.0000| 15571364938124, 15571364938124.001 |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does milli have 4 more places

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo

| _epochmicro_ | Milliseconds since 1970/01/01 00:00:00.0000| 15571364938124789, 15571364938124789.999 |
GeorgiChochov marked this conversation as resolved.
Show resolved Hide resolved
| _epochnano_ | Milliseconds since 1970/01/01 00:00:00.0000| 15571364938124789101 |
GeorgiChochov marked this conversation as resolved.
Show resolved Hide resolved
| i | Microsecond | 111, 321001 |
| n | Nanosecond | 999, 542113879 |


**NB!** Spark uses US Locale and because on-the-fly conversion would be complicated, at the moment we stick to this
hardcoded locale as well. E.g. `am/pm` for `a` placeholder, English names of days and months etc.

**NB!** The keywords are case **insensitive**. Therefore, there is no difference between `epoch` and `EpoCH`.

**NB!** While _nanoseconds_ designation is supported on input, it's not supported in storage or further usage. So any
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please put some * or something to the format or something, that would lead people to this point.

value behind microseconds precision will be truncated.

##### Time Zone support
As it has been mentioned, it's highly recommended to use timestamps with the time zone. But it's not unlikely that the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,17 @@ sealed trait TypeParser {
}

object TypeParser extends StandardizationCommon {
import za.co.absa.enceladus.utils.implicits.ColumnImplicits.ColumnEnhancements

// scalastyle:off magic.number
private val decimalType = DecimalType(30,9)
// scalastyle:on magic.number

private val MillisecondsPerSecond = 1000
private val MicrosecondsPerSecond = 1000000
private val NanosecondsPerSecond = 1000000000


def standardize(field: StructField, path: String, origSchema: StructType)
(implicit udfLib: UDFLibrary): ParseOutput = {
// udfLib implicit is present for error column UDF implementation
Expand Down Expand Up @@ -266,8 +277,6 @@ object TypeParser extends StandardizationCommon {
override protected def assemblePrimitiveCastErrorLogic(castedCol: Column): Column = {
//NB! loss of precision is not addressed for any fractional type

import za.co.absa.enceladus.utils.implicits.ColumnImplicits.ColumnEnhancements

if (allowInfinity) {
castedCol.isNull or castedCol.isNaN
} else {
Expand Down Expand Up @@ -309,7 +318,6 @@ object TypeParser extends StandardizationCommon {
* Other | ->String->to_date | ->String->to_timestamp->to_utc_timestamp->to_date
*/
private trait DateTimeParser extends PrimitiveParser {
protected val basicCastFunction: (Column, String) => Column //for epoch casting
protected val pattern: DateTimePattern = DateTimePattern.fromStructField(field)

override protected def assemblePrimitiveCastLogic: Column = {
Expand All @@ -328,11 +336,6 @@ object TypeParser extends StandardizationCommon {
}
}

private def castEpoch(): Column = {
val epochPattern: String = Defaults.getGlobalFormat(field.dataType)
basicCastFunction(from_unixtime(column.cast(LongType) / pattern.epochFactor, epochPattern), epochPattern)
}

private def castWithPattern(): Column = {
// sadly with parquet support, incoming might not be all `plain`
origType match {
Expand Down Expand Up @@ -370,6 +373,10 @@ object TypeParser extends StandardizationCommon {
castStringColumn(nonStringColumn.cast(StringType))
}

protected def castEpoch(): Column = {
(column.cast(decimalType) / pattern.epochFactor).cast(TimestampType)
}

protected def castStringColumn(stringColumn: Column): Column

protected def castDateColumn(dateColumn: Column): Column
Expand All @@ -382,16 +389,31 @@ object TypeParser extends StandardizationCommon {
path: String,
origSchema: StructType,
parent: Option[Parent]) extends DateTimeParser {
protected val basicCastFunction: (Column, String) => Column = to_date //for epoch casting

override protected def castStringColumn(stringColumn: Column): Column = {
pattern.defaultTimeZone.map(tz =>
to_date(to_utc_timestamp(to_timestamp(stringColumn, pattern), tz))
private def applyPatternToStringColumn(column: Column, pattern: String, defaultTimeZone: Option[String]): Column = {
defaultTimeZone.map(tz =>
to_date(to_utc_timestamp(to_timestamp(column, pattern), tz))
).getOrElse(
to_date(stringColumn, pattern)
to_date(column, pattern)
)
}

override def castEpoch(): Column = {
// number cannot be cast to date directly, so first casting to timestamp and then truncating
to_date(super.castEpoch())
}

override protected def castStringColumn(stringColumn: Column): Column = {
if (pattern.containsSecondFractions) {
// date doesn't need to care about second fractions
applyPatternToStringColumn(
stringColumn.removeSections(
Seq(pattern.millisecondsPosition, pattern.microsecondsPosition, pattern.nanosecondsPosition).flatten
GeorgiChochov marked this conversation as resolved.
Show resolved Hide resolved
), pattern.patternWithoutSecondFractions, pattern.defaultTimeZone)
} else {
applyPatternToStringColumn(stringColumn, pattern, pattern.defaultTimeZone)
}
}

override protected def castDateColumn(dateColumn: Column): Column = {
pattern.defaultTimeZone.map(
tz => to_date(to_utc_timestamp(dateColumn, tz))
Expand All @@ -413,11 +435,36 @@ object TypeParser extends StandardizationCommon {
path: String,
origSchema: StructType,
parent: Option[Parent]) extends DateTimeParser {
protected val basicCastFunction: (Column, String) => Column = to_timestamp //for epoch casting

private def applyPatternToStringColumn(column: Column, pattern: String, defaultTimeZone: Option[String]): Column = {
val interim: Column = to_timestamp(column, pattern)
GeorgiChochov marked this conversation as resolved.
Show resolved Hide resolved
defaultTimeZone.map(to_utc_timestamp(interim, _)).getOrElse(interim)
}

override protected def castStringColumn(stringColumn: Column): Column = {
val interim: Column = to_timestamp(stringColumn, pattern)
pattern.defaultTimeZone.map(to_utc_timestamp(interim, _)).getOrElse(interim)
if (pattern.containsSecondFractions) {
//this is a trick how to enforce fractions of seconds into teh timestamp
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
//this is a trick how to enforce fractions of seconds into teh timestamp
//this is a trick how to enforce fractions of seconds into the timestamp

// - turn into timestamp up to seconds precision and that into unix_timestamp,
// - the second fractions turn into numeric fractions
// - add both together and convert to timestamp
val colSeconds = unix_timestamp(applyPatternToStringColumn(
stringColumn.removeSections(
Seq(pattern.millisecondsPosition, pattern.microsecondsPosition, pattern.nanosecondsPosition).flatten
), pattern.patternWithoutSecondFractions, pattern.defaultTimeZone))

val colMilliseconds: Option[Column] =
pattern.millisecondsPosition.map(stringColumn.zeroBasedSubstr(_).cast(decimalType) / MillisecondsPerSecond)
val colMicroseconds: Option[Column] =
pattern.microsecondsPosition.map(stringColumn.zeroBasedSubstr(_).cast(decimalType) / MicrosecondsPerSecond)
val colNanoseconds: Option[Column] =
pattern.nanosecondsPosition.map(stringColumn.zeroBasedSubstr(_).cast(decimalType) / NanosecondsPerSecond)
val colFractions: Column =
(colMilliseconds ++ colMicroseconds ++ colNanoseconds).reduceOption(_ + _).getOrElse(lit(0))

(colSeconds + colFractions).cast(TimestampType)
} else {
applyPatternToStringColumn(stringColumn, pattern, pattern.defaultTimeZone)
}
}

override protected def castDateColumn(dateColumn: Column): Column = {
Expand Down
Loading