diff --git a/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/ComponentProvider.scala b/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/ComponentProvider.scala
index f5290ca4fea..508c6dea79c 100644
--- a/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/ComponentProvider.scala
+++ b/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/ComponentProvider.scala
@@ -131,7 +131,11 @@ trait ComponentProvider {
 
 object NussknackerVersion {
 
-  val current: NussknackerVersion = NussknackerVersion(new Semver(BuildInfo.version))
+  val current: NussknackerVersion = parse(BuildInfo.version)
+
+  def parse(versionString: String): NussknackerVersion = {
+    NussknackerVersion(Semver.coerce(versionString))
+  }
 
 }
diff --git a/components-api/src/test/scala/pl/touk/nussknacker/engine/api/component/NussknackerVersionTest.scala b/components-api/src/test/scala/pl/touk/nussknacker/engine/api/component/NussknackerVersionTest.scala
new file mode 100644
index 00000000000..dcad01b97b2
--- /dev/null
+++ b/components-api/src/test/scala/pl/touk/nussknacker/engine/api/component/NussknackerVersionTest.scala
@@ -0,0 +1,19 @@
+package pl.touk.nussknacker.engine.api.component
+
+import org.scalatest.funsuite.AnyFunSuiteLike
+import org.scalatest.matchers.should.Matchers
+import scala.jdk.CollectionConverters._
+
+class NussknackerVersionTest extends AnyFunSuiteLike with Matchers {
+
+  test("should allow to use underscores in nussknacker version") {
+    val version =
+      NussknackerVersion.parse("1.18.0-preview_testing-mechanism-iceberg-fix-2024-09-26-20745-9048b0f0a-SNAPSHOT")
+    version.value.getMajor shouldBe 1
+    version.value.getMinor shouldBe 18
+    version.value.getPatch shouldBe 0
+    version.value.getPreRelease.asScala shouldBe empty
+    version.value.getBuild.asScala shouldBe empty
+  }
+
+}
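Note on the version-parsing change above: strict semver parsing (`new Semver(...)`) rejects pre-release identifiers containing underscores, which is exactly what preview build versions like the one in the test contain, while `Semver.coerce` extracts the `major.minor.patch` core and drops the pre-release/build parts. A minimal sketch of the difference, assuming the `org.semver4j.Semver` API used by this module (note that `coerce` returns null when no version core can be extracted):

```scala
import org.semver4j.Semver

object SemverCoerceSketch extends App {
  // Strict parsing would throw org.semver4j.SemverException here, because "_"
  // is not a legal character in a semver pre-release identifier:
  // new Semver("1.18.0-preview_testing-mechanism-iceberg-fix-2024-09-26-20745-9048b0f0a-SNAPSHOT")

  // coerce() keeps only the major.minor.patch core and drops the pre-release
  // and build parts - which is what NussknackerVersionTest asserts:
  val coerced = Semver.coerce("1.18.0-preview_testing-mechanism-iceberg-fix-2024-09-26-20745-9048b0f0a-SNAPSHOT")
  println(coerced.getVersion) // 1.18.0
}
```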
diff --git a/docs/Changelog.md b/docs/Changelog.md
index c09c4e9ee67..8ee9b236549 100644
--- a/docs/Changelog.md
+++ b/docs/Changelog.md
@@ -17,10 +17,11 @@
 * [#6734](https://github.com/TouK/nussknacker/pull/6734) Tables from external catalogs are now refreshed automatically when entering into node form.
   Please be aware that changes in `tableDefinition.sql` are not refreshed. To do this, use `/app/processingtype/reload` API
-* [#6741](https://github.com/TouK/nussknacker/pull/6741) Added `catalogConfiguration` configuration option allowing to set up catalog
+* [#6741](https://github.com/TouK/nussknacker/pull/6741) [#6886](https://github.com/TouK/nussknacker/pull/6886) Added `catalogConfiguration` configuration option allowing to set up catalog
   directly in config instead of by `tableDefinition.sql`
 * [#6741](https://github.com/TouK/nussknacker/pull/6741) (Breaking change) Fully qualified table paths are used instead of table names in table source and sink components in `Table` parameter
+* [#6950](https://github.com/TouK/nussknacker/pull/6950) Fix for the testing mechanism for table sources: using the full model classpath instead of only flinkTable.jar
 * [#6716](https://github.com/TouK/nussknacker/pull/6716) Fix type hints for #COLLECTION.merge function.
 * [#6695](https://github.com/TouK/nussknacker/pull/6695) From now on, arrays on UI are visible as lists but on a background they are stored as it is and SpeL converts them to lists in a runtime.
@@ -47,8 +48,6 @@
   some types of unallowed expressions.
 * [#6880](https://github.com/TouK/nussknacker/pull/6880) Performance optimization of generating Avro messages with unions - shorter message in logs
-* [#6886](https://github.com/TouK/nussknacker/pull/6886) Fix for "Illegal table name:$nuCatalog" error when using Apache Iceberg catalog.
-  Internal Nussknacker catalog is now named `_nu_catalog`
 * [#6766](https://github.com/TouK/nussknacker/pull/6766) Scenario labels support - you can assign labels to scenarios and use them to filter the scenario list
 * [#6176](https://github.com/TouK/nussknacker/pull/6176) Update most dependencies to latest versions, most important ones:
   * Jackson 2.15.4 -> 2.17.2
diff --git a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/source/FlinkMiniClusterTableOperations.scala b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/source/FlinkMiniClusterTableOperations.scala
index 35f822b4578..0a0216db6e9 100644
--- a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/source/FlinkMiniClusterTableOperations.scala
+++ b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/source/FlinkMiniClusterTableOperations.scala
@@ -7,15 +7,15 @@ import org.apache.commons.io.FileUtils
 import org.apache.flink.configuration.{Configuration, CoreOptions, PipelineOptions}
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.Expressions.$
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
 import org.apache.flink.table.api._
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
 import org.apache.flink.table.catalog.ObjectIdentifier
 import org.apache.flink.types.Row
 import pl.touk.nussknacker.engine.api.test.{TestData, TestRecord}
 import pl.touk.nussknacker.engine.flink.table.definition.FlinkDataDefinition
 import pl.touk.nussknacker.engine.flink.table.definition.FlinkDataDefinition._
-import pl.touk.nussknacker.engine.util.ThreadUtils
 
+import java.net.URLClassLoader
 import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, Path, StandardOpenOption}
 import java.util.UUID
@@ -24,18 +24,17 @@ import scala.util.{Failure, Success, Try, Using}
 
 object FlinkMiniClusterTableOperations extends LazyLogging {
 
-  def parseTestRecords(records: List[TestRecord], schema: Schema): List[Row] =
-    ThreadUtils.withThisAsContextClassLoader(getClass.getClassLoader) {
-      implicit val env: StreamTableEnvironment = MiniClusterEnvBuilder.buildStreamTableEnv
-      val (inputTablePath, inputTableName)     = createTempFileTable(schema)
-      val parsedRecords = Try {
-        writeRecordsToFile(inputTablePath, records)
-        val inputTable = env.from(s"`$inputTableName`")
-        env.toDataStream(inputTable).executeAndCollect().asScala.toList
-      }
-      cleanup(inputTablePath)
-      parsedRecords.get
+  def parseTestRecords(records: List[TestRecord], schema: Schema): List[Row] = {
+    implicit val env: StreamTableEnvironment = MiniClusterEnvBuilder.buildStreamTableEnv
+    val (inputTablePath, inputTableName)     = createTempFileTable(schema)
+    val parsedRecords = Try {
+      writeRecordsToFile(inputTablePath, records)
+      val inputTable = env.from(s"`$inputTableName`")
+      env.toDataStream(inputTable).executeAndCollect().asScala.toList
     }
+    cleanup(inputTablePath)
+    parsedRecords.get
+  }
 
   def generateLiveTestData(
       limit: Int,
@@ -61,21 +60,18 @@ object FlinkMiniClusterTableOperations extends LazyLogging {
       limit: Int,
       schema: Schema,
       buildSourceTable: TableEnvironment => Table
-  ): TestData =
-    // setting context classloader because Flink in multiple places relies on it and without this temporary override it doesnt have
-    // the necessary classes
-    ThreadUtils.withThisAsContextClassLoader(getClass.getClassLoader) {
-      implicit val env: TableEnvironment    = MiniClusterEnvBuilder.buildTableEnv
-      val sourceTable                       = buildSourceTable(env)
-      val (outputFilePath, outputTableName) = createTempFileTable(schema)
-      val generatedRows = Try {
-        insertDataAndAwait(sourceTable, outputTableName, limit)
-        readRecordsFromFilesUnderPath(outputFilePath)
-      }
-      cleanup(outputFilePath)
-      val rows = generatedRows.get
-      TestData(rows.map(TestRecord(_)))
+  ): TestData = {
+    implicit val env: TableEnvironment    = MiniClusterEnvBuilder.buildTableEnv
+    val sourceTable                       = buildSourceTable(env)
+    val (outputFilePath, outputTableName) = createTempFileTable(schema)
+    val generatedRows = Try {
+      insertDataAndAwait(sourceTable, outputTableName, limit)
+      readRecordsFromFilesUnderPath(outputFilePath)
     }
+    cleanup(outputFilePath)
+    val rows = generatedRows.get
+    TestData(rows.map(TestRecord(_)))
+  }
 
   private def writeRecordsToFile(path: Path, records: List[TestRecord]): Unit = {
     val jsonRecords: List[String] = records.map(_.json.noSpaces)
@@ -165,27 +161,28 @@ object FlinkMiniClusterTableOperations extends LazyLogging {
 
   private object MiniClusterEnvBuilder {
 
-    // TODO: how to get path of jar cleaner? Through config?
-    private val classPathUrlsForMiniClusterTestingEnv = List(
-      "components/flink-table/flinkTable.jar"
-    ).map(Path.of(_).toUri.toURL)
-
-    private val streamEnvConfig = {
+    private lazy val streamEnvConfig = {
       val conf = new Configuration()
 
       // parent-first - otherwise linkage error (loader constraint violation, a different class with the same name was
       // previously loaded by 'app') for class 'org.apache.commons.math3.random.RandomDataGenerator'
       conf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first")
 
-      // without this, on Flink taskmanager level the classloader is basically empty
-      conf.set(
-        PipelineOptions.CLASSPATHS,
-        classPathUrlsForMiniClusterTestingEnv.map(_.toString).asJava
-      )
+      // Here is a hidden assumption that the context classloader is the model classloader, and another hidden
+      // assumption that the model classloader has all necessary jars (including connectors).
+      // TODO: we should explicitly pass the model classloader + we should split the model classloader into libs that
+      //       are only for testing mechanism purposes (in the real deployment they are already available in Flink),
+      //       for example table connectors
+      Thread.currentThread().getContextClassLoader match {
+        case url: URLClassLoader =>
+          conf.set(PipelineOptions.CLASSPATHS, url.getURLs.toList.map(_.toString).asJava)
+        case _ =>
+          logger.warn(
+            "Context classloader is not a URLClassLoader. Probably data generation invocation wasn't wrapped with ModelData.withThisAsContextClassLoader. MiniCluster classpath setup will be skipped."
+          )
+      }
 
       conf.set(CoreOptions.DEFAULT_PARALLELISM, Int.box(1))
     }
 
-    private val tableEnvConfig = EnvironmentSettings.newInstance().withConfiguration(streamEnvConfig).build()
+    private lazy val tableEnvConfig = EnvironmentSettings.newInstance().withConfiguration(streamEnvConfig).build()
 
     def buildTableEnv: TableEnvironment = TableEnvironment.create(tableEnvConfig)
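The classpath fix above has two halves: the caller must install the model classloader as the thread's context classloader, and `MiniClusterEnvBuilder` then republishes that classloader's jars to the MiniCluster via `PipelineOptions.CLASSPATHS` (otherwise, as the removed comment notes, the taskmanager-level classloader is basically empty). A condensed sketch of the second half, assuming the context classloader was already set up by the caller; `withModelClasspath` is an illustrative name, not Nussknacker API:

```scala
import java.net.URLClassLoader

import org.apache.flink.configuration.{Configuration, PipelineOptions}

import scala.jdk.CollectionConverters._

// Propagate the jars visible to the (model) context classloader to the
// MiniCluster job classpath, mirroring the hunk above:
def withModelClasspath(conf: Configuration): Configuration = {
  Thread.currentThread().getContextClassLoader match {
    case urlClassLoader: URLClassLoader =>
      // All model jars, including table connectors, become visible on the
      // taskmanager side of the MiniCluster:
      conf.set(PipelineOptions.CLASSPATHS, urlClassLoader.getURLs.toList.map(_.toString).asJava)
    case _ =>
      // e.g. the application classloader on JDK 9+ is not a URLClassLoader;
      // the classpath setup is then skipped, as in the code above
  }
  conf
}
```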
diff --git a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/definition/test/ModelDataTestInfoProvider.scala b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/definition/test/ModelDataTestInfoProvider.scala
index 300e9b140e3..9a1ef401097 100644
--- a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/definition/test/ModelDataTestInfoProvider.scala
+++ b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/definition/test/ModelDataTestInfoProvider.scala
@@ -134,11 +134,13 @@ class ModelDataTestInfoProvider(modelData: ModelData) extends TestInfoProvider w
   }
 
   private def generateTestData(generators: NonEmptyList[(NodeId, TestDataGenerator)], size: Int) = {
-    val sourceTestDataList = generators.map { case (sourceId, testDataGenerator) =>
-      val sourceTestRecords = testDataGenerator.generateTestData(size).testRecords
-      sourceTestRecords.map(testRecord => ScenarioTestJsonRecord(sourceId, testRecord))
+    modelData.withThisAsContextClassLoader {
+      val sourceTestDataList = generators.map { case (sourceId, testDataGenerator) =>
+        val sourceTestRecords = testDataGenerator.generateTestData(size).testRecords
+        sourceTestRecords.map(testRecord => ScenarioTestJsonRecord(sourceId, testRecord))
+      }
+      ListUtil.mergeLists(sourceTestDataList.toList, size)
     }
-    ListUtil.mergeLists(sourceTestDataList.toList, size)
   }
 
   override def prepareTestData(
diff --git a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/testmode/TestDataPreparer.scala b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/testmode/TestDataPreparer.scala
index eba06f459ca..0c9b7a2a3f0 100644
--- a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/testmode/TestDataPreparer.scala
+++ b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/testmode/TestDataPreparer.scala
@@ -18,6 +18,7 @@ import pl.touk.nussknacker.engine.definition.clazz.ClassDefinitionSet
 import pl.touk.nussknacker.engine.definition.globalvariables.ExpressionConfigDefinition
 import pl.touk.nussknacker.engine.expression.ExpressionEvaluator
 import pl.touk.nussknacker.engine.graph.expression.Expression
+import pl.touk.nussknacker.engine.util.ThreadUtils
 import pl.touk.nussknacker.engine.variables.GlobalVariablesPreparer
 
 class TestDataPreparer(
@@ -50,7 +51,11 @@ class TestDataPreparer(
       case Nil => List.empty
       case _ =>
         source match {
-          case s: SourceTestSupport[T @unchecked] => s.testRecordParser.parse(jsonRecordList.map(_.record))
+          case s: SourceTestSupport[T @unchecked] =>
+            val parser = s.testRecordParser
+            ThreadUtils.withThisAsContextClassLoader(classloader) {
+              parser.parse(jsonRecordList.map(_.record))
+            }
           case other =>
             throw new IllegalArgumentException(
               s"Source ${other.getClass} cannot be stubbed - it doesn't provide test data parser"
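Both hunks above apply the same remedy at the two entry points of the testing mechanism: run the classloader-sensitive step with the model classloader installed as the thread's context classloader. A sketch of the shared pattern, assuming only the `ThreadUtils.withThisAsContextClassLoader` usage visible in the diff; `runWithModelClasses` is an illustrative helper name:

```scala
import pl.touk.nussknacker.engine.util.ThreadUtils

// Execute `block` with `modelClassLoader` temporarily installed as the
// thread's context classloader (the removed comment in the previous file
// calls this a "temporary override"):
def runWithModelClasses[T](modelClassLoader: ClassLoader)(block: => T): T =
  ThreadUtils.withThisAsContextClassLoader(modelClassLoader) {
    // Classloading triggered inside (e.g. Flink table connector lookup in
    // the MiniCluster) now resolves against the model classloader:
    block
  }
```

Note that `TestDataPreparer` evaluates `s.testRecordParser` before entering the wrapper, so presumably only `parse` itself, which may spin up the MiniCluster, needs to run under the override.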
diff --git a/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/installationexample/DockerBasedNuInstallationExampleEnvironment.scala b/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/installationexample/DockerBasedNuInstallationExampleEnvironment.scala
index de4ea4870bd..44f4770a30c 100644
--- a/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/installationexample/DockerBasedNuInstallationExampleEnvironment.scala
+++ b/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/installationexample/DockerBasedNuInstallationExampleEnvironment.scala
@@ -27,7 +27,8 @@ class DockerBasedInstallationExampleNuEnvironment(
       "NUSSKNACKER_VERSION" -> nussknackerImageVersion
     ),
     logConsumers = Seq(
-      ServiceLogConsumer("bootstrap-setup", new Slf4jLogConsumer(slf4jLogger))
+      ServiceLogConsumer("bootstrap-setup", new Slf4jLogConsumer(slf4jLogger)),
+      ServiceLogConsumer("designer", new Slf4jLogConsumer(slf4jLogger)),
     ),
     waitingFor = Some(
       WaitingForService(
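The extra consumer above simply forwards the `designer` service's container logs to the same SLF4J logger, which makes designer-side failures such as the classpath problems fixed in this PR visible in test output. A minimal wiring sketch, assuming testcontainers-scala's `DockerComposeContainer.Def`; the compose-file path and logger name are illustrative:

```scala
import java.io.File

import com.dimafeng.testcontainers.{ComposeFile, DockerComposeContainer, ServiceLogConsumer}
import org.slf4j.LoggerFactory
import org.testcontainers.containers.output.Slf4jLogConsumer

val slf4jLogger = LoggerFactory.getLogger("nu-installation-example")

// Forward logs of both compose services to SLF4J:
val composeDef = DockerComposeContainer.Def(
  ComposeFile(Left(new File("examples/installation/docker-compose.yml"))),
  logConsumers = Seq(
    ServiceLogConsumer("bootstrap-setup", new Slf4jLogConsumer(slf4jLogger)),
    ServiceLogConsumer("designer", new Slf4jLogConsumer(slf4jLogger))
  )
)
```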