Refactor remote execution model

Andrey Stoyan 2022-05-21 22:01:34 +03:00
parent 230a3f1e22
commit 77f8f045e6
18 changed files with 386 additions and 267 deletions

View File

@@ -15,7 +15,8 @@ kotlin {
jvmMain {
dependencies {
// TODO include fat jar of lambdarpc
api(files("lambdarpc-core-0.0.1.jar"))
val path = "../../../lambdarpc/LambdaRPC.kt/lambdarpc-core/build/libs"
api(files("$path/lambdarpc-core-0.0.1-SNAPSHOT.jar"))
runtimeOnly("io.grpc:grpc-netty-shaded:1.44.0")
api("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0")
api("io.grpc:grpc-protobuf:1.44.0")

View File

@@ -1,37 +0,0 @@
package space.kscience.dataforge.distributed
import io.lambdarpc.utils.Endpoint
import space.kscience.dataforge.context.AbstractPlugin
import space.kscience.dataforge.context.PluginTag
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.workspace.Task
import kotlin.reflect.KType
/**
* Plugin whose purpose is to communicate with remote plugins.
* @param endpoint Endpoint of the remote plugin.
*/
public abstract class ClientWorkspacePlugin(endpoint: Endpoint) : AbstractPlugin() {
/**
* Tag of the [ClientWorkspacePlugin]; it should be equal to the tag of the corresponding remote plugin.
*/
abstract override val tag: PluginTag
/**
* Enumeration of names of remote tasks and their result types.
*/
public abstract val tasks: Map<Name, KType>
private val _tasks: Map<Name, Task<*>> by lazy {
tasks.mapValues { (_, type) ->
RemoteTask<Any>(endpoint, type)
}
}
override fun content(target: String): Map<Name, Any> =
when (target) {
Task.TYPE -> _tasks
else -> emptyMap()
}
}

View File

@@ -0,0 +1,36 @@
package space.kscience.dataforge.distributed
import io.lambdarpc.utils.Endpoint
import space.kscience.dataforge.context.AbstractPlugin
import space.kscience.dataforge.context.Plugin
import space.kscience.dataforge.context.PluginFactory
import space.kscience.dataforge.context.PluginTag
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.workspace.SerializableResultTask
import space.kscience.dataforge.workspace.Task
/**
* Plugin whose purpose is to communicate with remote plugins.
* @param plugin A remote plugin to be used.
* @param endpoint Endpoint of the remote plugin.
*/
public class RemotePlugin<P : Plugin>(private val plugin: P, private val endpoint: String) : AbstractPlugin() {
public constructor(factory: PluginFactory<P>, endpoint: String) : this(factory(), endpoint)
override val tag: PluginTag
get() = plugin.tag
private val tasks = plugin.content(Task.TYPE)
.filterValues { it is SerializableResultTask<*> }
.mapValues { (_, task) ->
require(task is SerializableResultTask<*>)
RemoteTask(Endpoint(endpoint), task.resultType, task.resultSerializer)
}
override fun content(target: String): Map<Name, Any> =
when (target) {
Task.TYPE -> tasks
else -> emptyMap()
}
}
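
RemotePlugin supersedes the deleted ClientWorkspacePlugin: instead of hand-maintaining a map of task names to result types, it derives remote proxies from the plugin's own serializable tasks. A minimal client-side wiring sketch, modeled on the updated test further below (the plugin and the port are illustrative):

val workspace = Workspace {
    context {
        // MyPlugin1 comes from the test sources; the endpoint is a placeholder.
        plugin(RemotePlugin(MyPlugin1, "localhost:9001"))
    }
}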

View File

@@ -1,13 +1,13 @@
package space.kscience.dataforge.distributed
import io.lambdarpc.dsl.ServiceDispatcher
import io.lambdarpc.context.ServiceDispatcher
import io.lambdarpc.utils.Endpoint
import kotlinx.coroutines.withContext
import space.kscience.dataforge.data.DataSet
import kotlinx.serialization.KSerializer
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.descriptors.MetaDescriptor
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.workspace.Task
import space.kscience.dataforge.workspace.SerializableResultTask
import space.kscience.dataforge.workspace.TaskResult
import space.kscience.dataforge.workspace.Workspace
import space.kscience.dataforge.workspace.wrapResult
@@ -17,20 +17,20 @@ import kotlin.reflect.KType
* Proxy task that communicates with the corresponding remote task.
*/
internal class RemoteTask<T : Any>(
endpoint: Endpoint,
internal val endpoint: Endpoint,
override val resultType: KType,
override val resultSerializer: KSerializer<T>,
override val descriptor: MetaDescriptor? = null,
) : Task<T> {
private val taskRegistry: TaskRegistry? = null,
) : SerializableResultTask<T> {
private val dispatcher = ServiceDispatcher(ServiceWorkspace.serviceId to endpoint)
@Suppress("UNCHECKED_CAST")
override suspend fun execute(
workspace: Workspace,
taskName: Name,
taskMeta: Meta,
): TaskResult<T> = withContext(dispatcher) {
val dataset = ServiceWorkspace.execute(taskName)
dataset.finishDecoding(resultType)
workspace.wrapResult(dataset as DataSet<T>, taskName, taskMeta)
override suspend fun execute(workspace: Workspace, taskName: Name, taskMeta: Meta): TaskResult<T> {
val registry = taskRegistry ?: TaskRegistry(workspace.tasks)
val result = withContext(dispatcher) {
ServiceWorkspace.execute(taskName, taskMeta, registry)
}
val dataSet = result.toDataSet(resultType, resultSerializer)
return workspace.wrapResult(dataSet, taskName, taskMeta)
}
}
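
RemoteTask is internal and is normally created by RemotePlugin or by the ProxyWorkspace below; a direct construction is shown here only to make the parameters concrete. A sketch, assuming the usual kotlin.reflect.typeOf and kotlinx.serialization.serializer imports and a hypothetical endpoint:

// Proxy for a remote task that yields Int results (endpoint is hypothetical).
val remote = RemoteTask<Int>(
    endpoint = Endpoint("localhost:9001"),
    resultType = typeOf<Int>(),
    resultSerializer = serializer<Int>(),
)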

View File

@@ -3,21 +3,23 @@ package space.kscience.dataforge.distributed
import io.ktor.utils.io.core.*
import io.lambdarpc.dsl.LibService
import io.lambdarpc.dsl.def
import io.lambdarpc.utils.Address
import io.lambdarpc.utils.Port
import io.lambdarpc.utils.toSid
import io.lambdarpc.dsl.j
import io.lambdarpc.utils.ServiceId
import kotlinx.coroutines.runBlocking
import kotlinx.serialization.KSerializer
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Global
import space.kscience.dataforge.context.gather
import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.distributed.serialization.DataSetCoder
import space.kscience.dataforge.distributed.serialization.DataSetPrototype
import space.kscience.dataforge.distributed.serialization.MetaCoder
import space.kscience.dataforge.distributed.serialization.NameCoder
import space.kscience.dataforge.distributed.serialization.SerializableDataSetAdapter
import space.kscience.dataforge.distributed.serialization.TaskRegistryCoder
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.workspace.SerializableResultTask
import space.kscience.dataforge.workspace.Task
import space.kscience.dataforge.workspace.TaskResult
import space.kscience.dataforge.workspace.Workspace
@@ -25,37 +27,73 @@ import space.kscience.dataforge.workspace.wrapResult
/**
* Workspace that exposes its tasks for remote clients.
* @param port Port to start service on. Will be random if null.
*/
public class ServiceWorkspace(
address: String = "localhost",
port: Int? = null,
override val context: Context = Global.buildContext("workspace".asName()),
private val dataSerializer: KSerializer<Any>? = null,
data: DataSet<*> = runBlocking { DataTree<Any> {} },
override val targets: Map<String, Meta> = mapOf(),
) : Workspace, Closeable {
private val _port: Int? = port
override val data: TaskResult<*> = wrapResult(data, Name.EMPTY, Meta.EMPTY)
override val tasks: Map<Name, Task<*>>
get() = context.gather(Task.TYPE)
private val service = LibService(serviceId, address, port) {
execute of { name ->
val res = produce(name, Meta.EMPTY)
SerializableDataSetAdapter(res)
private val service = LibService(serviceId, port) {
execute of { name, meta, taskRegistry ->
if (name == Name.EMPTY) {
requireNotNull(dataSerializer) { "Data serializer is not provided on $port" }
DataSetPrototype.of(data, dataSerializer)
} else {
val task = tasks[name] ?: error("Task $name does not exist locally")
require(task is SerializableResultTask) { "Result of $name cannot be serialized" }
val workspace = ProxyWorkspace(taskRegistry)
// Local function to capture the generic type parameter
suspend fun <T : Any> execute(task: SerializableResultTask<T>): DataSetPrototype {
val result = task.execute(workspace, name, meta)
return DataSetPrototype.of(result, task.resultSerializer)
}
execute(task)
}
}
}
/**
* Address this workspace is available on.
* Proxies task calls to the right endpoints according to the [TaskRegistry].
*/
public val address: Address = Address(address)
private inner class ProxyWorkspace(private val taskRegistry: TaskRegistry) : Workspace by this {
override val tasks: Map<Name, Task<*>>
get() = object : AbstractMap<Name, Task<*>>() {
override val entries: Set<Map.Entry<Name, Task<*>>>
get() = this@ServiceWorkspace.tasks.entries
override fun get(key: Name): Task<*>? = remoteTask(key) ?: this@ServiceWorkspace.tasks[key]
}
/**
* Calls the default implementation so that the overridden [tasks] above is used instead of the one defined in [ServiceWorkspace].
*/
override suspend fun produce(taskName: Name, taskMeta: Meta): TaskResult<*> =
super.produce(taskName, taskMeta)
private fun remoteTask(name: Name): RemoteTask<*>? {
val endpoint = taskRegistry.tasks[name] ?: return null
val local = this@ServiceWorkspace.tasks[name] ?: error("No task with name $name locally on $port")
require(local is SerializableResultTask) { "Task $name result is not serializable" }
return RemoteTask(endpoint, local.resultType, local.resultSerializer, local.descriptor, taskRegistry)
}
}
/**
* Port this workspace is available on.
*/
public val port: Port
get() = service.port
public val port: Int
get() = _port ?: service.port.p
/**
* Start [ServiceWorkspace] as a service.
@@ -75,7 +113,7 @@ public class ServiceWorkspace(
override fun close(): Unit = service.shutdown()
public companion object {
internal val serviceId = "d41b95b1-828b-4444-8ff0-6f9c92a79246".toSid()
internal val execute by serviceId.def(NameCoder, DataSetCoder)
internal val serviceId = ServiceId("d41b95b1-828b-4444-8ff0-6f9c92a79246")
internal val execute by serviceId.def(NameCoder, MetaCoder, TaskRegistryCoder, j<DataSetPrototype>())
}
}
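
Bringing up a worker follows the shape of the updated test: construct, start, and eventually shut down. A sketch (the context name and data are illustrative):

val worker = ServiceWorkspace(
    context = Global.buildContext("worker".asName()) { plugin(MyPlugin1) },
    data = runBlocking { DataTree<Any> { static("int", 42) } },
)
worker.start()      // tasks are now served on worker.port
// ...
worker.shutdown()   // ServiceWorkspace is also Closeable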

View File

@@ -0,0 +1,18 @@
package space.kscience.dataforge.distributed
import io.lambdarpc.utils.Endpoint
import kotlinx.serialization.Serializable
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.workspace.Task
@Serializable
internal class TaskRegistry(val tasks: Map<Name, Endpoint>)
internal fun TaskRegistry(tasks: Map<Name, Task<*>>): TaskRegistry {
val remotes = tasks.filterValues { it is RemoteTask<*> }
val endpoints = remotes.mapValues { (_, task) ->
require(task is RemoteTask)
task.endpoint
}
return TaskRegistry(endpoints)
}
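
The registry is what enables transitive execution: it records, for every RemoteTask visible in the caller's workspace, which endpoint owns it, so a worker computing one task can dispatch its dependencies to the right peers. For the two-worker test below, the client-side registry would look roughly like this (ports are illustrative):

TaskRegistry(
    mapOf(
        Name.of("Plg1", "task") to Endpoint("localhost:9001"),
        Name.of("Plg2", "task") to Endpoint("localhost:9002"),
    )
)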

View File

@@ -1,23 +0,0 @@
package space.kscience.dataforge.distributed.serialization
import io.lambdarpc.coding.Coder
import io.lambdarpc.coding.CodingContext
import io.lambdarpc.transport.grpc.Entity
import io.lambdarpc.transport.serialization.Entity
import io.lambdarpc.transport.serialization.RawData
import kotlinx.coroutines.runBlocking
import java.nio.charset.Charset
internal object DataSetCoder : Coder<SerializableDataSet<Any>> {
override fun decode(entity: Entity, context: CodingContext): SerializableDataSet<Any> {
val string = entity.data.toString(Charset.defaultCharset())
val prototype = DataSetPrototype.fromJson(string)
return prototype.toDataSet()
}
override fun encode(value: SerializableDataSet<Any>, context: CodingContext): Entity {
val prototype = runBlocking { DataSetPrototype.of(value) } // TODO update LambdaRPC and remove blocking
val string = prototype.toJson()
return Entity(RawData.copyFrom(string, Charset.defaultCharset()))
}
}

View File

@@ -0,0 +1,23 @@
package space.kscience.dataforge.distributed.serialization
import io.lambdarpc.coding.Coder
import io.lambdarpc.coding.CodingContext
import io.lambdarpc.transport.grpc.Entity
import io.lambdarpc.transport.serialization.Entity
import io.lambdarpc.transport.serialization.RawData
import kotlinx.serialization.json.Json
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MetaSerializer
import java.nio.charset.Charset
internal object MetaCoder : Coder<Meta> {
override suspend fun decode(entity: Entity, context: CodingContext): Meta {
val string = entity.data.toString(Charset.defaultCharset())
return Json.decodeFromString(MetaSerializer, string)
}
override suspend fun encode(value: Meta, context: CodingContext): Entity {
val string = Json.encodeToString(MetaSerializer, value)
return Entity(RawData.copyFrom(string, Charset.defaultCharset()))
}
}

View File

@@ -5,16 +5,18 @@ import io.lambdarpc.coding.CodingContext
import io.lambdarpc.transport.grpc.Entity
import io.lambdarpc.transport.serialization.Entity
import io.lambdarpc.transport.serialization.RawData
import kotlinx.serialization.json.Json
import space.kscience.dataforge.names.Name
import java.nio.charset.Charset
internal object NameCoder : Coder<Name> {
override fun decode(entity: Entity, context: CodingContext): Name {
require(entity.hasData()) { "Entity should contain data" }
override suspend fun decode(entity: Entity, context: CodingContext): Name {
val string = entity.data.toString(Charset.defaultCharset())
return Name.parse(string)
return Json.decodeFromString(Name.serializer(), string)
}
override fun encode(value: Name, context: CodingContext): Entity =
Entity(RawData.copyFrom(value.toString(), Charset.defaultCharset()))
override suspend fun encode(value: Name, context: CodingContext): Entity {
val string = Json.encodeToString(Name.serializer(), value)
return Entity(RawData.copyFrom(string, Charset.defaultCharset()))
}
}

View File

@@ -1,16 +0,0 @@
package space.kscience.dataforge.distributed.serialization
import space.kscience.dataforge.data.DataSet
import kotlin.reflect.KType
/**
* Represents a [DataSet] that should be initialized before usage.
*/
internal interface SerializableDataSet<T : Any> : DataSet<T> {
fun finishDecoding(type: KType)
}
internal class SerializableDataSetAdapter<T : Any>(dataSet: DataSet<T>) :
SerializableDataSet<T>, DataSet<T> by dataSet {
override fun finishDecoding(type: KType) = Unit
}

View File

@@ -0,0 +1,22 @@
package space.kscience.dataforge.distributed.serialization
import io.lambdarpc.coding.Coder
import io.lambdarpc.coding.CodingContext
import io.lambdarpc.transport.grpc.Entity
import io.lambdarpc.transport.serialization.Entity
import io.lambdarpc.transport.serialization.RawData
import kotlinx.serialization.json.Json
import space.kscience.dataforge.distributed.TaskRegistry
import java.nio.charset.Charset
internal object TaskRegistryCoder : Coder<TaskRegistry> {
override suspend fun decode(entity: Entity, context: CodingContext): TaskRegistry {
val string = entity.data.toString(Charset.defaultCharset())
return Json.decodeFromString(TaskRegistry.serializer(), string)
}
override suspend fun encode(value: TaskRegistry, context: CodingContext): Entity {
val string = Json.encodeToString(TaskRegistry.serializer(), value)
return Entity(RawData.copyFrom(string, Charset.defaultCharset()))
}
}
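
MetaCoder, NameCoder, and TaskRegistryCoder all repeat the same JSON-over-KSerializer pattern; they could plausibly be collapsed into one generic coder. A sketch (not part of this commit; imports as in the coders above):

internal class JsonCoder<T>(private val serializer: KSerializer<T>) : Coder<T> {
    override suspend fun decode(entity: Entity, context: CodingContext): T =
        Json.decodeFromString(serializer, entity.data.toString(Charset.defaultCharset()))

    override suspend fun encode(value: T, context: CodingContext): Entity =
        Entity(RawData.copyFrom(Json.encodeToString(serializer, value), Charset.defaultCharset()))
}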

View File

@@ -21,11 +21,11 @@ internal data class DataPrototype(
val meta: String,
val data: String,
) {
fun toData(type: KType): Data<Any> =
fun <T : Any> toData(type: KType, serializer: KSerializer<T>): Data<T> =
SimpleData(
type = type,
meta = Json.decodeFromString(MetaSerializer, meta),
data = Json.decodeFromString(kotlinx.serialization.serializer(type), data)!!
data = Json.decodeFromString(serializer, data)
)
companion object {

View File

@@ -7,9 +7,8 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.serialization.KSerializer
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import kotlinx.serialization.serializer
import space.kscience.dataforge.data.Data
import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.NamedData
@@ -23,51 +22,40 @@ import kotlin.reflect.KType
*/
@Serializable
internal data class DataSetPrototype(val data: Map<String, DataPrototype>) {
fun toDataSet(): SerializableDataSet<Any> =
SerializableDataSetImpl(this)
fun toJson(): String = Json.encodeToString(serializer(), this)
fun <T : Any> toDataSet(type: KType, serializer: KSerializer<T>): DataSet<T> {
val data = data
.mapKeys { (name, _) -> Name.of(name) }
.mapValues { (_, dataPrototype) -> dataPrototype.toData(type, serializer) }
return SerializableDataSetImpl(type, data)
}
companion object {
suspend fun <T : Any> of(dataSet: DataSet<T>): DataSetPrototype = coroutineScope {
val serializer = serializer(dataSet.dataType)
suspend fun <T : Any> of(dataSet: DataSet<T>, serializer: KSerializer<T>): DataSetPrototype = coroutineScope {
val flow = mutableListOf<Pair<String, Deferred<DataPrototype>>>()
dataSet.flowData().map { (name, data) ->
name.toString() to async { DataPrototype.of(data, serializer) }
}.toList(flow)
DataSetPrototype(flow.associate { (name, deferred) -> name to deferred.await() })
}
fun fromJson(string: String): DataSetPrototype = Json.decodeFromString(serializer(), string)
}
}
/**
* Trivial [SerializableDataSet] implementation.
* Trivial [DataSet] implementation.
*/
private class SerializableDataSetImpl(private val prototype: DataSetPrototype) : SerializableDataSet<Any> {
private class SerializableDataSetImpl<T : Any>(
override val dataType: KType,
private val data: Map<Name, Data<T>>,
) : DataSet<T> {
private lateinit var type: KType
private lateinit var data: Map<Name, Data<Any>>
override fun finishDecoding(type: KType) {
this.type = type
this.data = prototype.data
.mapKeys { (name, _) -> Name.of(name) }
.mapValues { (_, dataPrototype) -> dataPrototype.toData(type) }
}
override val dataType: KType
get() = type
override fun flowData(): Flow<NamedData<Any>> =
override fun flowData(): Flow<NamedData<T>> =
data.map { (name, data) -> SimpleNamedData(name, data) }.asFlow()
override suspend fun getData(name: Name): Data<Any>? = data[name]
override suspend fun getData(name: Name): Data<T>? = data[name]
/**
* Trivial named data implementation.
*/
private class SimpleNamedData(override val name: Name, override val data: Data<Any>) :
NamedData<Any>, Data<Any> by data
private class SimpleNamedData<T : Any>(override val name: Name, override val data: Data<T>) :
NamedData<T>, Data<T> by data
}
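
DataSetPrototype is now generic over the element type and its serializer, so a result survives a full round trip without a reflective serializer lookup. A sketch, assuming a DataSet<Int> named set and a surrounding coroutine scope:

// Worker side: snapshot the data set into a serializable prototype.
val prototype = DataSetPrototype.of(set, Int.serializer())
// Client side: restore a typed DataSet from the prototype.
val restored: DataSet<Int> = prototype.toDataSet(typeOf<Int>(), Int.serializer())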

View File

@@ -0,0 +1,61 @@
package space.kscience.dataforge.distributed
import kotlinx.serialization.serializer
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.PluginFactory
import space.kscience.dataforge.context.PluginTag
import space.kscience.dataforge.context.info
import space.kscience.dataforge.context.logger
import space.kscience.dataforge.data.map
import space.kscience.dataforge.data.select
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.workspace.WorkspacePlugin
import space.kscience.dataforge.workspace.fromTask
import space.kscience.dataforge.workspace.task
import kotlin.reflect.KClass
internal class MyPlugin1 : WorkspacePlugin() {
override val tag: PluginTag
get() = Factory.tag
val task by task<Int>(serializer()) {
workspace.logger.info { "In ${tag.name}.task" }
val myInt = workspace.data.select<Int>()
val res = myInt.getData("int".asName())!!
emit("result".asName(), res.map { it + 1 })
}
companion object Factory : PluginFactory<MyPlugin1> {
override fun invoke(meta: Meta, context: Context): MyPlugin1 = MyPlugin1()
override val tag: PluginTag
get() = PluginTag("Plg1")
override val type: KClass<out MyPlugin1>
get() = MyPlugin1::class
}
}
internal class MyPlugin2 : WorkspacePlugin() {
override val tag: PluginTag
get() = Factory.tag
val task by task<Int>(serializer()) {
workspace.logger.info { "In ${tag.name}.task" }
val dataSet = fromTask<Int>(Name.of(MyPlugin1.tag.name, "task"))
val data = dataSet.getData("result".asName())!!
emit("result".asName(), data.map { it + 1 })
}
companion object Factory : PluginFactory<MyPlugin2> {
override fun invoke(meta: Meta, context: Context): MyPlugin2 = MyPlugin2()
override val tag: PluginTag
get() = PluginTag("Plg2")
override val type: KClass<out MyPlugin2>
get() = MyPlugin2::class
}
}

View File

@@ -0,0 +1,89 @@
package space.kscience.dataforge.distributed
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import space.kscience.dataforge.context.Global
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.await
import space.kscience.dataforge.data.getData
import space.kscience.dataforge.data.static
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.workspace.Workspace
import kotlin.test.assertEquals
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
internal class RemoteCallTest {
private lateinit var worker1: ServiceWorkspace
private lateinit var worker2: ServiceWorkspace
private lateinit var workspace: Workspace
@BeforeAll
fun before() {
worker1 = ServiceWorkspace(
context = Global.buildContext("worker1".asName()) {
plugin(MyPlugin1)
},
data = runBlocking {
DataTree<Any> {
static("int", 42)
}
},
)
worker1.start()
worker2 = ServiceWorkspace(
context = Global.buildContext("worker2".asName()) {
plugin(MyPlugin1)
plugin(MyPlugin2)
},
)
worker2.start()
workspace = Workspace {
context {
plugin(RemotePlugin(MyPlugin1, "localhost:${worker1.port}"))
plugin(RemotePlugin(MyPlugin2, "localhost:${worker2.port}"))
}
}
}
@AfterAll
fun after() {
worker1.shutdown()
worker2.shutdown()
}
@Test
fun `local execution`() = runBlocking {
assertEquals(42, worker1.data.getData("int")!!.await())
val res = worker1
.produce(Name.of(MyPlugin1.tag.name, "task"), Meta.EMPTY)
.getData("result".asName())!!
.await()
assertEquals(43, res)
}
@Test
fun `remote execution`() = runBlocking {
val remoteRes = workspace
.produce(Name.of(MyPlugin1.tag.name, "task"), Meta.EMPTY)
.getData("result".asName())!!
.await()
assertEquals(43, remoteRes)
}
@Test
fun `transitive execution`() = runBlocking {
val remoteRes = workspace
.produce(Name.of(MyPlugin2.tag.name, "task"), Meta.EMPTY)
.getData("result".asName())!!
.await()
assertEquals(44, remoteRes)
}
}

View File

@@ -1,112 +0,0 @@
package space.kscience.dataforge.distributed
import io.lambdarpc.utils.Endpoint
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Global
import space.kscience.dataforge.context.PluginFactory
import space.kscience.dataforge.context.PluginTag
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.await
import space.kscience.dataforge.data.getData
import space.kscience.dataforge.data.map
import space.kscience.dataforge.data.select
import space.kscience.dataforge.data.static
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.workspace.Workspace
import space.kscience.dataforge.workspace.WorkspacePlugin
import space.kscience.dataforge.workspace.task
import kotlin.reflect.KClass
import kotlin.reflect.KType
import kotlin.reflect.typeOf
import kotlin.test.assertEquals
private class MyPlugin : WorkspacePlugin() {
override val tag: PluginTag
get() = Factory.tag
val task by task<Int> {
val myInt = workspace.data.select<Int>()
val res = myInt.getData("int".asName())!!
emit("result".asName(), res.map { it + 1 })
}
companion object Factory : PluginFactory<MyPlugin> {
override fun invoke(meta: Meta, context: Context): MyPlugin = MyPlugin()
override val tag: PluginTag
get() = PluginTag("Plg")
override val type: KClass<out MyPlugin>
get() = MyPlugin::class
}
}
private class RemoteMyPlugin(endpoint: Endpoint) : ClientWorkspacePlugin(endpoint) {
override val tag: PluginTag
get() = MyPlugin.tag
override val tasks: Map<Name, KType>
get() = mapOf(
"task".asName() to typeOf<Int>()
)
}
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class ServiceWorkspaceTest {
private lateinit var worker1: ServiceWorkspace
private lateinit var workspace: Workspace
@BeforeAll
fun before() {
worker1 = ServiceWorkspace(
context = Global.buildContext("worker1".asName()) {
plugin(MyPlugin)
},
data = runBlocking {
DataTree<Any> {
static("int", 0)
}
},
)
worker1.start()
workspace = Workspace {
context {
val endpoint = Endpoint(worker1.address, worker1.port)
plugin(RemoteMyPlugin(endpoint))
}
}
}
@AfterAll
fun after() {
worker1.shutdown()
}
@Test
fun localExecution() = runBlocking {
assertEquals(0, worker1.data.getData("int")!!.await())
val res = worker1
.produce(Name.of("Plg", "task"), Meta.EMPTY)
.getData("result".asName())!!
.await()
assertEquals(1, res)
}
@Test
fun remoteExecution() = runBlocking {
val remoteRes = workspace
.produce(Name.of("Plg", "task"), Meta.EMPTY)
.getData("result".asName())!!
.await()
assertEquals(1, remoteRes)
}
}

View File

@@ -1,6 +1,8 @@
package space.kscience.dataforge.workspace
import kotlinx.coroutines.withContext
import kotlinx.serialization.KSerializer
import kotlinx.serialization.serializer
import space.kscience.dataforge.data.DataSetBuilder
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.GoalExecutionRestriction
@@ -17,11 +19,6 @@ import kotlin.reflect.typeOf
@Type(TYPE)
public interface Task<out T : Any> : Described {
/**
* Type of the task result data.
*/
public val resultType: KType
/**
* Compute a [TaskResult] using given meta. In general, the result is lazy and represents both computation model
* and a handler for actual result
@@ -37,6 +34,12 @@ public interface Task<out T : Any> : Described {
}
}
@Type(TYPE)
public interface SerializableResultTask<T : Any> : Task<T> {
public val resultType: KType
public val resultSerializer: KSerializer<T>
}
public class TaskResultBuilder<T : Any>(
public val workspace: Workspace,
public val taskName: Name,
@@ -60,9 +63,6 @@ public fun <T : Any> Task(
builder: suspend TaskResultBuilder<T>.() -> Unit,
): Task<T> = object : Task<T> {
override val resultType: KType
get() = resultType
override val descriptor: MetaDescriptor? = descriptor
override suspend fun execute(
@@ -78,9 +78,28 @@ public fun <T : Any> Task(
}
}
/**
* A [Task] that carries a [resultSerializer] so that its results can be cached or sent over the network.
*/
@DFInternal
public class SerializableResultTaskImpl<T : Any>(
override val resultType: KType,
override val resultSerializer: KSerializer<T>,
descriptor: MetaDescriptor? = null,
builder: suspend TaskResultBuilder<T>.() -> Unit,
) : SerializableResultTask<T>, Task<T> by Task(resultType, descriptor, builder)
@OptIn(DFInternal::class)
@Suppress("FunctionName")
public inline fun <reified T : Any> Task(
descriptor: MetaDescriptor? = null,
noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
): Task<T> = Task(typeOf<T>(), descriptor, builder)
@OptIn(DFInternal::class)
@Suppress("FunctionName")
public inline fun <reified T : Any> SerializableResultTask(
resultSerializer: KSerializer<T> = serializer(),
descriptor: MetaDescriptor? = null,
noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
): Task<T> = SerializableResultTaskImpl(typeOf<T>(), resultSerializer, descriptor, builder)
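
The new factory mirrors the reified Task builder but additionally captures the serializer. Defining a standalone serializable task might look like this (a sketch; the body is elided):

val increment = SerializableResultTask<Int>(serializer()) {
    // TaskResultBuilder<Int> scope: select inputs and emit named results here.
}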

View File

@@ -1,5 +1,6 @@
package space.kscience.dataforge.workspace
import kotlinx.serialization.KSerializer
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.ContextBuilder
import space.kscience.dataforge.context.Global
@@ -37,25 +38,34 @@ public interface TaskContainer {
public inline fun <reified T : Any> TaskContainer.registerTask(
name: String,
resultSerializer: KSerializer<T>? = null,
noinline descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {},
noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
): Unit = registerTask(Name.parse(name), Task(MetaDescriptor(descriptorBuilder), builder))
) {
val descriptor = MetaDescriptor(descriptorBuilder)
val task = if (resultSerializer == null) Task(descriptor, builder) else
SerializableResultTask(resultSerializer, descriptor, builder)
registerTask(Name.parse(name), task)
}
public inline fun <reified T : Any> TaskContainer.task(
descriptor: MetaDescriptor,
resultSerializer: KSerializer<T>? = null,
noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
): PropertyDelegateProvider<Any?, ReadOnlyProperty<Any?, TaskReference<T>>> = PropertyDelegateProvider { _, property ->
val taskName = Name.parse(property.name)
val task = Task(descriptor, builder)
val task = if (resultSerializer == null) Task(descriptor, builder) else
SerializableResultTask(resultSerializer, descriptor, builder)
registerTask(taskName, task)
ReadOnlyProperty { _, _ -> TaskReference(taskName, task) }
}
public inline fun <reified T : Any> TaskContainer.task(
resultSerializer: KSerializer<T>? = null,
noinline descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {},
noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
): PropertyDelegateProvider<Any?, ReadOnlyProperty<Any?, TaskReference<T>>> =
task(MetaDescriptor(descriptorBuilder), builder)
task(MetaDescriptor(descriptorBuilder), resultSerializer, builder)
public class WorkspaceBuilder(private val parentContext: Context = Global) : TaskContainer {
private var context: Context? = null
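
With these overloads, plugin authors opt in per task: passing a serializer produces a SerializableResultTask that remote clients can proxy, while omitting it keeps a plain Task. The updated test plugins use exactly this form:

// As in MyPlugin1 above; serializer() resolves to KSerializer<Int>.
val task by task<Int>(serializer()) {
    // build and emit the task result
}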