From 7414e6019224ec70559f8d625bbd0c4c5289c0dd Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Tue, 30 Nov 2021 14:42:00 +0300 Subject: [PATCH 01/14] Update publish.yml force disable github publishing --- .github/workflows/publish.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index c5075cb0..45ed2fe8 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -9,6 +9,8 @@ jobs: publish: environment: name: publish + env: + publishing.github: false strategy: matrix: os: [ macOS-latest, windows-latest ] -- 2.34.1 From 9cc30b1f4ef4acbb5574a3ba43c21c98636c7dcf Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Sun, 12 Dec 2021 10:48:52 +0300 Subject: [PATCH 02/14] disable sonatype publishing --- gradle.properties | 2 ++ 1 file changed, 2 insertions(+) diff --git a/gradle.properties b/gradle.properties index 5c9b6463..aa1ef695 100644 --- a/gradle.properties +++ b/gradle.properties @@ -7,3 +7,5 @@ kotlin.parallel.tasks.in.project=true #kotlin.mpp.enableGranularSourceSetsMetadata=true #kotlin.native.enableDependencyPropagation=false kotlin.mpp.stability.nowarn=true + +publishing.sonatype=false -- 2.34.1 From be8e97143607cb6d02932c51b3012543e4b7628f Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Sun, 12 Dec 2021 11:13:35 +0300 Subject: [PATCH 03/14] all platforms for macos publication --- .github/workflows/publish.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 45ed2fe8..e607e4b7 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -50,6 +50,6 @@ jobs: - name: Publish Mac Artifacts if: matrix.os == 'macOS-latest' run: > - ./gradlew release --no-daemon --build-cache -Ppublishing.enabled=true -Ppublishing.platform=macosX64 + ./gradlew release --no-daemon --build-cache -Ppublishing.enabled=true -Ppublishing.space.user=${{ secrets.SPACE_APP_ID }} -Ppublishing.space.token=${{ secrets.SPACE_APP_SECRET }} -- 2.34.1 From a4044c82a0afccd764718441e0187146c28b9c1e Mon Sep 17 00:00:00 2001 From: Andrey Stoyan Date: Sat, 30 Apr 2022 19:34:27 +0300 Subject: [PATCH 04/14] Call remote task of service workspace --- .../kscience/dataforge/data/NamedData.kt | 5 +- dataforge-workspace/build.gradle.kts | 25 ++- .../kscience/dataforge/workspace/Task.kt | 8 + .../distributed/ClientWorkspacePlugin.kt | 31 +++ .../workspace/distributed/RemoteTask.kt | 36 ++++ .../workspace/distributed/ServiceWorkspace.kt | 202 ++++++++++++++++++ .../workspace/ServiceWorkspaceTest.kt | 106 +++++++++ 7 files changed, 410 insertions(+), 3 deletions(-) create mode 100644 dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ClientWorkspacePlugin.kt create mode 100644 dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/RemoteTask.kt create mode 100644 dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ServiceWorkspace.kt create mode 100644 dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/ServiceWorkspaceTest.kt diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/NamedData.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/NamedData.kt index 59ae10a8..8082cfc7 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/NamedData.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/NamedData.kt @@ -29,4 +29,7 @@ 
public fun Data.named(name: Name): NamedData = if (this is NamedData) { NamedDataImpl(name, this.data) } else { NamedDataImpl(name, this) -} \ No newline at end of file +} + +public operator fun NamedData.component1(): Name = name +public operator fun NamedData.component2(): Data = data diff --git a/dataforge-workspace/build.gradle.kts b/dataforge-workspace/build.gradle.kts index c6e62b36..5d83ba4b 100644 --- a/dataforge-workspace/build.gradle.kts +++ b/dataforge-workspace/build.gradle.kts @@ -5,16 +5,37 @@ plugins { kotlin { sourceSets { - commonMain{ + commonMain { dependencies { api(project(":dataforge-context")) api(project(":dataforge-data")) api(project(":dataforge-io")) } } + jvmMain { + dependencies { + // TODO include fat jar of lambdarpc + api(files("lambdarpc-core-0.0.1.jar")) + runtimeOnly("io.grpc:grpc-netty-shaded:1.44.0") + api("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0") + api("io.grpc:grpc-protobuf:1.44.0") + api("com.google.protobuf:protobuf-java-util:3.19.4") + api("com.google.protobuf:protobuf-kotlin:3.19.4") + api("io.grpc:grpc-kotlin-stub:1.2.1") + api("org.jetbrains.kotlinx:kotlinx-serialization-json:1.3.2") + api("org.slf4j:slf4j-simple:1.7.36") + api("io.github.microutils:kotlin-logging-jvm:2.1.21") + } + } } } -readme{ +kscience { + useSerialization { + json() + } +} + +readme { maturity = ru.mipt.npm.gradle.Maturity.EXPERIMENTAL } \ No newline at end of file diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt index dcf63db2..042945c5 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt @@ -17,6 +17,11 @@ import kotlin.reflect.typeOf @Type(TYPE) public interface Task : Described { + /** + * Type of the task result data. + */ + public val resultType: KType + /** * Compute a [TaskResult] using given meta. In general, the result is lazy and represents both computation model * and a handler for actual result @@ -55,6 +60,9 @@ public fun Task( builder: suspend TaskResultBuilder.() -> Unit, ): Task = object : Task { + override val resultType: KType + get() = resultType + override val descriptor: MetaDescriptor? = descriptor override suspend fun execute( diff --git a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ClientWorkspacePlugin.kt b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ClientWorkspacePlugin.kt new file mode 100644 index 00000000..302037a7 --- /dev/null +++ b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ClientWorkspacePlugin.kt @@ -0,0 +1,31 @@ +package space.kscience.dataforge.workspace.distributed + +import io.lambdarpc.utils.Endpoint +import space.kscience.dataforge.context.AbstractPlugin +import space.kscience.dataforge.context.PluginTag +import space.kscience.dataforge.names.Name +import space.kscience.dataforge.workspace.Task +import kotlin.reflect.KType + +/** + * Plugin whose purpose is to communicate with remote plugins. + * @param tag Tag of the [ClientWorkspacePlugin]; it should be equal to the tag of the corresponding remote plugin. + * @param endpoint Endpoint of the remote plugin. + * @param tasks Enumeration of names of remote tasks and their result types.
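+ *
+ * A minimal subclass sketch (illustrative; it mirrors the RemoteMyPlugin declared in the test added by this patch):
+ * class RemoteMyPlugin(endpoint: Endpoint) : ClientWorkspacePlugin(
+ *     MyPlugin.tag,
+ *     endpoint,
+ *     "task".asName() to typeOf<Int>(),
+ * )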
+ */ +public abstract class ClientWorkspacePlugin( + override val tag: PluginTag, + endpoint: Endpoint, + vararg tasks: Pair, +) : AbstractPlugin() { + + private val tasks: Map> = tasks.associate { (name, type) -> + name to RemoteTask(endpoint, type) + } + + override fun content(target: String): Map = + when (target) { + Task.TYPE -> tasks + else -> emptyMap() + } +} diff --git a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/RemoteTask.kt b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/RemoteTask.kt new file mode 100644 index 00000000..a1def94d --- /dev/null +++ b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/RemoteTask.kt @@ -0,0 +1,36 @@ +package space.kscience.dataforge.workspace.distributed + +import io.lambdarpc.dsl.ServiceDispatcher +import io.lambdarpc.utils.Endpoint +import kotlinx.coroutines.withContext +import space.kscience.dataforge.data.DataSet +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.meta.descriptors.MetaDescriptor +import space.kscience.dataforge.names.Name +import space.kscience.dataforge.workspace.Task +import space.kscience.dataforge.workspace.TaskResult +import space.kscience.dataforge.workspace.Workspace +import space.kscience.dataforge.workspace.wrapResult +import kotlin.reflect.KType + +/** + * Proxy task that communicates with the corresponding remote task. + */ +internal class RemoteTask( + endpoint: Endpoint, + override val resultType: KType, + override val descriptor: MetaDescriptor? = null, +) : Task { + private val dispatcher = ServiceDispatcher(ServiceWorkspace.serviceId to endpoint) + + @Suppress("UNCHECKED_CAST") + override suspend fun execute( + workspace: Workspace, + taskName: Name, + taskMeta: Meta, + ): TaskResult = withContext(dispatcher) { + val dataset = ServiceWorkspace.execute(taskName) as LazyDecodableDataSet + dataset.finishDecoding(resultType) + workspace.wrapResult(dataset as DataSet, taskName, taskMeta) + } +} diff --git a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ServiceWorkspace.kt b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ServiceWorkspace.kt new file mode 100644 index 00000000..3aafd596 --- /dev/null +++ b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ServiceWorkspace.kt @@ -0,0 +1,202 @@ +package space.kscience.dataforge.workspace.distributed + +import io.ktor.utils.io.core.* +import io.lambdarpc.coding.Coder +import io.lambdarpc.coding.CodingContext +import io.lambdarpc.dsl.LibService +import io.lambdarpc.dsl.def +import io.lambdarpc.transport.grpc.Entity +import io.lambdarpc.transport.serialization.Entity +import io.lambdarpc.transport.serialization.RawData +import io.lambdarpc.utils.Address +import io.lambdarpc.utils.Port +import io.lambdarpc.utils.toSid +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.runBlocking +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.Json +import kotlinx.serialization.serializer +import space.kscience.dataforge.context.Context +import space.kscience.dataforge.context.Global +import space.kscience.dataforge.context.gather +import 
space.kscience.dataforge.data.Data +import space.kscience.dataforge.data.DataSet +import space.kscience.dataforge.data.DataTree +import space.kscience.dataforge.data.Goal +import space.kscience.dataforge.data.NamedData +import space.kscience.dataforge.data.await +import space.kscience.dataforge.data.component1 +import space.kscience.dataforge.data.component2 +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.names.Name +import space.kscience.dataforge.names.asName +import space.kscience.dataforge.workspace.Task +import space.kscience.dataforge.workspace.TaskResult +import space.kscience.dataforge.workspace.Workspace +import space.kscience.dataforge.workspace.wrapResult +import java.nio.charset.Charset +import kotlin.reflect.KType + +/** + * Workspace that exposes its tasks for remote clients. + */ +public class ServiceWorkspace( + address: String = "localhost", + port: Int? = null, + override val context: Context = Global.buildContext("workspace".asName()), + data: DataSet<*> = runBlocking { DataTree {} }, + override val targets: Map = mapOf(), +) : Workspace, Closeable { + + override val data: TaskResult<*> = wrapResult(data, Name.EMPTY, Meta.EMPTY) + + override val tasks: Map> + get() = context.gather(Task.TYPE) + + private val service = LibService(serviceId, address, port) { + execute of { name -> println("service = $name"); produce(name, Meta.EMPTY) } // TODO + } + + /** + * Address this workspace is available on. + */ + public val address: Address = Address(address) + + /** + * Port this workspace is available on. + */ + public val port: Port + get() = service.port + + /** + * Start [ServiceWorkspace] as a service. + */ + public fun start(): Unit = service.start() + + /** + * Await termination of the service. + */ + public fun awaitTermination(): Unit = service.awaitTermination() + + /** + * Shutdown service. + */ + public fun shutdown(): Unit = service.shutdown() + + override fun close(): Unit = service.shutdown() + + public companion object { + internal val serviceId = "d41b95b1-828b-4444-8ff0-6f9c92a79246".toSid() + internal val execute by serviceId.def(NameCoder, DataSetCoder) + } +} + +private object NameCoder : Coder { + override fun decode(entity: Entity, context: CodingContext): Name { + require(entity.hasData()) { "Entity should contain data" } + val string = entity.data.toString(Charset.defaultCharset()) + return Name.parse(string) + } + + override fun encode(value: Name, context: CodingContext): Entity = + Entity(RawData.copyFrom(value.toString(), Charset.defaultCharset())) +} + +/** + * Data class that represents serializable [DataSet]. + */ +@Serializable +private data class DataSetPrototype( + val data: Map, +) + +/** + * [DataSetPrototype] builder. + */ +private fun DataSet.toPrototype(): DataSetPrototype = runBlocking { + val serializer = serializer(dataType) + val map = mutableListOf>() + flowData().map { (name, data) -> + name.toString() to Json.encodeToString(serializer, data.await()) + }.toList(map) + DataSetPrototype(map.associate { it }) +} + +/** + * Trivial [Data] implementation. + */ +private class SimpleData(override val type: KType, val data: Any) : Data { + override val meta: Meta + get() = Meta.EMPTY + override val dependencies: Collection> + get() = emptyList() + override val deferred: Deferred + get() = CompletableDeferred(data) + + override fun async(coroutineScope: CoroutineScope): Deferred = deferred + override fun reset() = Unit +} + +/** + * Trivial named data implementation. 
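+ * It delegates all [Data] behaviour to the wrapped instance and only attaches a [Name] to it.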
+ */ +private class SimpleNamedData( + override val name: Name, + override val data: Data, +) : NamedData, Data by data + +/** + * Represents [DataSet] that should be initialized before usage. + */ +internal interface LazyDecodableDataSet : DataSet { + fun finishDecoding(type: KType) +} + +/** + * Trivial [LazyDecodableDataSet] implementation. + */ +private class SimpleDataSet(private val prototype: DataSetPrototype) : LazyDecodableDataSet { + + lateinit var type: KType + lateinit var data: Map + + override fun finishDecoding(type: KType) { + this.type = type + this.data = prototype.data.map { (name, data) -> + Name.parse(name) to Json.decodeFromString(serializer(type), data) + }.associate { (name, data) -> name to data!! } + } + + override val dataType: KType + get() = type + + override fun flowData(): Flow> = + data.map { (name, data) -> + val wrapped = SimpleData(dataType, data) + SimpleNamedData(name, wrapped) + }.asFlow() + + override suspend fun getData(name: Name): Data? = data[name]?.let { data -> + SimpleData(dataType, data) + } +} + +private object DataSetCoder : Coder> { + override fun decode(entity: Entity, context: CodingContext): DataSet { + val string = entity.data.toString(Charset.defaultCharset()) + val prototype = Json.decodeFromString(serializer(), string) + return SimpleDataSet(prototype) + } + + override fun encode(value: DataSet<*>, context: CodingContext): Entity { + val prototype = value.toPrototype() + val string = Json.encodeToString(serializer(), prototype) + return Entity(RawData.copyFrom(string, Charset.defaultCharset())) + } +} diff --git a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/ServiceWorkspaceTest.kt b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/ServiceWorkspaceTest.kt new file mode 100644 index 00000000..378333dd --- /dev/null +++ b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/ServiceWorkspaceTest.kt @@ -0,0 +1,106 @@ +package space.kscience.dataforge.workspace + +import io.lambdarpc.utils.Endpoint +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.AfterAll +import org.junit.jupiter.api.BeforeAll +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.TestInstance +import space.kscience.dataforge.context.Context +import space.kscience.dataforge.context.Global +import space.kscience.dataforge.context.PluginFactory +import space.kscience.dataforge.context.PluginTag +import space.kscience.dataforge.data.DataTree +import space.kscience.dataforge.data.await +import space.kscience.dataforge.data.getData +import space.kscience.dataforge.data.map +import space.kscience.dataforge.data.select +import space.kscience.dataforge.data.static +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.names.Name +import space.kscience.dataforge.names.asName +import space.kscience.dataforge.workspace.distributed.ClientWorkspacePlugin +import space.kscience.dataforge.workspace.distributed.ServiceWorkspace +import kotlin.reflect.KClass +import kotlin.reflect.typeOf +import kotlin.test.assertEquals + +private class MyPlugin : WorkspacePlugin() { + override val tag: PluginTag + get() = Factory.tag + + val task by task { + val myInt = workspace.data.select() + val res = myInt.getData("int".asName())!! 
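+        // increment the selected Int and publish it under the name "result" (verified by the assertions below)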
+ emit("result".asName(), res.map { it + 1 }) + } + + companion object Factory : PluginFactory { + override fun invoke(meta: Meta, context: Context): MyPlugin = MyPlugin() + + override val tag: PluginTag + get() = PluginTag("Plg") + + override val type: KClass + get() = MyPlugin::class + } +} + +private class RemoteMyPlugin(endpoint: Endpoint) : ClientWorkspacePlugin( + MyPlugin.tag, + endpoint, + "task".asName() to typeOf() +) + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class ServiceWorkspaceTest { + + private lateinit var worker1: ServiceWorkspace + private lateinit var workspace: Workspace + + @BeforeAll + fun before() { + worker1 = ServiceWorkspace( + context = Global.buildContext("worker1".asName()) { + plugin(MyPlugin) + }, + data = runBlocking { + DataTree { + static("int", 0) + } + }, + ) + worker1.start() + + workspace = Workspace { + context { + val endpoint = Endpoint(worker1.address, worker1.port) + plugin(RemoteMyPlugin(endpoint)) + } + } + } + + @AfterAll + fun after() { + worker1.shutdown() + } + + @Test + fun localExecution() = runBlocking { + assertEquals(0, worker1.data.getData("int")!!.await()) + val res = worker1 + .produce(Name.of("Plg", "task"), Meta.EMPTY) + .getData("result".asName())!! + .await() + assertEquals(1, res) + } + + @Test + fun remoteExecution() = runBlocking { + val remoteRes = workspace + .produce(Name.of("Plg", "task"), Meta.EMPTY) + .getData("result".asName())!! + .await() + assertEquals(1, remoteRes) + } +} -- 2.34.1 From e9d4683f9bb1cbe70b8bedb41b63adbefb993dee Mon Sep 17 00:00:00 2001 From: Andrey Stoyan Date: Mon, 9 May 2022 20:22:17 +0300 Subject: [PATCH 05/14] Send meta too --- .../workspace/distributed/RemoteTask.kt | 2 +- .../workspace/distributed/ServiceWorkspace.kt | 71 ++++++++++++++----- 2 files changed, 54 insertions(+), 19 deletions(-) diff --git a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/RemoteTask.kt b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/RemoteTask.kt index a1def94d..9fc205b0 100644 --- a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/RemoteTask.kt +++ b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/RemoteTask.kt @@ -29,7 +29,7 @@ internal class RemoteTask( taskName: Name, taskMeta: Meta, ): TaskResult = withContext(dispatcher) { - val dataset = ServiceWorkspace.execute(taskName) as LazyDecodableDataSet + val dataset = ServiceWorkspace.execute(taskName) dataset.finishDecoding(resultType) workspace.wrapResult(dataset as DataSet, taskName, taskMeta) } diff --git a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ServiceWorkspace.kt b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ServiceWorkspace.kt index 3aafd596..7d48e930 100644 --- a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ServiceWorkspace.kt +++ b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ServiceWorkspace.kt @@ -19,6 +19,7 @@ import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.toList import kotlinx.coroutines.runBlocking +import kotlinx.serialization.KSerializer import kotlinx.serialization.Serializable import kotlinx.serialization.json.Json import kotlinx.serialization.serializer @@ -34,6 +35,7 @@ import space.kscience.dataforge.data.await import space.kscience.dataforge.data.component1 
import space.kscience.dataforge.data.component2 import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.meta.MetaSerializer import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.asName import space.kscience.dataforge.workspace.Task @@ -60,7 +62,10 @@ public class ServiceWorkspace( get() = context.gather(Task.TYPE) private val service = LibService(serviceId, address, port) { - execute of { name -> println("service = $name"); produce(name, Meta.EMPTY) } // TODO + execute of { name -> + val res = produce(name, Meta.EMPTY) + LazyDecodableDataSetAdapter(res) + } } /** @@ -108,12 +113,26 @@ private object NameCoder : Coder { Entity(RawData.copyFrom(value.toString(), Charset.defaultCharset())) } +@Serializable +private data class DataPrototype( + val meta: String, + val data: String, +) { + companion object { + suspend fun of(data: Data, serializer: KSerializer): DataPrototype { + val meta = Json.encodeToString(MetaSerializer, data.meta) + val string = Json.encodeToString(serializer, data.await()) + return DataPrototype(meta, string) + } + } +} + /** * Data class that represents serializable [DataSet]. */ @Serializable private data class DataSetPrototype( - val data: Map, + val data: Map, ) /** @@ -121,9 +140,9 @@ private data class DataSetPrototype( */ private fun DataSet.toPrototype(): DataSetPrototype = runBlocking { val serializer = serializer(dataType) - val map = mutableListOf>() + val map = mutableListOf>() flowData().map { (name, data) -> - name.toString() to Json.encodeToString(serializer, data.await()) + name.toString() to DataPrototype.of(data, serializer) }.toList(map) DataSetPrototype(map.associate { it }) } @@ -131,11 +150,14 @@ private fun DataSet.toPrototype(): DataSetPrototype = runBlocking { /** * Trivial [Data] implementation. */ -private class SimpleData(override val type: KType, val data: Any) : Data { - override val meta: Meta - get() = Meta.EMPTY +private class SimpleData( + override val type: KType, + override val meta: Meta, + val data: Any, +) : Data { override val dependencies: Collection> get() = emptyList() + override val deferred: Deferred get() = CompletableDeferred(data) @@ -158,43 +180,56 @@ internal interface LazyDecodableDataSet : DataSet { fun finishDecoding(type: KType) } +private class LazyDecodableDataSetAdapter(val dataSet: DataSet) : + LazyDecodableDataSet, DataSet by dataSet { + override fun finishDecoding(type: KType) = Unit +} + /** * Trivial [LazyDecodableDataSet] implementation. */ private class SimpleDataSet(private val prototype: DataSetPrototype) : LazyDecodableDataSet { lateinit var type: KType - lateinit var data: Map + lateinit var data: Map> override fun finishDecoding(type: KType) { this.type = type - this.data = prototype.data.map { (name, data) -> - Name.parse(name) to Json.decodeFromString(serializer(type), data) - }.associate { (name, data) -> name to data!! } + val serializer = serializer(type) + this.data = prototype.data + .mapKeys { (name, _) -> Name.of(name) } + .mapValues { (_, pair) -> + val (meta, data) = pair + Pair( + Json.decodeFromString(MetaSerializer, meta), + Json.decodeFromString(serializer, data)!! + ) + } } override val dataType: KType get() = type override fun flowData(): Flow> = - data.map { (name, data) -> - val wrapped = SimpleData(dataType, data) + data.map { (name, pair) -> + val (meta, data) = pair + val wrapped = SimpleData(dataType, meta, data) SimpleNamedData(name, wrapped) }.asFlow() - override suspend fun getData(name: Name): Data? 
= data[name]?.let { data -> - SimpleData(dataType, data) + override suspend fun getData(name: Name): Data? = data[name]?.let { (meta, data) -> + SimpleData(dataType, meta, data) } } -private object DataSetCoder : Coder> { - override fun decode(entity: Entity, context: CodingContext): DataSet { +private object DataSetCoder : Coder> { + override fun decode(entity: Entity, context: CodingContext): LazyDecodableDataSet { val string = entity.data.toString(Charset.defaultCharset()) val prototype = Json.decodeFromString(serializer(), string) return SimpleDataSet(prototype) } - override fun encode(value: DataSet<*>, context: CodingContext): Entity { + override fun encode(value: LazyDecodableDataSet, context: CodingContext): Entity { val prototype = value.toPrototype() val string = Json.encodeToString(serializer(), prototype) return Entity(RawData.copyFrom(string, Charset.defaultCharset())) -- 2.34.1 From f62507e1b92e48eaf983b3a10ac3d85c8852a458 Mon Sep 17 00:00:00 2001 From: Andrey Stoyan Date: Mon, 9 May 2022 21:29:39 +0300 Subject: [PATCH 06/14] Create separate gradle project --- dataforge-distributed/build.gradle.kts | 41 +++ .../distributed/ClientWorkspacePlugin.kt | 37 +++ .../dataforge}/distributed/RemoteTask.kt | 2 +- .../dataforge/distributed/ServiceWorkspace.kt | 81 ++++++ .../distributed/serialization/DataSetCoder.kt | 22 ++ .../distributed/serialization/NameCoder.kt | 20 ++ .../serialization/SerializableDataSet.kt | 16 ++ .../serialization/dataSerialization.kt | 56 +++++ .../serialization/dataSetSerialization.kt | 73 ++++++ .../distributed}/ServiceWorkspaceTest.kt | 22 +- dataforge-workspace/build.gradle.kts | 23 +- .../distributed/ClientWorkspacePlugin.kt | 31 --- .../workspace/distributed/ServiceWorkspace.kt | 237 ------------------ settings.gradle.kts | 5 +- 14 files changed, 365 insertions(+), 301 deletions(-) create mode 100644 dataforge-distributed/build.gradle.kts create mode 100644 dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ClientWorkspacePlugin.kt rename {dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace => dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge}/distributed/RemoteTask.kt (95%) create mode 100644 dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ServiceWorkspace.kt create mode 100644 dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/DataSetCoder.kt create mode 100644 dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/NameCoder.kt create mode 100644 dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/SerializableDataSet.kt create mode 100644 dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSerialization.kt create mode 100644 dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSetSerialization.kt rename {dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace => dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed}/ServiceWorkspaceTest.kt (87%) delete mode 100644 dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ClientWorkspacePlugin.kt delete mode 100644 dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ServiceWorkspace.kt diff --git a/dataforge-distributed/build.gradle.kts b/dataforge-distributed/build.gradle.kts new file mode 100644 index
00000000..e5629e81 --- /dev/null +++ b/dataforge-distributed/build.gradle.kts @@ -0,0 +1,41 @@ +plugins { + id("ru.mipt.npm.gradle.mpp") +} + +kotlin { + sourceSets { + commonMain { + dependencies { + api(project(":dataforge-context")) + api(project(":dataforge-data")) + api(project(":dataforge-io")) + api(project(":dataforge-workspace")) + } + } + jvmMain { + dependencies { + // TODO include fat jar of lambdarpc + api(files("lambdarpc-core-0.0.1.jar")) + runtimeOnly("io.grpc:grpc-netty-shaded:1.44.0") + api("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0") + api("io.grpc:grpc-protobuf:1.44.0") + api("com.google.protobuf:protobuf-java-util:3.19.4") + api("com.google.protobuf:protobuf-kotlin:3.19.4") + api("io.grpc:grpc-kotlin-stub:1.2.1") + api("org.jetbrains.kotlinx:kotlinx-serialization-json:1.3.2") + api("org.slf4j:slf4j-simple:1.7.36") + api("io.github.microutils:kotlin-logging-jvm:2.1.21") + } + } + } +} + +kscience { + useSerialization { + json() + } +} + +readme { + maturity = ru.mipt.npm.gradle.Maturity.PROTOTYPE +} diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ClientWorkspacePlugin.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ClientWorkspacePlugin.kt new file mode 100644 index 00000000..c9dec94e --- /dev/null +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ClientWorkspacePlugin.kt @@ -0,0 +1,37 @@ +package space.kscience.dataforge.distributed + +import io.lambdarpc.utils.Endpoint +import space.kscience.dataforge.context.AbstractPlugin +import space.kscience.dataforge.context.PluginTag +import space.kscience.dataforge.names.Name +import space.kscience.dataforge.workspace.Task +import kotlin.reflect.KType + +/** + * Plugin whose purpose is to communicate with remote plugins. + * @param endpoint Endpoint of the remote plugin. + */ +public abstract class ClientWorkspacePlugin(endpoint: Endpoint) : AbstractPlugin() { + + /** + * Tag of the [ClientWorkspacePlugin]; it should be equal to the tag of the corresponding remote plugin. + */ + abstract override val tag: PluginTag + + /** + * Enumeration of names of remote tasks and their result types.
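+ *
+ * A typical override is a simple map, e.g. (sketch mirroring the test in this patch):
+ * mapOf("task".asName() to typeOf<Int>())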
+ */ + public abstract val tasks: Map + + private val _tasks: Map> by lazy { + tasks.mapValues { (_, type) -> + RemoteTask(endpoint, type) + } + } + + override fun content(target: String): Map = + when (target) { + Task.TYPE -> _tasks + else -> emptyMap() + } +} diff --git a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/RemoteTask.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTask.kt similarity index 95% rename from dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/RemoteTask.kt rename to dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTask.kt index 9fc205b0..e0d3cb3b 100644 --- a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/RemoteTask.kt +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTask.kt @@ -1,4 +1,4 @@ -package space.kscience.dataforge.workspace.distributed +package space.kscience.dataforge.distributed import io.lambdarpc.dsl.ServiceDispatcher import io.lambdarpc.utils.Endpoint diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ServiceWorkspace.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ServiceWorkspace.kt new file mode 100644 index 00000000..b6849754 --- /dev/null +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ServiceWorkspace.kt @@ -0,0 +1,81 @@ +package space.kscience.dataforge.distributed + +import io.ktor.utils.io.core.* +import io.lambdarpc.dsl.LibService +import io.lambdarpc.dsl.def +import io.lambdarpc.utils.Address +import io.lambdarpc.utils.Port +import io.lambdarpc.utils.toSid +import kotlinx.coroutines.runBlocking +import space.kscience.dataforge.context.Context +import space.kscience.dataforge.context.Global +import space.kscience.dataforge.context.gather +import space.kscience.dataforge.data.DataSet +import space.kscience.dataforge.data.DataTree +import space.kscience.dataforge.distributed.serialization.DataSetCoder +import space.kscience.dataforge.distributed.serialization.NameCoder +import space.kscience.dataforge.distributed.serialization.SerializableDataSetAdapter +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.names.Name +import space.kscience.dataforge.names.asName +import space.kscience.dataforge.workspace.Task +import space.kscience.dataforge.workspace.TaskResult +import space.kscience.dataforge.workspace.Workspace +import space.kscience.dataforge.workspace.wrapResult + +/** + * Workspace that exposes its tasks for remote clients. + */ +public class ServiceWorkspace( + address: String = "localhost", + port: Int? = null, + override val context: Context = Global.buildContext("workspace".asName()), + data: DataSet<*> = runBlocking { DataTree {} }, + override val targets: Map = mapOf(), +) : Workspace, Closeable { + + override val data: TaskResult<*> = wrapResult(data, Name.EMPTY, Meta.EMPTY) + + override val tasks: Map> + get() = context.gather(Task.TYPE) + + private val service = LibService(serviceId, address, port) { + execute of { name -> + val res = produce(name, Meta.EMPTY) + SerializableDataSetAdapter(res) + } + } + + /** + * Address this workspace is available on. + */ + public val address: Address = Address(address) + + /** + * Port this workspace is available on. + */ + public val port: Port + get() = service.port + + /** + * Start [ServiceWorkspace] as a service. 
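+ *
+ * A usage sketch mirroring the test in this patch (myContext and myData are placeholder values):
+ * val worker = ServiceWorkspace(context = myContext, data = myData)
+ * worker.start()
+ * // ... serve remote task executions, then ...
+ * worker.shutdown()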
+ */ + public fun start(): Unit = service.start() + + /** + * Await termination of the service. + */ + public fun awaitTermination(): Unit = service.awaitTermination() + + /** + * Shutdown service. + */ + public fun shutdown(): Unit = service.shutdown() + + override fun close(): Unit = service.shutdown() + + public companion object { + internal val serviceId = "d41b95b1-828b-4444-8ff0-6f9c92a79246".toSid() + internal val execute by serviceId.def(NameCoder, DataSetCoder) + } +} diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/DataSetCoder.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/DataSetCoder.kt new file mode 100644 index 00000000..1bb218b1 --- /dev/null +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/DataSetCoder.kt @@ -0,0 +1,22 @@ +package space.kscience.dataforge.distributed.serialization + +import io.lambdarpc.coding.Coder +import io.lambdarpc.coding.CodingContext +import io.lambdarpc.transport.grpc.Entity +import io.lambdarpc.transport.serialization.Entity +import io.lambdarpc.transport.serialization.RawData +import java.nio.charset.Charset + +internal object DataSetCoder : Coder> { + override fun decode(entity: Entity, context: CodingContext): SerializableDataSet { + val string = entity.data.toString(Charset.defaultCharset()) + val prototype = DataSetPrototype.fromJson(string) + return prototype.toDataSet() + } + + override fun encode(value: SerializableDataSet, context: CodingContext): Entity { + val prototype = DataSetPrototype.of(value) + val string = prototype.toJson() + return Entity(RawData.copyFrom(string, Charset.defaultCharset())) + } +} diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/NameCoder.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/NameCoder.kt new file mode 100644 index 00000000..beb22124 --- /dev/null +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/NameCoder.kt @@ -0,0 +1,20 @@ +package space.kscience.dataforge.distributed.serialization + +import io.lambdarpc.coding.Coder +import io.lambdarpc.coding.CodingContext +import io.lambdarpc.transport.grpc.Entity +import io.lambdarpc.transport.serialization.Entity +import io.lambdarpc.transport.serialization.RawData +import space.kscience.dataforge.names.Name +import java.nio.charset.Charset + +internal object NameCoder : Coder { + override fun decode(entity: Entity, context: CodingContext): Name { + require(entity.hasData()) { "Entity should contain data" } + val string = entity.data.toString(Charset.defaultCharset()) + return Name.parse(string) + } + + override fun encode(value: Name, context: CodingContext): Entity = + Entity(RawData.copyFrom(value.toString(), Charset.defaultCharset())) +} diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/SerializableDataSet.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/SerializableDataSet.kt new file mode 100644 index 00000000..2265b524 --- /dev/null +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/SerializableDataSet.kt @@ -0,0 +1,16 @@ +package space.kscience.dataforge.distributed.serialization + +import space.kscience.dataforge.data.DataSet +import kotlin.reflect.KType + +/** + * Represents [DataSet] that should be 
initialized before usage. + */ +internal interface SerializableDataSet : DataSet { + fun finishDecoding(type: KType) +} + +internal class SerializableDataSetAdapter(dataSet: DataSet) : + SerializableDataSet, DataSet by dataSet { + override fun finishDecoding(type: KType) = Unit +} diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSerialization.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSerialization.kt new file mode 100644 index 00000000..b3e400cc --- /dev/null +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSerialization.kt @@ -0,0 +1,56 @@ +package space.kscience.dataforge.distributed.serialization + +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Deferred +import kotlinx.serialization.KSerializer +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.Json +import space.kscience.dataforge.data.Data +import space.kscience.dataforge.data.Goal +import space.kscience.dataforge.data.await +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.meta.MetaSerializer +import kotlin.reflect.KType + +/** + * [Data] representation that is trivially serializable. + */ +@Serializable +internal data class DataPrototype( + val meta: String, + val data: String, +) { + fun toData(type: KType): Data = + SimpleData( + type = type, + meta = Json.decodeFromString(MetaSerializer, meta), + data = Json.decodeFromString(kotlinx.serialization.serializer(type), data)!! + ) + + companion object { + suspend fun of(data: Data, serializer: KSerializer): DataPrototype { + val meta = Json.encodeToString(MetaSerializer, data.meta) + val string = Json.encodeToString(serializer, data.await()) + return DataPrototype(meta, string) + } + } +} + +/** + * Trivial [Data] implementation. + */ +private class SimpleData( + override val type: KType, + override val meta: Meta, + val data: T, +) : Data { + override val dependencies: Collection> + get() = emptyList() + + override val deferred: Deferred + get() = CompletableDeferred(data) + + override fun async(coroutineScope: CoroutineScope): Deferred = deferred + override fun reset() = Unit +} diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSetSerialization.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSetSerialization.kt new file mode 100644 index 00000000..9d74a9be --- /dev/null +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSetSerialization.kt @@ -0,0 +1,73 @@ +package space.kscience.dataforge.distributed.serialization + +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.runBlocking +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.Json +import kotlinx.serialization.serializer +import space.kscience.dataforge.data.Data +import space.kscience.dataforge.data.DataSet +import space.kscience.dataforge.data.NamedData +import space.kscience.dataforge.data.component1 +import space.kscience.dataforge.data.component2 +import space.kscience.dataforge.names.Name +import kotlin.reflect.KType + +/** + * [DataSet] representation that is trivially serializable. 
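+ *
+ * On the wire this becomes a JSON map from the string form of each [Name] to a [DataPrototype],
+ * e.g. (illustrative): {"data": {"result": {"meta": "{}", "data": "1"}}}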
+ */ +@Serializable +internal data class DataSetPrototype(val data: Map) { + fun toDataSet(): SerializableDataSet = + SerializableDataSetImpl(this) + + fun toJson(): String = Json.encodeToString(serializer(), this) + + companion object { + fun of(dataSet: DataSet): DataSetPrototype = runBlocking { + val serializer = serializer(dataSet.dataType) + val map = mutableListOf>() + dataSet.flowData().map { (name, data) -> + name.toString() to DataPrototype.of(data, serializer) + }.toList(map) + DataSetPrototype(map.associate { it }) + } + + fun fromJson(string: String): DataSetPrototype = Json.decodeFromString(serializer(), string) + } +} + +/** + * Trivial [SerializableDataSet] implementation. + */ +private class SerializableDataSetImpl(private val prototype: DataSetPrototype) : SerializableDataSet { + + private lateinit var type: KType + private lateinit var data: Map> + + override fun finishDecoding(type: KType) { + this.type = type + this.data = prototype.data + .mapKeys { (name, _) -> Name.of(name) } + .mapValues { (_, dataPrototype) -> dataPrototype.toData(type) } + } + + override val dataType: KType + get() = type + + override fun flowData(): Flow> = + data.map { (name, data) -> SimpleNamedData(name, data) }.asFlow() + + override suspend fun getData(name: Name): Data? = data[name] + + /** + * Trivial named data implementation. + */ + private class SimpleNamedData( + override val name: Name, + override val data: Data, + ) : NamedData, Data by data +} diff --git a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/ServiceWorkspaceTest.kt b/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/ServiceWorkspaceTest.kt similarity index 87% rename from dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/ServiceWorkspaceTest.kt rename to dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/ServiceWorkspaceTest.kt index 378333dd..bb4bdce9 100644 --- a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/ServiceWorkspaceTest.kt +++ b/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/ServiceWorkspaceTest.kt @@ -1,4 +1,4 @@ -package space.kscience.dataforge.workspace +package space.kscience.dataforge.distributed import io.lambdarpc.utils.Endpoint import kotlinx.coroutines.runBlocking @@ -19,9 +19,11 @@ import space.kscience.dataforge.data.static import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.asName -import space.kscience.dataforge.workspace.distributed.ClientWorkspacePlugin -import space.kscience.dataforge.workspace.distributed.ServiceWorkspace +import space.kscience.dataforge.workspace.Workspace +import space.kscience.dataforge.workspace.WorkspacePlugin +import space.kscience.dataforge.workspace.task import kotlin.reflect.KClass +import kotlin.reflect.KType import kotlin.reflect.typeOf import kotlin.test.assertEquals @@ -46,11 +48,15 @@ private class MyPlugin : WorkspacePlugin() { } } -private class RemoteMyPlugin(endpoint: Endpoint) : ClientWorkspacePlugin( - MyPlugin.tag, - endpoint, - "task".asName() to typeOf() -) +private class RemoteMyPlugin(endpoint: Endpoint) : ClientWorkspacePlugin(endpoint) { + override val tag: PluginTag + get() = MyPlugin.tag + + override val tasks: Map + get() = mapOf( + "task".asName() to typeOf() + ) +} @TestInstance(TestInstance.Lifecycle.PER_CLASS) class ServiceWorkspaceTest { diff --git a/dataforge-workspace/build.gradle.kts 
b/dataforge-workspace/build.gradle.kts index 5d83ba4b..350a0b01 100644 --- a/dataforge-workspace/build.gradle.kts +++ b/dataforge-workspace/build.gradle.kts @@ -12,30 +12,9 @@ kotlin { api(project(":dataforge-io")) } } - jvmMain { - dependencies { - // TODO include fat jar of lambdarpc - api(files("lambdarpc-core-0.0.1.jar")) - runtimeOnly("io.grpc:grpc-netty-shaded:1.44.0") - api("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0") - api("io.grpc:grpc-protobuf:1.44.0") - api("com.google.protobuf:protobuf-java-util:3.19.4") - api("com.google.protobuf:protobuf-kotlin:3.19.4") - api("io.grpc:grpc-kotlin-stub:1.2.1") - api("org.jetbrains.kotlinx:kotlinx-serialization-json:1.3.2") - api("org.slf4j:slf4j-simple:1.7.36") - api("io.github.microutils:kotlin-logging-jvm:2.1.21") - } - } - } -} - -kscience { - useSerialization { - json() } } readme { maturity = ru.mipt.npm.gradle.Maturity.EXPERIMENTAL -} \ No newline at end of file +} diff --git a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ClientWorkspacePlugin.kt b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ClientWorkspacePlugin.kt deleted file mode 100644 index 302037a7..00000000 --- a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ClientWorkspacePlugin.kt +++ /dev/null @@ -1,31 +0,0 @@ -package space.kscience.dataforge.workspace.distributed - -import io.lambdarpc.utils.Endpoint -import space.kscience.dataforge.context.AbstractPlugin -import space.kscience.dataforge.context.PluginTag -import space.kscience.dataforge.names.Name -import space.kscience.dataforge.workspace.Task -import kotlin.reflect.KType - -/** - * Plugin whose purpose is to communicate with remote plugins. - * @param tag Tag of the [ClientWorkspacePlugin]; it should be equal to the tag of the corresponding remote plugin. - * @param endpoint Endpoint of the remote plugin. - * @param tasks Enumeration of names of remote tasks and their result types.
- */ -public abstract class ClientWorkspacePlugin( - override val tag: PluginTag, - endpoint: Endpoint, - vararg tasks: Pair, -) : AbstractPlugin() { - - private val tasks: Map> = tasks.associate { (name, type) -> - name to RemoteTask(endpoint, type) - } - - override fun content(target: String): Map = - when (target) { - Task.TYPE -> tasks - else -> emptyMap() - } -} diff --git a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ServiceWorkspace.kt b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ServiceWorkspace.kt deleted file mode 100644 index 7d48e930..00000000 --- a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ServiceWorkspace.kt +++ /dev/null @@ -1,237 +0,0 @@ -package space.kscience.dataforge.workspace.distributed - -import io.ktor.utils.io.core.* -import io.lambdarpc.coding.Coder -import io.lambdarpc.coding.CodingContext -import io.lambdarpc.dsl.LibService -import io.lambdarpc.dsl.def -import io.lambdarpc.transport.grpc.Entity -import io.lambdarpc.transport.serialization.Entity -import io.lambdarpc.transport.serialization.RawData -import io.lambdarpc.utils.Address -import io.lambdarpc.utils.Port -import io.lambdarpc.utils.toSid -import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Deferred -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.asFlow -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.toList -import kotlinx.coroutines.runBlocking -import kotlinx.serialization.KSerializer -import kotlinx.serialization.Serializable -import kotlinx.serialization.json.Json -import kotlinx.serialization.serializer -import space.kscience.dataforge.context.Context -import space.kscience.dataforge.context.Global -import space.kscience.dataforge.context.gather -import space.kscience.dataforge.data.Data -import space.kscience.dataforge.data.DataSet -import space.kscience.dataforge.data.DataTree -import space.kscience.dataforge.data.Goal -import space.kscience.dataforge.data.NamedData -import space.kscience.dataforge.data.await -import space.kscience.dataforge.data.component1 -import space.kscience.dataforge.data.component2 -import space.kscience.dataforge.meta.Meta -import space.kscience.dataforge.meta.MetaSerializer -import space.kscience.dataforge.names.Name -import space.kscience.dataforge.names.asName -import space.kscience.dataforge.workspace.Task -import space.kscience.dataforge.workspace.TaskResult -import space.kscience.dataforge.workspace.Workspace -import space.kscience.dataforge.workspace.wrapResult -import java.nio.charset.Charset -import kotlin.reflect.KType - -/** - * Workspace that exposes its tasks for remote clients. - */ -public class ServiceWorkspace( - address: String = "localhost", - port: Int? = null, - override val context: Context = Global.buildContext("workspace".asName()), - data: DataSet<*> = runBlocking { DataTree {} }, - override val targets: Map = mapOf(), -) : Workspace, Closeable { - - override val data: TaskResult<*> = wrapResult(data, Name.EMPTY, Meta.EMPTY) - - override val tasks: Map> - get() = context.gather(Task.TYPE) - - private val service = LibService(serviceId, address, port) { - execute of { name -> - val res = produce(name, Meta.EMPTY) - LazyDecodableDataSetAdapter(res) - } - } - - /** - * Address this workspace is available on. - */ - public val address: Address = Address(address) - - /** - * Port this workspace is available on. 
- */ - public val port: Port - get() = service.port - - /** - * Start [ServiceWorkspace] as a service. - */ - public fun start(): Unit = service.start() - - /** - * Await termination of the service. - */ - public fun awaitTermination(): Unit = service.awaitTermination() - - /** - * Shutdown service. - */ - public fun shutdown(): Unit = service.shutdown() - - override fun close(): Unit = service.shutdown() - - public companion object { - internal val serviceId = "d41b95b1-828b-4444-8ff0-6f9c92a79246".toSid() - internal val execute by serviceId.def(NameCoder, DataSetCoder) - } -} - -private object NameCoder : Coder { - override fun decode(entity: Entity, context: CodingContext): Name { - require(entity.hasData()) { "Entity should contain data" } - val string = entity.data.toString(Charset.defaultCharset()) - return Name.parse(string) - } - - override fun encode(value: Name, context: CodingContext): Entity = - Entity(RawData.copyFrom(value.toString(), Charset.defaultCharset())) -} - -@Serializable -private data class DataPrototype( - val meta: String, - val data: String, -) { - companion object { - suspend fun of(data: Data, serializer: KSerializer): DataPrototype { - val meta = Json.encodeToString(MetaSerializer, data.meta) - val string = Json.encodeToString(serializer, data.await()) - return DataPrototype(meta, string) - } - } -} - -/** - * Data class that represents serializable [DataSet]. - */ -@Serializable -private data class DataSetPrototype( - val data: Map, -) - -/** - * [DataSetPrototype] builder. - */ -private fun DataSet.toPrototype(): DataSetPrototype = runBlocking { - val serializer = serializer(dataType) - val map = mutableListOf>() - flowData().map { (name, data) -> - name.toString() to DataPrototype.of(data, serializer) - }.toList(map) - DataSetPrototype(map.associate { it }) -} - -/** - * Trivial [Data] implementation. - */ -private class SimpleData( - override val type: KType, - override val meta: Meta, - val data: Any, -) : Data { - override val dependencies: Collection> - get() = emptyList() - - override val deferred: Deferred - get() = CompletableDeferred(data) - - override fun async(coroutineScope: CoroutineScope): Deferred = deferred - override fun reset() = Unit -} - -/** - * Trivial named data implementation. - */ -private class SimpleNamedData( - override val name: Name, - override val data: Data, -) : NamedData, Data by data - -/** - * Represents [DataSet] that should be initialized before usage. - */ -internal interface LazyDecodableDataSet : DataSet { - fun finishDecoding(type: KType) -} - -private class LazyDecodableDataSetAdapter(val dataSet: DataSet) : - LazyDecodableDataSet, DataSet by dataSet { - override fun finishDecoding(type: KType) = Unit -} - -/** - * Trivial [LazyDecodableDataSet] implementation. - */ -private class SimpleDataSet(private val prototype: DataSetPrototype) : LazyDecodableDataSet { - - lateinit var type: KType - lateinit var data: Map> - - override fun finishDecoding(type: KType) { - this.type = type - val serializer = serializer(type) - this.data = prototype.data - .mapKeys { (name, _) -> Name.of(name) } - .mapValues { (_, pair) -> - val (meta, data) = pair - Pair( - Json.decodeFromString(MetaSerializer, meta), - Json.decodeFromString(serializer, data)!! 
- ) - } - } - - override val dataType: KType - get() = type - - override fun flowData(): Flow> = - data.map { (name, pair) -> - val (meta, data) = pair - val wrapped = SimpleData(dataType, meta, data) - SimpleNamedData(name, wrapped) - }.asFlow() - - override suspend fun getData(name: Name): Data? = data[name]?.let { (meta, data) -> - SimpleData(dataType, meta, data) - } -} - -private object DataSetCoder : Coder> { - override fun decode(entity: Entity, context: CodingContext): LazyDecodableDataSet { - val string = entity.data.toString(Charset.defaultCharset()) - val prototype = Json.decodeFromString(serializer(), string) - return SimpleDataSet(prototype) - } - - override fun encode(value: LazyDecodableDataSet, context: CodingContext): Entity { - val prototype = value.toPrototype() - val string = Json.encodeToString(serializer(), prototype) - return Entity(RawData.copyFrom(string, Charset.defaultCharset())) - } -} diff --git a/settings.gradle.kts b/settings.gradle.kts index 2e5ee74f..ff2adfdf 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -22,5 +22,6 @@ include( ":dataforge-context", ":dataforge-data", ":dataforge-workspace", - ":dataforge-scripting" -) \ No newline at end of file + ":dataforge-scripting", + ":dataforge-distributed", +) -- 2.34.1 From 07a0bd551bad2bd5dc7a619d184db7c9fe99ddac Mon Sep 17 00:00:00 2001 From: Andrey Stoyan Date: Tue, 10 May 2022 10:31:26 +0300 Subject: [PATCH 07/14] Await data in parallel --- .../distributed/serialization/dataSetSerialization.kt | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSetSerialization.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSetSerialization.kt index 9d74a9be..dad6fa1b 100644 --- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSetSerialization.kt +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSetSerialization.kt @@ -1,5 +1,7 @@ package space.kscience.dataforge.distributed.serialization +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.async import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.map @@ -29,11 +31,11 @@ internal data class DataSetPrototype(val data: Map) { companion object { fun of(dataSet: DataSet): DataSetPrototype = runBlocking { val serializer = serializer(dataSet.dataType) - val map = mutableListOf>() + val map = mutableListOf>>() dataSet.flowData().map { (name, data) -> - name.toString() to DataPrototype.of(data, serializer) + name.toString() to async { DataPrototype.of(data, serializer) } }.toList(map) - DataSetPrototype(map.associate { it }) + DataSetPrototype(map.associate { (name, deferred) -> name to deferred.await() }) } fun fromJson(string: String): DataSetPrototype = Json.decodeFromString(serializer(), string) -- 2.34.1 From 230a3f1e228265ad381aa8d00ab6de23acbdd292 Mon Sep 17 00:00:00 2001 From: Andrey Stoyan Date: Tue, 10 May 2022 10:49:38 +0300 Subject: [PATCH 08/14] Make dataset serialization non-blocking --- .../distributed/serialization/DataSetCoder.kt | 3 ++- .../serialization/dataSetSerialization.kt | 16 +++++++--------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/DataSetCoder.kt 
b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/DataSetCoder.kt index 1bb218b1..02392235 100644 --- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/DataSetCoder.kt +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/DataSetCoder.kt @@ -5,6 +5,7 @@ import io.lambdarpc.coding.CodingContext import io.lambdarpc.transport.grpc.Entity import io.lambdarpc.transport.serialization.Entity import io.lambdarpc.transport.serialization.RawData +import kotlinx.coroutines.runBlocking import java.nio.charset.Charset internal object DataSetCoder : Coder> { @@ -15,7 +16,7 @@ internal object DataSetCoder : Coder> { } override fun encode(value: SerializableDataSet, context: CodingContext): Entity { - val prototype = DataSetPrototype.of(value) + val prototype = runBlocking { DataSetPrototype.of(value) } // TODO update LambdaRPC and remove blocking val string = prototype.toJson() return Entity(RawData.copyFrom(string, Charset.defaultCharset())) } diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSetSerialization.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSetSerialization.kt index dad6fa1b..f30aa0b2 100644 --- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSetSerialization.kt +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSetSerialization.kt @@ -2,11 +2,11 @@ package space.kscience.dataforge.distributed.serialization import kotlinx.coroutines.Deferred import kotlinx.coroutines.async +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.toList -import kotlinx.coroutines.runBlocking import kotlinx.serialization.Serializable import kotlinx.serialization.json.Json import kotlinx.serialization.serializer @@ -29,13 +29,13 @@ internal data class DataSetPrototype(val data: Map) { fun toJson(): String = Json.encodeToString(serializer(), this) companion object { - fun of(dataSet: DataSet): DataSetPrototype = runBlocking { + suspend fun of(dataSet: DataSet): DataSetPrototype = coroutineScope { val serializer = serializer(dataSet.dataType) - val map = mutableListOf>>() + val flow = mutableListOf>>() dataSet.flowData().map { (name, data) -> name.toString() to async { DataPrototype.of(data, serializer) } - }.toList(map) - DataSetPrototype(map.associate { (name, deferred) -> name to deferred.await() }) + }.toList(flow) + DataSetPrototype(flow.associate { (name, deferred) -> name to deferred.await() }) } fun fromJson(string: String): DataSetPrototype = Json.decodeFromString(serializer(), string) @@ -68,8 +68,6 @@ private class SerializableDataSetImpl(private val prototype: DataSetPrototype) : /** * Trivial named data implementation. 
*/ - private class SimpleNamedData( - override val name: Name, - override val data: Data, - ) : NamedData, Data by data + private class SimpleNamedData(override val name: Name, override val data: Data) : + NamedData, Data by data } -- 2.34.1 From 77f8f045e63bc74b69324b58e7cb962cc1488d2f Mon Sep 17 00:00:00 2001 From: Andrey Stoyan Date: Sat, 21 May 2022 22:01:34 +0300 Subject: [PATCH 09/14] Refactor remote execution model --- dataforge-distributed/build.gradle.kts | 3 +- .../distributed/ClientWorkspacePlugin.kt | 37 ------ .../dataforge/distributed/RemotePlugin.kt | 36 ++++++ .../dataforge/distributed/RemoteTask.kt | 28 ++--- .../dataforge/distributed/ServiceWorkspace.kt | 70 ++++++++--- .../dataforge/distributed/TaskRegistry.kt | 18 +++ .../distributed/serialization/DataSetCoder.kt | 23 ---- .../distributed/serialization/MetaCoder.kt | 23 ++++ .../distributed/serialization/NameCoder.kt | 12 +- .../serialization/SerializableDataSet.kt | 16 --- .../serialization/TaskRegistryCoder.kt | 22 ++++ .../serialization/dataSerialization.kt | 4 +- .../serialization/dataSetSerialization.kt | 46 +++---- .../kscience/dataforge/distributed/Plugins.kt | 61 ++++++++++ .../dataforge/distributed/RemoteCallTest.kt | 89 ++++++++++++++ .../distributed/ServiceWorkspaceTest.kt | 112 ------------------ .../kscience/dataforge/workspace/Task.kt | 37 ++++-- .../dataforge/workspace/WorkspaceBuilder.kt | 16 ++- 18 files changed, 386 insertions(+), 267 deletions(-) delete mode 100644 dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ClientWorkspacePlugin.kt create mode 100644 dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemotePlugin.kt create mode 100644 dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/TaskRegistry.kt delete mode 100644 dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/DataSetCoder.kt create mode 100644 dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/MetaCoder.kt delete mode 100644 dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/SerializableDataSet.kt create mode 100644 dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/TaskRegistryCoder.kt create mode 100644 dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/Plugins.kt create mode 100644 dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/RemoteCallTest.kt delete mode 100644 dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/ServiceWorkspaceTest.kt diff --git a/dataforge-distributed/build.gradle.kts b/dataforge-distributed/build.gradle.kts index e5629e81..04d31e07 100644 --- a/dataforge-distributed/build.gradle.kts +++ b/dataforge-distributed/build.gradle.kts @@ -15,7 +15,8 @@ kotlin { jvmMain { dependencies { // TODO include fat jar of lambdarpc - api(files("lambdarpc-core-0.0.1.jar")) + val path = "../../../lambdarpc/LambdaRPC.kt/lambdarpc-core/build/libs" + api(files("$path/lambdarpc-core-0.0.1-SNAPSHOT.jar")) runtimeOnly("io.grpc:grpc-netty-shaded:1.44.0") api("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0") api("io.grpc:grpc-protobuf:1.44.0") diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ClientWorkspacePlugin.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ClientWorkspacePlugin.kt deleted file mode 100644 index 
c9dec94e..00000000 --- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ClientWorkspacePlugin.kt +++ /dev/null @@ -1,37 +0,0 @@ -package space.kscience.dataforge.distributed - -import io.lambdarpc.utils.Endpoint -import space.kscience.dataforge.context.AbstractPlugin -import space.kscience.dataforge.context.PluginTag -import space.kscience.dataforge.names.Name -import space.kscience.dataforge.workspace.Task -import kotlin.reflect.KType - -/** - * Plugin whose purpose is to communicate with remote plugins. - * @param endpoint Endpoint of the remote plugin. - */ -public abstract class ClientWorkspacePlugin(endpoint: Endpoint) : AbstractPlugin() { - - /** - * Tag of the [ClientWorkspacePlugin] should be equal to the tag of the corresponding remote plugin. - */ - abstract override val tag: PluginTag - - /** - * Enumeration of names of remote tasks and their result types. - */ - public abstract val tasks: Map<Name, KType> - - private val _tasks: Map<Name, Task<*>> by lazy { - tasks.mapValues { (_, type) -> - RemoteTask(endpoint, type) - } - } - - override fun content(target: String): Map<Name, Any> = - when (target) { - Task.TYPE -> _tasks - else -> emptyMap() - } -} diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemotePlugin.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemotePlugin.kt new file mode 100644 index 00000000..0f37753b --- /dev/null +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemotePlugin.kt @@ -0,0 +1,36 @@ +package space.kscience.dataforge.distributed + +import io.lambdarpc.utils.Endpoint +import space.kscience.dataforge.context.AbstractPlugin +import space.kscience.dataforge.context.Plugin +import space.kscience.dataforge.context.PluginFactory +import space.kscience.dataforge.context.PluginTag +import space.kscience.dataforge.names.Name +import space.kscience.dataforge.workspace.SerializableResultTask +import space.kscience.dataforge.workspace.Task + +/** + * Plugin whose purpose is to communicate with remote plugins. + * @param plugin A remote plugin to be used. + * @param endpoint Endpoint of the remote plugin. + */ +public class RemotePlugin

<P : Plugin>(private val plugin: P, private val endpoint: String) : AbstractPlugin() { + + public constructor(factory: PluginFactory<P>

, endpoint: String) : this(factory(), endpoint) + + override val tag: PluginTag + get() = plugin.tag + + private val tasks = plugin.content(Task.TYPE) + .filterValues { it is SerializableResultTask<*> } + .mapValues { (_, task) -> + require(task is SerializableResultTask<*>) + RemoteTask(Endpoint(endpoint), task.resultType, task.resultSerializer) + } + + override fun content(target: String): Map = + when (target) { + Task.TYPE -> tasks + else -> emptyMap() + } +} diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTask.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTask.kt index e0d3cb3b..61b5b104 100644 --- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTask.kt +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTask.kt @@ -1,13 +1,13 @@ package space.kscience.dataforge.distributed -import io.lambdarpc.dsl.ServiceDispatcher +import io.lambdarpc.context.ServiceDispatcher import io.lambdarpc.utils.Endpoint import kotlinx.coroutines.withContext -import space.kscience.dataforge.data.DataSet +import kotlinx.serialization.KSerializer import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.descriptors.MetaDescriptor import space.kscience.dataforge.names.Name -import space.kscience.dataforge.workspace.Task +import space.kscience.dataforge.workspace.SerializableResultTask import space.kscience.dataforge.workspace.TaskResult import space.kscience.dataforge.workspace.Workspace import space.kscience.dataforge.workspace.wrapResult @@ -17,20 +17,20 @@ import kotlin.reflect.KType * Proxy task that communicates with the corresponding remote task. */ internal class RemoteTask( - endpoint: Endpoint, + internal val endpoint: Endpoint, override val resultType: KType, + override val resultSerializer: KSerializer, override val descriptor: MetaDescriptor? = null, -) : Task { + private val taskRegistry: TaskRegistry? 
= null, +) : SerializableResultTask { private val dispatcher = ServiceDispatcher(ServiceWorkspace.serviceId to endpoint) - @Suppress("UNCHECKED_CAST") - override suspend fun execute( - workspace: Workspace, - taskName: Name, - taskMeta: Meta, - ): TaskResult = withContext(dispatcher) { - val dataset = ServiceWorkspace.execute(taskName) - dataset.finishDecoding(resultType) - workspace.wrapResult(dataset as DataSet, taskName, taskMeta) + override suspend fun execute(workspace: Workspace, taskName: Name, taskMeta: Meta): TaskResult { + val registry = taskRegistry ?: TaskRegistry(workspace.tasks) + val result = withContext(dispatcher) { + ServiceWorkspace.execute(taskName, taskMeta, registry) + } + val dataSet = result.toDataSet(resultType, resultSerializer) + return workspace.wrapResult(dataSet, taskName, taskMeta) } } diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ServiceWorkspace.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ServiceWorkspace.kt index b6849754..3eecfd7c 100644 --- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ServiceWorkspace.kt +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ServiceWorkspace.kt @@ -3,21 +3,23 @@ package space.kscience.dataforge.distributed import io.ktor.utils.io.core.* import io.lambdarpc.dsl.LibService import io.lambdarpc.dsl.def -import io.lambdarpc.utils.Address -import io.lambdarpc.utils.Port -import io.lambdarpc.utils.toSid +import io.lambdarpc.dsl.j +import io.lambdarpc.utils.ServiceId import kotlinx.coroutines.runBlocking +import kotlinx.serialization.KSerializer import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Global import space.kscience.dataforge.context.gather import space.kscience.dataforge.data.DataSet import space.kscience.dataforge.data.DataTree -import space.kscience.dataforge.distributed.serialization.DataSetCoder +import space.kscience.dataforge.distributed.serialization.DataSetPrototype +import space.kscience.dataforge.distributed.serialization.MetaCoder import space.kscience.dataforge.distributed.serialization.NameCoder -import space.kscience.dataforge.distributed.serialization.SerializableDataSetAdapter +import space.kscience.dataforge.distributed.serialization.TaskRegistryCoder import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.asName +import space.kscience.dataforge.workspace.SerializableResultTask import space.kscience.dataforge.workspace.Task import space.kscience.dataforge.workspace.TaskResult import space.kscience.dataforge.workspace.Workspace @@ -25,37 +27,73 @@ import space.kscience.dataforge.workspace.wrapResult /** * Workspace that exposes its tasks for remote clients. + * @param port Port to start service on. Will be random if null. */ public class ServiceWorkspace( - address: String = "localhost", port: Int? = null, override val context: Context = Global.buildContext("workspace".asName()), + private val dataSerializer: KSerializer? = null, data: DataSet<*> = runBlocking { DataTree {} }, override val targets: Map = mapOf(), ) : Workspace, Closeable { + private val _port: Int? 
= port override val data: TaskResult<*> = wrapResult(data, Name.EMPTY, Meta.EMPTY) override val tasks: Map> get() = context.gather(Task.TYPE) - private val service = LibService(serviceId, address, port) { - execute of { name -> - val res = produce(name, Meta.EMPTY) - SerializableDataSetAdapter(res) + private val service = LibService(serviceId, port) { + execute of { name, meta, taskRegistry -> + if (name == Name.EMPTY) { + requireNotNull(dataSerializer) { "Data serializer is not provided on $port" } + DataSetPrototype.of(data, dataSerializer) + } else { + val task = tasks[name] ?: error("Task $name does not exist locally") + require(task is SerializableResultTask) { "Result of $name cannot be serialized" } + val workspace = ProxyWorkspace(taskRegistry) + + // Local function to capture generic parameter + suspend fun execute(task: SerializableResultTask): DataSetPrototype { + val result = task.execute(workspace, name, meta) + return DataSetPrototype.of(result, task.resultSerializer) + } + execute(task) + } } } /** - * Address this workspace is available on. + * Proxies task calls to right endpoints according to the [TaskRegistry]. */ - public val address: Address = Address(address) + private inner class ProxyWorkspace(private val taskRegistry: TaskRegistry) : Workspace by this { + override val tasks: Map> + get() = object : AbstractMap>() { + override val entries: Set>> + get() = this@ServiceWorkspace.tasks.entries + + override fun get(key: Name): Task<*>? = remoteTask(key) ?: this@ServiceWorkspace.tasks[key] + } + + /** + * Call default implementation to use [tasks] virtually instead of it in [ServiceWorkspace]. + */ + override suspend fun produce(taskName: Name, taskMeta: Meta): TaskResult<*> = + super.produce(taskName, taskMeta) + + private fun remoteTask(name: Name): RemoteTask<*>? { + val endpoint = taskRegistry.tasks[name] ?: return null + val local = this@ServiceWorkspace.tasks[name] ?: error("No task with name $name locally on $port") + require(local is SerializableResultTask) { "Task $name result is not serializable" } + return RemoteTask(endpoint, local.resultType, local.resultSerializer, local.descriptor, taskRegistry) + } + } /** * Port this workspace is available on. */ - public val port: Port - get() = service.port + public val port: Int + get() = _port ?: service.port.p /** * Start [ServiceWorkspace] as a service. 
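 *
 * A minimal lifecycle sketch (types and names come from this patch series; the endpoint
 * literal is illustrative):
 *
 * ```kotlin
 * val worker = ServiceWorkspace(
 *     context = Global.buildContext("worker".asName()) { plugin(MyPlugin1) },
 * )
 * worker.start() // expose the plugin's serializable tasks over LambdaRPC
 * val client = Workspace {
 *     context { plugin(RemotePlugin(MyPlugin1, "localhost:${worker.port}")) }
 * }
 * // suspend call: the task is executed on the worker, the result is deserialized locally
 * val result = client.produce(Name.of(MyPlugin1.tag.name, "task"), Meta.EMPTY)
 * worker.shutdown()
 * ```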
@@ -75,7 +113,7 @@ public class ServiceWorkspace( override fun close(): Unit = service.shutdown() public companion object { - internal val serviceId = "d41b95b1-828b-4444-8ff0-6f9c92a79246".toSid() - internal val execute by serviceId.def(NameCoder, DataSetCoder) + internal val serviceId = ServiceId("d41b95b1-828b-4444-8ff0-6f9c92a79246") + internal val execute by serviceId.def(NameCoder, MetaCoder, TaskRegistryCoder, j()) } } diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/TaskRegistry.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/TaskRegistry.kt new file mode 100644 index 00000000..aea1b29e --- /dev/null +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/TaskRegistry.kt @@ -0,0 +1,18 @@ +package space.kscience.dataforge.distributed + +import io.lambdarpc.utils.Endpoint +import kotlinx.serialization.Serializable +import space.kscience.dataforge.names.Name +import space.kscience.dataforge.workspace.Task + +@Serializable +internal class TaskRegistry(val tasks: Map) + +internal fun TaskRegistry(tasks: Map>): TaskRegistry { + val remotes = tasks.filterValues { it is RemoteTask<*> } + val endpoints = remotes.mapValues { (_, task) -> + require(task is RemoteTask) + task.endpoint + } + return TaskRegistry(endpoints) +} diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/DataSetCoder.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/DataSetCoder.kt deleted file mode 100644 index 02392235..00000000 --- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/DataSetCoder.kt +++ /dev/null @@ -1,23 +0,0 @@ -package space.kscience.dataforge.distributed.serialization - -import io.lambdarpc.coding.Coder -import io.lambdarpc.coding.CodingContext -import io.lambdarpc.transport.grpc.Entity -import io.lambdarpc.transport.serialization.Entity -import io.lambdarpc.transport.serialization.RawData -import kotlinx.coroutines.runBlocking -import java.nio.charset.Charset - -internal object DataSetCoder : Coder> { - override fun decode(entity: Entity, context: CodingContext): SerializableDataSet { - val string = entity.data.toString(Charset.defaultCharset()) - val prototype = DataSetPrototype.fromJson(string) - return prototype.toDataSet() - } - - override fun encode(value: SerializableDataSet, context: CodingContext): Entity { - val prototype = runBlocking { DataSetPrototype.of(value) } // TODO update LambdaRPC and remove blocking - val string = prototype.toJson() - return Entity(RawData.copyFrom(string, Charset.defaultCharset())) - } -} diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/MetaCoder.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/MetaCoder.kt new file mode 100644 index 00000000..d5425271 --- /dev/null +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/MetaCoder.kt @@ -0,0 +1,23 @@ +package space.kscience.dataforge.distributed.serialization + +import io.lambdarpc.coding.Coder +import io.lambdarpc.coding.CodingContext +import io.lambdarpc.transport.grpc.Entity +import io.lambdarpc.transport.serialization.Entity +import io.lambdarpc.transport.serialization.RawData +import kotlinx.serialization.json.Json +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.meta.MetaSerializer +import 
java.nio.charset.Charset + +internal object MetaCoder : Coder { + override suspend fun decode(entity: Entity, context: CodingContext): Meta { + val string = entity.data.toString(Charset.defaultCharset()) + return Json.decodeFromString(MetaSerializer, string) + } + + override suspend fun encode(value: Meta, context: CodingContext): Entity { + val string = Json.encodeToString(MetaSerializer, value) + return Entity(RawData.copyFrom(string, Charset.defaultCharset())) + } +} diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/NameCoder.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/NameCoder.kt index beb22124..87d51901 100644 --- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/NameCoder.kt +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/NameCoder.kt @@ -5,16 +5,18 @@ import io.lambdarpc.coding.CodingContext import io.lambdarpc.transport.grpc.Entity import io.lambdarpc.transport.serialization.Entity import io.lambdarpc.transport.serialization.RawData +import kotlinx.serialization.json.Json import space.kscience.dataforge.names.Name import java.nio.charset.Charset internal object NameCoder : Coder { - override fun decode(entity: Entity, context: CodingContext): Name { - require(entity.hasData()) { "Entity should contain data" } + override suspend fun decode(entity: Entity, context: CodingContext): Name { val string = entity.data.toString(Charset.defaultCharset()) - return Name.parse(string) + return Json.decodeFromString(Name.serializer(), string) } - override fun encode(value: Name, context: CodingContext): Entity = - Entity(RawData.copyFrom(value.toString(), Charset.defaultCharset())) + override suspend fun encode(value: Name, context: CodingContext): Entity { + val string = Json.encodeToString(Name.serializer(), value) + return Entity(RawData.copyFrom(string, Charset.defaultCharset())) + } } diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/SerializableDataSet.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/SerializableDataSet.kt deleted file mode 100644 index 2265b524..00000000 --- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/SerializableDataSet.kt +++ /dev/null @@ -1,16 +0,0 @@ -package space.kscience.dataforge.distributed.serialization - -import space.kscience.dataforge.data.DataSet -import kotlin.reflect.KType - -/** - * Represents [DataSet] that should be initialized before usage. 
- */ -internal interface SerializableDataSet : DataSet { - fun finishDecoding(type: KType) -} - -internal class SerializableDataSetAdapter(dataSet: DataSet) : - SerializableDataSet, DataSet by dataSet { - override fun finishDecoding(type: KType) = Unit -} diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/TaskRegistryCoder.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/TaskRegistryCoder.kt new file mode 100644 index 00000000..68c7e87f --- /dev/null +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/TaskRegistryCoder.kt @@ -0,0 +1,22 @@ +package space.kscience.dataforge.distributed.serialization + +import io.lambdarpc.coding.Coder +import io.lambdarpc.coding.CodingContext +import io.lambdarpc.transport.grpc.Entity +import io.lambdarpc.transport.serialization.Entity +import io.lambdarpc.transport.serialization.RawData +import kotlinx.serialization.json.Json +import space.kscience.dataforge.distributed.TaskRegistry +import java.nio.charset.Charset + +internal object TaskRegistryCoder : Coder { + override suspend fun decode(entity: Entity, context: CodingContext): TaskRegistry { + val string = entity.data.toString(Charset.defaultCharset()) + return Json.decodeFromString(TaskRegistry.serializer(), string) + } + + override suspend fun encode(value: TaskRegistry, context: CodingContext): Entity { + val string = Json.encodeToString(TaskRegistry.serializer(), value) + return Entity(RawData.copyFrom(string, Charset.defaultCharset())) + } +} diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSerialization.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSerialization.kt index b3e400cc..0ca9511c 100644 --- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSerialization.kt +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSerialization.kt @@ -21,11 +21,11 @@ internal data class DataPrototype( val meta: String, val data: String, ) { - fun toData(type: KType): Data = + fun toData(type: KType, serializer: KSerializer): Data = SimpleData( type = type, meta = Json.decodeFromString(MetaSerializer, meta), - data = Json.decodeFromString(kotlinx.serialization.serializer(type), data)!! 
+ data = Json.decodeFromString(serializer, data) ) companion object { diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSetSerialization.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSetSerialization.kt index f30aa0b2..e9941810 100644 --- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSetSerialization.kt +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSetSerialization.kt @@ -7,9 +7,8 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.toList +import kotlinx.serialization.KSerializer import kotlinx.serialization.Serializable -import kotlinx.serialization.json.Json -import kotlinx.serialization.serializer import space.kscience.dataforge.data.Data import space.kscience.dataforge.data.DataSet import space.kscience.dataforge.data.NamedData @@ -23,51 +22,40 @@ import kotlin.reflect.KType */ @Serializable internal data class DataSetPrototype(val data: Map) { - fun toDataSet(): SerializableDataSet = - SerializableDataSetImpl(this) - - fun toJson(): String = Json.encodeToString(serializer(), this) + fun toDataSet(type: KType, serializer: KSerializer): DataSet { + val data = data + .mapKeys { (name, _) -> Name.of(name) } + .mapValues { (_, dataPrototype) -> dataPrototype.toData(type, serializer) } + return SerializableDataSetImpl(type, data) + } companion object { - suspend fun of(dataSet: DataSet): DataSetPrototype = coroutineScope { - val serializer = serializer(dataSet.dataType) + suspend fun of(dataSet: DataSet, serializer: KSerializer): DataSetPrototype = coroutineScope { val flow = mutableListOf>>() dataSet.flowData().map { (name, data) -> name.toString() to async { DataPrototype.of(data, serializer) } }.toList(flow) DataSetPrototype(flow.associate { (name, deferred) -> name to deferred.await() }) } - - fun fromJson(string: String): DataSetPrototype = Json.decodeFromString(serializer(), string) } } /** - * Trivial [SerializableDataSet] implementation. + * Trivial [DataSet] implementation. */ -private class SerializableDataSetImpl(private val prototype: DataSetPrototype) : SerializableDataSet { +private class SerializableDataSetImpl( + override val dataType: KType, + private val data: Map>, +) : DataSet { - private lateinit var type: KType - private lateinit var data: Map> - - override fun finishDecoding(type: KType) { - this.type = type - this.data = prototype.data - .mapKeys { (name, _) -> Name.of(name) } - .mapValues { (_, dataPrototype) -> dataPrototype.toData(type) } - } - - override val dataType: KType - get() = type - - override fun flowData(): Flow> = + override fun flowData(): Flow> = data.map { (name, data) -> SimpleNamedData(name, data) }.asFlow() - override suspend fun getData(name: Name): Data? = data[name] + override suspend fun getData(name: Name): Data? = data[name] /** * Trivial named data implementation. 
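 * Combines a [Name] with an existing [Data] by delegation, so the underlying value is not copied.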
*/ - private class SimpleNamedData(override val name: Name, override val data: Data) : - NamedData, Data by data + private class SimpleNamedData(override val name: Name, override val data: Data) : + NamedData, Data by data } diff --git a/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/Plugins.kt b/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/Plugins.kt new file mode 100644 index 00000000..7b1bd637 --- /dev/null +++ b/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/Plugins.kt @@ -0,0 +1,61 @@ +package space.kscience.dataforge.distributed + +import kotlinx.serialization.serializer +import space.kscience.dataforge.context.Context +import space.kscience.dataforge.context.PluginFactory +import space.kscience.dataforge.context.PluginTag +import space.kscience.dataforge.context.info +import space.kscience.dataforge.context.logger +import space.kscience.dataforge.data.map +import space.kscience.dataforge.data.select +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.names.Name +import space.kscience.dataforge.names.asName +import space.kscience.dataforge.workspace.WorkspacePlugin +import space.kscience.dataforge.workspace.fromTask +import space.kscience.dataforge.workspace.task +import kotlin.reflect.KClass + +internal class MyPlugin1 : WorkspacePlugin() { + override val tag: PluginTag + get() = Factory.tag + + val task by task(serializer()) { + workspace.logger.info { "In ${tag.name}.task" } + val myInt = workspace.data.select() + val res = myInt.getData("int".asName())!! + emit("result".asName(), res.map { it + 1 }) + } + + companion object Factory : PluginFactory { + override fun invoke(meta: Meta, context: Context): MyPlugin1 = MyPlugin1() + + override val tag: PluginTag + get() = PluginTag("Plg1") + + override val type: KClass + get() = MyPlugin1::class + } +} + +internal class MyPlugin2 : WorkspacePlugin() { + override val tag: PluginTag + get() = Factory.tag + + val task by task(serializer()) { + workspace.logger.info { "In ${tag.name}.task" } + val dataSet = fromTask(Name.of(MyPlugin1.tag.name, "task")) + val data = dataSet.getData("result".asName())!! 
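+ // fromTask resolves MyPlugin1's task through the calling workspace: on a plain local
+ // workspace it executes in-process, while under a TaskRegistry that maps the task to a
+ // remote endpoint the lookup yields a RemoteTask, making this call transitive.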
+ emit("result".asName(), data.map { it + 1 }) + } + + companion object Factory : PluginFactory { + override fun invoke(meta: Meta, context: Context): MyPlugin2 = MyPlugin2() + + override val tag: PluginTag + get() = PluginTag("Plg2") + + override val type: KClass + get() = MyPlugin2::class + } +} diff --git a/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/RemoteCallTest.kt b/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/RemoteCallTest.kt new file mode 100644 index 00000000..3220adf2 --- /dev/null +++ b/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/RemoteCallTest.kt @@ -0,0 +1,89 @@ +package space.kscience.dataforge.distributed + +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.AfterAll +import org.junit.jupiter.api.BeforeAll +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.TestInstance +import space.kscience.dataforge.context.Global +import space.kscience.dataforge.data.DataTree +import space.kscience.dataforge.data.await +import space.kscience.dataforge.data.getData +import space.kscience.dataforge.data.static +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.names.Name +import space.kscience.dataforge.names.asName +import space.kscience.dataforge.workspace.Workspace +import kotlin.test.assertEquals + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +internal class RemoteCallTest { + + private lateinit var worker1: ServiceWorkspace + private lateinit var worker2: ServiceWorkspace + private lateinit var workspace: Workspace + + @BeforeAll + fun before() { + worker1 = ServiceWorkspace( + context = Global.buildContext("worker1".asName()) { + plugin(MyPlugin1) + }, + data = runBlocking { + DataTree { + static("int", 42) + } + }, + ) + worker1.start() + + worker2 = ServiceWorkspace( + context = Global.buildContext("worker2".asName()) { + plugin(MyPlugin1) + plugin(MyPlugin2) + }, + ) + worker2.start() + + workspace = Workspace { + context { + plugin(RemotePlugin(MyPlugin1, "localhost:${worker1.port}")) + plugin(RemotePlugin(MyPlugin2, "localhost:${worker2.port}")) + } + } + } + + @AfterAll + fun after() { + worker1.shutdown() + worker2.shutdown() + } + + @Test + fun `local execution`() = runBlocking { + assertEquals(42, worker1.data.getData("int")!!.await()) + val res = worker1 + .produce(Name.of(MyPlugin1.tag.name, "task"), Meta.EMPTY) + .getData("result".asName())!! + .await() + assertEquals(43, res) + } + + @Test + fun `remote execution`() = runBlocking { + val remoteRes = workspace + .produce(Name.of(MyPlugin1.tag.name, "task"), Meta.EMPTY) + .getData("result".asName())!! + .await() + assertEquals(43, remoteRes) + } + + @Test + fun `transitive execution`() = runBlocking { + val remoteRes = workspace + .produce(Name.of(MyPlugin2.tag.name, "task"), Meta.EMPTY) + .getData("result".asName())!! 
+ .await() + assertEquals(44, remoteRes) + } +} diff --git a/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/ServiceWorkspaceTest.kt b/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/ServiceWorkspaceTest.kt deleted file mode 100644 index bb4bdce9..00000000 --- a/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/ServiceWorkspaceTest.kt +++ /dev/null @@ -1,112 +0,0 @@ -package space.kscience.dataforge.distributed - -import io.lambdarpc.utils.Endpoint -import kotlinx.coroutines.runBlocking -import org.junit.jupiter.api.AfterAll -import org.junit.jupiter.api.BeforeAll -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.TestInstance -import space.kscience.dataforge.context.Context -import space.kscience.dataforge.context.Global -import space.kscience.dataforge.context.PluginFactory -import space.kscience.dataforge.context.PluginTag -import space.kscience.dataforge.data.DataTree -import space.kscience.dataforge.data.await -import space.kscience.dataforge.data.getData -import space.kscience.dataforge.data.map -import space.kscience.dataforge.data.select -import space.kscience.dataforge.data.static -import space.kscience.dataforge.meta.Meta -import space.kscience.dataforge.names.Name -import space.kscience.dataforge.names.asName -import space.kscience.dataforge.workspace.Workspace -import space.kscience.dataforge.workspace.WorkspacePlugin -import space.kscience.dataforge.workspace.task -import kotlin.reflect.KClass -import kotlin.reflect.KType -import kotlin.reflect.typeOf -import kotlin.test.assertEquals - -private class MyPlugin : WorkspacePlugin() { - override val tag: PluginTag - get() = Factory.tag - - val task by task { - val myInt = workspace.data.select() - val res = myInt.getData("int".asName())!! - emit("result".asName(), res.map { it + 1 }) - } - - companion object Factory : PluginFactory { - override fun invoke(meta: Meta, context: Context): MyPlugin = MyPlugin() - - override val tag: PluginTag - get() = PluginTag("Plg") - - override val type: KClass - get() = MyPlugin::class - } -} - -private class RemoteMyPlugin(endpoint: Endpoint) : ClientWorkspacePlugin(endpoint) { - override val tag: PluginTag - get() = MyPlugin.tag - - override val tasks: Map - get() = mapOf( - "task".asName() to typeOf() - ) -} - -@TestInstance(TestInstance.Lifecycle.PER_CLASS) -class ServiceWorkspaceTest { - - private lateinit var worker1: ServiceWorkspace - private lateinit var workspace: Workspace - - @BeforeAll - fun before() { - worker1 = ServiceWorkspace( - context = Global.buildContext("worker1".asName()) { - plugin(MyPlugin) - }, - data = runBlocking { - DataTree { - static("int", 0) - } - }, - ) - worker1.start() - - workspace = Workspace { - context { - val endpoint = Endpoint(worker1.address, worker1.port) - plugin(RemoteMyPlugin(endpoint)) - } - } - } - - @AfterAll - fun after() { - worker1.shutdown() - } - - @Test - fun localExecution() = runBlocking { - assertEquals(0, worker1.data.getData("int")!!.await()) - val res = worker1 - .produce(Name.of("Plg", "task"), Meta.EMPTY) - .getData("result".asName())!! - .await() - assertEquals(1, res) - } - - @Test - fun remoteExecution() = runBlocking { - val remoteRes = workspace - .produce(Name.of("Plg", "task"), Meta.EMPTY) - .getData("result".asName())!! 
- .await() - assertEquals(1, remoteRes) - } -} diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt index 042945c5..6260245b 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt @@ -1,6 +1,8 @@ package space.kscience.dataforge.workspace import kotlinx.coroutines.withContext +import kotlinx.serialization.KSerializer +import kotlinx.serialization.serializer import space.kscience.dataforge.data.DataSetBuilder import space.kscience.dataforge.data.DataTree import space.kscience.dataforge.data.GoalExecutionRestriction @@ -17,11 +19,6 @@ import kotlin.reflect.typeOf @Type(TYPE) public interface Task : Described { - /** - * Type of the task result data. - */ - public val resultType: KType - /** * Compute a [TaskResult] using given meta. In general, the result is lazy and represents both computation model * and a handler for actual result @@ -37,6 +34,12 @@ public interface Task : Described { } } +@Type(TYPE) +public interface SerializableResultTask : Task { + public val resultType: KType + public val resultSerializer: KSerializer +} + public class TaskResultBuilder( public val workspace: Workspace, public val taskName: Name, @@ -60,9 +63,6 @@ public fun Task( builder: suspend TaskResultBuilder.() -> Unit, ): Task = object : Task { - override val resultType: KType - get() = resultType - override val descriptor: MetaDescriptor? = descriptor override suspend fun execute( @@ -78,9 +78,28 @@ public fun Task( } } +/** + * [Task] that has [resultSerializer] to be able to cache or send its results + */ +@DFInternal +public class SerializableResultTaskImpl( + override val resultType: KType, + override val resultSerializer: KSerializer, + descriptor: MetaDescriptor? = null, + builder: suspend TaskResultBuilder.() -> Unit, +) : SerializableResultTask, Task by Task(resultType, descriptor, builder) + @OptIn(DFInternal::class) @Suppress("FunctionName") public inline fun Task( descriptor: MetaDescriptor? = null, noinline builder: suspend TaskResultBuilder.() -> Unit, -): Task = Task(typeOf(), descriptor, builder) \ No newline at end of file +): Task = Task(typeOf(), descriptor, builder) + +@OptIn(DFInternal::class) +@Suppress("FunctionName") +public inline fun SerializableResultTask( + resultSerializer: KSerializer = serializer(), + descriptor: MetaDescriptor? = null, + noinline builder: suspend TaskResultBuilder.() -> Unit, +): Task = SerializableResultTaskImpl(typeOf(), resultSerializer, descriptor, builder) diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBuilder.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBuilder.kt index ec078020..6413030b 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBuilder.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBuilder.kt @@ -1,5 +1,6 @@ package space.kscience.dataforge.workspace +import kotlinx.serialization.KSerializer import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.ContextBuilder import space.kscience.dataforge.context.Global @@ -37,25 +38,34 @@ public interface TaskContainer { public inline fun TaskContainer.registerTask( name: String, + resultSerializer: KSerializer? 
= null, noinline descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {}, noinline builder: suspend TaskResultBuilder.() -> Unit, -): Unit = registerTask(Name.parse(name), Task(MetaDescriptor(descriptorBuilder), builder)) +) { + val descriptor = MetaDescriptor(descriptorBuilder) + val task = if (resultSerializer == null) Task(descriptor, builder) else + SerializableResultTask(resultSerializer, descriptor, builder) + registerTask(Name.parse(name), task) +} public inline fun TaskContainer.task( descriptor: MetaDescriptor, + resultSerializer: KSerializer? = null, noinline builder: suspend TaskResultBuilder.() -> Unit, ): PropertyDelegateProvider>> = PropertyDelegateProvider { _, property -> val taskName = Name.parse(property.name) - val task = Task(descriptor, builder) + val task = if (resultSerializer == null) Task(descriptor, builder) else + SerializableResultTask(resultSerializer, descriptor, builder) registerTask(taskName, task) ReadOnlyProperty { _, _ -> TaskReference(taskName, task) } } public inline fun TaskContainer.task( + resultSerializer: KSerializer? = null, noinline descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {}, noinline builder: suspend TaskResultBuilder.() -> Unit, ): PropertyDelegateProvider>> = - task(MetaDescriptor(descriptorBuilder), builder) + task(MetaDescriptor(descriptorBuilder), resultSerializer, builder) public class WorkspaceBuilder(private val parentContext: Context = Global) : TaskContainer { private var context: Context? = null -- 2.34.1 From 114d310fdcb411d73a768e77934e43fe5c543605 Mon Sep 17 00:00:00 2001 From: Andrey Stoyan Date: Wed, 25 May 2022 13:51:26 +0300 Subject: [PATCH 10/14] Replace custom json coders with default --- .../dataforge/distributed/ServiceWorkspace.kt | 10 ++++---- .../distributed/serialization/MetaCoder.kt | 23 ------------------- .../distributed/serialization/NameCoder.kt | 22 ------------------ .../serialization/TaskRegistryCoder.kt | 22 ------------------ 4 files changed, 6 insertions(+), 71 deletions(-) delete mode 100644 dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/MetaCoder.kt delete mode 100644 dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/NameCoder.kt delete mode 100644 dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/TaskRegistryCoder.kt diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ServiceWorkspace.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ServiceWorkspace.kt index 3eecfd7c..cf17ce04 100644 --- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ServiceWorkspace.kt +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ServiceWorkspace.kt @@ -1,6 +1,7 @@ package space.kscience.dataforge.distributed import io.ktor.utils.io.core.* +import io.lambdarpc.coding.coders.JsonCoder import io.lambdarpc.dsl.LibService import io.lambdarpc.dsl.def import io.lambdarpc.dsl.j @@ -13,10 +14,8 @@ import space.kscience.dataforge.context.gather import space.kscience.dataforge.data.DataSet import space.kscience.dataforge.data.DataTree import space.kscience.dataforge.distributed.serialization.DataSetPrototype -import space.kscience.dataforge.distributed.serialization.MetaCoder -import space.kscience.dataforge.distributed.serialization.NameCoder -import space.kscience.dataforge.distributed.serialization.TaskRegistryCoder import space.kscience.dataforge.meta.Meta 
+import space.kscience.dataforge.meta.MetaSerializer import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.asName import space.kscience.dataforge.workspace.SerializableResultTask @@ -114,6 +113,9 @@ public class ServiceWorkspace( public companion object { internal val serviceId = ServiceId("d41b95b1-828b-4444-8ff0-6f9c92a79246") - internal val execute by serviceId.def(NameCoder, MetaCoder, TaskRegistryCoder, j()) + internal val execute by serviceId.def( + JsonCoder(Name.serializer()), JsonCoder(MetaSerializer), j(), + j() + ) } } diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/MetaCoder.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/MetaCoder.kt deleted file mode 100644 index d5425271..00000000 --- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/MetaCoder.kt +++ /dev/null @@ -1,23 +0,0 @@ -package space.kscience.dataforge.distributed.serialization - -import io.lambdarpc.coding.Coder -import io.lambdarpc.coding.CodingContext -import io.lambdarpc.transport.grpc.Entity -import io.lambdarpc.transport.serialization.Entity -import io.lambdarpc.transport.serialization.RawData -import kotlinx.serialization.json.Json -import space.kscience.dataforge.meta.Meta -import space.kscience.dataforge.meta.MetaSerializer -import java.nio.charset.Charset - -internal object MetaCoder : Coder { - override suspend fun decode(entity: Entity, context: CodingContext): Meta { - val string = entity.data.toString(Charset.defaultCharset()) - return Json.decodeFromString(MetaSerializer, string) - } - - override suspend fun encode(value: Meta, context: CodingContext): Entity { - val string = Json.encodeToString(MetaSerializer, value) - return Entity(RawData.copyFrom(string, Charset.defaultCharset())) - } -} diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/NameCoder.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/NameCoder.kt deleted file mode 100644 index 87d51901..00000000 --- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/NameCoder.kt +++ /dev/null @@ -1,22 +0,0 @@ -package space.kscience.dataforge.distributed.serialization - -import io.lambdarpc.coding.Coder -import io.lambdarpc.coding.CodingContext -import io.lambdarpc.transport.grpc.Entity -import io.lambdarpc.transport.serialization.Entity -import io.lambdarpc.transport.serialization.RawData -import kotlinx.serialization.json.Json -import space.kscience.dataforge.names.Name -import java.nio.charset.Charset - -internal object NameCoder : Coder { - override suspend fun decode(entity: Entity, context: CodingContext): Name { - val string = entity.data.toString(Charset.defaultCharset()) - return Json.decodeFromString(Name.serializer(), string) - } - - override suspend fun encode(value: Name, context: CodingContext): Entity { - val string = Json.encodeToString(Name.serializer(), value) - return Entity(RawData.copyFrom(string, Charset.defaultCharset())) - } -} diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/TaskRegistryCoder.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/TaskRegistryCoder.kt deleted file mode 100644 index 68c7e87f..00000000 --- 
a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/TaskRegistryCoder.kt +++ /dev/null @@ -1,22 +0,0 @@ -package space.kscience.dataforge.distributed.serialization - -import io.lambdarpc.coding.Coder -import io.lambdarpc.coding.CodingContext -import io.lambdarpc.transport.grpc.Entity -import io.lambdarpc.transport.serialization.Entity -import io.lambdarpc.transport.serialization.RawData -import kotlinx.serialization.json.Json -import space.kscience.dataforge.distributed.TaskRegistry -import java.nio.charset.Charset - -internal object TaskRegistryCoder : Coder { - override suspend fun decode(entity: Entity, context: CodingContext): TaskRegistry { - val string = entity.data.toString(Charset.defaultCharset()) - return Json.decodeFromString(TaskRegistry.serializer(), string) - } - - override suspend fun encode(value: TaskRegistry, context: CodingContext): Entity { - val string = Json.encodeToString(TaskRegistry.serializer(), value) - return Entity(RawData.copyFrom(string, Charset.defaultCharset())) - } -} -- 2.34.1 From 196854429ad91bffd65a48851d4c309a8ba6b091 Mon Sep 17 00:00:00 2001 From: Andrey Stoyan Date: Tue, 31 May 2022 22:50:09 +0300 Subject: [PATCH 11/14] Non-suspend data tree builder --- .../kscience/dataforge/data/StaticDataTree.kt | 21 ++++++++++++++++--- .../dataforge/distributed/ServiceWorkspace.kt | 9 ++------ .../dataforge/distributed/RemoteCallTest.kt | 8 +++---- 3 files changed, 23 insertions(+), 15 deletions(-) diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/StaticDataTree.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/StaticDataTree.kt index e68c16e2..89d1eeac 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/StaticDataTree.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/StaticDataTree.kt @@ -1,9 +1,18 @@ package space.kscience.dataforge.data import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.flow.collect import space.kscience.dataforge.misc.DFExperimental -import space.kscience.dataforge.names.* +import space.kscience.dataforge.names.Name +import space.kscience.dataforge.names.NameToken +import space.kscience.dataforge.names.asName +import space.kscience.dataforge.names.cutFirst +import space.kscience.dataforge.names.cutLast +import space.kscience.dataforge.names.firstOrNull +import space.kscience.dataforge.names.isEmpty +import space.kscience.dataforge.names.lastOrNull +import space.kscience.dataforge.names.length +import space.kscience.dataforge.names.plus +import kotlin.collections.set import kotlin.reflect.KType import kotlin.reflect.typeOf @@ -61,6 +70,12 @@ internal class StaticDataTree( } } +@Suppress("FunctionName") +public fun DataTree(dataType: KType): DataTree = StaticDataTree(dataType) + +@Suppress("FunctionName") +public inline fun DataTree(): DataTree = DataTree(typeOf()) + @Suppress("FunctionName") public suspend fun DataTree( dataType: KType, @@ -73,6 +88,6 @@ public suspend inline fun DataTree( ): DataTree = DataTree(typeOf(), block) @OptIn(DFExperimental::class) -public suspend fun DataSet.seal(): DataTree = DataTree(dataType){ +public suspend fun DataSet.seal(): DataTree = DataTree(dataType) { populate(this@seal) } \ No newline at end of file diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ServiceWorkspace.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ServiceWorkspace.kt index cf17ce04..8df7853a 100644 
--- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ServiceWorkspace.kt +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ServiceWorkspace.kt @@ -1,12 +1,10 @@ package space.kscience.dataforge.distributed import io.ktor.utils.io.core.* -import io.lambdarpc.coding.coders.JsonCoder import io.lambdarpc.dsl.LibService import io.lambdarpc.dsl.def import io.lambdarpc.dsl.j import io.lambdarpc.utils.ServiceId -import kotlinx.coroutines.runBlocking import kotlinx.serialization.KSerializer import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Global @@ -32,7 +30,7 @@ public class ServiceWorkspace( port: Int? = null, override val context: Context = Global.buildContext("workspace".asName()), private val dataSerializer: KSerializer? = null, - data: DataSet<*> = runBlocking { DataTree {} }, + data: DataSet<*> = DataTree(), override val targets: Map = mapOf(), ) : Workspace, Closeable { private val _port: Int? = port @@ -113,9 +111,6 @@ public class ServiceWorkspace( public companion object { internal val serviceId = ServiceId("d41b95b1-828b-4444-8ff0-6f9c92a79246") - internal val execute by serviceId.def( - JsonCoder(Name.serializer()), JsonCoder(MetaSerializer), j(), - j() - ) + internal val execute by serviceId.def(j(), j(MetaSerializer), j(), j()) } } diff --git a/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/RemoteCallTest.kt b/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/RemoteCallTest.kt index 3220adf2..d123d834 100644 --- a/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/RemoteCallTest.kt +++ b/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/RemoteCallTest.kt @@ -24,15 +24,13 @@ internal class RemoteCallTest { private lateinit var workspace: Workspace @BeforeAll - fun before() { + fun before() = runBlocking { worker1 = ServiceWorkspace( context = Global.buildContext("worker1".asName()) { plugin(MyPlugin1) }, - data = runBlocking { - DataTree { - static("int", 42) - } + data = DataTree { + static("int", 42) }, ) worker1.start() -- 2.34.1 From 7e13c3dec674e7f48650ba5dd0a077f8f9af4989 Mon Sep 17 00:00:00 2001 From: Andrey Stoyan Date: Wed, 1 Jun 2022 23:31:51 +0300 Subject: [PATCH 12/14] Endpoint info via meta --- .../dataforge/distributed/NodeWorkspace.kt | 78 +++++++++++++ .../dataforge/distributed/RemotePlugin.kt | 39 ------- .../dataforge/distributed/RemoteTask.kt | 9 +- .../distributed/RemoteTaskWorkspace.kt | 73 ++++++++++++ .../dataforge/distributed/ServiceWorkspace.kt | 107 ++---------------- .../dataforge/distributed/TaskRegistry.kt | 18 --- .../kscience/dataforge/distributed/Plugins.kt | 4 +- .../dataforge/distributed/RemoteCallTest.kt | 24 ++-- .../kscience/dataforge/meta/MutableMeta.kt | 24 +++- 9 files changed, 205 insertions(+), 171 deletions(-) create mode 100644 dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/NodeWorkspace.kt delete mode 100644 dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemotePlugin.kt create mode 100644 dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTaskWorkspace.kt delete mode 100644 dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/TaskRegistry.kt diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/NodeWorkspace.kt 
b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/NodeWorkspace.kt new file mode 100644 index 00000000..c9238c09 --- /dev/null +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/NodeWorkspace.kt @@ -0,0 +1,78 @@ +package space.kscience.dataforge.distributed + +import io.lambdarpc.dsl.LibService +import kotlinx.serialization.KSerializer +import space.kscience.dataforge.context.Context +import space.kscience.dataforge.context.Global +import space.kscience.dataforge.data.DataSet +import space.kscience.dataforge.data.DataTree +import space.kscience.dataforge.distributed.ServiceWorkspace.Companion.execute +import space.kscience.dataforge.distributed.ServiceWorkspace.Companion.serviceId +import space.kscience.dataforge.distributed.serialization.DataSetPrototype +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.meta.put +import space.kscience.dataforge.names.Name +import space.kscience.dataforge.names.asName +import space.kscience.dataforge.names.plus +import space.kscience.dataforge.workspace.SerializableResultTask + +/** + * Workspace that exposes its tasks for remote clients. + * @param port Port to start service on. Will be random if null. + */ +public class NodeWorkspace( + port: Int? = null, + context: Context = Global.buildContext("workspace".asName()), + private val dataSerializer: KSerializer? = null, + data: DataSet<*> = DataTree(), + targets: Map = mapOf(), +) : RemoteTaskWorkspace(context, data, targets), ServiceWorkspace { + private val _port: Int? = port + + private val service = LibService(serviceId, port) { + execute of { name, meta, executionContext -> + if (name == Name.EMPTY) { + requireNotNull(dataSerializer) { "Data serializer is not provided on $port" } + DataSetPrototype.of(data, dataSerializer) + } else { + val proxyContext = context.buildContext(context.name + "proxy") { + properties { + put(executionContext) + } + } + val proxy = RemoteTaskWorkspace(context = proxyContext, data = data) + val task = tasks[name] ?: error("Task with name $name not found in the workspace") + require(task is SerializableResultTask) + // Local function to capture generic parameter + suspend fun execute(task: SerializableResultTask): DataSetPrototype { + val result = task.execute(proxy, name, meta) + return DataSetPrototype.of(result, task.resultSerializer) + } + execute(task) + } + } + } + + /** + * Port this workspace is available on. + */ + public override val port: Int + get() = _port ?: service.port.p + + /** + * Start [NodeWorkspace] as a service. + */ + public override fun start(): Unit = service.start() + + /** + * Await termination of the service. + */ + public override fun awaitTermination(): Unit = service.awaitTermination() + + /** + * Shutdown service. 
+ */ + public override fun shutdown(): Unit = service.shutdown() + + override fun close(): Unit = shutdown() +} diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemotePlugin.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemotePlugin.kt deleted file mode 100644 index 5eabc8b8..00000000 --- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemotePlugin.kt +++ /dev/null @@ -1,39 +0,0 @@ -package space.kscience.dataforge.distributed - -import io.lambdarpc.utils.Endpoint -import space.kscience.dataforge.context.AbstractPlugin -import space.kscience.dataforge.context.Global -import space.kscience.dataforge.context.Plugin -import space.kscience.dataforge.context.PluginFactory -import space.kscience.dataforge.context.PluginTag -import space.kscience.dataforge.meta.Meta -import space.kscience.dataforge.names.Name -import space.kscience.dataforge.workspace.SerializableResultTask -import space.kscience.dataforge.workspace.Task - -/** - * Plugin whose purpose is to communicate with remote plugins. - * @param plugin A remote plugin to be used. - * @param endpoint Endpoint of the remote plugin. - */ -public class RemotePlugin

<P : Plugin>(private val plugin: P, private val endpoint: String) : AbstractPlugin() { - - // TODO - public constructor(factory: PluginFactory<P>

, endpoint: String) : this(factory.build(Global, Meta.EMPTY), endpoint) - - override val tag: PluginTag - get() = plugin.tag - - private val tasks = plugin.content(Task.TYPE) - .filterValues { it is SerializableResultTask<*> } - .mapValues { (_, task) -> - require(task is SerializableResultTask<*>) - RemoteTask(Endpoint(endpoint), task.resultType, task.resultSerializer) - } - - override fun content(target: String): Map = - when (target) { - Task.TYPE -> tasks - else -> emptyMap() - } -} diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTask.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTask.kt index 61b5b104..2fbe5afa 100644 --- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTask.kt +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTask.kt @@ -17,18 +17,17 @@ import kotlin.reflect.KType * Proxy task that communicates with the corresponding remote task. */ internal class RemoteTask( - internal val endpoint: Endpoint, + endpoint: String, override val resultType: KType, override val resultSerializer: KSerializer, override val descriptor: MetaDescriptor? = null, - private val taskRegistry: TaskRegistry? = null, + private val executionContext: Meta = Meta.EMPTY, ) : SerializableResultTask { - private val dispatcher = ServiceDispatcher(ServiceWorkspace.serviceId to endpoint) + private val dispatcher = ServiceDispatcher(ServiceWorkspace.serviceId to Endpoint(endpoint)) override suspend fun execute(workspace: Workspace, taskName: Name, taskMeta: Meta): TaskResult { - val registry = taskRegistry ?: TaskRegistry(workspace.tasks) val result = withContext(dispatcher) { - ServiceWorkspace.execute(taskName, taskMeta, registry) + ServiceWorkspace.execute(taskName, taskMeta, executionContext) } val dataSet = result.toDataSet(resultType, resultSerializer) return workspace.wrapResult(dataSet, taskName, taskMeta) diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTaskWorkspace.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTaskWorkspace.kt new file mode 100644 index 00000000..9baf7312 --- /dev/null +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTaskWorkspace.kt @@ -0,0 +1,73 @@ +package space.kscience.dataforge.distributed + +import space.kscience.dataforge.context.Context +import space.kscience.dataforge.context.Global +import space.kscience.dataforge.context.gather +import space.kscience.dataforge.data.DataSet +import space.kscience.dataforge.data.DataTree +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.meta.MutableMeta +import space.kscience.dataforge.meta.get +import space.kscience.dataforge.names.Name +import space.kscience.dataforge.names.asName +import space.kscience.dataforge.values.string +import space.kscience.dataforge.workspace.SerializableResultTask +import space.kscience.dataforge.workspace.Task +import space.kscience.dataforge.workspace.TaskResult +import space.kscience.dataforge.workspace.Workspace +import space.kscience.dataforge.workspace.wrapResult + +/** + * Workspace that returns [RemoteTask] if such task should be + * executed remotely according to the execution context. 
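+ *
+ * Endpoint information is taken from the context properties. A client-side wiring sketch
+ * (the endpoint literal is illustrative; [MyPlugin1] is the test plugin from this series):
+ *
+ * ```kotlin
+ * val workspace = RemoteTaskWorkspace(
+ *     context = Global.buildContext("client".asName()) {
+ *         plugin(MyPlugin1)
+ *         properties {
+ *             endpoints {
+ *                 Name.of(MyPlugin1.tag.name, "task") on "localhost:9000"
+ *             }
+ *         }
+ *     }
+ * )
+ * // tasks listed under "execution.endpoints" resolve to RemoteTask; all others run locally
+ * ```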
+ */
+public open class RemoteTaskWorkspace(
+    final override val context: Context = Global.buildContext("workspace".asName()),
+    data: DataSet<*> = DataTree(),
+    override val targets: Map<Name, Meta> = mapOf(),
+) : Workspace {
+
+    override val data: TaskResult<*> = wrapResult(data, Name.EMPTY, Meta.EMPTY)
+
+    private val _tasks: Map<Name, Task<*>> = context.gather(Task.TYPE)
+
+    override val tasks: Map<Name, Task<*>>
+        get() = object : AbstractMap<Name, Task<*>>() {
+            override val entries: Set<Map.Entry<Name, Task<*>>>
+                get() = _tasks.entries
+
+            override fun get(key: Name): Task<*>? {
+                val executionContext = context.properties[EXECUTION_CONTEXT]
+                val endpoint = executionContext?.get(ENDPOINTS)?.toMeta()?.get(key) ?: return _tasks[key]
+                val string = endpoint.value?.string ?: error("Endpoint is expected to be a string")
+                val local = _tasks[key] ?: error("No task with name $key")
+                require(local is SerializableResultTask<*>) { "Task $key result is not serializable" }
+                return RemoteTask(string, local.resultType, local.resultSerializer, local.descriptor, executionContext)
+            }
+        }
+
+    public companion object {
+        internal val EXECUTION_CONTEXT = "execution".asName()
+        internal val ENDPOINTS = "endpoints".asName()
+    }
+}
+
+public fun MutableMeta.endpoints(block: EndpointsBuilder.() -> Unit) {
+    RemoteTaskWorkspace.EXECUTION_CONTEXT put {
+        RemoteTaskWorkspace.ENDPOINTS put EndpointsBuilder().apply(block).build()
+    }
+}
+
+public class EndpointsBuilder {
+    private val endpoints = mutableMapOf<Name, String>()
+
+    public infix fun Name.on(endpoint: String) {
+        endpoints[this] = endpoint
+    }
+
+    internal fun build(): Meta = Meta {
+        endpoints.forEach { (name, endpoint) ->
+            name put endpoint
+        }
+    }
+}
diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ServiceWorkspace.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ServiceWorkspace.kt
index 8df7853a..aaaebc91 100644
--- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ServiceWorkspace.kt
+++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ServiceWorkspace.kt
@@ -1,116 +1,29 @@
 package space.kscience.dataforge.distributed
 
-import io.ktor.utils.io.core.*
-import io.lambdarpc.dsl.LibService
 import io.lambdarpc.dsl.def
 import io.lambdarpc.dsl.j
 import io.lambdarpc.utils.ServiceId
-import kotlinx.serialization.KSerializer
-import space.kscience.dataforge.context.Context
-import space.kscience.dataforge.context.Global
-import space.kscience.dataforge.context.gather
-import space.kscience.dataforge.data.DataSet
-import space.kscience.dataforge.data.DataTree
 import space.kscience.dataforge.distributed.serialization.DataSetPrototype
-import space.kscience.dataforge.meta.Meta
 import space.kscience.dataforge.meta.MetaSerializer
 import space.kscience.dataforge.names.Name
-import space.kscience.dataforge.names.asName
-import space.kscience.dataforge.workspace.SerializableResultTask
-import space.kscience.dataforge.workspace.Task
-import space.kscience.dataforge.workspace.TaskResult
 import space.kscience.dataforge.workspace.Workspace
-import space.kscience.dataforge.workspace.wrapResult
+import java.io.Closeable
 
 /**
- * Workspace that exposes its tasks for remote clients.
- * @param port Port to start service on. Will be random if null.
+ * [Workspace] that can expose its tasks to other workspaces as a service.
  */
-public class ServiceWorkspace(
-    port: Int? = null,
-    override val context: Context = Global.buildContext("workspace".asName()),
-    private val dataSerializer: KSerializer? = null,
-    data: DataSet<*> = DataTree(),
-    override val targets: Map<Name, Meta> = mapOf(),
-) : Workspace, Closeable {
-    private val _port: Int? = port
-
-    override val data: TaskResult<*> = wrapResult(data, Name.EMPTY, Meta.EMPTY)
-
-    override val tasks: Map<Name, Task<*>>
-        get() = context.gather(Task.TYPE)
-
-    private val service = LibService(serviceId, port) {
-        execute of { name, meta, taskRegistry ->
-            if (name == Name.EMPTY) {
-                requireNotNull(dataSerializer) { "Data serializer is not provided on $port" }
-                DataSetPrototype.of(data, dataSerializer)
-            } else {
-                val task = tasks[name] ?: error("Task $name does not exist locally")
-                require(task is SerializableResultTask<*>) { "Result of $name cannot be serialized" }
-                val workspace = ProxyWorkspace(taskRegistry)
-
-                // Local function to capture generic parameter
-                suspend fun <T : Any> execute(task: SerializableResultTask<T>): DataSetPrototype {
-                    val result = task.execute(workspace, name, meta)
-                    return DataSetPrototype.of(result, task.resultSerializer)
-                }
-                execute(task)
-            }
-        }
-    }
-
-    /**
-     * Proxies task calls to right endpoints according to the [TaskRegistry].
-     */
-    private inner class ProxyWorkspace(private val taskRegistry: TaskRegistry) : Workspace by this {
-        override val tasks: Map<Name, Task<*>>
-            get() = object : AbstractMap<Name, Task<*>>() {
-                override val entries: Set<Map.Entry<Name, Task<*>>>
-                    get() = this@ServiceWorkspace.tasks.entries
-
-                override fun get(key: Name): Task<*>? = remoteTask(key) ?: this@ServiceWorkspace.tasks[key]
-            }
-
-        /**
-         * Call default implementation to use [tasks] virtually instead of it in [ServiceWorkspace].
-         */
-        override suspend fun produce(taskName: Name, taskMeta: Meta): TaskResult<*> =
-            super.produce(taskName, taskMeta)
-
-        private fun remoteTask(name: Name): RemoteTask<*>? {
-            val endpoint = taskRegistry.tasks[name] ?: return null
-            val local = this@ServiceWorkspace.tasks[name] ?: error("No task with name $name locally on $port")
-            require(local is SerializableResultTask<*>) { "Task $name result is not serializable" }
-            return RemoteTask(endpoint, local.resultType, local.resultSerializer, local.descriptor, taskRegistry)
-        }
-    }
-
-    /**
-     * Port this workspace is available on.
-     */
+public interface ServiceWorkspace : Workspace, Closeable {
     public val port: Int
-        get() = _port ?: service.port.p
+    public fun start()
+    public fun awaitTermination()
+    public fun shutdown()
 
-    /**
-     * Start [ServiceWorkspace] as a service.
-     */
-    public fun start(): Unit = service.start()
-
-    /**
-     * Await termination of the service.
-     */
-    public fun awaitTermination(): Unit = service.awaitTermination()
-
-    /**
-     * Shutdown service.
-     */
-    public fun shutdown(): Unit = service.shutdown()
-
-    override fun close(): Unit = service.shutdown()
+    override fun close() {
+        shutdown()
+    }
 
     public companion object {
         internal val serviceId = ServiceId("d41b95b1-828b-4444-8ff0-6f9c92a79246")
-        internal val execute by serviceId.def(j(), j(MetaSerializer), j(), j())
+        internal val execute by serviceId.def(j(), j(MetaSerializer), j(MetaSerializer), j())
     }
 }
diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/TaskRegistry.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/TaskRegistry.kt
deleted file mode 100644
index aea1b29e..00000000
--- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/TaskRegistry.kt
+++ /dev/null
@@ -1,18 +0,0 @@
-package space.kscience.dataforge.distributed
-
-import io.lambdarpc.utils.Endpoint
-import kotlinx.serialization.Serializable
-import space.kscience.dataforge.names.Name
-import space.kscience.dataforge.workspace.Task
-
-@Serializable
-internal class TaskRegistry(val tasks: Map<Name, Endpoint>)
-
-internal fun TaskRegistry(tasks: Map<Name, Task<*>>): TaskRegistry {
-    val remotes = tasks.filterValues { it is RemoteTask<*> }
-    val endpoints = remotes.mapValues { (_, task) ->
-        require(task is RemoteTask<*>)
-        task.endpoint
-    }
-    return TaskRegistry(endpoints)
-}
diff --git a/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/Plugins.kt b/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/Plugins.kt
index 1202dfd9..a76a0ceb 100644
--- a/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/Plugins.kt
+++ b/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/Plugins.kt
@@ -22,7 +22,7 @@ internal class MyPlugin1 : WorkspacePlugin() {
         get() = Factory.tag
 
     val task by task(serializer<Int>()) {
-        workspace.logger.info { "In ${tag.name}.task" }
+        workspace.logger.info { "In ${tag.name}.task on ${workspace.context.name}" }
         val myInt = workspace.data.getByType<Int>("int")!!
         data("result", myInt.data.map { it + 1 })
     }
@@ -43,7 +43,7 @@ internal class MyPlugin2 : WorkspacePlugin() {
         get() = Factory.tag
 
     val task by task(serializer<Int>()) {
-        workspace.logger.info { "In ${tag.name}.task" }
+        workspace.logger.info { "In ${tag.name}.task on ${workspace.context.name}" }
         val dataSet = fromTask<Int>(Name.of(MyPlugin1.tag.name, "task"))
         val data = dataSet["result".asName()]!!
data("result", data.map { it + 1 }) diff --git a/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/RemoteCallTest.kt b/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/RemoteCallTest.kt index b78b4f56..adf2c67f 100644 --- a/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/RemoteCallTest.kt +++ b/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/RemoteCallTest.kt @@ -19,13 +19,13 @@ import kotlin.test.assertEquals @TestInstance(TestInstance.Lifecycle.PER_CLASS) internal class RemoteCallTest { - private lateinit var worker1: ServiceWorkspace - private lateinit var worker2: ServiceWorkspace + private lateinit var worker1: NodeWorkspace + private lateinit var worker2: NodeWorkspace private lateinit var workspace: Workspace @BeforeAll fun before() = runBlocking { - worker1 = ServiceWorkspace( + worker1 = NodeWorkspace( context = Global.buildContext("worker1".asName()) { plugin(MyPlugin1) }, @@ -35,7 +35,7 @@ internal class RemoteCallTest { ) worker1.start() - worker2 = ServiceWorkspace( + worker2 = NodeWorkspace( context = Global.buildContext("worker2".asName()) { plugin(MyPlugin1) plugin(MyPlugin2) @@ -43,12 +43,18 @@ internal class RemoteCallTest { ) worker2.start() - workspace = Workspace { - context { - plugin(RemotePlugin(MyPlugin1, "localhost:${worker1.port}")) - plugin(RemotePlugin(MyPlugin2, "localhost:${worker2.port}")) + workspace = NodeWorkspace( + context = Global.buildContext { + plugin(MyPlugin1) + plugin(MyPlugin2) + properties { + endpoints { + Name.of(MyPlugin1.tag.name, "task") on "localhost:${worker1.port}" + Name.of(MyPlugin2.tag.name, "task") on "localhost:${worker2.port}" + } + } } - } + ) } @AfterAll diff --git a/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/MutableMeta.kt b/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/MutableMeta.kt index d39a6dbf..9e3a2ab8 100644 --- a/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/MutableMeta.kt +++ b/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/MutableMeta.kt @@ -2,11 +2,25 @@ package space.kscience.dataforge.meta import kotlinx.serialization.Serializable import space.kscience.dataforge.misc.DFExperimental -import space.kscience.dataforge.names.* +import space.kscience.dataforge.names.Name +import space.kscience.dataforge.names.NameToken +import space.kscience.dataforge.names.asName +import space.kscience.dataforge.names.cutFirst +import space.kscience.dataforge.names.cutLast +import space.kscience.dataforge.names.first +import space.kscience.dataforge.names.firstOrNull +import space.kscience.dataforge.names.isEmpty +import space.kscience.dataforge.names.lastOrNull +import space.kscience.dataforge.names.length +import space.kscience.dataforge.names.plus +import space.kscience.dataforge.names.withIndex import space.kscience.dataforge.values.EnumValue import space.kscience.dataforge.values.MutableValueProvider import space.kscience.dataforge.values.Value import space.kscience.dataforge.values.asValue +import kotlin.collections.component1 +import kotlin.collections.component2 +import kotlin.collections.set import kotlin.js.JsName import kotlin.jvm.Synchronized @@ -146,6 +160,14 @@ public interface MutableMeta : Meta, MutableMetaProvider { */ public operator fun MutableMeta.set(name: Name, meta: Meta): Unit = setMeta(name, meta) +public fun MutableMeta.put(other: Meta) { + other.items.forEach { (name, meta) -> + name.asName() put meta + } +} + 
+public operator fun MutableMeta.plusAssign(meta: Meta): Unit = put(meta)
+
 /**
  * Set or replace value at given [name]
  */
-- 2.34.1


From ba3354b4a2b884c62d89a24ea20f142e36d92cd1 Mon Sep 17 00:00:00 2001
From: Andrey Stoyan
Date: Wed, 1 Jun 2022 23:41:21 +0300
Subject: [PATCH 13/14] Fix transitive execution

---
 .../dataforge/distributed/NodeWorkspace.kt    |  3 ++-
 .../distributed/RemoteTaskWorkspace.kt        | 15 ++++++-----
 .../space/kscience/dataforge/meta/Meta.kt     | 26 ++++++++++++++++---
 3 files changed, 34 insertions(+), 10 deletions(-)

diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/NodeWorkspace.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/NodeWorkspace.kt
index c9238c09..76b9358f 100644
--- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/NodeWorkspace.kt
+++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/NodeWorkspace.kt
@@ -15,6 +15,7 @@ import space.kscience.dataforge.names.Name
 import space.kscience.dataforge.names.asName
 import space.kscience.dataforge.names.plus
 import space.kscience.dataforge.workspace.SerializableResultTask
+import space.kscience.dataforge.workspace.Workspace
 
 /**
  * Workspace that exposes its tasks for remote clients.
@@ -26,7 +27,7 @@ public class NodeWorkspace(
     private val dataSerializer: KSerializer? = null,
     data: DataSet<*> = DataTree(),
     targets: Map<Name, Meta> = mapOf(),
-) : RemoteTaskWorkspace(context, data, targets), ServiceWorkspace {
+) : Workspace by RemoteTaskWorkspace(context, data, targets), ServiceWorkspace {
     private val _port: Int? = port
 
     private val service = LibService(serviceId, port) {
diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTaskWorkspace.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTaskWorkspace.kt
index 9baf7312..286a09d9 100644
--- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTaskWorkspace.kt
+++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTaskWorkspace.kt
@@ -7,6 +7,7 @@ import space.kscience.dataforge.data.DataSet
 import space.kscience.dataforge.data.DataTree
 import space.kscience.dataforge.meta.Meta
 import space.kscience.dataforge.meta.MutableMeta
+import space.kscience.dataforge.meta.extract
 import space.kscience.dataforge.meta.get
 import space.kscience.dataforge.names.Name
 import space.kscience.dataforge.names.asName
@@ -21,7 +22,7 @@ import space.kscience.dataforge.workspace.wrapResult
  * Workspace that returns [RemoteTask] if such task should be
  * executed remotely according to the execution context.
  */
-public open class RemoteTaskWorkspace(
+internal open class RemoteTaskWorkspace(
     final override val context: Context = Global.buildContext("workspace".asName()),
     data: DataSet<*> = DataTree(),
     override val targets: Map<Name, Meta> = mapOf(),
@@ -37,8 +38,10 @@ public open class RemoteTaskWorkspace(
             get() = _tasks.entries
 
             override fun get(key: Name): Task<*>? {
-                val executionContext = context.properties[EXECUTION_CONTEXT]
-                val endpoint = executionContext?.get(ENDPOINTS)?.toMeta()?.get(key) ?: return _tasks[key]
+                val executionContext = context.properties.extract(EXECUTION_CONTEXT)
+                val endpoint = executionContext
+                    ?.get(EXECUTION_CONTEXT)?.get(ENDPOINTS)?.get(key)
+                    ?: return _tasks[key]
                 val string = endpoint.value?.string ?: error("Endpoint is expected to be a string")
                 val local = _tasks[key] ?: error("No task with name $key")
                 require(local is SerializableResultTask<*>) { "Task $key result is not serializable" }
@@ -46,9 +49,9 @@ public open class RemoteTaskWorkspace(
         }
     }
 
-    public companion object {
-        internal val EXECUTION_CONTEXT = "execution".asName()
-        internal val ENDPOINTS = "endpoints".asName()
+    companion object {
+        val EXECUTION_CONTEXT = "execution".asName()
+        val ENDPOINTS = "endpoints".asName()
     }
 }
diff --git a/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/Meta.kt b/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/Meta.kt
index 01a5ebf2..9ee5c6fa 100644
--- a/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/Meta.kt
+++ b/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/Meta.kt
@@ -4,8 +4,23 @@ import kotlinx.serialization.Serializable
 import kotlinx.serialization.json.Json
 import space.kscience.dataforge.misc.Type
 import space.kscience.dataforge.misc.unsafeCast
-import space.kscience.dataforge.names.*
-import space.kscience.dataforge.values.*
+import space.kscience.dataforge.names.Name
+import space.kscience.dataforge.names.NameToken
+import space.kscience.dataforge.names.asName
+import space.kscience.dataforge.names.cutFirst
+import space.kscience.dataforge.names.cutLast
+import space.kscience.dataforge.names.firstOrNull
+import space.kscience.dataforge.names.isEmpty
+import space.kscience.dataforge.names.lastOrNull
+import space.kscience.dataforge.names.length
+import space.kscience.dataforge.names.parseAsName
+import space.kscience.dataforge.names.plus
+import space.kscience.dataforge.values.EnumValue
+import space.kscience.dataforge.values.Value
+import space.kscience.dataforge.values.ValueProvider
+import space.kscience.dataforge.values.boolean
+import space.kscience.dataforge.values.numberOrNull
+import space.kscience.dataforge.values.string
 
 /**
@@ -111,6 +126,11 @@ public operator fun Meta.get(name: Name): Meta? = this.getMeta(name)
  */
 public operator fun Meta.get(key: String): Meta? = this.get(Name.parse(key))
 
+public fun Meta.extract(name: Name): Meta? {
+    val value = get(name) ?: return null
+    return Meta { name put value }
+}
+
 /**
  * Get all items matching given name. The index of the last element, if present is used as a [Regex],
  * against which indexes of elements are matched.
@@ -135,7 +155,7 @@ public fun Meta.getIndexed(name: Name): Map<String?, Meta> {
     }
 }
 
-public fun Meta.getIndexed(name: String): Map<String, Meta> = getIndexed(name.parseAsName())
+public fun Meta.getIndexed(name: String): Map<String?, Meta> = getIndexed(name.parseAsName())
 
 /**
  * A meta node that ensures that all of its descendants has at least the same type.
-- 2.34.1
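Usage sketch (illustrative, not part of the patch series): the transitive-execution fix above rests on the difference between Meta.get and the new Meta.extract. get returns the node located at a name and drops the enclosing path, while extract re-wraps that node so the full path is preserved and the resulting Meta can be forwarded as-is to downstream workers. A minimal sketch, assuming the Meta builder DSL from dataforge-meta; the task name and endpoint value here are hypothetical:

    import space.kscience.dataforge.meta.Meta
    import space.kscience.dataforge.meta.extract
    import space.kscience.dataforge.meta.get
    import space.kscience.dataforge.names.asName

    val properties = Meta {
        "execution" put {
            "endpoints" put {
                "myPlugin.task" put "localhost:9001"
            }
        }
    }

    // get() returns the subtree located under "execution", without the prefix:
    val node = properties["execution"]

    // extract() keeps the "execution" prefix, so the result can be merged into
    // another context's properties unchanged; RemoteTaskWorkspace relies on this
    // when it reads executionContext?.get(EXECUTION_CONTEXT)?.get(ENDPOINTS).
    val extracted = properties.extract("execution".asName())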
From 50a9e7d314b7649af72af35684149a1ca06aac02 Mon Sep 17 00:00:00 2001
From: Andrey Stoyan
Date: Sat, 4 Jun 2022 18:20:39 +0300
Subject: [PATCH 14/14] Refactor serialization

---
 .../serialization/dataSerialization.kt        | 28 ++-----------------
 .../serialization/dataSetSerialization.kt     | 13 +++------
 2 files changed, 7 insertions(+), 34 deletions(-)

diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSerialization.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSerialization.kt
index 0ca9511c..0b8b91c3 100644
--- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSerialization.kt
+++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSerialization.kt
@@ -1,15 +1,11 @@
 package space.kscience.dataforge.distributed.serialization
 
-import kotlinx.coroutines.CompletableDeferred
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.Deferred
 import kotlinx.serialization.KSerializer
 import kotlinx.serialization.Serializable
 import kotlinx.serialization.json.Json
 import space.kscience.dataforge.data.Data
-import space.kscience.dataforge.data.Goal
+import space.kscience.dataforge.data.StaticData
 import space.kscience.dataforge.data.await
-import space.kscience.dataforge.meta.Meta
 import space.kscience.dataforge.meta.MetaSerializer
 import kotlin.reflect.KType
 
@@ -22,10 +18,10 @@ internal data class DataPrototype(
     val data: String,
 ) {
     fun <T : Any> toData(type: KType, serializer: KSerializer<T>): Data<T> =
-        SimpleData(
+        StaticData(
             type = type,
+            value = Json.decodeFromString(serializer, data),
             meta = Json.decodeFromString(MetaSerializer, meta),
-            data = Json.decodeFromString(serializer, data)
         )
 
     companion object {
@@ -36,21 +32,3 @@ internal data class DataPrototype(
         }
     }
 }
-
-/**
- * Trivial [Data] implementation.
- */
-private class SimpleData<T : Any>(
-    override val type: KType,
-    override val meta: Meta,
-    val data: T,
-) : Data<T> {
-    override val dependencies: Collection<Data<*>>
-        get() = emptyList()
-
-    override val deferred: Deferred<T>
-        get() = CompletableDeferred(data)
-
-    override fun async(coroutineScope: CoroutineScope): Deferred<T> = deferred
-    override fun reset() = Unit
-}
diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSetSerialization.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSetSerialization.kt
index d5f8e4d7..60eb84c3 100644
--- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSetSerialization.kt
+++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSetSerialization.kt
@@ -10,6 +10,7 @@ import space.kscience.dataforge.data.NamedData
 import space.kscience.dataforge.data.asIterable
 import space.kscience.dataforge.data.component1
 import space.kscience.dataforge.data.component2
+import space.kscience.dataforge.data.named
 import space.kscience.dataforge.meta.Meta
 import space.kscience.dataforge.names.Name
 import kotlin.reflect.KType
@@ -23,7 +24,7 @@ internal data class DataSetPrototype(val meta: Meta, val data: Map<String, DataPrototype>) {
     fun <T : Any> toDataSet(type: KType, serializer: KSerializer<T>): DataSet<T> {
         val data = data
             .mapKeys { (name, _) -> Name.of(name) }
             .mapValues { (_, dataPrototype) -> dataPrototype.toData(type, serializer) }
-        return SerializableDataSetImpl(type, data, meta)
+        return SimpleDataSet(type, data, meta)
     }
 
     companion object {
@@ -40,22 +41,16 @@ internal data class DataSetPrototype(val meta: Meta, val data: Map<String, DataPrototype>) {
-private class SerializableDataSetImpl<T : Any>(
+private class SimpleDataSet<T : Any>(
     override val dataType: KType,
     private val data: Map<Name, Data<T>>,
     override val meta: Meta,
 ) : DataSet<T> {
 
-    /**
-     * Trivial named data implementation.
-     */
-    private class SimpleNamedData<T : Any>(override val name: Name, override val data: Data<T>) :
-        NamedData<T>, Data<T> by data
-
     override fun iterator(): Iterator<NamedData<T>> = data
         .asSequence()
-        .map { (name, data) -> SimpleNamedData(name, data) }
+        .map { (name, data) -> data.named(name) }
        .iterator()
 
     override fun get(name: Name): Data<T>? = data[name]
-- 2.34.1
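End-to-end usage sketch (illustrative, distilled from RemoteCallTest.kt and Plugins.kt above; MyPlugin1 is the test plugin, and the port is chosen by the worker at startup): after the final patch, a worker node exposes its tasks as a service through NodeWorkspace, and a client routes a selected task to it via the endpoints DSL in its context properties, so produce() executes the task remotely and deserializes the result transparently.

    import kotlinx.coroutines.runBlocking
    import space.kscience.dataforge.context.Global
    import space.kscience.dataforge.distributed.NodeWorkspace
    import space.kscience.dataforge.distributed.endpoints
    import space.kscience.dataforge.meta.Meta
    import space.kscience.dataforge.names.Name
    import space.kscience.dataforge.names.asName

    fun main(): Unit = runBlocking {
        // Worker node: exposes MyPlugin1's tasks as a remote service.
        val worker = NodeWorkspace(
            context = Global.buildContext("worker".asName()) {
                plugin(MyPlugin1)
            },
        )
        worker.start()

        // Client node: MyPlugin1.task is dispatched to the worker because an
        // endpoint for it is registered in the execution context.
        val client = NodeWorkspace(
            context = Global.buildContext {
                plugin(MyPlugin1)
                properties {
                    endpoints {
                        Name.of(MyPlugin1.tag.name, "task") on "localhost:${worker.port}"
                    }
                }
            }
        )

        // Runs remotely on the worker; the serialized result is unwrapped locally.
        val result = client.produce(Name.of(MyPlugin1.tag.name, "task"), Meta.EMPTY)
        println(result)

        worker.shutdown()
    }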