Create generic file-storage API
author Lanius Trolling <lanius@laniustrolling.dev>
Sun, 7 Apr 2024 22:05:37 +0000 (18:05 -0400)
committer Lanius Trolling <lanius@laniustrolling.dev>
Sun, 7 Apr 2024 22:05:37 +0000 (18:05 -0400)
src/jvmMain/kotlin/info/mechyrdia/data/bson.kt
src/jvmMain/kotlin/info/mechyrdia/data/data_files.kt [new file with mode: 0644]
src/jvmMain/kotlin/info/mechyrdia/data/data_flow.kt [new file with mode: 0644]

index 1c82cb6b51f91d7d4f11060645b51e4dfe29b2fc..8eca341e0a38093c0db0a16244bdf0acb8fff7e4 100644 (file)
@@ -1,7 +1,12 @@
+@file:OptIn(ExperimentalSerializationApi::class)
+
 package info.mechyrdia.data
 
+import kotlinx.serialization.ExperimentalSerializationApi
 import kotlinx.serialization.KSerializer
+import kotlinx.serialization.Serializable
 import kotlinx.serialization.SerializationException
+import kotlinx.serialization.builtins.ListSerializer
 import kotlinx.serialization.descriptors.PrimitiveKind
 import kotlinx.serialization.descriptors.PrimitiveSerialDescriptor
 import kotlinx.serialization.descriptors.SerialDescriptor
@@ -18,6 +23,7 @@ import org.bson.codecs.configuration.CodecProvider
 import org.bson.codecs.configuration.CodecRegistry
 import org.bson.codecs.kotlinx.BsonDecoder
 import org.bson.codecs.kotlinx.BsonEncoder
+import org.bson.types.ObjectId
 import java.time.Instant
 
 object IdCodec : Codec<Id<*>> {
@@ -43,8 +49,25 @@ object IdCodecProvider : CodecProvider {
        }
 }
 
+/**
+ * A single key/value pair. [MongoDbMapSerializer] encodes a map as a list of these entries.
+ */
+@Serializable
+data class MongoDbMapEntry<K, V>(val key: K, val value: V)
+
+/**
+ * Serializes a [Map] as a list of [MongoDbMapEntry] objects.
+ *
+ * @param keySerializer serializer used for every entry's key
+ * @param valueSerializer serializer used for every entry's value
+ */
+class MongoDbMapSerializer<K, V>(val keySerializer: KSerializer<K>, val valueSerializer: KSerializer<V>) : KSerializer<Map<K, V>> {
+       private val entryListSerializer = ListSerializer(MongoDbMapEntry.serializer(keySerializer, valueSerializer))
+       
+       // The encoded shape is exactly that of the entry list, so its descriptor is reused as-is.
+       override val descriptor: SerialDescriptor = entryListSerializer.descriptor
+       
+       override fun serialize(encoder: Encoder, value: Map<K, V>) {
+               val entries = value.entries.map { (key, item) -> MongoDbMapEntry(key, item) }
+               entryListSerializer.serialize(encoder, entries)
+       }
+       
+       override fun deserialize(decoder: Decoder): Map<K, V> {
+               val entries = entryListSerializer.deserialize(decoder)
+               return entries.associate { entry -> entry.key to entry.value }
+       }
+}
+
 object InstantSerializer : KSerializer<Instant> {
-       override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("InstantSerializer", PrimitiveKind.STRING)
+       override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("InstantSerializer", PrimitiveKind.LONG)
        
        override fun serialize(encoder: Encoder, value: Instant) {
                if (encoder !is BsonEncoder)
@@ -62,7 +85,7 @@ object InstantSerializer : KSerializer<Instant> {
 }
 
 object InstantNullableSerializer : KSerializer<Instant?> {
-       override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("InstantSerializer", PrimitiveKind.STRING)
+       override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("InstantSerializer", PrimitiveKind.LONG)
        
        override fun serialize(encoder: Encoder, value: Instant?) {
                if (encoder !is BsonEncoder)
@@ -78,9 +101,51 @@ object InstantNullableSerializer : KSerializer<Instant?> {
                if (decoder !is BsonDecoder)
                        throw SerializationException("Instant is not supported by ${decoder::class}")
                
-               val value = decoder.decodeBsonValue()
-               if (value.isNull)
-                       return null
-               return Instant.ofEpochMilli(value.asDateTime().value)
+               return if (decoder.decodeNotNullMark())
+                       Instant.ofEpochMilli(decoder.decodeBsonValue().asDateTime().value)
+               else
+                       decoder.decodeNull()
+       }
+}
+
+/**
+ * Reads and writes [ObjectId] values through the BSON encoder/decoder.
+ *
+ * Only [BsonEncoder] and [BsonDecoder] are accepted; any other encoder or decoder
+ * is rejected with a [SerializationException].
+ */
+object ObjectIdSerializer : KSerializer<ObjectId> {
+       override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("ObjectIdSerializer", PrimitiveKind.STRING)
+       
+       override fun serialize(encoder: Encoder, value: ObjectId) {
+               val bsonEncoder = encoder as? BsonEncoder
+                       ?: throw SerializationException("ObjectId is not supported by ${encoder::class}")
+               
+               bsonEncoder.encodeObjectId(value)
+       }
+       
+       override fun deserialize(decoder: Decoder): ObjectId {
+               val bsonDecoder = decoder as? BsonDecoder
+                       ?: throw SerializationException("ObjectId is not supported by ${decoder::class}")
+               
+               return bsonDecoder.decodeObjectId()
+       }
+}
+
+/**
+ * Nullable counterpart of [ObjectIdSerializer]: a `null` value is written as a BSON null,
+ * anything else as an ObjectId.
+ *
+ * Only [BsonEncoder] and [BsonDecoder] are accepted; any other encoder or decoder
+ * is rejected with a [SerializationException].
+ */
+object ObjectIdNullableSerializer : KSerializer<ObjectId?> {
+       override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("ObjectIdSerializer", PrimitiveKind.STRING)
+       
+       override fun serialize(encoder: Encoder, value: ObjectId?) {
+               val bsonEncoder = encoder as? BsonEncoder
+                       ?: throw SerializationException("ObjectId is not supported by ${encoder::class}")
+               
+               if (value != null)
+                       bsonEncoder.encodeObjectId(value)
+               else
+                       bsonEncoder.encodeBsonValue(BsonNull.VALUE)
+       }
+       
+       override fun deserialize(decoder: Decoder): ObjectId? {
+               val bsonDecoder = decoder as? BsonDecoder
+                       ?: throw SerializationException("ObjectId is not supported by ${decoder::class}")
+               
+               // The not-null mark is checked first; only a present value is decoded as an ObjectId.
+               if (!decoder.decodeNotNullMark())
+                       return decoder.decodeNull()
+               return bsonDecoder.decodeObjectId()
        }
 }
diff --git a/src/jvmMain/kotlin/info/mechyrdia/data/data_files.kt b/src/jvmMain/kotlin/info/mechyrdia/data/data_files.kt
new file mode 100644 (file)
index 0000000..c0eb09e
--- /dev/null
@@ -0,0 +1,251 @@
+package info.mechyrdia.data
+
+import com.mongodb.client.model.Filters
+import com.mongodb.client.model.Updates
+import com.mongodb.reactivestreams.client.gridfs.GridFSBucket
+import io.ktor.util.*
+import io.ktor.util.cio.*
+import io.ktor.utils.io.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.reactive.asFlow
+import kotlinx.coroutines.reactive.asPublisher
+import kotlinx.coroutines.reactive.awaitFirst
+import kotlinx.serialization.SerialName
+import kotlinx.serialization.Serializable
+import org.bson.types.ObjectId
+import java.io.File
+import java.time.Instant
+import kotlin.String
+
+/**
+ * A storage location expressed as an ordered list of path elements.
+ *
+ * The [String] constructor splits on `/` and drops empty segments, so leading,
+ * trailing and doubled slashes are ignored. [toString] joins the elements with `/`
+ * and adds no leading or trailing slash.
+ */
+@JvmInline
+value class StoragePath(val elements: List<String>) {
+       constructor(path: String) : this(path.split('/').filter { segment -> segment.isNotEmpty() })
+       
+       override fun toString(): String = elements.joinToString("/")
+}
+
+/**
+ * Kind of an entry returned when listing a directory.
+ */
+enum class StoredFileType {
+       FILE,
+       DIRECTORY;
+}
+
+/**
+ * One directory-listing entry: the entry's [name] within the listed directory and its [type].
+ */
+data class StoredFileEntry(val name: String, val type: StoredFileType)
+
+/**
+ * Abstraction over a hierarchical file store addressed by [StoragePath].
+ *
+ * All operations suspend. The [Boolean] results presumably signal whether the operation
+ * succeeded; the exact meaning of `false` is decided by each implementation
+ * (TODO confirm the intended contract).
+ */
+interface FileStorage {
+       /** Optional one-time setup hook; does nothing unless overridden. */
+       suspend fun prepare() = Unit
+       
+       /** Creates the directory [dir]. */
+       suspend fun createDir(dir: StoragePath): Boolean
+       
+       /** Lists the entries found directly inside [dir]. */
+       suspend fun listDir(dir: StoragePath): Flow<StoredFileEntry>
+       
+       /** Deletes the directory [dir]. */
+       suspend fun deleteDir(dir: StoragePath): Boolean
+       
+       /** Stores the bytes read from [content] as the file at [path]. */
+       suspend fun writeFile(path: StoragePath, content: ByteReadChannel): Boolean
+       
+       /** Writes the bytes of the file at [path] into [content]. */
+       suspend fun readFile(path: StoragePath, content: ByteWriteChannel): Boolean
+       
+       /** Copies the file at [source] to [target]. */
+       suspend fun copyFile(source: StoragePath, target: StoragePath): Boolean
+       
+       /** Removes the file at [path]. */
+       suspend fun eraseFile(path: StoragePath): Boolean
+       
+       /** Optional housekeeping hook; does nothing unless overridden. */
+       suspend fun performMaintenance() = Unit
+}
+
+/**
+ * [FileStorage] backed by the local filesystem, with every [StoragePath] resolved beneath [root].
+ *
+ * All filesystem access, including existence checks, runs on [Dispatchers.IO].
+ */
+class FlatFileStorage(val root: File) : FileStorage {
+       // NOTE(review): Ktor's combineSafe is expected to keep the result inside root - confirm.
+       private fun resolveFile(path: StoragePath) = root.combineSafe(path.toString())
+       
+       private fun renderEntry(file: File) = StoredFileEntry(file.name, if (file.isFile) StoredFileType.FILE else StoredFileType.DIRECTORY)
+       
+       /**
+        * Creates [file] and any missing parents as directories.
+        *
+        * @return `true` when [file] is a directory afterwards, `false` otherwise
+        *         (for example when [file] or one of its parents is a regular file)
+        */
+       private fun createDir(file: File): Boolean {
+               // mkdirs() returns false both on failure and when the directory already exists,
+               // so the outcome is confirmed with isDirectory instead of being ignored.
+               return file.mkdirs() || file.isDirectory
+       }
+       
+       override suspend fun prepare() {
+               withContext(Dispatchers.IO) {
+                       root.mkdirs()
+               }
+       }
+       
+       override suspend fun createDir(dir: StoragePath): Boolean {
+               return withContext(Dispatchers.IO) { createDir(resolveFile(dir)) }
+       }
+       
+       /** Lists the direct children of [dir]; the flow is empty when [dir] cannot be listed. */
+       override suspend fun listDir(dir: StoragePath): Flow<StoredFileEntry> {
+               return withContext(Dispatchers.IO) { resolveFile(dir).listFiles()?.map { renderEntry(it) }.orEmpty().asFlow() }
+       }
+       
+       /** Recursively deletes [dir]. Returns `true` when there was no such directory to begin with. */
+       override suspend fun deleteDir(dir: StoragePath): Boolean {
+               val file = resolveFile(dir)
+               return withContext(Dispatchers.IO) { !file.isDirectory || file.deleteRecursively() }
+       }
+       
+       /**
+        * Writes [content] to the file at [path], replacing an existing file.
+        *
+        * @return `false` when [path] is a directory or when the parent directory does not exist
+        */
+       override suspend fun writeFile(path: StoragePath, content: ByteReadChannel): Boolean {
+               val file = resolveFile(path)
+               
+               return withContext(Dispatchers.IO) {
+                       when {
+                               // A directory cannot be overwritten with file content.
+                               file.isDirectory -> false
+                               // A new file needs an existing parent directory; parents are not created implicitly.
+                               !file.exists() && file.parentFile?.isDirectory != true -> false
+                               else -> {
+                                       file.writeChannel().use { content.copyTo(this) }
+                                       true
+                               }
+                       }
+               }
+       }
+       
+       /** Copies the file at [path] into [content]. Returns `false` when [path] is not a regular file. */
+       override suspend fun readFile(path: StoragePath, content: ByteWriteChannel): Boolean {
+               val file = resolveFile(path)
+               
+               return withContext(Dispatchers.IO) {
+                       if (file.isFile) {
+                               file.readChannel().copyTo(content)
+                               true
+                       } else {
+                               false
+                       }
+               }
+       }
+       
+       /**
+        * Copies [source] to [target] without overwriting.
+        *
+        * @return `false` when [source] is not a regular file or [target] already exists
+        */
+       override suspend fun copyFile(source: StoragePath, target: StoragePath): Boolean {
+               val sourceFile = resolveFile(source)
+               val targetFile = resolveFile(target)
+               
+               return withContext(Dispatchers.IO) {
+                       if (!sourceFile.isFile || targetFile.exists()) {
+                               false
+                       } else {
+                               sourceFile.copyTo(targetFile)
+                               true
+                       }
+               }
+       }
+       
+       /** Deletes the file at [path]. Returns `true` when there was no such file to begin with. */
+       override suspend fun eraseFile(path: StoragePath): Boolean {
+               val file = resolveFile(path)
+               return withContext(Dispatchers.IO) { !file.isFile || file.delete() }
+       }
+}
+
+/**
+ * Index document used by [GridFsStorage]: associates a storage path with a GridFS file.
+ */
+@Serializable
+private data class GridFsEntry(
+       @SerialName(MONGODB_ID_KEY)
+       override val id: Id<GridFsEntry>,
+       // Path string that GridFsStorage matches exactly or by prefix
+       val path: String,
+       // Id of the file in the GridFS bucket that holds the content
+       val file: @Serializable(with = ObjectIdSerializer::class) ObjectId,
+       // Set only when the entry is first inserted
+       val created: @Serializable(with = InstantSerializer::class) Instant,
+       // Refreshed every time the entry is pointed at a GridFS file
+       val updated: @Serializable(with = InstantSerializer::class) Instant,
+) : DataDocument<GridFsEntry>
+
+/**
+ * [FileStorage] that keeps file content in a GridFS [bucket] and maps storage paths to
+ * GridFS file ids through [table].
+ *
+ * Directories are not stored; they exist implicitly as prefixes of the stored paths.
+ * Blobs that end up unreferenced (after an overwrite or [deleteDir]) stay in the bucket
+ * until [performMaintenance] removes them.
+ */
+private class GridFsStorage(val table: DocumentTable<GridFsEntry>, val bucket: GridFSBucket) : FileStorage {
+       private suspend fun getExact(path: String) = table.locate(Filters.eq(GridFsEntry::path.serialName, path))
+       
+       /**
+        * Points the entry for [path] at [newFile] and refreshes its `updated` timestamp.
+        * The `setOnInsert` updates supply `created` and the id for a new entry
+        * (presumably [DocumentTable.change] upserts - confirm).
+        */
+       private suspend fun updateExact(path: String, newFile: ObjectId) {
+               val now = Instant.now()
+               
+               table.change(
+                       Filters.eq(GridFsEntry::path.serialName, path),
+                       Updates.combine(
+                               Updates.set(GridFsEntry::file.serialName, newFile),
+                               Updates.set(GridFsEntry::updated.serialName, now),
+                               Updates.setOnInsert(GridFsEntry::created.serialName, now),
+                               Updates.setOnInsert(GridFsEntry::id.serialName, Id<GridFsEntry>())
+                       )
+               )
+       }
+       
+       /**
+        * Filter matching every entry whose path starts with the literal [path].
+        *
+        * Regex.escape quotes the path. Interpolating Regex.fromLiteral(path) would insert the raw,
+        * unescaped text, letting characters such as `.` or `(` in a path act as regex syntax.
+        */
+       private fun prefixFilter(path: String) = Filters.regex(GridFsEntry::path.serialName, "^${Regex.escape(path)}")
+       
+       private suspend fun getPrefix(path: String) = table.filter(prefixFilter(path))
+       private suspend fun deletePrefix(path: String) = table.remove(prefixFilter(path))
+       
+       private fun toExactPath(path: StoragePath) = "/$path"
+       private fun toPrefixPath(path: StoragePath) = "/$path/"
+       
+       /**
+        * Deletes one blob from the bucket.
+        *
+        * The delete publisher completes without emitting a value, so it is drained with
+        * collect(); awaitFirst() would throw NoSuchElementException once the delete finished.
+        */
+       private suspend fun deleteBlob(id: ObjectId) {
+               bucket.delete(id).asFlow().collect()
+       }
+       
+       override suspend fun prepare() {
+               table.unique(GridFsEntry::path)
+       }
+       
+       /**
+        * Directories are implicit, so nothing is written. The call succeeds when neither [dir]
+        * itself nor any of its ancestors is occupied by a file.
+        */
+       override suspend fun createDir(dir: StoragePath): Boolean {
+               return coroutineScope {
+                       // Prefix lengths 0..size cover every ancestor plus the full path of dir itself.
+                       (0..dir.elements.size).map { depth ->
+                               async {
+                                       getExact(toExactPath(StoragePath(dir.elements.take(depth)))) != null
+                               }
+                       }.awaitAll().none { it }
+               }
+       }
+       
+       /**
+        * Lists the direct children of [dir]. An entry stored deeper than one level is reported
+        * once, as a DIRECTORY named after its first path segment below [dir].
+        */
+       override suspend fun listDir(dir: StoragePath): Flow<StoredFileEntry> {
+               val prefixPath = toPrefixPath(dir)
+               return getPrefix(prefixPath).map {
+                       val subPath = it.path.removePrefix(prefixPath)
+                       if (subPath.contains('/'))
+                               StoredFileEntry(subPath.substringBefore('/'), StoredFileType.DIRECTORY)
+                       else
+                               StoredFileEntry(subPath, StoredFileType.FILE)
+               }.distinctBy { it.name }
+       }
+       
+       /** Removes every entry below [dir]; the blobs are left for [performMaintenance]. */
+       override suspend fun deleteDir(dir: StoragePath): Boolean {
+               deletePrefix(toPrefixPath(dir))
+               return true
+       }
+       
+       /**
+        * Uploads [content] as a new blob and points the entry for [path] at it.
+        *
+        * @return `false` when [path] is empty or is already in use as a directory
+        */
+       override suspend fun writeFile(path: StoragePath, content: ByteReadChannel): Boolean {
+               // The root path has no file name to upload under.
+               val fileName = path.elements.lastOrNull() ?: return false
+               if (getPrefix(toPrefixPath(path)).firstOrNull() != null) return false
+               
+               val bytesPublisher = flow {
+                       content.consumeEachBufferRange { buffer, last ->
+                               // Each chunk is copied before being emitted; presumably the channel
+                               // reuses the visitor's buffer afterwards - confirm.
+                               emit(buffer.copy())
+                               !last
+                       }
+               }.asPublisher(CoroutineName("grid-fs-writer") + Dispatchers.IO)
+               
+               val newId = bucket.uploadFromPublisher(fileName, bytesPublisher).awaitFirst()
+               // A blob previously stored under this path is not deleted here; see performMaintenance().
+               updateExact(toExactPath(path), newId)
+               return true
+       }
+       
+       /** Streams the blob stored for [path] into [content]. Returns `false` when no entry exists. */
+       override suspend fun readFile(path: StoragePath, content: ByteWriteChannel): Boolean {
+               val file = getExact(toExactPath(path)) ?: return false
+               val gridFsId = file.file
+               
+               bucket.downloadToPublisher(gridFsId).asFlow().collect { buffer ->
+                       content.writeFully(buffer)
+               }
+               
+               return true
+       }
+       
+       /**
+        * Makes [target] refer to the same blob as [source]; no data is duplicated.
+        * Returns `false` when [source] has no entry.
+        */
+       override suspend fun copyFile(source: StoragePath, target: StoragePath): Boolean {
+               val sourceFile = getExact(toExactPath(source)) ?: return false
+               updateExact(toExactPath(target), sourceFile.file)
+               return true
+       }
+       
+       /**
+        * Removes the entry for [path] and deletes its blob once no other entry refers to it.
+        * Returns `false` when no entry exists.
+        */
+       override suspend fun eraseFile(path: StoragePath): Boolean {
+               val file = getExact(toExactPath(path)) ?: return false
+               table.del(file.id)
+               
+               // copyFile() lets several entries share one blob, so the blob may only be
+               // dropped together with its last referencing entry.
+               if (table.locate(Filters.eq(GridFsEntry::file.serialName, file.file)) == null)
+                       deleteBlob(file.file)
+               
+               return true
+       }
+       
+       /**
+        * Deletes every blob in the bucket that no entry refers to.
+        *
+        * NOTE(review): a blob uploaded by a concurrent writeFile() whose entry has not been
+        * stored yet would look unused here - confirm that maintenance never overlaps writes.
+        */
+       override suspend fun performMaintenance() {
+               val allUsedIds = table.all().map { it.file }.toSet()
+               val unusedFiles = bucket.find(Filters.nin(MONGODB_ID_KEY, allUsedIds)).asFlow().map { it.objectId }.toSet()
+               coroutineScope {
+                       unusedFiles.map { unusedFile ->
+                               launch {
+                                       deleteBlob(unusedFile)
+                               }
+                       }.joinAll()
+               }
+       }
+}
diff --git a/src/jvmMain/kotlin/info/mechyrdia/data/data_flow.kt b/src/jvmMain/kotlin/info/mechyrdia/data/data_flow.kt
new file mode 100644 (file)
index 0000000..80f541c
--- /dev/null
@@ -0,0 +1,21 @@
+package info.mechyrdia.data
+
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.FlowCollector
+
+/** Returns a flow that emits each distinct value of this flow only the first time it occurs. */
+fun <T> Flow<T>.distinct(): Flow<T> = distinctBy { it }
+/** Returns a flow that emits only the first value seen for each key produced by [keySelector]. */
+fun <T, K> Flow<T>.distinctBy(keySelector: (T) -> K): Flow<T> {
+       return DistinctSetFlow(this, keySelector)
+}
+
+/**
+ * Flow wrapper that forwards a value from [upstream] only when [keySelector] yields a key
+ * that has not been produced earlier during the same collection.
+ */
+private class DistinctSetFlow<T, K>(
+       private val upstream: Flow<T>,
+       private val keySelector: (T) -> K
+) : Flow<T> {
+       override suspend fun collect(collector: FlowCollector<T>) {
+               // A fresh set per collect() call: every collector starts with no remembered keys.
+               val seenKeys = mutableSetOf<K>()
+               upstream.collect { item ->
+                       val isNewKey = seenKeys.add(keySelector(item))
+                       if (isNewKey)
+                               collector.emit(item)
+               }
+       }
+}