diff --git a/dataforge-distributed/build.gradle.kts b/dataforge-distributed/build.gradle.kts new file mode 100644 index 00000000..e5629e81 --- /dev/null +++ b/dataforge-distributed/build.gradle.kts @@ -0,0 +1,41 @@ +plugins { + id("ru.mipt.npm.gradle.mpp") +} + +kotlin { + sourceSets { + commonMain { + dependencies { + api(project(":dataforge-context")) + api(project(":dataforge-data")) + api(project(":dataforge-io")) + api(project(":dataforge-workspace")) + } + } + jvmMain { + dependencies { + // TODO include fat jar of lambdarpc + api(files("lambdarpc-core-0.0.1.jar")) + runtimeOnly("io.grpc:grpc-netty-shaded:1.44.0") + api("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0") + api("io.grpc:grpc-protobuf:1.44.0") + api("com.google.protobuf:protobuf-java-util:3.19.4") + api("com.google.protobuf:protobuf-kotlin:3.19.4") + api("io.grpc:grpc-kotlin-stub:1.2.1") + api("org.jetbrains.kotlinx:kotlinx-serialization-json:1.3.2") + api("org.slf4j:slf4j-simple:1.7.36") + api("io.github.microutils:kotlin-logging-jvm:2.1.21") + } + } + } +} + +kscience { + useSerialization { + json() + } +} + +readme { + maturity = ru.mipt.npm.gradle.Maturity.PROTOTYPE +} diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ClientWorkspacePlugin.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ClientWorkspacePlugin.kt new file mode 100644 index 00000000..c9dec94e --- /dev/null +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ClientWorkspacePlugin.kt @@ -0,0 +1,37 @@ +package space.kscience.dataforge.distributed + +import io.lambdarpc.utils.Endpoint +import space.kscience.dataforge.context.AbstractPlugin +import space.kscience.dataforge.context.PluginTag +import space.kscience.dataforge.names.Name +import space.kscience.dataforge.workspace.Task +import kotlin.reflect.KType + +/** + * Plugin whose purpose is to communicate with remote plugins. 
+ * @param endpoint Endpoint of the remote plugin. + */ +public abstract class ClientWorkspacePlugin(endpoint: Endpoint) : AbstractPlugin() { + + /** + * Tag of the [ClientWorkspacePlugin] should be equal to the tag of the corresponding remote plugin. + */ + abstract override val tag: PluginTag + + /** + * Enumeration of names of remote tasks and their result types. + */ + public abstract val tasks: Map + + private val _tasks: Map> by lazy { + tasks.mapValues { (_, type) -> + RemoteTask(endpoint, type) + } + } + + override fun content(target: String): Map = + when (target) { + Task.TYPE -> _tasks + else -> emptyMap() + } +} diff --git a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/RemoteTask.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTask.kt similarity index 95% rename from dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/RemoteTask.kt rename to dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTask.kt index 9fc205b0..e0d3cb3b 100644 --- a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/RemoteTask.kt +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTask.kt @@ -1,4 +1,4 @@ -package space.kscience.dataforge.workspace.distributed +package space.kscience.dataforge.distributed import io.lambdarpc.dsl.ServiceDispatcher import io.lambdarpc.utils.Endpoint diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ServiceWorkspace.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ServiceWorkspace.kt new file mode 100644 index 00000000..b6849754 --- /dev/null +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ServiceWorkspace.kt @@ -0,0 +1,81 @@ +package space.kscience.dataforge.distributed + +import io.ktor.utils.io.core.* +import 
io.lambdarpc.dsl.LibService +import io.lambdarpc.dsl.def +import io.lambdarpc.utils.Address +import io.lambdarpc.utils.Port +import io.lambdarpc.utils.toSid +import kotlinx.coroutines.runBlocking +import space.kscience.dataforge.context.Context +import space.kscience.dataforge.context.Global +import space.kscience.dataforge.context.gather +import space.kscience.dataforge.data.DataSet +import space.kscience.dataforge.data.DataTree +import space.kscience.dataforge.distributed.serialization.DataSetCoder +import space.kscience.dataforge.distributed.serialization.NameCoder +import space.kscience.dataforge.distributed.serialization.SerializableDataSetAdapter +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.names.Name +import space.kscience.dataforge.names.asName +import space.kscience.dataforge.workspace.Task +import space.kscience.dataforge.workspace.TaskResult +import space.kscience.dataforge.workspace.Workspace +import space.kscience.dataforge.workspace.wrapResult + +/** + * Workspace that exposes its tasks for remote clients. + */ +public class ServiceWorkspace( + address: String = "localhost", + port: Int? = null, + override val context: Context = Global.buildContext("workspace".asName()), + data: DataSet<*> = runBlocking { DataTree {} }, + override val targets: Map = mapOf(), +) : Workspace, Closeable { + + override val data: TaskResult<*> = wrapResult(data, Name.EMPTY, Meta.EMPTY) + + override val tasks: Map> + get() = context.gather(Task.TYPE) + + private val service = LibService(serviceId, address, port) { + execute of { name -> + val res = produce(name, Meta.EMPTY) + SerializableDataSetAdapter(res) + } + } + + /** + * Address this workspace is available on. + */ + public val address: Address = Address(address) + + /** + * Port this workspace is available on. + */ + public val port: Port + get() = service.port + + /** + * Start [ServiceWorkspace] as a service. 
+ */ + public fun start(): Unit = service.start() + + /** + * Await termination of the service. + */ + public fun awaitTermination(): Unit = service.awaitTermination() + + /** + * Shutdown service. + */ + public fun shutdown(): Unit = service.shutdown() + + override fun close(): Unit = service.shutdown() + + public companion object { + internal val serviceId = "d41b95b1-828b-4444-8ff0-6f9c92a79246".toSid() + internal val execute by serviceId.def(NameCoder, DataSetCoder) + } +} diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/DataSetCoder.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/DataSetCoder.kt new file mode 100644 index 00000000..1bb218b1 --- /dev/null +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/DataSetCoder.kt @@ -0,0 +1,22 @@ +package space.kscience.dataforge.distributed.serialization + +import io.lambdarpc.coding.Coder +import io.lambdarpc.coding.CodingContext +import io.lambdarpc.transport.grpc.Entity +import io.lambdarpc.transport.serialization.Entity +import io.lambdarpc.transport.serialization.RawData +import java.nio.charset.Charset + +internal object DataSetCoder : Coder> { + override fun decode(entity: Entity, context: CodingContext): SerializableDataSet { + val string = entity.data.toString(Charset.defaultCharset()) + val prototype = DataSetPrototype.fromJson(string) + return prototype.toDataSet() + } + + override fun encode(value: SerializableDataSet, context: CodingContext): Entity { + val prototype = DataSetPrototype.of(value) + val string = prototype.toJson() + return Entity(RawData.copyFrom(string, Charset.defaultCharset())) + } +} diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/NameCoder.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/NameCoder.kt new file mode 100644 index 
00000000..beb22124 --- /dev/null +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/NameCoder.kt @@ -0,0 +1,20 @@ +package space.kscience.dataforge.distributed.serialization + +import io.lambdarpc.coding.Coder +import io.lambdarpc.coding.CodingContext +import io.lambdarpc.transport.grpc.Entity +import io.lambdarpc.transport.serialization.Entity +import io.lambdarpc.transport.serialization.RawData +import space.kscience.dataforge.names.Name +import java.nio.charset.Charset + +internal object NameCoder : Coder { + override fun decode(entity: Entity, context: CodingContext): Name { + require(entity.hasData()) { "Entity should contain data" } + val string = entity.data.toString(Charset.defaultCharset()) + return Name.parse(string) + } + + override fun encode(value: Name, context: CodingContext): Entity = + Entity(RawData.copyFrom(value.toString(), Charset.defaultCharset())) +} diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/SerializableDataSet.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/SerializableDataSet.kt new file mode 100644 index 00000000..2265b524 --- /dev/null +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/SerializableDataSet.kt @@ -0,0 +1,16 @@ +package space.kscience.dataforge.distributed.serialization + +import space.kscience.dataforge.data.DataSet +import kotlin.reflect.KType + +/** + * Represents [DataSet] that should be initialized before usage. 
+ */ +internal interface SerializableDataSet : DataSet { + fun finishDecoding(type: KType) +} + +internal class SerializableDataSetAdapter(dataSet: DataSet) : + SerializableDataSet, DataSet by dataSet { + override fun finishDecoding(type: KType) = Unit +} diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSerialization.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSerialization.kt new file mode 100644 index 00000000..b3e400cc --- /dev/null +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSerialization.kt @@ -0,0 +1,56 @@ +package space.kscience.dataforge.distributed.serialization + +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Deferred +import kotlinx.serialization.KSerializer +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.Json +import space.kscience.dataforge.data.Data +import space.kscience.dataforge.data.Goal +import space.kscience.dataforge.data.await +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.meta.MetaSerializer +import kotlin.reflect.KType + +/** + * [Data] representation that is trivially serializable. + */ +@Serializable +internal data class DataPrototype( + val meta: String, + val data: String, +) { + fun toData(type: KType): Data = + SimpleData( + type = type, + meta = Json.decodeFromString(MetaSerializer, meta), + data = Json.decodeFromString(kotlinx.serialization.serializer(type), data)!! + ) + + companion object { + suspend fun of(data: Data, serializer: KSerializer): DataPrototype { + val meta = Json.encodeToString(MetaSerializer, data.meta) + val string = Json.encodeToString(serializer, data.await()) + return DataPrototype(meta, string) + } + } +} + +/** + * Trivial [Data] implementation. 
+ */ +private class SimpleData( + override val type: KType, + override val meta: Meta, + val data: T, +) : Data { + override val dependencies: Collection> + get() = emptyList() + + override val deferred: Deferred + get() = CompletableDeferred(data) + + override fun async(coroutineScope: CoroutineScope): Deferred = deferred + override fun reset() = Unit +} diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSetSerialization.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSetSerialization.kt new file mode 100644 index 00000000..9d74a9be --- /dev/null +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSetSerialization.kt @@ -0,0 +1,73 @@ +package space.kscience.dataforge.distributed.serialization + +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.runBlocking +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.Json +import kotlinx.serialization.serializer +import space.kscience.dataforge.data.Data +import space.kscience.dataforge.data.DataSet +import space.kscience.dataforge.data.NamedData +import space.kscience.dataforge.data.component1 +import space.kscience.dataforge.data.component2 +import space.kscience.dataforge.names.Name +import kotlin.reflect.KType + +/** + * [DataSet] representation that is trivially serializable. 
+ */ +@Serializable +internal data class DataSetPrototype(val data: Map) { + fun toDataSet(): SerializableDataSet = + SerializableDataSetImpl(this) + + fun toJson(): String = Json.encodeToString(serializer(), this) + + companion object { + fun of(dataSet: DataSet): DataSetPrototype = runBlocking { + val serializer = serializer(dataSet.dataType) + val map = mutableListOf>() + dataSet.flowData().map { (name, data) -> + name.toString() to DataPrototype.of(data, serializer) + }.toList(map) + DataSetPrototype(map.associate { it }) + } + + fun fromJson(string: String): DataSetPrototype = Json.decodeFromString(serializer(), string) + } +} + +/** + * Trivial [SerializableDataSet] implementation. + */ +private class SerializableDataSetImpl(private val prototype: DataSetPrototype) : SerializableDataSet { + + private lateinit var type: KType + private lateinit var data: Map> + + override fun finishDecoding(type: KType) { + this.type = type + this.data = prototype.data + .mapKeys { (name, _) -> Name.of(name) } + .mapValues { (_, dataPrototype) -> dataPrototype.toData(type) } + } + + override val dataType: KType + get() = type + + override fun flowData(): Flow> = + data.map { (name, data) -> SimpleNamedData(name, data) }.asFlow() + + override suspend fun getData(name: Name): Data? = data[name] + + /** + * Trivial named data implementation. 
+ */ + private class SimpleNamedData( + override val name: Name, + override val data: Data, + ) : NamedData, Data by data +} diff --git a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/ServiceWorkspaceTest.kt b/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/ServiceWorkspaceTest.kt similarity index 87% rename from dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/ServiceWorkspaceTest.kt rename to dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/ServiceWorkspaceTest.kt index 378333dd..bb4bdce9 100644 --- a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/ServiceWorkspaceTest.kt +++ b/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/ServiceWorkspaceTest.kt @@ -1,4 +1,4 @@ -package space.kscience.dataforge.workspace +package space.kscience.dataforge.distributed import io.lambdarpc.utils.Endpoint import kotlinx.coroutines.runBlocking @@ -19,9 +19,11 @@ import space.kscience.dataforge.data.static import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.asName -import space.kscience.dataforge.workspace.distributed.ClientWorkspacePlugin -import space.kscience.dataforge.workspace.distributed.ServiceWorkspace +import space.kscience.dataforge.workspace.Workspace +import space.kscience.dataforge.workspace.WorkspacePlugin +import space.kscience.dataforge.workspace.task import kotlin.reflect.KClass +import kotlin.reflect.KType import kotlin.reflect.typeOf import kotlin.test.assertEquals @@ -46,11 +48,15 @@ private class MyPlugin : WorkspacePlugin() { } } -private class RemoteMyPlugin(endpoint: Endpoint) : ClientWorkspacePlugin( - MyPlugin.tag, - endpoint, - "task".asName() to typeOf() -) +private class RemoteMyPlugin(endpoint: Endpoint) : ClientWorkspacePlugin(endpoint) { + override val tag: PluginTag + get() = MyPlugin.tag + + override val tasks: Map 
+ get() = mapOf( + "task".asName() to typeOf() + ) +} @TestInstance(TestInstance.Lifecycle.PER_CLASS) class ServiceWorkspaceTest { diff --git a/dataforge-workspace/build.gradle.kts b/dataforge-workspace/build.gradle.kts index 5d83ba4b..350a0b01 100644 --- a/dataforge-workspace/build.gradle.kts +++ b/dataforge-workspace/build.gradle.kts @@ -12,30 +12,9 @@ kotlin { api(project(":dataforge-io")) } } - jvmMain { - dependencies { - // TODO include fat jar of lambdarpc - api(files("lambdarpc-core-0.0.1.jar")) - runtimeOnly("io.grpc:grpc-netty-shaded:1.44.0") - api("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0") - api("io.grpc:grpc-protobuf:1.44.0") - api("com.google.protobuf:protobuf-java-util:3.19.4") - api("com.google.protobuf:protobuf-kotlin:3.19.4") - api("io.grpc:grpc-kotlin-stub:1.2.1") - api("org.jetbrains.kotlinx:kotlinx-serialization-json:1.3.2") - api("org.slf4j:slf4j-simple:1.7.36") - api("io.github.microutils:kotlin-logging-jvm:2.1.21") - } - } - } -} - -kscience { - useSerialization { - json() } } readme { maturity = ru.mipt.npm.gradle.Maturity.EXPERIMENTAL -} \ No newline at end of file +} diff --git a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ClientWorkspacePlugin.kt b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ClientWorkspacePlugin.kt deleted file mode 100644 index 302037a7..00000000 --- a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ClientWorkspacePlugin.kt +++ /dev/null @@ -1,31 +0,0 @@ -package space.kscience.dataforge.workspace.distributed - -import io.lambdarpc.utils.Endpoint -import space.kscience.dataforge.context.AbstractPlugin -import space.kscience.dataforge.context.PluginTag -import space.kscience.dataforge.names.Name -import space.kscience.dataforge.workspace.Task -import kotlin.reflect.KType - -/** - * Plugin that purpose is to communicate with remote plugins. 
- * @param tag Tag og the [ClientWorkspacePlugin] should be equal to the tag of the corresponding remote plugin. - * @param endpoint Endpoint of the remote plugin. - * @param tasks Enumeration of names of remote tasks and their result types. - */ -public abstract class ClientWorkspacePlugin( - override val tag: PluginTag, - endpoint: Endpoint, - vararg tasks: Pair, -) : AbstractPlugin() { - - private val tasks: Map> = tasks.associate { (name, type) -> - name to RemoteTask(endpoint, type) - } - - override fun content(target: String): Map = - when (target) { - Task.TYPE -> tasks - else -> emptyMap() - } -} diff --git a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ServiceWorkspace.kt b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ServiceWorkspace.kt deleted file mode 100644 index 7d48e930..00000000 --- a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ServiceWorkspace.kt +++ /dev/null @@ -1,237 +0,0 @@ -package space.kscience.dataforge.workspace.distributed - -import io.ktor.utils.io.core.* -import io.lambdarpc.coding.Coder -import io.lambdarpc.coding.CodingContext -import io.lambdarpc.dsl.LibService -import io.lambdarpc.dsl.def -import io.lambdarpc.transport.grpc.Entity -import io.lambdarpc.transport.serialization.Entity -import io.lambdarpc.transport.serialization.RawData -import io.lambdarpc.utils.Address -import io.lambdarpc.utils.Port -import io.lambdarpc.utils.toSid -import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Deferred -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.asFlow -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.toList -import kotlinx.coroutines.runBlocking -import kotlinx.serialization.KSerializer -import kotlinx.serialization.Serializable -import kotlinx.serialization.json.Json -import kotlinx.serialization.serializer 
-import space.kscience.dataforge.context.Context -import space.kscience.dataforge.context.Global -import space.kscience.dataforge.context.gather -import space.kscience.dataforge.data.Data -import space.kscience.dataforge.data.DataSet -import space.kscience.dataforge.data.DataTree -import space.kscience.dataforge.data.Goal -import space.kscience.dataforge.data.NamedData -import space.kscience.dataforge.data.await -import space.kscience.dataforge.data.component1 -import space.kscience.dataforge.data.component2 -import space.kscience.dataforge.meta.Meta -import space.kscience.dataforge.meta.MetaSerializer -import space.kscience.dataforge.names.Name -import space.kscience.dataforge.names.asName -import space.kscience.dataforge.workspace.Task -import space.kscience.dataforge.workspace.TaskResult -import space.kscience.dataforge.workspace.Workspace -import space.kscience.dataforge.workspace.wrapResult -import java.nio.charset.Charset -import kotlin.reflect.KType - -/** - * Workspace that exposes its tasks for remote clients. - */ -public class ServiceWorkspace( - address: String = "localhost", - port: Int? = null, - override val context: Context = Global.buildContext("workspace".asName()), - data: DataSet<*> = runBlocking { DataTree {} }, - override val targets: Map = mapOf(), -) : Workspace, Closeable { - - override val data: TaskResult<*> = wrapResult(data, Name.EMPTY, Meta.EMPTY) - - override val tasks: Map> - get() = context.gather(Task.TYPE) - - private val service = LibService(serviceId, address, port) { - execute of { name -> - val res = produce(name, Meta.EMPTY) - LazyDecodableDataSetAdapter(res) - } - } - - /** - * Address this workspace is available on. - */ - public val address: Address = Address(address) - - /** - * Port this workspace is available on. - */ - public val port: Port - get() = service.port - - /** - * Start [ServiceWorkspace] as a service. - */ - public fun start(): Unit = service.start() - - /** - * Await termination of the service. 
- */ - public fun awaitTermination(): Unit = service.awaitTermination() - - /** - * Shutdown service. - */ - public fun shutdown(): Unit = service.shutdown() - - override fun close(): Unit = service.shutdown() - - public companion object { - internal val serviceId = "d41b95b1-828b-4444-8ff0-6f9c92a79246".toSid() - internal val execute by serviceId.def(NameCoder, DataSetCoder) - } -} - -private object NameCoder : Coder { - override fun decode(entity: Entity, context: CodingContext): Name { - require(entity.hasData()) { "Entity should contain data" } - val string = entity.data.toString(Charset.defaultCharset()) - return Name.parse(string) - } - - override fun encode(value: Name, context: CodingContext): Entity = - Entity(RawData.copyFrom(value.toString(), Charset.defaultCharset())) -} - -@Serializable -private data class DataPrototype( - val meta: String, - val data: String, -) { - companion object { - suspend fun of(data: Data, serializer: KSerializer): DataPrototype { - val meta = Json.encodeToString(MetaSerializer, data.meta) - val string = Json.encodeToString(serializer, data.await()) - return DataPrototype(meta, string) - } - } -} - -/** - * Data class that represents serializable [DataSet]. - */ -@Serializable -private data class DataSetPrototype( - val data: Map, -) - -/** - * [DataSetPrototype] builder. - */ -private fun DataSet.toPrototype(): DataSetPrototype = runBlocking { - val serializer = serializer(dataType) - val map = mutableListOf>() - flowData().map { (name, data) -> - name.toString() to DataPrototype.of(data, serializer) - }.toList(map) - DataSetPrototype(map.associate { it }) -} - -/** - * Trivial [Data] implementation. 
- */ -private class SimpleData( - override val type: KType, - override val meta: Meta, - val data: Any, -) : Data { - override val dependencies: Collection> - get() = emptyList() - - override val deferred: Deferred - get() = CompletableDeferred(data) - - override fun async(coroutineScope: CoroutineScope): Deferred = deferred - override fun reset() = Unit -} - -/** - * Trivial named data implementation. - */ -private class SimpleNamedData( - override val name: Name, - override val data: Data, -) : NamedData, Data by data - -/** - * Represents [DataSet] that should be initialized before usage. - */ -internal interface LazyDecodableDataSet : DataSet { - fun finishDecoding(type: KType) -} - -private class LazyDecodableDataSetAdapter(val dataSet: DataSet) : - LazyDecodableDataSet, DataSet by dataSet { - override fun finishDecoding(type: KType) = Unit -} - -/** - * Trivial [LazyDecodableDataSet] implementation. - */ -private class SimpleDataSet(private val prototype: DataSetPrototype) : LazyDecodableDataSet { - - lateinit var type: KType - lateinit var data: Map> - - override fun finishDecoding(type: KType) { - this.type = type - val serializer = serializer(type) - this.data = prototype.data - .mapKeys { (name, _) -> Name.of(name) } - .mapValues { (_, pair) -> - val (meta, data) = pair - Pair( - Json.decodeFromString(MetaSerializer, meta), - Json.decodeFromString(serializer, data)!! - ) - } - } - - override val dataType: KType - get() = type - - override fun flowData(): Flow> = - data.map { (name, pair) -> - val (meta, data) = pair - val wrapped = SimpleData(dataType, meta, data) - SimpleNamedData(name, wrapped) - }.asFlow() - - override suspend fun getData(name: Name): Data? 
= data[name]?.let { (meta, data) -> - SimpleData(dataType, meta, data) - } -} - -private object DataSetCoder : Coder> { - override fun decode(entity: Entity, context: CodingContext): LazyDecodableDataSet { - val string = entity.data.toString(Charset.defaultCharset()) - val prototype = Json.decodeFromString(serializer(), string) - return SimpleDataSet(prototype) - } - - override fun encode(value: LazyDecodableDataSet, context: CodingContext): Entity { - val prototype = value.toPrototype() - val string = Json.encodeToString(serializer(), prototype) - return Entity(RawData.copyFrom(string, Charset.defaultCharset())) - } -} diff --git a/settings.gradle.kts b/settings.gradle.kts index 2e5ee74f..ff2adfdf 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -22,5 +22,6 @@ include( ":dataforge-context", ":dataforge-data", ":dataforge-workspace", - ":dataforge-scripting" -) \ No newline at end of file + ":dataforge-scripting", + ":dataforge-distributed", +)