diff --git a/dataforge-distributed/build.gradle.kts b/dataforge-distributed/build.gradle.kts index e5629e81..04d31e07 100644 --- a/dataforge-distributed/build.gradle.kts +++ b/dataforge-distributed/build.gradle.kts @@ -15,7 +15,8 @@ kotlin { jvmMain { dependencies { // TODO include fat jar of lambdarpc - api(files("lambdarpc-core-0.0.1.jar")) + val path = "../../../lambdarpc/LambdaRPC.kt/lambdarpc-core/build/libs" + api(files("$path/lambdarpc-core-0.0.1-SNAPSHOT.jar")) runtimeOnly("io.grpc:grpc-netty-shaded:1.44.0") api("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0") api("io.grpc:grpc-protobuf:1.44.0") diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ClientWorkspacePlugin.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ClientWorkspacePlugin.kt deleted file mode 100644 index c9dec94e..00000000 --- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ClientWorkspacePlugin.kt +++ /dev/null @@ -1,37 +0,0 @@ -package space.kscience.dataforge.distributed - -import io.lambdarpc.utils.Endpoint -import space.kscience.dataforge.context.AbstractPlugin -import space.kscience.dataforge.context.PluginTag -import space.kscience.dataforge.names.Name -import space.kscience.dataforge.workspace.Task -import kotlin.reflect.KType - -/** - * Plugin that purpose is to communicate with remote plugins. - * @param endpoint Endpoint of the remote plugin. - */ -public abstract class ClientWorkspacePlugin(endpoint: Endpoint) : AbstractPlugin() { - - /** - * Tag og the [ClientWorkspacePlugin] should be equal to the tag of the corresponding remote plugin. - */ - abstract override val tag: PluginTag - - /** - * Enumeration of names of remote tasks and their result types. - */ - public abstract val tasks: Map - - private val _tasks: Map> by lazy { - tasks.mapValues { (_, type) -> - RemoteTask(endpoint, type) - } - } - - override fun content(target: String): Map = - when (target) { - Task.TYPE -> _tasks - else -> emptyMap() - } -} diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemotePlugin.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemotePlugin.kt new file mode 100644 index 00000000..0f37753b --- /dev/null +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemotePlugin.kt @@ -0,0 +1,36 @@ +package space.kscience.dataforge.distributed + +import io.lambdarpc.utils.Endpoint +import space.kscience.dataforge.context.AbstractPlugin +import space.kscience.dataforge.context.Plugin +import space.kscience.dataforge.context.PluginFactory +import space.kscience.dataforge.context.PluginTag +import space.kscience.dataforge.names.Name +import space.kscience.dataforge.workspace.SerializableResultTask +import space.kscience.dataforge.workspace.Task + +/** + * Plugin that purpose is to communicate with remote plugins. + * @param plugin A remote plugin to be used. + * @param endpoint Endpoint of the remote plugin. + */ +public class RemotePlugin

(private val plugin: P, private val endpoint: String) : AbstractPlugin() { + + public constructor(factory: PluginFactory

, endpoint: String) : this(factory(), endpoint) + + override val tag: PluginTag + get() = plugin.tag + + private val tasks = plugin.content(Task.TYPE) + .filterValues { it is SerializableResultTask<*> } + .mapValues { (_, task) -> + require(task is SerializableResultTask<*>) + RemoteTask(Endpoint(endpoint), task.resultType, task.resultSerializer) + } + + override fun content(target: String): Map = + when (target) { + Task.TYPE -> tasks + else -> emptyMap() + } +} diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTask.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTask.kt index e0d3cb3b..61b5b104 100644 --- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTask.kt +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTask.kt @@ -1,13 +1,13 @@ package space.kscience.dataforge.distributed -import io.lambdarpc.dsl.ServiceDispatcher +import io.lambdarpc.context.ServiceDispatcher import io.lambdarpc.utils.Endpoint import kotlinx.coroutines.withContext -import space.kscience.dataforge.data.DataSet +import kotlinx.serialization.KSerializer import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.descriptors.MetaDescriptor import space.kscience.dataforge.names.Name -import space.kscience.dataforge.workspace.Task +import space.kscience.dataforge.workspace.SerializableResultTask import space.kscience.dataforge.workspace.TaskResult import space.kscience.dataforge.workspace.Workspace import space.kscience.dataforge.workspace.wrapResult @@ -17,20 +17,20 @@ import kotlin.reflect.KType * Proxy task that communicates with the corresponding remote task. */ internal class RemoteTask( - endpoint: Endpoint, + internal val endpoint: Endpoint, override val resultType: KType, + override val resultSerializer: KSerializer, override val descriptor: MetaDescriptor? = null, -) : Task { + private val taskRegistry: TaskRegistry? = null, +) : SerializableResultTask { private val dispatcher = ServiceDispatcher(ServiceWorkspace.serviceId to endpoint) - @Suppress("UNCHECKED_CAST") - override suspend fun execute( - workspace: Workspace, - taskName: Name, - taskMeta: Meta, - ): TaskResult = withContext(dispatcher) { - val dataset = ServiceWorkspace.execute(taskName) - dataset.finishDecoding(resultType) - workspace.wrapResult(dataset as DataSet, taskName, taskMeta) + override suspend fun execute(workspace: Workspace, taskName: Name, taskMeta: Meta): TaskResult { + val registry = taskRegistry ?: TaskRegistry(workspace.tasks) + val result = withContext(dispatcher) { + ServiceWorkspace.execute(taskName, taskMeta, registry) + } + val dataSet = result.toDataSet(resultType, resultSerializer) + return workspace.wrapResult(dataSet, taskName, taskMeta) } } diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ServiceWorkspace.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ServiceWorkspace.kt index b6849754..3eecfd7c 100644 --- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ServiceWorkspace.kt +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ServiceWorkspace.kt @@ -3,21 +3,23 @@ package space.kscience.dataforge.distributed import io.ktor.utils.io.core.* import io.lambdarpc.dsl.LibService import io.lambdarpc.dsl.def -import io.lambdarpc.utils.Address -import io.lambdarpc.utils.Port -import io.lambdarpc.utils.toSid +import io.lambdarpc.dsl.j +import io.lambdarpc.utils.ServiceId import kotlinx.coroutines.runBlocking +import kotlinx.serialization.KSerializer import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Global import space.kscience.dataforge.context.gather import space.kscience.dataforge.data.DataSet import space.kscience.dataforge.data.DataTree -import space.kscience.dataforge.distributed.serialization.DataSetCoder +import space.kscience.dataforge.distributed.serialization.DataSetPrototype +import space.kscience.dataforge.distributed.serialization.MetaCoder import space.kscience.dataforge.distributed.serialization.NameCoder -import space.kscience.dataforge.distributed.serialization.SerializableDataSetAdapter +import space.kscience.dataforge.distributed.serialization.TaskRegistryCoder import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.asName +import space.kscience.dataforge.workspace.SerializableResultTask import space.kscience.dataforge.workspace.Task import space.kscience.dataforge.workspace.TaskResult import space.kscience.dataforge.workspace.Workspace @@ -25,37 +27,73 @@ import space.kscience.dataforge.workspace.wrapResult /** * Workspace that exposes its tasks for remote clients. + * @param port Port to start service on. Will be random if null. */ public class ServiceWorkspace( - address: String = "localhost", port: Int? = null, override val context: Context = Global.buildContext("workspace".asName()), + private val dataSerializer: KSerializer? = null, data: DataSet<*> = runBlocking { DataTree {} }, override val targets: Map = mapOf(), ) : Workspace, Closeable { + private val _port: Int? = port override val data: TaskResult<*> = wrapResult(data, Name.EMPTY, Meta.EMPTY) override val tasks: Map> get() = context.gather(Task.TYPE) - private val service = LibService(serviceId, address, port) { - execute of { name -> - val res = produce(name, Meta.EMPTY) - SerializableDataSetAdapter(res) + private val service = LibService(serviceId, port) { + execute of { name, meta, taskRegistry -> + if (name == Name.EMPTY) { + requireNotNull(dataSerializer) { "Data serializer is not provided on $port" } + DataSetPrototype.of(data, dataSerializer) + } else { + val task = tasks[name] ?: error("Task $name does not exist locally") + require(task is SerializableResultTask) { "Result of $name cannot be serialized" } + val workspace = ProxyWorkspace(taskRegistry) + + // Local function to capture generic parameter + suspend fun execute(task: SerializableResultTask): DataSetPrototype { + val result = task.execute(workspace, name, meta) + return DataSetPrototype.of(result, task.resultSerializer) + } + execute(task) + } } } /** - * Address this workspace is available on. + * Proxies task calls to right endpoints according to the [TaskRegistry]. */ - public val address: Address = Address(address) + private inner class ProxyWorkspace(private val taskRegistry: TaskRegistry) : Workspace by this { + override val tasks: Map> + get() = object : AbstractMap>() { + override val entries: Set>> + get() = this@ServiceWorkspace.tasks.entries + + override fun get(key: Name): Task<*>? = remoteTask(key) ?: this@ServiceWorkspace.tasks[key] + } + + /** + * Call default implementation to use [tasks] virtually instead of it in [ServiceWorkspace]. + */ + override suspend fun produce(taskName: Name, taskMeta: Meta): TaskResult<*> = + super.produce(taskName, taskMeta) + + private fun remoteTask(name: Name): RemoteTask<*>? { + val endpoint = taskRegistry.tasks[name] ?: return null + val local = this@ServiceWorkspace.tasks[name] ?: error("No task with name $name locally on $port") + require(local is SerializableResultTask) { "Task $name result is not serializable" } + return RemoteTask(endpoint, local.resultType, local.resultSerializer, local.descriptor, taskRegistry) + } + } /** * Port this workspace is available on. */ - public val port: Port - get() = service.port + public val port: Int + get() = _port ?: service.port.p /** * Start [ServiceWorkspace] as a service. @@ -75,7 +113,7 @@ public class ServiceWorkspace( override fun close(): Unit = service.shutdown() public companion object { - internal val serviceId = "d41b95b1-828b-4444-8ff0-6f9c92a79246".toSid() - internal val execute by serviceId.def(NameCoder, DataSetCoder) + internal val serviceId = ServiceId("d41b95b1-828b-4444-8ff0-6f9c92a79246") + internal val execute by serviceId.def(NameCoder, MetaCoder, TaskRegistryCoder, j()) } } diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/TaskRegistry.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/TaskRegistry.kt new file mode 100644 index 00000000..aea1b29e --- /dev/null +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/TaskRegistry.kt @@ -0,0 +1,18 @@ +package space.kscience.dataforge.distributed + +import io.lambdarpc.utils.Endpoint +import kotlinx.serialization.Serializable +import space.kscience.dataforge.names.Name +import space.kscience.dataforge.workspace.Task + +@Serializable +internal class TaskRegistry(val tasks: Map) + +internal fun TaskRegistry(tasks: Map>): TaskRegistry { + val remotes = tasks.filterValues { it is RemoteTask<*> } + val endpoints = remotes.mapValues { (_, task) -> + require(task is RemoteTask) + task.endpoint + } + return TaskRegistry(endpoints) +} diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/DataSetCoder.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/DataSetCoder.kt deleted file mode 100644 index 02392235..00000000 --- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/DataSetCoder.kt +++ /dev/null @@ -1,23 +0,0 @@ -package space.kscience.dataforge.distributed.serialization - -import io.lambdarpc.coding.Coder -import io.lambdarpc.coding.CodingContext -import io.lambdarpc.transport.grpc.Entity -import io.lambdarpc.transport.serialization.Entity -import io.lambdarpc.transport.serialization.RawData -import kotlinx.coroutines.runBlocking -import java.nio.charset.Charset - -internal object DataSetCoder : Coder> { - override fun decode(entity: Entity, context: CodingContext): SerializableDataSet { - val string = entity.data.toString(Charset.defaultCharset()) - val prototype = DataSetPrototype.fromJson(string) - return prototype.toDataSet() - } - - override fun encode(value: SerializableDataSet, context: CodingContext): Entity { - val prototype = runBlocking { DataSetPrototype.of(value) } // TODO update LambdaRPC and remove blocking - val string = prototype.toJson() - return Entity(RawData.copyFrom(string, Charset.defaultCharset())) - } -} diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/MetaCoder.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/MetaCoder.kt new file mode 100644 index 00000000..d5425271 --- /dev/null +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/MetaCoder.kt @@ -0,0 +1,23 @@ +package space.kscience.dataforge.distributed.serialization + +import io.lambdarpc.coding.Coder +import io.lambdarpc.coding.CodingContext +import io.lambdarpc.transport.grpc.Entity +import io.lambdarpc.transport.serialization.Entity +import io.lambdarpc.transport.serialization.RawData +import kotlinx.serialization.json.Json +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.meta.MetaSerializer +import java.nio.charset.Charset + +internal object MetaCoder : Coder { + override suspend fun decode(entity: Entity, context: CodingContext): Meta { + val string = entity.data.toString(Charset.defaultCharset()) + return Json.decodeFromString(MetaSerializer, string) + } + + override suspend fun encode(value: Meta, context: CodingContext): Entity { + val string = Json.encodeToString(MetaSerializer, value) + return Entity(RawData.copyFrom(string, Charset.defaultCharset())) + } +} diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/NameCoder.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/NameCoder.kt index beb22124..87d51901 100644 --- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/NameCoder.kt +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/NameCoder.kt @@ -5,16 +5,18 @@ import io.lambdarpc.coding.CodingContext import io.lambdarpc.transport.grpc.Entity import io.lambdarpc.transport.serialization.Entity import io.lambdarpc.transport.serialization.RawData +import kotlinx.serialization.json.Json import space.kscience.dataforge.names.Name import java.nio.charset.Charset internal object NameCoder : Coder { - override fun decode(entity: Entity, context: CodingContext): Name { - require(entity.hasData()) { "Entity should contain data" } + override suspend fun decode(entity: Entity, context: CodingContext): Name { val string = entity.data.toString(Charset.defaultCharset()) - return Name.parse(string) + return Json.decodeFromString(Name.serializer(), string) } - override fun encode(value: Name, context: CodingContext): Entity = - Entity(RawData.copyFrom(value.toString(), Charset.defaultCharset())) + override suspend fun encode(value: Name, context: CodingContext): Entity { + val string = Json.encodeToString(Name.serializer(), value) + return Entity(RawData.copyFrom(string, Charset.defaultCharset())) + } } diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/SerializableDataSet.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/SerializableDataSet.kt deleted file mode 100644 index 2265b524..00000000 --- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/SerializableDataSet.kt +++ /dev/null @@ -1,16 +0,0 @@ -package space.kscience.dataforge.distributed.serialization - -import space.kscience.dataforge.data.DataSet -import kotlin.reflect.KType - -/** - * Represents [DataSet] that should be initialized before usage. - */ -internal interface SerializableDataSet : DataSet { - fun finishDecoding(type: KType) -} - -internal class SerializableDataSetAdapter(dataSet: DataSet) : - SerializableDataSet, DataSet by dataSet { - override fun finishDecoding(type: KType) = Unit -} diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/TaskRegistryCoder.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/TaskRegistryCoder.kt new file mode 100644 index 00000000..68c7e87f --- /dev/null +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/TaskRegistryCoder.kt @@ -0,0 +1,22 @@ +package space.kscience.dataforge.distributed.serialization + +import io.lambdarpc.coding.Coder +import io.lambdarpc.coding.CodingContext +import io.lambdarpc.transport.grpc.Entity +import io.lambdarpc.transport.serialization.Entity +import io.lambdarpc.transport.serialization.RawData +import kotlinx.serialization.json.Json +import space.kscience.dataforge.distributed.TaskRegistry +import java.nio.charset.Charset + +internal object TaskRegistryCoder : Coder { + override suspend fun decode(entity: Entity, context: CodingContext): TaskRegistry { + val string = entity.data.toString(Charset.defaultCharset()) + return Json.decodeFromString(TaskRegistry.serializer(), string) + } + + override suspend fun encode(value: TaskRegistry, context: CodingContext): Entity { + val string = Json.encodeToString(TaskRegistry.serializer(), value) + return Entity(RawData.copyFrom(string, Charset.defaultCharset())) + } +} diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSerialization.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSerialization.kt index b3e400cc..0ca9511c 100644 --- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSerialization.kt +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSerialization.kt @@ -21,11 +21,11 @@ internal data class DataPrototype( val meta: String, val data: String, ) { - fun toData(type: KType): Data = + fun toData(type: KType, serializer: KSerializer): Data = SimpleData( type = type, meta = Json.decodeFromString(MetaSerializer, meta), - data = Json.decodeFromString(kotlinx.serialization.serializer(type), data)!! + data = Json.decodeFromString(serializer, data) ) companion object { diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSetSerialization.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSetSerialization.kt index f30aa0b2..e9941810 100644 --- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSetSerialization.kt +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/serialization/dataSetSerialization.kt @@ -7,9 +7,8 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.toList +import kotlinx.serialization.KSerializer import kotlinx.serialization.Serializable -import kotlinx.serialization.json.Json -import kotlinx.serialization.serializer import space.kscience.dataforge.data.Data import space.kscience.dataforge.data.DataSet import space.kscience.dataforge.data.NamedData @@ -23,51 +22,40 @@ import kotlin.reflect.KType */ @Serializable internal data class DataSetPrototype(val data: Map) { - fun toDataSet(): SerializableDataSet = - SerializableDataSetImpl(this) - - fun toJson(): String = Json.encodeToString(serializer(), this) + fun toDataSet(type: KType, serializer: KSerializer): DataSet { + val data = data + .mapKeys { (name, _) -> Name.of(name) } + .mapValues { (_, dataPrototype) -> dataPrototype.toData(type, serializer) } + return SerializableDataSetImpl(type, data) + } companion object { - suspend fun of(dataSet: DataSet): DataSetPrototype = coroutineScope { - val serializer = serializer(dataSet.dataType) + suspend fun of(dataSet: DataSet, serializer: KSerializer): DataSetPrototype = coroutineScope { val flow = mutableListOf>>() dataSet.flowData().map { (name, data) -> name.toString() to async { DataPrototype.of(data, serializer) } }.toList(flow) DataSetPrototype(flow.associate { (name, deferred) -> name to deferred.await() }) } - - fun fromJson(string: String): DataSetPrototype = Json.decodeFromString(serializer(), string) } } /** - * Trivial [SerializableDataSet] implementation. + * Trivial [DataSet] implementation. */ -private class SerializableDataSetImpl(private val prototype: DataSetPrototype) : SerializableDataSet { +private class SerializableDataSetImpl( + override val dataType: KType, + private val data: Map>, +) : DataSet { - private lateinit var type: KType - private lateinit var data: Map> - - override fun finishDecoding(type: KType) { - this.type = type - this.data = prototype.data - .mapKeys { (name, _) -> Name.of(name) } - .mapValues { (_, dataPrototype) -> dataPrototype.toData(type) } - } - - override val dataType: KType - get() = type - - override fun flowData(): Flow> = + override fun flowData(): Flow> = data.map { (name, data) -> SimpleNamedData(name, data) }.asFlow() - override suspend fun getData(name: Name): Data? = data[name] + override suspend fun getData(name: Name): Data? = data[name] /** * Trivial named data implementation. */ - private class SimpleNamedData(override val name: Name, override val data: Data) : - NamedData, Data by data + private class SimpleNamedData(override val name: Name, override val data: Data) : + NamedData, Data by data } diff --git a/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/Plugins.kt b/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/Plugins.kt new file mode 100644 index 00000000..7b1bd637 --- /dev/null +++ b/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/Plugins.kt @@ -0,0 +1,61 @@ +package space.kscience.dataforge.distributed + +import kotlinx.serialization.serializer +import space.kscience.dataforge.context.Context +import space.kscience.dataforge.context.PluginFactory +import space.kscience.dataforge.context.PluginTag +import space.kscience.dataforge.context.info +import space.kscience.dataforge.context.logger +import space.kscience.dataforge.data.map +import space.kscience.dataforge.data.select +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.names.Name +import space.kscience.dataforge.names.asName +import space.kscience.dataforge.workspace.WorkspacePlugin +import space.kscience.dataforge.workspace.fromTask +import space.kscience.dataforge.workspace.task +import kotlin.reflect.KClass + +internal class MyPlugin1 : WorkspacePlugin() { + override val tag: PluginTag + get() = Factory.tag + + val task by task(serializer()) { + workspace.logger.info { "In ${tag.name}.task" } + val myInt = workspace.data.select() + val res = myInt.getData("int".asName())!! + emit("result".asName(), res.map { it + 1 }) + } + + companion object Factory : PluginFactory { + override fun invoke(meta: Meta, context: Context): MyPlugin1 = MyPlugin1() + + override val tag: PluginTag + get() = PluginTag("Plg1") + + override val type: KClass + get() = MyPlugin1::class + } +} + +internal class MyPlugin2 : WorkspacePlugin() { + override val tag: PluginTag + get() = Factory.tag + + val task by task(serializer()) { + workspace.logger.info { "In ${tag.name}.task" } + val dataSet = fromTask(Name.of(MyPlugin1.tag.name, "task")) + val data = dataSet.getData("result".asName())!! + emit("result".asName(), data.map { it + 1 }) + } + + companion object Factory : PluginFactory { + override fun invoke(meta: Meta, context: Context): MyPlugin2 = MyPlugin2() + + override val tag: PluginTag + get() = PluginTag("Plg2") + + override val type: KClass + get() = MyPlugin2::class + } +} diff --git a/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/RemoteCallTest.kt b/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/RemoteCallTest.kt new file mode 100644 index 00000000..3220adf2 --- /dev/null +++ b/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/RemoteCallTest.kt @@ -0,0 +1,89 @@ +package space.kscience.dataforge.distributed + +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.AfterAll +import org.junit.jupiter.api.BeforeAll +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.TestInstance +import space.kscience.dataforge.context.Global +import space.kscience.dataforge.data.DataTree +import space.kscience.dataforge.data.await +import space.kscience.dataforge.data.getData +import space.kscience.dataforge.data.static +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.names.Name +import space.kscience.dataforge.names.asName +import space.kscience.dataforge.workspace.Workspace +import kotlin.test.assertEquals + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +internal class RemoteCallTest { + + private lateinit var worker1: ServiceWorkspace + private lateinit var worker2: ServiceWorkspace + private lateinit var workspace: Workspace + + @BeforeAll + fun before() { + worker1 = ServiceWorkspace( + context = Global.buildContext("worker1".asName()) { + plugin(MyPlugin1) + }, + data = runBlocking { + DataTree { + static("int", 42) + } + }, + ) + worker1.start() + + worker2 = ServiceWorkspace( + context = Global.buildContext("worker2".asName()) { + plugin(MyPlugin1) + plugin(MyPlugin2) + }, + ) + worker2.start() + + workspace = Workspace { + context { + plugin(RemotePlugin(MyPlugin1, "localhost:${worker1.port}")) + plugin(RemotePlugin(MyPlugin2, "localhost:${worker2.port}")) + } + } + } + + @AfterAll + fun after() { + worker1.shutdown() + worker2.shutdown() + } + + @Test + fun `local execution`() = runBlocking { + assertEquals(42, worker1.data.getData("int")!!.await()) + val res = worker1 + .produce(Name.of(MyPlugin1.tag.name, "task"), Meta.EMPTY) + .getData("result".asName())!! + .await() + assertEquals(43, res) + } + + @Test + fun `remote execution`() = runBlocking { + val remoteRes = workspace + .produce(Name.of(MyPlugin1.tag.name, "task"), Meta.EMPTY) + .getData("result".asName())!! + .await() + assertEquals(43, remoteRes) + } + + @Test + fun `transitive execution`() = runBlocking { + val remoteRes = workspace + .produce(Name.of(MyPlugin2.tag.name, "task"), Meta.EMPTY) + .getData("result".asName())!! + .await() + assertEquals(44, remoteRes) + } +} diff --git a/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/ServiceWorkspaceTest.kt b/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/ServiceWorkspaceTest.kt deleted file mode 100644 index bb4bdce9..00000000 --- a/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/ServiceWorkspaceTest.kt +++ /dev/null @@ -1,112 +0,0 @@ -package space.kscience.dataforge.distributed - -import io.lambdarpc.utils.Endpoint -import kotlinx.coroutines.runBlocking -import org.junit.jupiter.api.AfterAll -import org.junit.jupiter.api.BeforeAll -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.TestInstance -import space.kscience.dataforge.context.Context -import space.kscience.dataforge.context.Global -import space.kscience.dataforge.context.PluginFactory -import space.kscience.dataforge.context.PluginTag -import space.kscience.dataforge.data.DataTree -import space.kscience.dataforge.data.await -import space.kscience.dataforge.data.getData -import space.kscience.dataforge.data.map -import space.kscience.dataforge.data.select -import space.kscience.dataforge.data.static -import space.kscience.dataforge.meta.Meta -import space.kscience.dataforge.names.Name -import space.kscience.dataforge.names.asName -import space.kscience.dataforge.workspace.Workspace -import space.kscience.dataforge.workspace.WorkspacePlugin -import space.kscience.dataforge.workspace.task -import kotlin.reflect.KClass -import kotlin.reflect.KType -import kotlin.reflect.typeOf -import kotlin.test.assertEquals - -private class MyPlugin : WorkspacePlugin() { - override val tag: PluginTag - get() = Factory.tag - - val task by task { - val myInt = workspace.data.select() - val res = myInt.getData("int".asName())!! - emit("result".asName(), res.map { it + 1 }) - } - - companion object Factory : PluginFactory { - override fun invoke(meta: Meta, context: Context): MyPlugin = MyPlugin() - - override val tag: PluginTag - get() = PluginTag("Plg") - - override val type: KClass - get() = MyPlugin::class - } -} - -private class RemoteMyPlugin(endpoint: Endpoint) : ClientWorkspacePlugin(endpoint) { - override val tag: PluginTag - get() = MyPlugin.tag - - override val tasks: Map - get() = mapOf( - "task".asName() to typeOf() - ) -} - -@TestInstance(TestInstance.Lifecycle.PER_CLASS) -class ServiceWorkspaceTest { - - private lateinit var worker1: ServiceWorkspace - private lateinit var workspace: Workspace - - @BeforeAll - fun before() { - worker1 = ServiceWorkspace( - context = Global.buildContext("worker1".asName()) { - plugin(MyPlugin) - }, - data = runBlocking { - DataTree { - static("int", 0) - } - }, - ) - worker1.start() - - workspace = Workspace { - context { - val endpoint = Endpoint(worker1.address, worker1.port) - plugin(RemoteMyPlugin(endpoint)) - } - } - } - - @AfterAll - fun after() { - worker1.shutdown() - } - - @Test - fun localExecution() = runBlocking { - assertEquals(0, worker1.data.getData("int")!!.await()) - val res = worker1 - .produce(Name.of("Plg", "task"), Meta.EMPTY) - .getData("result".asName())!! - .await() - assertEquals(1, res) - } - - @Test - fun remoteExecution() = runBlocking { - val remoteRes = workspace - .produce(Name.of("Plg", "task"), Meta.EMPTY) - .getData("result".asName())!! - .await() - assertEquals(1, remoteRes) - } -} diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt index 042945c5..6260245b 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt @@ -1,6 +1,8 @@ package space.kscience.dataforge.workspace import kotlinx.coroutines.withContext +import kotlinx.serialization.KSerializer +import kotlinx.serialization.serializer import space.kscience.dataforge.data.DataSetBuilder import space.kscience.dataforge.data.DataTree import space.kscience.dataforge.data.GoalExecutionRestriction @@ -17,11 +19,6 @@ import kotlin.reflect.typeOf @Type(TYPE) public interface Task : Described { - /** - * Type of the task result data. - */ - public val resultType: KType - /** * Compute a [TaskResult] using given meta. In general, the result is lazy and represents both computation model * and a handler for actual result @@ -37,6 +34,12 @@ public interface Task : Described { } } +@Type(TYPE) +public interface SerializableResultTask : Task { + public val resultType: KType + public val resultSerializer: KSerializer +} + public class TaskResultBuilder( public val workspace: Workspace, public val taskName: Name, @@ -60,9 +63,6 @@ public fun Task( builder: suspend TaskResultBuilder.() -> Unit, ): Task = object : Task { - override val resultType: KType - get() = resultType - override val descriptor: MetaDescriptor? = descriptor override suspend fun execute( @@ -78,9 +78,28 @@ public fun Task( } } +/** + * [Task] that has [resultSerializer] to be able to cache or send its results + */ +@DFInternal +public class SerializableResultTaskImpl( + override val resultType: KType, + override val resultSerializer: KSerializer, + descriptor: MetaDescriptor? = null, + builder: suspend TaskResultBuilder.() -> Unit, +) : SerializableResultTask, Task by Task(resultType, descriptor, builder) + @OptIn(DFInternal::class) @Suppress("FunctionName") public inline fun Task( descriptor: MetaDescriptor? = null, noinline builder: suspend TaskResultBuilder.() -> Unit, -): Task = Task(typeOf(), descriptor, builder) \ No newline at end of file +): Task = Task(typeOf(), descriptor, builder) + +@OptIn(DFInternal::class) +@Suppress("FunctionName") +public inline fun SerializableResultTask( + resultSerializer: KSerializer = serializer(), + descriptor: MetaDescriptor? = null, + noinline builder: suspend TaskResultBuilder.() -> Unit, +): Task = SerializableResultTaskImpl(typeOf(), resultSerializer, descriptor, builder) diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBuilder.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBuilder.kt index ec078020..6413030b 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBuilder.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBuilder.kt @@ -1,5 +1,6 @@ package space.kscience.dataforge.workspace +import kotlinx.serialization.KSerializer import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.ContextBuilder import space.kscience.dataforge.context.Global @@ -37,25 +38,34 @@ public interface TaskContainer { public inline fun TaskContainer.registerTask( name: String, + resultSerializer: KSerializer? = null, noinline descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {}, noinline builder: suspend TaskResultBuilder.() -> Unit, -): Unit = registerTask(Name.parse(name), Task(MetaDescriptor(descriptorBuilder), builder)) +) { + val descriptor = MetaDescriptor(descriptorBuilder) + val task = if (resultSerializer == null) Task(descriptor, builder) else + SerializableResultTask(resultSerializer, descriptor, builder) + registerTask(Name.parse(name), task) +} public inline fun TaskContainer.task( descriptor: MetaDescriptor, + resultSerializer: KSerializer? = null, noinline builder: suspend TaskResultBuilder.() -> Unit, ): PropertyDelegateProvider>> = PropertyDelegateProvider { _, property -> val taskName = Name.parse(property.name) - val task = Task(descriptor, builder) + val task = if (resultSerializer == null) Task(descriptor, builder) else + SerializableResultTask(resultSerializer, descriptor, builder) registerTask(taskName, task) ReadOnlyProperty { _, _ -> TaskReference(taskName, task) } } public inline fun TaskContainer.task( + resultSerializer: KSerializer? = null, noinline descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {}, noinline builder: suspend TaskResultBuilder.() -> Unit, ): PropertyDelegateProvider>> = - task(MetaDescriptor(descriptorBuilder), builder) + task(MetaDescriptor(descriptorBuilder), resultSerializer, builder) public class WorkspaceBuilder(private val parentContext: Context = Global) : TaskContainer { private var context: Context? = null