Skip to content

Commit

Permalink
Persistence Component: expose input files bytes size for Snowflake Bu… (
Browse files Browse the repository at this point in the history
  • Loading branch information
agarwali authored Oct 29, 2024
1 parent 644e5b5 commit d6dd8d5
Show file tree
Hide file tree
Showing 38 changed files with 793 additions and 154 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ public enum StatisticName
ROWS_UPDATED("rowsUpdated"),
ROWS_DELETED("rowsDeleted"),
FILES_LOADED("filesLoaded"),
ROWS_WITH_ERRORS("rowsWithErrors");
ROWS_WITH_ERRORS("rowsWithErrors"),
INPUT_FILES_BYTES_SCANNED("inputFilesBytesScanned");

String value;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.logicalplan.datasets;

import java.util.List;
import org.finos.legend.engine.persistence.components.logicalplan.values.Value;
import org.immutables.value.Value.Immutable;
import org.immutables.value.Value.Style;

@Immutable
@Style(
typeAbstract = "*Abstract",
typeImmutable = "*",
jdkOnly = true,
optionalAcceptNullable = true,
strictBuilder = true
)
public interface FunctionalDatasetAbstract extends DatasetReference
{

List<Value> value();
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,6 @@ public enum FunctionName
CONVERT,
STRUCT,
CONCAT,
JSON_EXTRACT_PATH_TEXT,
APPROX_COUNT_DISTINCT;
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,9 @@ public interface RelationalExecutionHelper

List<Map<String, Object>> executeQuery(String sql);

TabularData executeQueryAndGetResultsAsTabularData(String sql);

TabularData executeQueryAndGetResultsAsTabularData(String sql, int rows);

void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.executor;

import java.sql.SQLException;
import java.util.List;
import java.util.Map;

public interface RelationalTransactionManager
{
void close() throws SQLException;

void beginTransaction() throws SQLException;

void commitTransaction() throws SQLException;

void revertTransaction() throws SQLException;

boolean executeInCurrentTransaction(String sql) throws SQLException;

List<Map<String, Object>> convertResultSetToList(String sql) throws SQLException;

List<Map<String, Object>> convertResultSetToList(String sql, int rows) throws SQLException;

default TabularData convertResultSetToTabularData(String sql) throws SQLException
{
return TabularData.builder()
.addAllData(convertResultSetToList(sql))
.build();
}

default TabularData convertResultSetToTabularData(String sql, int rows) throws SQLException
{
return TabularData.builder()
.addAllData(convertResultSetToList(sql, rows))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.relational.sql;

import org.finos.legend.engine.persistence.components.executor.ResultData;
package org.finos.legend.engine.persistence.components.executor;

import java.util.List;
import java.util.Map;
import java.util.Optional;

public class TabularData implements ResultData
{
private final List<Map<String, Object>> data;

public TabularData(List<Map<String, Object>> data)
{
this.data = data;
}
import static org.immutables.value.Value.Immutable;
import static org.immutables.value.Value.Style;

public List<Map<String, Object>> getData()
{
return data;
}
@Immutable
@Style(
typeAbstract = "*Abstract",
typeImmutable = "*",
jdkOnly = true,
optionalAcceptNullable = true,
strictBuilder = true
)
public interface TabularDataAbstract extends ResultData
{
Optional<String> queryId();

// todo: Add a field for schema
}
List<Map<String, Object>> data();
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@
import org.finos.legend.engine.persistence.components.relational.api.ErrorCategory;
import org.finos.legend.engine.persistence.components.relational.api.IngestorResult;
import org.finos.legend.engine.persistence.components.relational.api.RelationalConnection;
import org.finos.legend.engine.persistence.components.relational.sql.TabularData;
import org.finos.legend.engine.persistence.components.executor.TabularData;
import org.finos.legend.engine.persistence.components.relational.sqldom.SqlGen;
import org.finos.legend.engine.persistence.components.relational.sqldom.utils.SqlGenUtils;
import org.finos.legend.engine.persistence.components.transformer.LogicalPlanVisitor;
Expand Down Expand Up @@ -393,7 +393,7 @@ protected int findNullValuesDataErrors(Executor<SqlGen, TabularData, SqlPlan> ex
List<TabularData> results = executor.executePhysicalPlanAndGetResults(pair.getTwo());
if (!results.isEmpty())
{
List<Map<String, Object>> resultSets = results.get(0).getData();
List<Map<String, Object>> resultSets = results.get(0).data();
for (Map<String, Object> row : resultSets)
{
for (String column : pair.getOne().stream().map(FieldValue::fieldName).collect(Collectors.toSet()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
import org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor.StagedFilesSelectionVisitor;
import org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor.ToArrayFunctionVisitor;
import org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor.TruncateVisitor;
import org.finos.legend.engine.persistence.components.relational.sql.TabularData;
import org.finos.legend.engine.persistence.components.executor.TabularData;
import org.finos.legend.engine.persistence.components.relational.sqldom.SqlGen;
import org.finos.legend.engine.persistence.components.relational.sqldom.utils.SqlGenUtils;
import org.finos.legend.engine.persistence.components.transformer.LogicalPlanVisitor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.relational.SqlPlan;
import org.finos.legend.engine.persistence.components.relational.bigquery.BigQuerySink;
import org.finos.legend.engine.persistence.components.relational.sql.TabularData;
import org.finos.legend.engine.persistence.components.executor.TabularData;
import org.finos.legend.engine.persistence.components.relational.sqldom.SqlGen;
import org.finos.legend.engine.persistence.components.relational.sqldom.schemaops.statements.DDLStatement;
import org.finos.legend.engine.persistence.components.util.PlaceholderValue;
Expand Down Expand Up @@ -110,7 +110,7 @@ public List<TabularData> executePhysicalPlanAndGetResults(SqlPlan physicalPlan,
List<Map<String, Object>> queryResult = bigQueryHelper.executeQuery(enrichedSql);
if (!queryResult.isEmpty())
{
resultSetList.add(new TabularData(queryResult));
resultSetList.add(TabularData.builder().addAllData(queryResult).build());
}
}
return resultSetList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.TableId;
import org.finos.legend.engine.persistence.components.common.StatisticName;
import org.finos.legend.engine.persistence.components.executor.TabularData;
import org.finos.legend.engine.persistence.components.executor.TypeMapping;
import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.And;
Expand Down Expand Up @@ -454,6 +455,22 @@ public List<Map<String, Object>> executeQuery(String sql)
}
}

@Override
public TabularData executeQueryAndGetResultsAsTabularData(String sql)
{
return TabularData.builder()
.addAllData(executeQuery(sql))
.build();
}

@Override
public TabularData executeQueryAndGetResultsAsTabularData(String sql, int rows)
{
return TabularData.builder()
.addAllData(executeQuery(sql, rows))
.build();
}

// Execute statement in a transaction - either use an existing one or use a new one
public Map<StatisticName, Object> executeLoadStatement(String sql)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.finos.legend.engine.persistence.components.relational.bigquery.BigQuerySink;
import org.finos.legend.engine.persistence.components.relational.bigquery.executor.BigQueryConnection;
import org.finos.legend.engine.persistence.components.relational.bigquery.executor.BigQueryHelper;
import org.finos.legend.engine.persistence.components.relational.sql.TabularData;
import org.finos.legend.engine.persistence.components.executor.TabularData;
import org.finos.legend.engine.persistence.components.relational.sqldom.SqlGen;
import org.finos.legend.engine.persistence.components.relational.transformer.RelationalTransformer;
import org.finos.legend.engine.persistence.components.schemaevolution.IncompatibleSchemaChangeException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.finos.legend.engine.persistence.components.common.Datasets;
import org.finos.legend.engine.persistence.components.common.StatisticName;
import org.finos.legend.engine.persistence.components.executor.Executor;
import org.finos.legend.engine.persistence.components.executor.TabularData;
import org.finos.legend.engine.persistence.components.ingestmode.AppendOnlyAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.BitemporalDeltaAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.BitemporalSnapshotAbstract;
Expand All @@ -40,7 +41,7 @@
import org.finos.legend.engine.persistence.components.relational.api.IngestorResult;
import org.finos.legend.engine.persistence.components.relational.api.RelationalConnection;
import org.finos.legend.engine.persistence.components.executor.RelationalExecutionHelper;
import org.finos.legend.engine.persistence.components.relational.sql.TabularData;
import org.finos.legend.engine.persistence.components.executor.TabularData;
import org.finos.legend.engine.persistence.components.relational.sqldom.SqlGen;
import org.finos.legend.engine.persistence.components.sink.Sink;
import org.finos.legend.engine.persistence.components.transformer.LogicalPlanVisitor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanFactory;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Field;
import org.finos.legend.engine.persistence.components.relational.SqlPlan;
import org.finos.legend.engine.persistence.components.relational.sql.TabularData;
import org.finos.legend.engine.persistence.components.executor.TabularData;
import org.finos.legend.engine.persistence.components.relational.sqldom.SqlGen;
import org.finos.legend.engine.persistence.components.transformer.Transformer;

Expand Down Expand Up @@ -146,7 +146,7 @@ private List<Map<String, Object>> derivePartitionSpecList(List<String> partition

if (!partitionSpecResult.isEmpty())
{
List<Map<String, Object>> partitionSpecRows = partitionSpecResult.get(0).getData();
List<Map<String, Object>> partitionSpecRows = partitionSpecResult.get(0).data();
for (Map<String, Object> partitionSpec: partitionSpecRows)
{
partitionSpecList.add(partitionSpec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.finos.legend.engine.persistence.components.relational.SqlPlan;
import org.finos.legend.engine.persistence.components.relational.api.utils.ApiUtils;
import org.finos.legend.engine.persistence.components.relational.api.utils.IngestionUtils;
import org.finos.legend.engine.persistence.components.relational.sql.TabularData;
import org.finos.legend.engine.persistence.components.executor.TabularData;
import org.finos.legend.engine.persistence.components.relational.sqldom.SqlGen;
import org.finos.legend.engine.persistence.components.relational.transformer.RelationalTransformer;
import org.finos.legend.engine.persistence.components.schemaevolution.SchemaEvolution;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.finos.legend.engine.persistence.components.relational.RelationalSink;
import org.finos.legend.engine.persistence.components.relational.SqlPlan;
import org.finos.legend.engine.persistence.components.relational.api.utils.ApiUtils;
import org.finos.legend.engine.persistence.components.relational.sql.TabularData;
import org.finos.legend.engine.persistence.components.executor.TabularData;
import org.finos.legend.engine.persistence.components.relational.sqldom.SqlGen;
import org.finos.legend.engine.persistence.components.relational.transformer.RelationalTransformer;
import org.finos.legend.engine.persistence.components.transformer.TransformOptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.finos.legend.engine.persistence.components.common.Resources;
import org.finos.legend.engine.persistence.components.common.StatisticName;
import org.finos.legend.engine.persistence.components.executor.Executor;
import org.finos.legend.engine.persistence.components.executor.TabularData;
import org.finos.legend.engine.persistence.components.ingestmode.BulkLoad;
import org.finos.legend.engine.persistence.components.ingestmode.IngestMode;
import org.finos.legend.engine.persistence.components.ingestmode.IngestModeVisitors;
Expand All @@ -41,7 +42,7 @@
import org.finos.legend.engine.persistence.components.relational.api.utils.IngestionUtils;
import org.finos.legend.engine.persistence.components.relational.exception.BulkLoadException;
import org.finos.legend.engine.persistence.components.relational.exception.MultiDatasetException;
import org.finos.legend.engine.persistence.components.relational.sql.TabularData;
import org.finos.legend.engine.persistence.components.executor.TabularData;
import org.finos.legend.engine.persistence.components.relational.sqldom.SqlGen;
import org.finos.legend.engine.persistence.components.relational.transformer.RelationalTransformer;
import org.finos.legend.engine.persistence.components.transformer.TransformOptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
import org.finos.legend.engine.persistence.components.relational.RelationalSink;
import org.finos.legend.engine.persistence.components.relational.SqlPlan;
import org.finos.legend.engine.persistence.components.relational.api.utils.ApiUtils;
import org.finos.legend.engine.persistence.components.relational.api.utils.IngestionUtils;
import org.finos.legend.engine.persistence.components.relational.sql.TabularData;
import org.finos.legend.engine.persistence.components.executor.TabularData;
import org.finos.legend.engine.persistence.components.relational.sqldom.SqlGen;
import org.finos.legend.engine.persistence.components.relational.transformer.RelationalTransformer;
import org.finos.legend.engine.persistence.components.schemaevolution.SchemaEvolution;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.finos.legend.engine.persistence.components.logicalplan.operations.*;
import org.finos.legend.engine.persistence.components.relational.RelationalSink;
import org.finos.legend.engine.persistence.components.relational.SqlPlan;
import org.finos.legend.engine.persistence.components.relational.sql.TabularData;
import org.finos.legend.engine.persistence.components.executor.TabularData;
import org.finos.legend.engine.persistence.components.relational.sqldom.SqlGen;
import org.finos.legend.engine.persistence.components.relational.transformer.RelationalTransformer;
import org.finos.legend.engine.persistence.components.transformer.TransformOptions;
Expand Down
Loading

0 comments on commit d6dd8d5

Please sign in to comment.