Persistence Library: Dry run mode support for Bulk Load and Capture Data Errors #2642

Merged 39 commits on Mar 13, 2024
a4031a2
Implement SQL logging
kumuwu Feb 6, 2024
9b2346b
Clean up
kumuwu Feb 6, 2024
29bd733
Merge branch 'master' of https://github.com/kumuwu/legend-engine into…
kumuwu Feb 7, 2024
6ab1e54
Address code review Comments and add test for additional metadata check
prasar-ashutosh Feb 12, 2024
9ce5d58
Merge branch 'master' of https://github.com/kumuwu/legend-engine into…
kumuwu Feb 27, 2024
e3a19d5
Dry Run mode for Snowflake
prasar-ashutosh Feb 7, 2024
9e93ddc
Changes for Data Quality Errors - Duplicates and Data Version Error
prasar-ashutosh Feb 15, 2024
c4f87e6
Expose sample Row count as a parameter
prasar-ashutosh Feb 16, 2024
6385fac
Dry run changes for Ingestor Mode
prasar-ashutosh Feb 20, 2024
a032d01
Fix the Data Error Definition
prasar-ashutosh Feb 20, 2024
666e16a
Address code review comments
prasar-ashutosh Feb 27, 2024
1cc2276
Implement bulk load dry run generic approach
kumuwu Feb 27, 2024
b2e43a5
Changes for Adding hash to temp tables
prasar-ashutosh Feb 28, 2024
313e0df
Change logical try_cast to cast in non safe-cast flow + refactor
kumuwu Feb 28, 2024
f4ef2f0
Merge branch 'dry_run_mode_sf' of https://github.com/prasar-ashutosh/…
kumuwu Feb 28, 2024
ca4aef5
Add cleanup operations for dry run
kumuwu Feb 28, 2024
a3e1e58
Merge remote-tracking branch 'origin/master' into dry_run_mode_sf
prasar-ashutosh Feb 28, 2024
d8bff93
Rebase with master
prasar-ashutosh Feb 28, 2024
8a1e171
Implement logic for sample row count + add SQL tests + address some c…
kumuwu Feb 29, 2024
989aa2c
Address comments
kumuwu Feb 29, 2024
b4de553
Merge branch 'dry_run_mode_sf' of https://github.com/prasar-ashutosh/…
kumuwu Feb 29, 2024
e36cf4b
Add sorting for h2 datatype checks and add test for sample row count …
kumuwu Mar 1, 2024
e077251
Address comments
kumuwu Mar 4, 2024
3a5f626
Fix snowflake datatype check - to check column by column and other fixes
kumuwu Mar 4, 2024
c4252eb
Address comments
kumuwu Mar 4, 2024
d5e7af2
Fix checkstyle
kumuwu Mar 4, 2024
ea16748
Fix snowflake datatype check
kumuwu Mar 5, 2024
8f82dfe
Merge pull request #16 from kumuwu/zhlizh-bulk-load-dry-run
prasar-ashutosh Mar 5, 2024
e7a0c8f
Clean up Batch Error
prasar-ashutosh Mar 5, 2024
d07e0da
Add error category
kumuwu Mar 6, 2024
ce51c12
Merge branch 'dry_run_mode_sf' of https://github.com/prasar-ashutosh/…
kumuwu Mar 6, 2024
1d2497e
Clean up interface for Data Error
prasar-ashutosh Mar 6, 2024
39664d9
Fix test
kumuwu Mar 6, 2024
e880452
Add exception handling and adapt to new DataError model
kumuwu Mar 7, 2024
3852239
Handle Data Errors in Duplicates and Data version error cases
prasar-ashutosh Mar 8, 2024
342ccfc
Fix tests
kumuwu Mar 11, 2024
78ed3a9
Provided an interface for init Datasets to pass the ingestRunId
prasar-ashutosh Mar 11, 2024
40c6df6
Clean up the ingest run id generation
prasar-ashutosh Mar 11, 2024
bea2326
Fix regex in H2 Sink to catch the exceptions
prasar-ashutosh Mar 13, 2024
Original file line number Diff line number Diff line change
@@ -42,5 +42,15 @@
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- JACKSON -->
+<!-- ECLIPSE COLLECTIONS -->
+<dependency>
+<groupId>org.eclipse.collections</groupId>
+<artifactId>eclipse-collections-api</artifactId>
+</dependency>
+<dependency>
+<groupId>org.eclipse.collections</groupId>
+<artifactId>eclipse-collections</artifactId>
+</dependency>
+<!-- ECLIPSE COLLECTIONS -->
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -14,8 +14,10 @@

package org.finos.legend.engine.persistence.components.common;

-public enum DedupAndVersionErrorStatistics
+public enum DedupAndVersionErrorSqlType
{
MAX_DUPLICATES,
-MAX_DATA_ERRORS;
+DUPLICATE_ROWS,
+MAX_DATA_ERRORS,
+DATA_ERROR_ROWS;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.ingestmode.versioning;

import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.GreaterThan;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection;
import org.finos.legend.engine.persistence.components.logicalplan.values.*;

import java.util.ArrayList;
import java.util.List;

public class DeriveDataErrorRowsLogicalPlan implements VersioningStrategyVisitor<LogicalPlan>
{
    private List<String> primaryKeys;
    private List<String> remainingColumns;
    private Dataset tempStagingDataset;
    private int sampleRowCount;

    public static final String DATA_VERSION_ERROR_COUNT = "legend_persistence_error_count";

    public DeriveDataErrorRowsLogicalPlan(List<String> primaryKeys, List<String> remainingColumns, Dataset tempStagingDataset, int sampleRowCount)
    {
        this.primaryKeys = primaryKeys;
        this.remainingColumns = remainingColumns;
        this.tempStagingDataset = tempStagingDataset;
        this.sampleRowCount = sampleRowCount;
    }

    @Override
    public LogicalPlan visitNoVersioningStrategy(NoVersioningStrategyAbstract noVersioningStrategy)
    {
        return null;
    }

    @Override
    public LogicalPlan visitMaxVersionStrategy(MaxVersionStrategyAbstract maxVersionStrategy)
    {
        if (maxVersionStrategy.performStageVersioning())
        {
            return getLogicalPlanForDataErrors(maxVersionStrategy.versioningField());
        }
        else
        {
            return null;
        }
    }

    @Override
    public LogicalPlan visitAllVersionsStrategy(AllVersionsStrategyAbstract allVersionsStrategyAbstract)
    {
        if (allVersionsStrategyAbstract.performStageVersioning())
        {
            return getLogicalPlanForDataErrors(allVersionsStrategyAbstract.versioningField());
        }
        else
        {
            return null;
        }
    }

    private LogicalPlan getLogicalPlanForDataErrors(String versionField)
    {
        List<Value> pKsAndVersion = new ArrayList<>();
        for (String pk: primaryKeys)
        {
            pKsAndVersion.add(FieldValue.builder().fieldName(pk).build());
        }
        pKsAndVersion.add(FieldValue.builder().fieldName(versionField).build());

        List<Value> distinctValueFields = new ArrayList<>();
        for (String field: remainingColumns)
        {
            distinctValueFields.add(FieldValue.builder().fieldName(field).build());
        }

        FunctionImpl countDistinct = FunctionImpl.builder()
            .functionName(FunctionName.COUNT)
            .addValue(FunctionImpl.builder().functionName(FunctionName.DISTINCT).addAllValue(distinctValueFields).build())
            .alias(DATA_VERSION_ERROR_COUNT)
            .build();

        Selection selectDataError = Selection.builder()
            .source(tempStagingDataset)
            .groupByFields(pKsAndVersion)
            .addAllFields(pKsAndVersion)
            .addFields(countDistinct)
            .havingCondition(GreaterThan.of(FieldValue.builder().fieldName(DATA_VERSION_ERROR_COUNT).build(), ObjectValue.of(1)))
            .limit(sampleRowCount)
            .build();

        return LogicalPlan.builder().addOps(selectDataError).build();
    }

}
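
The plan built in getLogicalPlanForDataErrors groups the staging rows by primary keys plus the version field, counts the distinct combinations of the remaining columns in each group, keeps groups whose count exceeds 1, and caps the result at sampleRowCount. The following is a minimal in-memory sketch of that same logic, not code from this PR: the class DataErrorRowsSketch, its method names, and the map-based row representation are hypothetical, NULL handling is not modeled, and the limit here follows insertion order whereas a SQL LIMIT without ORDER BY gives no ordering guarantee.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; not part of the persistence library.
class DataErrorRowsSketch
{
    // Same alias as DATA_VERSION_ERROR_COUNT in the class above.
    static final String ERROR_COUNT = "legend_persistence_error_count";

    // Small helper to build a row with three example columns.
    static Map<String, Object> row(Object id, Object version, Object amount)
    {
        Map<String, Object> r = new LinkedHashMap<>();
        r.put("id", id);
        r.put("version", version);
        r.put("amount", amount);
        return r;
    }

    // Returns up to sampleRowCount groups (primary keys + version) that hold
    // more than one distinct combination of the remaining columns.
    static List<Map<String, Object>> findDataErrors(List<Map<String, Object>> rows, List<String> primaryKeys, String versionField, List<String> remainingColumns, int sampleRowCount)
    {
        // GROUP BY pks + version, collecting DISTINCT remaining-column tuples.
        Map<List<Object>, Set<List<Object>>> distinctByKey = new LinkedHashMap<>();
        for (Map<String, Object> r : rows)
        {
            List<Object> key = new ArrayList<>();
            for (String pk : primaryKeys)
            {
                key.add(r.get(pk));
            }
            key.add(r.get(versionField));

            List<Object> rest = new ArrayList<>();
            for (String col : remainingColumns)
            {
                rest.add(r.get(col));
            }
            distinctByKey.computeIfAbsent(key, k -> new LinkedHashSet<>()).add(rest);
        }

        List<Map<String, Object>> errors = new ArrayList<>();
        for (Map.Entry<List<Object>, Set<List<Object>>> e : distinctByKey.entrySet())
        {
            if (errors.size() >= sampleRowCount)
            {
                break; // LIMIT sampleRowCount
            }
            if (e.getValue().size() > 1) // HAVING count > 1
            {
                Map<String, Object> out = new LinkedHashMap<>();
                for (int i = 0; i < primaryKeys.size(); i++)
                {
                    out.put(primaryKeys.get(i), e.getKey().get(i));
                }
                out.put(versionField, e.getKey().get(primaryKeys.size()));
                out.put(ERROR_COUNT, e.getValue().size());
                errors.add(out);
            }
        }
        return errors;
    }

    public static void main(String[] args)
    {
        List<Map<String, Object>> rows = new ArrayList<>();
        rows.add(row(1, 1, 100));
        rows.add(row(1, 1, 200)); // same key and version, different data: a data error
        rows.add(row(2, 1, 300));
        rows.add(row(2, 1, 300)); // exact duplicate: not a data error
        System.out.println(findDataErrors(rows, List.of("id"), "version", List.of("amount"), 10));
    }
}
```

Exact duplicates collapse to a single distinct tuple, so only rows that share a key and version but disagree on the remaining columns are reported.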
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@

package org.finos.legend.engine.persistence.components.ingestmode.versioning;

-import org.finos.legend.engine.persistence.components.common.DedupAndVersionErrorStatistics;
+import org.finos.legend.engine.persistence.components.common.DedupAndVersionErrorSqlType;
import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.*;
import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue;
@@ -25,14 +25,14 @@
import java.util.ArrayList;
import java.util.List;

-public class DeriveDataErrorCheckLogicalPlan implements VersioningStrategyVisitor<LogicalPlan>
+public class DeriveMaxDataErrorLogicalPlan implements VersioningStrategyVisitor<LogicalPlan>
{

List<String> primaryKeys;
List<String> remainingColumns;
Dataset tempStagingDataset;

-public DeriveDataErrorCheckLogicalPlan(List<String> primaryKeys, List<String> remainingColumns, Dataset tempStagingDataset)
+public DeriveMaxDataErrorLogicalPlan(List<String> primaryKeys, List<String> remainingColumns, Dataset tempStagingDataset)
{
this.primaryKeys = primaryKeys;
this.remainingColumns = remainingColumns;
@@ -73,7 +73,7 @@ public LogicalPlan visitAllVersionsStrategy(AllVersionsStrategyAbstract allVersi

private LogicalPlan getLogicalPlanForDataErrorCheck(String versionField)
{
-String maxDataErrorAlias = DedupAndVersionErrorStatistics.MAX_DATA_ERRORS.name();
+String maxDataErrorAlias = DedupAndVersionErrorSqlType.MAX_DATA_ERRORS.name();
String distinctRowCount = "legend_persistence_distinct_rows";
List<Value> pKsAndVersion = new ArrayList<>();
for (String pk: primaryKeys)
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@
package org.finos.legend.engine.persistence.components.logicalplan;

import org.finos.legend.engine.persistence.components.common.Datasets;
+import org.finos.legend.engine.persistence.components.logicalplan.conditions.Equals;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.CsvExternalDatasetReference;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection;
@@ -116,4 +117,14 @@ public static LogicalPlan getLogicalPlanForMaxOfField(Dataset dataset, String fi
.source(dataset).build();
return LogicalPlan.builder().addOps(selection).build();
}
+
+public static LogicalPlan getLogicalPlanForSelectAllFieldsWithStringFieldEquals(FieldValue field, String fieldValue)
+{
+Selection selection = Selection.builder()
+.addFields(All.INSTANCE)
+.source(field.datasetRef())
+.condition(Equals.of(field, StringValue.of(fieldValue)))
+.build();
+return LogicalPlan.builder().addOps(selection).build();
+}
}
Original file line number Diff line number Diff line change
@@ -47,6 +47,8 @@ public interface SelectionAbstract extends Dataset, Operation

Optional<List<Value>> groupByFields();

+Optional<Condition> havingCondition();
+
Optional<String> alias();

Optional<Integer> limit();
Original file line number Diff line number Diff line change
@@ -24,6 +24,12 @@ public interface StagedFilesDatasetProperties

List<String> filePatterns();

+@Value.Derived
+default boolean validationModeSupported()
+{
+return false;
+}
+
@Value.Check
default void validate()
{
Original file line number Diff line number Diff line change
@@ -21,7 +21,6 @@
import java.util.List;

import static org.immutables.value.Value.Immutable;
-import static org.immutables.value.Value.Parameter;
import static org.immutables.value.Value.Style;

@Immutable
@@ -34,15 +33,17 @@
)
public interface CopyAbstract extends Operation
{
-@Parameter(order = 0)
Dataset targetDataset();

-@Parameter(order = 1)
Dataset sourceDataset();

-@Parameter(order = 2)
List<Value> fields();

-@Parameter(order = 3)
StagedFilesDatasetProperties stagedFilesDatasetProperties();
+
+@org.immutables.value.Value.Default
+default boolean validationMode()
+{
+return false;
+}
}
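
Dropping the @Parameter annotations and adding a defaulted validationMode suggests that Copy is now constructed through a builder with named attributes, where validationMode stays false unless a dry run sets it. The sketch below is a hand-written approximation of that shape, not the code Immutables generates for CopyAbstract: CopySketch, its String-typed fields, and the builder methods are hypothetical stand-ins.

```java
// Hypothetical stand-in for a builder-constructed Copy operation; not generated code.
class CopySketch
{
    final String targetDataset;
    final String sourceDataset;
    final boolean validationMode;

    private CopySketch(Builder b)
    {
        this.targetDataset = b.targetDataset;
        this.sourceDataset = b.sourceDataset;
        this.validationMode = b.validationMode;
    }

    static Builder builder()
    {
        return new Builder();
    }

    static class Builder
    {
        private String targetDataset;
        private String sourceDataset;
        private boolean validationMode = false; // mirrors the default in CopyAbstract

        Builder targetDataset(String value)
        {
            this.targetDataset = value;
            return this;
        }

        Builder sourceDataset(String value)
        {
            this.sourceDataset = value;
            return this;
        }

        Builder validationMode(boolean value)
        {
            this.validationMode = value;
            return this;
        }

        CopySketch build()
        {
            // Required attributes must be set; validationMode is optional.
            if (targetDataset == null || sourceDataset == null)
            {
                throw new IllegalStateException("targetDataset and sourceDataset are required");
            }
            return new CopySketch(this);
        }
    }

    public static void main(String[] args)
    {
        CopySketch regular = CopySketch.builder().targetDataset("main").sourceDataset("stage").build();
        CopySketch dryRun = CopySketch.builder().targetDataset("main").sourceDataset("stage").validationMode(true).build();
        System.out.println(regular.validationMode + " " + dryRun.validationMode);
    }
}
```

Named builder calls let existing call sites omit the new flag, which a positional factory with a fixed argument list would not allow without a signature change.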
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.logicalplan.values;

import org.finos.legend.engine.persistence.components.logicalplan.datasets.StagedFilesDatasetProperties;

import static org.immutables.value.Value.Immutable;
import static org.immutables.value.Value.Style;

@Immutable
@Style(
    typeAbstract = "*Abstract",
    typeImmutable = "*",
    jdkOnly = true,
    optionalAcceptNullable = true,
    strictBuilder = true
)
public interface MetadataFileNameFieldAbstract extends Value
{
    StagedFilesDatasetProperties stagedFilesDatasetProperties();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.logicalplan.values;

import org.finos.legend.engine.persistence.components.logicalplan.datasets.StagedFilesDatasetProperties;

import static org.immutables.value.Value.Immutable;
import static org.immutables.value.Value.Style;

@Immutable
@Style(
    typeAbstract = "*Abstract",
    typeImmutable = "*",
    jdkOnly = true,
    optionalAcceptNullable = true,
    strictBuilder = true
)
public interface MetadataRowNumberFieldAbstract extends Value
{
    StagedFilesDatasetProperties stagedFilesDatasetProperties();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.logicalplan.values;

import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType;

import static org.immutables.value.Value.Immutable;
import static org.immutables.value.Value.Parameter;
import static org.immutables.value.Value.Style;

@Immutable
@Style(
    typeAbstract = "*Abstract",
    typeImmutable = "*",
    jdkOnly = true,
    optionalAcceptNullable = true,
    strictBuilder = true
)
public interface TryCastFunctionAbstract extends Value
{
    @Parameter(order = 0)
    Value field();

    @Parameter(order = 1)
    FieldType type();
}
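
TryCastFunction pairs a field with a target type. Assuming it maps to a TRY_CAST-style SQL function (in Snowflake, TRY_CAST yields NULL when the conversion fails instead of raising an error), the difference from a strict cast can be sketched in plain Java. TryCastSketch and both method names are hypothetical and only illustrate the semantics; they do not reflect how the library renders the function.

```java
import java.util.Optional;

// Hypothetical illustration of strict cast vs. try-cast semantics.
class TryCastSketch
{
    // Strict cast: a malformed value raises NumberFormatException and aborts the caller.
    static int castToInt(String raw)
    {
        return Integer.parseInt(raw.trim());
    }

    // Try-cast: a malformed or missing value produces an empty result,
    // so the caller can flag the row as a data error and keep scanning.
    static Optional<Integer> tryCastToInt(String raw)
    {
        if (raw == null)
        {
            return Optional.empty();
        }
        try
        {
            return Optional.of(Integer.parseInt(raw.trim()));
        }
        catch (NumberFormatException e)
        {
            return Optional.empty();
        }
    }

    public static void main(String[] args)
    {
        System.out.println(tryCastToInt("42").isPresent());
        System.out.println(tryCastToInt("not-a-number").isPresent());
    }
}
```

A dry run benefits from the non-throwing form because it can collect every offending row in one pass rather than stopping at the first bad value.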