From 4f0fb10b5396f95245ed896f7313fe859bac7ef2 Mon Sep 17 00:00:00 2001 From: David Benedeki <14905969+benedeki@users.noreply.github.com> Date: Sat, 17 Feb 2024 02:54:03 +0100 Subject: [PATCH] #29: Streamline the Ultet processing (#41) The process is now: * initialization * source code analysis and creating the db app model * target database(s) analysis, and update of the `DbAppModel` with it. * `DbItem`s (diffs) generation for each database and grouping them per transaction * SQLs generation from `DbItem`s * execution/print of the SQLs Other changes: * Using _[Balta](https://github.com/AbsaOSS/balta)_ for queries now * Multiple databases (in source only) support * Finished and refactored table diffing --- .../database/src/main/my_schema/my_table.yaml | 2 +- src/main/scala/za/co/absa/ultet/Ultet.scala | 131 +++++---------- .../za/co/absa/ultet/dbitems/DBFunction.scala | 2 + .../absa/ultet/dbitems/DBFunctionFromPG.scala | 18 +- .../ultet/dbitems/DBFunctionFromSource.scala | 2 +- .../za/co/absa/ultet/dbitems/DBItem.scala | 34 +--- .../za/co/absa/ultet/dbitems/DBSchema.scala | 22 +-- .../za/co/absa/ultet/dbitems/DBTable.scala | 116 ++++++------- .../co/absa/ultet/dbitems/DBTableAlter.scala | 57 +++++++ .../co/absa/ultet/dbitems/DBTableInsert.scala | 32 ++++ .../dbitems/extractors/DBTableFromPG.scala | 16 +- .../ultet/dbitems/table/DBTableIndex.scala | 8 +- .../absa/ultet/implicits/MapImplicits.scala | 36 ++++ .../ultet/implicits/OptionImplicits.scala | 31 ++++ .../absa/ultet/implicits/SetImplicits.scala | 32 ++++ .../za/co/absa/ultet/model/DBAppModel.scala | 157 ++++++++++++++++++ .../za/co/absa/ultet/model/DatabaseDef.scala | 50 ++++++ .../za/co/absa/ultet/model/SchemaDef.scala | 65 ++++++++ .../absa/ultet/model/schema/SchemaGrant.scala | 2 +- .../ultet/model/table/TableCreation.scala | 7 +- .../absa/ultet/model/table/TableEntry.scala | 4 +- .../ultet/model/table/TableOwnership.scala | 7 +- .../alterations/TableColumnCommentDrop.scala | 7 +- 
.../alterations/TableColumnCommentSet.scala | 6 +- .../alterations/TableColumnDefaultDrop.scala | 9 +- .../alterations/TableColumnDefaultSet.scala | 9 +- .../alterations/TableColumnNotNullDrop.scala | 7 +- .../alterations/TablePrimaryKeyAdd.scala | 7 +- .../alterations/TablePrimaryKeyDrop.scala | 7 +- .../model/table/column/TableColumnAdd.scala | 10 +- .../model/table/column/TableColumnDrop.scala | 7 +- .../model/table/index/TableIndexCreate.scala | 9 +- .../model/table/index/TableIndexDrop.scala | 7 +- .../co/absa/ultet/model/table/package.scala | 16 +- .../ultet/parsers/GenericFileParser.scala | 33 ++++ .../ultet/parsers/PgFunctionFileParser.scala | 52 ++---- .../ultet/parsers/PgTableFileParser.scala | 128 ++------------ .../parsers/helpers/DBTableFromYaml.scala | 111 +++++++++++++ .../za/co/absa/ultet/util/DBProperties.scala | 11 ++ .../za/co/absa/ultet/util/FileReader.scala | 64 +++++++ .../co/absa/ultet/util/SourceFileType.scala | 24 +++ .../za/co/absa/ultet/util/SqlExecutor.scala | 53 ++++++ .../za/co/absa/ultet/util/TaskConfig.scala | 40 +++++ .../scala/za/co/absa/ultet/util/package.scala | 35 ++++ ...EntriesPerTransactionEnhancementTest.scala | 77 +++++++++ .../ultet/implicits/OptionImplicitsTest.scala | 33 ++++ .../parsers/PgFunctionFileParserTest.scala | 14 +- .../ultet/parsers/PgTableFileParserTest.scala | 50 ++++-- .../co/absa/ultet/util/FileReaderTest.scala | 23 +++ 49 files changed, 1212 insertions(+), 468 deletions(-) create mode 100644 src/main/scala/za/co/absa/ultet/dbitems/DBTableAlter.scala create mode 100644 src/main/scala/za/co/absa/ultet/dbitems/DBTableInsert.scala create mode 100644 src/main/scala/za/co/absa/ultet/implicits/MapImplicits.scala create mode 100644 src/main/scala/za/co/absa/ultet/implicits/OptionImplicits.scala create mode 100644 src/main/scala/za/co/absa/ultet/implicits/SetImplicits.scala create mode 100644 src/main/scala/za/co/absa/ultet/model/DBAppModel.scala create mode 100644 
src/main/scala/za/co/absa/ultet/model/DatabaseDef.scala create mode 100644 src/main/scala/za/co/absa/ultet/model/SchemaDef.scala create mode 100644 src/main/scala/za/co/absa/ultet/parsers/GenericFileParser.scala create mode 100644 src/main/scala/za/co/absa/ultet/parsers/helpers/DBTableFromYaml.scala create mode 100644 src/main/scala/za/co/absa/ultet/util/FileReader.scala create mode 100644 src/main/scala/za/co/absa/ultet/util/SourceFileType.scala create mode 100644 src/main/scala/za/co/absa/ultet/util/SqlExecutor.scala create mode 100644 src/main/scala/za/co/absa/ultet/util/TaskConfig.scala create mode 100644 src/main/scala/za/co/absa/ultet/util/package.scala create mode 100644 src/test/scala/za/co/absa/ultet/implicits/MapImplicits_SqlEntriesPerTransactionEnhancementTest.scala create mode 100644 src/test/scala/za/co/absa/ultet/implicits/OptionImplicitsTest.scala create mode 100644 src/test/scala/za/co/absa/ultet/util/FileReaderTest.scala diff --git a/examples/database/src/main/my_schema/my_table.yaml b/examples/database/src/main/my_schema/my_table.yaml index 2fd9421..f0d264d 100644 --- a/examples/database/src/main/my_schema/my_table.yaml +++ b/examples/database/src/main/my_schema/my_table.yaml @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-table: my_schema.my_table +table: my_table description: This is an example table primary_db: example_db owner: some_owner_user diff --git a/src/main/scala/za/co/absa/ultet/Ultet.scala b/src/main/scala/za/co/absa/ultet/Ultet.scala index 4f52353..058a1cf 100644 --- a/src/main/scala/za/co/absa/ultet/Ultet.scala +++ b/src/main/scala/za/co/absa/ultet/Ultet.scala @@ -18,113 +18,68 @@ package za.co.absa.ultet import com.typesafe.scalalogging.Logger import scopt.OParser +import za.co.absa.balta.classes.DBConnection import za.co.absa.ultet.dbitems.DBItem -import za.co.absa.ultet.model.{SQLEntry, SchemaName, TransactionGroup} -import za.co.absa.ultet.util.{CliParser, Config, DBProperties} +import za.co.absa.ultet.implicits.MapImplicits.SqlEntriesPerTransactionEnhancement +import za.co.absa.ultet.implicits.OptionImplicits.OptionEnhancements +import za.co.absa.ultet.model.{DBAppModel, DatabaseName, SQLEntry, TransactionGroup} +import za.co.absa.ultet.util.FileReader.SchemaFiles +import za.co.absa.ultet.util.{CliParser, Config, DBProperties, FileReader, SqlEntriesPerTransaction, SqlExecutor, SqlsPerTransaction, TaskConfig} -import java.net.URI -import java.nio.file._ -import scala.collection.JavaConverters._ import java.sql.{Connection, DriverManager, ResultSet} import scala.util.{Failure, Success, Try} object Ultet { private val logger = Logger(getClass.getName) - private def extractSQLEntries(dbItems: Set[DBItem]): Seq[SQLEntry] = { - dbItems.toSeq.flatMap { item => item.sqlEntries } - } - - private def runEntries(entries: Seq[SQLEntry], dryRun: Boolean = false)(implicit connection: Connection): Unit = { - if(dryRun) entries.map(_.sqlExpression).foreach(println) - else { - val resultSets: Seq[ResultSet] = runTransaction(connection, entries) - - for (resultSet <- resultSets) { - val numColumns = resultSet.getMetaData.getColumnCount - while (resultSet.next()) { - val row = (1 to numColumns).map(col => resultSet.getString(col)) - logger.info(row.mkString(", ")) - } - 
resultSet.close() - } - } - } + def main(args: Array[String]): Unit = { - def runTransaction(connection: Connection, entries: Seq[SQLEntry]): Seq[ResultSet] = { - val autoCommitOriginalStatus = connection.getAutoCommit - connection.setAutoCommit(false) + implicit val (appConfig: Config, taskConfig: TaskConfig) = init(args) - val resultSets = Try { - entries.foldLeft(List[ResultSet]()) { case (acc, entry) => - val statement = connection.createStatement() - statement.execute(entry.sqlExpression) - val ret: List[ResultSet] = acc :+ statement.getResultSet - statement.close() - ret - } - } match { - case Success(resultSets) => - connection.commit() - resultSets - case Failure(exception) => - connection.rollback() - connection.close() - throw new Exception("Script execution failed", exception) - } + val sourceURIsPerSchema: SchemaFiles = FileReader.listFileURIsPerSchema(appConfig.sourceFilesRootPath) + val databaseTransactionSqls = DBAppModel + .loadFromSources(sourceURIsPerSchema) + .addDatabasesAnalysis()(taskConfig) + .createSQLEntries() + .map { case (dbName, sqlEntries) => dbName -> sqlEntries.toSql } - connection.setAutoCommit(autoCommitOriginalStatus) - resultSets + if (appConfig.dryRun) print(databaseTransactionSqls) + else execute(databaseTransactionSqls) } - def sortEntries(entries: Seq[SQLEntry]): Map[TransactionGroup.Value, Seq[SQLEntry]] = { - entries - .groupBy(_.transactionGroup) - .mapValues(_.sortBy(_.orderInTransaction)) + private def init(args: Array[String]): (Config, TaskConfig) = { + val appConfig = OParser.parse(CliParser.parser, args, Config()).getOrThrow(new Exception("Failed to load arguments")) + val defaultDB = DBProperties.loadProperties(appConfig.dbConnectionPropertiesPath) + val taskConfig = TaskConfig(DatabaseName(defaultDB.database), Set(defaultDB)) + (appConfig, taskConfig) } - private def listChildPaths(path: Path): List[Path] = Files.list(path) - .iterator() - .asScala - .toList + private def execute(databaseTransactionSqls: 
Map[DatabaseName, SqlsPerTransaction])(implicit taskConfig: TaskConfig): Unit = { + def executeDatabase(databaseName: DatabaseName, sqls: SqlsPerTransaction): Unit = { + logger.info(s"Executing against database: ${databaseName.value}") + val transactionGroups = TransactionGroup.values.toList // going over transaction groups in their logical order + implicit val dbConnection: DBConnection = taskConfig.dbConnections(databaseName).dbConnection + transactionGroups.foreach(sqls.get(_).foreach(SqlExecutor.execute(_))) + } - def listFileURIsPerSchema(pathString: String): Map[SchemaName, List[URI]] = { - val path = Paths.get(pathString) - val schemaPaths = listChildPaths(path) - schemaPaths - .map(p => SchemaName(p.getFileName.toString) -> listChildPaths(p)) - .toMap - .mapValues(_.map(_.toUri)) + databaseTransactionSqls.foreach { case (dbName, entries) => executeDatabase(dbName, entries) } } - def main(args: Array[String]): Unit = { - val config = OParser.parse(CliParser.parser, args, Config()) match { - case Some(config) => config - case _ => throw new Exception("Failed to load arguments") + private def print(databaseTransactionSqls: Map[DatabaseName, SqlsPerTransaction]): Unit = { + def printDatabase(databaseName: DatabaseName, sqls: SqlsPerTransaction): Unit = { + val transactionGroups = TransactionGroup.values.toList // going over transaction groups in their logical order + + val delimiter = "================================================================================" + val prefix = "= Database: " + val suffix = "=" + val space = " " * (delimiter.length - prefix.length - databaseName.value.length - suffix.length) + println(delimiter) + println(s"$prefix${databaseName.value}$space$suffix") + println(s"$delimiter\n") + transactionGroups.foreach(sqls.get(_).foreach(_.foreach(println))) } - val dbProperties = DBProperties.loadProperties(config.dbConnectionPropertiesPath) - val dbPropertiesSys = DBProperties.getSysDB(dbProperties) - val dbConnection: String = 
dbProperties.generateConnectionString() - implicit val jdbcConnection: Connection = DriverManager.getConnection( - dbConnection, dbProperties.user, dbProperties.password - ) - - val sourceURIsPerSchema: Map[SchemaName, List[URI]] = listFileURIsPerSchema(config.sourceFilesRootPath) - - val dbItems: Set[DBItem] = DBItem.createDBItems(sourceURIsPerSchema) - val entries: Seq[SQLEntry] = extractSQLEntries(dbItems) - val orderedEntries = sortEntries(entries) - val databaseEntries = orderedEntries.getOrElse(TransactionGroup.Databases, Seq.empty) - val roleEntries = orderedEntries.getOrElse(TransactionGroup.Roles, Seq.empty) - val objectEntries = orderedEntries.getOrElse(TransactionGroup.Objects, Seq.empty) - val indexEntries = orderedEntries.getOrElse(TransactionGroup.Indexes, Seq.empty) - - if (databaseEntries.nonEmpty) runEntries(databaseEntries, config.dryRun) - if (roleEntries.nonEmpty) runEntries(roleEntries, config.dryRun) - if (objectEntries.nonEmpty) runEntries(objectEntries, config.dryRun) - if (indexEntries.nonEmpty) runEntries(indexEntries, config.dryRun) - - jdbcConnection.close() + databaseTransactionSqls.foreach { case (dbName, entries) => printDatabase(dbName, entries) } } + } diff --git a/src/main/scala/za/co/absa/ultet/dbitems/DBFunction.scala b/src/main/scala/za/co/absa/ultet/dbitems/DBFunction.scala index d166f5e..798769c 100644 --- a/src/main/scala/za/co/absa/ultet/dbitems/DBFunction.scala +++ b/src/main/scala/za/co/absa/ultet/dbitems/DBFunction.scala @@ -25,4 +25,6 @@ trait DBFunction extends DBItem { def paramTypes: Seq[FunctionArgumentType] def schema: SchemaName + def database: DatabaseName + } diff --git a/src/main/scala/za/co/absa/ultet/dbitems/DBFunctionFromPG.scala b/src/main/scala/za/co/absa/ultet/dbitems/DBFunctionFromPG.scala index c80c140..8af572d 100644 --- a/src/main/scala/za/co/absa/ultet/dbitems/DBFunctionFromPG.scala +++ b/src/main/scala/za/co/absa/ultet/dbitems/DBFunctionFromPG.scala @@ -18,21 +18,22 @@ package 
za.co.absa.ultet.dbitems import za.co.absa.ultet.model.function.{FunctionArgumentType, FunctionDrop, FunctionName} -import za.co.absa.ultet.model.{SQLEntry, SchemaName} +import za.co.absa.ultet.model.{DatabaseName, SQLEntry, SchemaName} import java.sql.Connection case class DBFunctionFromPG( schema: SchemaName, fnName: FunctionName, - paramTypes: Seq[FunctionArgumentType] + paramTypes: Seq[FunctionArgumentType], + database: DatabaseName //TODO: Implement this ) extends DBFunction { override def sqlEntries: Seq[SQLEntry] = Seq(FunctionDrop(schema, fnName, paramTypes)) } object DBFunctionFromPG { - def fetchAllOfSchema(schemaName: SchemaName) + def fetchAllOfSchema(databaseName: DatabaseName, schemaName: SchemaName) (implicit jdbcConnection: Connection): Seq[DBFunctionFromPG] = { val query = s""" @@ -47,10 +48,10 @@ object DBFunctionFromPG { | p.prokind = 'f'; |""".stripMargin - fetchAllOfSchemaWithQuery(schemaName, query) + fetchAllOfSchemaWithQuery(databaseName, schemaName, query) } - def fetchAllOverloads(schemaName: SchemaName, functionName: FunctionName) + def fetchAllOverloads(databaseName: DatabaseName, schemaName: SchemaName, functionName: FunctionName) (implicit jdbcConnection: Connection): Seq[DBFunctionFromPG] = { val query = s""" @@ -66,10 +67,10 @@ object DBFunctionFromPG { | p.prokind = 'f'; |""".stripMargin - fetchAllOfSchemaWithQuery(schemaName, query) + fetchAllOfSchemaWithQuery(databaseName, schemaName, query) } - private def fetchAllOfSchemaWithQuery(schemaName: SchemaName, query: String) + private def fetchAllOfSchemaWithQuery(databaseName: DatabaseName, schemaName: SchemaName, query: String) (implicit jdbcConnection: Connection): Seq[DBFunctionFromPG] = { val preparedStatement = jdbcConnection.prepareStatement(query) val result = preparedStatement.executeQuery() @@ -89,7 +90,8 @@ object DBFunctionFromPG { val dbFunctionFromPG = DBFunctionFromPG( schemaName, FunctionName(fnName), - argumentTypes + argumentTypes, + databaseName ) seqBuilder += 
dbFunctionFromPG } diff --git a/src/main/scala/za/co/absa/ultet/dbitems/DBFunctionFromSource.scala b/src/main/scala/za/co/absa/ultet/dbitems/DBFunctionFromSource.scala index ae14fcb..cd93081 100644 --- a/src/main/scala/za/co/absa/ultet/dbitems/DBFunctionFromSource.scala +++ b/src/main/scala/za/co/absa/ultet/dbitems/DBFunctionFromSource.scala @@ -23,7 +23,7 @@ case class DBFunctionFromSource( fnName: FunctionName, paramTypes: Seq[FunctionArgumentType], owner: UserName, - users: Seq[UserName], + users: Set[UserName], schema: SchemaName, database: DatabaseName, sqlBody: String diff --git a/src/main/scala/za/co/absa/ultet/dbitems/DBItem.scala b/src/main/scala/za/co/absa/ultet/dbitems/DBItem.scala index a74b951..bb87854 100644 --- a/src/main/scala/za/co/absa/ultet/dbitems/DBItem.scala +++ b/src/main/scala/za/co/absa/ultet/dbitems/DBItem.scala @@ -16,7 +16,7 @@ package za.co.absa.ultet.dbitems -import za.co.absa.ultet.model.{SQLEntry, SchemaName} +import za.co.absa.ultet.model.{SQLEntry, SchemaName, TransactionGroup} import za.co.absa.ultet.parsers.PgFunctionFileParser import java.net.URI @@ -27,36 +27,6 @@ trait DBItem { } object DBItem { - def createDBItems(filePathsPerSchema: Map[SchemaName, Seq[URI]]) - (implicit jdbcConnection: Connection): Set[DBItem] = { - val schemas = filePathsPerSchema.keys - val SchemasWithFilesGroupedByType(tables, functions, owners) = groupAllFilesPerSchemaByType(filePathsPerSchema) - - // TODO handle tables - - val dbFunctionsFromSource = functions - .values - .flatten - .map(PgFunctionFileParser().parseFile) - .toSet - val dbFunctionsFromPG = functions - .keys - .flatMap(DBFunctionFromPG.fetchAllOfSchema) - .toSet - - val users: Set[DBItem] = dbFunctionsFromSource.flatMap(f => f.users :+ f.owner).map(DBUser) - - val schemaOwners = owners.mapValues(DBSchema.parseTxtFileContainingSchemaOwner) - val schemaUsers = dbFunctionsFromSource.groupBy(_.schema).mapValues(_.toSeq.map(_.users).flatten.toSet) - - val dbSchemas = schemas.toSet.map { (s: 
SchemaName) => - val owner = schemaOwners(s) - val users = schemaUsers(s) + owner - DBSchema(s, owner, users.toSeq) - } - - dbFunctionsFromSource ++ dbFunctionsFromPG ++ users ++ dbSchemas - } private case class SchemasWithFilesGroupedByType( tables: Map[SchemaName, Seq[URI]], @@ -71,7 +41,7 @@ object DBItem { ownerFiles.foreach { case (schemaName, uris) => if (uris.size > 1) throw new IllegalArgumentException( s"Detected more than one .txt file in schema ${schemaName.normalized}" - ) else if (uris.size == 0) throw new IllegalArgumentException( + ) else if (uris.isEmpty) throw new IllegalArgumentException( s".txt file in schema ${schemaName.normalized} not found" ) } diff --git a/src/main/scala/za/co/absa/ultet/dbitems/DBSchema.scala b/src/main/scala/za/co/absa/ultet/dbitems/DBSchema.scala index 0b39309..3e2b426 100644 --- a/src/main/scala/za/co/absa/ultet/dbitems/DBSchema.scala +++ b/src/main/scala/za/co/absa/ultet/dbitems/DBSchema.scala @@ -26,35 +26,35 @@ import java.nio.file.{Files, Paths} import java.util.stream.Collectors case class DBSchema(name: SchemaName, - ownerName: UserName, - users: Seq[UserName]) extends DBItem { + ownerName: Option[UserName], + users: Set[UserName]) extends DBItem { override def sqlEntries: Seq[SQLEntry] = { if (DO_NOT_TOUCH.contains(name.value)) { - throw new Exception(s"Schema ${name.value} is not allow to be referenced") + throw new Exception(s"Schema ${name.value} is not allow to be referenced") //TODO #31 Add warnings to the system } if (DO_NOT_CHOWN.contains(name.value)){ - logger.warn(s"Schema ${name.value} is not allowed to change owner") + logger.warn(s"Schema ${name.value} is not allowed to change owner") //TODO #31 Add warnings to the system Seq( SchemaCreate(name), SchemaGrant(name, users) ) } else { - Seq( - SchemaCreate(name), - SchemaOwner(name, ownerName), - SchemaGrant(name, users) - ) + ownerName.map{SchemaOwner(name, _)}.toSeq ++ + Seq( + SchemaCreate(name), + SchemaGrant(name, users) + ) } } } object DBSchema { 
private val logger = Logger(getClass.getName) - + val DO_NOT_TOUCH: Seq[String] = Seq("pg_toast", "pg_catalog", "information_schema") val DO_NOT_CHOWN: Seq[String] = Seq("public") - + def parseTxtFileContainingSchemaOwner(fileUri: URI): UserName = { val path = Paths.get(fileUri) val lines = Files.lines(path) diff --git a/src/main/scala/za/co/absa/ultet/dbitems/DBTable.scala b/src/main/scala/za/co/absa/ultet/dbitems/DBTable.scala index 60fe8f0..6c232bf 100644 --- a/src/main/scala/za/co/absa/ultet/dbitems/DBTable.scala +++ b/src/main/scala/za/co/absa/ultet/dbitems/DBTable.scala @@ -16,32 +16,33 @@ package za.co.absa.ultet.dbitems -import za.co.absa.ultet.model.table.{ColumnName, TableAlteration, TableCreation, TableEntry, TableIdentifier, TableName} +import za.co.absa.ultet.model.table.{ColumnName, TableAlteration, TableIdentifier, TableName} import za.co.absa.ultet.model.{DatabaseName, SchemaName, UserName} -import za.co.absa.ultet.model.table.index.{TableIndexCreate, TableIndexDrop} -import za.co.absa.ultet.model.table.alterations.{TableColumnCommentDrop, TableColumnCommentSet, TableColumnDefaultDrop, TableColumnDefaultSet, TableColumnNotNullDrop, TablePrimaryKeyAdd, TablePrimaryKeyDrop} -import DBTable.ColumnsDifferenceResolver +import za.co.absa.ultet.model.table.alterations.{TableColumnCommentDrop, TableColumnCommentSet, TableColumnDefaultDrop, TableColumnDefaultSet, TableColumnNotNullDrop} import za.co.absa.balta.classes.DBConnection import za.co.absa.ultet.dbitems.extractors.DBTableFromPG import za.co.absa.ultet.dbitems.table.DBTableIndex.{DBPrimaryKey, DBSecondaryIndex} import za.co.absa.ultet.dbitems.table.DBTableColumn +import za.co.absa.ultet.implicits.OptionImplicits.OptionEnhancements import za.co.absa.ultet.model.table.column.{TableColumnAdd, TableColumnDrop} import java.sql.Connection -//TODO checks on validity of entries +//TODO #31 Add warnings to the system, checks on validity of entries case class DBTable( - tableName: TableName, - schemaName: 
SchemaName, + tableIdentifier: TableIdentifier, primaryDBName: DatabaseName, owner: UserName, - description: Option[String] = None, - columns: Seq[DBTableColumn] = Seq.empty, - primaryKey: Option[DBPrimaryKey] = None, - indexes: Set[DBSecondaryIndex] = Set.empty + description: Option[String], + columns: Seq[DBTableColumn], + primaryKey: Option[DBPrimaryKey], + indexes: Set[DBSecondaryIndex] ) { - val tableIdentifier: TableIdentifier = TableIdentifier(tableName, Some(schemaName)) + def schemaName: SchemaName = tableIdentifier.schemaName + + def tableName: TableName = tableIdentifier.tableName + def addColumn(column: DBTableColumn): DBTable = { this.copy(columns = columns ++ Seq(column)) @@ -55,57 +56,30 @@ case class DBTable( this.copy(primaryKey = Some(pk)) } - def -(other: Option[DBTable]): Seq[TableEntry] = { - other match { - case None => { - val pkCreateAlteration: Option[TableAlteration] = primaryKey.map(definedPk => TablePrimaryKeyAdd(schemaName, tableName, definedPk)) - val indicesCreateAlterations = indexes.map(idx => TableIndexCreate(schemaName, idx)) - - Seq(TableCreation(schemaName, tableName, columns)) ++ - pkCreateAlteration.toSeq ++ - indicesCreateAlterations - } - - case Some(definedOther) => this - definedOther - } + def -(other: Option[DBTable]): DBItem = { + other.map(this - _).getOrElse(DBTableInsert(this)) } - def -(other: DBTable): Seq[TableEntry] = { - assert(schemaName == other.schemaName, s"Schema names must match to diff tables, but $schemaName != ${other.schemaName}") - assert(tableName == other.tableName, s"Table names must match to diff tables, but $tableName != ${other.tableName}") - - val removeIndices = this.indexes.diff(other.indexes) - val alterationsToRemoveIndices = removeIndices.map(idx => TableIndexDrop(schemaName, tableName, idx.tableName.normalized)) - - val addIndices = other.indexes.diff(this.indexes) - val alterationsToAddIndices = addIndices.map(idx => TableIndexCreate(schemaName, idx)) - - val pkEntries: 
Seq[TableAlteration] = (this.primaryKey, other.primaryKey) match { - case (x, y) if x == y => Seq.empty - case (Some(existingPk), Some(newPk)) => Seq( - TablePrimaryKeyDrop(schemaName, tableName, existingPk), - TablePrimaryKeyAdd(schemaName, tableName, newPk) - ) - case (None, Some(newPk)) => Seq(TablePrimaryKeyAdd(schemaName, tableName, newPk)) - case (Some(existingPk), None) => Seq(TablePrimaryKeyDrop(schemaName, tableName, existingPk)) - } - - // todo alter description? - - val diffResolver = ColumnsDifferenceResolver(schemaName, tableName)(columns, other.columns) - - diffResolver.alterationsForColumnAdditions ++ - diffResolver.alterationsForCommonColumns ++ - alterationsToRemoveIndices ++ - diffResolver.alterationsForColumnRemovals - // pkEntries ++ todo #94 - // alterationsToAddIndices ++ todo #94 - + def -(other: DBTable): DBItem = { + DBTableAlter(this, other) } } object DBTable { - case class ColumnsDifferenceResolver(schemaName: SchemaName, tableName: TableName)(thisColumns: Seq[DBTableColumn], otherColumns: Seq[DBTableColumn]) { + + def apply(schemaName: SchemaName, + tableName: TableName, + primaryDBName: DatabaseName, + owner: UserName, + description: Option[String] = None, + columns: Seq[DBTableColumn] = Seq.empty, + primaryKey: Option[DBPrimaryKey] = None, + indexes: Set[DBSecondaryIndex] = Set.empty + ): DBTable = { + new DBTable(TableIdentifier(schemaName, tableName), primaryDBName, owner, description, columns, primaryKey, indexes) + } + + case class ColumnsDifferenceResolver(tableIdentifier: TableIdentifier)(thisColumns: Seq[DBTableColumn], otherColumns: Seq[DBTableColumn]) { private[dbitems] val thisColumnNames = thisColumns.map(_.columnName) private[dbitems] val otherColumnNames = otherColumns.map(_.columnName) @@ -122,19 +96,23 @@ object DBTable { private[dbitems] def commonColumns: Set[(DBTableColumn, DBTableColumn)] = { commonColumnNames.map { commonName => - val tCol = thisColumns.find(_.columnName == commonName).getOrElse(throw new 
IllegalStateException(s"could not find column $commonName in table ${schemaName.value}.${tableName.value}")) - val oCol = otherColumns.find(_.columnName == commonName).getOrElse(throw new IllegalStateException(s"could not find column $commonName in table ${schemaName.value}.${tableName.value}")) + val tCol = thisColumns + .find(_.columnName == commonName) + .getOrThrow(new IllegalStateException(s"could not find column $commonName in table ${tableIdentifier.fullName}")) + val oCol = otherColumns + .find(_.columnName == commonName) + .getOrThrow(new IllegalStateException(s"could not find column $commonName in table ${tableIdentifier.fullName}")) (tCol, oCol) } } def alterationsForColumnAdditions: Seq[TableAlteration] = { - columnsToAdd.map(col => TableColumnAdd(schemaName, tableName, col)) + columnsToAdd.map(col => TableColumnAdd(tableIdentifier, col)) } def alterationsForColumnRemovals: Seq[TableAlteration] = { - columnsToRemove.map(col => TableColumnDrop(schemaName, tableName, col.columnName)).toSeq + columnsToRemove.map(col => TableColumnDrop(tableIdentifier, col.columnName)).toSeq } def alterationsForCommonColumns: Seq[TableAlteration] = { @@ -147,31 +125,31 @@ object DBTable { } private def generateAlterForDataTypeChange(thisCol: DBTableColumn, otherCol: DBTableColumn): Seq[TableAlteration] = { - // todo for very specific datatype changes only? from/to String, numerics? 
+ // TODO #37 Allow certain data type changes throw new IllegalStateException(s"Cannot change datatype of ${thisCol.columnName.value} from ${thisCol.dataType} to ${otherCol.dataType} ") } private def generateAlterForNotNullChange(thisCol: DBTableColumn, otherCol: DBTableColumn): Seq[TableAlteration] = { (thisCol.notNull, otherCol.notNull) match { case (t, o) if t == o => Seq.empty // no change - case (true, false) => Seq(TableColumnNotNullDrop(schemaName, tableName, otherCol.columnName)) - case (false, true) => throw new IllegalStateException(s"Cannot change [null] to [not null] for ${thisCol.columnName.value} for table ${schemaName.value}.${tableName.value} ") + case (true, false) => Seq(TableColumnNotNullDrop(tableIdentifier, otherCol.columnName)) + case (false, true) => throw new IllegalStateException(s"Cannot change [null] to [not null] for ${thisCol.columnName.value} for table ${tableIdentifier.fullName} ") } } private def generateAlterForDescriptionChange(thisCol: DBTableColumn, otherCol: DBTableColumn): Seq[TableAlteration] = { (thisCol.description, otherCol.description) match { case (t, o) if t == o => Seq.empty // no change - case (Some(t), None) => Seq(TableColumnCommentDrop(schemaName, tableName, thisCol.columnName)) - case (_, Some(o)) => Seq(TableColumnCommentSet(schemaName, tableName, otherCol.columnName, o)) // both add/set + case (Some(_), None) => Seq(TableColumnCommentDrop(tableIdentifier, thisCol.columnName)) + case (_, Some(o)) => Seq(TableColumnCommentSet(tableIdentifier, otherCol.columnName, o)) // both add/set } } private def generateAlterForDefaultChange(thisCol: DBTableColumn, otherCol: DBTableColumn): Seq[TableAlteration] = { (thisCol.default, otherCol.default) match { case (t, o) if t == o => Seq.empty // no change - case (Some(t), None) => Seq(TableColumnDefaultDrop(schemaName, tableName, thisCol.columnName)) - case (_, Some(o)) => Seq(TableColumnDefaultSet(schemaName, tableName, otherCol.columnName, o)) // both add/set + case (Some(t), 
None) => Seq(TableColumnDefaultDrop(tableIdentifier, thisCol.columnName)) + case (_, Some(o)) => Seq(TableColumnDefaultSet(tableIdentifier, otherCol.columnName, o)) // both add/set } } @@ -179,7 +157,7 @@ object DBTable { def createFromPG(schemaName: SchemaName, tableName: TableName, databaseName: DatabaseName) (implicit jdbcConnection: Option[Connection]): Option[DBTable] = { jdbcConnection.flatMap { dbConnection => - val extractor = new DBTableFromPG(databaseName)(new DBConnection(dbConnection)) + val extractor = DBTableFromPG(databaseName)(new DBConnection(dbConnection)) extractor.extract(schemaName, tableName) } } diff --git a/src/main/scala/za/co/absa/ultet/dbitems/DBTableAlter.scala b/src/main/scala/za/co/absa/ultet/dbitems/DBTableAlter.scala new file mode 100644 index 0000000..d252f0d --- /dev/null +++ b/src/main/scala/za/co/absa/ultet/dbitems/DBTableAlter.scala @@ -0,0 +1,57 @@ +/* + * Copyright 2023 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.ultet.dbitems +import za.co.absa.ultet.dbitems.DBTable.ColumnsDifferenceResolver +import za.co.absa.ultet.model.{SQLEntry, SchemaName} +import za.co.absa.ultet.model.table.{TableAlteration, TableName} +import za.co.absa.ultet.model.table.alterations.{TablePrimaryKeyAdd, TablePrimaryKeyDrop} +import za.co.absa.ultet.model.table.index.{TableIndexCreate, TableIndexDrop} + +case class DBTableAlter(newTable: DBTable, origTable: DBTable) extends DBItem { + + assert(newTable.schemaName == origTable.schemaName, s"Schema names must match to diff tables, but ${newTable.schemaName} != ${origTable.schemaName}") + assert(newTable.tableName == origTable.tableName, s"Table names must match to diff tables, but ${newTable.tableName} != ${origTable.tableName}") + + override def sqlEntries: Seq[SQLEntry] = { + val removeIndices = newTable.indexes.diff(origTable.indexes) + val alterationsToRemoveIndices = removeIndices.map(idx => TableIndexDrop(origTable.tableIdentifier, idx.indexName)) + + val addIndices = origTable.indexes.diff(origTable.indexes) + val alterationsToAddIndices = addIndices.map(idx => TableIndexCreate(newTable.tableIdentifier, idx)) + + val pkEntries: Seq[TableAlteration] = (origTable.primaryKey, origTable.primaryKey) match { + case (x, y) if x == y => Seq.empty + case (Some(existingPk), Some(newPk)) => Seq( + TablePrimaryKeyDrop(origTable.tableIdentifier, existingPk), + TablePrimaryKeyAdd(newTable.tableIdentifier, newPk) + ) + case (None, Some(newPk)) => Seq(TablePrimaryKeyAdd(newTable.tableIdentifier, newPk)) + case (Some(existingPk), None) => Seq(TablePrimaryKeyDrop(origTable.tableIdentifier, existingPk)) + } + + //TODO #38 Table comments need escaping + + val diffResolver = ColumnsDifferenceResolver(newTable.tableIdentifier)(origTable.columns, origTable.columns) + + diffResolver.alterationsForColumnAdditions ++ + diffResolver.alterationsForCommonColumns ++ + alterationsToRemoveIndices ++ + diffResolver.alterationsForColumnRemovals ++ + 
pkEntries ++ + alterationsToAddIndices + } +} diff --git a/src/main/scala/za/co/absa/ultet/dbitems/DBTableInsert.scala b/src/main/scala/za/co/absa/ultet/dbitems/DBTableInsert.scala new file mode 100644 index 0000000..89eab16 --- /dev/null +++ b/src/main/scala/za/co/absa/ultet/dbitems/DBTableInsert.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2023 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.ultet.dbitems +import za.co.absa.ultet.model.SQLEntry +import za.co.absa.ultet.model.table.{TableAlteration, TableCreation} +import za.co.absa.ultet.model.table.alterations.TablePrimaryKeyAdd +import za.co.absa.ultet.model.table.index.TableIndexCreate + +case class DBTableInsert(table: DBTable) extends DBItem { + override def sqlEntries: Seq[SQLEntry] = { + val pkCreateAlteration: Option[TableAlteration] = table.primaryKey.map(definedPk => TablePrimaryKeyAdd(table.tableIdentifier, definedPk)) + val indicesCreateAlterations = table.indexes.map(idx => TableIndexCreate(table.tableIdentifier, idx)) + + Seq(TableCreation(table.tableIdentifier, table.columns)) ++ + pkCreateAlteration.toSeq ++ + indicesCreateAlterations + } +} diff --git a/src/main/scala/za/co/absa/ultet/dbitems/extractors/DBTableFromPG.scala b/src/main/scala/za/co/absa/ultet/dbitems/extractors/DBTableFromPG.scala index 025017b..99a2202 100644 --- a/src/main/scala/za/co/absa/ultet/dbitems/extractors/DBTableFromPG.scala +++ 
b/src/main/scala/za/co/absa/ultet/dbitems/extractors/DBTableFromPG.scala @@ -22,7 +22,7 @@ import za.co.absa.ultet.dbitems.DBTable import za.co.absa.ultet.dbitems.table.{DBTableColumn, DBTableIndex} import za.co.absa.ultet.dbitems.table.DBTableIndex.{DBPrimaryKey, DBSecondaryIndex, IndexColumn} import za.co.absa.ultet.model.{DatabaseName, SchemaName, UserName} -import za.co.absa.ultet.model.table.{ColumnName, IndexName, TableName} +import za.co.absa.ultet.model.table.{ColumnName, IndexName, TableIdentifier, TableName} import za.co.absa.ultet.util.ResourceReader case class DBTableFromPG(databaseName: DatabaseName)(implicit connection: DBConnection) extends DBQuerySupport { @@ -31,7 +31,7 @@ case class DBTableFromPG(databaseName: DatabaseName)(implicit connection: DBConn extractDBTable().map {case (dbTable, hasIndexes) => val columns = extractColumns() - val (secondaryIndexes, primaryKey) = if (hasIndexes) extractIndexes() else (List.empty, None) + val (secondaryIndexes, primaryKey) = if (hasIndexes) extractIndexes(dbTable.tableIdentifier) else (List.empty, None) dbTable.copy(columns = columns, primaryKey = primaryKey, indexes = secondaryIndexes.toSet) } } @@ -59,8 +59,10 @@ case class DBTableFromPG(databaseName: DatabaseName)(implicit connection: DBConn runQuery(ResourceReader.sql("pg_table_columns"), schemaAndTableName)(_.map(queryResultRowToColumn).toList) } - private def extractIndexes()(implicit schemaAndTableName: List[SetterFnc]): (Seq[DBSecondaryIndex], Option[DBPrimaryKey]) = { - val indexRows = runQuery(ResourceReader.sql("pg_table_indexes"), schemaAndTableName)(_.map(queryResultRowToIndex).toList.groupBy(_.indexName)) + private def extractIndexes(tableIdentifier: TableIdentifier)(implicit schemaAndTableName: List[SetterFnc]): (Seq[DBSecondaryIndex], Option[DBPrimaryKey]) = { + val indexRows = runQuery(ResourceReader.sql("pg_table_indexes"), schemaAndTableName){queryResult => + queryResult.map(queryResultRowToIndex(_, 
tableIdentifier)).toList.groupBy(_.indexName) + } indexRows.values.foldLeft((List.empty[DBSecondaryIndex], Option.empty[DBPrimaryKey])) { case ((secondaryIndexesAcc, primKeyAcc), indexRows) => indexRows.head match { case head: DBPrimaryKey => @@ -73,7 +75,7 @@ case class DBTableFromPG(databaseName: DatabaseName)(implicit connection: DBConn } } - private def queryResultRowToIndex(indexRow: QueryResultRow): DBTableIndex = { + private def queryResultRowToIndex(indexRow: QueryResultRow, tableIdentifier: TableIdentifier): DBTableIndex = { def readColumn(): List[IndexColumn] = List(IndexColumn( expression = indexRow.getString("column_expression").get, ascendingOrder = indexRow.getBoolean("is_ascending").get, @@ -82,14 +84,14 @@ case class DBTableFromPG(databaseName: DatabaseName)(implicit connection: DBConn if (indexRow.getBoolean("is_primary").get) { DBPrimaryKey( - tableName = TableName(indexRow.getString("table_name").get), + tableIdentifier = tableIdentifier, indexName = IndexName(indexRow.getString("index_name").get), columns = readColumn(), description = indexRow.getString("description") ) } else { DBSecondaryIndex( - tableName = TableName(indexRow.getString("table_name").get), + tableIdentifier = tableIdentifier, indexName = IndexName(indexRow.getString("index_name").get), columns = readColumn(), description = indexRow.getString("description"), diff --git a/src/main/scala/za/co/absa/ultet/dbitems/table/DBTableIndex.scala b/src/main/scala/za/co/absa/ultet/dbitems/table/DBTableIndex.scala index c7264ba..ee993e6 100644 --- a/src/main/scala/za/co/absa/ultet/dbitems/table/DBTableIndex.scala +++ b/src/main/scala/za/co/absa/ultet/dbitems/table/DBTableIndex.scala @@ -17,10 +17,10 @@ package za.co.absa.ultet.dbitems.table import za.co.absa.ultet.dbitems.table.DBTableIndex.IndexColumn -import za.co.absa.ultet.model.table.{IndexName, TableName} +import za.co.absa.ultet.model.table.{IndexName, TableIdentifier, TableName} trait DBTableIndex extends DBTableMember { - def 
tableName: TableName + def tableIdentifier: TableIdentifier def indexName: IndexName def columns: Seq[IndexColumn] def description: Option[String] @@ -28,7 +28,7 @@ trait DBTableIndex extends DBTableMember { object DBTableIndex { case class DBPrimaryKey ( - tableName: TableName, + tableIdentifier: TableIdentifier, indexName: IndexName, columns: Seq[IndexColumn], description: Option[String] = None @@ -37,7 +37,7 @@ object DBTableIndex { } case class DBSecondaryIndex ( - tableName: TableName, + tableIdentifier: TableIdentifier, indexName: IndexName, columns: Seq[IndexColumn], description: Option[String] = None, diff --git a/src/main/scala/za/co/absa/ultet/implicits/MapImplicits.scala b/src/main/scala/za/co/absa/ultet/implicits/MapImplicits.scala new file mode 100644 index 0000000..272bedd --- /dev/null +++ b/src/main/scala/za/co/absa/ultet/implicits/MapImplicits.scala @@ -0,0 +1,36 @@ +/* + * Copyright 2023 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.ultet.implicits + +import za.co.absa.ultet.model.TransactionGroup +import za.co.absa.ultet.util.{SqlEntriesPerTransaction, SqlsPerTransaction} + +object MapImplicits { + implicit class SqlEntriesPerTransactionEnhancement(val entries: SqlEntriesPerTransaction) extends AnyVal { + def toSql: SqlsPerTransaction = { + entries.map { case (transaction, entry) => + val head = + s""" + |---------------------------------------------------------------------------------------- + |-- Transaction group: ${transaction.toString.toUpperCase} + |---------------------------------------------------------------------------------------- + |""".stripMargin + transaction -> (head +: entry.map(_.sqlExpression)) + } + } + } +} diff --git a/src/main/scala/za/co/absa/ultet/implicits/OptionImplicits.scala b/src/main/scala/za/co/absa/ultet/implicits/OptionImplicits.scala new file mode 100644 index 0000000..92f9779 --- /dev/null +++ b/src/main/scala/za/co/absa/ultet/implicits/OptionImplicits.scala @@ -0,0 +1,31 @@ +/* + * Copyright 2023 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.ultet.implicits + +object OptionImplicits { + implicit class OptionEnhancements[T](val option: Option[T]) extends AnyVal { + /** + * Gets the `option` value or throws the provided exception + * + * @param exception the exception to throw in case the `option` is None + * @return + */ + def getOrThrow(exception: => Throwable): T = { + option.getOrElse(throw exception) + } + } +} diff --git a/src/main/scala/za/co/absa/ultet/implicits/SetImplicits.scala b/src/main/scala/za/co/absa/ultet/implicits/SetImplicits.scala new file mode 100644 index 0000000..ae04cae --- /dev/null +++ b/src/main/scala/za/co/absa/ultet/implicits/SetImplicits.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2023 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.ultet.implicits + +import za.co.absa.ultet.dbitems.DBItem +import za.co.absa.ultet.util.SqlEntriesPerTransaction + +object SetImplicits { + + implicit class DBItemSetEnhancement(val dbItems: Set[DBItem]) extends AnyVal { + def toSortedGroupedSqlEntries: SqlEntriesPerTransaction = { + val sqlEntries = dbItems.toSeq.flatMap(_.sqlEntries) + sqlEntries + .groupBy(_.transactionGroup) + .mapValues(_.sortBy(_.orderInTransaction)) + } } + +} diff --git a/src/main/scala/za/co/absa/ultet/model/DBAppModel.scala b/src/main/scala/za/co/absa/ultet/model/DBAppModel.scala new file mode 100644 index 0000000..f5fbbcc --- /dev/null +++ b/src/main/scala/za/co/absa/ultet/model/DBAppModel.scala @@ -0,0 +1,157 @@ +/* + * Copyright 2023 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.ultet.model + +import za.co.absa.balta.classes.DBConnection +import za.co.absa.ultet.dbitems.extractors.DBTableFromPG +import za.co.absa.ultet.dbitems.{DBFunction, DBFunctionFromPG, DBItem, DBTable} +import za.co.absa.ultet.implicits.SetImplicits.DBItemSetEnhancement +import za.co.absa.ultet.parsers.{PgFunctionFileParser, PgTableFileParser} +import za.co.absa.ultet.util.{DatabaseDefs, FileReader, SqlEntriesPerTransaction, TaskConfig} +import za.co.absa.ultet.util.FileReader.SchemaFiles +import za.co.absa.ultet.util.SourceFileType.{FunctionSrc, SchemaOwner, TableSrc} + +import java.net.URI + +case class DBAppModel(databases: DatabaseDefs) { + + def +(other: Option[DBAppModel]): DBAppModel = { + other.fold(this)(this + _) + } + + def +(other: DBAppModel): DBAppModel = { + val newDatabases = other.databases.foldLeft(databases) { + case (acc, (dbName, dbDef)) => + val newDbDef = dbDef + acc.get(dbName) + acc + (dbName -> newDbDef) + } + DBAppModel(newDatabases) + } + + def addFunctionSource(schemaName: SchemaName, functionSource: String): DBAppModel = { + val functions = PgFunctionFileParser.parseSource(functionSource) + val newDBAppModel = DBAppModel.fromDBFunctions(functions.toSeq) //we need to use Seq instead of Set because Seq allows covariance while Set only invariance + //TODO #31 Add warnings to the system - check if the function belong to the provided schema + + this + newDBAppModel + } + + def addTableSource(schemaName: SchemaName, tableSource: String): DBAppModel = { + val tables = PgTableFileParser(schemaName).parseSource(tableSource) + val newDBAppModel = DBAppModel.fromTableDefs(tables) + this + newDBAppModel + } + + def addDatabasesAnalysis()(implicit taskConfig: TaskConfig): DBAppModel = { + val newDatabases = databases.foldLeft(this.databases) { + case (acc, (dbName, dbDef)) => + val newDbDef = analyzeDatabase(dbDef) + acc + (dbName -> (dbDef + newDbDef)) + } + copy(databases = newDatabases) + } + + def toDBItems: 
Map[DatabaseName, Set[DBItem]] = { + databases.map{case (dbName, dbDef) => dbName -> dbDef.toDBItems} + } + + def createSQLEntries(): Map[DatabaseName, SqlEntriesPerTransaction] = { + databases.map{case (dbName, dbDef) => + val sqlEntries = dbDef.toDBItems.toSortedGroupedSqlEntries + dbName -> sqlEntries + } + } + + + private def analyzeDatabase(dbDef: DatabaseDef)(implicit taskConfig: TaskConfig): DatabaseDef = { + implicit val dbConnection: DBConnection = taskConfig.dbConnections(dbDef.databaseName).dbConnection + val tableExtractor = DBTableFromPG(dbDef.databaseName) + val newSchemas = dbDef.schemas.foldLeft(dbDef.schemas) { + case (acc, (schemaName, schemaDef)) => + val functionsInDatabase: Seq[DBFunction] = + DBFunctionFromPG.fetchAllOfSchema(dbDef.databaseName, schemaName)(dbConnection.connection) //TODO #33 Get Postgres fucntions based on list of names + val tablesInDatabase = schemaDef + .tablesFromSource + .values.flatMap(table => tableExtractor.extract(schemaName, table.tableName)) + .map(table => table.tableName -> table) + .toMap + val newSchemaDef = SchemaDef(schemaName, functionsInDatabase.toSet, Map.empty, tablesInDatabase) + acc + (schemaName -> newSchemaDef) + } + DatabaseDef(dbDef.databaseName, newSchemas, createDatabase = false) + // TODO #32 Add database creation support - in case the database cannot be contacted + } +} + +object DBAppModel { + + def loadFromSources(sources: SchemaFiles): DBAppModel = { + sources.foldLeft(emptyDBAppModel) { + case (acc, (schemaName, files)) => + val newDBAppModel = loadSchemaFiles(schemaName, files) + acc + newDBAppModel + } + } + + def fromDBFunctions[T <: DBFunction](functions: Seq[T]): DBAppModel = { + val databases = functions.groupBy(_.database) + + val databaseDefs = databases.map { + case (dbName, dbFunctions) => + val schemas = dbFunctions.groupBy(_.schema) + val schemaDefs = schemas.map { + case (schemaName, schemaFunctions) => + val schemaDef = SchemaDef(schemaName, schemaFunctions.toSet, Map.empty, 
Map.empty) + schemaName -> schemaDef + } + dbName -> DatabaseDef(dbName, schemaDefs, createDatabase = false) + } + + DBAppModel(databaseDefs) + } + + private def fromTableDefs(tables: Set[DBTable]): DBAppModel = { + val tablesByDB = tables.groupBy(_.primaryDBName) + + val databaseDefs = tablesByDB.map { + case (dbName, dbTables) => + val schemaDef = dbTables.groupBy(_.schemaName).map { + case (schemaName, schemaTables) => + val schemaTablesMap = schemaTables.map(table => table.tableName -> table).toMap + val schemaDef = SchemaDef(schemaName, Set.empty, schemaTablesMap, Map.empty) + schemaName -> schemaDef + } + dbName -> DatabaseDef(dbName, schemaDef, createDatabase = false) + } + + DBAppModel(databaseDefs) + } + + private def loadSchemaFiles(schemaName: SchemaName, files: Set[URI]): DBAppModel = { + files.foldLeft(emptyDBAppModel) { + case (acc, file) => + FileReader.fileType(file) match { + case TableSrc => acc.addTableSource(schemaName, FileReader.readFileAsString(file)) + case FunctionSrc => acc.addFunctionSource(schemaName, FileReader.readFileAsString(file)) + case SchemaOwner => acc // TODO #30 Schema owner support + case _ => acc + } + } + } + + private val emptyDBAppModel = DBAppModel(Map.empty[DatabaseName, DatabaseDef]) +} diff --git a/src/main/scala/za/co/absa/ultet/model/DatabaseDef.scala b/src/main/scala/za/co/absa/ultet/model/DatabaseDef.scala new file mode 100644 index 0000000..d7fd071 --- /dev/null +++ b/src/main/scala/za/co/absa/ultet/model/DatabaseDef.scala @@ -0,0 +1,50 @@ +/* + * Copyright 2023 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.ultet.model + +import za.co.absa.ultet.dbitems.DBItem +import za.co.absa.ultet.util.SchemaDefs + +case class DatabaseDef( + databaseName: DatabaseName, + schemas: SchemaDefs, + createDatabase: Boolean + ) { + def +(other: Option[DatabaseDef]): DatabaseDef = { + other.fold(this)(this + _) + } + + def +(other: DatabaseDef): DatabaseDef = { + val newSchemas = other.schemas.foldLeft(schemas) { + case (acc, (schemaName, schemaDef)) => + val joinedSchema = schemaDef + schemas.get(schemaName) + acc + (schemaName -> joinedSchema) + } + + DatabaseDef( + databaseName, + newSchemas, + createDatabase || other.createDatabase + ) + } + + def toDBItems: Set[DBItem] = { + // TODo #32 Add database creation support + schemas.values.foldLeft(Set.empty[DBItem])((acc, schemaDef) => schemaDef.toDBItems ++ acc) + } +} + diff --git a/src/main/scala/za/co/absa/ultet/model/SchemaDef.scala b/src/main/scala/za/co/absa/ultet/model/SchemaDef.scala new file mode 100644 index 0000000..3dec369 --- /dev/null +++ b/src/main/scala/za/co/absa/ultet/model/SchemaDef.scala @@ -0,0 +1,65 @@ +/* + * Copyright 2023 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.ultet.model + +import za.co.absa.ultet.dbitems.{DBFunction, DBFunctionFromSource, DBItem, DBSchema} +import za.co.absa.ultet.model.function.FunctionName +import za.co.absa.ultet.util.Tables + +case class SchemaDef( + name: SchemaName, + functions: Set[DBFunction], + tablesFromSource: Tables, + tablesFromTarget: Tables + ) { + + def +(other: Option[SchemaDef]): SchemaDef = { + other.fold(this)(this + _) + } + def +(other: SchemaDef): SchemaDef = { + SchemaDef( + name, + functions ++ other.functions, + tablesFromSource ++ other.tablesFromSource, + tablesFromTarget ++ other.tablesFromTarget + ) + } + + def functionNames: Set[FunctionName] = { + functions.map(_.fnName) + } + + def users: Set[UserName] = { + val functionUsers = functions + .collect{case function: DBFunctionFromSource => (function.users, function.owner)} + .foldLeft(Set.empty[UserName]){ case (acc, (users, owner)) => + acc ++ users + owner + } + tablesFromSource.values.foldLeft(functionUsers) { case (acc, table) => + acc + table.owner + } + } + + def toDBItems: Set[DBItem] = { + val accWithSchema: Seq[DBItem] = Seq(DBSchema(name, None /*TODO #30 Schema owner support */, users)) + val accWithTables = tablesFromSource.foldLeft(accWithSchema) { case (acc, (tableName, tableDef)) => + acc :+ (tableDef - tablesFromTarget.get(tableName)) + } + functions ++ accWithTables + } + +} diff --git a/src/main/scala/za/co/absa/ultet/model/schema/SchemaGrant.scala b/src/main/scala/za/co/absa/ultet/model/schema/SchemaGrant.scala index a17d4d1..75fc875 100644 --- 
a/src/main/scala/za/co/absa/ultet/model/schema/SchemaGrant.scala +++ b/src/main/scala/za/co/absa/ultet/model/schema/SchemaGrant.scala @@ -19,7 +19,7 @@ package za.co.absa.ultet.model.schema import za.co.absa.ultet.model.{SQLEntry, SchemaName, TransactionGroup, UserName} import za.co.absa.ultet.model.TransactionGroup.TransactionGroup -case class SchemaGrant(name: SchemaName, roles: Seq[UserName]) extends SQLEntry { +case class SchemaGrant(name: SchemaName, roles: Set[UserName]) extends SQLEntry { override def sqlExpression: String = { s"GRANT USAGE ON SCHEMA ${name.value} TO ${roles.map(_.value).mkString(", ")};" } diff --git a/src/main/scala/za/co/absa/ultet/model/table/TableCreation.scala b/src/main/scala/za/co/absa/ultet/model/table/TableCreation.scala index a64afa0..81f7152 100644 --- a/src/main/scala/za/co/absa/ultet/model/table/TableCreation.scala +++ b/src/main/scala/za/co/absa/ultet/model/table/TableCreation.scala @@ -20,9 +20,8 @@ import za.co.absa.ultet.dbitems.table.DBTableColumn import za.co.absa.ultet.model.SchemaName case class TableCreation( - schemaName: SchemaName, - tableName: TableName, - columns: Seq[DBTableColumn], + tableIdentifier: TableIdentifier, + columns: Seq[DBTableColumn], ) extends TableEntry { override def sqlExpression: String = { @@ -32,7 +31,7 @@ case class TableCreation( s"${col.columnName.value} ${col.dataType}$notNull$default" } - s"""CREATE TABLE ${schemaName.value}.${tableName.value}( + s"""CREATE TABLE ${tableIdentifier.fullName}( |${columnLines.mkString(",\n")} |);""".stripMargin } diff --git a/src/main/scala/za/co/absa/ultet/model/table/TableEntry.scala b/src/main/scala/za/co/absa/ultet/model/table/TableEntry.scala index 504b8b5..988b422 100644 --- a/src/main/scala/za/co/absa/ultet/model/table/TableEntry.scala +++ b/src/main/scala/za/co/absa/ultet/model/table/TableEntry.scala @@ -21,8 +21,8 @@ import za.co.absa.ultet.model.TransactionGroup import za.co.absa.ultet.model.SchemaName trait TableEntry extends SQLEntry { - def 
schemaName: SchemaName - def tableName: TableName + + def tableIdentifier: TableIdentifier def transactionGroup: TransactionGroup.TransactionGroup = TransactionGroup.Objects } diff --git a/src/main/scala/za/co/absa/ultet/model/table/TableOwnership.scala b/src/main/scala/za/co/absa/ultet/model/table/TableOwnership.scala index c445208..4ba4e8b 100644 --- a/src/main/scala/za/co/absa/ultet/model/table/TableOwnership.scala +++ b/src/main/scala/za/co/absa/ultet/model/table/TableOwnership.scala @@ -19,13 +19,12 @@ package za.co.absa.ultet.model.table import za.co.absa.ultet.model.{SchemaName, UserName} case class TableOwnership( - schemaName: SchemaName, - tableName: TableName, - owner: UserName + tableIdentifier: TableIdentifier, + owner: UserName ) extends TableAlteration { override def sqlExpression: String = { - s"""ALTER TABLE ${schemaName.value}.${tableName.value} OWNER TO ${owner.value};""" + s"""ALTER TABLE ${tableIdentifier.fullName} OWNER TO ${owner.value};""" } override def orderInTransaction: Int = 201 diff --git a/src/main/scala/za/co/absa/ultet/model/table/alterations/TableColumnCommentDrop.scala b/src/main/scala/za/co/absa/ultet/model/table/alterations/TableColumnCommentDrop.scala index 31605a7..02ae665 100644 --- a/src/main/scala/za/co/absa/ultet/model/table/alterations/TableColumnCommentDrop.scala +++ b/src/main/scala/za/co/absa/ultet/model/table/alterations/TableColumnCommentDrop.scala @@ -16,12 +16,11 @@ package za.co.absa.ultet.model.table.alterations -import za.co.absa.ultet.model.SchemaName -import za.co.absa.ultet.model.table.{ColumnName, TableAlteration, TableName} +import za.co.absa.ultet.model.table.{ColumnName, TableAlteration, TableIdentifier} -case class TableColumnCommentDrop(schemaName: SchemaName, tableName: TableName, columnName: ColumnName) extends TableAlteration { +case class TableColumnCommentDrop(tableIdentifier: TableIdentifier, columnName: ColumnName) extends TableAlteration { override def sqlExpression: String = { - s"""COMMENT ON 
COLUMN ${schemaName.value}.${tableName.value}.${columnName.value} + s"""COMMENT ON COLUMN ${tableIdentifier.fullName}.${columnName.normalized} |IS NULL;""".stripMargin } diff --git a/src/main/scala/za/co/absa/ultet/model/table/alterations/TableColumnCommentSet.scala b/src/main/scala/za/co/absa/ultet/model/table/alterations/TableColumnCommentSet.scala index 76c2b23..f0474f8 100644 --- a/src/main/scala/za/co/absa/ultet/model/table/alterations/TableColumnCommentSet.scala +++ b/src/main/scala/za/co/absa/ultet/model/table/alterations/TableColumnCommentSet.scala @@ -17,11 +17,11 @@ package za.co.absa.ultet.model.table.alterations import za.co.absa.ultet.model.SchemaName -import za.co.absa.ultet.model.table.{ColumnName, TableAlteration, TableName} +import za.co.absa.ultet.model.table.{ColumnName, TableAlteration, TableIdentifier, TableName} -case class TableColumnCommentSet(schemaName: SchemaName, tableName: TableName, columnName: ColumnName, comment: String) extends TableAlteration { +case class TableColumnCommentSet(tableIdentifier: TableIdentifier, columnName: ColumnName, comment: String) extends TableAlteration { override def sqlExpression: String = { - s"""COMMENT ON COLUMN ${schemaName.value}.${tableName.value}.${columnName.value} + s"""COMMENT ON COLUMN ${tableIdentifier.fullName}.${columnName.normalized} |IS '$comment';""".stripMargin } diff --git a/src/main/scala/za/co/absa/ultet/model/table/alterations/TableColumnDefaultDrop.scala b/src/main/scala/za/co/absa/ultet/model/table/alterations/TableColumnDefaultDrop.scala index b6bf0e6..6f60599 100644 --- a/src/main/scala/za/co/absa/ultet/model/table/alterations/TableColumnDefaultDrop.scala +++ b/src/main/scala/za/co/absa/ultet/model/table/alterations/TableColumnDefaultDrop.scala @@ -16,13 +16,12 @@ package za.co.absa.ultet.model.table.alterations -import za.co.absa.ultet.model.SchemaName -import za.co.absa.ultet.model.table.{ColumnName, TableAlteration, TableName} +import za.co.absa.ultet.model.table.{ColumnName, 
TableAlteration, TableIdentifier} -case class TableColumnDefaultDrop(schemaName: SchemaName, tableName: TableName, columnName: ColumnName) extends TableAlteration { +case class TableColumnDefaultDrop(tableIdentifier: TableIdentifier, columnName: ColumnName) extends TableAlteration { override def sqlExpression: String = { - s"""ALTER TABLE ${schemaName.value}.${tableName.value} - |ALTER COLUMN ${columnName.value} DROP DEFAULT;""".stripMargin + s"""ALTER TABLE ${tableIdentifier.fullName} + |ALTER COLUMN ${columnName.normalized} DROP DEFAULT;""".stripMargin } override def orderInTransaction: Int = 250 diff --git a/src/main/scala/za/co/absa/ultet/model/table/alterations/TableColumnDefaultSet.scala b/src/main/scala/za/co/absa/ultet/model/table/alterations/TableColumnDefaultSet.scala index d82b7bd..750d70a 100644 --- a/src/main/scala/za/co/absa/ultet/model/table/alterations/TableColumnDefaultSet.scala +++ b/src/main/scala/za/co/absa/ultet/model/table/alterations/TableColumnDefaultSet.scala @@ -16,13 +16,12 @@ package za.co.absa.ultet.model.table.alterations -import za.co.absa.ultet.model.SchemaName -import za.co.absa.ultet.model.table.{ColumnName, TableAlteration, TableName} +import za.co.absa.ultet.model.table.{ColumnName, TableAlteration, TableIdentifier} -case class TableColumnDefaultSet(schemaName: SchemaName, tableName: TableName, columnName: ColumnName, default: String) extends TableAlteration { +case class TableColumnDefaultSet(tableIdentifier: TableIdentifier, columnName: ColumnName, default: String) extends TableAlteration { override def sqlExpression: String = { - s"""ALTER TABLE ${schemaName.value}.${tableName.value} - |ALTER COLUMN ${columnName.value} SET DEFAULT $default;""".stripMargin + s"""ALTER TABLE ${tableIdentifier.fullName} + |ALTER COLUMN ${columnName.normalized} SET DEFAULT $default;""".stripMargin } override def orderInTransaction: Int = 250 diff --git a/src/main/scala/za/co/absa/ultet/model/table/alterations/TableColumnNotNullDrop.scala 
b/src/main/scala/za/co/absa/ultet/model/table/alterations/TableColumnNotNullDrop.scala index 473f2da..98ad743 100644 --- a/src/main/scala/za/co/absa/ultet/model/table/alterations/TableColumnNotNullDrop.scala +++ b/src/main/scala/za/co/absa/ultet/model/table/alterations/TableColumnNotNullDrop.scala @@ -16,12 +16,11 @@ package za.co.absa.ultet.model.table.alterations -import za.co.absa.ultet.model.SchemaName -import za.co.absa.ultet.model.table.{ColumnName, TableAlteration, TableName} +import za.co.absa.ultet.model.table.{ColumnName, TableAlteration, TableIdentifier} -case class TableColumnNotNullDrop(schemaName: SchemaName, tableName: TableName, columnName: ColumnName) extends TableAlteration { +case class TableColumnNotNullDrop(tableIdentifier: TableIdentifier, columnName: ColumnName) extends TableAlteration { override def sqlExpression: String = { - s"""ALTER TABLE ${schemaName.value}.${tableName.value} + s"""ALTER TABLE ${tableIdentifier.fullName} |ALTER COLUMN ${columnName.value} DROP NOT NULL;""".stripMargin } diff --git a/src/main/scala/za/co/absa/ultet/model/table/alterations/TablePrimaryKeyAdd.scala b/src/main/scala/za/co/absa/ultet/model/table/alterations/TablePrimaryKeyAdd.scala index 546daf1..9ae8383 100644 --- a/src/main/scala/za/co/absa/ultet/model/table/alterations/TablePrimaryKeyAdd.scala +++ b/src/main/scala/za/co/absa/ultet/model/table/alterations/TablePrimaryKeyAdd.scala @@ -17,14 +17,13 @@ package za.co.absa.ultet.model.table.alterations import za.co.absa.ultet.dbitems.table.DBTableIndex.DBPrimaryKey -import za.co.absa.ultet.model.SchemaName -import za.co.absa.ultet.model.table.{TableAlteration, TableName} +import za.co.absa.ultet.model.table.{TableAlteration, TableIdentifier} -case class TablePrimaryKeyAdd(schemaName: SchemaName, tableName: TableName, primaryKey: DBPrimaryKey) extends TableAlteration { +case class TablePrimaryKeyAdd(tableIdentifier: TableIdentifier, primaryKey: DBPrimaryKey) extends TableAlteration { override def sqlExpression: 
String = { val commandColumnNames = primaryKey.columns.map(_.expression).mkString(", ") - s"""ALTER TABLE ${schemaName.value}.${tableName.value} + s"""ALTER TABLE ${tableIdentifier.fullName} |ADD CONSTRAINT ${primaryKey.indexName} |PRIMARY KEY ($commandColumnNames);""".stripMargin } diff --git a/src/main/scala/za/co/absa/ultet/model/table/alterations/TablePrimaryKeyDrop.scala b/src/main/scala/za/co/absa/ultet/model/table/alterations/TablePrimaryKeyDrop.scala index 96042e5..0b3003e 100644 --- a/src/main/scala/za/co/absa/ultet/model/table/alterations/TablePrimaryKeyDrop.scala +++ b/src/main/scala/za/co/absa/ultet/model/table/alterations/TablePrimaryKeyDrop.scala @@ -17,12 +17,11 @@ package za.co.absa.ultet.model.table.alterations import za.co.absa.ultet.dbitems.table.DBTableIndex.DBPrimaryKey -import za.co.absa.ultet.model.SchemaName -import za.co.absa.ultet.model.table.{TableAlteration, TableName} +import za.co.absa.ultet.model.table.{TableAlteration, TableIdentifier} -case class TablePrimaryKeyDrop(schemaName: SchemaName, tableName: TableName, primaryKey: DBPrimaryKey) extends TableAlteration { +case class TablePrimaryKeyDrop(tableIdentifier: TableIdentifier, primaryKey: DBPrimaryKey) extends TableAlteration { override def sqlExpression: String = { - s"""ALTER TABLE ${schemaName.value}.${tableName.value} + s"""ALTER TABLE ${tableIdentifier.fullName} |DROP CONSTRAINT ${primaryKey.indexName};""".stripMargin } diff --git a/src/main/scala/za/co/absa/ultet/model/table/column/TableColumnAdd.scala b/src/main/scala/za/co/absa/ultet/model/table/column/TableColumnAdd.scala index 55f01a9..d667c58 100644 --- a/src/main/scala/za/co/absa/ultet/model/table/column/TableColumnAdd.scala +++ b/src/main/scala/za/co/absa/ultet/model/table/column/TableColumnAdd.scala @@ -17,17 +17,17 @@ package za.co.absa.ultet.model.table.column import za.co.absa.ultet.dbitems.table.DBTableColumn -import za.co.absa.ultet.model.SchemaName -import za.co.absa.ultet.model.table.{TableAlteration, TableName} 
+import za.co.absa.ultet.model.table.{TableAlteration, TableIdentifier} -case class TableColumnAdd(schemaName: SchemaName, tableName: TableName, tableColumn: DBTableColumn) extends TableAlteration { +case class TableColumnAdd(tableIdentifier: TableIdentifier, tableColumn: DBTableColumn) extends TableAlteration { override def sqlExpression: String = { val default = tableColumn.default.map(value => s" DEFAULT $value").getOrElse("") val notNull = if (tableColumn.notNull) " NOT NULL" else "" - // todo description? - s"""ALTER TABLE $tableName + //TODO #38 Table comments need escaping + + s"""ALTER TABLE ${tableIdentifier.fullName} | ADD ${tableColumn.columnName} ${tableColumn.dataType}$default$notNull | ;""".stripMargin } diff --git a/src/main/scala/za/co/absa/ultet/model/table/column/TableColumnDrop.scala b/src/main/scala/za/co/absa/ultet/model/table/column/TableColumnDrop.scala index a03b5dd..ea023c8 100644 --- a/src/main/scala/za/co/absa/ultet/model/table/column/TableColumnDrop.scala +++ b/src/main/scala/za/co/absa/ultet/model/table/column/TableColumnDrop.scala @@ -16,12 +16,11 @@ package za.co.absa.ultet.model.table.column -import za.co.absa.ultet.model.SchemaName -import za.co.absa.ultet.model.table.{ColumnName, TableAlteration, TableName} +import za.co.absa.ultet.model.table.{ColumnName, TableAlteration, TableIdentifier} -case class TableColumnDrop(schemaName: SchemaName, tableName: TableName, columnName: ColumnName) extends TableAlteration { +case class TableColumnDrop(tableIdentifier: TableIdentifier, columnName: ColumnName) extends TableAlteration { override def sqlExpression: String = { - s"""ALTER TABLE ${schemaName.value}.${tableName.value} DROP COLUMN ${columnName.value};""" + s"""ALTER TABLE ${tableIdentifier.fullName} DROP COLUMN ${columnName.value};""" } override def orderInTransaction: Int = 280 diff --git a/src/main/scala/za/co/absa/ultet/model/table/index/TableIndexCreate.scala 
b/src/main/scala/za/co/absa/ultet/model/table/index/TableIndexCreate.scala index 5b7b24b..b641d9f 100644 --- a/src/main/scala/za/co/absa/ultet/model/table/index/TableIndexCreate.scala +++ b/src/main/scala/za/co/absa/ultet/model/table/index/TableIndexCreate.scala @@ -17,12 +17,9 @@ package za.co.absa.ultet.model.table.index import za.co.absa.ultet.dbitems.table.DBTableIndex.DBSecondaryIndex -import za.co.absa.ultet.model.SchemaName -import za.co.absa.ultet.model.table.{TableAlteration, TableName} +import za.co.absa.ultet.model.table.{TableAlteration, TableIdentifier, TableName} -case class TableIndexCreate(schemaName: SchemaName, tableIndex: DBSecondaryIndex) extends TableAlteration { - - override def tableName: TableName = tableIndex.tableName +case class TableIndexCreate(tableIdentifier: TableIdentifier, tableIndex: DBSecondaryIndex) extends TableAlteration { override def sqlExpression: String = { val unique = if(tableIndex.unique) " UNIQUE" else "" @@ -35,7 +32,7 @@ case class TableIndexCreate(schemaName: SchemaName, tableIndex: DBSecondaryIndex val nullsDistinct = if (tableIndex.nullsDistinct) "NULLS NOT DISTINCT" else "NULLS DISTINCT" - s"""CREATE$unique INDEX CONCURRENTLY ${tableIndex.indexName} ON ${schemaName.value}.${tableName.value} ($columns) $nullsDistinct;""" + s"""CREATE$unique INDEX CONCURRENTLY ${tableIndex.indexName} ON ${tableIdentifier.fullName} ($columns) $nullsDistinct;""" } override def orderInTransaction: Int = 270 diff --git a/src/main/scala/za/co/absa/ultet/model/table/index/TableIndexDrop.scala b/src/main/scala/za/co/absa/ultet/model/table/index/TableIndexDrop.scala index ce50037..9bd1fbd 100644 --- a/src/main/scala/za/co/absa/ultet/model/table/index/TableIndexDrop.scala +++ b/src/main/scala/za/co/absa/ultet/model/table/index/TableIndexDrop.scala @@ -16,12 +16,11 @@ package za.co.absa.ultet.model.table.index -import za.co.absa.ultet.model.SchemaName -import za.co.absa.ultet.model.table.{TableAlteration, TableName} +import 
za.co.absa.ultet.model.table.{IndexName, TableAlteration, TableIdentifier} -case class TableIndexDrop(schemaName: SchemaName, tableName: TableName, indexName: String) extends TableAlteration { +case class TableIndexDrop(tableIdentifier: TableIdentifier, indexName: IndexName) extends TableAlteration { override def sqlExpression: String = { - s"""DROP INDEX $indexName;""" + s"""DROP INDEX ${indexName.normalized};""" } override def orderInTransaction: Int = 210 diff --git a/src/main/scala/za/co/absa/ultet/model/table/package.scala b/src/main/scala/za/co/absa/ultet/model/table/package.scala index 549b11e..dff2911 100644 --- a/src/main/scala/za/co/absa/ultet/model/table/package.scala +++ b/src/main/scala/za/co/absa/ultet/model/table/package.scala @@ -24,19 +24,7 @@ package object table { case class IndexName(value: String) extends DBObjectName(value) - case class TableIdentifier(tableName: TableName, schemaName: Option[SchemaName] = None) { - def fullName: String = { - schemaName.map(s => s"${s.normalized}.").getOrElse("") + tableName.normalized - } - } - - object TableIdentifier { - def apply(tableName: String, schemaName: String): TableIdentifier = { - if (schemaName.isEmpty) { - new TableIdentifier(TableName(tableName)) - } else { - new TableIdentifier(TableName(tableName), Some(SchemaName(schemaName))) - } - } + case class TableIdentifier(schemaName: SchemaName, tableName: TableName) { + def fullName: String = s"${schemaName.normalized}.${tableName.normalized}" } } diff --git a/src/main/scala/za/co/absa/ultet/parsers/GenericFileParser.scala b/src/main/scala/za/co/absa/ultet/parsers/GenericFileParser.scala new file mode 100644 index 0000000..4ef0b40 --- /dev/null +++ b/src/main/scala/za/co/absa/ultet/parsers/GenericFileParser.scala @@ -0,0 +1,33 @@ +/* + * Copyright 2023 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.ultet.parsers + +import za.co.absa.ultet.util.FileReader + +import java.net.URI + +trait GenericFileParser[T] { + def parseFile(fileUri: URI): Set[T] = { + parseSource(FileReader.readFileAsString(fileUri)) + } + + def parseSource(lines: Seq[String]): Set[T] = { + parseSource(lines.mkString("\n")) + } + + def parseSource(source: String): Set[T] +} diff --git a/src/main/scala/za/co/absa/ultet/parsers/PgFunctionFileParser.scala b/src/main/scala/za/co/absa/ultet/parsers/PgFunctionFileParser.scala index 721943e..e1af995 100644 --- a/src/main/scala/za/co/absa/ultet/parsers/PgFunctionFileParser.scala +++ b/src/main/scala/za/co/absa/ultet/parsers/PgFunctionFileParser.scala @@ -19,21 +19,23 @@ package za.co.absa.ultet.parsers import za.co.absa.ultet.dbitems.DBFunctionFromSource import za.co.absa.ultet.model.function.{FunctionArgumentType, FunctionName} import za.co.absa.ultet.model.{DatabaseName, SchemaName, UserName} -import za.co.absa.ultet.parsers.PgFunctionFileParser._ -import java.net.URI -import java.nio.file.{Files, Paths} -import java.util.stream.Collectors +object PgFunctionFileParser extends GenericFileParser[DBFunctionFromSource] { -case class PgFunctionFileParser() { + override def parseSource(source: String): Set[DBFunctionFromSource] = { + val owner = parseOwnerFromSql(source) + val (databaseName, users) = parseDatabaseNameAndUsersFromSql(source) + val (schemaName, fnName, inParamTypes) = parseSchemaNameFnNameAndInParamTypesFromSql(source) - def parseFile(fileUri: URI): DBFunctionFromSource = { - val path = 
Paths.get(fileUri) - - val lines = Files.lines(path) - val content = lines.collect(Collectors.joining("\n")) - - parseString(content) + Set(DBFunctionFromSource( + FunctionName(fnName), + inParamTypes.map(FunctionArgumentType), + UserName(owner), + users.map(UserName).toSet, + SchemaName(schemaName), + DatabaseName(databaseName), + source + )) } private def parseOwnerFromSql(sqlStr: String): String = { @@ -84,29 +86,11 @@ case class PgFunctionFileParser() { } } - def parseString(str: String): DBFunctionFromSource = { - val owner = parseOwnerFromSql(str) - val (databaseName, users) = parseDatabaseNameAndUsersFromSql(str) - val (schemaName, fnName, inParamTypes) = parseSchemaNameFnNameAndInParamTypesFromSql(str) - - DBFunctionFromSource( - FunctionName(fnName), - inParamTypes.map(FunctionArgumentType), - UserName(owner), - users.map(UserName), - SchemaName(schemaName), - DatabaseName(databaseName), - str - ) - } -} - -object PgFunctionFileParser { - val ownerRx = """--\s*owner:\s*(\w+)\s*""".r + private val ownerRx = """--\s*owner:\s*(\w+)\s*""".r // 1--1 // 1 - capturing group for owner - val dbUsersRx = """--\s*database:\s*(\w+)\s*\(\s*((?:\w+)\s*(?:,\s*\w+\s*)*\s*)?\)""".r // need to ,-break and trim the users + private val dbUsersRx = """--\s*database:\s*(\w+)\s*\(\s*((?:\w+)\s*(?:,\s*\w+\s*)*\s*)?\)""".r // need to ,-break and trim the users // 1---1 2 34-----4 5------------5 6 7 // 1 - capturing group #1 for db-name // 2,7 verbatim "()" in which users are written @@ -115,7 +99,7 @@ object PgFunctionFileParser { // 5 - other users - comma separated from the first user - val schemaFnParamsRx = """(?i)CREATE(?:\s+OR\s+REPLACE)?\s+FUNCTION\s+(\w+)\.(\w+)\s*\(([,\s\w]+)\)""".r // need to break params + private val schemaFnParamsRx = """(?i)CREATE(?:\s+OR\s+REPLACE)?\s+FUNCTION\s+(\w+)\.(\w+)\s*\(([,\s\w]+)\)""".r // need to break params // 1--1 2-----------------2 3---3 45---5 6 7-------7 8 // 1 - case insensitive matching // 2 - non-capturing group of optionally 
present " OR REPLACE" @@ -126,7 +110,7 @@ object PgFunctionFileParser { // 7 - capturing-group #3 - parameters are matched there as a block - \s also covers line-breaks => parsed further later - val singleParamCapturingRx = """(?i)\s*(IN|OUT)\s+(\w+)\s+(\w+)\s*""".r // used to break up params from ^^ + private val singleParamCapturingRx = """(?i)\s*(IN|OUT)\s+(\w+)\s+(\w+)\s*""".r // used to break up params from ^^ // 1--1 2------2 3---3 4---4 // 1 - case insensitive matching // 2 - capturing-group #1 IN/OUT param diff --git a/src/main/scala/za/co/absa/ultet/parsers/PgTableFileParser.scala b/src/main/scala/za/co/absa/ultet/parsers/PgTableFileParser.scala index 0711fbc..680e5ad 100644 --- a/src/main/scala/za/co/absa/ultet/parsers/PgTableFileParser.scala +++ b/src/main/scala/za/co/absa/ultet/parsers/PgTableFileParser.scala @@ -18,129 +18,23 @@ package za.co.absa.ultet.parsers import cats.syntax.either._ import io.circe.generic.auto._ -import io.circe.{Error, yaml} +import io.circe.{Error, Json, ParsingFailure, yaml} import za.co.absa.ultet.dbitems.DBTable -import za.co.absa.ultet.dbitems.table.DBTableIndex.{DBPrimaryKey, DBSecondaryIndex, IndexColumn} -import za.co.absa.ultet.dbitems.table.{DBTableColumn, DBTableIndex} -import za.co.absa.ultet.model._ -import za.co.absa.ultet.model.table.{ColumnName, IndexName, TableName} +import za.co.absa.ultet.model.SchemaName +import za.co.absa.ultet.parsers.helpers.DBTableFromYaml -import java.net.URI -import java.nio.file.{Files, Paths} -import java.util.stream.Collectors - - -case class PgTableFileParser() { - import PgTableFileParser.DBTableFromYaml +case class PgTableFileParser(schemaName: SchemaName) extends GenericFileParser[DBTable] { + override def parseSource(source: String): Set[DBTable] = { + val processedYaml = processYaml(source) + Set(processedYaml.convertToDBTable(schemaName)) + } - def parseContentYaml(content: String): DBTableFromYaml = { - val loadedYaml = yaml.parser.parse(content) - val processedYaml = 
loadedYaml + private [parsers] def processYaml(source: String): DBTableFromYaml = { + val loadedYaml: Either[ParsingFailure, Json] = yaml.parser.parse(source) + loadedYaml .leftMap(err => err: Error) .flatMap(_.as[DBTableFromYaml]) .valueOr(throw _) - processedYaml - } - - def parseFileYaml(fileUri: URI): DBTable = { - val path = Paths.get(fileUri) - val lines = Files.lines(path) - val content = lines.collect(Collectors.joining("\n")) - - parseContentYaml(content).convertToDBTable } } - - -object PgTableFileParser { - - case class DBTableFromYaml( - table: String, - description: Option[String], - primaryDBName: String, - owner: String, - columns: Seq[Map[String, String]] = Seq.empty, - primaryKey: Option[Map[String, String]] = None, - indexes: Seq[Map[String, String]] = Seq.empty - ) { - private def prepareColumns: Seq[DBTableColumn] = { - columns.map( - currCol => { - DBTableColumn( - ColumnName(currCol("columnName")), - currCol("dataType"), - currCol("notNull").toBoolean, - currCol.get("description"), - currCol.get("default"), - ) - } - ) - } - - private def prepareIndexes: Seq[DBSecondaryIndex] = { - indexes.map( - currIndex => { - DBSecondaryIndex( - tableName = TableName(currIndex("tableName")), - indexName = IndexName(currIndex("indexName")), - columns = currIndex("indexBy") - .replaceAll("""^\[|\]$""", "") - .split(",") - .map(col => IndexColumn(col.trim)) - .toList, - unique = currIndex.getOrElse("unique", "false").toBoolean, - // todo better all this - //currIndex.getOrElse("ascendingOrder", "true").toBoolean, - //currIndex.get("nullsFirstOverride").map(_.toBoolean), - nullsDistinct = currIndex.getOrElse("nullsDistinct", "true").toBoolean, - description = None, - constraint = None - ) - } - ) - } - - private def preparePrimaryKey: Option[DBPrimaryKey] = { - if (primaryKey.isDefined) { - val cols = primaryKey.get("columns") - val pkName = primaryKey.get("name") - val indexColumns = cols.replaceAll("""^\[|\]$""", "") - .split(",") - .map(currColName => 
IndexColumn(currColName.trim)) - .toList - - val preparedPk = DBPrimaryKey( - TableName(table), - IndexName(pkName), - indexColumns, - ) - Some(preparedPk) - } else { - None - } - } - - def convertToDBTable: DBTable = { - val schemaAndTbl = table.split("\\.", 2) - val semiPreparedTable = DBTable( - TableName(schemaAndTbl(1)), - SchemaName(schemaAndTbl(0)), - DatabaseName(primaryDBName), - UserName(owner), - description - ) - - val withColumns = prepareColumns.foldLeft(semiPreparedTable) { case (acc, preparedColumn) => - acc.addColumn(preparedColumn) - } - val withColumnsAndIndexes = prepareIndexes.foldLeft(withColumns) { case (acc, preparedIndex) => - acc.addIndex(preparedIndex) - } - preparePrimaryKey match { - case Some(pk) => withColumnsAndIndexes.definePrimaryKey(pk) - case _ => withColumnsAndIndexes - } - } - } -} diff --git a/src/main/scala/za/co/absa/ultet/parsers/helpers/DBTableFromYaml.scala b/src/main/scala/za/co/absa/ultet/parsers/helpers/DBTableFromYaml.scala new file mode 100644 index 0000000..8f3f129 --- /dev/null +++ b/src/main/scala/za/co/absa/ultet/parsers/helpers/DBTableFromYaml.scala @@ -0,0 +1,111 @@ +/* + * Copyright 2023 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.ultet.parsers.helpers + +import za.co.absa.ultet.dbitems.DBTable +import za.co.absa.ultet.dbitems.table.DBTableColumn +import za.co.absa.ultet.dbitems.table.DBTableIndex.{DBPrimaryKey, DBSecondaryIndex, IndexColumn} +import za.co.absa.ultet.model.{DatabaseName, SchemaName, UserName} +import za.co.absa.ultet.model.table.{ColumnName, IndexName, TableIdentifier, TableName} + +case class DBTableFromYaml( + table: String, + description: Option[String], + primaryDBName: String, + owner: String, + columns: Seq[Map[String, String]] = Seq.empty, + primaryKey: Option[Map[String, String]] = None, + indexes: Seq[Map[String, String]] = Seq.empty +) { + private def prepareColumns: Seq[DBTableColumn] = { + columns.map( + currCol => { + DBTableColumn( + ColumnName(currCol("columnName")), + currCol("dataType"), + currCol("notNull").toBoolean, + currCol.get("description"), + currCol.get("default"), + ) + } + ) + } + + private def prepareIndexes(tableIdentifier: TableIdentifier): Seq[DBSecondaryIndex] = { + indexes.map( + currIndex => { + DBSecondaryIndex( + tableIdentifier = tableIdentifier, + indexName = IndexName(currIndex("indexName")), + columns = currIndex("indexBy") + .replaceAll("""^\[|\]$""", "") + .split(",") + .map(col => IndexColumn(col.trim)) + .toList, + unique = currIndex.getOrElse("unique", "false").toBoolean, + // todo better all this + //currIndex.getOrElse("ascendingOrder", "true").toBoolean, + //currIndex.get("nullsFirstOverride").map(_.toBoolean), + nullsDistinct = currIndex.getOrElse("nullsDistinct", "true").toBoolean, + description = None, + constraint = None + ) + } + ) + } + + private def preparePrimaryKey(tableIdentifier: TableIdentifier): Option[DBPrimaryKey] = { + if (primaryKey.isDefined) { + val cols = primaryKey.get("columns") + val pkName = primaryKey.get("name") + val indexColumns = cols.replaceAll("""^\[|\]$""", "") + .split(",") + .map(currColName => IndexColumn(currColName.trim)) + .toList + + val preparedPk = 
DBPrimaryKey( + tableIdentifier, + IndexName(pkName), + indexColumns, + ) + Some(preparedPk) + } else { + None + } + } + + def convertToDBTable(schemaName: SchemaName): DBTable = { + val semiPreparedTable = DBTable( + schemaName, + TableName(table), + DatabaseName(primaryDBName), + UserName(owner), + description + ) + + val withColumns = prepareColumns.foldLeft(semiPreparedTable) { case (acc, preparedColumn) => + acc.addColumn(preparedColumn) + } + val withColumnsAndIndexes = prepareIndexes(semiPreparedTable.tableIdentifier).foldLeft(withColumns) { case (acc, preparedIndex) => + acc.addIndex(preparedIndex) + } + preparePrimaryKey(semiPreparedTable.tableIdentifier) match { + case Some(pk) => withColumnsAndIndexes.definePrimaryKey(pk) + case _ => withColumnsAndIndexes + } + } +} diff --git a/src/main/scala/za/co/absa/ultet/util/DBProperties.scala b/src/main/scala/za/co/absa/ultet/util/DBProperties.scala index 0a72c0e..9481c90 100644 --- a/src/main/scala/za/co/absa/ultet/util/DBProperties.scala +++ b/src/main/scala/za/co/absa/ultet/util/DBProperties.scala @@ -17,8 +17,10 @@ package za.co.absa.ultet.util import com.typesafe.config.ConfigFactory +import za.co.absa.balta.classes.DBConnection import java.io.File +import java.sql.DriverManager import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable` // Example of the connection string @@ -43,6 +45,15 @@ case class DBProperties(serverName: String, password: String, protocol: String = "jdbc", subprotocol: String = "postgresql") { + + lazy val dbConnection: DBConnection = createConnection() + + private def createConnection(): DBConnection = { + val connectionString = generateConnectionString() + val connection = DriverManager.getConnection(connectionString, user, password) + new DBConnection(connection) + } + def generateConnectionString(): String = { subprotocol match { case "postgresql" => getPostgresString diff --git a/src/main/scala/za/co/absa/ultet/util/FileReader.scala 
b/src/main/scala/za/co/absa/ultet/util/FileReader.scala new file mode 100644 index 0000000..473fb69 --- /dev/null +++ b/src/main/scala/za/co/absa/ultet/util/FileReader.scala @@ -0,0 +1,64 @@ +/* + * Copyright 2023 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.ultet.util + +import za.co.absa.ultet.model.SchemaName +import za.co.absa.ultet.util.SourceFileType.{FunctionSrc, SourceFileType} + +import java.net.URI +import java.nio.file._ +import java.util.stream.Collectors +import scala.collection.JavaConverters.asScalaIteratorConverter + +object FileReader { + + type SchemaFiles = Map[SchemaName, Set[URI]] + + def readFileAsString(fileUri: URI): String = { + val path = Paths.get(fileUri) + + val lines = Files.lines(path) + lines.collect(Collectors.joining("\n")) + } + + def listFileURIsPerSchema(pathString: String): SchemaFiles = { + val path = Paths.get(pathString) + val schemaPaths = listChildPaths(path) + schemaPaths + .map(p => SchemaName(p.getFileName.toString) -> listChildPaths(p)) + .toMap + .mapValues(_.map(_.toUri)) + } + + def fileType(uri: URI): SourceFileType = { + val path = uri.getPath + if (path.endsWith(".sql")) { + SourceFileType.FunctionSrc + } else if (path.endsWith(".yml")) { + SourceFileType.TableSrc + } else if (path == "owner.txt") { + SourceFileType.SchemaOwner + } else { + SourceFileType.Unknown + } + } + + private def listChildPaths(path: Path): Set[Path] = Files.list(path) + .iterator() 
+ .asScala + .toSet +} diff --git a/src/main/scala/za/co/absa/ultet/util/SourceFileType.scala b/src/main/scala/za/co/absa/ultet/util/SourceFileType.scala new file mode 100644 index 0000000..5723065 --- /dev/null +++ b/src/main/scala/za/co/absa/ultet/util/SourceFileType.scala @@ -0,0 +1,24 @@ +/* + * Copyright 2023 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.ultet.util + +object SourceFileType extends Enumeration { + + final type SourceFileType = Value + + final val FunctionSrc, TableSrc, SchemaOwner, Unknown = Value +} diff --git a/src/main/scala/za/co/absa/ultet/util/SqlExecutor.scala b/src/main/scala/za/co/absa/ultet/util/SqlExecutor.scala new file mode 100644 index 0000000..e15573d --- /dev/null +++ b/src/main/scala/za/co/absa/ultet/util/SqlExecutor.scala @@ -0,0 +1,53 @@ +/* + * Copyright 2023 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.ultet.util + +import com.typesafe.scalalogging.Logger +import za.co.absa.balta.classes.{DBConnection, DBQuerySupport} + +object SqlExecutor extends DBQuerySupport { + private val logger = Logger(getClass.getName) + + def execute(sqls: Seq[String])(implicit dbConnection: DBConnection): Unit = { + val autoCommitOriginalStatus = dbConnection.connection.getAutoCommit + dbConnection.connection.setAutoCommit(false) + var sql: Option[String] = None + + try { + sqls.foreach { entry => + sql = Some(entry) + runQuery(entry, List.empty) { _ => } + } + dbConnection.connection.commit() + } catch { + case exception: Exception => + dbConnection.connection.rollback() + sql match { + case Some(query) => logger.error(s"""Script execution failed at: `$query` with error: "${exception.getMessage}"""") + case None => logger.error(s"""Script execution failed with error: "${exception.getMessage}"""") + } + throw exception + } + finally { + dbConnection.connection.setAutoCommit(autoCommitOriginalStatus) + } + } + + def execute(sql: String)(implicit dbConnection: DBConnection): Unit = { + execute(Seq(sql)) + } +} diff --git a/src/main/scala/za/co/absa/ultet/util/TaskConfig.scala b/src/main/scala/za/co/absa/ultet/util/TaskConfig.scala new file mode 100644 index 0000000..4c495f8 --- /dev/null +++ b/src/main/scala/za/co/absa/ultet/util/TaskConfig.scala @@ -0,0 +1,40 @@ +/* + * Copyright 2023 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.ultet.util + +import za.co.absa.ultet.model.DatabaseName + +case class TaskConfig( + defaultDatabase: DatabaseName, + dbConnections: Map[DatabaseName, DBProperties], + createDatabases: Boolean, + clearAllSchemaFunctions: Boolean, + clearAllSchemaTables: Boolean + ) + +object TaskConfig { + def apply( + defaultDatabase: DatabaseName, + dbConnections: Set[DBProperties] = Set.empty, + createDatabases: Boolean = true, + clearAllSchemaFunctions: Boolean = true, + clearAllSchemaTables: Boolean = false + ): TaskConfig = { + val map = dbConnections.map(db => DatabaseName(db.database) -> db).toMap + new TaskConfig(defaultDatabase, map, createDatabases, clearAllSchemaFunctions, clearAllSchemaTables) + } +} diff --git a/src/main/scala/za/co/absa/ultet/util/package.scala b/src/main/scala/za/co/absa/ultet/util/package.scala new file mode 100644 index 0000000..ef43d42 --- /dev/null +++ b/src/main/scala/za/co/absa/ultet/util/package.scala @@ -0,0 +1,35 @@ +/* + * Copyright 2023 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.ultet + +import za.co.absa.ultet.dbitems.DBTable +import za.co.absa.ultet.model.table.TableName +import za.co.absa.ultet.model.{DatabaseDef, DatabaseName, SQLEntry, SchemaDef, SchemaName, TransactionGroup} + +package object util { + + type DatabaseDefs = Map[DatabaseName, DatabaseDef] + + type SchemaDefs = Map[SchemaName, SchemaDef] + + type SqlEntriesPerTransaction = Map[TransactionGroup.Value, Seq[SQLEntry]] + + type SqlsPerTransaction = Map[TransactionGroup.Value, Seq[String]] + + type Tables = Map[TableName, DBTable] + +} diff --git a/src/test/scala/za/co/absa/ultet/implicits/MapImplicits_SqlEntriesPerTransactionEnhancementTest.scala b/src/test/scala/za/co/absa/ultet/implicits/MapImplicits_SqlEntriesPerTransactionEnhancementTest.scala new file mode 100644 index 0000000..fe805e2 --- /dev/null +++ b/src/test/scala/za/co/absa/ultet/implicits/MapImplicits_SqlEntriesPerTransactionEnhancementTest.scala @@ -0,0 +1,77 @@ +/* + * Copyright 2023 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.ultet.implicits + +import org.scalatest.funsuite.{AnyFunSuite, AnyFunSuiteLike} +import za.co.absa.ultet.util.SqlEntriesPerTransaction +import za.co.absa.ultet.implicits.MapImplicits.SqlEntriesPerTransactionEnhancement +import za.co.absa.ultet.model.function.{FunctionArgumentType, FunctionDrop, FunctionName} +import za.co.absa.ultet.model.{SchemaName, TransactionGroup, UserEntry, UserName} + +class MapImplicits_SqlEntriesPerTransactionEnhancementTest extends AnyFunSuiteLike { + test("toSql without any entries") { + val input:SqlEntriesPerTransaction = Map.empty + val result = input.toSql + assert(result.isEmpty) + } + + test("toSql wit two transaction groups") { + + val transaction1 = TransactionGroup.Roles + val transaction2 = TransactionGroup.Objects + val entry1 = UserEntry(UserName("user1")) + val entry2 = FunctionDrop(SchemaName("foo"), FunctionName("fmc1"), Seq.empty) + val entry3 = FunctionDrop(SchemaName("bar"), FunctionName("fnc2"), Seq(FunctionArgumentType("TEXT"))) + val input: SqlEntriesPerTransaction = Map( + transaction1 -> Seq(entry1), + transaction2 -> Seq(entry2, entry3) + ) + val result = input.toSql + + val expected1 = Seq( + """ + |---------------------------------------------------------------------------------------- + |-- Transaction group: ROLES + |---------------------------------------------------------------------------------------- + |""".stripMargin, + """DO + |$do$ + | BEGIN + | IF EXISTS ( + | SELECT FROM pg_catalog.pg_roles + | WHERE lowercase(rolname) = 'user1') THEN + | + | RAISE NOTICE 'Role "user1" already exists. 
Skipping.'; + | ELSE + | CREATE ROLE user1; + | END IF; + | END + |$do$;""".stripMargin + ) + val expected2 = Seq( + """ + |---------------------------------------------------------------------------------------- + |-- Transaction group: OBJECTS + |---------------------------------------------------------------------------------------- + |""".stripMargin, + """DROP FUNCTION foo.fmc1();""", + """DROP FUNCTION bar.fnc2(TEXT);""" + ) + + assert(result.values.toList == List(expected1, expected2)) + } +} diff --git a/src/test/scala/za/co/absa/ultet/implicits/OptionImplicitsTest.scala b/src/test/scala/za/co/absa/ultet/implicits/OptionImplicitsTest.scala new file mode 100644 index 0000000..d58c56e --- /dev/null +++ b/src/test/scala/za/co/absa/ultet/implicits/OptionImplicitsTest.scala @@ -0,0 +1,33 @@ +/* + * Copyright 2023 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.ultet.implicits + +import org.scalatest.funsuite.{AnyFunSuite, AnyFunSuiteLike} +import za.co.absa.ultet.implicits.OptionImplicits.OptionEnhancements + +class OptionImplicitsTest extends AnyFunSuiteLike { + test("getOrThrow returns the value if it is defined") { + val opt = Some(true) + assert(opt.getOrThrow(new Exception("Foo"))) + } + + test("getOrThrow throws an exception if the value is not defined") { + val opt = None + assertThrows[Exception](opt.getOrThrow(new Exception("Foo"))) + } + +} diff --git a/src/test/scala/za/co/absa/ultet/parsers/PgFunctionFileParserTest.scala b/src/test/scala/za/co/absa/ultet/parsers/PgFunctionFileParserTest.scala index b681388..529580e 100644 --- a/src/test/scala/za/co/absa/ultet/parsers/PgFunctionFileParserTest.scala +++ b/src/test/scala/za/co/absa/ultet/parsers/PgFunctionFileParserTest.scala @@ -30,11 +30,11 @@ class PgFunctionFileParserTest extends AnyFlatSpec with Matchers { |-- Function: my_schema.public_function([Function_Param_Count]) |-- [Descrip""".stripMargin - PgFunctionFileParser().parseString(functionString) shouldBe DBFunctionFromSource( + PgFunctionFileParser.parseSource(functionString).head shouldBe DBFunctionFromSource( fnName = FunctionName("public_functionX"), paramTypes = Seq(FunctionArgumentType("TEXT"), FunctionArgumentType("INTEGER"), FunctionArgumentType("hstore")), owner = UserName("owner_user123"), - users = Seq(UserName("user_for_accessA"), UserName("user_for_accessB"), UserName("user_for_accessZ")), + users = Set(UserName("user_for_accessA"), UserName("user_for_accessB"), UserName("user_for_accessZ")), schema = SchemaName("my_schema1"), database = DatabaseName("eXample_db"), sqlBody = functionString // the whole thing @@ -58,11 +58,11 @@ class PgFunctionFileParserTest extends AnyFlatSpec with Matchers { |-- Function: my_schema.public_function([Function_Param_Count]) |-- [Descrip""".stripMargin - PgFunctionFileParser().parseString(functionString) shouldBe DBFunctionFromSource( 
+ PgFunctionFileParser.parseSource(functionString).head shouldBe DBFunctionFromSource( fnName = FunctionName("public_functionX"), paramTypes = Seq.empty, owner = UserName("owner_user123"), - users = Seq.empty, + users = Set.empty, schema = SchemaName("my_schema1"), database = DatabaseName("eXample_db"), sqlBody = functionString // the whole thing @@ -71,15 +71,15 @@ class PgFunctionFileParserTest extends AnyFlatSpec with Matchers { } it should "parse example function example file" in { - val testFileUri = getClass().getClassLoader().getResource("public_function_example.sql").toURI + val testFileUri = getClass.getClassLoader.getResource("public_function_example.sql").toURI - val parsedDBItemFromSource = PgFunctionFileParser().parseFile(testFileUri) + val parsedDBItemFromSource = PgFunctionFileParser.parseFile(testFileUri).head parsedDBItemFromSource.fnName shouldBe FunctionName("public_function") parsedDBItemFromSource.schema shouldBe SchemaName("my_schema") parsedDBItemFromSource.paramTypes shouldBe Seq(FunctionArgumentType("TEXT")) parsedDBItemFromSource.owner shouldBe UserName("some_owner_user") - parsedDBItemFromSource.users shouldBe Seq(UserName("user_for_access")) + parsedDBItemFromSource.users shouldBe Set(UserName("user_for_access")) parsedDBItemFromSource.database shouldBe DatabaseName("example_db") parsedDBItemFromSource.sqlBody should include("CREATE OR REPLACE FUNCTION my_schema.public_function") } diff --git a/src/test/scala/za/co/absa/ultet/parsers/PgTableFileParserTest.scala b/src/test/scala/za/co/absa/ultet/parsers/PgTableFileParserTest.scala index 231ebe0..afb3843 100644 --- a/src/test/scala/za/co/absa/ultet/parsers/PgTableFileParserTest.scala +++ b/src/test/scala/za/co/absa/ultet/parsers/PgTableFileParserTest.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2023 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package za.co.absa.ultet.parsers import org.scalatest.flatspec.AnyFlatSpec @@ -6,14 +22,16 @@ import za.co.absa.ultet.dbitems.DBTable import za.co.absa.ultet.dbitems.table.DBTableColumn import za.co.absa.ultet.dbitems.table.DBTableIndex.{DBPrimaryKey, DBSecondaryIndex, IndexColumn} import za.co.absa.ultet.model._ -import za.co.absa.ultet.model.table.{ColumnName, IndexName, TableName} -import za.co.absa.ultet.parsers.PgTableFileParser.DBTableFromYaml +import za.co.absa.ultet.model.table.{ColumnName, IndexName, TableIdentifier, TableName} +import za.co.absa.ultet.parsers.helpers.DBTableFromYaml class PgTableFileParserTest extends AnyFlatSpec with Matchers { + private val schemaName = SchemaName("testSchema") + "PgTableFileParserTest" should "return semi-prepared object table from example content" in { val tableString = - """table: testSchema.testTable + """table: testTable |description: Some Description of this madness |primaryDBName: primaryDB |owner: some_owner_user @@ -30,8 +48,8 @@ class PgTableFileParserTest extends AnyFlatSpec with Matchers { | indexBy: "[column1]" |""".stripMargin - PgTableFileParser().parseContentYaml(tableString) shouldBe DBTableFromYaml( - table = "testSchema.testTable", + PgTableFileParser(schemaName).processYaml(tableString) shouldBe DBTableFromYaml( + table = "testTable", description = Some("Some Description of this madness"), primaryDBName = "primaryDB", owner = "some_owner_user", @@ -56,7 +74,7 @@ class PgTableFileParserTest extends AnyFlatSpec with Matchers { "PgTableFileParserTest" should "return well-prepared 
table object from example content" in { val tableString = - """table: testSchema.testTable + """table: testTable |description: Some Description of this madness |primaryDBName: primaryDB |owner: some_owner_user @@ -73,7 +91,7 @@ class PgTableFileParserTest extends AnyFlatSpec with Matchers { | indexBy: "[column1]" |""".stripMargin - PgTableFileParser().parseContentYaml(tableString).convertToDBTable shouldBe DBTable( + PgTableFileParser(schemaName).parseSource(tableString).head shouldBe DBTable( tableName = TableName("testTable"), schemaName = SchemaName("testSchema"), description = Some("Some Description of this madness"), @@ -87,21 +105,21 @@ class PgTableFileParserTest extends AnyFlatSpec with Matchers { ), ), primaryKey = Some(DBPrimaryKey( + tableIdentifier = TableIdentifier(SchemaName("testSchema"), TableName("testTable")), columns = Seq("id_key_field1", "id_key_field1").map(IndexColumn(_)), - indexName = IndexName("pk_my_table"), - tableName = TableName("testSchema.testTable") + indexName = IndexName("pk_my_table") )), indexes = Set(DBSecondaryIndex( + tableIdentifier = TableIdentifier(SchemaName("testSchema"), TableName("testTable")), indexName = IndexName("idx_some_name"), - tableName = TableName("testTable"), - columns = Seq("column1").map(IndexColumn(_)), + columns = Seq("column1").map(IndexColumn(_)) )) ) } "PgTableFileParserTest" should "return semi-prepared object table from example content, some attributes empty" in { val tableString = - """table: testSchema.testTable + """table: testTable |description: Some Description of this madness |primaryDBName: primaryDB |owner: some_owner_user @@ -110,8 +128,8 @@ class PgTableFileParserTest extends AnyFlatSpec with Matchers { |indexes: [] |""".stripMargin - PgTableFileParser().parseContentYaml(tableString) shouldBe DBTableFromYaml( - table = "testSchema.testTable", + PgTableFileParser(schemaName).processYaml(tableString) shouldBe DBTableFromYaml( + table = "testTable", description = Some("Some Description of 
this madness"), primaryDBName = "primaryDB", owner = "some_owner_user", @@ -120,7 +138,7 @@ class PgTableFileParserTest extends AnyFlatSpec with Matchers { "PgTableFileParserTest" should "return well-prepared object table from example content, some attributes empty" in { val tableString = - """table: testSchema.testTable + """table: testTable |description: Some Description of this madness |primaryDBName: primaryDB |owner: some_owner_user @@ -129,7 +147,7 @@ class PgTableFileParserTest extends AnyFlatSpec with Matchers { |indexes: [] |""".stripMargin - PgTableFileParser().parseContentYaml(tableString).convertToDBTable shouldBe DBTable( + PgTableFileParser(schemaName).parseSource(tableString).head shouldBe DBTable( tableName = TableName("testTable"), schemaName = SchemaName("testSchema"), description = Some("Some Description of this madness"), diff --git a/src/test/scala/za/co/absa/ultet/util/FileReaderTest.scala b/src/test/scala/za/co/absa/ultet/util/FileReaderTest.scala new file mode 100644 index 0000000..a2fb11a --- /dev/null +++ b/src/test/scala/za/co/absa/ultet/util/FileReaderTest.scala @@ -0,0 +1,23 @@ +/* + * Copyright 2023 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.ultet.util + +import org.scalatest.funsuite.{AnyFunSuite, AnyFunSuiteLike} + +class FileReaderTest extends AnyFunSuiteLike { + // TODO Add tests +}