Skip to content

Commit

Permalink
Persistence Component: Append Only Digest Generation and Other Fixes (f…
Browse files Browse the repository at this point in the history
  • Loading branch information
kumuwu authored and AFine-gs committed Jan 9, 2024
1 parent 081e545 commit a8fe2ce
Show file tree
Hide file tree
Showing 163 changed files with 3,633 additions and 989 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2023 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.common;

public enum FileFormatType
{
CSV,
JSON,
AVRO,
PARQUET;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
package org.finos.legend.engine.persistence.components.ingestmode;

import org.finos.legend.engine.persistence.components.ingestmode.audit.Auditing;
import org.finos.legend.engine.persistence.components.ingestmode.digest.DigestGenStrategy;
import org.finos.legend.engine.persistence.components.ingestmode.digest.NoDigestGenStrategy;
import org.immutables.value.Value;

import java.util.Optional;

import static org.immutables.value.Value.Immutable;
import static org.immutables.value.Value.Style;

Expand All @@ -32,7 +32,11 @@
)
public interface AppendOnlyAbstract extends IngestMode
{
Optional<String> digestField();
@Value.Default
default DigestGenStrategy digestGenStrategy()
{
return NoDigestGenStrategy.builder().build();
}

Auditing auditing();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,7 @@ public Dataset visitAppendOnly(AppendOnlyAbstract appendOnly)
{
boolean isAuditingFieldPK = doesDatasetContainsAnyPK(mainSchemaFields);
appendOnly.auditing().accept(new EnrichSchemaWithAuditing(mainSchemaFields, isAuditingFieldPK));
if (appendOnly.digestField().isPresent())
{
addDigestField(mainSchemaFields, appendOnly.digestField().get());
}
appendOnly.digestGenStrategy().accept(IngestModeVisitors.EXTRACT_DIGEST_FIELD_FROM_DIGEST_GEN_STRATEGY).ifPresent(digest -> addDigestField(mainSchemaFields, digest));
removeDataSplitField(appendOnly.dataSplitField());
return mainDatasetDefinitionBuilder.schema(mainSchemaDefinitionBuilder.addAllFields(mainSchemaFields).build()).build();
}
Expand Down Expand Up @@ -138,11 +135,7 @@ public Dataset visitBitemporalDelta(BitemporalDeltaAbstract bitemporalDelta)
@Override
public Dataset visitBulkLoad(BulkLoadAbstract bulkLoad)
{
Optional<String> digestField = bulkLoad.digestGenStrategy().accept(IngestModeVisitors.EXTRACT_DIGEST_FIELD_FROM_DIGEST_GEN_STRATEGY);
if (digestField.isPresent())
{
addDigestField(mainSchemaFields, digestField.get());
}
bulkLoad.digestGenStrategy().accept(IngestModeVisitors.EXTRACT_DIGEST_FIELD_FROM_DIGEST_GEN_STRATEGY).ifPresent(digest -> addDigestField(mainSchemaFields, digest));
Field batchIdField = Field.builder()
.name(bulkLoad.batchIdField())
.type(FieldType.of(DataType.INT, Optional.empty(), Optional.empty()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.finos.legend.engine.persistence.components.ingestmode.audit.DateTimeAuditing;
import org.finos.legend.engine.persistence.components.ingestmode.audit.NoAuditingAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.audit.AuditingVisitor;
import org.finos.legend.engine.persistence.components.ingestmode.digest.UserProvidedDigestGenStrategy;
import org.finos.legend.engine.persistence.components.ingestmode.digest.UserProvidedDigestGenStrategyAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.*;
import org.finos.legend.engine.persistence.components.ingestmode.digest.DigestGenStrategy;
import org.finos.legend.engine.persistence.components.ingestmode.digest.NoDigestGenStrategyAbstract;
Expand Down Expand Up @@ -75,7 +77,7 @@ public IngestMode visitAppendOnly(AppendOnlyAbstract appendOnly)
{
return AppendOnly
.builder()
.digestField(applyCase(appendOnly.digestField()))
.digestGenStrategy(appendOnly.digestGenStrategy().accept(new DigestGenStrategyCaseConverter()))
.auditing(appendOnly.auditing().accept(new AuditingCaseConverter()))
.deduplicationStrategy(appendOnly.deduplicationStrategy())
.versioningStrategy(appendOnly.versioningStrategy().accept(new VersionStrategyCaseConverter()))
Expand Down Expand Up @@ -244,6 +246,14 @@ public DigestGenStrategy visitUDFBasedDigestGenStrategy(UDFBasedDigestGenStrateg
.digestField(applyCase(udfBasedDigestGenStrategy.digestField()))
.build();
}

@Override
public DigestGenStrategy visitUserProvidedDigestGenStrategy(UserProvidedDigestGenStrategyAbstract userProvidedDigestGenStrategy)
{
return UserProvidedDigestGenStrategy.builder()
.digestField(applyCase(userProvidedDigestGenStrategy.digestField()))
.build();
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.finos.legend.engine.persistence.components.ingestmode.digest.DigestGenStrategyVisitor;
import org.finos.legend.engine.persistence.components.ingestmode.digest.NoDigestGenStrategyAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.digest.UDFBasedDigestGenStrategyAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.digest.UserProvidedDigestGenStrategyAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.merge.MergeStrategyVisitors;

import java.util.Collections;
Expand All @@ -37,7 +38,7 @@ private IngestModeVisitors()
@Override
public Boolean visitAppendOnly(AppendOnlyAbstract appendOnly)
{
return appendOnly.filterExistingRecords();
return appendOnly.digestGenStrategy().accept(DIGEST_GEN_STRATEGY_DIGEST_REQUIRED);
}

@Override
Expand Down Expand Up @@ -88,7 +89,7 @@ public Boolean visitBulkLoad(BulkLoadAbstract bulkLoad)
@Override
public Optional<String> visitAppendOnly(AppendOnlyAbstract appendOnly)
{
return appendOnly.digestField();
return appendOnly.digestGenStrategy().accept(EXTRACT_DIGEST_FIELD_FROM_DIGEST_GEN_STRATEGY);
}

@Override
Expand Down Expand Up @@ -140,7 +141,7 @@ public Optional<String> visitBulkLoad(BulkLoadAbstract bulkLoad)
public Set<String> visitAppendOnly(AppendOnlyAbstract appendOnly)
{
Set<String> metaFields = new HashSet<>();
appendOnly.digestField().ifPresent(metaFields::add);
appendOnly.digestGenStrategy().accept(EXTRACT_DIGEST_FIELD_FROM_DIGEST_GEN_STRATEGY).ifPresent(metaFields::add);
appendOnly.dataSplitField().ifPresent(metaFields::add);
return metaFields;
}
Expand Down Expand Up @@ -374,6 +375,12 @@ public Boolean visitUDFBasedDigestGenStrategy(UDFBasedDigestGenStrategyAbstract
{
return true;
}

@Override
public Boolean visitUserProvidedDigestGenStrategy(UserProvidedDigestGenStrategyAbstract userProvidedDigestGenStrategy)
{
return true;
}
};

public static final DigestGenStrategyVisitor<Optional<String>> EXTRACT_DIGEST_FIELD_FROM_DIGEST_GEN_STRATEGY = new DigestGenStrategyVisitor<Optional<String>>()
Expand All @@ -389,6 +396,12 @@ public Optional<String> visitUDFBasedDigestGenStrategy(UDFBasedDigestGenStrategy
{
return Optional.of(udfBasedDigestGenStrategy.digestField());
}

@Override
public Optional<String> visitUserProvidedDigestGenStrategy(UserProvidedDigestGenStrategyAbstract userProvidedDigestGenStrategy)
{
return Optional.of(userProvidedDigestGenStrategy.digestField());
}
};

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@ public interface DigestGenStrategyVisitor<T>
T visitNoDigestGenStrategy(NoDigestGenStrategyAbstract noDigestGenStrategy);

T visitUDFBasedDigestGenStrategy(UDFBasedDigestGenStrategyAbstract udfBasedDigestGenStrategy);

T visitUserProvidedDigestGenStrategy(UserProvidedDigestGenStrategyAbstract userProvidedDigestGenStrategy);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2023 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.ingestmode.digest;

import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.values.DigestUdf;
import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.Value;

import java.util.List;
import java.util.stream.Collectors;

public class DigestGenerationHandler implements DigestGenStrategyVisitor<Void>
{
private List<Value> fieldsToSelect;
private List<Value> fieldsToInsert;
private Dataset stagingDataset;
private Dataset mainDataset;

public DigestGenerationHandler(Dataset mainDataset, Dataset stagingDataset, List<Value> fieldsToSelect, List<Value> fieldsToInsert)
{
this.mainDataset = mainDataset;
this.stagingDataset = stagingDataset;
this.fieldsToSelect = fieldsToSelect;
this.fieldsToInsert = fieldsToInsert;
}

@Override
public Void visitNoDigestGenStrategy(NoDigestGenStrategyAbstract noDigestGenStrategy)
{
return null;
}

@Override
public Void visitUDFBasedDigestGenStrategy(UDFBasedDigestGenStrategyAbstract udfBasedDigestGenStrategy)
{
Value digestValue = DigestUdf
.builder()
.udfName(udfBasedDigestGenStrategy.digestUdfName())
.addAllFieldNames(stagingDataset.schemaReference().fieldValues().stream().map(fieldValue -> fieldValue.fieldName()).collect(Collectors.toList()))
.addAllValues(fieldsToSelect)
.dataset(stagingDataset)
.build();
String digestField = udfBasedDigestGenStrategy.digestField();
fieldsToInsert.add(FieldValue.builder().datasetRef(mainDataset.datasetReference()).fieldName(digestField).build());
fieldsToSelect.add(digestValue);
return null;
}

@Override
public Void visitUserProvidedDigestGenStrategy(UserProvidedDigestGenStrategyAbstract userProvidedDigestGenStrategy)
{
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2023 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.ingestmode.digest;

import org.immutables.value.Value;

@Value.Immutable
@Value.Style(
typeAbstract = "*Abstract",
typeImmutable = "*",
jdkOnly = true,
optionalAcceptNullable = true,
strictBuilder = true
)
public interface UserProvidedDigestGenStrategyAbstract extends DigestGenStrategy
{
String digestField();

@Override
default <T> T accept(DigestGenStrategyVisitor<T> visitor)
{
return visitor.visitUserProvidedDigestGenStrategy(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public interface DatasetAdditionalPropertiesAbstract extends LogicalPlanNode

Optional<TableOrigin> tableOrigin();

Optional<String> externalVolume();
Optional<IcebergProperties> icebergProperties();

Map<String, String> tags();
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,24 @@ public Dataset applyCaseOnDataset(Dataset dataset, Function<String, String> stra
return derivedDataset;
}

if (dataset instanceof FilteredDataset)
{
FilteredDataset filteredDataset = FilteredDataset.builder()
.name(newName.orElseThrow(IllegalStateException::new))
.group(newSchemaName)
.database(newDatabaseName)
.schema(schemaDefinition)
.filter(((FilteredDataset) dataset).filter())
.datasetAdditionalProperties(dataset.datasetAdditionalProperties())
.build();

if (dataset.datasetReference().alias().isPresent())
{
filteredDataset = filteredDataset.withAlias(dataset.datasetReference().alias().get());
}
return filteredDataset;
}

if (dataset instanceof StagedFilesDataset)
{
StagedFilesDataset stagedFilesDataset = StagedFilesDataset.builder()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2023 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.logicalplan.datasets;

import org.finos.legend.engine.persistence.components.logicalplan.conditions.Condition;
import org.immutables.value.Value;

@Value.Immutable
@Value.Style(
typeAbstract = "*Abstract",
typeImmutable = "*",
jdkOnly = true,
optionalAcceptNullable = true,
strictBuilder = true
)
public interface FilteredDatasetAbstract extends DatasetDefinitionAbstract
{
Condition filter();
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.common;
package org.finos.legend.engine.persistence.components.logicalplan.datasets;

import org.immutables.value.Value;
import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanNode;
import org.immutables.value.Value.Immutable;
import org.immutables.value.Value.Style;

import java.util.Optional;

@Value.Immutable
@Value.Style(
@Immutable
@Style(
typeAbstract = "*Abstract",
typeImmutable = "*",
jdkOnly = true,
optionalAcceptNullable = true,
strictBuilder = true
)
public interface LoadOptionsAbstract
public interface IcebergPropertiesAbstract extends LogicalPlanNode
{
Optional<String> fieldDelimiter();

Optional<String> encoding();

Optional<String> nullMarker();

Optional<String> quote();

Optional<Long> skipLeadingRows();
String catalog();

Optional<Long> maxBadRecords();
String externalVolume();

Optional<String> compression();
String baseLocation();
}
Loading

0 comments on commit a8fe2ce

Please sign in to comment.