Await data in parallel

This commit is contained in:
Andrey Stoyan 2022-05-10 10:31:26 +03:00
parent f62507e1b9
commit 07a0bd551b

View File

@ -1,5 +1,7 @@
package space.kscience.dataforge.distributed.serialization package space.kscience.dataforge.distributed.serialization
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
@ -29,11 +31,11 @@ internal data class DataSetPrototype(val data: Map<String, DataPrototype>) {
companion object { companion object {
fun <T : Any> of(dataSet: DataSet<T>): DataSetPrototype = runBlocking { fun <T : Any> of(dataSet: DataSet<T>): DataSetPrototype = runBlocking {
val serializer = serializer(dataSet.dataType) val serializer = serializer(dataSet.dataType)
val map = mutableListOf<Pair<String, DataPrototype>>() val map = mutableListOf<Pair<String, Deferred<DataPrototype>>>()
dataSet.flowData().map { (name, data) -> dataSet.flowData().map { (name, data) ->
name.toString() to DataPrototype.of(data, serializer) name.toString() to async { DataPrototype.of(data, serializer) }
}.toList(map) }.toList(map)
DataSetPrototype(map.associate { it }) DataSetPrototype(map.associate { (name, deferred) -> name to deferred.await() })
} }
fun fromJson(string: String): DataSetPrototype = Json.decodeFromString(serializer(), string) fun fromJson(string: String): DataSetPrototype = Json.decodeFromString(serializer(), string)