From: Lanius Trolling Date: Tue, 23 Jul 2024 17:03:32 +0000 (-0400) Subject: Add config properties for OpenAI GPT model, assistant name, instructions, and temperature X-Git-Url: https://gitweb.starshipfights.net/?a=commitdiff_plain;h=99a89b3f609ca7dbec825f35e447bbc96e3ea392;p=factbooks Add config properties for OpenAI GPT model, assistant name, instructions, and temperature --- diff --git a/src/jvmMain/kotlin/info/mechyrdia/Configuration.kt b/src/jvmMain/kotlin/info/mechyrdia/Configuration.kt index 2e3eba0..ff3be8e 100644 --- a/src/jvmMain/kotlin/info/mechyrdia/Configuration.kt +++ b/src/jvmMain/kotlin/info/mechyrdia/Configuration.kt @@ -1,62 +1,66 @@ -package info.mechyrdia - -import info.mechyrdia.data.Id -import info.mechyrdia.data.NationData -import kotlinx.serialization.SerialName -import kotlinx.serialization.Serializable -import java.io.File - -@Serializable -sealed class FileStorageConfig { - @Serializable - @SerialName("flat") - data class Flat(val baseDir: String) : FileStorageConfig() - - @Serializable - @SerialName("gridFS") - data object GridFs : FileStorageConfig() -} - -@Serializable -data class OpenAiConfig( - val token: String, - val orgId: String, - val project: String? = null, -) - -@Serializable -data class Configuration( - val host: String = "127.0.0.1", - val port: Int = 8080, - - val isDevMode: Boolean = false, - - val storage: FileStorageConfig = FileStorageConfig.Flat(".."), - - val dbName: String = "nslore", - val dbConn: String = "mongodb://localhost:27017", - - val ownerNation: String = "mechyrdia", - val emergencyPassword: String? = null, - - val openAi: OpenAiConfig? = null, -) { - companion object { - val Current: Configuration by lazy { - val file = File(System.getProperty("info.mechyrdia.configpath", "./config.json")) - if (!file.isFile) { - if (file.exists()) - file.deleteRecursively() - - file.writeText(JsonFileCodec.encodeToString(serializer(), Configuration()), Charsets.UTF_8) - } - - JsonFileCodec.decodeFromString(serializer(), file.readText(Charsets.UTF_8)) - } - } -} - -val OwnerNationId: Id - get() = Id(Configuration.Current.ownerNation) - -const val MainDomainName = "https://mechyrdia.info" +package info.mechyrdia + +import info.mechyrdia.data.Id +import info.mechyrdia.data.NationData +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable +import java.io.File + +@Serializable +sealed class FileStorageConfig { + @Serializable + @SerialName("flat") + data class Flat(val baseDir: String) : FileStorageConfig() + + @Serializable + @SerialName("gridFS") + data object GridFs : FileStorageConfig() +} + +@Serializable +data class OpenAiConfig( + val token: String, + val orgId: String, + val project: String? = null, + val assistantModel: String = "gpt-4o-mini", + val assistantName: String = "Natural-language Universal Knowledge Engine", + val assistantInstructions: String = "You are a helpful interactive encyclopedia, able to answer questions with information from the provided files", + val assistantTemperature: Double = 1.0, +) + +@Serializable +data class Configuration( + val host: String = "127.0.0.1", + val port: Int = 8080, + + val isDevMode: Boolean = false, + + val storage: FileStorageConfig = FileStorageConfig.Flat(".."), + + val dbName: String = "nslore", + val dbConn: String = "mongodb://localhost:27017", + + val ownerNation: String = "mechyrdia", + val emergencyPassword: String? = null, + + val openAi: OpenAiConfig? = null, +) { + companion object { + val Current: Configuration by lazy { + val file = File(System.getProperty("info.mechyrdia.configpath", "./config.json")) + if (!file.isFile) { + if (file.exists()) + file.deleteRecursively() + + file.writeText(JsonFileCodec.encodeToString(serializer(), Configuration()), Charsets.UTF_8) + } + + JsonFileCodec.decodeFromString(serializer(), file.readText(Charsets.UTF_8)) + } + } +} + +val OwnerNationId: Id + get() = Id(Configuration.Current.ownerNation) + +const val MainDomainName = "https://mechyrdia.info" diff --git a/src/jvmMain/kotlin/info/mechyrdia/robot/RobotService.kt b/src/jvmMain/kotlin/info/mechyrdia/robot/RobotService.kt index 2afa2ae..486c87d 100644 --- a/src/jvmMain/kotlin/info/mechyrdia/robot/RobotService.kt +++ b/src/jvmMain/kotlin/info/mechyrdia/robot/RobotService.kt @@ -1,404 +1,401 @@ -package info.mechyrdia.robot - -import info.mechyrdia.Configuration -import info.mechyrdia.MainDomainName -import info.mechyrdia.data.* -import info.mechyrdia.lore.RobotFactbookLoader -import io.ktor.client.* -import io.ktor.client.engine.java.* -import io.ktor.client.plugins.* -import io.ktor.client.plugins.contentnegotiation.* -import io.ktor.client.plugins.logging.* -import io.ktor.client.request.* -import io.ktor.http.* -import io.ktor.serialization.kotlinx.json.* -import kotlinx.coroutines.* -import kotlinx.coroutines.flow.* -import kotlinx.serialization.SerialName -import kotlinx.serialization.Serializable -import org.slf4j.Logger -import org.slf4j.LoggerFactory -import java.time.Instant -import kotlin.random.Random -import kotlin.time.Duration.Companion.minutes - -val RobotGlobalsId = Id("RobotGlobalsInstance") - -@Serializable -data class RobotGlobals( - @SerialName(MONGODB_ID_KEY) - override val id: Id = RobotGlobalsId, - - val lastFileUpload: @Serializable(with = InstantNullableSerializer::class) Instant? = null, - val fileIdMap: Map = emptyMap(), - val vectorStoreId: RobotVectorStoreId? = null, - val assistantId: RobotAssistantId? = null, - val ongoingThreadIds: Set = emptySet(), -) : DataDocument { - suspend fun save(): RobotGlobals { - set(this) - return this - } - - companion object : TableHolder { - override val Table = DocumentTable() - - suspend fun get() = Table.get(RobotGlobalsId) - suspend fun set(instance: RobotGlobals) = Table.put(instance) - suspend fun delete() = Table.del(RobotGlobalsId) - - override suspend fun initialize() = Unit - } -} - -private fun RobotGlobals.plusThread(threadId: RobotThreadId) = copy( - ongoingThreadIds = ongoingThreadIds + threadId -) - -private fun RobotGlobals.minusThread(threadId: RobotThreadId) = copy( - ongoingThreadIds = ongoingThreadIds - threadId -) - -enum class RobotServiceStatus { - NOT_CONFIGURED, - LOADING, - FAILED, - READY, -} - -class RobotService( - token: String, - orgId: String, - project: String?, -) { - private val robotClient = RobotClient( - HttpClient(Java) { - defaultRequest { - header(HttpHeaders.Authorization, "Bearer $token") - header("OpenAI-Organization", orgId) - project?.let { header("OpenAI-Project", it) } - header("OpenAI-Beta", "assistants=v2") - } - - install(ContentNegotiation) { - json(JsonRobotCodec) - } - - Logging { - level = LogLevel.INFO - sanitizeHeader("") { it == HttpHeaders.Authorization } - } - - install(HttpRequestRetry) { - retryOnExceptionOrServerErrors(5) - delayMillis { retry -> - (1 shl (retry - 1)) * 1000L + Random.nextLong(250L, 750L) - } - } - - expectSuccess = true - - install(RobotRateLimiter) - } - ) - - private suspend fun createThread(): RobotThreadId { - return robotClient.createThread(RobotCreateThreadRequest()).id.also { threadId -> - (RobotGlobals.get() ?: RobotGlobals()).plusThread(threadId).save() - } - } - - private suspend fun deleteThread(threadId: RobotThreadId) { - robotClient.deleteThread(threadId) - (RobotGlobals.get() ?: RobotGlobals()).minusThread(threadId).save() - } - - private suspend fun RobotGlobals.gcOldThreads(): RobotGlobals { - for (threadId in ongoingThreadIds) - try { - robotClient.deleteThread(threadId) - } catch (ex: ClientRequestException) { - logger.warn("Unable to delete thread at ID $threadId", ex) - } - return copy(ongoingThreadIds = emptySet()) - } - - private suspend fun updateFiles(prevGlobals: RobotGlobals?, onNewFileId: (suspend (RobotFileId) -> Unit)? = null): RobotGlobals { - val robotGlobals = prevGlobals ?: RobotGlobals() - - val fileIdMap = buildMap { - putAll(robotGlobals.fileIdMap) - - val factbooks = robotGlobals.lastFileUpload?.let { - RobotFactbookLoader.loadAllFactbooksSince(it) - } ?: RobotFactbookLoader.loadAllFactbooks() - - for ((name, text) in factbooks) { - remove(name)?.let { oldId -> - try { - robotClient.deleteFile(oldId) - } catch (ex: ClientRequestException) { - logger.warn("Unable to delete file $name at ID $oldId", ex) - } - } - - val newId = robotClient.uploadFile( - "assistants", - FileUpload( - text.toByteArray(), - ContentType.Text.Plain.withCharset(Charsets.UTF_8), - name.toOpenAiName() - ) - ).id - - this[name] = newId - onNewFileId?.invoke(newId) - - logger.info("Factbook $name has been uploaded") - } - } - - return robotGlobals.copy(lastFileUpload = Instant.now(), fileIdMap = fileIdMap).save() - } - - suspend fun initialize() { - var robotGlobals = updateFiles(RobotGlobals.get()?.gcOldThreads()) - - val vectorStoreId = robotGlobals.vectorStoreId ?: robotClient.createVectorStore( - RobotCreateVectorStoreRequest( - name = "lore_documents", - fileIds = robotGlobals.fileIdMap.values.toList(), - ) - ).id.also { vsId -> - robotGlobals = robotGlobals.copy(vectorStoreId = vsId).save() - } - - logger.info("Vector store has been created") - - poll { - robotClient.getVectorStore(vectorStoreId).status == "completed" - } - - logger.info("Vector store creation is complete") - - if (robotGlobals.assistantId == null) - robotGlobals = robotGlobals.copy( - assistantId = robotClient.createAssistant( - RobotCreateAssistantRequest( - model = "gpt-4o", - name = "Natural-language Universal Knowledge Engine", - instructions = "You are a helpful interactive encyclopedia, able to answer questions with information from the provided files", - tools = listOf( - RobotCreateAssistantRequestTool("file_search") - ), - toolResources = RobotCreateAssistantRequestToolResources( - fileSearch = RobotCreateAssistantRequestFileSearchResources( - vectorStoreIds = listOf(vectorStoreId) - ) - ), - temperature = 1.0 - ) - ).id - ).save() - - logger.info("Assistant has been created") - } - - suspend fun performMaintenance() { - var robotGlobals = RobotGlobals.get() ?: RobotGlobals() - - val vectorStoreId = robotGlobals.vectorStoreId ?: robotClient.createVectorStore( - RobotCreateVectorStoreRequest( - name = "lore_documents", - fileIds = robotGlobals.fileIdMap.values.toList(), - ) - ).id.also { vsId -> - robotGlobals = robotGlobals.copy(vectorStoreId = vsId).save() - } - - updateFiles(robotGlobals) { fileId -> - robotClient.addFileToVectorStore(vectorStoreId, fileId) - } - - logger.info("Vector store has been updated") - - poll { - robotClient.getVectorStore(vectorStoreId).fileCounts.inProgress == 0 - } - - logger.info("Vector store update is complete") - } - - suspend fun reset() { - RobotGlobals.get()?.gcOldThreads()?.copy( - lastFileUpload = null, - fileIdMap = emptyMap(), - vectorStoreId = null, - assistantId = null, - )?.save() - - while (true) { - val assistants = robotClient.listAssistants().data - if (assistants.isEmpty()) break - - assistants.map { it.id }.forEach { - robotClient.deleteAssistant(it) - } - } - - while (true) { - val vectorStores = robotClient.listVectorStores().data - if (vectorStores.isEmpty()) break - - vectorStores.map { it.id }.forEach { - robotClient.deleteVectorStore(it) - } - } - - robotClient.listFiles().data.map { it.id }.forEach { - robotClient.deleteFile(it) - } - - initialize() - } - - inner class Conversation(private val nationId: Id) { - private var assistantId: RobotAssistantId? = null - private var threadId: RobotThreadId? = null - - suspend fun send(userMessage: String): Flow { - val assistant = assistantId ?: pollValue { RobotGlobals.get()?.assistantId } - .also { assistantId = it } - - val thread = threadId ?: createThread().also { threadId = it } - - val messages = listOf( - RobotCreateThreadRequestMessage( - role = "user", - content = userMessage - ) - ) - - val tokenTracker = ConversationMessageTokenTracker() - - return flow { - emit(RobotConversationMessage.User(userMessage)) - - val annotationTargets = mutableListOf>() - val collectionScope = CoroutineScope(currentCoroutineContext()) - - robotClient.createRun(thread, assistant, messages) - .filter { it.event == "thread.message.delta" } - .mapNotNull { it.data } - .map { JsonRobotCodec.decodeFromString(RobotMessageDelta.serializer(), it) } - .collect { eventData -> - val annotationTexts = eventData.delta.content.flatMap { it.text.annotations }.map { annotation -> - val annotationIndex = annotationTargets.size - annotationTargets.add(collectionScope.async { - val fileName = robotClient.getFile(annotation.fileCitation.fileId).filename.fromOpenAiName() - val fileText = annotation.fileCitation.quote.let { if (it.isNotBlank()) ": $it" else it } - "$MainDomainName/lore/$fileName$fileText" - }) - annotation.text to " [${annotationIndex + 1}]" - } - - val contents = eventData.delta.content.joinToString(separator = "") { textContent -> - textContent.text.value - } - - val replacedContents = annotationTexts.fold(contents) { text, (replace, replaceWith) -> - text.replace(replace, replaceWith) - } - - emit(RobotConversationMessage.Robot(replacedContents)) - } - - emit(RobotConversationMessage.Cite(annotationTargets.awaitAll())) - - emit(RobotConversationMessage.Ready) - }.onEach { message -> - tokenTracker.addMessage(message) - }.onCompletion { _ -> - RobotUser.addTokens(nationId, tokenTracker.calculateTokens()) - } - } - - suspend fun isExhausted(): Boolean { - val usedTokens = RobotUser.getTokens(nationId) - val tokenLimit = RobotUser.getMaxTokens(nationId) - return usedTokens >= tokenLimit - } - - suspend fun close() { - threadId?.let { deleteThread(it) } - } - } - - companion object { - private val logger: Logger = LoggerFactory.getLogger(RobotService::class.java) - - private val maintenanceScope = CoroutineScope(SupervisorJob() + CoroutineName("robot-service-maintenance")) - - private val instanceHolder by lazy { - CoroutineScope(CoroutineName("robot-service-initialization")).async { - Configuration.Current.openAi?.let { (token, orgId, project) -> - RobotService(token, orgId, project).apply { - initialize() - } - } - } - } - - var status: RobotServiceStatus = if (Configuration.Current.openAi != null) RobotServiceStatus.LOADING else RobotServiceStatus.NOT_CONFIGURED - private set - - suspend fun getInstance() = try { - instanceHolder.await() - } catch (ex: Exception) { - null - } - - fun initialize() { - instanceHolder.invokeOnCompletion { ex -> - status = if (ex != null) { - logger.error("RobotService failed to initialize", ex) - RobotServiceStatus.FAILED - } else { - logger.info("RobotService successfully initialized") - RobotServiceStatus.READY - } - } - - maintenanceScope.launch { - getInstance()?.let { instance -> - while (true) { - delay(30.minutes) - - launch(SupervisorJob(currentCoroutineContext().job)) { - instance.performMaintenance() - } - } - } - } - } - } -} - -@Serializable -sealed class RobotConversationMessage { - @Serializable - @SerialName("ready") - data object Ready : RobotConversationMessage() - - @Serializable - @SerialName("user") - data class User(val text: String) : RobotConversationMessage() - - @Serializable - @SerialName("robot") - data class Robot(val text: String) : RobotConversationMessage() - - @Serializable - @SerialName("cite") - data class Cite(val urls: List) : RobotConversationMessage() -} +package info.mechyrdia.robot + +import info.mechyrdia.Configuration +import info.mechyrdia.MainDomainName +import info.mechyrdia.OpenAiConfig +import info.mechyrdia.data.* +import info.mechyrdia.lore.RobotFactbookLoader +import io.ktor.client.* +import io.ktor.client.engine.java.* +import io.ktor.client.plugins.* +import io.ktor.client.plugins.contentnegotiation.* +import io.ktor.client.plugins.logging.* +import io.ktor.client.request.* +import io.ktor.http.* +import io.ktor.serialization.kotlinx.json.* +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import java.time.Instant +import kotlin.random.Random +import kotlin.time.Duration.Companion.minutes + +val RobotGlobalsId = Id("RobotGlobalsInstance") + +@Serializable +data class RobotGlobals( + @SerialName(MONGODB_ID_KEY) + override val id: Id = RobotGlobalsId, + + val lastFileUpload: @Serializable(with = InstantNullableSerializer::class) Instant? = null, + val fileIdMap: Map = emptyMap(), + val vectorStoreId: RobotVectorStoreId? = null, + val assistantId: RobotAssistantId? = null, + val ongoingThreadIds: Set = emptySet(), +) : DataDocument { + suspend fun save(): RobotGlobals { + set(this) + return this + } + + companion object : TableHolder { + override val Table = DocumentTable() + + suspend fun get() = Table.get(RobotGlobalsId) + suspend fun set(instance: RobotGlobals) = Table.put(instance) + suspend fun delete() = Table.del(RobotGlobalsId) + + override suspend fun initialize() = Unit + } +} + +private fun RobotGlobals.plusThread(threadId: RobotThreadId) = copy( + ongoingThreadIds = ongoingThreadIds + threadId +) + +private fun RobotGlobals.minusThread(threadId: RobotThreadId) = copy( + ongoingThreadIds = ongoingThreadIds - threadId +) + +enum class RobotServiceStatus { + NOT_CONFIGURED, + LOADING, + FAILED, + READY, +} + +class RobotService( + private val config: OpenAiConfig, +) { + private val robotClient = RobotClient( + HttpClient(Java) { + defaultRequest { + header(HttpHeaders.Authorization, "Bearer ${config.token}") + header("OpenAI-Organization", config.orgId) + config.project?.let { header("OpenAI-Project", it) } + header("OpenAI-Beta", "assistants=v2") + } + + install(ContentNegotiation) { + json(JsonRobotCodec) + } + + Logging { + level = LogLevel.INFO + sanitizeHeader("") { it == HttpHeaders.Authorization } + } + + install(HttpRequestRetry) { + retryOnExceptionOrServerErrors(5) + delayMillis { retry -> + (1 shl (retry - 1)) * 1000L + Random.nextLong(250L, 750L) + } + } + + expectSuccess = true + + install(RobotRateLimiter) + } + ) + + private suspend fun createThread(): RobotThreadId { + return robotClient.createThread(RobotCreateThreadRequest()).id.also { threadId -> + (RobotGlobals.get() ?: RobotGlobals()).plusThread(threadId).save() + } + } + + private suspend fun deleteThread(threadId: RobotThreadId) { + robotClient.deleteThread(threadId) + (RobotGlobals.get() ?: RobotGlobals()).minusThread(threadId).save() + } + + private suspend fun RobotGlobals.gcOldThreads(): RobotGlobals { + for (threadId in ongoingThreadIds) + try { + robotClient.deleteThread(threadId) + } catch (ex: ClientRequestException) { + logger.warn("Unable to delete thread at ID $threadId", ex) + } + return copy(ongoingThreadIds = emptySet()) + } + + private suspend fun updateFiles(prevGlobals: RobotGlobals?, onNewFileId: (suspend (RobotFileId) -> Unit)? = null): RobotGlobals { + val robotGlobals = prevGlobals ?: RobotGlobals() + + val fileIdMap = buildMap { + putAll(robotGlobals.fileIdMap) + + val factbooks = robotGlobals.lastFileUpload?.let { + RobotFactbookLoader.loadAllFactbooksSince(it) + } ?: RobotFactbookLoader.loadAllFactbooks() + + for ((name, text) in factbooks) { + remove(name)?.let { oldId -> + try { + robotClient.deleteFile(oldId) + } catch (ex: ClientRequestException) { + logger.warn("Unable to delete file $name at ID $oldId", ex) + } + } + + val newId = robotClient.uploadFile( + "assistants", + FileUpload( + text.toByteArray(), + ContentType.Text.Plain.withCharset(Charsets.UTF_8), + name.toOpenAiName() + ) + ).id + + this[name] = newId + onNewFileId?.invoke(newId) + + logger.info("Factbook $name has been uploaded") + } + } + + return robotGlobals.copy(lastFileUpload = Instant.now(), fileIdMap = fileIdMap).save() + } + + suspend fun initialize() { + var robotGlobals = updateFiles(RobotGlobals.get()?.gcOldThreads()) + + val vectorStoreId = robotGlobals.vectorStoreId ?: robotClient.createVectorStore( + RobotCreateVectorStoreRequest( + name = "lore_documents", + fileIds = robotGlobals.fileIdMap.values.toList(), + ) + ).id.also { vsId -> + robotGlobals = robotGlobals.copy(vectorStoreId = vsId).save() + } + + logger.info("Vector store has been created") + + poll { + robotClient.getVectorStore(vectorStoreId).status == "completed" + } + + logger.info("Vector store creation is complete") + + if (robotGlobals.assistantId == null) + robotGlobals = robotGlobals.copy( + assistantId = robotClient.createAssistant( + RobotCreateAssistantRequest( + model = config.assistantModel, + name = config.assistantName, + instructions = config.assistantInstructions, + tools = listOf( + RobotCreateAssistantRequestTool("file_search") + ), + toolResources = RobotCreateAssistantRequestToolResources( + fileSearch = RobotCreateAssistantRequestFileSearchResources( + vectorStoreIds = listOf(vectorStoreId) + ) + ), + temperature = config.assistantTemperature + ) + ).id + ).save() + + logger.info("Assistant has been created") + } + + suspend fun performMaintenance() { + var robotGlobals = RobotGlobals.get() ?: RobotGlobals() + + val vectorStoreId = robotGlobals.vectorStoreId ?: robotClient.createVectorStore( + RobotCreateVectorStoreRequest( + name = "lore_documents", + fileIds = robotGlobals.fileIdMap.values.toList(), + ) + ).id.also { vsId -> + robotGlobals = robotGlobals.copy(vectorStoreId = vsId).save() + } + + updateFiles(robotGlobals) { fileId -> + robotClient.addFileToVectorStore(vectorStoreId, fileId) + } + + logger.info("Vector store has been updated") + + poll { + robotClient.getVectorStore(vectorStoreId).fileCounts.inProgress == 0 + } + + logger.info("Vector store update is complete") + } + + suspend fun reset() { + RobotGlobals.get()?.gcOldThreads()?.copy( + lastFileUpload = null, + fileIdMap = emptyMap(), + vectorStoreId = null, + assistantId = null, + )?.save() + + while (true) { + val assistants = robotClient.listAssistants().data + if (assistants.isEmpty()) break + + assistants.map { it.id }.forEach { + robotClient.deleteAssistant(it) + } + } + + while (true) { + val vectorStores = robotClient.listVectorStores().data + if (vectorStores.isEmpty()) break + + vectorStores.map { it.id }.forEach { + robotClient.deleteVectorStore(it) + } + } + + robotClient.listFiles().data.map { it.id }.forEach { + robotClient.deleteFile(it) + } + + initialize() + } + + inner class Conversation(private val nationId: Id) { + private var assistantId: RobotAssistantId? = null + private var threadId: RobotThreadId? = null + + suspend fun send(userMessage: String): Flow { + val assistant = assistantId ?: pollValue { RobotGlobals.get()?.assistantId } + .also { assistantId = it } + + val thread = threadId ?: createThread().also { threadId = it } + + val messages = listOf( + RobotCreateThreadRequestMessage( + role = "user", + content = userMessage + ) + ) + + val tokenTracker = ConversationMessageTokenTracker() + + return flow { + emit(RobotConversationMessage.User(userMessage)) + + val annotationTargets = mutableListOf>() + val collectionScope = CoroutineScope(currentCoroutineContext()) + + robotClient.createRun(thread, assistant, messages) + .filter { it.event == "thread.message.delta" } + .mapNotNull { it.data } + .map { JsonRobotCodec.decodeFromString(RobotMessageDelta.serializer(), it) } + .collect { eventData -> + val annotationTexts = eventData.delta.content.flatMap { it.text.annotations }.map { annotation -> + val annotationIndex = annotationTargets.size + annotationTargets.add(collectionScope.async { + val fileName = robotClient.getFile(annotation.fileCitation.fileId).filename.fromOpenAiName() + val fileText = annotation.fileCitation.quote.let { if (it.isNotBlank()) ": $it" else it } + "$MainDomainName/lore/$fileName$fileText" + }) + annotation.text to " [${annotationIndex + 1}]" + } + + val contents = eventData.delta.content.joinToString(separator = "") { textContent -> + textContent.text.value + } + + val replacedContents = annotationTexts.fold(contents) { text, (replace, replaceWith) -> + text.replace(replace, replaceWith) + } + + emit(RobotConversationMessage.Robot(replacedContents)) + } + + emit(RobotConversationMessage.Cite(annotationTargets.awaitAll())) + + emit(RobotConversationMessage.Ready) + }.onEach { message -> + tokenTracker.addMessage(message) + }.onCompletion { _ -> + RobotUser.addTokens(nationId, tokenTracker.calculateTokens()) + } + } + + suspend fun isExhausted(): Boolean { + val usedTokens = RobotUser.getTokens(nationId) + val tokenLimit = RobotUser.getMaxTokens(nationId) + return usedTokens >= tokenLimit + } + + suspend fun close() { + threadId?.let { deleteThread(it) } + } + } + + companion object { + private val logger: Logger = LoggerFactory.getLogger(RobotService::class.java) + + private val maintenanceScope = CoroutineScope(SupervisorJob() + CoroutineName("robot-service-maintenance")) + + private val instanceHolder by lazy { + CoroutineScope(CoroutineName("robot-service-initialization")).async { + Configuration.Current.openAi?.let(::RobotService)?.apply { + initialize() + } + } + } + + var status: RobotServiceStatus = if (Configuration.Current.openAi != null) RobotServiceStatus.LOADING else RobotServiceStatus.NOT_CONFIGURED + private set + + suspend fun getInstance() = try { + instanceHolder.await() + } catch (ex: Exception) { + null + } + + fun initialize() { + instanceHolder.invokeOnCompletion { ex -> + status = if (ex != null) { + logger.error("RobotService failed to initialize", ex) + RobotServiceStatus.FAILED + } else { + logger.info("RobotService successfully initialized") + RobotServiceStatus.READY + } + } + + maintenanceScope.launch { + getInstance()?.let { instance -> + while (true) { + delay(30.minutes) + + launch(SupervisorJob(currentCoroutineContext().job)) { + instance.performMaintenance() + } + } + } + } + } + } +} + +@Serializable +sealed class RobotConversationMessage { + @Serializable + @SerialName("ready") + data object Ready : RobotConversationMessage() + + @Serializable + @SerialName("user") + data class User(val text: String) : RobotConversationMessage() + + @Serializable + @SerialName("robot") + data class Robot(val text: String) : RobotConversationMessage() + + @Serializable + @SerialName("cite") + data class Cite(val urls: List) : RobotConversationMessage() +}