Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persistence Component: Append Only Digest Generation and Other Fixes #2451

Merged
merged 148 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
148 commits
Select commit Hold shift + click to select a range
130c0c2
Bug Fix: Bitemporal milestoning Derive Main schema removes the VALID_…
prasar-ashutosh Aug 14, 2023
b25832b
Bug Fix: Bitemporal milestoning Schema Evolution must ignore user pro…
prasar-ashutosh Aug 14, 2023
f4d6186
Adding code for concurrent safety feature
prasar-ashutosh Aug 16, 2023
f43bd95
Adding test for Multi Ingest Mode with concurrent Safety
prasar-ashutosh Aug 16, 2023
f712980
Adding tests for concurrent safety
prasar-ashutosh Aug 17, 2023
8973f91
Code Clean up
prasar-ashutosh Aug 17, 2023
97d93c9
Bug Fix: Bitemporal temp tables must be deleted after usage
prasar-ashutosh Aug 18, 2023
5b6e979
Update readme and code review comments
prasar-ashutosh Aug 18, 2023
2d45a3a
Fix typo
prasar-ashutosh Aug 18, 2023
30c64f4
Fix typos in readme
prasar-ashutosh Aug 18, 2023
3263040
Bug Fix: Empty Batch Handling in Unitemp Snapshot
prasar-ashutosh Aug 21, 2023
50faeae
Bug Fix: Code review comments
prasar-ashutosh Aug 21, 2023
7d29ffd
Implement StagedFilesDatasetReference
kumuwu Aug 23, 2023
136793f
Implement StagedFilesSelection
kumuwu Aug 23, 2023
bb9b3ad
Support for Empty Batch Handling in Unitemporal Snapshot
prasar-ashutosh Aug 23, 2023
871461b
Support for FailEmptyBatch strategy in Unitemporal Snapshot
prasar-ashutosh Aug 24, 2023
ddc2bdd
Merge remote-tracking branch 'Ashutosh/concurrent_safety_support' int…
kumuwu Aug 28, 2023
fd35802
Enrich datasets to add additionalDatasetproperties every where
prasar-ashutosh Aug 28, 2023
6bd3eef
Add tests for Empty Data handling
prasar-ashutosh Aug 28, 2023
a0bccd4
Support ICEBERG_TABLE_2022 for Iceberg tables
prasar-ashutosh Aug 28, 2023
16aece8
Implement bulk load for h2
kumuwu Aug 29, 2023
96fa3b9
Merge branch 'concurrent_safety_support' of https://github.com/prasar…
kumuwu Aug 29, 2023
e2efc2a
Merge branch 'master' of https://github.com/kumuwu/legend-engine into…
kumuwu Aug 29, 2023
738f9c8
Add missing datasetAdditionalProperties in BigQueryHelper
kumuwu Aug 29, 2023
e82af33
Implement executor flow and add end-to-end test
kumuwu Aug 29, 2023
9753521
Add test
kumuwu Aug 30, 2023
7cc25fd
Add support for digest udf in h2 and add tests
kumuwu Aug 31, 2023
a3c7f74
Clean up
kumuwu Aug 31, 2023
a1bb51a
Add file format and validation for file format and add tests
kumuwu Aug 31, 2023
edeef49
Add missing statement
kumuwu Aug 31, 2023
b067e6e
Fix typo in year
kumuwu Sep 4, 2023
528405b
Fix comments
kumuwu Sep 4, 2023
db7a7dd
Add H2 MD5
kumuwu Sep 4, 2023
248e8f4
Merge branch 'master' of https://github.com/kumuwu/legend-engine into…
kumuwu Sep 5, 2023
42ff733
Change file format interface
kumuwu Sep 5, 2023
9197ba8
Merge branch 'master' of https://github.com/kumuwu/legend-engine into…
kumuwu Sep 6, 2023
a8726a9
Change stats
kumuwu Sep 7, 2023
b4b5e23
Merge branch 'zhlizh-bulk-load' of https://github.com/kumuwu/legend-e…
kumuwu Sep 11, 2023
f36e6bf
Change stats - make snowflake always return stats no matter success o…
kumuwu Sep 11, 2023
733271c
Implement bulk load in using copy and insert + modify interface for c…
kumuwu Sep 12, 2023
c82272b
Add Support for metadata for BulkLoad Task
prasar-ashutosh Sep 12, 2023
c6a2c59
Refactor Digest Generation Strategy
prasar-ashutosh Sep 13, 2023
402f683
Implement bulk load for big query
kumuwu Sep 13, 2023
68af4a8
Addressed Code Review Comments
prasar-ashutosh Sep 14, 2023
0a5b14a
Clean up
kumuwu Sep 14, 2023
998b78f
Add basic tests for bulk load for big query
kumuwu Sep 14, 2023
c1a1e54
Refactor Code to rename AppendLog to BulkLoad
prasar-ashutosh Sep 15, 2023
68f54d6
Add default bulkLoad Batchid
prasar-ashutosh Sep 15, 2023
cf6b4d0
Refactor Append Log table name
prasar-ashutosh Sep 15, 2023
9295a2a
Merge branch 'master' of https://github.com/kumuwu/legend-engine into…
kumuwu Sep 19, 2023
ed912c3
Merge remote-tracking branch 'origin/master' into append_metadata
prasar-ashutosh Sep 19, 2023
f03b337
Merge branch 'append_metadata' of https://github.com/prasar-ashutosh/…
kumuwu Sep 19, 2023
3e69548
Resolve conflicts
kumuwu Sep 19, 2023
97a7eb0
Add digest udf and more tests
kumuwu Sep 20, 2023
4e28794
Fix digest problem
kumuwu Sep 20, 2023
d804bd2
Change H2 digest algo
kumuwu Sep 21, 2023
106de03
Fix tests
kumuwu Sep 21, 2023
2e630df
Fix typo
kumuwu Sep 21, 2023
00f3f0b
Merge branch 'master' of https://github.com/kumuwu/legend-engine into…
kumuwu Sep 22, 2023
31b2199
Refactor file format and load options
kumuwu Sep 26, 2023
b948e02
Refactor dataset, selection, reference logic
kumuwu Sep 26, 2023
6611f90
Fix other comments
kumuwu Sep 26, 2023
33f9e88
Fix big query bulk load ingestor flow and add end-to-end tests
kumuwu Sep 28, 2023
17e9349
Merge branch 'master' of https://github.com/kumuwu/legend-engine into…
kumuwu Oct 2, 2023
e1a5f5d
Add rows with error handling and test
kumuwu Oct 3, 2023
577082c
Address comments
kumuwu Oct 3, 2023
8d7316a
Bulk Load Batch ID and Task ID & PK Validation (#11)
kumuwu Oct 4, 2023
9e5ff38
Increase the precision of timestamp to micros
prasar-ashutosh Sep 27, 2023
196b1be
Initial Design of Versioning and dedup model
prasar-ashutosh Sep 29, 2023
238e719
Move Dedup and versioning at top level
prasar-ashutosh Sep 29, 2023
deefe1b
Introduce temp Staging Dataset in the Planner
prasar-ashutosh Oct 3, 2023
699e004
Restructuring Code to versioning and deduplication
prasar-ashutosh Oct 3, 2023
ce1146c
Adding code for invoking deduplication and versioning
prasar-ashutosh Oct 4, 2023
2adf9c7
Sql tests for Non temp Snapshot
prasar-ashutosh Oct 5, 2023
6ad633b
Add validation for Append Only
prasar-ashutosh Oct 5, 2023
05e8791
Fix validation in nontemporal snapshot
kumuwu Oct 5, 2023
c904d99
Fix the sqls for dedup and versioning and add H2 tests
prasar-ashutosh Oct 6, 2023
453f3ef
Fix the sqls for dedup and versioning and add H2 tests
prasar-ashutosh Oct 6, 2023
ed6d37c
Fix AppendOnly and prepare skeleton for testing cases
kumuwu Oct 6, 2023
fd32f7c
Merge branch 'all_versions' of https://github.com/prasar-ashutosh/leg…
kumuwu Oct 6, 2023
9077d90
Fix the sqls for dedup and versioning and add H2 tests
prasar-ashutosh Oct 6, 2023
69db789
Fix the sqls for dedup and versioning and add H2 tests
prasar-ashutosh Oct 6, 2023
62b860e
Fix AppendOnly tests
kumuwu Oct 9, 2023
b884f65
Merge branch 'all_versions' of https://github.com/prasar-ashutosh/leg…
kumuwu Oct 9, 2023
cb8e05c
Add logic for throwing error in case of Fail on dups or data error
prasar-ashutosh Oct 9, 2023
1954934
Add tests for dedup and versioning
prasar-ashutosh Oct 10, 2023
6a0e05e
Add h2 tests for AppendOnly
kumuwu Oct 10, 2023
ea89d6c
Merge branch 'all_versions' of https://github.com/prasar-ashutosh/leg…
kumuwu Oct 10, 2023
75f9270
Change interface for ingest to return List of Result
prasar-ashutosh Oct 10, 2023
944920e
Add tests for H2 nontemporal snaphot
prasar-ashutosh Oct 10, 2023
8a6edf9
Fix the logic for incoming records count
prasar-ashutosh Oct 11, 2023
078b2de
Fix bitemp from and through SQL tests
kumuwu Oct 11, 2023
315bb10
Fix some bitemp tests
kumuwu Oct 11, 2023
19e7ae7
Refactor Version Resolver
prasar-ashutosh Oct 11, 2023
12d022d
Refactor Version Resolver
prasar-ashutosh Oct 11, 2023
e813eeb
Fix all bitemp tests
kumuwu Oct 11, 2023
66514f2
Merge branch 'all_versions' of https://github.com/prasar-ashutosh/leg…
kumuwu Oct 11, 2023
378651e
Test cases for unitemp snapshot
prasar-ashutosh Oct 11, 2023
49b22ee
Fix Append Only all version h2 tests
kumuwu Oct 12, 2023
53c918c
Fix upper case and fix missed stats SQL test changes
kumuwu Oct 12, 2023
ed9c191
Test cases for unitemp snapshot
prasar-ashutosh Oct 12, 2023
d69de62
Test cases for unitemp snapshot
prasar-ashutosh Oct 13, 2023
ab4667b
Changes for unitemp Delta and nontemp delta
prasar-ashutosh Oct 18, 2023
380ee52
Tests for non temporal Delta
prasar-ashutosh Oct 18, 2023
98b39c9
Tests for unitemp delta
prasar-ashutosh Oct 20, 2023
8f7b8e7
Merge with Master
prasar-ashutosh Oct 20, 2023
38c9791
Clean up Code
prasar-ashutosh Oct 20, 2023
d12c897
Clean up Version Resolver
prasar-ashutosh Oct 20, 2023
adc6742
Rename performVersioning to performStageVersioning
prasar-ashutosh Oct 20, 2023
00d2aa8
Clean up Version Resolver
prasar-ashutosh Oct 23, 2023
82db3da
Fix tests
prasar-ashutosh Oct 25, 2023
22cbb36
Add validations in Max and All version
prasar-ashutosh Oct 25, 2023
5fd693f
Clean up Nontemp Snapshot Tests
prasar-ashutosh Oct 25, 2023
ff6afe4
Clean up Unitemp Snapshot Tests
prasar-ashutosh Oct 26, 2023
8eda737
Clean up Unitemp and non temp delta Tests
prasar-ashutosh Oct 26, 2023
e27866d
Clean up Append Scenarios
prasar-ashutosh Oct 26, 2023
04a9ddf
H2 Tests for unitemp Delta
prasar-ashutosh Oct 27, 2023
03c12e1
H2 Tests for unitemp Delta
prasar-ashutosh Oct 30, 2023
aee96f2
Fix for Incoming rows count stats
prasar-ashutosh Oct 30, 2023
6232093
Remaining tests for unitemp delta
prasar-ashutosh Oct 30, 2023
2cf059d
Refactor the validations
prasar-ashutosh Oct 31, 2023
728ccea
Add unitemp snapshot h2 tests
kumuwu Oct 31, 2023
fb17f95
Merge branch 'master' into dedup_and_version
prasar-ashutosh Nov 1, 2023
6ebb580
Reuse a single empty file for all tests
kumuwu Nov 1, 2023
8ccf1b2
Merge branch 'dedup_and_version' of https://github.com/prasar-ashutos…
kumuwu Nov 1, 2023
d061656
Fix the mapping in persistence runner and associated tests
prasar-ashutosh Nov 2, 2023
d430101
Fix fields to exclude for digest + clean up
kumuwu Nov 2, 2023
61e63be
Address Review comments
prasar-ashutosh Nov 3, 2023
bcfd493
Merge branch 'master' of https://github.com/kumuwu/legend-engine into…
kumuwu Nov 3, 2023
029528c
Add digest generation for append only + h2 tests
kumuwu Nov 6, 2023
be220be
Add tests for other sinks + fix bugs in big query
kumuwu Nov 9, 2023
93b3fd2
Fix show commands not being properly quoted
kumuwu Nov 10, 2023
758bac6
Implement UserProvidedDigestGenStrategy and change Append Only's dige…
kumuwu Nov 10, 2023
37b5429
Fix persistence test runner
kumuwu Nov 10, 2023
c482858
Fix persistence test runner
kumuwu Nov 10, 2023
a1d61e1
Merge branch 'master' of https://github.com/kumuwu/legend-engine into…
kumuwu Nov 14, 2023
9eb704b
Fix show commands not being properly quoted
kumuwu Nov 14, 2023
1cc0c1a
Fix persistence test runner
kumuwu Nov 15, 2023
75830f6
Implement load options for snowflake bulk load
kumuwu Nov 15, 2023
d38f116
Update snowflake create iceberg table syntax
kumuwu Nov 16, 2023
9975d56
Merge branch 'master' of https://github.com/kumuwu/legend-engine into…
kumuwu Nov 16, 2023
95a68aa
Address comments
kumuwu Nov 22, 2023
70c0eb6
Memsql Bug Fix for Sharded Table
prasar-ashutosh Nov 22, 2023
892acf6
Implement FilteredDataset and add SQL tests
kumuwu Nov 22, 2023
953b744
Merge branch 'zhlizh-append-only-digest' of https://github.com/kumuwu…
kumuwu Nov 22, 2023
06cf988
Add h2 tests
kumuwu Nov 23, 2023
c2ac933
Support for filepaths and filepatterns in Snowflake Load
prasar-ashutosh Nov 27, 2023
abdd853
Address Code Review Comments
prasar-ashutosh Nov 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2023 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.common;

public enum FileFormatType
{
CSV,
JSON,
AVRO,
PARQUET;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
package org.finos.legend.engine.persistence.components.ingestmode;

import org.finos.legend.engine.persistence.components.ingestmode.audit.Auditing;
import org.finos.legend.engine.persistence.components.ingestmode.digest.DigestGenStrategy;
import org.finos.legend.engine.persistence.components.ingestmode.digest.NoDigestGenStrategy;
import org.immutables.value.Value;

import java.util.Optional;

import static org.immutables.value.Value.Immutable;
import static org.immutables.value.Value.Style;

Expand All @@ -32,7 +32,11 @@
)
public interface AppendOnlyAbstract extends IngestMode
{
Optional<String> digestField();
@Value.Default
default DigestGenStrategy digestGenStrategy()
{
return NoDigestGenStrategy.builder().build();
}

Auditing auditing();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,7 @@ public Dataset visitAppendOnly(AppendOnlyAbstract appendOnly)
{
boolean isAuditingFieldPK = doesDatasetContainsAnyPK(mainSchemaFields);
appendOnly.auditing().accept(new EnrichSchemaWithAuditing(mainSchemaFields, isAuditingFieldPK));
if (appendOnly.digestField().isPresent())
{
addDigestField(mainSchemaFields, appendOnly.digestField().get());
}
appendOnly.digestGenStrategy().accept(IngestModeVisitors.EXTRACT_DIGEST_FIELD_FROM_DIGEST_GEN_STRATEGY).ifPresent(digest -> addDigestField(mainSchemaFields, digest));
removeDataSplitField(appendOnly.dataSplitField());
return mainDatasetDefinitionBuilder.schema(mainSchemaDefinitionBuilder.addAllFields(mainSchemaFields).build()).build();
}
Expand Down Expand Up @@ -138,11 +135,7 @@ public Dataset visitBitemporalDelta(BitemporalDeltaAbstract bitemporalDelta)
@Override
public Dataset visitBulkLoad(BulkLoadAbstract bulkLoad)
{
Optional<String> digestField = bulkLoad.digestGenStrategy().accept(IngestModeVisitors.EXTRACT_DIGEST_FIELD_FROM_DIGEST_GEN_STRATEGY);
if (digestField.isPresent())
{
addDigestField(mainSchemaFields, digestField.get());
}
bulkLoad.digestGenStrategy().accept(IngestModeVisitors.EXTRACT_DIGEST_FIELD_FROM_DIGEST_GEN_STRATEGY).ifPresent(digest -> addDigestField(mainSchemaFields, digest));
Field batchIdField = Field.builder()
.name(bulkLoad.batchIdField())
.type(FieldType.of(DataType.INT, Optional.empty(), Optional.empty()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.finos.legend.engine.persistence.components.ingestmode.audit.DateTimeAuditing;
import org.finos.legend.engine.persistence.components.ingestmode.audit.NoAuditingAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.audit.AuditingVisitor;
import org.finos.legend.engine.persistence.components.ingestmode.digest.UserProvidedDigestGenStrategy;
import org.finos.legend.engine.persistence.components.ingestmode.digest.UserProvidedDigestGenStrategyAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.*;
import org.finos.legend.engine.persistence.components.ingestmode.digest.DigestGenStrategy;
import org.finos.legend.engine.persistence.components.ingestmode.digest.NoDigestGenStrategyAbstract;
Expand Down Expand Up @@ -75,7 +77,7 @@ public IngestMode visitAppendOnly(AppendOnlyAbstract appendOnly)
{
return AppendOnly
.builder()
.digestField(applyCase(appendOnly.digestField()))
.digestGenStrategy(appendOnly.digestGenStrategy().accept(new DigestGenStrategyCaseConverter()))
.auditing(appendOnly.auditing().accept(new AuditingCaseConverter()))
.deduplicationStrategy(appendOnly.deduplicationStrategy())
.versioningStrategy(appendOnly.versioningStrategy().accept(new VersionStrategyCaseConverter()))
Expand Down Expand Up @@ -244,6 +246,14 @@ public DigestGenStrategy visitUDFBasedDigestGenStrategy(UDFBasedDigestGenStrateg
.digestField(applyCase(udfBasedDigestGenStrategy.digestField()))
.build();
}

@Override
public DigestGenStrategy visitUserProvidedDigestGenStrategy(UserProvidedDigestGenStrategyAbstract userProvidedDigestGenStrategy)
{
return UserProvidedDigestGenStrategy.builder()
.digestField(applyCase(userProvidedDigestGenStrategy.digestField()))
.build();
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.finos.legend.engine.persistence.components.ingestmode.digest.DigestGenStrategyVisitor;
import org.finos.legend.engine.persistence.components.ingestmode.digest.NoDigestGenStrategyAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.digest.UDFBasedDigestGenStrategyAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.digest.UserProvidedDigestGenStrategyAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.merge.MergeStrategyVisitors;

import java.util.Collections;
Expand All @@ -37,7 +38,7 @@ private IngestModeVisitors()
@Override
public Boolean visitAppendOnly(AppendOnlyAbstract appendOnly)
{
return appendOnly.filterExistingRecords();
return appendOnly.digestGenStrategy().accept(DIGEST_GEN_STRATEGY_DIGEST_REQUIRED);
}

@Override
Expand Down Expand Up @@ -88,7 +89,7 @@ public Boolean visitBulkLoad(BulkLoadAbstract bulkLoad)
@Override
public Optional<String> visitAppendOnly(AppendOnlyAbstract appendOnly)
{
return appendOnly.digestField();
return appendOnly.digestGenStrategy().accept(EXTRACT_DIGEST_FIELD_FROM_DIGEST_GEN_STRATEGY);
}

@Override
Expand Down Expand Up @@ -140,7 +141,7 @@ public Optional<String> visitBulkLoad(BulkLoadAbstract bulkLoad)
public Set<String> visitAppendOnly(AppendOnlyAbstract appendOnly)
{
Set<String> metaFields = new HashSet<>();
appendOnly.digestField().ifPresent(metaFields::add);
appendOnly.digestGenStrategy().accept(EXTRACT_DIGEST_FIELD_FROM_DIGEST_GEN_STRATEGY).ifPresent(metaFields::add);
appendOnly.dataSplitField().ifPresent(metaFields::add);
return metaFields;
}
Expand Down Expand Up @@ -374,6 +375,12 @@ public Boolean visitUDFBasedDigestGenStrategy(UDFBasedDigestGenStrategyAbstract
{
return true;
}

@Override
public Boolean visitUserProvidedDigestGenStrategy(UserProvidedDigestGenStrategyAbstract userProvidedDigestGenStrategy)
{
return true;
}
};

public static final DigestGenStrategyVisitor<Optional<String>> EXTRACT_DIGEST_FIELD_FROM_DIGEST_GEN_STRATEGY = new DigestGenStrategyVisitor<Optional<String>>()
Expand All @@ -389,6 +396,12 @@ public Optional<String> visitUDFBasedDigestGenStrategy(UDFBasedDigestGenStrategy
{
return Optional.of(udfBasedDigestGenStrategy.digestField());
}

@Override
public Optional<String> visitUserProvidedDigestGenStrategy(UserProvidedDigestGenStrategyAbstract userProvidedDigestGenStrategy)
{
return Optional.of(userProvidedDigestGenStrategy.digestField());
}
};

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@ public interface DigestGenStrategyVisitor<T>
T visitNoDigestGenStrategy(NoDigestGenStrategyAbstract noDigestGenStrategy);

T visitUDFBasedDigestGenStrategy(UDFBasedDigestGenStrategyAbstract udfBasedDigestGenStrategy);

T visitUserProvidedDigestGenStrategy(UserProvidedDigestGenStrategyAbstract userProvidedDigestGenStrategy);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2023 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.ingestmode.digest;

import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.values.DigestUdf;
import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.Value;

import java.util.List;
import java.util.stream.Collectors;

public class DigestGenerationHandler implements DigestGenStrategyVisitor<Void>
{
private List<Value> fieldsToSelect;
private List<Value> fieldsToInsert;
private Dataset stagingDataset;
private Dataset mainDataset;

public DigestGenerationHandler(Dataset mainDataset, Dataset stagingDataset, List<Value> fieldsToSelect, List<Value> fieldsToInsert)
{
this.mainDataset = mainDataset;
this.stagingDataset = stagingDataset;
this.fieldsToSelect = fieldsToSelect;
this.fieldsToInsert = fieldsToInsert;
}

@Override
public Void visitNoDigestGenStrategy(NoDigestGenStrategyAbstract noDigestGenStrategy)
{
return null;
}

@Override
public Void visitUDFBasedDigestGenStrategy(UDFBasedDigestGenStrategyAbstract udfBasedDigestGenStrategy)
{
Value digestValue = DigestUdf
.builder()
.udfName(udfBasedDigestGenStrategy.digestUdfName())
.addAllFieldNames(stagingDataset.schemaReference().fieldValues().stream().map(fieldValue -> fieldValue.fieldName()).collect(Collectors.toList()))
.addAllValues(fieldsToSelect)
.dataset(stagingDataset)
.build();
String digestField = udfBasedDigestGenStrategy.digestField();
fieldsToInsert.add(FieldValue.builder().datasetRef(mainDataset.datasetReference()).fieldName(digestField).build());
fieldsToSelect.add(digestValue);
return null;
}

@Override
public Void visitUserProvidedDigestGenStrategy(UserProvidedDigestGenStrategyAbstract userProvidedDigestGenStrategy)
{
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2023 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.ingestmode.digest;

import org.immutables.value.Value;

@Value.Immutable
@Value.Style(
typeAbstract = "*Abstract",
typeImmutable = "*",
jdkOnly = true,
optionalAcceptNullable = true,
strictBuilder = true
)
public interface UserProvidedDigestGenStrategyAbstract extends DigestGenStrategy
{
String digestField();

@Override
default <T> T accept(DigestGenStrategyVisitor<T> visitor)
{
return visitor.visitUserProvidedDigestGenStrategy(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public interface DatasetAdditionalPropertiesAbstract extends LogicalPlanNode

Optional<TableOrigin> tableOrigin();

Optional<String> externalVolume();
Optional<IcebergProperties> icebergProperties();

Map<String, String> tags();
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,24 @@ public Dataset applyCaseOnDataset(Dataset dataset, Function<String, String> stra
return derivedDataset;
}

if (dataset instanceof FilteredDataset)
{
FilteredDataset filteredDataset = FilteredDataset.builder()
.name(newName.orElseThrow(IllegalStateException::new))
.group(newSchemaName)
.database(newDatabaseName)
.schema(schemaDefinition)
.filter(((FilteredDataset) dataset).filter())
.datasetAdditionalProperties(dataset.datasetAdditionalProperties())
.build();

if (dataset.datasetReference().alias().isPresent())
{
filteredDataset = filteredDataset.withAlias(dataset.datasetReference().alias().get());
}
return filteredDataset;
}

if (dataset instanceof StagedFilesDataset)
{
StagedFilesDataset stagedFilesDataset = StagedFilesDataset.builder()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2023 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.logicalplan.datasets;

import org.finos.legend.engine.persistence.components.logicalplan.conditions.Condition;
import org.immutables.value.Value;

@Value.Immutable
@Value.Style(
typeAbstract = "*Abstract",
typeImmutable = "*",
jdkOnly = true,
optionalAcceptNullable = true,
strictBuilder = true
)
public interface FilteredDatasetAbstract extends DatasetDefinitionAbstract
{
Condition filter();
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.common;
package org.finos.legend.engine.persistence.components.logicalplan.datasets;

import org.immutables.value.Value;
import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanNode;
import org.immutables.value.Value.Immutable;
import org.immutables.value.Value.Style;

import java.util.Optional;

@Value.Immutable
@Value.Style(
@Immutable
@Style(
typeAbstract = "*Abstract",
typeImmutable = "*",
jdkOnly = true,
optionalAcceptNullable = true,
strictBuilder = true
)
public interface LoadOptionsAbstract
public interface IcebergPropertiesAbstract extends LogicalPlanNode
{
Optional<String> fieldDelimiter();

Optional<String> encoding();

Optional<String> nullMarker();

Optional<String> quote();

Optional<Long> skipLeadingRows();
String catalog();

Optional<Long> maxBadRecords();
String externalVolume();

Optional<String> compression();
String baseLocation();
}
Loading
Loading