Send meta too

This commit is contained in:
Andrey Stoyan 2022-05-09 20:22:17 +03:00
parent a4044c82a0
commit e9d4683f9b
2 changed files with 54 additions and 19 deletions

View File

@ -29,7 +29,7 @@ internal class RemoteTask<T : Any>(
taskName: Name, taskName: Name,
taskMeta: Meta, taskMeta: Meta,
): TaskResult<T> = withContext(dispatcher) { ): TaskResult<T> = withContext(dispatcher) {
val dataset = ServiceWorkspace.execute(taskName) as LazyDecodableDataSet val dataset = ServiceWorkspace.execute(taskName)
dataset.finishDecoding(resultType) dataset.finishDecoding(resultType)
workspace.wrapResult(dataset as DataSet<T>, taskName, taskMeta) workspace.wrapResult(dataset as DataSet<T>, taskName, taskMeta)
} }

View File

@ -19,6 +19,7 @@ import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import kotlinx.serialization.KSerializer
import kotlinx.serialization.Serializable import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
import kotlinx.serialization.serializer import kotlinx.serialization.serializer
@ -34,6 +35,7 @@ import space.kscience.dataforge.data.await
import space.kscience.dataforge.data.component1 import space.kscience.dataforge.data.component1
import space.kscience.dataforge.data.component2 import space.kscience.dataforge.data.component2
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MetaSerializer
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName import space.kscience.dataforge.names.asName
import space.kscience.dataforge.workspace.Task import space.kscience.dataforge.workspace.Task
@ -60,7 +62,10 @@ public class ServiceWorkspace(
get() = context.gather(Task.TYPE) get() = context.gather(Task.TYPE)
private val service = LibService(serviceId, address, port) { private val service = LibService(serviceId, address, port) {
execute of { name -> println("service = $name"); produce(name, Meta.EMPTY) } // TODO execute of { name ->
val res = produce(name, Meta.EMPTY)
LazyDecodableDataSetAdapter(res)
}
} }
/** /**
@ -108,12 +113,26 @@ private object NameCoder : Coder<Name> {
Entity(RawData.copyFrom(value.toString(), Charset.defaultCharset())) Entity(RawData.copyFrom(value.toString(), Charset.defaultCharset()))
} }
@Serializable
private data class DataPrototype(
val meta: String,
val data: String,
) {
companion object {
suspend fun <T : Any> of(data: Data<T>, serializer: KSerializer<in T>): DataPrototype {
val meta = Json.encodeToString(MetaSerializer, data.meta)
val string = Json.encodeToString(serializer, data.await())
return DataPrototype(meta, string)
}
}
}
/** /**
* Data class that represents serializable [DataSet]. * Data class that represents serializable [DataSet].
*/ */
@Serializable @Serializable
private data class DataSetPrototype( private data class DataSetPrototype(
val data: Map<String, String>, val data: Map<String, DataPrototype>,
) )
/** /**
@ -121,9 +140,9 @@ private data class DataSetPrototype(
*/ */
private fun <T : Any> DataSet<T>.toPrototype(): DataSetPrototype = runBlocking { private fun <T : Any> DataSet<T>.toPrototype(): DataSetPrototype = runBlocking {
val serializer = serializer(dataType) val serializer = serializer(dataType)
val map = mutableListOf<Pair<String, String>>() val map = mutableListOf<Pair<String, DataPrototype>>()
flowData().map { (name, data) -> flowData().map { (name, data) ->
name.toString() to Json.encodeToString(serializer, data.await()) name.toString() to DataPrototype.of(data, serializer)
}.toList(map) }.toList(map)
DataSetPrototype(map.associate { it }) DataSetPrototype(map.associate { it })
} }
@ -131,11 +150,14 @@ private fun <T : Any> DataSet<T>.toPrototype(): DataSetPrototype = runBlocking {
/** /**
* Trivial [Data] implementation. * Trivial [Data] implementation.
*/ */
private class SimpleData(override val type: KType, val data: Any) : Data<Any> { private class SimpleData(
override val meta: Meta override val type: KType,
get() = Meta.EMPTY override val meta: Meta,
val data: Any,
) : Data<Any> {
override val dependencies: Collection<Goal<*>> override val dependencies: Collection<Goal<*>>
get() = emptyList() get() = emptyList()
override val deferred: Deferred<Any> override val deferred: Deferred<Any>
get() = CompletableDeferred(data) get() = CompletableDeferred(data)
@ -158,43 +180,56 @@ internal interface LazyDecodableDataSet<T : Any> : DataSet<T> {
fun finishDecoding(type: KType) fun finishDecoding(type: KType)
} }
private class LazyDecodableDataSetAdapter<T : Any>(val dataSet: DataSet<T>) :
LazyDecodableDataSet<T>, DataSet<T> by dataSet {
override fun finishDecoding(type: KType) = Unit
}
/** /**
* Trivial [LazyDecodableDataSet] implementation. * Trivial [LazyDecodableDataSet] implementation.
*/ */
private class SimpleDataSet(private val prototype: DataSetPrototype) : LazyDecodableDataSet<Any> { private class SimpleDataSet(private val prototype: DataSetPrototype) : LazyDecodableDataSet<Any> {
lateinit var type: KType lateinit var type: KType
lateinit var data: Map<Name, Any> lateinit var data: Map<Name, Pair<Meta, Any>>
override fun finishDecoding(type: KType) { override fun finishDecoding(type: KType) {
this.type = type this.type = type
this.data = prototype.data.map { (name, data) -> val serializer = serializer(type)
Name.parse(name) to Json.decodeFromString(serializer(type), data) this.data = prototype.data
}.associate { (name, data) -> name to data!! } .mapKeys { (name, _) -> Name.of(name) }
.mapValues { (_, pair) ->
val (meta, data) = pair
Pair(
Json.decodeFromString(MetaSerializer, meta),
Json.decodeFromString(serializer, data)!!
)
}
} }
override val dataType: KType override val dataType: KType
get() = type get() = type
override fun flowData(): Flow<NamedData<Any>> = override fun flowData(): Flow<NamedData<Any>> =
data.map { (name, data) -> data.map { (name, pair) ->
val wrapped = SimpleData(dataType, data) val (meta, data) = pair
val wrapped = SimpleData(dataType, meta, data)
SimpleNamedData(name, wrapped) SimpleNamedData(name, wrapped)
}.asFlow() }.asFlow()
override suspend fun getData(name: Name): Data<Any>? = data[name]?.let { data -> override suspend fun getData(name: Name): Data<Any>? = data[name]?.let { (meta, data) ->
SimpleData(dataType, data) SimpleData(dataType, meta, data)
} }
} }
private object DataSetCoder : Coder<DataSet<Any>> { private object DataSetCoder : Coder<LazyDecodableDataSet<Any>> {
override fun decode(entity: Entity, context: CodingContext): DataSet<Any> { override fun decode(entity: Entity, context: CodingContext): LazyDecodableDataSet<Any> {
val string = entity.data.toString(Charset.defaultCharset()) val string = entity.data.toString(Charset.defaultCharset())
val prototype = Json.decodeFromString(serializer<DataSetPrototype>(), string) val prototype = Json.decodeFromString(serializer<DataSetPrototype>(), string)
return SimpleDataSet(prototype) return SimpleDataSet(prototype)
} }
override fun encode(value: DataSet<*>, context: CodingContext): Entity { override fun encode(value: LazyDecodableDataSet<Any>, context: CodingContext): Entity {
val prototype = value.toPrototype() val prototype = value.toPrototype()
val string = Json.encodeToString(serializer(), prototype) val string = Json.encodeToString(serializer(), prototype)
return Entity(RawData.copyFrom(string, Charset.defaultCharset())) return Entity(RawData.copyFrom(string, Charset.defaultCharset()))