Call remote tasks of service workspace #75

Closed
winter-yuki wants to merge 25 commits from winter-yuki/distributed into 0.6
3 changed files with 34 additions and 10 deletions
Showing only changes of commit ba3354b4a2 - Show all commits

View File

@@ -15,6 +15,7 @@ import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.plus
import space.kscience.dataforge.workspace.SerializableResultTask
import space.kscience.dataforge.workspace.Workspace
/**
* Workspace that exposes its tasks for remote clients.
@@ -26,7 +27,7 @@ public class NodeWorkspace(
private val dataSerializer: KSerializer<Any>? = null,
data: DataSet<*> = DataTree<Any>(),
targets: Map<String, Meta> = mapOf(),
) : RemoteTaskWorkspace(context, data, targets), ServiceWorkspace {
) : Workspace by RemoteTaskWorkspace(context, data, targets), ServiceWorkspace {
altavir commented 2022-06-22 18:50:33 +03:00 (Migrated from github.com)
Review

Why do we need such a complex construct? Isn't it possible to add a simple wrapper by adding a connection and a serializer resolver? Something like this for the server:

/**
 * Expose existing workspace to external connections. Use Job to handle the lifecycle.
 * Serializers are resolved dynamically when appropriate task is called. If serializer is not found,
 * a "soft" error is produced.
 */
fun Workspace.serve(serializatorResolver: (KType)->KSerializer<*>): Job

And for client:

fun Workspace.Companion.connect(ip: String, port: Int): Workspace

We can add a close method to a generic Workspace to handle the lifecycle.

Why do we need such a complex construct? Isn't it possible to add a simple wrapper by adding a connection and a serializer resolver? Something like this for the server: ```kotlin /** * Expose existing workspace to external connections. Use Job to handle the lifecycle. * Serializers are resolved dynamically when appropriate task is called. If serializer is not found, * a "soft" error is produced. */ fun Workspace.serve(serializatorResolver: (KType)->KSerializer<*>): Job ``` And for the client: ```kotlin fun Workspace.Companion.connect(ip: String, port: Int): Workspace ``` We can add a `close` method to a generic Workspace to handle the lifecycle.
private val _port: Int? = port
private val service = LibService(serviceId, port) {

View File

@@ -7,6 +7,7 @@ import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.meta.extract
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
@@ -21,7 +22,7 @@ import space.kscience.dataforge.workspace.wrapResult
* Workspace that returns [RemoteTask] if such task should be
* executed remotely according to the execution context.
*/
public open class RemoteTaskWorkspace(
internal open class RemoteTaskWorkspace(
final override val context: Context = Global.buildContext("workspace".asName()),
data: DataSet<*> = DataTree<Any>(),
override val targets: Map<String, Meta> = mapOf(),
@@ -37,8 +38,10 @@ public open class RemoteTaskWorkspace(
get() = _tasks.entries
override fun get(key: Name): Task<*>? {
val executionContext = context.properties[EXECUTION_CONTEXT]
val endpoint = executionContext?.get(ENDPOINTS)?.toMeta()?.get(key) ?: return _tasks[key]
val executionContext = context.properties.extract(EXECUTION_CONTEXT)
val endpoint = executionContext
?.get(EXECUTION_CONTEXT)?.get(ENDPOINTS)?.get(key)
?: return _tasks[key]
val string = endpoint.value?.string ?: error("Endpoint is expected to be a string")
val local = _tasks[key] ?: error("No task with name $key")
require(local is SerializableResultTask) { "Task $key result is not serializable" }
@@ -46,9 +49,9 @@ public open class RemoteTaskWorkspace(
}
}
public companion object {
internal val EXECUTION_CONTEXT = "execution".asName()
internal val ENDPOINTS = "endpoints".asName()
companion object {
val EXECUTION_CONTEXT = "execution".asName()
val ENDPOINTS = "endpoints".asName()
}
}

View File

@@ -4,8 +4,23 @@ import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import space.kscience.dataforge.misc.Type
import space.kscience.dataforge.misc.unsafeCast
import space.kscience.dataforge.names.*
import space.kscience.dataforge.values.*
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.NameToken
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.cutFirst
import space.kscience.dataforge.names.cutLast
import space.kscience.dataforge.names.firstOrNull
import space.kscience.dataforge.names.isEmpty
import space.kscience.dataforge.names.lastOrNull
import space.kscience.dataforge.names.length
import space.kscience.dataforge.names.parseAsName
import space.kscience.dataforge.names.plus
import space.kscience.dataforge.values.EnumValue
import space.kscience.dataforge.values.Value
import space.kscience.dataforge.values.ValueProvider
import space.kscience.dataforge.values.boolean
import space.kscience.dataforge.values.numberOrNull
import space.kscience.dataforge.values.string
/**
@@ -111,6 +126,11 @@ public operator fun Meta.get(name: Name): Meta? = this.getMeta(name)
*/
public operator fun Meta.get(key: String): Meta? = this.get(Name.parse(key))
/**
 * Extracts the sub-node located at [name] from this [Meta], preserving its full path.
 *
 * @param name the (possibly multi-token) name of the node to extract.
 * @return a new [Meta] containing only the node at [name], re-rooted under the same
 *   path, or `null` when no node exists at [name].
 */
public fun Meta.extract(name: Name): Meta? {
    // Nothing at the requested name — report absence rather than returning an empty Meta.
    val value = get(name) ?: return null
    // Rebuild a minimal Meta with the extracted node placed back at its original path,
    // so consumers can navigate it with the same name they used for extraction.
    return Meta { name put value }
}
/**
* Get all items matching given name. The index of the last element, if present is used as a [Regex],
* against which indexes of elements are matched.
@@ -135,7 +155,7 @@ public fun Meta.getIndexed(name: Name): Map<String?, Meta> {
}
}
public fun Meta.getIndexed(name: String): Map<String?, Meta> = getIndexed(name.parseAsName())
public fun Meta.getIndexed(name: String): Map<String?, Meta> = getIndexed(name.parseAsName())
/**
* A meta node that ensures that all of its descendants has at least the same type.