Skip to content

Commit

Permalink
#29: Streamline the Ultet processing (#41)
Browse files Browse the repository at this point in the history
The process is now:
* initialization
* source code analysis and creating the db app model
* target database(s) analysis, and update of the `DbAppModel` with it.
* `DbItem`s (diffs) generation for each database and grouping them per transaction 
* SQLs generation from `DbItem`s
* execution/print of the SQLs

Other changes:
* Using _[Balta](https://github.com/AbsaOSS/balta)_ for queries now
* Multiple databases (in source only) support
* Finished and refactored table diffing
  • Loading branch information
benedeki authored Feb 17, 2024
1 parent 5f32eb8 commit 4f0fb10
Show file tree
Hide file tree
Showing 49 changed files with 1,212 additions and 468 deletions.
2 changes: 1 addition & 1 deletion examples/database/src/main/my_schema/my_table.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

table: my_schema.my_table
table: my_table
description: This is an example table
primary_db: example_db
owner: some_owner_user
Expand Down
131 changes: 43 additions & 88 deletions src/main/scala/za/co/absa/ultet/Ultet.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,113 +18,68 @@ package za.co.absa.ultet

import com.typesafe.scalalogging.Logger
import scopt.OParser
import za.co.absa.balta.classes.DBConnection
import za.co.absa.ultet.dbitems.DBItem
import za.co.absa.ultet.model.{SQLEntry, SchemaName, TransactionGroup}
import za.co.absa.ultet.util.{CliParser, Config, DBProperties}
import za.co.absa.ultet.implicits.MapImplicits.SqlEntriesPerTransactionEnhancement
import za.co.absa.ultet.implicits.OptionImplicits.OptionEnhancements
import za.co.absa.ultet.model.{DBAppModel, DatabaseName, SQLEntry, TransactionGroup}
import za.co.absa.ultet.util.FileReader.SchemaFiles
import za.co.absa.ultet.util.{CliParser, Config, DBProperties, FileReader, SqlEntriesPerTransaction, SqlExecutor, SqlsPerTransaction, TaskConfig}

import java.net.URI
import java.nio.file._
import scala.collection.JavaConverters._
import java.sql.{Connection, DriverManager, ResultSet}
import scala.util.{Failure, Success, Try}

object Ultet {
private val logger = Logger(getClass.getName)

// Flattens the SQL entries of every given DBItem into one sequence.
// The ordering follows the (arbitrary) iteration order of the input Set;
// callers are expected to order the entries afterwards (see sortEntries).
private def extractSQLEntries(dbItems: Set[DBItem]): Seq[SQLEntry] = {
dbItems.toSeq.flatMap { item => item.sqlEntries }
}

// Executes the given SQL entries, or merely prints them when dryRun is set.
// Dry run: each entry's SQL text is printed to stdout and the database is not touched.
// Otherwise: all entries are executed within a single transaction (runTransaction),
// and every row of every returned ResultSet is logged at INFO level as a
// comma-separated line, after which the ResultSet is closed.
// NOTE(review): a non-query statement yields a null ResultSet from JDBC's
// Statement.getResultSet; getMetaData on such an element would NPE — confirm
// runTransaction never returns nulls here.
private def runEntries(entries: Seq[SQLEntry], dryRun: Boolean = false)(implicit connection: Connection): Unit = {
if(dryRun) entries.map(_.sqlExpression).foreach(println)
else {
val resultSets: Seq[ResultSet] = runTransaction(connection, entries)

for (resultSet <- resultSets) {
// Number of columns drives how many values are read from each row.
val numColumns = resultSet.getMetaData.getColumnCount
while (resultSet.next()) {
val row = (1 to numColumns).map(col => resultSet.getString(col))
logger.info(row.mkString(", "))
}
resultSet.close()
}
}
}
def main(args: Array[String]): Unit = {

def runTransaction(connection: Connection, entries: Seq[SQLEntry]): Seq[ResultSet] = {
val autoCommitOriginalStatus = connection.getAutoCommit
connection.setAutoCommit(false)
implicit val (appConfig: Config, taskConfig: TaskConfig) = init(args)

val resultSets = Try {
entries.foldLeft(List[ResultSet]()) { case (acc, entry) =>
val statement = connection.createStatement()
statement.execute(entry.sqlExpression)
val ret: List[ResultSet] = acc :+ statement.getResultSet
statement.close()
ret
}
} match {
case Success(resultSets) =>
connection.commit()
resultSets
case Failure(exception) =>
connection.rollback()
connection.close()
throw new Exception("Script execution failed", exception)
}
val sourceURIsPerSchema: SchemaFiles = FileReader.listFileURIsPerSchema(appConfig.sourceFilesRootPath)
val databaseTransactionSqls = DBAppModel
.loadFromSources(sourceURIsPerSchema)
.addDatabasesAnalysis()(taskConfig)
.createSQLEntries()
.map { case (dbName, sqlEntries) => dbName -> sqlEntries.toSql }

connection.setAutoCommit(autoCommitOriginalStatus)
resultSets
if (appConfig.dryRun) print(databaseTransactionSqls)
else execute(databaseTransactionSqls)
}

def sortEntries(entries: Seq[SQLEntry]): Map[TransactionGroup.Value, Seq[SQLEntry]] = {
entries
.groupBy(_.transactionGroup)
.mapValues(_.sortBy(_.orderInTransaction))
private def init(args: Array[String]): (Config, TaskConfig) = {
val appConfig = OParser.parse(CliParser.parser, args, Config()).getOrThrow(new Exception("Failed to load arguments"))
val defaultDB = DBProperties.loadProperties(appConfig.dbConnectionPropertiesPath)
val taskConfig = TaskConfig(DatabaseName(defaultDB.database), Set(defaultDB))
(appConfig, taskConfig)
}

private def listChildPaths(path: Path): List[Path] = Files.list(path)
.iterator()
.asScala
.toList
private def execute(databaseTransactionSqls: Map[DatabaseName, SqlsPerTransaction])(implicit taskConfig: TaskConfig): Unit = {
def executeDatabase(databaseName: DatabaseName, sqls: SqlsPerTransaction): Unit = {
logger.info(s"Executing against database: ${databaseName.value}")
val transactionGroups = TransactionGroup.values.toList // going over transaction groups in their logical order
implicit val dbConnection: DBConnection = taskConfig.dbConnections(databaseName).dbConnection
transactionGroups.foreach(sqls.get(_).foreach(SqlExecutor.execute(_)))
}

def listFileURIsPerSchema(pathString: String): Map[SchemaName, List[URI]] = {
val path = Paths.get(pathString)
val schemaPaths = listChildPaths(path)
schemaPaths
.map(p => SchemaName(p.getFileName.toString) -> listChildPaths(p))
.toMap
.mapValues(_.map(_.toUri))
databaseTransactionSqls.foreach { case (dbName, entries) => executeDatabase(dbName, entries) }
}

def main(args: Array[String]): Unit = {
val config = OParser.parse(CliParser.parser, args, Config()) match {
case Some(config) => config
case _ => throw new Exception("Failed to load arguments")
private def print(databaseTransactionSqls: Map[DatabaseName, SqlsPerTransaction]): Unit = {
def printDatabase(databaseName: DatabaseName, sqls: SqlsPerTransaction): Unit = {
val transactionGroups = TransactionGroup.values.toList // going over transaction groups in their logical order

val delimiter = "================================================================================"
val prefix = "= Database: "
val suffix = "="
val space = " " * (delimiter.length - prefix.length - databaseName.value.length - suffix.length)
println(delimiter)
println(s"$prefix${databaseName.value}$space$suffix")
println(s"$delimiter\n")
transactionGroups.foreach(sqls.get(_).foreach(_.foreach(println)))
}

val dbProperties = DBProperties.loadProperties(config.dbConnectionPropertiesPath)
val dbPropertiesSys = DBProperties.getSysDB(dbProperties)
val dbConnection: String = dbProperties.generateConnectionString()
implicit val jdbcConnection: Connection = DriverManager.getConnection(
dbConnection, dbProperties.user, dbProperties.password
)

val sourceURIsPerSchema: Map[SchemaName, List[URI]] = listFileURIsPerSchema(config.sourceFilesRootPath)

val dbItems: Set[DBItem] = DBItem.createDBItems(sourceURIsPerSchema)
val entries: Seq[SQLEntry] = extractSQLEntries(dbItems)
val orderedEntries = sortEntries(entries)
val databaseEntries = orderedEntries.getOrElse(TransactionGroup.Databases, Seq.empty)
val roleEntries = orderedEntries.getOrElse(TransactionGroup.Roles, Seq.empty)
val objectEntries = orderedEntries.getOrElse(TransactionGroup.Objects, Seq.empty)
val indexEntries = orderedEntries.getOrElse(TransactionGroup.Indexes, Seq.empty)

if (databaseEntries.nonEmpty) runEntries(databaseEntries, config.dryRun)
if (roleEntries.nonEmpty) runEntries(roleEntries, config.dryRun)
if (objectEntries.nonEmpty) runEntries(objectEntries, config.dryRun)
if (indexEntries.nonEmpty) runEntries(indexEntries, config.dryRun)

jdbcConnection.close()
databaseTransactionSqls.foreach { case (dbName, entries) => printDatabase(dbName, entries) }
}

}
2 changes: 2 additions & 0 deletions src/main/scala/za/co/absa/ultet/dbitems/DBFunction.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@ trait DBFunction extends DBItem {
def paramTypes: Seq[FunctionArgumentType]
def schema: SchemaName

def database: DatabaseName

}
18 changes: 10 additions & 8 deletions src/main/scala/za/co/absa/ultet/dbitems/DBFunctionFromPG.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,22 @@ package za.co.absa.ultet.dbitems


import za.co.absa.ultet.model.function.{FunctionArgumentType, FunctionDrop, FunctionName}
import za.co.absa.ultet.model.{SQLEntry, SchemaName}
import za.co.absa.ultet.model.{DatabaseName, SQLEntry, SchemaName}

import java.sql.Connection

case class DBFunctionFromPG(
schema: SchemaName,
fnName: FunctionName,
paramTypes: Seq[FunctionArgumentType]
paramTypes: Seq[FunctionArgumentType],
database: DatabaseName //TODO: Implement this
) extends DBFunction {
override def sqlEntries: Seq[SQLEntry] = Seq(FunctionDrop(schema, fnName, paramTypes))
}

object DBFunctionFromPG {

def fetchAllOfSchema(schemaName: SchemaName)
def fetchAllOfSchema(databaseName: DatabaseName, schemaName: SchemaName)
(implicit jdbcConnection: Connection): Seq[DBFunctionFromPG] = {
val query =
s"""
Expand All @@ -47,10 +48,10 @@ object DBFunctionFromPG {
| p.prokind = 'f';
|""".stripMargin

fetchAllOfSchemaWithQuery(schemaName, query)
fetchAllOfSchemaWithQuery(databaseName, schemaName, query)
}

def fetchAllOverloads(schemaName: SchemaName, functionName: FunctionName)
def fetchAllOverloads(databaseName: DatabaseName, schemaName: SchemaName, functionName: FunctionName)
(implicit jdbcConnection: Connection): Seq[DBFunctionFromPG] = {
val query =
s"""
Expand All @@ -66,10 +67,10 @@ object DBFunctionFromPG {
| p.prokind = 'f';
|""".stripMargin

fetchAllOfSchemaWithQuery(schemaName, query)
fetchAllOfSchemaWithQuery(databaseName, schemaName, query)
}

private def fetchAllOfSchemaWithQuery(schemaName: SchemaName, query: String)
private def fetchAllOfSchemaWithQuery(databaseName: DatabaseName, schemaName: SchemaName, query: String)
(implicit jdbcConnection: Connection): Seq[DBFunctionFromPG] = {
val preparedStatement = jdbcConnection.prepareStatement(query)
val result = preparedStatement.executeQuery()
Expand All @@ -89,7 +90,8 @@ object DBFunctionFromPG {
val dbFunctionFromPG = DBFunctionFromPG(
schemaName,
FunctionName(fnName),
argumentTypes
argumentTypes,
databaseName
)
seqBuilder += dbFunctionFromPG
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ case class DBFunctionFromSource(
fnName: FunctionName,
paramTypes: Seq[FunctionArgumentType],
owner: UserName,
users: Seq[UserName],
users: Set[UserName],
schema: SchemaName,
database: DatabaseName,
sqlBody: String
Expand Down
34 changes: 2 additions & 32 deletions src/main/scala/za/co/absa/ultet/dbitems/DBItem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package za.co.absa.ultet.dbitems

import za.co.absa.ultet.model.{SQLEntry, SchemaName}
import za.co.absa.ultet.model.{SQLEntry, SchemaName, TransactionGroup}
import za.co.absa.ultet.parsers.PgFunctionFileParser

import java.net.URI
Expand All @@ -27,36 +27,6 @@ trait DBItem {
}

object DBItem {
def createDBItems(filePathsPerSchema: Map[SchemaName, Seq[URI]])
(implicit jdbcConnection: Connection): Set[DBItem] = {
val schemas = filePathsPerSchema.keys
val SchemasWithFilesGroupedByType(tables, functions, owners) = groupAllFilesPerSchemaByType(filePathsPerSchema)

// TODO handle tables

val dbFunctionsFromSource = functions
.values
.flatten
.map(PgFunctionFileParser().parseFile)
.toSet
val dbFunctionsFromPG = functions
.keys
.flatMap(DBFunctionFromPG.fetchAllOfSchema)
.toSet

val users: Set[DBItem] = dbFunctionsFromSource.flatMap(f => f.users :+ f.owner).map(DBUser)

val schemaOwners = owners.mapValues(DBSchema.parseTxtFileContainingSchemaOwner)
val schemaUsers = dbFunctionsFromSource.groupBy(_.schema).mapValues(_.toSeq.map(_.users).flatten.toSet)

val dbSchemas = schemas.toSet.map { (s: SchemaName) =>
val owner = schemaOwners(s)
val users = schemaUsers(s) + owner
DBSchema(s, owner, users.toSeq)
}

dbFunctionsFromSource ++ dbFunctionsFromPG ++ users ++ dbSchemas
}

private case class SchemasWithFilesGroupedByType(
tables: Map[SchemaName, Seq[URI]],
Expand All @@ -71,7 +41,7 @@ object DBItem {
ownerFiles.foreach { case (schemaName, uris) =>
if (uris.size > 1) throw new IllegalArgumentException(
s"Detected more than one .txt file in schema ${schemaName.normalized}"
) else if (uris.size == 0) throw new IllegalArgumentException(
) else if (uris.isEmpty) throw new IllegalArgumentException(
s".txt file in schema ${schemaName.normalized} not found"
)
}
Expand Down
22 changes: 11 additions & 11 deletions src/main/scala/za/co/absa/ultet/dbitems/DBSchema.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,35 +26,35 @@ import java.nio.file.{Files, Paths}
import java.util.stream.Collectors

case class DBSchema(name: SchemaName,
ownerName: UserName,
users: Seq[UserName]) extends DBItem {
ownerName: Option[UserName],
users: Set[UserName]) extends DBItem {
override def sqlEntries: Seq[SQLEntry] = {
if (DO_NOT_TOUCH.contains(name.value)) {
throw new Exception(s"Schema ${name.value} is not allow to be referenced")
throw new Exception(s"Schema ${name.value} is not allow to be referenced") //TODO #31 Add warnings to the system
}

if (DO_NOT_CHOWN.contains(name.value)){
logger.warn(s"Schema ${name.value} is not allowed to change owner")
logger.warn(s"Schema ${name.value} is not allowed to change owner") //TODO #31 Add warnings to the system
Seq(
SchemaCreate(name),
SchemaGrant(name, users)
)
} else {
Seq(
SchemaCreate(name),
SchemaOwner(name, ownerName),
SchemaGrant(name, users)
)
ownerName.map{SchemaOwner(name, _)}.toSeq ++
Seq(
SchemaCreate(name),
SchemaGrant(name, users)
)
}
}
}

object DBSchema {
private val logger = Logger(getClass.getName)

val DO_NOT_TOUCH: Seq[String] = Seq("pg_toast", "pg_catalog", "information_schema")
val DO_NOT_CHOWN: Seq[String] = Seq("public")

def parseTxtFileContainingSchemaOwner(fileUri: URI): UserName = {
val path = Paths.get(fileUri)
val lines = Files.lines(path)
Expand Down
Loading

0 comments on commit 4f0fb10

Please sign in to comment.