From: Lanius Trolling Date: Sun, 7 Apr 2024 22:05:37 +0000 (-0400) Subject: Create generic file-storage API X-Git-Url: https://gitweb.starshipfights.net/?a=commitdiff_plain;h=2242f6e6b21b58474c88dd4431387772b88ce0e9;p=factbooks Create generic file-storage API --- diff --git a/src/jvmMain/kotlin/info/mechyrdia/data/bson.kt b/src/jvmMain/kotlin/info/mechyrdia/data/bson.kt index 1c82cb6..8eca341 100644 --- a/src/jvmMain/kotlin/info/mechyrdia/data/bson.kt +++ b/src/jvmMain/kotlin/info/mechyrdia/data/bson.kt @@ -1,7 +1,12 @@ +@file:OptIn(ExperimentalSerializationApi::class) + package info.mechyrdia.data +import kotlinx.serialization.ExperimentalSerializationApi import kotlinx.serialization.KSerializer +import kotlinx.serialization.Serializable import kotlinx.serialization.SerializationException +import kotlinx.serialization.builtins.ListSerializer import kotlinx.serialization.descriptors.PrimitiveKind import kotlinx.serialization.descriptors.PrimitiveSerialDescriptor import kotlinx.serialization.descriptors.SerialDescriptor @@ -18,6 +23,7 @@ import org.bson.codecs.configuration.CodecProvider import org.bson.codecs.configuration.CodecRegistry import org.bson.codecs.kotlinx.BsonDecoder import org.bson.codecs.kotlinx.BsonEncoder +import org.bson.types.ObjectId import java.time.Instant object IdCodec : Codec> { @@ -43,8 +49,25 @@ object IdCodecProvider : CodecProvider { } } +@Serializable +data class MongoDbMapEntry(val key: K, val value: V) + +class MongoDbMapSerializer(val keySerializer: KSerializer, val valueSerializer: KSerializer) : KSerializer> { + private val innerSerializer = ListSerializer(MongoDbMapEntry.serializer(keySerializer, valueSerializer)) + + override val descriptor: SerialDescriptor = innerSerializer.descriptor + + override fun serialize(encoder: Encoder, value: Map) { + innerSerializer.serialize(encoder, value.map { MongoDbMapEntry(it.key, it.value) }) + } + + override fun deserialize(decoder: Decoder): Map { + return innerSerializer.deserialize(decoder).associate { it.key to it.value } + } +} + object InstantSerializer : KSerializer { - override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("InstantSerializer", PrimitiveKind.STRING) + override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("InstantSerializer", PrimitiveKind.LONG) override fun serialize(encoder: Encoder, value: Instant) { if (encoder !is BsonEncoder) @@ -62,7 +85,7 @@ object InstantSerializer : KSerializer { } object InstantNullableSerializer : KSerializer { - override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("InstantSerializer", PrimitiveKind.STRING) + override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("InstantSerializer", PrimitiveKind.LONG) override fun serialize(encoder: Encoder, value: Instant?) { if (encoder !is BsonEncoder) @@ -78,9 +101,51 @@ object InstantNullableSerializer : KSerializer { if (decoder !is BsonDecoder) throw SerializationException("Instant is not supported by ${decoder::class}") - val value = decoder.decodeBsonValue() - if (value.isNull) - return null - return Instant.ofEpochMilli(value.asDateTime().value) + return if (decoder.decodeNotNullMark()) + Instant.ofEpochMilli(decoder.decodeBsonValue().asDateTime().value) + else + decoder.decodeNull() + } +} + +object ObjectIdSerializer : KSerializer { + override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("ObjectIdSerializer", PrimitiveKind.STRING) + + override fun serialize(encoder: Encoder, value: ObjectId) { + if (encoder !is BsonEncoder) + throw SerializationException("ObjectId is not supported by ${encoder::class}") + + encoder.encodeObjectId(value) + } + + override fun deserialize(decoder: Decoder): ObjectId { + if (decoder !is BsonDecoder) + throw SerializationException("ObjectId is not supported by ${decoder::class}") + + return decoder.decodeObjectId() + } +} + +object ObjectIdNullableSerializer : KSerializer { + override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("ObjectIdSerializer", PrimitiveKind.STRING) + + override fun serialize(encoder: Encoder, value: ObjectId?) { + if (encoder !is BsonEncoder) + throw SerializationException("ObjectId is not supported by ${encoder::class}") + + if (value == null) + encoder.encodeBsonValue(BsonNull.VALUE) + else + encoder.encodeObjectId(value) + } + + override fun deserialize(decoder: Decoder): ObjectId? { + if (decoder !is BsonDecoder) + throw SerializationException("ObjectId is not supported by ${decoder::class}") + + return if (decoder.decodeNotNullMark()) + decoder.decodeObjectId() + else + decoder.decodeNull() } } diff --git a/src/jvmMain/kotlin/info/mechyrdia/data/data_files.kt b/src/jvmMain/kotlin/info/mechyrdia/data/data_files.kt new file mode 100644 index 0000000..c0eb09e --- /dev/null +++ b/src/jvmMain/kotlin/info/mechyrdia/data/data_files.kt @@ -0,0 +1,251 @@ +package info.mechyrdia.data + +import com.mongodb.client.model.Filters +import com.mongodb.client.model.Updates +import com.mongodb.reactivestreams.client.gridfs.GridFSBucket +import io.ktor.util.* +import io.ktor.util.cio.* +import io.ktor.utils.io.* +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.reactive.asFlow +import kotlinx.coroutines.reactive.asPublisher +import kotlinx.coroutines.reactive.awaitFirst +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable +import org.bson.types.ObjectId +import java.io.File +import java.time.Instant +import kotlin.String + +@JvmInline +value class StoragePath(val elements: List) { + constructor(path: String) : this(path.split('/').filterNot(String::isEmpty)) + + override fun toString(): String { + return elements.joinToString(separator = "/") + } +} + +enum class StoredFileType { + FILE, + DIRECTORY; +} + +data class StoredFileEntry(val name: String, val type: StoredFileType) + +interface FileStorage { + suspend fun prepare() = Unit + + suspend fun createDir(dir: StoragePath): Boolean + + suspend fun listDir(dir: StoragePath): Flow + + suspend fun deleteDir(dir: StoragePath): Boolean + + suspend fun writeFile(path: StoragePath, content: ByteReadChannel): Boolean + + suspend fun readFile(path: StoragePath, content: ByteWriteChannel): Boolean + + suspend fun copyFile(source: StoragePath, target: StoragePath): Boolean + + suspend fun eraseFile(path: StoragePath): Boolean + + suspend fun performMaintenance() = Unit +} + +class FlatFileStorage(val root: File) : FileStorage { + private fun resolveFile(path: StoragePath) = root.combineSafe(path.toString()) + + private fun renderEntry(file: File) = StoredFileEntry(file.name, if (file.isFile) StoredFileType.FILE else StoredFileType.DIRECTORY) + + private fun createDir(file: File): Boolean { + if (file.isFile) return false + if (file.isDirectory) return true + + if (!file.parentFile.exists()) + if (!createDir(file.parentFile)) + return false + + file.mkdir() + return true + } + + override suspend fun prepare() { + withContext(Dispatchers.IO) { + root.mkdirs() + } + } + + override suspend fun createDir(dir: StoragePath): Boolean { + return withContext(Dispatchers.IO) { createDir(resolveFile(dir)) } + } + + override suspend fun listDir(dir: StoragePath): Flow { + return withContext(Dispatchers.IO) { resolveFile(dir).listFiles()?.map { renderEntry(it) }.orEmpty().asFlow() } + } + + override suspend fun deleteDir(dir: StoragePath): Boolean { + val file = resolveFile(dir) + if (!file.isDirectory) return true + return withContext(Dispatchers.IO) { file.deleteRecursively() } + } + + override suspend fun writeFile(path: StoragePath, content: ByteReadChannel): Boolean { + val file = resolveFile(path) + if (!file.exists()) + if (!file.parentFile.isDirectory) + return false + + withContext(Dispatchers.IO) { + file.writeChannel().use { content.copyTo(this) } + } + + return true + } + + override suspend fun readFile(path: StoragePath, content: ByteWriteChannel): Boolean { + val file = resolveFile(path) + if (!file.isFile) return false + + withContext(Dispatchers.IO) { + file.readChannel().copyTo(content) + } + + return true + } + + override suspend fun copyFile(source: StoragePath, target: StoragePath): Boolean { + val sourceFile = resolveFile(source) + val targetFile = resolveFile(target) + + if (!sourceFile.isFile) return false + if (targetFile.exists()) return false + + withContext(Dispatchers.IO) { + sourceFile.copyTo(targetFile) + } + + return true + } + + override suspend fun eraseFile(path: StoragePath): Boolean { + val file = resolveFile(path) + if (!file.isFile) return true + return withContext(Dispatchers.IO) { file.delete() } + } +} + +@Serializable +private data class GridFsEntry( + @SerialName(MONGODB_ID_KEY) + override val id: Id, + val path: String, + val file: @Serializable(with = ObjectIdSerializer::class) ObjectId, + val created: @Serializable(with = InstantSerializer::class) Instant, + val updated: @Serializable(with = InstantSerializer::class) Instant, +) : DataDocument + +private class GridFsStorage(val table: DocumentTable, val bucket: GridFSBucket) : FileStorage { + private suspend fun getExact(path: String) = table.locate(Filters.eq(GridFsEntry::path.serialName, path)) + private suspend fun updateExact(path: String, newFile: ObjectId) { + val now = Instant.now() + + table.change( + Filters.eq(GridFsEntry::path.serialName, path), + Updates.combine( + Updates.set(GridFsEntry::file.serialName, newFile), + Updates.set(GridFsEntry::updated.serialName, now), + Updates.setOnInsert(GridFsEntry::created.serialName, now), + Updates.setOnInsert(GridFsEntry::id.serialName, Id()) + ) + ) + } + + private suspend fun getPrefix(path: String) = table.filter(Filters.regex(GridFsEntry::path.serialName, "^${Regex.fromLiteral(path)}")) + private suspend fun deletePrefix(path: String) = table.remove(Filters.regex(GridFsEntry::path.serialName, "^${Regex.fromLiteral(path)}")) + + private fun toExactPath(path: StoragePath) = "/$path" + private fun toPrefixPath(path: StoragePath) = "/$path/" + + override suspend fun prepare() { + table.unique(GridFsEntry::path) + } + + override suspend fun createDir(dir: StoragePath): Boolean { + return coroutineScope { + dir.elements.indices.map { index -> + async { + getExact(toExactPath(StoragePath(dir.elements.take(index)))) != null + } + }.awaitAll().none { it } + } + } + + override suspend fun listDir(dir: StoragePath): Flow { + val prefixPath = toPrefixPath(dir) + return getPrefix(prefixPath).map { + val subPath = it.path.removePrefix(prefixPath) + if (subPath.contains('/')) + StoredFileEntry(subPath.substringBefore('/'), StoredFileType.DIRECTORY) + else + StoredFileEntry(subPath, StoredFileType.FILE) + }.distinctBy { it.name } + } + + override suspend fun deleteDir(dir: StoragePath): Boolean { + deletePrefix(toPrefixPath(dir)) + return true + } + + override suspend fun writeFile(path: StoragePath, content: ByteReadChannel): Boolean { + if (getPrefix(toPrefixPath(path)).count() > 0) return false + + val bytesPublisher = flow { + content.consumeEachBufferRange { buffer, last -> + emit(buffer.copy()) + !last + } + }.asPublisher(CoroutineName("grid-fs-writer") + Dispatchers.IO) + + val newId = bucket.uploadFromPublisher(path.elements.last(), bytesPublisher).awaitFirst() + updateExact(toExactPath(path), newId) + return true + } + + override suspend fun readFile(path: StoragePath, content: ByteWriteChannel): Boolean { + val file = getExact(toExactPath(path)) ?: return false + val gridFsId = file.file + + bucket.downloadToPublisher(gridFsId).asFlow().collect { buffer -> + content.writeFully(buffer) + } + + return true + } + + override suspend fun copyFile(source: StoragePath, target: StoragePath): Boolean { + val sourceFile = getExact(toExactPath(source)) ?: return false + updateExact(toExactPath(target), sourceFile.file) + return true + } + + override suspend fun eraseFile(path: StoragePath): Boolean { + val file = getExact(toExactPath(path)) ?: return false + bucket.delete(file.file).awaitFirst() + table.del(file.id) + return true + } + + override suspend fun performMaintenance() { + val allUsedIds = table.all().map { it.file }.toSet() + val unusedFiles = bucket.find(Filters.nin(MONGODB_ID_KEY, allUsedIds)).asFlow().map { it.objectId }.toSet() + coroutineScope { + unusedFiles.map { unusedFile -> + launch { + bucket.delete(unusedFile).awaitFirst() + } + }.joinAll() + } + } +} diff --git a/src/jvmMain/kotlin/info/mechyrdia/data/data_flow.kt b/src/jvmMain/kotlin/info/mechyrdia/data/data_flow.kt new file mode 100644 index 0000000..80f541c --- /dev/null +++ b/src/jvmMain/kotlin/info/mechyrdia/data/data_flow.kt @@ -0,0 +1,21 @@ +package info.mechyrdia.data + +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.FlowCollector + +fun Flow.distinct(): Flow = DistinctSetFlow(this) { it } +fun Flow.distinctBy(keySelector: (T) -> K): Flow = DistinctSetFlow(this, keySelector) + +private class DistinctSetFlow( + private val upstream: Flow, + private val keySelector: (T) -> K +) : Flow { + override suspend fun collect(collector: FlowCollector) { + val previousKeys = mutableSetOf() + upstream.collect { value -> + val key = keySelector(value) + if (previousKeys.add(key)) + collector.emit(value) + } + } +}