Endpoint info via meta

This commit is contained in:
Andrey Stoyan 2022-06-01 23:31:51 +03:00
parent 99efe3a456
commit 7e13c3dec6
9 changed files with 205 additions and 171 deletions

View File

@ -0,0 +1,78 @@
package space.kscience.dataforge.distributed
import io.lambdarpc.dsl.LibService
import kotlinx.serialization.KSerializer
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Global
import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.distributed.ServiceWorkspace.Companion.execute
import space.kscience.dataforge.distributed.ServiceWorkspace.Companion.serviceId
import space.kscience.dataforge.distributed.serialization.DataSetPrototype
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.put
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.plus
import space.kscience.dataforge.workspace.SerializableResultTask
/**
* Workspace that exposes its tasks for remote clients.
* @param port Port to start service on. Will be random if null.
*/
public class NodeWorkspace(
port: Int? = null,
context: Context = Global.buildContext("workspace".asName()),
private val dataSerializer: KSerializer<Any>? = null,
data: DataSet<*> = DataTree<Any>(),
targets: Map<String, Meta> = mapOf(),
) : RemoteTaskWorkspace(context, data, targets), ServiceWorkspace {
private val _port: Int? = port
private val service = LibService(serviceId, port) {
execute of { name, meta, executionContext ->
if (name == Name.EMPTY) {
requireNotNull(dataSerializer) { "Data serializer is not provided on $port" }
DataSetPrototype.of(data, dataSerializer)
} else {
val proxyContext = context.buildContext(context.name + "proxy") {
properties {
put(executionContext)
}
}
val proxy = RemoteTaskWorkspace(context = proxyContext, data = data)
val task = tasks[name] ?: error("Task with name $name not found in the workspace")
require(task is SerializableResultTask)
// Local function to capture generic parameter
suspend fun <T : Any> execute(task: SerializableResultTask<T>): DataSetPrototype {
val result = task.execute(proxy, name, meta)
return DataSetPrototype.of(result, task.resultSerializer)
}
execute(task)
}
}
}
/**
* Port this workspace is available on.
*/
public override val port: Int
get() = _port ?: service.port.p
/**
* Start [NodeWorkspace] as a service.
*/
public override fun start(): Unit = service.start()
/**
* Await termination of the service.
*/
public override fun awaitTermination(): Unit = service.awaitTermination()
/**
* Shutdown service.
*/
public override fun shutdown(): Unit = service.shutdown()
override fun close(): Unit = shutdown()
}

View File

@ -1,39 +0,0 @@
package space.kscience.dataforge.distributed
import io.lambdarpc.utils.Endpoint
import space.kscience.dataforge.context.AbstractPlugin
import space.kscience.dataforge.context.Global
import space.kscience.dataforge.context.Plugin
import space.kscience.dataforge.context.PluginFactory
import space.kscience.dataforge.context.PluginTag
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.workspace.SerializableResultTask
import space.kscience.dataforge.workspace.Task
/**
* Plugin that purpose is to communicate with remote plugins.
* @param plugin A remote plugin to be used.
* @param endpoint Endpoint of the remote plugin.
*/
public class RemotePlugin<P : Plugin>(private val plugin: P, private val endpoint: String) : AbstractPlugin() {
// TODO
public constructor(factory: PluginFactory<P>, endpoint: String) : this(factory.build(Global, Meta.EMPTY), endpoint)
override val tag: PluginTag
get() = plugin.tag
private val tasks = plugin.content(Task.TYPE)
.filterValues { it is SerializableResultTask<*> }
.mapValues { (_, task) ->
require(task is SerializableResultTask<*>)
RemoteTask(Endpoint(endpoint), task.resultType, task.resultSerializer)
}
override fun content(target: String): Map<Name, Any> =
when (target) {
Task.TYPE -> tasks
else -> emptyMap()
}
}

View File

@ -17,18 +17,17 @@ import kotlin.reflect.KType
* Proxy task that communicates with the corresponding remote task.
*/
internal class RemoteTask<T : Any>(
internal val endpoint: Endpoint,
endpoint: String,
override val resultType: KType,
override val resultSerializer: KSerializer<T>,
override val descriptor: MetaDescriptor? = null,
private val taskRegistry: TaskRegistry? = null,
private val executionContext: Meta = Meta.EMPTY,
) : SerializableResultTask<T> {
private val dispatcher = ServiceDispatcher(ServiceWorkspace.serviceId to endpoint)
private val dispatcher = ServiceDispatcher(ServiceWorkspace.serviceId to Endpoint(endpoint))
override suspend fun execute(workspace: Workspace, taskName: Name, taskMeta: Meta): TaskResult<T> {
val registry = taskRegistry ?: TaskRegistry(workspace.tasks)
val result = withContext(dispatcher) {
ServiceWorkspace.execute(taskName, taskMeta, registry)
ServiceWorkspace.execute(taskName, taskMeta, executionContext)
}
val dataSet = result.toDataSet(resultType, resultSerializer)
return workspace.wrapResult(dataSet, taskName, taskMeta)

View File

@ -0,0 +1,73 @@
package space.kscience.dataforge.distributed
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Global
import space.kscience.dataforge.context.gather
import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.values.string
import space.kscience.dataforge.workspace.SerializableResultTask
import space.kscience.dataforge.workspace.Task
import space.kscience.dataforge.workspace.TaskResult
import space.kscience.dataforge.workspace.Workspace
import space.kscience.dataforge.workspace.wrapResult
/**
* Workspace that returns [RemoteTask] if such task should be
* executed remotely according to the execution context.
*/
public open class RemoteTaskWorkspace(
final override val context: Context = Global.buildContext("workspace".asName()),
data: DataSet<*> = DataTree<Any>(),
override val targets: Map<String, Meta> = mapOf(),
) : Workspace {
override val data: TaskResult<*> = wrapResult(data, Name.EMPTY, Meta.EMPTY)
private val _tasks: Map<Name, Task<*>> = context.gather(Task.TYPE)
override val tasks: Map<Name, Task<*>>
get() = object : AbstractMap<Name, Task<*>>() {
override val entries: Set<Map.Entry<Name, Task<*>>>
get() = _tasks.entries
override fun get(key: Name): Task<*>? {
val executionContext = context.properties[EXECUTION_CONTEXT]
val endpoint = executionContext?.get(ENDPOINTS)?.toMeta()?.get(key) ?: return _tasks[key]
val string = endpoint.value?.string ?: error("Endpoint is expected to be a string")
val local = _tasks[key] ?: error("No task with name $key")
require(local is SerializableResultTask) { "Task $key result is not serializable" }
return RemoteTask(string, local.resultType, local.resultSerializer, local.descriptor, executionContext)
}
}
public companion object {
internal val EXECUTION_CONTEXT = "execution".asName()
internal val ENDPOINTS = "endpoints".asName()
}
}
public fun MutableMeta.endpoints(block: EndpointsBuilder.() -> Unit) {
RemoteTaskWorkspace.EXECUTION_CONTEXT put {
RemoteTaskWorkspace.ENDPOINTS put EndpointsBuilder().apply(block).build()
}
}
public class EndpointsBuilder {
private val endpoints = mutableMapOf<Name, String>()
public infix fun Name.on(endpoint: String) {
endpoints[this] = endpoint
}
internal fun build(): Meta = Meta {
endpoints.forEach { (name, endpoint) ->
name put endpoint
}
}
}

View File

@ -1,116 +1,29 @@
package space.kscience.dataforge.distributed
import io.ktor.utils.io.core.*
import io.lambdarpc.dsl.LibService
import io.lambdarpc.dsl.def
import io.lambdarpc.dsl.j
import io.lambdarpc.utils.ServiceId
import kotlinx.serialization.KSerializer
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Global
import space.kscience.dataforge.context.gather
import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.distributed.serialization.DataSetPrototype
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MetaSerializer
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.workspace.SerializableResultTask
import space.kscience.dataforge.workspace.Task
import space.kscience.dataforge.workspace.TaskResult
import space.kscience.dataforge.workspace.Workspace
import space.kscience.dataforge.workspace.wrapResult
import java.io.Closeable
/**
* Workspace that exposes its tasks for remote clients.
* @param port Port to start service on. Will be random if null.
* [Workspace] that can expose its tasks to other workspaces as a service.
*/
public class ServiceWorkspace(
port: Int? = null,
override val context: Context = Global.buildContext("workspace".asName()),
private val dataSerializer: KSerializer<Any>? = null,
data: DataSet<*> = DataTree<Any>(),
override val targets: Map<String, Meta> = mapOf(),
) : Workspace, Closeable {
private val _port: Int? = port
override val data: TaskResult<*> = wrapResult(data, Name.EMPTY, Meta.EMPTY)
override val tasks: Map<Name, Task<*>>
get() = context.gather(Task.TYPE)
private val service = LibService(serviceId, port) {
execute of { name, meta, taskRegistry ->
if (name == Name.EMPTY) {
requireNotNull(dataSerializer) { "Data serializer is not provided on $port" }
DataSetPrototype.of(data, dataSerializer)
} else {
val task = tasks[name] ?: error("Task $name does not exist locally")
require(task is SerializableResultTask) { "Result of $name cannot be serialized" }
val workspace = ProxyWorkspace(taskRegistry)
// Local function to capture generic parameter
suspend fun <T : Any> execute(task: SerializableResultTask<T>): DataSetPrototype {
val result = task.execute(workspace, name, meta)
return DataSetPrototype.of(result, task.resultSerializer)
}
execute(task)
}
}
}
/**
* Proxies task calls to right endpoints according to the [TaskRegistry].
*/
private inner class ProxyWorkspace(private val taskRegistry: TaskRegistry) : Workspace by this {
override val tasks: Map<Name, Task<*>>
get() = object : AbstractMap<Name, Task<*>>() {
override val entries: Set<Map.Entry<Name, Task<*>>>
get() = this@ServiceWorkspace.tasks.entries
override fun get(key: Name): Task<*>? = remoteTask(key) ?: this@ServiceWorkspace.tasks[key]
}
/**
* Call default implementation to use [tasks] virtually instead of it in [ServiceWorkspace].
*/
override suspend fun produce(taskName: Name, taskMeta: Meta): TaskResult<*> =
super.produce(taskName, taskMeta)
private fun remoteTask(name: Name): RemoteTask<*>? {
val endpoint = taskRegistry.tasks[name] ?: return null
val local = this@ServiceWorkspace.tasks[name] ?: error("No task with name $name locally on $port")
require(local is SerializableResultTask) { "Task $name result is not serializable" }
return RemoteTask(endpoint, local.resultType, local.resultSerializer, local.descriptor, taskRegistry)
}
}
/**
* Port this workspace is available on.
*/
public interface ServiceWorkspace : Workspace, Closeable {
public val port: Int
get() = _port ?: service.port.p
public fun start()
public fun awaitTermination()
public fun shutdown()
/**
* Start [ServiceWorkspace] as a service.
*/
public fun start(): Unit = service.start()
/**
* Await termination of the service.
*/
public fun awaitTermination(): Unit = service.awaitTermination()
/**
* Shutdown service.
*/
public fun shutdown(): Unit = service.shutdown()
override fun close(): Unit = service.shutdown()
override fun close() {
shutdown()
}
public companion object {
internal val serviceId = ServiceId("d41b95b1-828b-4444-8ff0-6f9c92a79246")
internal val execute by serviceId.def(j<Name>(), j(MetaSerializer), j<TaskRegistry>(), j<DataSetPrototype>())
internal val execute by serviceId.def(j<Name>(), j(MetaSerializer), j(MetaSerializer), j<DataSetPrototype>())
}
}

View File

@ -1,18 +0,0 @@
package space.kscience.dataforge.distributed
import io.lambdarpc.utils.Endpoint
import kotlinx.serialization.Serializable
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.workspace.Task
@Serializable
internal class TaskRegistry(val tasks: Map<Name, Endpoint>)
internal fun TaskRegistry(tasks: Map<Name, Task<*>>): TaskRegistry {
val remotes = tasks.filterValues { it is RemoteTask<*> }
val endpoints = remotes.mapValues { (_, task) ->
require(task is RemoteTask)
task.endpoint
}
return TaskRegistry(endpoints)
}

View File

@ -22,7 +22,7 @@ internal class MyPlugin1 : WorkspacePlugin() {
get() = Factory.tag
val task by task<Int>(serializer()) {
workspace.logger.info { "In ${tag.name}.task" }
workspace.logger.info { "In ${tag.name}.task on ${workspace.context.name}" }
val myInt = workspace.data.getByType<Int>("int")!!
data("result", myInt.data.map { it + 1 })
}
@ -43,7 +43,7 @@ internal class MyPlugin2 : WorkspacePlugin() {
get() = Factory.tag
val task by task<Int>(serializer()) {
workspace.logger.info { "In ${tag.name}.task" }
workspace.logger.info { "In ${tag.name}.task on ${workspace.context.name}" }
val dataSet = fromTask<Int>(Name.of(MyPlugin1.tag.name, "task"))
val data = dataSet["result".asName()]!!
data("result", data.map { it + 1 })

View File

@ -19,13 +19,13 @@ import kotlin.test.assertEquals
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
internal class RemoteCallTest {
private lateinit var worker1: ServiceWorkspace
private lateinit var worker2: ServiceWorkspace
private lateinit var worker1: NodeWorkspace
private lateinit var worker2: NodeWorkspace
private lateinit var workspace: Workspace
@BeforeAll
fun before() = runBlocking {
worker1 = ServiceWorkspace(
worker1 = NodeWorkspace(
context = Global.buildContext("worker1".asName()) {
plugin(MyPlugin1)
},
@ -35,7 +35,7 @@ internal class RemoteCallTest {
)
worker1.start()
worker2 = ServiceWorkspace(
worker2 = NodeWorkspace(
context = Global.buildContext("worker2".asName()) {
plugin(MyPlugin1)
plugin(MyPlugin2)
@ -43,12 +43,18 @@ internal class RemoteCallTest {
)
worker2.start()
workspace = Workspace {
context {
plugin(RemotePlugin(MyPlugin1, "localhost:${worker1.port}"))
plugin(RemotePlugin(MyPlugin2, "localhost:${worker2.port}"))
workspace = NodeWorkspace(
context = Global.buildContext {
plugin(MyPlugin1)
plugin(MyPlugin2)
properties {
endpoints {
Name.of(MyPlugin1.tag.name, "task") on "localhost:${worker1.port}"
Name.of(MyPlugin2.tag.name, "task") on "localhost:${worker2.port}"
}
}
}
}
)
}
@AfterAll

View File

@ -2,11 +2,25 @@ package space.kscience.dataforge.meta
import kotlinx.serialization.Serializable
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.*
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.NameToken
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.cutFirst
import space.kscience.dataforge.names.cutLast
import space.kscience.dataforge.names.first
import space.kscience.dataforge.names.firstOrNull
import space.kscience.dataforge.names.isEmpty
import space.kscience.dataforge.names.lastOrNull
import space.kscience.dataforge.names.length
import space.kscience.dataforge.names.plus
import space.kscience.dataforge.names.withIndex
import space.kscience.dataforge.values.EnumValue
import space.kscience.dataforge.values.MutableValueProvider
import space.kscience.dataforge.values.Value
import space.kscience.dataforge.values.asValue
import kotlin.collections.component1
import kotlin.collections.component2
import kotlin.collections.set
import kotlin.js.JsName
import kotlin.jvm.Synchronized
@ -146,6 +160,14 @@ public interface MutableMeta : Meta, MutableMetaProvider {
*/
public operator fun MutableMeta.set(name: Name, meta: Meta): Unit = setMeta(name, meta)
public fun MutableMeta.put(other: Meta) {
other.items.forEach { (name, meta) ->
name.asName() put meta
}
}
public operator fun MutableMeta.plusAssign(meta: Meta): Unit = put(meta)
/**
* Set or replace value at given [name]
*/