Make dataset serialization non-blocking

This commit is contained in:
Andrey Stoyan 2022-05-10 10:49:38 +03:00
parent 07a0bd551b
commit 230a3f1e22
2 changed files with 9 additions and 10 deletions

View File

@ -5,6 +5,7 @@ import io.lambdarpc.coding.CodingContext
import io.lambdarpc.transport.grpc.Entity import io.lambdarpc.transport.grpc.Entity
import io.lambdarpc.transport.serialization.Entity import io.lambdarpc.transport.serialization.Entity
import io.lambdarpc.transport.serialization.RawData import io.lambdarpc.transport.serialization.RawData
import kotlinx.coroutines.runBlocking
import java.nio.charset.Charset import java.nio.charset.Charset
internal object DataSetCoder : Coder<SerializableDataSet<Any>> { internal object DataSetCoder : Coder<SerializableDataSet<Any>> {
@ -15,7 +16,7 @@ internal object DataSetCoder : Coder<SerializableDataSet<Any>> {
} }
override fun encode(value: SerializableDataSet<Any>, context: CodingContext): Entity { override fun encode(value: SerializableDataSet<Any>, context: CodingContext): Entity {
val prototype = DataSetPrototype.of(value) val prototype = runBlocking { DataSetPrototype.of(value) } // TODO update LambdaRPC and remove blocking
val string = prototype.toJson() val string = prototype.toJson()
return Entity(RawData.copyFrom(string, Charset.defaultCharset())) return Entity(RawData.copyFrom(string, Charset.defaultCharset()))
} }

View File

@ -2,11 +2,11 @@ package space.kscience.dataforge.distributed.serialization
import kotlinx.coroutines.Deferred import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.serialization.Serializable import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
import kotlinx.serialization.serializer import kotlinx.serialization.serializer
@ -29,13 +29,13 @@ internal data class DataSetPrototype(val data: Map<String, DataPrototype>) {
fun toJson(): String = Json.encodeToString(serializer(), this) fun toJson(): String = Json.encodeToString(serializer(), this)
companion object { companion object {
fun <T : Any> of(dataSet: DataSet<T>): DataSetPrototype = runBlocking { suspend fun <T : Any> of(dataSet: DataSet<T>): DataSetPrototype = coroutineScope {
val serializer = serializer(dataSet.dataType) val serializer = serializer(dataSet.dataType)
val map = mutableListOf<Pair<String, Deferred<DataPrototype>>>() val flow = mutableListOf<Pair<String, Deferred<DataPrototype>>>()
dataSet.flowData().map { (name, data) -> dataSet.flowData().map { (name, data) ->
name.toString() to async { DataPrototype.of(data, serializer) } name.toString() to async { DataPrototype.of(data, serializer) }
}.toList(map) }.toList(flow)
DataSetPrototype(map.associate { (name, deferred) -> name to deferred.await() }) DataSetPrototype(flow.associate { (name, deferred) -> name to deferred.await() })
} }
fun fromJson(string: String): DataSetPrototype = Json.decodeFromString(serializer(), string) fun fromJson(string: String): DataSetPrototype = Json.decodeFromString(serializer(), string)
@ -68,8 +68,6 @@ private class SerializableDataSetImpl(private val prototype: DataSetPrototype) :
/** /**
* Trivial named data implementation. * Trivial named data implementation.
*/ */
private class SimpleNamedData( private class SimpleNamedData(override val name: Name, override val data: Data<Any>) :
override val name: Name, NamedData<Any>, Data<Any> by data
override val data: Data<Any>,
) : NamedData<Any>, Data<Any> by data
} }