Compare commits
25 Commits
master
...
winter-yuk
Author | SHA1 | Date | |
---|---|---|---|
|
50a9e7d314 | ||
|
ba3354b4a2 | ||
|
7e13c3dec6 | ||
|
99efe3a456 | ||
|
196854429a | ||
|
9c55d26be5 | ||
|
114d310fdc | ||
|
77f8f045e6 | ||
|
230a3f1e22 | ||
|
07a0bd551b | ||
|
f62507e1b9 | ||
|
e9d4683f9b | ||
|
a4044c82a0 | ||
|
be8e971436 | ||
|
9cc30b1f4e | ||
|
7414e60192 | ||
|
8c0bc05a9a | ||
|
acfe9c2f74 | ||
|
32b986fc47 | ||
|
b86c6141cd | ||
|
e13e3ab6bf | ||
|
a7ee2f5922 | ||
|
6912f26291 | ||
|
d951668911 | ||
|
f5b2b4c9e4 |
4
.github/workflows/publish.yml
vendored
4
.github/workflows/publish.yml
vendored
@ -9,6 +9,8 @@ jobs:
|
||||
publish:
|
||||
environment:
|
||||
name: publish
|
||||
env:
|
||||
publishing.github: false
|
||||
strategy:
|
||||
matrix:
|
||||
os: [ macOS-latest, windows-latest ]
|
||||
@ -48,6 +50,6 @@ jobs:
|
||||
- name: Publish Mac Artifacts
|
||||
if: matrix.os == 'macOS-latest'
|
||||
run: >
|
||||
./gradlew release --no-daemon --build-cache -Ppublishing.enabled=true -Ppublishing.platform=macosX64
|
||||
./gradlew release --no-daemon --build-cache -Ppublishing.enabled=true
|
||||
-Ppublishing.space.user=${{ secrets.SPACE_APP_ID }}
|
||||
-Ppublishing.space.token=${{ secrets.SPACE_APP_SECRET }}
|
||||
|
@ -1,8 +1,19 @@
|
||||
package space.kscience.dataforge.data
|
||||
|
||||
import kotlinx.coroutines.coroutineScope
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import space.kscience.dataforge.misc.DFExperimental
|
||||
import space.kscience.dataforge.names.*
|
||||
import space.kscience.dataforge.names.Name
|
||||
import space.kscience.dataforge.names.NameToken
|
||||
import space.kscience.dataforge.names.asName
|
||||
import space.kscience.dataforge.names.cutFirst
|
||||
import space.kscience.dataforge.names.cutLast
|
||||
import space.kscience.dataforge.names.firstOrNull
|
||||
import space.kscience.dataforge.names.isEmpty
|
||||
import space.kscience.dataforge.names.lastOrNull
|
||||
import space.kscience.dataforge.names.length
|
||||
import space.kscience.dataforge.names.plus
|
||||
import kotlin.collections.set
|
||||
import kotlin.reflect.KType
|
||||
import kotlin.reflect.typeOf
|
||||
|
||||
@ -73,7 +84,7 @@ public inline fun <T : Any> DataTree(
|
||||
|
||||
@Suppress("FunctionName")
|
||||
public inline fun <reified T : Any> DataTree(
|
||||
noinline block: DataSetBuilder<T>.() -> Unit,
|
||||
noinline block: DataSetBuilder<T>.() -> Unit = {},
|
||||
): DataTree<T> = DataTree(typeOf<T>(), block)
|
||||
|
||||
@OptIn(DFExperimental::class)
|
||||
|
42
dataforge-distributed/build.gradle.kts
Normal file
42
dataforge-distributed/build.gradle.kts
Normal file
@ -0,0 +1,42 @@
|
||||
plugins {
|
||||
id("ru.mipt.npm.gradle.mpp")
|
||||
}
|
||||
|
||||
kotlin {
|
||||
sourceSets {
|
||||
commonMain {
|
||||
dependencies {
|
||||
api(project(":dataforge-context"))
|
||||
api(project(":dataforge-data"))
|
||||
api(project(":dataforge-io"))
|
||||
api(project(":dataforge-workspace"))
|
||||
}
|
||||
}
|
||||
jvmMain {
|
||||
dependencies {
|
||||
// TODO include fat jar of lambdarpc
|
||||
val path = "../../../lambdarpc/LambdaRPC.kt/lambdarpc-core/build/libs"
|
||||
api(files("$path/lambdarpc-core-0.0.1-SNAPSHOT.jar"))
|
||||
runtimeOnly("io.grpc:grpc-netty-shaded:1.44.0")
|
||||
api("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0")
|
||||
api("io.grpc:grpc-protobuf:1.44.0")
|
||||
api("com.google.protobuf:protobuf-java-util:3.19.4")
|
||||
api("com.google.protobuf:protobuf-kotlin:3.19.4")
|
||||
api("io.grpc:grpc-kotlin-stub:1.2.1")
|
||||
api("org.jetbrains.kotlinx:kotlinx-serialization-json:1.3.2")
|
||||
api("org.slf4j:slf4j-simple:1.7.36")
|
||||
api("io.github.microutils:kotlin-logging-jvm:2.1.21")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
kscience {
|
||||
useSerialization {
|
||||
json()
|
||||
}
|
||||
}
|
||||
|
||||
readme {
|
||||
maturity = ru.mipt.npm.gradle.Maturity.PROTOTYPE
|
||||
}
|
@ -0,0 +1,79 @@
|
||||
package space.kscience.dataforge.distributed
|
||||
|
||||
import io.lambdarpc.dsl.LibService
|
||||
import kotlinx.serialization.KSerializer
|
||||
import space.kscience.dataforge.context.Context
|
||||
import space.kscience.dataforge.context.Global
|
||||
import space.kscience.dataforge.data.DataSet
|
||||
import space.kscience.dataforge.data.DataTree
|
||||
import space.kscience.dataforge.distributed.ServiceWorkspace.Companion.execute
|
||||
import space.kscience.dataforge.distributed.ServiceWorkspace.Companion.serviceId
|
||||
import space.kscience.dataforge.distributed.serialization.DataSetPrototype
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import space.kscience.dataforge.meta.put
|
||||
import space.kscience.dataforge.names.Name
|
||||
import space.kscience.dataforge.names.asName
|
||||
import space.kscience.dataforge.names.plus
|
||||
import space.kscience.dataforge.workspace.SerializableResultTask
|
||||
import space.kscience.dataforge.workspace.Workspace
|
||||
|
||||
/**
|
||||
* Workspace that exposes its tasks for remote clients.
|
||||
* @param port Port to start service on. Will be random if null.
|
||||
*/
|
||||
public class NodeWorkspace(
|
||||
port: Int? = null,
|
||||
context: Context = Global.buildContext("workspace".asName()),
|
||||
private val dataSerializer: KSerializer<Any>? = null,
|
||||
data: DataSet<*> = DataTree<Any>(),
|
||||
targets: Map<String, Meta> = mapOf(),
|
||||
) : Workspace by RemoteTaskWorkspace(context, data, targets), ServiceWorkspace {
|
||||
private val _port: Int? = port
|
||||
|
||||
private val service = LibService(serviceId, port) {
|
||||
execute of { name, meta, executionContext ->
|
||||
if (name == Name.EMPTY) {
|
||||
requireNotNull(dataSerializer) { "Data serializer is not provided on $port" }
|
||||
DataSetPrototype.of(data, dataSerializer)
|
||||
} else {
|
||||
val proxyContext = context.buildContext(context.name + "proxy") {
|
||||
properties {
|
||||
put(executionContext)
|
||||
}
|
||||
}
|
||||
val proxy = RemoteTaskWorkspace(context = proxyContext, data = data)
|
||||
val task = tasks[name] ?: error("Task with name $name not found in the workspace")
|
||||
require(task is SerializableResultTask)
|
||||
// Local function to capture generic parameter
|
||||
suspend fun <T : Any> execute(task: SerializableResultTask<T>): DataSetPrototype {
|
||||
val result = task.execute(proxy, name, meta)
|
||||
return DataSetPrototype.of(result, task.resultSerializer)
|
||||
}
|
||||
execute(task)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Port this workspace is available on.
|
||||
*/
|
||||
public override val port: Int
|
||||
get() = _port ?: service.port.p
|
||||
|
||||
/**
|
||||
* Start [NodeWorkspace] as a service.
|
||||
*/
|
||||
public override fun start(): Unit = service.start()
|
||||
|
||||
/**
|
||||
* Await termination of the service.
|
||||
*/
|
||||
public override fun awaitTermination(): Unit = service.awaitTermination()
|
||||
|
||||
/**
|
||||
* Shutdown service.
|
||||
*/
|
||||
public override fun shutdown(): Unit = service.shutdown()
|
||||
|
||||
override fun close(): Unit = shutdown()
|
||||
}
|
@ -0,0 +1,35 @@
|
||||
package space.kscience.dataforge.distributed
|
||||
|
||||
import io.lambdarpc.context.ServiceDispatcher
|
||||
import io.lambdarpc.utils.Endpoint
|
||||
import kotlinx.coroutines.withContext
|
||||
import kotlinx.serialization.KSerializer
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import space.kscience.dataforge.meta.descriptors.MetaDescriptor
|
||||
import space.kscience.dataforge.names.Name
|
||||
import space.kscience.dataforge.workspace.SerializableResultTask
|
||||
import space.kscience.dataforge.workspace.TaskResult
|
||||
import space.kscience.dataforge.workspace.Workspace
|
||||
import space.kscience.dataforge.workspace.wrapResult
|
||||
import kotlin.reflect.KType
|
||||
|
||||
/**
|
||||
* Proxy task that communicates with the corresponding remote task.
|
||||
*/
|
||||
internal class RemoteTask<T : Any>(
|
||||
endpoint: String,
|
||||
override val resultType: KType,
|
||||
override val resultSerializer: KSerializer<T>,
|
||||
override val descriptor: MetaDescriptor? = null,
|
||||
private val executionContext: Meta = Meta.EMPTY,
|
||||
) : SerializableResultTask<T> {
|
||||
private val dispatcher = ServiceDispatcher(ServiceWorkspace.serviceId to Endpoint(endpoint))
|
||||
|
||||
override suspend fun execute(workspace: Workspace, taskName: Name, taskMeta: Meta): TaskResult<T> {
|
||||
val result = withContext(dispatcher) {
|
||||
ServiceWorkspace.execute(taskName, taskMeta, executionContext)
|
||||
}
|
||||
val dataSet = result.toDataSet(resultType, resultSerializer)
|
||||
return workspace.wrapResult(dataSet, taskName, taskMeta)
|
||||
}
|
||||
}
|
@ -0,0 +1,76 @@
|
||||
package space.kscience.dataforge.distributed
|
||||
|
||||
import space.kscience.dataforge.context.Context
|
||||
import space.kscience.dataforge.context.Global
|
||||
import space.kscience.dataforge.context.gather
|
||||
import space.kscience.dataforge.data.DataSet
|
||||
import space.kscience.dataforge.data.DataTree
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import space.kscience.dataforge.meta.MutableMeta
|
||||
import space.kscience.dataforge.meta.extract
|
||||
import space.kscience.dataforge.meta.get
|
||||
import space.kscience.dataforge.names.Name
|
||||
import space.kscience.dataforge.names.asName
|
||||
import space.kscience.dataforge.values.string
|
||||
import space.kscience.dataforge.workspace.SerializableResultTask
|
||||
import space.kscience.dataforge.workspace.Task
|
||||
import space.kscience.dataforge.workspace.TaskResult
|
||||
import space.kscience.dataforge.workspace.Workspace
|
||||
import space.kscience.dataforge.workspace.wrapResult
|
||||
|
||||
/**
|
||||
* Workspace that returns [RemoteTask] if such task should be
|
||||
* executed remotely according to the execution context.
|
||||
*/
|
||||
internal open class RemoteTaskWorkspace(
|
||||
final override val context: Context = Global.buildContext("workspace".asName()),
|
||||
data: DataSet<*> = DataTree<Any>(),
|
||||
override val targets: Map<String, Meta> = mapOf(),
|
||||
) : Workspace {
|
||||
|
||||
override val data: TaskResult<*> = wrapResult(data, Name.EMPTY, Meta.EMPTY)
|
||||
|
||||
private val _tasks: Map<Name, Task<*>> = context.gather(Task.TYPE)
|
||||
|
||||
override val tasks: Map<Name, Task<*>>
|
||||
get() = object : AbstractMap<Name, Task<*>>() {
|
||||
override val entries: Set<Map.Entry<Name, Task<*>>>
|
||||
get() = _tasks.entries
|
||||
|
||||
override fun get(key: Name): Task<*>? {
|
||||
val executionContext = context.properties.extract(EXECUTION_CONTEXT)
|
||||
val endpoint = executionContext
|
||||
?.get(EXECUTION_CONTEXT)?.get(ENDPOINTS)?.get(key)
|
||||
?: return _tasks[key]
|
||||
val string = endpoint.value?.string ?: error("Endpoint is expected to be a string")
|
||||
val local = _tasks[key] ?: error("No task with name $key")
|
||||
require(local is SerializableResultTask) { "Task $key result is not serializable" }
|
||||
return RemoteTask(string, local.resultType, local.resultSerializer, local.descriptor, executionContext)
|
||||
}
|
||||
}
|
||||
|
||||
companion object {
|
||||
val EXECUTION_CONTEXT = "execution".asName()
|
||||
val ENDPOINTS = "endpoints".asName()
|
||||
}
|
||||
}
|
||||
|
||||
public fun MutableMeta.endpoints(block: EndpointsBuilder.() -> Unit) {
|
||||
RemoteTaskWorkspace.EXECUTION_CONTEXT put {
|
||||
RemoteTaskWorkspace.ENDPOINTS put EndpointsBuilder().apply(block).build()
|
||||
}
|
||||
}
|
||||
|
||||
public class EndpointsBuilder {
|
||||
private val endpoints = mutableMapOf<Name, String>()
|
||||
|
||||
public infix fun Name.on(endpoint: String) {
|
||||
endpoints[this] = endpoint
|
||||
}
|
||||
|
||||
internal fun build(): Meta = Meta {
|
||||
endpoints.forEach { (name, endpoint) ->
|
||||
name put endpoint
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,29 @@
|
||||
package space.kscience.dataforge.distributed
|
||||
|
||||
import io.lambdarpc.dsl.def
|
||||
import io.lambdarpc.dsl.j
|
||||
import io.lambdarpc.utils.ServiceId
|
||||
import space.kscience.dataforge.distributed.serialization.DataSetPrototype
|
||||
import space.kscience.dataforge.meta.MetaSerializer
|
||||
import space.kscience.dataforge.names.Name
|
||||
import space.kscience.dataforge.workspace.Workspace
|
||||
import java.io.Closeable
|
||||
|
||||
/**
|
||||
* [Workspace] that can expose its tasks to other workspaces as a service.
|
||||
*/
|
||||
public interface ServiceWorkspace : Workspace, Closeable {
|
||||
public val port: Int
|
||||
public fun start()
|
||||
public fun awaitTermination()
|
||||
public fun shutdown()
|
||||
|
||||
override fun close() {
|
||||
shutdown()
|
||||
}
|
||||
|
||||
public companion object {
|
||||
internal val serviceId = ServiceId("d41b95b1-828b-4444-8ff0-6f9c92a79246")
|
||||
internal val execute by serviceId.def(j<Name>(), j(MetaSerializer), j(MetaSerializer), j<DataSetPrototype>())
|
||||
}
|
||||
}
|
@ -0,0 +1,34 @@
|
||||
package space.kscience.dataforge.distributed.serialization
|
||||
|
||||
import kotlinx.serialization.KSerializer
|
||||
import kotlinx.serialization.Serializable
|
||||
import kotlinx.serialization.json.Json
|
||||
import space.kscience.dataforge.data.Data
|
||||
import space.kscience.dataforge.data.StaticData
|
||||
import space.kscience.dataforge.data.await
|
||||
import space.kscience.dataforge.meta.MetaSerializer
|
||||
import kotlin.reflect.KType
|
||||
|
||||
/**
|
||||
* [Data] representation that is trivially serializable.
|
||||
*/
|
||||
@Serializable
|
||||
internal data class DataPrototype(
|
||||
val meta: String,
|
||||
val data: String,
|
||||
) {
|
||||
fun <T : Any> toData(type: KType, serializer: KSerializer<T>): Data<T> =
|
||||
StaticData(
|
||||
type = type,
|
||||
value = Json.decodeFromString(serializer, data),
|
||||
meta = Json.decodeFromString(MetaSerializer, meta),
|
||||
)
|
||||
|
||||
companion object {
|
||||
suspend fun <T : Any> of(data: Data<T>, serializer: KSerializer<in T>): DataPrototype {
|
||||
val meta = Json.encodeToString(MetaSerializer, data.meta)
|
||||
val string = Json.encodeToString(serializer, data.await())
|
||||
return DataPrototype(meta, string)
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,57 @@
|
||||
package space.kscience.dataforge.distributed.serialization
|
||||
|
||||
import kotlinx.coroutines.async
|
||||
import kotlinx.coroutines.coroutineScope
|
||||
import kotlinx.serialization.KSerializer
|
||||
import kotlinx.serialization.Serializable
|
||||
import space.kscience.dataforge.data.Data
|
||||
import space.kscience.dataforge.data.DataSet
|
||||
import space.kscience.dataforge.data.NamedData
|
||||
import space.kscience.dataforge.data.asIterable
|
||||
import space.kscience.dataforge.data.component1
|
||||
import space.kscience.dataforge.data.component2
|
||||
import space.kscience.dataforge.data.named
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import space.kscience.dataforge.names.Name
|
||||
import kotlin.reflect.KType
|
||||
|
||||
/**
|
||||
* [DataSet] representation that is trivially serializable.
|
||||
*/
|
||||
@Serializable
|
||||
internal data class DataSetPrototype(val meta: Meta, val data: Map<String, DataPrototype>) {
|
||||
fun <T : Any> toDataSet(type: KType, serializer: KSerializer<T>): DataSet<T> {
|
||||
val data = data
|
||||
.mapKeys { (name, _) -> Name.of(name) }
|
||||
.mapValues { (_, dataPrototype) -> dataPrototype.toData(type, serializer) }
|
||||
return SimpleDataSet(type, data, meta)
|
||||
}
|
||||
|
||||
companion object {
|
||||
suspend fun <T : Any> of(dataSet: DataSet<T>, serializer: KSerializer<T>): DataSetPrototype = coroutineScope {
|
||||
val prototypes = dataSet.asIterable().map { (name, data) ->
|
||||
name.toString() to async { DataPrototype.of(data, serializer) }
|
||||
}
|
||||
val map = prototypes.associate { (name, deferred) -> name to deferred.await() }
|
||||
DataSetPrototype(dataSet.meta, map)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Trivial [DataSet] implementation.
|
||||
*/
|
||||
private class SimpleDataSet<T : Any>(
|
||||
override val dataType: KType,
|
||||
private val data: Map<Name, Data<T>>,
|
||||
override val meta: Meta,
|
||||
) : DataSet<T> {
|
||||
|
||||
override fun iterator(): Iterator<NamedData<T>> =
|
||||
data
|
||||
.asSequence()
|
||||
.map { (name, data) -> data.named(name) }
|
||||
.iterator()
|
||||
|
||||
override fun get(name: Name): Data<T>? = data[name]
|
||||
}
|
@ -0,0 +1,61 @@
|
||||
package space.kscience.dataforge.distributed
|
||||
|
||||
import kotlinx.serialization.serializer
|
||||
import space.kscience.dataforge.context.Context
|
||||
import space.kscience.dataforge.context.PluginFactory
|
||||
import space.kscience.dataforge.context.PluginTag
|
||||
import space.kscience.dataforge.context.info
|
||||
import space.kscience.dataforge.context.logger
|
||||
import space.kscience.dataforge.data.data
|
||||
import space.kscience.dataforge.data.getByType
|
||||
import space.kscience.dataforge.data.map
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import space.kscience.dataforge.names.Name
|
||||
import space.kscience.dataforge.names.asName
|
||||
import space.kscience.dataforge.workspace.WorkspacePlugin
|
||||
import space.kscience.dataforge.workspace.fromTask
|
||||
import space.kscience.dataforge.workspace.task
|
||||
import kotlin.reflect.KClass
|
||||
|
||||
internal class MyPlugin1 : WorkspacePlugin() {
|
||||
override val tag: PluginTag
|
||||
get() = Factory.tag
|
||||
|
||||
val task by task<Int>(serializer()) {
|
||||
workspace.logger.info { "In ${tag.name}.task on ${workspace.context.name}" }
|
||||
val myInt = workspace.data.getByType<Int>("int")!!
|
||||
data("result", myInt.data.map { it + 1 })
|
||||
}
|
||||
|
||||
companion object Factory : PluginFactory<MyPlugin1> {
|
||||
override val tag: PluginTag
|
||||
get() = PluginTag("Plg1")
|
||||
|
||||
override val type: KClass<out MyPlugin1>
|
||||
get() = MyPlugin1::class
|
||||
|
||||
override fun build(context: Context, meta: Meta): MyPlugin1 = MyPlugin1()
|
||||
}
|
||||
}
|
||||
|
||||
internal class MyPlugin2 : WorkspacePlugin() {
|
||||
override val tag: PluginTag
|
||||
get() = Factory.tag
|
||||
|
||||
val task by task<Int>(serializer()) {
|
||||
workspace.logger.info { "In ${tag.name}.task on ${workspace.context.name}" }
|
||||
val dataSet = fromTask<Int>(Name.of(MyPlugin1.tag.name, "task"))
|
||||
val data = dataSet["result".asName()]!!
|
||||
data("result", data.map { it + 1 })
|
||||
}
|
||||
|
||||
companion object Factory : PluginFactory<MyPlugin2> {
|
||||
override val tag: PluginTag
|
||||
get() = PluginTag("Plg2")
|
||||
|
||||
override val type: KClass<out MyPlugin2>
|
||||
get() = MyPlugin2::class
|
||||
|
||||
override fun build(context: Context, meta: Meta): MyPlugin2 = MyPlugin2()
|
||||
}
|
||||
}
|
@ -0,0 +1,90 @@
|
||||
package space.kscience.dataforge.distributed
|
||||
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import org.junit.jupiter.api.AfterAll
|
||||
import org.junit.jupiter.api.BeforeAll
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.junit.jupiter.api.TestInstance
|
||||
import space.kscience.dataforge.context.Global
|
||||
import space.kscience.dataforge.data.DataTree
|
||||
import space.kscience.dataforge.data.await
|
||||
import space.kscience.dataforge.data.get
|
||||
import space.kscience.dataforge.data.static
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import space.kscience.dataforge.names.Name
|
||||
import space.kscience.dataforge.names.asName
|
||||
import space.kscience.dataforge.workspace.Workspace
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
|
||||
internal class RemoteCallTest {
|
||||
|
||||
private lateinit var worker1: NodeWorkspace
|
||||
private lateinit var worker2: NodeWorkspace
|
||||
private lateinit var workspace: Workspace
|
||||
|
||||
@BeforeAll
|
||||
fun before() = runBlocking {
|
||||
worker1 = NodeWorkspace(
|
||||
context = Global.buildContext("worker1".asName()) {
|
||||
plugin(MyPlugin1)
|
||||
},
|
||||
data = DataTree<Any> {
|
||||
static("int", 42)
|
||||
},
|
||||
)
|
||||
worker1.start()
|
||||
|
||||
worker2 = NodeWorkspace(
|
||||
context = Global.buildContext("worker2".asName()) {
|
||||
plugin(MyPlugin1)
|
||||
plugin(MyPlugin2)
|
||||
},
|
||||
)
|
||||
worker2.start()
|
||||
|
||||
workspace = NodeWorkspace(
|
||||
context = Global.buildContext {
|
||||
plugin(MyPlugin1)
|
||||
plugin(MyPlugin2)
|
||||
properties {
|
||||
endpoints {
|
||||
Name.of(MyPlugin1.tag.name, "task") on "localhost:${worker1.port}"
|
||||
Name.of(MyPlugin2.tag.name, "task") on "localhost:${worker2.port}"
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
fun after() {
|
||||
worker1.shutdown()
|
||||
worker2.shutdown()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `local execution`() = runBlocking {
|
||||
assertEquals(42, worker1.data["int".asName()]!!.await())
|
||||
val res = worker1
|
||||
.produce(Name.of(MyPlugin1.tag.name, "task"), Meta.EMPTY)["result"]!!
|
||||
.await()
|
||||
assertEquals(43, res)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `remote execution`() = runBlocking {
|
||||
val remoteRes = workspace
|
||||
.produce(Name.of(MyPlugin1.tag.name, "task"), Meta.EMPTY)["result"]!!
|
||||
.await()
|
||||
assertEquals(43, remoteRes)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `transitive execution`() = runBlocking {
|
||||
val remoteRes = workspace
|
||||
.produce(Name.of(MyPlugin2.tag.name, "task"), Meta.EMPTY)["result"]!!
|
||||
.await()
|
||||
assertEquals(44, remoteRes)
|
||||
}
|
||||
}
|
@ -4,8 +4,23 @@ import kotlinx.serialization.Serializable
|
||||
import kotlinx.serialization.json.Json
|
||||
import space.kscience.dataforge.misc.Type
|
||||
import space.kscience.dataforge.misc.unsafeCast
|
||||
import space.kscience.dataforge.names.*
|
||||
import space.kscience.dataforge.values.*
|
||||
import space.kscience.dataforge.names.Name
|
||||
import space.kscience.dataforge.names.NameToken
|
||||
import space.kscience.dataforge.names.asName
|
||||
import space.kscience.dataforge.names.cutFirst
|
||||
import space.kscience.dataforge.names.cutLast
|
||||
import space.kscience.dataforge.names.firstOrNull
|
||||
import space.kscience.dataforge.names.isEmpty
|
||||
import space.kscience.dataforge.names.lastOrNull
|
||||
import space.kscience.dataforge.names.length
|
||||
import space.kscience.dataforge.names.parseAsName
|
||||
import space.kscience.dataforge.names.plus
|
||||
import space.kscience.dataforge.values.EnumValue
|
||||
import space.kscience.dataforge.values.Value
|
||||
import space.kscience.dataforge.values.ValueProvider
|
||||
import space.kscience.dataforge.values.boolean
|
||||
import space.kscience.dataforge.values.numberOrNull
|
||||
import space.kscience.dataforge.values.string
|
||||
|
||||
|
||||
/**
|
||||
@ -111,6 +126,11 @@ public operator fun Meta.get(name: Name): Meta? = this.getMeta(name)
|
||||
*/
|
||||
public operator fun Meta.get(key: String): Meta? = this.get(Name.parse(key))
|
||||
|
||||
public fun Meta.extract(name: Name): Meta? {
|
||||
val value = get(name) ?: return null
|
||||
return Meta { name put value }
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all items matching given name. The index of the last element, if present is used as a [Regex],
|
||||
* against which indexes of elements are matched.
|
||||
|
@ -2,11 +2,25 @@ package space.kscience.dataforge.meta
|
||||
|
||||
import kotlinx.serialization.Serializable
|
||||
import space.kscience.dataforge.misc.DFExperimental
|
||||
import space.kscience.dataforge.names.*
|
||||
import space.kscience.dataforge.names.Name
|
||||
import space.kscience.dataforge.names.NameToken
|
||||
import space.kscience.dataforge.names.asName
|
||||
import space.kscience.dataforge.names.cutFirst
|
||||
import space.kscience.dataforge.names.cutLast
|
||||
import space.kscience.dataforge.names.first
|
||||
import space.kscience.dataforge.names.firstOrNull
|
||||
import space.kscience.dataforge.names.isEmpty
|
||||
import space.kscience.dataforge.names.lastOrNull
|
||||
import space.kscience.dataforge.names.length
|
||||
import space.kscience.dataforge.names.plus
|
||||
import space.kscience.dataforge.names.withIndex
|
||||
import space.kscience.dataforge.values.EnumValue
|
||||
import space.kscience.dataforge.values.MutableValueProvider
|
||||
import space.kscience.dataforge.values.Value
|
||||
import space.kscience.dataforge.values.asValue
|
||||
import kotlin.collections.component1
|
||||
import kotlin.collections.component2
|
||||
import kotlin.collections.set
|
||||
import kotlin.js.JsName
|
||||
import kotlin.jvm.Synchronized
|
||||
|
||||
@ -146,6 +160,14 @@ public interface MutableMeta : Meta, MutableMetaProvider {
|
||||
*/
|
||||
public operator fun MutableMeta.set(name: Name, meta: Meta): Unit = setMeta(name, meta)
|
||||
|
||||
public fun MutableMeta.put(other: Meta) {
|
||||
other.items.forEach { (name, meta) ->
|
||||
name.asName() put meta
|
||||
}
|
||||
}
|
||||
|
||||
public operator fun MutableMeta.plusAssign(meta: Meta): Unit = put(meta)
|
||||
|
||||
/**
|
||||
* Set or replace value at given [name]
|
||||
*/
|
||||
|
@ -1,6 +1,8 @@
|
||||
package space.kscience.dataforge.workspace
|
||||
|
||||
import kotlinx.coroutines.withContext
|
||||
import kotlinx.serialization.KSerializer
|
||||
import kotlinx.serialization.serializer
|
||||
import space.kscience.dataforge.data.DataSetBuilder
|
||||
import space.kscience.dataforge.data.DataTree
|
||||
import space.kscience.dataforge.data.GoalExecutionRestriction
|
||||
@ -9,6 +11,7 @@ import space.kscience.dataforge.meta.MetaRepr
|
||||
import space.kscience.dataforge.meta.Specification
|
||||
import space.kscience.dataforge.meta.descriptors.Described
|
||||
import space.kscience.dataforge.meta.descriptors.MetaDescriptor
|
||||
import space.kscience.dataforge.misc.DFInternal
|
||||
import space.kscience.dataforge.misc.Type
|
||||
import space.kscience.dataforge.names.Name
|
||||
import space.kscience.dataforge.workspace.Task.Companion.TYPE
|
||||
@ -37,6 +40,12 @@ public interface Task<out T : Any> : Described {
|
||||
}
|
||||
}
|
||||
|
||||
@Type(TYPE)
|
||||
public interface SerializableResultTask<T : Any> : Task<T> {
|
||||
public val resultType: KType
|
||||
public val resultSerializer: KSerializer<T>
|
||||
}
|
||||
|
||||
/**
|
||||
* A [Task] with [Specification] for wrapping and unwrapping task configuration
|
||||
*/
|
||||
@ -93,12 +102,30 @@ public fun <T : Any> Task(
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* [Task] that has [resultSerializer] to be able to cache or send its results
|
||||
*/
|
||||
@DFInternal
|
||||
public class SerializableResultTaskImpl<T : Any>(
|
||||
override val resultType: KType,
|
||||
override val resultSerializer: KSerializer<T>,
|
||||
descriptor: MetaDescriptor? = null,
|
||||
builder: suspend TaskResultBuilder<T>.() -> Unit,
|
||||
) : SerializableResultTask<T>, Task<T> by Task(resultType, descriptor, builder)
|
||||
|
||||
@Suppress("FunctionName")
|
||||
public inline fun <reified T : Any> Task(
|
||||
descriptor: MetaDescriptor? = null,
|
||||
noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
|
||||
): Task<T> = Task(typeOf<T>(), descriptor, builder)
|
||||
|
||||
@OptIn(DFInternal::class)
|
||||
@Suppress("FunctionName")
|
||||
public inline fun <reified T : Any> SerializableResultTask(
|
||||
resultSerializer: KSerializer<T> = serializer(),
|
||||
descriptor: MetaDescriptor? = null,
|
||||
noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
|
||||
): Task<T> = SerializableResultTaskImpl(typeOf<T>(), resultSerializer, descriptor, builder)
|
||||
|
||||
/**
|
||||
* Create a [Task] that composes a result using [builder]. Only data from the workspace could be used.
|
||||
|
@ -1,5 +1,6 @@
|
||||
package space.kscience.dataforge.workspace
|
||||
|
||||
import kotlinx.serialization.KSerializer
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import space.kscience.dataforge.context.Context
|
||||
import space.kscience.dataforge.context.ContextBuilder
|
||||
@ -41,9 +42,15 @@ public interface TaskContainer {
|
||||
@Deprecated("use buildTask instead", ReplaceWith("buildTask(name, descriptorBuilder, builder)"))
|
||||
public inline fun <reified T : Any> TaskContainer.registerTask(
|
||||
name: String,
|
||||
descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {},
|
||||
resultSerializer: KSerializer<T>? = null,
|
||||
noinline descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {},
|
||||
noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
|
||||
): Unit = registerTask(Name.parse(name), Task(MetaDescriptor(descriptorBuilder), builder))
|
||||
) {
|
||||
val descriptor = MetaDescriptor(descriptorBuilder)
|
||||
val task = if (resultSerializer == null) Task(descriptor, builder) else
|
||||
SerializableResultTask(resultSerializer, descriptor, builder)
|
||||
registerTask(Name.parse(name), task)
|
||||
}
|
||||
|
||||
public inline fun <reified T : Any> TaskContainer.buildTask(
|
||||
name: String,
|
||||
@ -59,10 +66,12 @@ public inline fun <reified T : Any> TaskContainer.buildTask(
|
||||
|
||||
public inline fun <reified T : Any> TaskContainer.task(
|
||||
descriptor: MetaDescriptor,
|
||||
resultSerializer: KSerializer<T>? = null,
|
||||
noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
|
||||
): PropertyDelegateProvider<Any?, ReadOnlyProperty<Any?, TaskReference<T>>> = PropertyDelegateProvider { _, property ->
|
||||
val taskName = Name.parse(property.name)
|
||||
val task = Task(descriptor, builder)
|
||||
val task = if (resultSerializer == null) Task(descriptor, builder) else
|
||||
SerializableResultTask(resultSerializer, descriptor, builder)
|
||||
registerTask(taskName, task)
|
||||
ReadOnlyProperty { _, _ -> TaskReference(taskName, task) }
|
||||
}
|
||||
@ -78,10 +87,11 @@ public inline fun <reified T : Any, C : MetaRepr> TaskContainer.task(
|
||||
}
|
||||
|
||||
public inline fun <reified T : Any> TaskContainer.task(
|
||||
resultSerializer: KSerializer<T>? = null,
|
||||
noinline descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {},
|
||||
noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
|
||||
): PropertyDelegateProvider<Any?, ReadOnlyProperty<Any?, TaskReference<T>>> =
|
||||
task(MetaDescriptor(descriptorBuilder), builder)
|
||||
task(MetaDescriptor(descriptorBuilder), resultSerializer, builder)
|
||||
|
||||
public class WorkspaceBuilder(private val parentContext: Context = Global) : TaskContainer {
|
||||
private var context: Context? = null
|
||||
|
@ -46,5 +46,6 @@ include(
|
||||
":dataforge-context",
|
||||
":dataforge-data",
|
||||
":dataforge-workspace",
|
||||
":dataforge-scripting"
|
||||
":dataforge-scripting",
|
||||
":dataforge-distributed",
|
||||
)
|
Loading…
Reference in New Issue
Block a user