diff --git a/core/src/main/java/org/polypheny/db/adapter/enumerable/EnumerableDocumentValues.java b/core/src/main/java/org/polypheny/db/adapter/enumerable/EnumerableDocumentValues.java new file mode 100644 index 0000000000..7bfd0e2a14 --- /dev/null +++ b/core/src/main/java/org/polypheny/db/adapter/enumerable/EnumerableDocumentValues.java @@ -0,0 +1,82 @@ +/* + * Copyright 2019-2023 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.polypheny.db.adapter.enumerable; + +import com.google.common.collect.ImmutableList; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.List; +import org.apache.calcite.linq4j.tree.BlockBuilder; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.linq4j.tree.Primitive; +import org.bson.BsonValue; +import org.polypheny.db.adapter.java.JavaTypeFactory; +import org.polypheny.db.algebra.core.document.DocumentValues; +import org.polypheny.db.algebra.type.AlgDataType; +import org.polypheny.db.algebra.type.AlgDataTypeField; +import org.polypheny.db.plan.AlgOptCluster; +import org.polypheny.db.plan.AlgTraitSet; +import org.polypheny.db.schema.ModelTrait; +import org.polypheny.db.util.BuiltInMethod; + +public class EnumerableDocumentValues extends DocumentValues implements EnumerableAlg { + + /** + * Creates a {@link DocumentValues}. + * {@link ModelTrait#DOCUMENT} node, which contains values. + * + * @param cluster + * @param traitSet + * @param rowType + * @param documentTuples + */ + public EnumerableDocumentValues( AlgOptCluster cluster, AlgTraitSet traitSet, AlgDataType rowType, List documentTuples ) { + super( cluster, traitSet, rowType, ImmutableList.copyOf( documentTuples ) ); + } + + + @Override + public Result implement( EnumerableAlgImplementor implementor, Prefer pref ) { + final JavaTypeFactory typeFactory = (JavaTypeFactory) getCluster().getTypeFactory(); + final BlockBuilder builder = new BlockBuilder(); + final PhysType physType = + PhysTypeImpl.of( + implementor.getTypeFactory(), + getRowType(), + pref.preferCustom() ); + final Type rowClass = physType.getJavaRowType(); + + final List expressions = new ArrayList<>(); + final List fields = rowType.getFieldList(); + for ( BsonValue doc : documentTuples ) { + final List literals = new ArrayList<>(); + + literals.add( Expressions.constant( doc.asDocument().toJson() ) ); + + expressions.add( physType.record( literals ) ); + } + builder.add( + Expressions.return_( + null, + Expressions.call( + BuiltInMethod.AS_ENUMERABLE.method, + Expressions.newArrayInit( Primitive.box( rowClass ), expressions ) ) ) ); + return implementor.result( physType, builder.toBlock() ); + } + +} diff --git a/core/src/main/java/org/polypheny/db/algebra/rules/DocumentToEnumerableRule.java b/core/src/main/java/org/polypheny/db/algebra/rules/DocumentToEnumerableRule.java index 0530c25fff..d1570b9615 100644 --- a/core/src/main/java/org/polypheny/db/algebra/rules/DocumentToEnumerableRule.java +++ b/core/src/main/java/org/polypheny/db/algebra/rules/DocumentToEnumerableRule.java @@ -19,6 +19,7 @@ import lombok.extern.slf4j.Slf4j; import org.polypheny.db.adapter.enumerable.EnumerableAggregate; import org.polypheny.db.adapter.enumerable.EnumerableConvention; +import org.polypheny.db.adapter.enumerable.EnumerableDocumentValues; import org.polypheny.db.adapter.enumerable.EnumerableFilter; import org.polypheny.db.adapter.enumerable.EnumerableLimit; import org.polypheny.db.adapter.enumerable.EnumerableProject; @@ -30,6 +31,7 @@ import org.polypheny.db.algebra.logical.document.LogicalDocumentFilter; import org.polypheny.db.algebra.logical.document.LogicalDocumentProject; import org.polypheny.db.algebra.logical.document.LogicalDocumentSort; +import org.polypheny.db.algebra.logical.document.LogicalDocumentValues; import org.polypheny.db.plan.AlgOptRule; import org.polypheny.db.plan.AlgOptRuleCall; import org.polypheny.db.plan.AlgOptRuleOperand; @@ -43,6 +45,8 @@ public class DocumentToEnumerableRule extends AlgOptRule { public static DocumentToEnumerableRule FILTER_TO_ENUMERABLE = new DocumentToEnumerableRule( Type.FILTER, operand( LogicalDocumentFilter.class, any() ), "DOCUMENT_FILTER_TO_ENUMERABLE" ); public static DocumentToEnumerableRule SORT_TO_ENUMERABLE = new DocumentToEnumerableRule( Type.SORT, operand( LogicalDocumentSort.class, any() ), "DOCUMENT_SORT_TO_ENUMERABLE" ); + public static DocumentToEnumerableRule VALUES_TO_ENUMERABLE = new DocumentToEnumerableRule( Type.VALUES, operand( LogicalDocumentValues.class, any() ), "DOCUMENT_VALUES_TO_ENUMERABLE" ); + private final Type type; @@ -62,6 +66,8 @@ public void onMatch( AlgOptRuleCall call ) { convertAggregate( call ); } else if ( type == Type.SORT ) { convertSort( call ); + } else if ( type == Type.VALUES ) { + convertValues( call ); } else { throw new UnsupportedOperationException( "This document is not supported." ); } @@ -69,6 +75,17 @@ public void onMatch( AlgOptRuleCall call ) { } + private void convertValues( AlgOptRuleCall call ) { + LogicalDocumentValues values = call.alg( 0 ); + AlgTraitSet out = values.getTraitSet().replace( EnumerableConvention.INSTANCE ); + + EnumerableDocumentValues enumerable = new EnumerableDocumentValues( values.getCluster(), out, values.getRowType(), values.documentTuples ); + call.transformTo( enumerable ); + + // call.transformTo( values.getRelationalEquivalent() ); + } + + private void convertSort( AlgOptRuleCall call ) { LogicalDocumentSort sort = call.alg( 0 ); AlgTraitSet out = sort.getTraitSet().replace( EnumerableConvention.INSTANCE ); diff --git a/core/src/main/java/org/polypheny/db/plan/volcano/VolcanoPlanner.java b/core/src/main/java/org/polypheny/db/plan/volcano/VolcanoPlanner.java index e6abfe2f19..f831335452 100644 --- a/core/src/main/java/org/polypheny/db/plan/volcano/VolcanoPlanner.java +++ b/core/src/main/java/org/polypheny/db/plan/volcano/VolcanoPlanner.java @@ -830,6 +830,7 @@ public void registerModelRules() { addRule( DocumentToEnumerableRule.FILTER_TO_ENUMERABLE ); addRule( DocumentToEnumerableRule.AGGREGATE_TO_ENUMERABLE ); addRule( DocumentToEnumerableRule.SORT_TO_ENUMERABLE ); + addRule( DocumentToEnumerableRule.VALUES_TO_ENUMERABLE ); // Relational } diff --git a/plugins/mqtt-stream/build.gradle b/plugins/mqtt-stream/build.gradle index 80cdc02fc8..b02282606a 100644 --- a/plugins/mqtt-stream/build.gradle +++ b/plugins/mqtt-stream/build.gradle @@ -22,6 +22,7 @@ dependencies { testImplementation project(path: ":dbms") testImplementation project(path: ":core", configuration: "tests") testImplementation project(path: ":core") + testImplementation project(":plugins:mql-language") testCompileOnly group: 'org.pf4j', name: 'pf4j', version: pf4jVersion testImplementation group: "junit", name: "junit", version: junit_version diff --git a/plugins/mqtt-stream/gradle.properties b/plugins/mqtt-stream/gradle.properties index 82bbbbe18d..888b2d1806 100644 --- a/plugins/mqtt-stream/gradle.properties +++ b/plugins/mqtt-stream/gradle.properties @@ -19,7 +19,7 @@ pluginVersion = 0.0.1 pluginId = mqtt-stream pluginClass = org.polypheny.db.mqtt.MqttStreamPlugin pluginProvider = The Polypheny Project -pluginDependencies = +pluginDependencies = mql-language pluginUrlPath = pluginCategories = interface pluginPolyDependencies = diff --git a/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/MqttStreamProcessor.java b/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/MqttStreamProcessor.java index f2a3749883..c02826b38e 100644 --- a/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/MqttStreamProcessor.java +++ b/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/MqttStreamProcessor.java @@ -17,63 +17,32 @@ package org.polypheny.db.mqtt; import com.google.common.collect.ImmutableList; -import com.sun.jdi.BooleanType; -import java.math.BigDecimal; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; -import org.bson.BsonArray; -import org.bson.BsonBoolean; import org.bson.BsonDocument; import org.bson.BsonInt32; -import org.bson.BsonString; import org.bson.BsonValue; -import org.bson.conversions.Bson; import org.polypheny.db.PolyImplementation; import org.polypheny.db.algebra.AlgNode; import org.polypheny.db.algebra.AlgRoot; import org.polypheny.db.algebra.constant.Kind; -import org.polypheny.db.algebra.logical.document.LogicalDocumentFilter; -import org.polypheny.db.algebra.logical.document.LogicalDocumentScan; import org.polypheny.db.algebra.logical.document.LogicalDocumentValues; import org.polypheny.db.algebra.logical.relational.LogicalValues; import org.polypheny.db.algebra.operators.OperatorName; import org.polypheny.db.algebra.type.AlgDataType; -import org.polypheny.db.algebra.type.AlgDataTypeFactory; import org.polypheny.db.algebra.type.AlgDataTypeField; import org.polypheny.db.algebra.type.AlgDataTypeFieldImpl; -import org.polypheny.db.algebra.type.AlgDataTypeImpl; import org.polypheny.db.algebra.type.AlgRecordType; -import org.polypheny.db.catalog.Catalog; -import org.polypheny.db.catalog.Catalog.NamespaceType; -import org.polypheny.db.languages.NodeParseException; import org.polypheny.db.languages.OperatorRegistry; import org.polypheny.db.languages.QueryLanguage; -import org.polypheny.db.languages.QueryParameters; -import org.polypheny.db.languages.mql.Mql.Type; -import org.polypheny.db.languages.mql.MqlAggregate; -import org.polypheny.db.languages.mql.MqlCount; -import org.polypheny.db.languages.mql.MqlDelete; import org.polypheny.db.languages.mql.MqlFind; -import org.polypheny.db.languages.mql.MqlInsert; -import org.polypheny.db.languages.mql.MqlNode; -import org.polypheny.db.languages.mql.MqlQueryParameters; -import org.polypheny.db.languages.mql.MqlUpdate; -import org.polypheny.db.languages.mql.parser.MqlParser; -import org.polypheny.db.languages.mql.parser.MqlParser.MqlParserConfig; import org.polypheny.db.languages.mql2alg.MqlToAlgConverter; -import org.polypheny.db.nodes.Node; import org.polypheny.db.plan.AlgOptCluster; import org.polypheny.db.plan.AlgTraitSet; import org.polypheny.db.prepare.Context; import org.polypheny.db.prepare.PolyphenyDbCatalogReader; -import org.polypheny.db.processing.ExtendedQueryParameters; import org.polypheny.db.processing.Processor; -import org.polypheny.db.rex.RexBuilder; import org.polypheny.db.rex.RexCall; import org.polypheny.db.rex.RexInputRef; import org.polypheny.db.rex.RexLiteral; @@ -82,11 +51,9 @@ import org.polypheny.db.stream.StreamProcessor; import org.polypheny.db.tools.AlgBuilder; import org.polypheny.db.transaction.Statement; -import org.polypheny.db.transaction.Transaction; import org.polypheny.db.transaction.TransactionException; import org.polypheny.db.type.PolyType; import org.polypheny.db.util.Pair; -import org.polypheny.db.util.SourceStringReader; @Slf4j public class MqttStreamProcessor implements StreamProcessor { @@ -126,7 +93,7 @@ private AlgRoot processMqlQuery() { // QueryParameters parameters = new MqlQueryParameters( this.filterQuery, Catalog.getInstance().getDatabase( Catalog.defaultDatabaseId ).name,NamespaceType.DOCUMENT ); - MqlFind find = (MqlFind) mqlProcessor.parse("db.collection.find({" + this.filterQuery + "})").get( 0 ); + MqlFind find = (MqlFind) mqlProcessor.parse( String.format( "db.%s.find(%s)", "collection", this.filterQuery ) ).get( 0 ); final AlgDataType rowType = cluster.getTypeFactory() diff --git a/plugins/mqtt-stream/src/test/java/org/polypheny/db/mqtt/MqttTest.java b/plugins/mqtt-stream/src/test/java/org/polypheny/db/mqtt/MqttTest.java index 25348b006c..c398fad5e0 100644 --- a/plugins/mqtt-stream/src/test/java/org/polypheny/db/mqtt/MqttTest.java +++ b/plugins/mqtt-stream/src/test/java/org/polypheny/db/mqtt/MqttTest.java @@ -32,7 +32,7 @@ public static void init() { public void algTest() { Transaction transaction = TestHelper.getInstance().getTransaction(); Statement st = transaction.createStatement(); - String filterQuery = "db.buttontest.find({value:10})"; + String filterQuery = "{\"value\":11}"; MqttMessage mqttMessage = new MqttMessage( "10", "button/battery" ); MqttStreamProcessor streamProcessor = new MqttStreamProcessor( mqttMessage, filterQuery, st ); streamProcessor.processStream();