Skip to content

Commit

Permalink
fixed test and filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
datomo committed Sep 4, 2023
1 parent 32dc64e commit db647fe
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2019-2023 The Polypheny Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.polypheny.db.adapter.enumerable;

import com.google.common.collect.ImmutableList;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import org.apache.calcite.linq4j.tree.BlockBuilder;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.linq4j.tree.Primitive;
import org.bson.BsonValue;
import org.polypheny.db.adapter.java.JavaTypeFactory;
import org.polypheny.db.algebra.core.document.DocumentValues;
import org.polypheny.db.algebra.type.AlgDataType;
import org.polypheny.db.algebra.type.AlgDataTypeField;
import org.polypheny.db.plan.AlgOptCluster;
import org.polypheny.db.plan.AlgTraitSet;
import org.polypheny.db.schema.ModelTrait;
import org.polypheny.db.util.BuiltInMethod;

public class EnumerableDocumentValues extends DocumentValues implements EnumerableAlg {

/**
* Creates a {@link DocumentValues}.
* {@link ModelTrait#DOCUMENT} node, which contains values.
*
* @param cluster
* @param traitSet
* @param rowType
* @param documentTuples
*/
public EnumerableDocumentValues( AlgOptCluster cluster, AlgTraitSet traitSet, AlgDataType rowType, List<BsonValue> documentTuples ) {
super( cluster, traitSet, rowType, ImmutableList.copyOf( documentTuples ) );
}


@Override
public Result implement( EnumerableAlgImplementor implementor, Prefer pref ) {
final JavaTypeFactory typeFactory = (JavaTypeFactory) getCluster().getTypeFactory();
final BlockBuilder builder = new BlockBuilder();
final PhysType physType =
PhysTypeImpl.of(
implementor.getTypeFactory(),
getRowType(),
pref.preferCustom() );
final Type rowClass = physType.getJavaRowType();

final List<Expression> expressions = new ArrayList<>();
final List<AlgDataTypeField> fields = rowType.getFieldList();
for ( BsonValue doc : documentTuples ) {
final List<Expression> literals = new ArrayList<>();

literals.add( Expressions.constant( doc.asDocument().toJson() ) );

expressions.add( physType.record( literals ) );
}
builder.add(
Expressions.return_(
null,
Expressions.call(
BuiltInMethod.AS_ENUMERABLE.method,
Expressions.newArrayInit( Primitive.box( rowClass ), expressions ) ) ) );
return implementor.result( physType, builder.toBlock() );
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import lombok.extern.slf4j.Slf4j;
import org.polypheny.db.adapter.enumerable.EnumerableAggregate;
import org.polypheny.db.adapter.enumerable.EnumerableConvention;
import org.polypheny.db.adapter.enumerable.EnumerableDocumentValues;
import org.polypheny.db.adapter.enumerable.EnumerableFilter;
import org.polypheny.db.adapter.enumerable.EnumerableLimit;
import org.polypheny.db.adapter.enumerable.EnumerableProject;
Expand All @@ -30,6 +31,7 @@
import org.polypheny.db.algebra.logical.document.LogicalDocumentFilter;
import org.polypheny.db.algebra.logical.document.LogicalDocumentProject;
import org.polypheny.db.algebra.logical.document.LogicalDocumentSort;
import org.polypheny.db.algebra.logical.document.LogicalDocumentValues;
import org.polypheny.db.plan.AlgOptRule;
import org.polypheny.db.plan.AlgOptRuleCall;
import org.polypheny.db.plan.AlgOptRuleOperand;
Expand All @@ -43,6 +45,8 @@ public class DocumentToEnumerableRule extends AlgOptRule {
public static DocumentToEnumerableRule FILTER_TO_ENUMERABLE = new DocumentToEnumerableRule( Type.FILTER, operand( LogicalDocumentFilter.class, any() ), "DOCUMENT_FILTER_TO_ENUMERABLE" );
public static DocumentToEnumerableRule SORT_TO_ENUMERABLE = new DocumentToEnumerableRule( Type.SORT, operand( LogicalDocumentSort.class, any() ), "DOCUMENT_SORT_TO_ENUMERABLE" );

public static DocumentToEnumerableRule VALUES_TO_ENUMERABLE = new DocumentToEnumerableRule( Type.VALUES, operand( LogicalDocumentValues.class, any() ), "DOCUMENT_VALUES_TO_ENUMERABLE" );

private final Type type;


Expand All @@ -62,13 +66,26 @@ public void onMatch( AlgOptRuleCall call ) {
convertAggregate( call );
} else if ( type == Type.SORT ) {
convertSort( call );
} else if ( type == Type.VALUES ) {
convertValues( call );
} else {
throw new UnsupportedOperationException( "This document is not supported." );
}

}


private void convertValues( AlgOptRuleCall call ) {
LogicalDocumentValues values = call.alg( 0 );
AlgTraitSet out = values.getTraitSet().replace( EnumerableConvention.INSTANCE );

EnumerableDocumentValues enumerable = new EnumerableDocumentValues( values.getCluster(), out, values.getRowType(), values.documentTuples );
call.transformTo( enumerable );

// call.transformTo( values.getRelationalEquivalent() );
}


private void convertSort( AlgOptRuleCall call ) {
LogicalDocumentSort sort = call.alg( 0 );
AlgTraitSet out = sort.getTraitSet().replace( EnumerableConvention.INSTANCE );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,7 @@ public void registerModelRules() {
addRule( DocumentToEnumerableRule.FILTER_TO_ENUMERABLE );
addRule( DocumentToEnumerableRule.AGGREGATE_TO_ENUMERABLE );
addRule( DocumentToEnumerableRule.SORT_TO_ENUMERABLE );
addRule( DocumentToEnumerableRule.VALUES_TO_ENUMERABLE );

// Relational
}
Expand Down
1 change: 1 addition & 0 deletions plugins/mqtt-stream/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dependencies {
testImplementation project(path: ":dbms")
testImplementation project(path: ":core", configuration: "tests")
testImplementation project(path: ":core")
testImplementation project(":plugins:mql-language")
testCompileOnly group: 'org.pf4j', name: 'pf4j', version: pf4jVersion
testImplementation group: "junit", name: "junit", version: junit_version

Expand Down
2 changes: 1 addition & 1 deletion plugins/mqtt-stream/gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pluginVersion = 0.0.1
pluginId = mqtt-stream
pluginClass = org.polypheny.db.mqtt.MqttStreamPlugin
pluginProvider = The Polypheny Project
pluginDependencies =
pluginDependencies = mql-language
pluginUrlPath =
pluginCategories = interface
pluginPolyDependencies =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,63 +17,32 @@
package org.polypheny.db.mqtt;

import com.google.common.collect.ImmutableList;
import com.sun.jdi.BooleanType;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.bson.BsonArray;
import org.bson.BsonBoolean;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonString;
import org.bson.BsonValue;
import org.bson.conversions.Bson;
import org.polypheny.db.PolyImplementation;
import org.polypheny.db.algebra.AlgNode;
import org.polypheny.db.algebra.AlgRoot;
import org.polypheny.db.algebra.constant.Kind;
import org.polypheny.db.algebra.logical.document.LogicalDocumentFilter;
import org.polypheny.db.algebra.logical.document.LogicalDocumentScan;
import org.polypheny.db.algebra.logical.document.LogicalDocumentValues;
import org.polypheny.db.algebra.logical.relational.LogicalValues;
import org.polypheny.db.algebra.operators.OperatorName;
import org.polypheny.db.algebra.type.AlgDataType;
import org.polypheny.db.algebra.type.AlgDataTypeFactory;
import org.polypheny.db.algebra.type.AlgDataTypeField;
import org.polypheny.db.algebra.type.AlgDataTypeFieldImpl;
import org.polypheny.db.algebra.type.AlgDataTypeImpl;
import org.polypheny.db.algebra.type.AlgRecordType;
import org.polypheny.db.catalog.Catalog;
import org.polypheny.db.catalog.Catalog.NamespaceType;
import org.polypheny.db.languages.NodeParseException;
import org.polypheny.db.languages.OperatorRegistry;
import org.polypheny.db.languages.QueryLanguage;
import org.polypheny.db.languages.QueryParameters;
import org.polypheny.db.languages.mql.Mql.Type;
import org.polypheny.db.languages.mql.MqlAggregate;
import org.polypheny.db.languages.mql.MqlCount;
import org.polypheny.db.languages.mql.MqlDelete;
import org.polypheny.db.languages.mql.MqlFind;
import org.polypheny.db.languages.mql.MqlInsert;
import org.polypheny.db.languages.mql.MqlNode;
import org.polypheny.db.languages.mql.MqlQueryParameters;
import org.polypheny.db.languages.mql.MqlUpdate;
import org.polypheny.db.languages.mql.parser.MqlParser;
import org.polypheny.db.languages.mql.parser.MqlParser.MqlParserConfig;
import org.polypheny.db.languages.mql2alg.MqlToAlgConverter;
import org.polypheny.db.nodes.Node;
import org.polypheny.db.plan.AlgOptCluster;
import org.polypheny.db.plan.AlgTraitSet;
import org.polypheny.db.prepare.Context;
import org.polypheny.db.prepare.PolyphenyDbCatalogReader;
import org.polypheny.db.processing.ExtendedQueryParameters;
import org.polypheny.db.processing.Processor;
import org.polypheny.db.rex.RexBuilder;
import org.polypheny.db.rex.RexCall;
import org.polypheny.db.rex.RexInputRef;
import org.polypheny.db.rex.RexLiteral;
Expand All @@ -82,11 +51,9 @@
import org.polypheny.db.stream.StreamProcessor;
import org.polypheny.db.tools.AlgBuilder;
import org.polypheny.db.transaction.Statement;
import org.polypheny.db.transaction.Transaction;
import org.polypheny.db.transaction.TransactionException;
import org.polypheny.db.type.PolyType;
import org.polypheny.db.util.Pair;
import org.polypheny.db.util.SourceStringReader;

@Slf4j
public class MqttStreamProcessor implements StreamProcessor {
Expand Down Expand Up @@ -126,7 +93,7 @@ private AlgRoot processMqlQuery() {

// QueryParameters parameters = new MqlQueryParameters( this.filterQuery, Catalog.getInstance().getDatabase( Catalog.defaultDatabaseId ).name,NamespaceType.DOCUMENT );

MqlFind find = (MqlFind) mqlProcessor.parse("db.collection.find({" + this.filterQuery + "})").get( 0 );
MqlFind find = (MqlFind) mqlProcessor.parse( String.format( "db.%s.find(%s)", "collection", this.filterQuery ) ).get( 0 );

final AlgDataType rowType =
cluster.getTypeFactory()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public static void init() {
public void algTest() {
Transaction transaction = TestHelper.getInstance().getTransaction();
Statement st = transaction.createStatement();
String filterQuery = "db.buttontest.find({value:10})";
String filterQuery = "{\"value\":11}";
MqttMessage mqttMessage = new MqttMessage( "10", "button/battery" );
MqttStreamProcessor streamProcessor = new MqttStreamProcessor( mqttMessage, filterQuery, st );
streamProcessor.processStream();
Expand Down

0 comments on commit db647fe

Please sign in to comment.