diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/NamedData.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/NamedData.kt index 59ae10a8..8082cfc7 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/NamedData.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/NamedData.kt @@ -29,4 +29,7 @@ public fun Data.named(name: Name): NamedData = if (this is Named NamedDataImpl(name, this.data) } else { NamedDataImpl(name, this) -} \ No newline at end of file +} + +public operator fun NamedData.component1(): Name = name +public operator fun NamedData.component2(): Data = data diff --git a/dataforge-workspace/build.gradle.kts b/dataforge-workspace/build.gradle.kts index c6e62b36..5d83ba4b 100644 --- a/dataforge-workspace/build.gradle.kts +++ b/dataforge-workspace/build.gradle.kts @@ -5,16 +5,37 @@ plugins { kotlin { sourceSets { - commonMain{ + commonMain { dependencies { api(project(":dataforge-context")) api(project(":dataforge-data")) api(project(":dataforge-io")) } } + jvmMain { + dependencies { + // TODO include fat jar of lambdarpc + api(files("lambdarpc-core-0.0.1.jar")) + runtimeOnly("io.grpc:grpc-netty-shaded:1.44.0") + api("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0") + api("io.grpc:grpc-protobuf:1.44.0") + api("com.google.protobuf:protobuf-java-util:3.19.4") + api("com.google.protobuf:protobuf-kotlin:3.19.4") + api("io.grpc:grpc-kotlin-stub:1.2.1") + api("org.jetbrains.kotlinx:kotlinx-serialization-json:1.3.2") + api("org.slf4j:slf4j-simple:1.7.36") + api("io.github.microutils:kotlin-logging-jvm:2.1.21") + } + } } } -readme{ +kscience { + useSerialization { + json() + } +} + +readme { maturity = ru.mipt.npm.gradle.Maturity.EXPERIMENTAL } \ No newline at end of file diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt index dcf63db2..042945c5 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt @@ -17,6 +17,11 @@ import kotlin.reflect.typeOf @Type(TYPE) public interface Task : Described { + /** + * Type of the task result data. + */ + public val resultType: KType + /** * Compute a [TaskResult] using given meta. In general, the result is lazy and represents both computation model * and a handler for actual result @@ -55,6 +60,9 @@ public fun Task( builder: suspend TaskResultBuilder.() -> Unit, ): Task = object : Task { + override val resultType: KType + get() = resultType + override val descriptor: MetaDescriptor? = descriptor override suspend fun execute( diff --git a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ClientWorkspacePlugin.kt b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ClientWorkspacePlugin.kt new file mode 100644 index 00000000..302037a7 --- /dev/null +++ b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ClientWorkspacePlugin.kt @@ -0,0 +1,31 @@ +package space.kscience.dataforge.workspace.distributed + +import io.lambdarpc.utils.Endpoint +import space.kscience.dataforge.context.AbstractPlugin +import space.kscience.dataforge.context.PluginTag +import space.kscience.dataforge.names.Name +import space.kscience.dataforge.workspace.Task +import kotlin.reflect.KType + +/** + * Plugin that purpose is to communicate with remote plugins. + * @param tag Tag og the [ClientWorkspacePlugin] should be equal to the tag of the corresponding remote plugin. + * @param endpoint Endpoint of the remote plugin. + * @param tasks Enumeration of names of remote tasks and their result types. + */ +public abstract class ClientWorkspacePlugin( + override val tag: PluginTag, + endpoint: Endpoint, + vararg tasks: Pair, +) : AbstractPlugin() { + + private val tasks: Map> = tasks.associate { (name, type) -> + name to RemoteTask(endpoint, type) + } + + override fun content(target: String): Map = + when (target) { + Task.TYPE -> tasks + else -> emptyMap() + } +} diff --git a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/RemoteTask.kt b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/RemoteTask.kt new file mode 100644 index 00000000..a1def94d --- /dev/null +++ b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/RemoteTask.kt @@ -0,0 +1,36 @@ +package space.kscience.dataforge.workspace.distributed + +import io.lambdarpc.dsl.ServiceDispatcher +import io.lambdarpc.utils.Endpoint +import kotlinx.coroutines.withContext +import space.kscience.dataforge.data.DataSet +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.meta.descriptors.MetaDescriptor +import space.kscience.dataforge.names.Name +import space.kscience.dataforge.workspace.Task +import space.kscience.dataforge.workspace.TaskResult +import space.kscience.dataforge.workspace.Workspace +import space.kscience.dataforge.workspace.wrapResult +import kotlin.reflect.KType + +/** + * Proxy task that communicates with the corresponding remote task. + */ +internal class RemoteTask( + endpoint: Endpoint, + override val resultType: KType, + override val descriptor: MetaDescriptor? = null, +) : Task { + private val dispatcher = ServiceDispatcher(ServiceWorkspace.serviceId to endpoint) + + @Suppress("UNCHECKED_CAST") + override suspend fun execute( + workspace: Workspace, + taskName: Name, + taskMeta: Meta, + ): TaskResult = withContext(dispatcher) { + val dataset = ServiceWorkspace.execute(taskName) as LazyDecodableDataSet + dataset.finishDecoding(resultType) + workspace.wrapResult(dataset as DataSet, taskName, taskMeta) + } +} diff --git a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ServiceWorkspace.kt b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ServiceWorkspace.kt new file mode 100644 index 00000000..3aafd596 --- /dev/null +++ b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/distributed/ServiceWorkspace.kt @@ -0,0 +1,202 @@ +package space.kscience.dataforge.workspace.distributed + +import io.ktor.utils.io.core.* +import io.lambdarpc.coding.Coder +import io.lambdarpc.coding.CodingContext +import io.lambdarpc.dsl.LibService +import io.lambdarpc.dsl.def +import io.lambdarpc.transport.grpc.Entity +import io.lambdarpc.transport.serialization.Entity +import io.lambdarpc.transport.serialization.RawData +import io.lambdarpc.utils.Address +import io.lambdarpc.utils.Port +import io.lambdarpc.utils.toSid +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.runBlocking +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.Json +import kotlinx.serialization.serializer +import space.kscience.dataforge.context.Context +import space.kscience.dataforge.context.Global +import space.kscience.dataforge.context.gather +import space.kscience.dataforge.data.Data +import space.kscience.dataforge.data.DataSet +import space.kscience.dataforge.data.DataTree +import space.kscience.dataforge.data.Goal +import space.kscience.dataforge.data.NamedData +import space.kscience.dataforge.data.await +import space.kscience.dataforge.data.component1 +import space.kscience.dataforge.data.component2 +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.names.Name +import space.kscience.dataforge.names.asName +import space.kscience.dataforge.workspace.Task +import space.kscience.dataforge.workspace.TaskResult +import space.kscience.dataforge.workspace.Workspace +import space.kscience.dataforge.workspace.wrapResult +import java.nio.charset.Charset +import kotlin.reflect.KType + +/** + * Workspace that exposes its tasks for remote clients. + */ +public class ServiceWorkspace( + address: String = "localhost", + port: Int? = null, + override val context: Context = Global.buildContext("workspace".asName()), + data: DataSet<*> = runBlocking { DataTree {} }, + override val targets: Map = mapOf(), +) : Workspace, Closeable { + + override val data: TaskResult<*> = wrapResult(data, Name.EMPTY, Meta.EMPTY) + + override val tasks: Map> + get() = context.gather(Task.TYPE) + + private val service = LibService(serviceId, address, port) { + execute of { name -> println("service = $name"); produce(name, Meta.EMPTY) } // TODO + } + + /** + * Address this workspace is available on. + */ + public val address: Address = Address(address) + + /** + * Port this workspace is available on. + */ + public val port: Port + get() = service.port + + /** + * Start [ServiceWorkspace] as a service. + */ + public fun start(): Unit = service.start() + + /** + * Await termination of the service. + */ + public fun awaitTermination(): Unit = service.awaitTermination() + + /** + * Shutdown service. + */ + public fun shutdown(): Unit = service.shutdown() + + override fun close(): Unit = service.shutdown() + + public companion object { + internal val serviceId = "d41b95b1-828b-4444-8ff0-6f9c92a79246".toSid() + internal val execute by serviceId.def(NameCoder, DataSetCoder) + } +} + +private object NameCoder : Coder { + override fun decode(entity: Entity, context: CodingContext): Name { + require(entity.hasData()) { "Entity should contain data" } + val string = entity.data.toString(Charset.defaultCharset()) + return Name.parse(string) + } + + override fun encode(value: Name, context: CodingContext): Entity = + Entity(RawData.copyFrom(value.toString(), Charset.defaultCharset())) +} + +/** + * Data class that represents serializable [DataSet]. + */ +@Serializable +private data class DataSetPrototype( + val data: Map, +) + +/** + * [DataSetPrototype] builder. + */ +private fun DataSet.toPrototype(): DataSetPrototype = runBlocking { + val serializer = serializer(dataType) + val map = mutableListOf>() + flowData().map { (name, data) -> + name.toString() to Json.encodeToString(serializer, data.await()) + }.toList(map) + DataSetPrototype(map.associate { it }) +} + +/** + * Trivial [Data] implementation. + */ +private class SimpleData(override val type: KType, val data: Any) : Data { + override val meta: Meta + get() = Meta.EMPTY + override val dependencies: Collection> + get() = emptyList() + override val deferred: Deferred + get() = CompletableDeferred(data) + + override fun async(coroutineScope: CoroutineScope): Deferred = deferred + override fun reset() = Unit +} + +/** + * Trivial named data implementation. + */ +private class SimpleNamedData( + override val name: Name, + override val data: Data, +) : NamedData, Data by data + +/** + * Represents [DataSet] that should be initialized before usage. + */ +internal interface LazyDecodableDataSet : DataSet { + fun finishDecoding(type: KType) +} + +/** + * Trivial [LazyDecodableDataSet] implementation. + */ +private class SimpleDataSet(private val prototype: DataSetPrototype) : LazyDecodableDataSet { + + lateinit var type: KType + lateinit var data: Map + + override fun finishDecoding(type: KType) { + this.type = type + this.data = prototype.data.map { (name, data) -> + Name.parse(name) to Json.decodeFromString(serializer(type), data) + }.associate { (name, data) -> name to data!! } + } + + override val dataType: KType + get() = type + + override fun flowData(): Flow> = + data.map { (name, data) -> + val wrapped = SimpleData(dataType, data) + SimpleNamedData(name, wrapped) + }.asFlow() + + override suspend fun getData(name: Name): Data? = data[name]?.let { data -> + SimpleData(dataType, data) + } +} + +private object DataSetCoder : Coder> { + override fun decode(entity: Entity, context: CodingContext): DataSet { + val string = entity.data.toString(Charset.defaultCharset()) + val prototype = Json.decodeFromString(serializer(), string) + return SimpleDataSet(prototype) + } + + override fun encode(value: DataSet<*>, context: CodingContext): Entity { + val prototype = value.toPrototype() + val string = Json.encodeToString(serializer(), prototype) + return Entity(RawData.copyFrom(string, Charset.defaultCharset())) + } +} diff --git a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/ServiceWorkspaceTest.kt b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/ServiceWorkspaceTest.kt new file mode 100644 index 00000000..378333dd --- /dev/null +++ b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/ServiceWorkspaceTest.kt @@ -0,0 +1,106 @@ +package space.kscience.dataforge.workspace + +import io.lambdarpc.utils.Endpoint +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.AfterAll +import org.junit.jupiter.api.BeforeAll +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.TestInstance +import space.kscience.dataforge.context.Context +import space.kscience.dataforge.context.Global +import space.kscience.dataforge.context.PluginFactory +import space.kscience.dataforge.context.PluginTag +import space.kscience.dataforge.data.DataTree +import space.kscience.dataforge.data.await +import space.kscience.dataforge.data.getData +import space.kscience.dataforge.data.map +import space.kscience.dataforge.data.select +import space.kscience.dataforge.data.static +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.names.Name +import space.kscience.dataforge.names.asName +import space.kscience.dataforge.workspace.distributed.ClientWorkspacePlugin +import space.kscience.dataforge.workspace.distributed.ServiceWorkspace +import kotlin.reflect.KClass +import kotlin.reflect.typeOf +import kotlin.test.assertEquals + +private class MyPlugin : WorkspacePlugin() { + override val tag: PluginTag + get() = Factory.tag + + val task by task { + val myInt = workspace.data.select() + val res = myInt.getData("int".asName())!! + emit("result".asName(), res.map { it + 1 }) + } + + companion object Factory : PluginFactory { + override fun invoke(meta: Meta, context: Context): MyPlugin = MyPlugin() + + override val tag: PluginTag + get() = PluginTag("Plg") + + override val type: KClass + get() = MyPlugin::class + } +} + +private class RemoteMyPlugin(endpoint: Endpoint) : ClientWorkspacePlugin( + MyPlugin.tag, + endpoint, + "task".asName() to typeOf() +) + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class ServiceWorkspaceTest { + + private lateinit var worker1: ServiceWorkspace + private lateinit var workspace: Workspace + + @BeforeAll + fun before() { + worker1 = ServiceWorkspace( + context = Global.buildContext("worker1".asName()) { + plugin(MyPlugin) + }, + data = runBlocking { + DataTree { + static("int", 0) + } + }, + ) + worker1.start() + + workspace = Workspace { + context { + val endpoint = Endpoint(worker1.address, worker1.port) + plugin(RemoteMyPlugin(endpoint)) + } + } + } + + @AfterAll + fun after() { + worker1.shutdown() + } + + @Test + fun localExecution() = runBlocking { + assertEquals(0, worker1.data.getData("int")!!.await()) + val res = worker1 + .produce(Name.of("Plg", "task"), Meta.EMPTY) + .getData("result".asName())!! + .await() + assertEquals(1, res) + } + + @Test + fun remoteExecution() = runBlocking { + val remoteRes = workspace + .produce(Name.of("Plg", "task"), Meta.EMPTY) + .getData("result".asName())!! + .await() + assertEquals(1, remoteRes) + } +}