Skip to content

Commit

Permalink
used router for caching
Browse files Browse the repository at this point in the history
  • Loading branch information
datomo committed Sep 1, 2023
1 parent d19f16f commit f50d191
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 68 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
import org.polypheny.db.algebra.rules.AggregateJoinTransposeRule;
import org.polypheny.db.algebra.rules.AggregateProjectMergeRule;
import org.polypheny.db.algebra.rules.AggregateRemoveRule;
import org.polypheny.db.algebra.rules.CacheSwitcherRule;
import org.polypheny.db.algebra.rules.CalcRemoveRule;
import org.polypheny.db.algebra.rules.DocumentToEnumerableRule;
import org.polypheny.db.algebra.rules.FilterJoinRule;
Expand Down Expand Up @@ -851,7 +850,6 @@ public void registerAbstractRelationalRules() {
//addRule( ProjectRemoveRule.INSTANCE );
addRule( AggregateJoinTransposeRule.INSTANCE );
addRule( AggregateProjectMergeRule.INSTANCE );
addRule( CacheSwitcherRule.INSTANCE );
addRule( CalcRemoveRule.INSTANCE );
addRule( SortRemoveRule.INSTANCE );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,15 @@
import org.polypheny.db.catalog.entity.CatalogCollectionPlacement;
import org.polypheny.db.catalog.entity.CatalogColumn;
import org.polypheny.db.catalog.entity.CatalogColumnPlacement;
import org.polypheny.db.catalog.entity.CatalogDataPlacement;
import org.polypheny.db.catalog.entity.CatalogGraphDatabase;
import org.polypheny.db.catalog.entity.CatalogGraphMapping;
import org.polypheny.db.catalog.entity.CatalogGraphPlacement;
import org.polypheny.db.catalog.entity.CatalogPartition;
import org.polypheny.db.catalog.entity.CatalogPartitionPlacement;
import org.polypheny.db.catalog.entity.CatalogSchema;
import org.polypheny.db.catalog.entity.CatalogTable;
import org.polypheny.db.catalog.exceptions.UnknownTableException;
import org.polypheny.db.config.RuntimeConfig;
import org.polypheny.db.languages.OperatorRegistry;
import org.polypheny.db.languages.QueryLanguage;
Expand Down Expand Up @@ -185,6 +188,12 @@ public RoutedAlgBuilder handleScan(
long partitionId,
NamespaceType namespaceType ) {

CatalogTable table = Catalog.getInstance().getTable( tableId );

if ( table.cached ) {
return handleCached( builder, statement, storeUniqueName, physicalSchemaName, namespaceType, table );
}

AlgNode node = builder.scan( ImmutableList.of(
PolySchemaBuilder.buildAdapterSchemaName( storeUniqueName, logicalSchemaName, physicalSchemaName ),
logicalTableName + "_" + partitionId ) ).build();
Expand All @@ -208,6 +217,32 @@ public RoutedAlgBuilder handleScan(
}


private RoutedAlgBuilder handleCached( RoutedAlgBuilder builder, Statement statement, String storeUniqueName, String physicalSchemaName, NamespaceType namespaceType, CatalogTable table ) {
    // todo add cache status later

    // Look up the hidden companion table that stores the cached rows for this table.
    CatalogTable cachedTable;
    try {
        cachedTable = Catalog.getInstance().getTable( table.namespaceId, Catalog.HIDDEN_PREFIX + table.name );
    } catch ( UnknownTableException e ) {
        throw new RuntimeException( e );
    }

    // NOTE(review): assumes the cache table has exactly one placement and one partition — confirm.
    CatalogPartition firstPartition = Catalog.getInstance().getPartitionsByTable( cachedTable.id ).get( 0 );
    CatalogDataPlacement firstPlacement = Catalog.getInstance().getDataPlacements( cachedTable.id ).get( 0 );

    // Re-route the scan so it targets the cached table instead of the original one.
    return handleScan(
            builder,
            statement,
            cachedTable.id,
            firstPlacement.getAdapterName(),
            cachedTable.getNamespaceName(),
            cachedTable.name,
            physicalSchemaName,
            PolySchemaBuilder.buildAdapterSchemaName( storeUniqueName, cachedTable.getNamespaceName(), physicalSchemaName ),
            firstPartition.id,
            namespaceType );
}


private AlgDataType getDocumentRowType() {
// label table for cross model queries
final AlgDataTypeFactory typeFactory = new PolyTypeFactoryImpl( AlgDataTypeSystem.DEFAULT );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,20 +188,20 @@ public Map<String, List<ExportedColumn>> getExportedColumns() {
Map<String, List<ExportedColumn>> map = new HashMap<>();

String[] blockColumns = { "number", "hash", "parent_hash", "nonce", "sha3uncles", "logs_bloom", "transactions_root", "state_root", "receipts_root", "author", "miner", "mix_hash", "difficulty", "total_difficulty", "extra_data", "size", "gas_limit", "gas_used", "timestamp" };
PolyType[] blockTypes = { PolyType.BIGINT, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.BIGINT, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.BIGINT, PolyType.BIGINT, PolyType.VARCHAR, PolyType.BIGINT, PolyType.BIGINT, PolyType.BIGINT, PolyType.TIMESTAMP };
PolyType[] blockTypes = { PolyType.DECIMAL, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.DECIMAL, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.DECIMAL, PolyType.DECIMAL, PolyType.VARCHAR, PolyType.DECIMAL, PolyType.DECIMAL, PolyType.DECIMAL, PolyType.TIMESTAMP };
createExportedColumns( "block", map, blockColumns, blockTypes );

String[] transactionColumns = { "hash", "nonce", "block_hash", "block_number", "transaction_index", "from", "to", "value", "gas_price", "gas", "input", "creates", "public_key", "raw", "r", "s" };
PolyType[] transactionTypes = { PolyType.VARCHAR, PolyType.BIGINT, PolyType.VARCHAR, PolyType.BIGINT, PolyType.BIGINT, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.BIGINT, PolyType.BIGINT, PolyType.BIGINT, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.VARCHAR };
PolyType[] transactionTypes = { PolyType.VARCHAR, PolyType.DECIMAL, PolyType.VARCHAR, PolyType.DECIMAL, PolyType.DECIMAL, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.DECIMAL, PolyType.DECIMAL, PolyType.DECIMAL, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.VARCHAR };
createExportedColumns( "transaction", map, transactionColumns, transactionTypes );

if ( eventDataRetrieval == false ) {
if ( !eventDataRetrieval ) {
this.map = map;
return map;
}

String[] commonEventColumns = { "removed", "log_index", "transaction_index", "transaction_hash", "block_hash", "block_number", "address" };
PolyType[] commonEventTypes = { PolyType.BOOLEAN, PolyType.BIGINT, PolyType.BIGINT, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.BIGINT, PolyType.VARCHAR };
PolyType[] commonEventTypes = { PolyType.BOOLEAN, PolyType.DECIMAL, PolyType.DECIMAL, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.DECIMAL, PolyType.VARCHAR };
createExportedColumnsForEvents( map, commonEventColumns, commonEventTypes );

if ( caching == Boolean.TRUE ) {
Expand Down Expand Up @@ -336,7 +336,7 @@ private void createExportedColumnsForEvents( Map<String, List<ExportedColumn>> m
for ( String address : smartContractAddresses ) {
String contractName = null;
List<JSONObject> contractEvents = null;
if ( useManualABI == true && !contractABI.isEmpty() && !this.contractName.isEmpty() ) {
if ( useManualABI && !contractABI.isEmpty() && !this.contractName.isEmpty() ) {
if ( smartContractAddresses.size() > 1 ) {
throw new IllegalArgumentException( "Only one smart contract address should be provided when using a manual ABI." );
}
Expand Down Expand Up @@ -549,6 +549,8 @@ private Integer getLengthForType( PolyType type ) {
return 300;
case VARBINARY:
return 32;
case DECIMAL:
return 100;
default:
return null;
}
Expand All @@ -564,9 +566,8 @@ static PolyType convertToPolyType( String type ) {
return PolyType.DECIMAL;
} else if ( type.equals( "bytes" ) || type.startsWith( "bytes" ) ) {
return PolyType.VARCHAR; // for dynamic and fixed-size
} else {
return null;
}
throw new RuntimeException( "Could not find a matching PolyType" );
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,11 @@ public class EventCacheManager implements Runnable {
*
* @param manager is used to create new transactions, which are required to create new queries.
*/
public static synchronized EventCacheManager getAndSet( TransactionManager manager ) {
public static synchronized void getAndSet( TransactionManager manager ) {
if ( INSTANCE != null ) {
throw new RuntimeException( String.format( "The %s was already set.", EventCacheManager.class.getSimpleName() ) );
}
INSTANCE = new EventCacheManager( manager );
return INSTANCE;
}


Expand Down Expand Up @@ -148,8 +147,7 @@ void createTables( int sourceAdapterId, Map<String, List<FieldInformation>> tabl

private Transaction getTransaction() {
try {
Transaction transaction = transactionManager.startTransaction( Catalog.defaultDatabaseId, Catalog.defaultUserId, false, "Ethereum Plugin" );
return transaction;
return transactionManager.startTransaction( Catalog.defaultDatabaseId, Catalog.defaultUserId, false, "Ethereum Plugin" );
} catch ( UnknownSchemaException | UnknownDatabaseException | GenericCatalogException | UnknownUserException e ) {
throw new RuntimeException( e );
}
Expand All @@ -172,7 +170,7 @@ void writeToStore( String tableName, List<List<Object>> logResults, int targetAd
AlgDataType rowType = table.getTable().getRowType( transaction.getTypeFactory() );
builder.push( LogicalValues.createOneRow( builder.getCluster() ) );
builder.project( rowType.getFieldList().stream().map( f -> new RexDynamicParam( f.getType(), f.getIndex() ) ).collect( Collectors.toList() ), rowType.getFieldNames() );
builder.insert( (AlgOptTable) table );
builder.insert( table );
// todo DL: we should re-use this for all batches (ignore right now)

AlgNode node = builder.build(); // Construct the algebraic node
Expand Down

0 comments on commit f50d191

Please sign in to comment.