Skip to content

Commit

Permalink
1.0.75 (#80)
Browse files Browse the repository at this point in the history
* 1.0.75

* wip

* 1.0.75

* hsql usage db

* fix storage

* config
  • Loading branch information
acharneski authored May 28, 2024
1 parent 2870c5e commit bbf75d1
Show file tree
Hide file tree
Showing 21 changed files with 395 additions and 76 deletions.
4 changes: 3 additions & 1 deletion core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ kotlin {
val junit_version = "5.10.1"
val logback_version = "1.4.11"
val jackson_version = "2.17.0"
val hsqldb_version = "2.7.2"

dependencies {

implementation(group = "com.simiacryptus", name = "jo-penai", version = "1.0.57")
implementation(group = "org.hsqldb", name = "hsqldb", version = hsqldb_version)

implementation("org.apache.commons:commons-text:1.11.0")

Expand Down Expand Up @@ -179,4 +181,4 @@ if (System.getenv("GPG_PRIVATE_KEY") != null && System.getenv("GPG_PASSPHRASE")
useInMemoryPgpKeys(System.getenv("GPG_PRIVATE_KEY"), System.getenv("GPG_PASSPHRASE"))
sign(configurations.archives.get())
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@ import com.simiacryptus.skyenet.core.platform.StorageInterface
import com.simiacryptus.skyenet.core.platform.User
import com.simiacryptus.skyenet.core.util.FunctionWrapper
import com.simiacryptus.skyenet.core.util.JsonFunctionRecorder
import java.io.File

open class ActorSystem<T : Enum<*>>(
val actors: Map<String, BaseActor<*, *>>,
val dataStorage: StorageInterface,
val user: User?,
val session: Session
) {
protected val pool by lazy { ApplicationServices.clientManager.getPool(session, user, dataStorage) }
protected val pool by lazy { ApplicationServices.clientManager.getPool(session, user) }

private val actorMap = mutableMapOf<T, BaseActor<*, *>>()

Expand Down Expand Up @@ -66,7 +65,7 @@ open class ActorSystem<T : Enum<*>>(
private fun getWrapper(name: String) = synchronized(wrapperMap) {
wrapperMap.getOrPut(name) {
FunctionWrapper(JsonFunctionRecorder(
dataStorage.getSessionDir(user, session).resolve("actors/$name").apply { mkdirs() }
dataStorage.getDataDir(user, session).resolve("actors/$name").apply { mkdirs() }
))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import com.simiacryptus.jopenai.ApiModel
import com.simiacryptus.jopenai.models.APIProvider
import com.simiacryptus.jopenai.models.ChatModels
import com.simiacryptus.jopenai.models.OpenAIModel
import com.simiacryptus.skyenet.core.platform.ApplicationServicesConfig.dataStorageRoot
import com.simiacryptus.skyenet.core.platform.ApplicationServicesConfig.isLocked
import com.simiacryptus.skyenet.core.platform.file.*
import com.simiacryptus.skyenet.core.util.Selenium
import java.io.File
Expand All @@ -16,33 +18,37 @@ import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.atomic.AtomicInteger
import kotlin.random.Random

object ApplicationServices {
object ApplicationServicesConfig {

var isLocked: Boolean = false
set(value) {
require(!isLocked) { "ApplicationServices is locked" }
field = value
}
var authorizationManager: AuthorizationInterface = AuthorizationManager()
var dataStorageRoot: File = File(System.getProperty("user.home"), ".skyenet")
set(value) {
require(!isLocked) { "ApplicationServices is locked" }
field = value
}
var userSettingsManager: UserSettingsInterface = UserSettingsManager()
}

object ApplicationServices {
var authorizationManager: AuthorizationInterface = AuthorizationManager()
set(value) {
require(!isLocked) { "ApplicationServices is locked" }
field = value
}
var authenticationManager: AuthenticationInterface = AuthenticationManager()
var userSettingsManager: UserSettingsInterface = UserSettingsManager()
set(value) {
require(!isLocked) { "ApplicationServices is locked" }
field = value
}
var dataStorageFactory: (File) -> StorageInterface = { DataStorage(it) }
var authenticationManager: AuthenticationInterface = AuthenticationManager()
set(value) {
require(!isLocked) { "ApplicationServices is locked" }
field = value
}
var dataStorageRoot: File = File(System.getProperty("user.home"), ".skyenet")
var dataStorageFactory: (File) -> StorageInterface = { DataStorage(it) }
set(value) {
require(!isLocked) { "ApplicationServices is locked" }
field = value
Expand All @@ -65,7 +71,7 @@ object ApplicationServices {
require(!isLocked) { "ApplicationServices is locked" }
field = value
}
var usageManager: UsageInterface = UsageManager(File(dataStorageRoot, "usage"))
var usageManager: UsageInterface = HSQLUsageManager(File(dataStorageRoot, "usage"))
set(value) {
require(!isLocked) { "ApplicationServices is locked" }
field = value
Expand Down Expand Up @@ -115,6 +121,11 @@ interface StorageInterface {
session: Session
): File

fun getDataDir(
user: User?,
session: Session
): File

fun getSessionName(
user: User?,
session: Session
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import com.simiacryptus.jopenai.models.APIProvider
import com.simiacryptus.jopenai.models.OpenAIModel
import com.simiacryptus.jopenai.util.ClientUtil
import com.simiacryptus.skyenet.core.platform.ApplicationServices.dataStorageFactory
import com.simiacryptus.skyenet.core.platform.ApplicationServices.dataStorageRoot
import com.simiacryptus.skyenet.core.platform.ApplicationServices.userSettingsManager
import com.simiacryptus.skyenet.core.platform.ApplicationServicesConfig.dataStorageRoot
import com.simiacryptus.skyenet.core.platform.AuthorizationInterface.OperationType
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient
import org.apache.hc.core5.http.HttpRequest
Expand All @@ -31,15 +31,13 @@ open class ClientManager {
fun getClient(
session: Session,
user: User?,
dataStorage: StorageInterface?,
): OpenAIClient {
log.debug("Fetching client for session: {}, user: {}", session, user)
val key = SessionKey(session, user)
return if (null == dataStorage) clientCache[key] ?: throw IllegalStateException("No data storage")
else clientCache.getOrPut(key) { createClient(session, user, dataStorage)!! }
return clientCache.getOrPut(key) { createClient(session, user)!! }
}

protected open fun createPool(session: Session, user: User?, dataStorage: StorageInterface?) =
protected open fun createPool(session: Session, user: User?) =
ThreadPoolExecutor(
0, Integer.MAX_VALUE,
500, TimeUnit.MILLISECONDS,
Expand All @@ -54,12 +52,11 @@ open class ClientManager {
fun getPool(
session: Session,
user: User?,
dataStorage: StorageInterface?,
): ThreadPoolExecutor {
log.debug("Fetching thread pool for session: {}, user: {}", session, user)
val key = SessionKey(session, user)
return poolCache.getOrPut(key) {
createPool(session, user, dataStorage)
createPool(session, user)
}
}

Expand Down Expand Up @@ -93,41 +90,34 @@ open class ClientManager {
protected open fun createClient(
session: Session,
user: User?,
dataStorage: StorageInterface?,
): OpenAIClient? {
log.debug("Creating client for session: {}, user: {}", session, user)
val sessionDir = dataStorageFactory(dataStorageRoot).getDataDir(user, session).apply { mkdirs() }
if (user != null) {
val userSettings = userSettingsManager.getUserSettings(user)

val logfile = dataStorageFactory(dataStorageRoot).getSessionDir(user, session).resolve("openai.log").apply { mkdirs() }.resolve("openai.log")
logfile?.parentFile?.mkdirs()
log.debug("Logfile: {}", logfile)
val userApi =
if (userSettings.apiKeys.isNotEmpty())
MonitoredClient(
key = userSettings.apiKeys,
apiBase = userSettings.apiBase,
logfile = logfile,
logfile = sessionDir.resolve("openai.log"),
session = session,
user = user,
workPool = getPool(session, user, dataStorage),
workPool = getPool(session, user),
) else null
if (userApi != null) return userApi
}
val canUseGlobalKey = ApplicationServices.authorizationManager.isAuthorized(
null, user, OperationType.GlobalKey
)
if (!canUseGlobalKey) throw RuntimeException("No API key")
val logfile = dataStorageRoot?.resolve("${if (session.isGlobal()) "global" else "user-sessions/$user"}/$session/openai.log")
?.apply { parentFile?.mkdirs() }
logfile?.parentFile?.mkdirs()
return (if (ClientUtil.keyMap.isNotEmpty()) {
MonitoredClient(
key = ClientUtil.keyMap.mapKeys { APIProvider.valueOf(it.key) },
logfile = logfile,
logfile = sessionDir?.resolve("openai.log"),
session = session,
user = user,
workPool = getPool(session, user, dataStorage),
workPool = getPool(session, user),
)
} else {
null
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package com.simiacryptus.skyenet.core.platform

import com.google.common.util.concurrent.AtomicDouble
import com.simiacryptus.jopenai.ApiModel
import com.simiacryptus.jopenai.models.ChatModels
import com.simiacryptus.jopenai.models.OpenAIModel
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.io.File
import java.sql.Connection
import java.sql.DriverManager
import java.sql.ResultSet
import java.sql.Timestamp
import java.util.concurrent.atomic.AtomicInteger

class HSQLUsageManager(private val dbFile: File) : UsageInterface {

private val connection: Connection
private val logger: Logger = LoggerFactory.getLogger(HSQLUsageManager::class.java)

init {
logger.info("Initializing HSQLUsageManager with database file: ${dbFile.absolutePath}")
Class.forName("org.hsqldb.jdbc.JDBCDriver")
connection = DriverManager.getConnection("jdbc:hsqldb:file:${dbFile.absolutePath};shutdown=true", "SA", "")
logger.debug("Database connection established: $connection")
createSchema()
}

private fun createSchema() {
logger.info("Creating database schema if not exists")
connection.createStatement().executeUpdate(
"""
CREATE TABLE IF NOT EXISTS usage (
session_id VARCHAR(255),
api_key VARCHAR(255),
model VARCHAR(255),
prompt_tokens INT,
completion_tokens INT,
cost DOUBLE,
datetime TIMESTAMP,
PRIMARY KEY (session_id, api_key, model, prompt_tokens, completion_tokens, cost, datetime)
)
"""
)
}


private fun updateSchema() {
logger.info("Updating database schema if needed")
// Add schema update logic here if needed
}

private fun deleteSchema() {
logger.info("Deleting database schema if exists")
connection.createStatement().executeUpdate("DROP TABLE IF EXISTS usage")
logger.debug("Schema deleted")
}

override fun incrementUsage(session: Session, apiKey: String?, model: OpenAIModel, tokens: ApiModel.Usage) {
logger.info("Incrementing usage for session: ${session.sessionId}, apiKey: $apiKey, model: ${model.modelName}")
val usageKey = UsageInterface.UsageKey(session, apiKey, model)
val usageValues = getUsageValues(usageKey)
usageValues.addAndGet(tokens)
saveUsageValues(usageKey, usageValues)
logger.debug("Usage incremented for session: ${session.sessionId}, apiKey: $apiKey, model: ${model.modelName}")
}

override fun getUserUsageSummary(apiKey: String): Map<OpenAIModel, ApiModel.Usage> {
logger.debug("Executing SQL query to get user usage summary for apiKey: $apiKey")
val statement = connection.prepareStatement(
"""
SELECT model, SUM(prompt_tokens), SUM(completion_tokens), SUM(cost)
FROM usage
WHERE api_key = ?
GROUP BY model
"""
)
statement.setString(1, apiKey)
val resultSet = statement.executeQuery()
return generateUsageSummary(resultSet)
}

override fun getSessionUsageSummary(session: Session): Map<OpenAIModel, ApiModel.Usage> {
logger.info("Getting session usage summary for session: ${session.sessionId}")
val statement = connection.prepareStatement(
"""
SELECT model, SUM(prompt_tokens), SUM(completion_tokens), SUM(cost)
FROM usage
WHERE session_id = ?
GROUP BY model
"""
)
statement.setString(1, session.sessionId)
val resultSet = statement.executeQuery()
return generateUsageSummary(resultSet)
}

override fun clear() {
logger.debug("Executing SQL statement to clear all usage data")
connection.createStatement().executeUpdate("DELETE FROM usage")
}

private fun getUsageValues(usageKey: UsageInterface.UsageKey): UsageInterface.UsageValues {
logger.info("Getting usage values for session: ${usageKey.session.sessionId}, apiKey: ${usageKey.apiKey}, model: ${usageKey.model.modelName}")
//logger.debug("Executing SQL query to get usage values for session: ${usageKey.session.sessionId}, apiKey: ${usageKey.apiKey}, model: ${usageKey.model.modelName}")
val statement = connection.prepareStatement(
"""
SELECT COALESCE(SUM(prompt_tokens), 0), COALESCE(SUM(completion_tokens), 0), COALESCE(SUM(cost), 0)
FROM usage
WHERE session_id = ? AND api_key = ? AND model = ?
"""
)
statement.setString(1, usageKey.session.sessionId)
statement.setString(2, usageKey.apiKey ?: "")
statement.setString(3, usageKey.model.toString())
val resultSet = statement.executeQuery()
resultSet.next()
return UsageInterface.UsageValues(
AtomicInteger(resultSet.getInt(1)),
AtomicInteger(resultSet.getInt(2)),
AtomicDouble(resultSet.getDouble(3))
)
}

private fun saveUsageValues(usageKey: UsageInterface.UsageKey, usageValues: UsageInterface.UsageValues) {
logger.info("Saving usage values for session: ${usageKey.session.sessionId}, apiKey: ${usageKey.apiKey}, model: ${usageKey.model.modelName}")
logger.debug("Executing SQL statement to save usage values for session: ${usageKey.session.sessionId}, apiKey: ${usageKey.apiKey}, model: ${usageKey.model.modelName}")
val statement = connection.prepareStatement(
"""
INSERT INTO usage (session_id, api_key, model, prompt_tokens, completion_tokens, cost, datetime)
VALUES (?, ?, ?, ?, ?, ?, ?)
"""
)
statement.setString(1, usageKey.session.sessionId)
statement.setString(2, usageKey.apiKey ?: "")
statement.setString(3, usageKey.model.modelName)
statement.setInt(4, usageValues.inputTokens.get())
statement.setInt(5, usageValues.outputTokens.get())
statement.setDouble(6, usageValues.cost.get())
statement.setTimestamp(7, Timestamp(System.currentTimeMillis()))
logger.debug("Executing statement: $statement")
logger.debug("With parameters: ${usageKey.session.sessionId}, ${usageKey.apiKey}, ${usageKey.model.modelName}, ${usageValues.inputTokens.get()}, ${usageValues.outputTokens.get()}, ${usageValues.cost.get()}")
statement.executeUpdate()
}

private fun generateUsageSummary(resultSet: ResultSet): Map<OpenAIModel, ApiModel.Usage> {
logger.debug("Generating usage summary from result set")
val summary = mutableMapOf<OpenAIModel, ApiModel.Usage>()
while (resultSet.next()) {
val string = resultSet.getString(1)
val model = openAIModel(string)
val usage = ApiModel.Usage(
prompt_tokens = resultSet.getInt(2),
completion_tokens = resultSet.getInt(3),
cost = resultSet.getDouble(4)
)
summary[model] = usage
}
return summary
}

private fun openAIModel(string: String): OpenAIModel {
logger.debug("Retrieving OpenAI model for string: $string")
val model = ChatModels.values().filter {
it.key == string || it.value.modelName == string || it.value.name == string
}.toList().firstOrNull()?.second ?: throw RuntimeException("Unknown model $string")
logger.debug("OpenAI model retrieved: $model")
return model
}

companion object {
private val log = org.slf4j.LoggerFactory.getLogger(HSQLUsageManager::class.java)
}
}
Loading

0 comments on commit bbf75d1

Please sign in to comment.