+@file:OptIn(ExperimentalSerializationApi::class)
+
package info.mechyrdia.data
+import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.KSerializer
+import kotlinx.serialization.Serializable
import kotlinx.serialization.SerializationException
+import kotlinx.serialization.builtins.ListSerializer
import kotlinx.serialization.descriptors.PrimitiveKind
import kotlinx.serialization.descriptors.PrimitiveSerialDescriptor
import kotlinx.serialization.descriptors.SerialDescriptor
import org.bson.codecs.configuration.CodecRegistry
import org.bson.codecs.kotlinx.BsonDecoder
import org.bson.codecs.kotlinx.BsonEncoder
+import org.bson.types.ObjectId
import java.time.Instant
object IdCodec : Codec<Id<*>> {
}
}
+@Serializable
+data class MongoDbMapEntry<K, V>(val key: K, val value: V)
+
+class MongoDbMapSerializer<K, V>(val keySerializer: KSerializer<K>, val valueSerializer: KSerializer<V>) : KSerializer<Map<K, V>> {
+ private val innerSerializer = ListSerializer(MongoDbMapEntry.serializer(keySerializer, valueSerializer))
+
+ override val descriptor: SerialDescriptor = innerSerializer.descriptor
+
+ override fun serialize(encoder: Encoder, value: Map<K, V>) {
+ innerSerializer.serialize(encoder, value.map { MongoDbMapEntry(it.key, it.value) })
+ }
+
+ override fun deserialize(decoder: Decoder): Map<K, V> {
+ return innerSerializer.deserialize(decoder).associate { it.key to it.value }
+ }
+}
+
object InstantSerializer : KSerializer<Instant> {
- override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("InstantSerializer", PrimitiveKind.STRING)
+ override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("InstantSerializer", PrimitiveKind.LONG)
override fun serialize(encoder: Encoder, value: Instant) {
if (encoder !is BsonEncoder)
}
object InstantNullableSerializer : KSerializer<Instant?> {
- override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("InstantSerializer", PrimitiveKind.STRING)
+ override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("InstantSerializer", PrimitiveKind.LONG)
override fun serialize(encoder: Encoder, value: Instant?) {
if (encoder !is BsonEncoder)
if (decoder !is BsonDecoder)
throw SerializationException("Instant is not supported by ${decoder::class}")
- val value = decoder.decodeBsonValue()
- if (value.isNull)
- return null
- return Instant.ofEpochMilli(value.asDateTime().value)
+ return if (decoder.decodeNotNullMark())
+ Instant.ofEpochMilli(decoder.decodeBsonValue().asDateTime().value)
+ else
+ decoder.decodeNull()
+ }
+}
+
+object ObjectIdSerializer : KSerializer<ObjectId> {
+ override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("ObjectIdSerializer", PrimitiveKind.STRING)
+
+ override fun serialize(encoder: Encoder, value: ObjectId) {
+ if (encoder !is BsonEncoder)
+ throw SerializationException("ObjectId is not supported by ${encoder::class}")
+
+ encoder.encodeObjectId(value)
+ }
+
+ override fun deserialize(decoder: Decoder): ObjectId {
+ if (decoder !is BsonDecoder)
+ throw SerializationException("ObjectId is not supported by ${decoder::class}")
+
+ return decoder.decodeObjectId()
+ }
+}
+
+object ObjectIdNullableSerializer : KSerializer<ObjectId?> {
+ override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("ObjectIdSerializer", PrimitiveKind.STRING)
+
+ override fun serialize(encoder: Encoder, value: ObjectId?) {
+ if (encoder !is BsonEncoder)
+ throw SerializationException("ObjectId is not supported by ${encoder::class}")
+
+ if (value == null)
+ encoder.encodeBsonValue(BsonNull.VALUE)
+ else
+ encoder.encodeObjectId(value)
+ }
+
+ override fun deserialize(decoder: Decoder): ObjectId? {
+ if (decoder !is BsonDecoder)
+ throw SerializationException("ObjectId is not supported by ${decoder::class}")
+
+ return if (decoder.decodeNotNullMark())
+ decoder.decodeObjectId()
+ else
+ decoder.decodeNull()
}
}
--- /dev/null
+package info.mechyrdia.data
+
+import com.mongodb.client.model.Filters
+import com.mongodb.client.model.Updates
+import com.mongodb.reactivestreams.client.gridfs.GridFSBucket
+import io.ktor.util.*
+import io.ktor.util.cio.*
+import io.ktor.utils.io.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.reactive.asFlow
+import kotlinx.coroutines.reactive.asPublisher
+import kotlinx.coroutines.reactive.awaitFirst
+import kotlinx.serialization.SerialName
+import kotlinx.serialization.Serializable
+import org.bson.types.ObjectId
+import java.io.File
+import java.time.Instant
+import kotlin.String
+
+@JvmInline
+value class StoragePath(val elements: List<String>) {
+ constructor(path: String) : this(path.split('/').filterNot(String::isEmpty))
+
+ override fun toString(): String {
+ return elements.joinToString(separator = "/")
+ }
+}
+
+enum class StoredFileType {
+ FILE,
+ DIRECTORY;
+}
+
+data class StoredFileEntry(val name: String, val type: StoredFileType)
+
+interface FileStorage {
+ suspend fun prepare() = Unit
+
+ suspend fun createDir(dir: StoragePath): Boolean
+
+ suspend fun listDir(dir: StoragePath): Flow<StoredFileEntry>
+
+ suspend fun deleteDir(dir: StoragePath): Boolean
+
+ suspend fun writeFile(path: StoragePath, content: ByteReadChannel): Boolean
+
+ suspend fun readFile(path: StoragePath, content: ByteWriteChannel): Boolean
+
+ suspend fun copyFile(source: StoragePath, target: StoragePath): Boolean
+
+ suspend fun eraseFile(path: StoragePath): Boolean
+
+ suspend fun performMaintenance() = Unit
+}
+
+class FlatFileStorage(val root: File) : FileStorage {
+ private fun resolveFile(path: StoragePath) = root.combineSafe(path.toString())
+
+ private fun renderEntry(file: File) = StoredFileEntry(file.name, if (file.isFile) StoredFileType.FILE else StoredFileType.DIRECTORY)
+
+ private fun createDir(file: File): Boolean {
+ if (file.isFile) return false
+ if (file.isDirectory) return true
+
+ if (!file.parentFile.exists())
+ if (!createDir(file.parentFile))
+ return false
+
+ file.mkdir()
+ return true
+ }
+
+ override suspend fun prepare() {
+ withContext(Dispatchers.IO) {
+ root.mkdirs()
+ }
+ }
+
+ override suspend fun createDir(dir: StoragePath): Boolean {
+ return withContext(Dispatchers.IO) { createDir(resolveFile(dir)) }
+ }
+
+ override suspend fun listDir(dir: StoragePath): Flow<StoredFileEntry> {
+ return withContext(Dispatchers.IO) { resolveFile(dir).listFiles()?.map { renderEntry(it) }.orEmpty().asFlow() }
+ }
+
+ override suspend fun deleteDir(dir: StoragePath): Boolean {
+ val file = resolveFile(dir)
+ if (!file.isDirectory) return true
+ return withContext(Dispatchers.IO) { file.deleteRecursively() }
+ }
+
+ override suspend fun writeFile(path: StoragePath, content: ByteReadChannel): Boolean {
+ val file = resolveFile(path)
+ if (!file.exists())
+ if (!file.parentFile.isDirectory)
+ return false
+
+ withContext(Dispatchers.IO) {
+ file.writeChannel().use { content.copyTo(this) }
+ }
+
+ return true
+ }
+
+ override suspend fun readFile(path: StoragePath, content: ByteWriteChannel): Boolean {
+ val file = resolveFile(path)
+ if (!file.isFile) return false
+
+ withContext(Dispatchers.IO) {
+ file.readChannel().copyTo(content)
+ }
+
+ return true
+ }
+
+ override suspend fun copyFile(source: StoragePath, target: StoragePath): Boolean {
+ val sourceFile = resolveFile(source)
+ val targetFile = resolveFile(target)
+
+ if (!sourceFile.isFile) return false
+ if (targetFile.exists()) return false
+
+ withContext(Dispatchers.IO) {
+ sourceFile.copyTo(targetFile)
+ }
+
+ return true
+ }
+
+ override suspend fun eraseFile(path: StoragePath): Boolean {
+ val file = resolveFile(path)
+ if (!file.isFile) return true
+ return withContext(Dispatchers.IO) { file.delete() }
+ }
+}
+
+@Serializable
+private data class GridFsEntry(
+ @SerialName(MONGODB_ID_KEY)
+ override val id: Id<GridFsEntry>,
+ val path: String,
+ val file: @Serializable(with = ObjectIdSerializer::class) ObjectId,
+ val created: @Serializable(with = InstantSerializer::class) Instant,
+ val updated: @Serializable(with = InstantSerializer::class) Instant,
+) : DataDocument<GridFsEntry>
+
+private class GridFsStorage(val table: DocumentTable<GridFsEntry>, val bucket: GridFSBucket) : FileStorage {
+ private suspend fun getExact(path: String) = table.locate(Filters.eq(GridFsEntry::path.serialName, path))
+ private suspend fun updateExact(path: String, newFile: ObjectId) {
+ val now = Instant.now()
+
+ table.change(
+ Filters.eq(GridFsEntry::path.serialName, path),
+ Updates.combine(
+ Updates.set(GridFsEntry::file.serialName, newFile),
+ Updates.set(GridFsEntry::updated.serialName, now),
+ Updates.setOnInsert(GridFsEntry::created.serialName, now),
+ Updates.setOnInsert(GridFsEntry::id.serialName, Id<GridFsEntry>())
+ )
+ )
+ }
+
+ private suspend fun getPrefix(path: String) = table.filter(Filters.regex(GridFsEntry::path.serialName, "^${Regex.fromLiteral(path)}"))
+ private suspend fun deletePrefix(path: String) = table.remove(Filters.regex(GridFsEntry::path.serialName, "^${Regex.fromLiteral(path)}"))
+
+ private fun toExactPath(path: StoragePath) = "/$path"
+ private fun toPrefixPath(path: StoragePath) = "/$path/"
+
+ override suspend fun prepare() {
+ table.unique(GridFsEntry::path)
+ }
+
+ override suspend fun createDir(dir: StoragePath): Boolean {
+ return coroutineScope {
+ dir.elements.indices.map { index ->
+ async {
+ getExact(toExactPath(StoragePath(dir.elements.take(index)))) != null
+ }
+ }.awaitAll().none { it }
+ }
+ }
+
+ override suspend fun listDir(dir: StoragePath): Flow<StoredFileEntry> {
+ val prefixPath = toPrefixPath(dir)
+ return getPrefix(prefixPath).map {
+ val subPath = it.path.removePrefix(prefixPath)
+ if (subPath.contains('/'))
+ StoredFileEntry(subPath.substringBefore('/'), StoredFileType.DIRECTORY)
+ else
+ StoredFileEntry(subPath, StoredFileType.FILE)
+ }.distinctBy { it.name }
+ }
+
+ override suspend fun deleteDir(dir: StoragePath): Boolean {
+ deletePrefix(toPrefixPath(dir))
+ return true
+ }
+
+ override suspend fun writeFile(path: StoragePath, content: ByteReadChannel): Boolean {
+ if (getPrefix(toPrefixPath(path)).count() > 0) return false
+
+ val bytesPublisher = flow {
+ content.consumeEachBufferRange { buffer, last ->
+ emit(buffer.copy())
+ !last
+ }
+ }.asPublisher(CoroutineName("grid-fs-writer") + Dispatchers.IO)
+
+ val newId = bucket.uploadFromPublisher(path.elements.last(), bytesPublisher).awaitFirst()
+ updateExact(toExactPath(path), newId)
+ return true
+ }
+
+ override suspend fun readFile(path: StoragePath, content: ByteWriteChannel): Boolean {
+ val file = getExact(toExactPath(path)) ?: return false
+ val gridFsId = file.file
+
+ bucket.downloadToPublisher(gridFsId).asFlow().collect { buffer ->
+ content.writeFully(buffer)
+ }
+
+ return true
+ }
+
+ override suspend fun copyFile(source: StoragePath, target: StoragePath): Boolean {
+ val sourceFile = getExact(toExactPath(source)) ?: return false
+ updateExact(toExactPath(target), sourceFile.file)
+ return true
+ }
+
+ override suspend fun eraseFile(path: StoragePath): Boolean {
+ val file = getExact(toExactPath(path)) ?: return false
+ bucket.delete(file.file).awaitFirst()
+ table.del(file.id)
+ return true
+ }
+
+ override suspend fun performMaintenance() {
+ val allUsedIds = table.all().map { it.file }.toSet()
+ val unusedFiles = bucket.find(Filters.nin(MONGODB_ID_KEY, allUsedIds)).asFlow().map { it.objectId }.toSet()
+ coroutineScope {
+ unusedFiles.map { unusedFile ->
+ launch {
+ bucket.delete(unusedFile).awaitFirst()
+ }
+ }.joinAll()
+ }
+ }
+}