Call remote task of service workspace

This commit is contained in:
Andrey Stoyan 2022-04-30 19:34:27 +03:00
parent be8e971436
commit a4044c82a0
7 changed files with 410 additions and 3 deletions

View File

@ -30,3 +30,6 @@ public fun <T : Any> Data<T>.named(name: Name): NamedData<T> = if (this is Named
} else {
NamedDataImpl(name, this)
}
public operator fun <T : Any> NamedData<T>.component1(): Name = name
public operator fun <T : Any> NamedData<T>.component2(): Data<T> = data

View File

@ -12,6 +12,27 @@ kotlin {
api(project(":dataforge-io"))
}
}
jvmMain {
dependencies {
// TODO include fat jar of lambdarpc
api(files("lambdarpc-core-0.0.1.jar"))
runtimeOnly("io.grpc:grpc-netty-shaded:1.44.0")
api("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0")
api("io.grpc:grpc-protobuf:1.44.0")
api("com.google.protobuf:protobuf-java-util:3.19.4")
api("com.google.protobuf:protobuf-kotlin:3.19.4")
api("io.grpc:grpc-kotlin-stub:1.2.1")
api("org.jetbrains.kotlinx:kotlinx-serialization-json:1.3.2")
api("org.slf4j:slf4j-simple:1.7.36")
api("io.github.microutils:kotlin-logging-jvm:2.1.21")
}
}
}
}
kscience {
useSerialization {
json()
}
}

View File

@ -17,6 +17,11 @@ import kotlin.reflect.typeOf
@Type(TYPE)
public interface Task<out T : Any> : Described {
/**
* Type of the task result data.
*/
public val resultType: KType
/**
* Compute a [TaskResult] using given meta. In general, the result is lazy and represents both computation model
* and a handler for actual result
@ -55,6 +60,9 @@ public fun <T : Any> Task(
builder: suspend TaskResultBuilder<T>.() -> Unit,
): Task<T> = object : Task<T> {
override val resultType: KType
get() = resultType
override val descriptor: MetaDescriptor? = descriptor
override suspend fun execute(

View File

@ -0,0 +1,31 @@
package space.kscience.dataforge.workspace.distributed
import io.lambdarpc.utils.Endpoint
import space.kscience.dataforge.context.AbstractPlugin
import space.kscience.dataforge.context.PluginTag
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.workspace.Task
import kotlin.reflect.KType
/**
* Plugin that purpose is to communicate with remote plugins.
* @param tag Tag og the [ClientWorkspacePlugin] should be equal to the tag of the corresponding remote plugin.
* @param endpoint Endpoint of the remote plugin.
* @param tasks Enumeration of names of remote tasks and their result types.
*/
public abstract class ClientWorkspacePlugin(
override val tag: PluginTag,
endpoint: Endpoint,
vararg tasks: Pair<Name, KType>,
) : AbstractPlugin() {
private val tasks: Map<Name, Task<*>> = tasks.associate { (name, type) ->
name to RemoteTask<Any>(endpoint, type)
}
override fun content(target: String): Map<Name, Any> =
when (target) {
Task.TYPE -> tasks
else -> emptyMap()
}
}

View File

@ -0,0 +1,36 @@
package space.kscience.dataforge.workspace.distributed
import io.lambdarpc.dsl.ServiceDispatcher
import io.lambdarpc.utils.Endpoint
import kotlinx.coroutines.withContext
import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.descriptors.MetaDescriptor
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.workspace.Task
import space.kscience.dataforge.workspace.TaskResult
import space.kscience.dataforge.workspace.Workspace
import space.kscience.dataforge.workspace.wrapResult
import kotlin.reflect.KType
/**
* Proxy task that communicates with the corresponding remote task.
*/
internal class RemoteTask<T : Any>(
endpoint: Endpoint,
override val resultType: KType,
override val descriptor: MetaDescriptor? = null,
) : Task<T> {
private val dispatcher = ServiceDispatcher(ServiceWorkspace.serviceId to endpoint)
@Suppress("UNCHECKED_CAST")
override suspend fun execute(
workspace: Workspace,
taskName: Name,
taskMeta: Meta,
): TaskResult<T> = withContext(dispatcher) {
val dataset = ServiceWorkspace.execute(taskName) as LazyDecodableDataSet
dataset.finishDecoding(resultType)
workspace.wrapResult(dataset as DataSet<T>, taskName, taskMeta)
}
}

View File

@ -0,0 +1,202 @@
package space.kscience.dataforge.workspace.distributed
import io.ktor.utils.io.core.*
import io.lambdarpc.coding.Coder
import io.lambdarpc.coding.CodingContext
import io.lambdarpc.dsl.LibService
import io.lambdarpc.dsl.def
import io.lambdarpc.transport.grpc.Entity
import io.lambdarpc.transport.serialization.Entity
import io.lambdarpc.transport.serialization.RawData
import io.lambdarpc.utils.Address
import io.lambdarpc.utils.Port
import io.lambdarpc.utils.toSid
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import kotlinx.serialization.serializer
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Global
import space.kscience.dataforge.context.gather
import space.kscience.dataforge.data.Data
import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.Goal
import space.kscience.dataforge.data.NamedData
import space.kscience.dataforge.data.await
import space.kscience.dataforge.data.component1
import space.kscience.dataforge.data.component2
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.workspace.Task
import space.kscience.dataforge.workspace.TaskResult
import space.kscience.dataforge.workspace.Workspace
import space.kscience.dataforge.workspace.wrapResult
import java.nio.charset.Charset
import kotlin.reflect.KType
/**
* Workspace that exposes its tasks for remote clients.
*/
public class ServiceWorkspace(
address: String = "localhost",
port: Int? = null,
override val context: Context = Global.buildContext("workspace".asName()),
data: DataSet<*> = runBlocking { DataTree<Any> {} },
override val targets: Map<String, Meta> = mapOf(),
) : Workspace, Closeable {
override val data: TaskResult<*> = wrapResult(data, Name.EMPTY, Meta.EMPTY)
override val tasks: Map<Name, Task<*>>
get() = context.gather(Task.TYPE)
private val service = LibService(serviceId, address, port) {
execute of { name -> println("service = $name"); produce(name, Meta.EMPTY) } // TODO
}
/**
* Address this workspace is available on.
*/
public val address: Address = Address(address)
/**
* Port this workspace is available on.
*/
public val port: Port
get() = service.port
/**
* Start [ServiceWorkspace] as a service.
*/
public fun start(): Unit = service.start()
/**
* Await termination of the service.
*/
public fun awaitTermination(): Unit = service.awaitTermination()
/**
* Shutdown service.
*/
public fun shutdown(): Unit = service.shutdown()
override fun close(): Unit = service.shutdown()
public companion object {
internal val serviceId = "d41b95b1-828b-4444-8ff0-6f9c92a79246".toSid()
internal val execute by serviceId.def(NameCoder, DataSetCoder)
}
}
private object NameCoder : Coder<Name> {
override fun decode(entity: Entity, context: CodingContext): Name {
require(entity.hasData()) { "Entity should contain data" }
val string = entity.data.toString(Charset.defaultCharset())
return Name.parse(string)
}
override fun encode(value: Name, context: CodingContext): Entity =
Entity(RawData.copyFrom(value.toString(), Charset.defaultCharset()))
}
/**
* Data class that represents serializable [DataSet].
*/
@Serializable
private data class DataSetPrototype(
val data: Map<String, String>,
)
/**
* [DataSetPrototype] builder.
*/
private fun <T : Any> DataSet<T>.toPrototype(): DataSetPrototype = runBlocking {
val serializer = serializer(dataType)
val map = mutableListOf<Pair<String, String>>()
flowData().map { (name, data) ->
name.toString() to Json.encodeToString(serializer, data.await())
}.toList(map)
DataSetPrototype(map.associate { it })
}
/**
* Trivial [Data] implementation.
*/
private class SimpleData(override val type: KType, val data: Any) : Data<Any> {
override val meta: Meta
get() = Meta.EMPTY
override val dependencies: Collection<Goal<*>>
get() = emptyList()
override val deferred: Deferred<Any>
get() = CompletableDeferred(data)
override fun async(coroutineScope: CoroutineScope): Deferred<Any> = deferred
override fun reset() = Unit
}
/**
* Trivial named data implementation.
*/
private class SimpleNamedData(
override val name: Name,
override val data: Data<Any>,
) : NamedData<Any>, Data<Any> by data
/**
* Represents [DataSet] that should be initialized before usage.
*/
internal interface LazyDecodableDataSet<T : Any> : DataSet<T> {
fun finishDecoding(type: KType)
}
/**
* Trivial [LazyDecodableDataSet] implementation.
*/
private class SimpleDataSet(private val prototype: DataSetPrototype) : LazyDecodableDataSet<Any> {
lateinit var type: KType
lateinit var data: Map<Name, Any>
override fun finishDecoding(type: KType) {
this.type = type
this.data = prototype.data.map { (name, data) ->
Name.parse(name) to Json.decodeFromString(serializer(type), data)
}.associate { (name, data) -> name to data!! }
}
override val dataType: KType
get() = type
override fun flowData(): Flow<NamedData<Any>> =
data.map { (name, data) ->
val wrapped = SimpleData(dataType, data)
SimpleNamedData(name, wrapped)
}.asFlow()
override suspend fun getData(name: Name): Data<Any>? = data[name]?.let { data ->
SimpleData(dataType, data)
}
}
private object DataSetCoder : Coder<DataSet<Any>> {
override fun decode(entity: Entity, context: CodingContext): DataSet<Any> {
val string = entity.data.toString(Charset.defaultCharset())
val prototype = Json.decodeFromString(serializer<DataSetPrototype>(), string)
return SimpleDataSet(prototype)
}
override fun encode(value: DataSet<*>, context: CodingContext): Entity {
val prototype = value.toPrototype()
val string = Json.encodeToString(serializer(), prototype)
return Entity(RawData.copyFrom(string, Charset.defaultCharset()))
}
}

View File

@ -0,0 +1,106 @@
package space.kscience.dataforge.workspace
import io.lambdarpc.utils.Endpoint
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Global
import space.kscience.dataforge.context.PluginFactory
import space.kscience.dataforge.context.PluginTag
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.await
import space.kscience.dataforge.data.getData
import space.kscience.dataforge.data.map
import space.kscience.dataforge.data.select
import space.kscience.dataforge.data.static
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.workspace.distributed.ClientWorkspacePlugin
import space.kscience.dataforge.workspace.distributed.ServiceWorkspace
import kotlin.reflect.KClass
import kotlin.reflect.typeOf
import kotlin.test.assertEquals
private class MyPlugin : WorkspacePlugin() {
override val tag: PluginTag
get() = Factory.tag
val task by task<Int> {
val myInt = workspace.data.select<Int>()
val res = myInt.getData("int".asName())!!
emit("result".asName(), res.map { it + 1 })
}
companion object Factory : PluginFactory<MyPlugin> {
override fun invoke(meta: Meta, context: Context): MyPlugin = MyPlugin()
override val tag: PluginTag
get() = PluginTag("Plg")
override val type: KClass<out MyPlugin>
get() = MyPlugin::class
}
}
private class RemoteMyPlugin(endpoint: Endpoint) : ClientWorkspacePlugin(
MyPlugin.tag,
endpoint,
"task".asName() to typeOf<Int>()
)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class ServiceWorkspaceTest {
private lateinit var worker1: ServiceWorkspace
private lateinit var workspace: Workspace
@BeforeAll
fun before() {
worker1 = ServiceWorkspace(
context = Global.buildContext("worker1".asName()) {
plugin(MyPlugin)
},
data = runBlocking {
DataTree<Any> {
static("int", 0)
}
},
)
worker1.start()
workspace = Workspace {
context {
val endpoint = Endpoint(worker1.address, worker1.port)
plugin(RemoteMyPlugin(endpoint))
}
}
}
@AfterAll
fun after() {
worker1.shutdown()
}
@Test
fun localExecution() = runBlocking {
assertEquals(0, worker1.data.getData("int")!!.await())
val res = worker1
.produce(Name.of("Plg", "task"), Meta.EMPTY)
.getData("result".asName())!!
.await()
assertEquals(1, res)
}
@Test
fun remoteExecution() = runBlocking {
val remoteRes = workspace
.produce(Name.of("Plg", "task"), Meta.EMPTY)
.getData("result".asName())!!
.await()
assertEquals(1, remoteRes)
}
}