diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/NodeWorkspace.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/NodeWorkspace.kt
new file mode 100644
index 00000000..c9238c09
--- /dev/null
+++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/NodeWorkspace.kt
@@ -0,0 +1,78 @@
+package space.kscience.dataforge.distributed
+
+import io.lambdarpc.dsl.LibService
+import kotlinx.serialization.KSerializer
+import space.kscience.dataforge.context.Context
+import space.kscience.dataforge.context.Global
+import space.kscience.dataforge.data.DataSet
+import space.kscience.dataforge.data.DataTree
+import space.kscience.dataforge.distributed.ServiceWorkspace.Companion.execute
+import space.kscience.dataforge.distributed.ServiceWorkspace.Companion.serviceId
+import space.kscience.dataforge.distributed.serialization.DataSetPrototype
+import space.kscience.dataforge.meta.Meta
+import space.kscience.dataforge.meta.put
+import space.kscience.dataforge.names.Name
+import space.kscience.dataforge.names.asName
+import space.kscience.dataforge.names.plus
+import space.kscience.dataforge.workspace.SerializableResultTask
+
+/**
+ * Workspace that exposes its tasks for remote clients.
+ * @param port Port to start the service on. A random port is used if null.
+ */
+public class NodeWorkspace(
+    port: Int? = null,
+    context: Context = Global.buildContext("workspace".asName()),
+    private val dataSerializer: KSerializer<Any>? = null,
+    data: DataSet<*> = DataTree<Any>(),
+    targets: Map<String, Meta> = mapOf(),
+) : RemoteTaskWorkspace(context, data, targets), ServiceWorkspace {
+    private val _port: Int? = port
+
+    private val service = LibService(serviceId, port) {
+        execute of { name, meta, executionContext ->
+            if (name == Name.EMPTY) {
+                requireNotNull(dataSerializer) { "Data serializer is not provided on $port" }
+                DataSetPrototype.of(data, dataSerializer)
+            } else {
+                val proxyContext = context.buildContext(context.name + "proxy") {
+                    properties {
+                        put(executionContext)
+                    }
+                }
+                val proxy = RemoteTaskWorkspace(context = proxyContext, data = data)
+                val task = tasks[name] ?: error("Task with name $name not found in the workspace")
+                require(task is SerializableResultTask)
+                // Local function to capture generic parameter
+                suspend fun <T : Any> execute(task: SerializableResultTask<T>): DataSetPrototype {
+                    val result = task.execute(proxy, name, meta)
+                    return DataSetPrototype.of(result, task.resultSerializer)
+                }
+                execute(task)
+            }
+        }
+    }
+
+    /**
+     * Port this workspace is available on.
+     */
+    public override val port: Int
+        get() = _port ?: service.port.p
+
+    /**
+     * Start [NodeWorkspace] as a service.
+     */
+    public override fun start(): Unit = service.start()
+
+    /**
+     * Await termination of the service.
+     */
+    public override fun awaitTermination(): Unit = service.awaitTermination()
+
+    /**
+     * Shut down the service.
+     */
+    public override fun shutdown(): Unit = service.shutdown()
+
+    override fun close(): Unit = shutdown()
+}
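The `require(task is SerializableResultTask)` check above narrows the task to a star-projected type, and the local generic `execute` then re-captures the erased type parameter so the task result and its serializer are typed against the same `T`. A minimal standalone sketch of that idiom (the `Box`/`describe` names are illustrative, not from the codebase):

    interface Box<T : Any> {
        val value: T
    }

    fun describe(box: Box<*>): String {
        // A local generic function re-introduces T: inside it, `typed.value`
        // and any T-typed machinery (such as a serializer) agree on one T.
        fun <T : Any> impl(typed: Box<T>): String = typed.value.toString()
        return impl(box) // the star projection is captured as T here
    }
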
diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemotePlugin.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemotePlugin.kt
deleted file mode 100644
index 5eabc8b8..00000000
--- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemotePlugin.kt
+++ /dev/null
@@ -1,39 +0,0 @@
-package space.kscience.dataforge.distributed
-
-import io.lambdarpc.utils.Endpoint
-import space.kscience.dataforge.context.AbstractPlugin
-import space.kscience.dataforge.context.Global
-import space.kscience.dataforge.context.Plugin
-import space.kscience.dataforge.context.PluginFactory
-import space.kscience.dataforge.context.PluginTag
-import space.kscience.dataforge.meta.Meta
-import space.kscience.dataforge.names.Name
-import space.kscience.dataforge.workspace.SerializableResultTask
-import space.kscience.dataforge.workspace.Task
-
-/**
- * Plugin that purpose is to communicate with remote plugins.
- * @param plugin A remote plugin to be used.
- * @param endpoint Endpoint of the remote plugin.
- */
-public class RemotePlugin<P : Plugin>(private val plugin: P, private val endpoint: String) : AbstractPlugin() {
-
-    // TODO
-    public constructor(factory: PluginFactory<P>, endpoint: String) : this(factory.build(Global, Meta.EMPTY), endpoint)
-
-    override val tag: PluginTag
-        get() = plugin.tag
-
-    private val tasks = plugin.content(Task.TYPE)
-        .filterValues { it is SerializableResultTask<*> }
-        .mapValues { (_, task) ->
-            require(task is SerializableResultTask<*>)
-            RemoteTask(Endpoint(endpoint), task.resultType, task.resultSerializer)
-        }
-
-    override fun content(target: String): Map<Name, Any> =
-        when (target) {
-            Task.TYPE -> tasks
-            else -> emptyMap()
-        }
-}
diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTask.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTask.kt
index 61b5b104..2fbe5afa 100644
--- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTask.kt
+++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTask.kt
@@ -17,18 +17,17 @@ import kotlin.reflect.KType
  * Proxy task that communicates with the corresponding remote task.
  */
 internal class RemoteTask<T : Any>(
-    internal val endpoint: Endpoint,
+    endpoint: String,
     override val resultType: KType,
     override val resultSerializer: KSerializer<T>,
     override val descriptor: MetaDescriptor? = null,
-    private val taskRegistry: TaskRegistry? = null,
+    private val executionContext: Meta = Meta.EMPTY,
 ) : SerializableResultTask<T> {
-    private val dispatcher = ServiceDispatcher(ServiceWorkspace.serviceId to endpoint)
+    private val dispatcher = ServiceDispatcher(ServiceWorkspace.serviceId to Endpoint(endpoint))
 
     override suspend fun execute(workspace: Workspace, taskName: Name, taskMeta: Meta): TaskResult<T> {
-        val registry = taskRegistry ?: TaskRegistry(workspace.tasks)
         val result = withContext(dispatcher) {
-            ServiceWorkspace.execute(taskName, taskMeta, registry)
+            ServiceWorkspace.execute(taskName, taskMeta, executionContext)
         }
         val dataSet = result.toDataSet(resultType, resultSerializer)
         return workspace.wrapResult(dataSet, taskName, taskMeta)
diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTaskWorkspace.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTaskWorkspace.kt
new file mode 100644
index 00000000..9baf7312
--- /dev/null
+++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTaskWorkspace.kt
@@ -0,0 +1,73 @@
+package space.kscience.dataforge.distributed
+
+import space.kscience.dataforge.context.Context
+import space.kscience.dataforge.context.Global
+import space.kscience.dataforge.context.gather
+import space.kscience.dataforge.data.DataSet
+import space.kscience.dataforge.data.DataTree
+import space.kscience.dataforge.meta.Meta
+import space.kscience.dataforge.meta.MutableMeta
+import space.kscience.dataforge.meta.get
+import space.kscience.dataforge.names.Name
+import space.kscience.dataforge.names.asName
+import space.kscience.dataforge.values.string
+import space.kscience.dataforge.workspace.SerializableResultTask
+import space.kscience.dataforge.workspace.Task
+import space.kscience.dataforge.workspace.TaskResult
+import space.kscience.dataforge.workspace.Workspace
+import space.kscience.dataforge.workspace.wrapResult
+
+/**
+ * Workspace that returns a [RemoteTask] when the execution context
+ * specifies that the task should be executed remotely.
+ */
+public open class RemoteTaskWorkspace(
+    final override val context: Context = Global.buildContext("workspace".asName()),
+    data: DataSet<*> = DataTree<Any>(),
+    override val targets: Map<String, Meta> = mapOf(),
+) : Workspace {
+
+    override val data: TaskResult<*> = wrapResult(data, Name.EMPTY, Meta.EMPTY)
+
+    private val _tasks: Map<Name, Task<*>> = context.gather(Task.TYPE)
+
+    override val tasks: Map<Name, Task<*>>
+        get() = object : AbstractMap<Name, Task<*>>() {
+            override val entries: Set<Map.Entry<Name, Task<*>>>
+                get() = _tasks.entries
+
+            override fun get(key: Name): Task<*>? {
+                val executionContext = context.properties[EXECUTION_CONTEXT]
+                val endpoint = executionContext?.get(ENDPOINTS)?.toMeta()?.get(key) ?: return _tasks[key]
+                val string = endpoint.value?.string ?: error("Endpoint is expected to be a string")
+                val local = _tasks[key] ?: error("No task with name $key")
+                require(local is SerializableResultTask) { "Task $key result is not serializable" }
+                return RemoteTask(string, local.resultType, local.resultSerializer, local.descriptor, executionContext)
+            }
+        }
+
+    public companion object {
+        internal val EXECUTION_CONTEXT = "execution".asName()
+        internal val ENDPOINTS = "endpoints".asName()
+    }
+}
+
+public fun MutableMeta.endpoints(block: EndpointsBuilder.() -> Unit) {
+    RemoteTaskWorkspace.EXECUTION_CONTEXT put {
+        RemoteTaskWorkspace.ENDPOINTS put EndpointsBuilder().apply(block).build()
+    }
+}
+
+public class EndpointsBuilder {
+    private val endpoints = mutableMapOf<Name, String>()
+
+    public infix fun Name.on(endpoint: String) {
+        endpoints[this] = endpoint
+    }
+
+    internal fun build(): Meta = Meta {
+        endpoints.forEach { (name, endpoint) ->
+            name put endpoint
+        }
+    }
+}
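For reference, a sketch of how the `endpoints` builder is meant to be wired into context properties and the meta layout it produces; the task name and address here are illustrative, not from the patch:

    import space.kscience.dataforge.distributed.endpoints
    import space.kscience.dataforge.meta.MutableMeta
    import space.kscience.dataforge.names.Name

    val properties = MutableMeta {
        endpoints {
            Name.of("myPlugin", "task") on "localhost:9000"
        }
    }
    // Resulting layout (roughly): execution.endpoints.myPlugin.task = "localhost:9000"
    // RemoteTaskWorkspace.tasks[Name.of("myPlugin", "task")] then resolves to a
    // RemoteTask bound to localhost:9000 instead of the local implementation.
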
diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ServiceWorkspace.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ServiceWorkspace.kt
index 8df7853a..aaaebc91 100644
--- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ServiceWorkspace.kt
+++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/ServiceWorkspace.kt
@@ -1,116 +1,29 @@
 package space.kscience.dataforge.distributed
 
-import io.ktor.utils.io.core.*
-import io.lambdarpc.dsl.LibService
 import io.lambdarpc.dsl.def
 import io.lambdarpc.dsl.j
 import io.lambdarpc.utils.ServiceId
-import kotlinx.serialization.KSerializer
-import space.kscience.dataforge.context.Context
-import space.kscience.dataforge.context.Global
-import space.kscience.dataforge.context.gather
-import space.kscience.dataforge.data.DataSet
-import space.kscience.dataforge.data.DataTree
 import space.kscience.dataforge.distributed.serialization.DataSetPrototype
-import space.kscience.dataforge.meta.Meta
 import space.kscience.dataforge.meta.MetaSerializer
 import space.kscience.dataforge.names.Name
-import space.kscience.dataforge.names.asName
-import space.kscience.dataforge.workspace.SerializableResultTask
-import space.kscience.dataforge.workspace.Task
-import space.kscience.dataforge.workspace.TaskResult
 import space.kscience.dataforge.workspace.Workspace
-import space.kscience.dataforge.workspace.wrapResult
+import java.io.Closeable
 
 /**
- * Workspace that exposes its tasks for remote clients.
- * @param port Port to start service on. Will be random if null.
+ * [Workspace] that can expose its tasks to other workspaces as a service.
  */
-public class ServiceWorkspace(
-    port: Int? = null,
-    override val context: Context = Global.buildContext("workspace".asName()),
-    private val dataSerializer: KSerializer<Any>? = null,
-    data: DataSet<*> = DataTree<Any>(),
-    override val targets: Map<String, Meta> = mapOf(),
-) : Workspace, Closeable {
-    private val _port: Int? = port
-
-    override val data: TaskResult<*> = wrapResult(data, Name.EMPTY, Meta.EMPTY)
-
-    override val tasks: Map<Name, Task<*>>
-        get() = context.gather(Task.TYPE)
-
-    private val service = LibService(serviceId, port) {
-        execute of { name, meta, taskRegistry ->
-            if (name == Name.EMPTY) {
-                requireNotNull(dataSerializer) { "Data serializer is not provided on $port" }
-                DataSetPrototype.of(data, dataSerializer)
-            } else {
-                val task = tasks[name] ?: error("Task $name does not exist locally")
-                require(task is SerializableResultTask) { "Result of $name cannot be serialized" }
-                val workspace = ProxyWorkspace(taskRegistry)
-
-                // Local function to capture generic parameter
-                suspend fun <T : Any> execute(task: SerializableResultTask<T>): DataSetPrototype {
-                    val result = task.execute(workspace, name, meta)
-                    return DataSetPrototype.of(result, task.resultSerializer)
-                }
-                execute(task)
-            }
-        }
-    }
-
-    /**
-     * Proxies task calls to right endpoints according to the [TaskRegistry].
-     */
-    private inner class ProxyWorkspace(private val taskRegistry: TaskRegistry) : Workspace by this {
-        override val tasks: Map<Name, Task<*>>
-            get() = object : AbstractMap<Name, Task<*>>() {
-                override val entries: Set<Map.Entry<Name, Task<*>>>
-                    get() = this@ServiceWorkspace.tasks.entries
-
-                override fun get(key: Name): Task<*>? = remoteTask(key) ?: this@ServiceWorkspace.tasks[key]
-            }
-
-        /**
-         * Call default implementation to use [tasks] virtually instead of it in [ServiceWorkspace].
-         */
-        override suspend fun produce(taskName: Name, taskMeta: Meta): TaskResult<*> =
-            super.produce(taskName, taskMeta)
-
-        private fun remoteTask(name: Name): RemoteTask<*>? {
-            val endpoint = taskRegistry.tasks[name] ?: return null
-            val local = this@ServiceWorkspace.tasks[name] ?: error("No task with name $name locally on $port")
-            require(local is SerializableResultTask) { "Task $name result is not serializable" }
-            return RemoteTask(endpoint, local.resultType, local.resultSerializer, local.descriptor, taskRegistry)
-        }
-    }
-
-    /**
-     * Port this workspace is available on.
-     */
+public interface ServiceWorkspace : Workspace, Closeable {
     public val port: Int
-        get() = _port ?: service.port.p
+    public fun start()
+    public fun awaitTermination()
+    public fun shutdown()
 
-    /**
-     * Start [ServiceWorkspace] as a service.
-     */
-    public fun start(): Unit = service.start()
-
-    /**
-     * Await termination of the service.
-     */
-    public fun awaitTermination(): Unit = service.awaitTermination()
-
-    /**
-     * Shutdown service.
-     */
-    public fun shutdown(): Unit = service.shutdown()
-
-    override fun close(): Unit = service.shutdown()
+    override fun close() {
+        shutdown()
+    }
 
     public companion object {
         internal val serviceId = ServiceId("d41b95b1-828b-4444-8ff0-6f9c92a79246")
-        internal val execute by serviceId.def(j<Name>(), j(MetaSerializer), j<TaskRegistry>(), j<DataSetPrototype>())
+        internal val execute by serviceId.def(j<Name>(), j(MetaSerializer), j(MetaSerializer), j<DataSetPrototype>())
     }
 }
diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/TaskRegistry.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/TaskRegistry.kt
deleted file mode 100644
index aea1b29e..00000000
--- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/TaskRegistry.kt
+++ /dev/null
@@ -1,18 +0,0 @@
-package space.kscience.dataforge.distributed
-
-import io.lambdarpc.utils.Endpoint
-import kotlinx.serialization.Serializable
-import space.kscience.dataforge.names.Name
-import space.kscience.dataforge.workspace.Task
-
-@Serializable
-internal class TaskRegistry(val tasks: Map<Name, Endpoint>)
-
-internal fun TaskRegistry(tasks: Map<Name, Task<*>>): TaskRegistry {
-    val remotes = tasks.filterValues { it is RemoteTask<*> }
-    val endpoints = remotes.mapValues { (_, task) ->
-        require(task is RemoteTask)
-        task.endpoint
-    }
-    return TaskRegistry(endpoints)
-}
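Since `ServiceWorkspace` is now `Closeable` and `close()` delegates to `shutdown()`, the service lifecycle composes with Kotlin's `use`; a minimal sketch, assuming a plugin-configured context as in the tests below (the port value is illustrative):

    NodeWorkspace(port = 9000).use { node ->
        node.start()
        node.awaitTermination() // block until the service is stopped elsewhere
    } // use {} invokes close(), which shuts the service down
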
data("result", data.map { it + 1 }) diff --git a/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/RemoteCallTest.kt b/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/RemoteCallTest.kt index b78b4f56..adf2c67f 100644 --- a/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/RemoteCallTest.kt +++ b/dataforge-distributed/src/jvmTest/kotlin/space/kscience/dataforge/distributed/RemoteCallTest.kt @@ -19,13 +19,13 @@ import kotlin.test.assertEquals @TestInstance(TestInstance.Lifecycle.PER_CLASS) internal class RemoteCallTest { - private lateinit var worker1: ServiceWorkspace - private lateinit var worker2: ServiceWorkspace + private lateinit var worker1: NodeWorkspace + private lateinit var worker2: NodeWorkspace private lateinit var workspace: Workspace @BeforeAll fun before() = runBlocking { - worker1 = ServiceWorkspace( + worker1 = NodeWorkspace( context = Global.buildContext("worker1".asName()) { plugin(MyPlugin1) }, @@ -35,7 +35,7 @@ internal class RemoteCallTest { ) worker1.start() - worker2 = ServiceWorkspace( + worker2 = NodeWorkspace( context = Global.buildContext("worker2".asName()) { plugin(MyPlugin1) plugin(MyPlugin2) @@ -43,12 +43,18 @@ internal class RemoteCallTest { ) worker2.start() - workspace = Workspace { - context { - plugin(RemotePlugin(MyPlugin1, "localhost:${worker1.port}")) - plugin(RemotePlugin(MyPlugin2, "localhost:${worker2.port}")) + workspace = NodeWorkspace( + context = Global.buildContext { + plugin(MyPlugin1) + plugin(MyPlugin2) + properties { + endpoints { + Name.of(MyPlugin1.tag.name, "task") on "localhost:${worker1.port}" + Name.of(MyPlugin2.tag.name, "task") on "localhost:${worker2.port}" + } + } } - } + ) } @AfterAll diff --git a/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/MutableMeta.kt b/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/MutableMeta.kt index d39a6dbf..9e3a2ab8 100644 --- a/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/MutableMeta.kt +++ b/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/MutableMeta.kt @@ -2,11 +2,25 @@ package space.kscience.dataforge.meta import kotlinx.serialization.Serializable import space.kscience.dataforge.misc.DFExperimental -import space.kscience.dataforge.names.* +import space.kscience.dataforge.names.Name +import space.kscience.dataforge.names.NameToken +import space.kscience.dataforge.names.asName +import space.kscience.dataforge.names.cutFirst +import space.kscience.dataforge.names.cutLast +import space.kscience.dataforge.names.first +import space.kscience.dataforge.names.firstOrNull +import space.kscience.dataforge.names.isEmpty +import space.kscience.dataforge.names.lastOrNull +import space.kscience.dataforge.names.length +import space.kscience.dataforge.names.plus +import space.kscience.dataforge.names.withIndex import space.kscience.dataforge.values.EnumValue import space.kscience.dataforge.values.MutableValueProvider import space.kscience.dataforge.values.Value import space.kscience.dataforge.values.asValue +import kotlin.collections.component1 +import kotlin.collections.component2 +import kotlin.collections.set import kotlin.js.JsName import kotlin.jvm.Synchronized @@ -146,6 +160,14 @@ public interface MutableMeta : Meta, MutableMetaProvider { */ public operator fun MutableMeta.set(name: Name, meta: Meta): Unit = setMeta(name, meta) +public fun MutableMeta.put(other: Meta) { + other.items.forEach { (name, meta) -> + name.asName() put meta + } +} + 
diff --git a/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/MutableMeta.kt b/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/MutableMeta.kt
index d39a6dbf..9e3a2ab8 100644
--- a/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/MutableMeta.kt
+++ b/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/MutableMeta.kt
@@ -2,11 +2,25 @@ package space.kscience.dataforge.meta
 
 import kotlinx.serialization.Serializable
 import space.kscience.dataforge.misc.DFExperimental
-import space.kscience.dataforge.names.*
+import space.kscience.dataforge.names.Name
+import space.kscience.dataforge.names.NameToken
+import space.kscience.dataforge.names.asName
+import space.kscience.dataforge.names.cutFirst
+import space.kscience.dataforge.names.cutLast
+import space.kscience.dataforge.names.first
+import space.kscience.dataforge.names.firstOrNull
+import space.kscience.dataforge.names.isEmpty
+import space.kscience.dataforge.names.lastOrNull
+import space.kscience.dataforge.names.length
+import space.kscience.dataforge.names.plus
+import space.kscience.dataforge.names.withIndex
 import space.kscience.dataforge.values.EnumValue
 import space.kscience.dataforge.values.MutableValueProvider
 import space.kscience.dataforge.values.Value
 import space.kscience.dataforge.values.asValue
+import kotlin.collections.component1
+import kotlin.collections.component2
+import kotlin.collections.set
 import kotlin.js.JsName
 import kotlin.jvm.Synchronized
 
@@ -146,6 +160,14 @@ public interface MutableMeta : Meta, MutableMetaProvider {
  */
 public operator fun MutableMeta.set(name: Name, meta: Meta): Unit = setMeta(name, meta)
 
+public fun MutableMeta.put(other: Meta) {
+    other.items.forEach { (name, meta) ->
+        name.asName() put meta
+    }
+}
+
+public operator fun MutableMeta.plusAssign(meta: Meta): Unit = put(meta)
+
 /**
  * Set or replace value at given [name]
  */
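`put` is a shallow merge: each top-level item of the argument replaces the same-named item of the receiver, while unrelated items are kept. A small sketch, assuming the standard `Meta`/`MutableMeta` builders:

    import space.kscience.dataforge.meta.Meta
    import space.kscience.dataforge.meta.MutableMeta
    import space.kscience.dataforge.meta.plusAssign

    val target = MutableMeta {
        "a" put 1
        "b" put 2
    }
    target += Meta { "b" put 3 } // plusAssign delegates to put
    // target now reads: a = 1, b = 3 (the whole "b" node was replaced)
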