From ba3354b4a2b884c62d89a24ea20f142e36d92cd1 Mon Sep 17 00:00:00 2001 From: Andrey Stoyan Date: Wed, 1 Jun 2022 23:41:21 +0300 Subject: [PATCH] Fix transitive execution --- .../dataforge/distributed/NodeWorkspace.kt | 3 ++- .../distributed/RemoteTaskWorkspace.kt | 15 ++++++----- .../space/kscience/dataforge/meta/Meta.kt | 26 ++++++++++++++++--- 3 files changed, 34 insertions(+), 10 deletions(-) diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/NodeWorkspace.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/NodeWorkspace.kt index c9238c09..76b9358f 100644 --- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/NodeWorkspace.kt +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/NodeWorkspace.kt @@ -15,6 +15,7 @@ import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.asName import space.kscience.dataforge.names.plus import space.kscience.dataforge.workspace.SerializableResultTask +import space.kscience.dataforge.workspace.Workspace /** * Workspace that exposes its tasks for remote clients. @@ -26,7 +27,7 @@ public class NodeWorkspace( private val dataSerializer: KSerializer? = null, data: DataSet<*> = DataTree(), targets: Map = mapOf(), -) : RemoteTaskWorkspace(context, data, targets), ServiceWorkspace { +) : Workspace by RemoteTaskWorkspace(context, data, targets), ServiceWorkspace { private val _port: Int? = port private val service = LibService(serviceId, port) { diff --git a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTaskWorkspace.kt b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTaskWorkspace.kt index 9baf7312..286a09d9 100644 --- a/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTaskWorkspace.kt +++ b/dataforge-distributed/src/jvmMain/kotlin/space/kscience/dataforge/distributed/RemoteTaskWorkspace.kt @@ -7,6 +7,7 @@ import space.kscience.dataforge.data.DataSet import space.kscience.dataforge.data.DataTree import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.MutableMeta +import space.kscience.dataforge.meta.extract import space.kscience.dataforge.meta.get import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.asName @@ -21,7 +22,7 @@ import space.kscience.dataforge.workspace.wrapResult * Workspace that returns [RemoteTask] if such task should be * executed remotely according to the execution context. */ -public open class RemoteTaskWorkspace( +internal open class RemoteTaskWorkspace( final override val context: Context = Global.buildContext("workspace".asName()), data: DataSet<*> = DataTree(), override val targets: Map = mapOf(), @@ -37,8 +38,10 @@ public open class RemoteTaskWorkspace( get() = _tasks.entries override fun get(key: Name): Task<*>? { - val executionContext = context.properties[EXECUTION_CONTEXT] - val endpoint = executionContext?.get(ENDPOINTS)?.toMeta()?.get(key) ?: return _tasks[key] + val executionContext = context.properties.extract(EXECUTION_CONTEXT) + val endpoint = executionContext + ?.get(EXECUTION_CONTEXT)?.get(ENDPOINTS)?.get(key) + ?: return _tasks[key] val string = endpoint.value?.string ?: error("Endpoint is expected to be a string") val local = _tasks[key] ?: error("No task with name $key") require(local is SerializableResultTask) { "Task $key result is not serializable" } @@ -46,9 +49,9 @@ public open class RemoteTaskWorkspace( } } - public companion object { - internal val EXECUTION_CONTEXT = "execution".asName() - internal val ENDPOINTS = "endpoints".asName() + companion object { + val EXECUTION_CONTEXT = "execution".asName() + val ENDPOINTS = "endpoints".asName() } } diff --git a/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/Meta.kt b/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/Meta.kt index 01a5ebf2..9ee5c6fa 100644 --- a/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/Meta.kt +++ b/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/Meta.kt @@ -4,8 +4,23 @@ import kotlinx.serialization.Serializable import kotlinx.serialization.json.Json import space.kscience.dataforge.misc.Type import space.kscience.dataforge.misc.unsafeCast -import space.kscience.dataforge.names.* -import space.kscience.dataforge.values.* +import space.kscience.dataforge.names.Name +import space.kscience.dataforge.names.NameToken +import space.kscience.dataforge.names.asName +import space.kscience.dataforge.names.cutFirst +import space.kscience.dataforge.names.cutLast +import space.kscience.dataforge.names.firstOrNull +import space.kscience.dataforge.names.isEmpty +import space.kscience.dataforge.names.lastOrNull +import space.kscience.dataforge.names.length +import space.kscience.dataforge.names.parseAsName +import space.kscience.dataforge.names.plus +import space.kscience.dataforge.values.EnumValue +import space.kscience.dataforge.values.Value +import space.kscience.dataforge.values.ValueProvider +import space.kscience.dataforge.values.boolean +import space.kscience.dataforge.values.numberOrNull +import space.kscience.dataforge.values.string /** @@ -111,6 +126,11 @@ public operator fun Meta.get(name: Name): Meta? = this.getMeta(name) */ public operator fun Meta.get(key: String): Meta? = this.get(Name.parse(key)) +public fun Meta.extract(name: Name): Meta? { + val value = get(name) ?: return null + return Meta { name put value } +} + /** * Get all items matching given name. The index of the last element, if present is used as a [Regex], * against which indexes of elements are matched. @@ -135,7 +155,7 @@ public fun Meta.getIndexed(name: Name): Map { } } -public fun Meta.getIndexed(name: String): Map = getIndexed(name.parseAsName()) +public fun Meta.getIndexed(name: String): Map = getIndexed(name.parseAsName()) /** * A meta node that ensures that all of its descendants has at least the same type.