Commit

Change interface for ingest to return List of Result
prasar-ashutosh committed Oct 10, 2023
1 parent ea89d6c · commit 75f9270
Showing 12 changed files with 211 additions and 195 deletions.
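The migration for existing call sites is mechanical, as the test updates below show. A minimal before/after sketch of the calling convention (ingestor, connection, and datasets are named as in the tests in this diff; illustrative, not part of the commit):

    // Before: performFullIngestion returned a single IngestorResult
    // IngestorResult result = ingestor.performFullIngestion(connection, datasets);

    // After: it returns List<IngestorResult>, one result per executed ingest pass
    // (e.g. one per data split); single-pass call sites take the first element.
    List<IngestorResult> results = ingestor.performFullIngestion(connection, datasets);
    IngestorResult result = results.get(0);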
@@ -122,4 +122,13 @@ public static LogicalPlan getLogicalPlanForMinAndMaxForField(Dataset dataset, String fieldName)
                .source(dataset).build();
        return LogicalPlan.builder().addOps(selection).build();
    }
+
+    public static LogicalPlan getLogicalPlanForMaxOfField(Dataset dataset, String fieldName)
+    {
+        FieldValue field = FieldValue.builder().datasetRef(dataset.datasetReference()).fieldName(fieldName).build();
+        Selection selection = Selection.builder()
+                .addFields(FunctionImpl.builder().functionName(FunctionName.MAX).addValue(field).alias(MAX_OF_FIELD).build())
+                .source(dataset).build();
+        return LogicalPlan.builder().addOps(selection).build();
+    }
}
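A sketch of how the new factory method is consumed (mirroring its use in ApiUtils.getDataSplitRanges later in this commit); conceptually it plans a SELECT MAX(fieldName) aliased as MAX_OF_FIELD over the given dataset. The field name and surrounding wiring here are assumptions:

    // Illustrative: stagingDataset, transformer, and executor are assumed to be set up
    LogicalPlan maxPlan = LogicalPlanFactory.getLogicalPlanForMaxOfField(stagingDataset, "data_split");
    List<TabularData> result = executor.executePhysicalPlanAndGetResults(transformer.generatePhysicalPlan(maxPlan));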
@@ -170,7 +170,7 @@ protected Dataset mainDataset()
        return datasets.mainDataset();
    }

-    protected Dataset stagingDataset()
+    public Dataset stagingDataset()
    {
        return effectiveStagingDataset;
    }
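Widening stagingDataset() from protected to public lets the new ApiUtils.getDataSplitRanges helper, added later in this commit, read the planner's effective staging dataset from outside the planner:

    Dataset stagingDataset = planner.stagingDataset();
    LogicalPlan logicalPlanForMaxOfField = LogicalPlanFactory.getLogicalPlanForMaxOfField(stagingDataset, dataSplitField);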
@@ -156,7 +156,7 @@ protected IngestorResult ingestViaExecutorAndVerifyStagingFilters(IngestMode ingestMode
        // Load csv data
        loadData(path, datasets.stagingDataset(), 1);
        RelationalConnection connection = BigQueryConnection.of(getBigQueryConnection());
-        IngestorResult ingestorResult = ingestor.performFullIngestion(connection, datasets);
+        IngestorResult ingestorResult = ingestor.performFullIngestion(connection, datasets).get(0);

        verifyStagingFilters(ingestor, connection, datasets);
        return ingestorResult;
@@ -383,7 +383,7 @@ public IngestorResult executePlansAndVerifyForCaseConversion(IngestMode ingestMode
                .caseConversion(CaseConversion.TO_UPPER)
                .build();

-        IngestorResult result = ingestor.performFullIngestion(BigQueryConnection.of(getBigQueryConnection()), datasets);
+        IngestorResult result = ingestor.performFullIngestion(BigQueryConnection.of(getBigQueryConnection()), datasets).get(0);

        Map<StatisticName, Object> actualStats = result.statisticByName();

@@ -430,7 +430,7 @@ protected IngestorResult executePlansAndVerifyResults(IngestMode ingestMode, PlannerOptions options
                .enableSchemaEvolution(options.enableSchemaEvolution())
                .schemaEvolutionCapabilitySet(userCapabilitySet)
                .build();
-        IngestorResult result = ingestor.performFullIngestion(BigQueryConnection.of(getBigQueryConnection()), datasets);
+        IngestorResult result = ingestor.performFullIngestion(BigQueryConnection.of(getBigQueryConnection()), datasets).get(0);

        Map<StatisticName, Object> actualStats = result.statisticByName();

@@ -120,7 +120,7 @@ public void testMilestoning() throws IOException, InterruptedException
                .build();

        RelationalConnection connection = BigQueryConnection.of(getBigQueryConnection());
-        IngestorResult ingestorResult = ingestor.performFullIngestion(connection, datasets);
+        IngestorResult ingestorResult = ingestor.performFullIngestion(connection, datasets).get(0);

        // Verify
        List<Map<String, Object>> tableData = runQuery("select * from `demo`.`append_log` order by col_int asc");
@@ -178,7 +178,7 @@ public void testMilestoningFailure() throws IOException, InterruptedException
                .build();

        RelationalConnection connection = BigQueryConnection.of(getBigQueryConnection());
-        IngestorResult ingestorResult = ingestor.performFullIngestion(connection, datasets);
+        IngestorResult ingestorResult = ingestor.performFullIngestion(connection, datasets).get(0);

        // Verify
        List<Map<String, Object>> tableData = runQuery("select * from `demo`.`append_log` order by col_int asc");
@@ -14,19 +14,37 @@

package org.finos.legend.engine.persistence.components.relational.api;

+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.eclipse.collections.api.tuple.Pair;
+import org.eclipse.collections.impl.tuple.Tuples;
+import org.finos.legend.engine.persistence.components.common.DatasetFilter;
import org.finos.legend.engine.persistence.components.common.Datasets;
-import org.finos.legend.engine.persistence.components.ingestmode.DeriveMainDatasetSchemaFromStaging;
-import org.finos.legend.engine.persistence.components.ingestmode.IngestMode;
-import org.finos.legend.engine.persistence.components.ingestmode.IngestModeCaseConverter;
+import org.finos.legend.engine.persistence.components.common.FilterType;
+import org.finos.legend.engine.persistence.components.common.OptimizationFilter;
+import org.finos.legend.engine.persistence.components.executor.Executor;
+import org.finos.legend.engine.persistence.components.ingestmode.*;
+import org.finos.legend.engine.persistence.components.ingestmode.versioning.AllVersionsStrategy;
+import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan;
+import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanFactory;
+import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetsCaseConverter;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Field;
+import org.finos.legend.engine.persistence.components.planner.Planner;
import org.finos.legend.engine.persistence.components.relational.CaseConversion;
+import org.finos.legend.engine.persistence.components.relational.SqlPlan;
+import org.finos.legend.engine.persistence.components.relational.sql.TabularData;
+import org.finos.legend.engine.persistence.components.relational.sqldom.SqlGen;
+import org.finos.legend.engine.persistence.components.transformer.Transformer;
import org.finos.legend.engine.persistence.components.util.BulkLoadMetadataDataset;
import org.finos.legend.engine.persistence.components.util.LockInfoDataset;
import org.finos.legend.engine.persistence.components.util.MetadataDataset;

-import java.util.List;
+import java.util.*;

+import static org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanFactory.MAX_OF_FIELD;
+import static org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanFactory.MIN_OF_FIELD;

public class ApiUtils
{
@@ -98,4 +98,123 @@ private static LockInfoDataset getLockInfoDataset(Datasets datasets)
        return lockInfoDataset;
    }

+    public static Optional<Long> getNextBatchId(Datasets datasets, Executor<SqlGen, TabularData, SqlPlan> executor,
+                                                Transformer<SqlGen, SqlPlan> transformer, IngestMode ingestMode)
+    {
+        if (ingestMode.accept(IngestModeVisitors.IS_INGEST_MODE_TEMPORAL) || ingestMode instanceof BulkLoad)
+        {
+            LogicalPlan logicalPlanForNextBatchId = LogicalPlanFactory.getLogicalPlanForNextBatchId(datasets, ingestMode);
+            List<TabularData> tabularData = executor.executePhysicalPlanAndGetResults(transformer.generatePhysicalPlan(logicalPlanForNextBatchId));
+            Optional<Object> nextBatchId = getFirstColumnValue(getFirstRowForFirstResult(tabularData));
+            if (nextBatchId.isPresent())
+            {
+                return retrieveValueAsLong(nextBatchId.get());
+            }
+        }
+        return Optional.empty();
+    }
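A usage sketch for getNextBatchId; the datasets, executor, transformer, and ingest mode are assumed to be configured by the caller, and the fallback value is illustrative only:

    // Empty unless the ingest mode is temporal or a bulk load
    Optional<Long> nextBatchId = ApiUtils.getNextBatchId(datasets, executor, transformer, ingestMode);
    long batchId = nextBatchId.orElse(1L); // hypothetical default for a first batch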
+
+    public static Optional<Map<OptimizationFilter, Pair<Object, Object>>> getOptimizationFilterBounds(Datasets datasets, Executor<SqlGen, TabularData, SqlPlan> executor,
+                                                                                                      Transformer<SqlGen, SqlPlan> transformer, IngestMode ingestMode)
+    {
+        List<OptimizationFilter> filters = ingestMode.accept(IngestModeVisitors.RETRIEVE_OPTIMIZATION_FILTERS);
+        if (!filters.isEmpty())
+        {
+            Map<OptimizationFilter, Pair<Object, Object>> map = new HashMap<>();
+            for (OptimizationFilter filter : filters)
+            {
+                LogicalPlan logicalPlanForMinAndMaxForField = LogicalPlanFactory.getLogicalPlanForMinAndMaxForField(datasets.stagingDataset(), filter.fieldName());
+                List<TabularData> tabularData = executor.executePhysicalPlanAndGetResults(transformer.generatePhysicalPlan(logicalPlanForMinAndMaxForField));
+                Map<String, Object> resultMap = getFirstRowForFirstResult(tabularData);
+                // Put into map only when both bounds are non-null
+                Object lower = resultMap.get(MIN_OF_FIELD);
+                Object upper = resultMap.get(MAX_OF_FIELD);
+                if (lower != null && upper != null)
+                {
+                    map.put(filter, Tuples.pair(lower, upper));
+                }
+            }
+            return Optional.of(map);
+        }
+        return Optional.empty();
+    }
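How the returned bounds are typically consumed, as a sketch; the printing is purely illustrative:

    Optional<Map<OptimizationFilter, Pair<Object, Object>>> bounds =
            ApiUtils.getOptimizationFilterBounds(datasets, executor, transformer, ingestMode);
    // Each filter maps to the (MIN, MAX) of its field in the staging dataset;
    // filters with a null bound are omitted from the map.
    bounds.ifPresent(map -> map.forEach((filter, range) ->
            System.out.println(filter.fieldName() + " in [" + range.getOne() + ", " + range.getTwo() + "]")));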
+
+    public static List<DatasetFilter> extractDatasetFilters(MetadataDataset metadataDataset, Executor<SqlGen, TabularData, SqlPlan> executor, SqlPlan physicalPlan) throws JsonProcessingException
+    {
+        List<DatasetFilter> datasetFilters = new ArrayList<>();
+        List<TabularData> results = executor.executePhysicalPlanAndGetResults(physicalPlan);
+        Optional<String> stagingFilters = results.stream()
+                .findFirst()
+                .map(TabularData::getData)
+                .flatMap(t -> t.stream().findFirst())
+                .map(stringObjectMap -> (String) stringObjectMap.get(metadataDataset.stagingFiltersField()));
+
+        // Convert the nested map of filters into a flat list of filters
+        if (stagingFilters.isPresent())
+        {
+            Map<String, Map<String, Object>> datasetFiltersMap = new ObjectMapper().readValue(stagingFilters.get(), new TypeReference<Map<String, Map<String, Object>>>() {});
+            for (Map.Entry<String, Map<String, Object>> filtersMapEntry : datasetFiltersMap.entrySet())
+            {
+                for (Map.Entry<String, Object> filterEntry : filtersMapEntry.getValue().entrySet())
+                {
+                    DatasetFilter datasetFilter = DatasetFilter.of(filtersMapEntry.getKey(), FilterType.fromName(filterEntry.getKey()), filterEntry.getValue());
+                    datasetFilters.add(datasetFilter);
+                }
+            }
+        }
+        return datasetFilters;
+    }
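The stagingFilters column is expected to hold nested JSON, keyed first by the value passed as the DatasetFilter name and then by filter type. An illustrative payload and its flattened equivalent (the key and filter-type names here are assumptions):

    // Hypothetical stagingFilters value read from the metadata table:
    //   {"batch_id": {"GT": 5}}
    // extractDatasetFilters flattens each inner entry into one DatasetFilter:
    DatasetFilter equivalent = DatasetFilter.of("batch_id", FilterType.fromName("GT"), 5);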
+
+    public static List<DataSplitRange> getDataSplitRanges(Executor<SqlGen, TabularData, SqlPlan> executor, Planner planner,
+                                                          Transformer<SqlGen, SqlPlan> transformer, IngestMode ingestMode)
+    {
+        List<DataSplitRange> dataSplitRanges = new ArrayList<>();
+        if (ingestMode.versioningStrategy() instanceof AllVersionsStrategy)
+        {
+            Dataset stagingDataset = planner.stagingDataset();
+            String dataSplitField = ingestMode.dataSplitField().get();
+            LogicalPlan logicalPlanForMaxOfField = LogicalPlanFactory.getLogicalPlanForMaxOfField(stagingDataset, dataSplitField);
+            List<TabularData> tabularData = executor.executePhysicalPlanAndGetResults(transformer.generatePhysicalPlan(logicalPlanForMaxOfField));
+            Map<String, Object> row = getFirstRowForFirstResult(tabularData);
+            Long maxDataSplit = retrieveValueAsLong(row.get(MAX_OF_FIELD)).orElseThrow(IllegalStateException::new);
+            for (int i = 1; i <= maxDataSplit; i++)
+            {
+                dataSplitRanges.add(DataSplitRange.of(i, i));
+            }
+        }
+        return dataSplitRanges;
+    }
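A worked example of the range expansion: with AllVersionsStrategy and a staging maximum data-split value of 3, the loop yields one single-point range per version (call shape illustrative):

    List<DataSplitRange> ranges = ApiUtils.getDataSplitRanges(executor, planner, transformer, ingestMode);
    // With max(data_split) == 3 this yields [(1,1), (2,2), (3,3)], so each
    // version is ingested in its own pass - which is why performFullIngestion
    // now returns a List of IngestorResult.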
+
+    public static Optional<Long> retrieveValueAsLong(Object obj)
+    {
+        if (obj instanceof Integer)
+        {
+            return Optional.of(Long.valueOf((Integer) obj));
+        }
+        else if (obj instanceof Long)
+        {
+            return Optional.of((Long) obj);
+        }
+        return Optional.empty();
+    }
+
+    public static Map<String, Object> getFirstRowForFirstResult(List<TabularData> tabularData)
+    {
+        Map<String, Object> resultMap = tabularData.stream()
+                .findFirst()
+                .map(TabularData::getData)
+                .flatMap(t -> t.stream().findFirst())
+                .orElse(Collections.emptyMap());
+        return resultMap;
+    }
+
+    public static Optional<Object> getFirstColumnValue(Map<String, Object> row)
+    {
+        Optional<Object> object = Optional.empty();
+        if (!row.isEmpty())
+        {
+            object = row.values().stream().findFirst();
+        }
+        return object;
+    }
}