diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/DataSetCoder.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/DataSetCoder.kt index 1bb218b1..02392235 100644 --- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/DataSetCoder.kt +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/DataSetCoder.kt @@ -5,6 +5,7 @@ import io.lambdarpc.coding.CodingContext import io.lambdarpc.transport.grpc.Entity import io.lambdarpc.transport.serialization.Entity import io.lambdarpc.transport.serialization.RawData +import kotlinx.coroutines.runBlocking import java.nio.charset.Charset internal object DataSetCoder : Coder> { @@ -15,7 +16,7 @@ internal object DataSetCoder : Coder> { } override fun encode(value: SerializableDataSet, context: CodingContext): Entity { - val prototype = DataSetPrototype.of(value) + val prototype = runBlocking { DataSetPrototype.of(value) } // TODO update LambdaRPC and remove blocking val string = prototype.toJson() return Entity(RawData.copyFrom(string, Charset.defaultCharset())) } diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSetSerialization.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSetSerialization.kt index dad6fa1b..f30aa0b2 100644 --- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSetSerialization.kt +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSetSerialization.kt @@ -2,11 +2,11 @@ package space.kscience.dataforge.distributed.serialization import kotlinx.coroutines.Deferred import kotlinx.coroutines.async +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.toList -import kotlinx.coroutines.runBlocking import kotlinx.serialization.Serializable import kotlinx.serialization.json.Json import kotlinx.serialization.serializer @@ -29,13 +29,13 @@ internal data class DataSetPrototype(val data: Map) { fun toJson(): String = Json.encodeToString(serializer(), this) companion object { - fun of(dataSet: DataSet): DataSetPrototype = runBlocking { + suspend fun of(dataSet: DataSet): DataSetPrototype = coroutineScope { val serializer = serializer(dataSet.dataType) - val map = mutableListOf>>() + val flow = mutableListOf>>() dataSet.flowData().map { (name, data) -> name.toString() to async { DataPrototype.of(data, serializer) } - }.toList(map) - DataSetPrototype(map.associate { (name, deferred) -> name to deferred.await() }) + }.toList(flow) + DataSetPrototype(flow.associate { (name, deferred) -> name to deferred.await() }) } fun fromJson(string: String): DataSetPrototype = Json.decodeFromString(serializer(), string) @@ -68,8 +68,6 @@ private class SerializableDataSetImpl(private val prototype: DataSetPrototype) : /** * Trivial named data implementation. */ - private class SimpleNamedData( - override val name: Name, - override val data: Data, - ) : NamedData, Data by data + private class SimpleNamedData(override val name: Name, override val data: Data) : + NamedData, Data by data }