Compare commits
25 Commits
master
...
winter-yuk
Author | SHA1 | Date | |
---|---|---|---|
|
50a9e7d314 | ||
|
ba3354b4a2 | ||
|
7e13c3dec6 | ||
|
99efe3a456 | ||
|
196854429a | ||
|
9c55d26be5 | ||
|
114d310fdc | ||
|
77f8f045e6 | ||
|
230a3f1e22 | ||
|
07a0bd551b | ||
|
f62507e1b9 | ||
|
e9d4683f9b | ||
|
a4044c82a0 | ||
|
be8e971436 | ||
|
9cc30b1f4e | ||
|
7414e60192 | ||
|
8c0bc05a9a | ||
|
acfe9c2f74 | ||
|
32b986fc47 | ||
|
b86c6141cd | ||
|
e13e3ab6bf | ||
|
a7ee2f5922 | ||
|
6912f26291 | ||
|
d951668911 | ||
|
f5b2b4c9e4 |
4
.github/workflows/publish.yml
vendored
4
.github/workflows/publish.yml
vendored
@ -9,6 +9,8 @@ jobs:
|
|||||||
publish:
|
publish:
|
||||||
environment:
|
environment:
|
||||||
name: publish
|
name: publish
|
||||||
|
env:
|
||||||
|
publishing.github: false
|
||||||
strategy:
|
strategy:
|
||||||
matrix:
|
matrix:
|
||||||
os: [ macOS-latest, windows-latest ]
|
os: [ macOS-latest, windows-latest ]
|
||||||
@ -48,6 +50,6 @@ jobs:
|
|||||||
- name: Publish Mac Artifacts
|
- name: Publish Mac Artifacts
|
||||||
if: matrix.os == 'macOS-latest'
|
if: matrix.os == 'macOS-latest'
|
||||||
run: >
|
run: >
|
||||||
./gradlew release --no-daemon --build-cache -Ppublishing.enabled=true -Ppublishing.platform=macosX64
|
./gradlew release --no-daemon --build-cache -Ppublishing.enabled=true
|
||||||
-Ppublishing.space.user=${{ secrets.SPACE_APP_ID }}
|
-Ppublishing.space.user=${{ secrets.SPACE_APP_ID }}
|
||||||
-Ppublishing.space.token=${{ secrets.SPACE_APP_SECRET }}
|
-Ppublishing.space.token=${{ secrets.SPACE_APP_SECRET }}
|
||||||
|
@ -1,8 +1,19 @@
|
|||||||
package space.kscience.dataforge.data
|
package space.kscience.dataforge.data
|
||||||
|
|
||||||
|
import kotlinx.coroutines.coroutineScope
|
||||||
import space.kscience.dataforge.meta.Meta
|
import space.kscience.dataforge.meta.Meta
|
||||||
import space.kscience.dataforge.misc.DFExperimental
|
import space.kscience.dataforge.misc.DFExperimental
|
||||||
import space.kscience.dataforge.names.*
|
import space.kscience.dataforge.names.Name
|
||||||
|
import space.kscience.dataforge.names.NameToken
|
||||||
|
import space.kscience.dataforge.names.asName
|
||||||
|
import space.kscience.dataforge.names.cutFirst
|
||||||
|
import space.kscience.dataforge.names.cutLast
|
||||||
|
import space.kscience.dataforge.names.firstOrNull
|
||||||
|
import space.kscience.dataforge.names.isEmpty
|
||||||
|
import space.kscience.dataforge.names.lastOrNull
|
||||||
|
import space.kscience.dataforge.names.length
|
||||||
|
import space.kscience.dataforge.names.plus
|
||||||
|
import kotlin.collections.set
|
||||||
import kotlin.reflect.KType
|
import kotlin.reflect.KType
|
||||||
import kotlin.reflect.typeOf
|
import kotlin.reflect.typeOf
|
||||||
|
|
||||||
@ -73,7 +84,7 @@ public inline fun <T : Any> DataTree(
|
|||||||
|
|
||||||
@Suppress("FunctionName")
|
@Suppress("FunctionName")
|
||||||
public inline fun <reified T : Any> DataTree(
|
public inline fun <reified T : Any> DataTree(
|
||||||
noinline block: DataSetBuilder<T>.() -> Unit,
|
noinline block: DataSetBuilder<T>.() -> Unit = {},
|
||||||
): DataTree<T> = DataTree(typeOf<T>(), block)
|
): DataTree<T> = DataTree(typeOf<T>(), block)
|
||||||
|
|
||||||
@OptIn(DFExperimental::class)
|
@OptIn(DFExperimental::class)
|
||||||
|
42
dataforge-distributed/build.gradle.kts
Normal file
42
dataforge-distributed/build.gradle.kts
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
plugins {
|
||||||
|
id("ru.mipt.npm.gradle.mpp")
|
||||||
|
}
|
||||||
|
|
||||||
|
kotlin {
|
||||||
|
sourceSets {
|
||||||
|
commonMain {
|
||||||
|
dependencies {
|
||||||
|
api(project(":dataforge-context"))
|
||||||
|
api(project(":dataforge-data"))
|
||||||
|
api(project(":dataforge-io"))
|
||||||
|
api(project(":dataforge-workspace"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
jvmMain {
|
||||||
|
dependencies {
|
||||||
|
// TODO include fat jar of lambdarpc
|
||||||
|
val path = "../../../lambdarpc/LambdaRPC.kt/lambdarpc-core/build/libs"
|
||||||
|
api(files("$path/lambdarpc-core-0.0.1-SNAPSHOT.jar"))
|
||||||
|
runtimeOnly("io.grpc:grpc-netty-shaded:1.44.0")
|
||||||
|
api("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0")
|
||||||
|
api("io.grpc:grpc-protobuf:1.44.0")
|
||||||
|
api("com.google.protobuf:protobuf-java-util:3.19.4")
|
||||||
|
api("com.google.protobuf:protobuf-kotlin:3.19.4")
|
||||||
|
api("io.grpc:grpc-kotlin-stub:1.2.1")
|
||||||
|
api("org.jetbrains.kotlinx:kotlinx-serialization-json:1.3.2")
|
||||||
|
api("org.slf4j:slf4j-simple:1.7.36")
|
||||||
|
api("io.github.microutils:kotlin-logging-jvm:2.1.21")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
kscience {
|
||||||
|
useSerialization {
|
||||||
|
json()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
readme {
|
||||||
|
maturity = ru.mipt.npm.gradle.Maturity.PROTOTYPE
|
||||||
|
}
|
@ -0,0 +1,79 @@
|
|||||||
|
package space.kscience.dataforge.distributed
|
||||||
|
|
||||||
|
import io.lambdarpc.dsl.LibService
|
||||||
|
import kotlinx.serialization.KSerializer
|
||||||
|
import space.kscience.dataforge.context.Context
|
||||||
|
import space.kscience.dataforge.context.Global
|
||||||
|
import space.kscience.dataforge.data.DataSet
|
||||||
|
import space.kscience.dataforge.data.DataTree
|
||||||
|
import space.kscience.dataforge.distributed.ServiceWorkspace.Companion.execute
|
||||||
|
import space.kscience.dataforge.distributed.ServiceWorkspace.Companion.serviceId
|
||||||
|
import space.kscience.dataforge.distributed.serialization.DataSetPrototype
|
||||||
|
import space.kscience.dataforge.meta.Meta
|
||||||
|
import space.kscience.dataforge.meta.put
|
||||||
|
import space.kscience.dataforge.names.Name
|
||||||
|
import space.kscience.dataforge.names.asName
|
||||||
|
import space.kscience.dataforge.names.plus
|
||||||
|
import space.kscience.dataforge.workspace.SerializableResultTask
|
||||||
|
import space.kscience.dataforge.workspace.Workspace
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Workspace that exposes its tasks for remote clients.
|
||||||
|
* @param port Port to start service on. Will be random if null.
|
||||||
|
*/
|
||||||
|
public class NodeWorkspace(
|
||||||
|
port: Int? = null,
|
||||||
|
context: Context = Global.buildContext("workspace".asName()),
|
||||||
|
private val dataSerializer: KSerializer<Any>? = null,
|
||||||
|
data: DataSet<*> = DataTree<Any>(),
|
||||||
|
targets: Map<String, Meta> = mapOf(),
|
||||||
|
) : Workspace by RemoteTaskWorkspace(context, data, targets), ServiceWorkspace {
|
||||||
|
private val _port: Int? = port
|
||||||
|
|
||||||
|
private val service = LibService(serviceId, port) {
|
||||||
|
execute of { name, meta, executionContext ->
|
||||||
|
if (name == Name.EMPTY) {
|
||||||
|
requireNotNull(dataSerializer) { "Data serializer is not provided on $port" }
|
||||||
|
DataSetPrototype.of(data, dataSerializer)
|
||||||
|
} else {
|
||||||
|
val proxyContext = context.buildContext(context.name + "proxy") {
|
||||||
|
properties {
|
||||||
|
put(executionContext)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
val proxy = RemoteTaskWorkspace(context = proxyContext, data = data)
|
||||||
|
val task = tasks[name] ?: error("Task with name $name not found in the workspace")
|
||||||
|
require(task is SerializableResultTask)
|
||||||
|
// Local function to capture generic parameter
|
||||||
|
suspend fun <T : Any> execute(task: SerializableResultTask<T>): DataSetPrototype {
|
||||||
|
val result = task.execute(proxy, name, meta)
|
||||||
|
return DataSetPrototype.of(result, task.resultSerializer)
|
||||||
|
}
|
||||||
|
execute(task)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Port this workspace is available on.
|
||||||
|
*/
|
||||||
|
public override val port: Int
|
||||||
|
get() = _port ?: service.port.p
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start [NodeWorkspace] as a service.
|
||||||
|
*/
|
||||||
|
public override fun start(): Unit = service.start()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Await termination of the service.
|
||||||
|
*/
|
||||||
|
public override fun awaitTermination(): Unit = service.awaitTermination()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Shutdown service.
|
||||||
|
*/
|
||||||
|
public override fun shutdown(): Unit = service.shutdown()
|
||||||
|
|
||||||
|
override fun close(): Unit = shutdown()
|
||||||
|
}
|
@ -0,0 +1,35 @@
|
|||||||
|
package space.kscience.dataforge.distributed
|
||||||
|
|
||||||
|
import io.lambdarpc.context.ServiceDispatcher
|
||||||
|
import io.lambdarpc.utils.Endpoint
|
||||||
|
import kotlinx.coroutines.withContext
|
||||||
|
import kotlinx.serialization.KSerializer
|
||||||
|
import space.kscience.dataforge.meta.Meta
|
||||||
|
import space.kscience.dataforge.meta.descriptors.MetaDescriptor
|
||||||
|
import space.kscience.dataforge.names.Name
|
||||||
|
import space.kscience.dataforge.workspace.SerializableResultTask
|
||||||
|
import space.kscience.dataforge.workspace.TaskResult
|
||||||
|
import space.kscience.dataforge.workspace.Workspace
|
||||||
|
import space.kscience.dataforge.workspace.wrapResult
|
||||||
|
import kotlin.reflect.KType
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Proxy task that communicates with the corresponding remote task.
|
||||||
|
*/
|
||||||
|
internal class RemoteTask<T : Any>(
|
||||||
|
endpoint: String,
|
||||||
|
override val resultType: KType,
|
||||||
|
override val resultSerializer: KSerializer<T>,
|
||||||
|
override val descriptor: MetaDescriptor? = null,
|
||||||
|
private val executionContext: Meta = Meta.EMPTY,
|
||||||
|
) : SerializableResultTask<T> {
|
||||||
|
private val dispatcher = ServiceDispatcher(ServiceWorkspace.serviceId to Endpoint(endpoint))
|
||||||
|
|
||||||
|
override suspend fun execute(workspace: Workspace, taskName: Name, taskMeta: Meta): TaskResult<T> {
|
||||||
|
val result = withContext(dispatcher) {
|
||||||
|
ServiceWorkspace.execute(taskName, taskMeta, executionContext)
|
||||||
|
}
|
||||||
|
val dataSet = result.toDataSet(resultType, resultSerializer)
|
||||||
|
return workspace.wrapResult(dataSet, taskName, taskMeta)
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,76 @@
|
|||||||
|
package space.kscience.dataforge.distributed
|
||||||
|
|
||||||
|
import space.kscience.dataforge.context.Context
|
||||||
|
import space.kscience.dataforge.context.Global
|
||||||
|
import space.kscience.dataforge.context.gather
|
||||||
|
import space.kscience.dataforge.data.DataSet
|
||||||
|
import space.kscience.dataforge.data.DataTree
|
||||||
|
import space.kscience.dataforge.meta.Meta
|
||||||
|
import space.kscience.dataforge.meta.MutableMeta
|
||||||
|
import space.kscience.dataforge.meta.extract
|
||||||
|
import space.kscience.dataforge.meta.get
|
||||||
|
import space.kscience.dataforge.names.Name
|
||||||
|
import space.kscience.dataforge.names.asName
|
||||||
|
import space.kscience.dataforge.values.string
|
||||||
|
import space.kscience.dataforge.workspace.SerializableResultTask
|
||||||
|
import space.kscience.dataforge.workspace.Task
|
||||||
|
import space.kscience.dataforge.workspace.TaskResult
|
||||||
|
import space.kscience.dataforge.workspace.Workspace
|
||||||
|
import space.kscience.dataforge.workspace.wrapResult
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Workspace that returns [RemoteTask] if such task should be
|
||||||
|
* executed remotely according to the execution context.
|
||||||
|
*/
|
||||||
|
internal open class RemoteTaskWorkspace(
|
||||||
|
final override val context: Context = Global.buildContext("workspace".asName()),
|
||||||
|
data: DataSet<*> = DataTree<Any>(),
|
||||||
|
override val targets: Map<String, Meta> = mapOf(),
|
||||||
|
) : Workspace {
|
||||||
|
|
||||||
|
override val data: TaskResult<*> = wrapResult(data, Name.EMPTY, Meta.EMPTY)
|
||||||
|
|
||||||
|
private val _tasks: Map<Name, Task<*>> = context.gather(Task.TYPE)
|
||||||
|
|
||||||
|
override val tasks: Map<Name, Task<*>>
|
||||||
|
get() = object : AbstractMap<Name, Task<*>>() {
|
||||||
|
override val entries: Set<Map.Entry<Name, Task<*>>>
|
||||||
|
get() = _tasks.entries
|
||||||
|
|
||||||
|
override fun get(key: Name): Task<*>? {
|
||||||
|
val executionContext = context.properties.extract(EXECUTION_CONTEXT)
|
||||||
|
val endpoint = executionContext
|
||||||
|
?.get(EXECUTION_CONTEXT)?.get(ENDPOINTS)?.get(key)
|
||||||
|
?: return _tasks[key]
|
||||||
|
val string = endpoint.value?.string ?: error("Endpoint is expected to be a string")
|
||||||
|
val local = _tasks[key] ?: error("No task with name $key")
|
||||||
|
require(local is SerializableResultTask) { "Task $key result is not serializable" }
|
||||||
|
return RemoteTask(string, local.resultType, local.resultSerializer, local.descriptor, executionContext)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
val EXECUTION_CONTEXT = "execution".asName()
|
||||||
|
val ENDPOINTS = "endpoints".asName()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public fun MutableMeta.endpoints(block: EndpointsBuilder.() -> Unit) {
|
||||||
|
RemoteTaskWorkspace.EXECUTION_CONTEXT put {
|
||||||
|
RemoteTaskWorkspace.ENDPOINTS put EndpointsBuilder().apply(block).build()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public class EndpointsBuilder {
|
||||||
|
private val endpoints = mutableMapOf<Name, String>()
|
||||||
|
|
||||||
|
public infix fun Name.on(endpoint: String) {
|
||||||
|
endpoints[this] = endpoint
|
||||||
|
}
|
||||||
|
|
||||||
|
internal fun build(): Meta = Meta {
|
||||||
|
endpoints.forEach { (name, endpoint) ->
|
||||||
|
name put endpoint
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,29 @@
|
|||||||
|
package space.kscience.dataforge.distributed
|
||||||
|
|
||||||
|
import io.lambdarpc.dsl.def
|
||||||
|
import io.lambdarpc.dsl.j
|
||||||
|
import io.lambdarpc.utils.ServiceId
|
||||||
|
import space.kscience.dataforge.distributed.serialization.DataSetPrototype
|
||||||
|
import space.kscience.dataforge.meta.MetaSerializer
|
||||||
|
import space.kscience.dataforge.names.Name
|
||||||
|
import space.kscience.dataforge.workspace.Workspace
|
||||||
|
import java.io.Closeable
|
||||||
|
|
||||||
|
/**
|
||||||
|
* [Workspace] that can expose its tasks to other workspaces as a service.
|
||||||
|
*/
|
||||||
|
public interface ServiceWorkspace : Workspace, Closeable {
|
||||||
|
public val port: Int
|
||||||
|
public fun start()
|
||||||
|
public fun awaitTermination()
|
||||||
|
public fun shutdown()
|
||||||
|
|
||||||
|
override fun close() {
|
||||||
|
shutdown()
|
||||||
|
}
|
||||||
|
|
||||||
|
public companion object {
|
||||||
|
internal val serviceId = ServiceId("d41b95b1-828b-4444-8ff0-6f9c92a79246")
|
||||||
|
internal val execute by serviceId.def(j<Name>(), j(MetaSerializer), j(MetaSerializer), j<DataSetPrototype>())
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,34 @@
|
|||||||
|
package space.kscience.dataforge.distributed.serialization
|
||||||
|
|
||||||
|
import kotlinx.serialization.KSerializer
|
||||||
|
import kotlinx.serialization.Serializable
|
||||||
|
import kotlinx.serialization.json.Json
|
||||||
|
import space.kscience.dataforge.data.Data
|
||||||
|
import space.kscience.dataforge.data.StaticData
|
||||||
|
import space.kscience.dataforge.data.await
|
||||||
|
import space.kscience.dataforge.meta.MetaSerializer
|
||||||
|
import kotlin.reflect.KType
|
||||||
|
|
||||||
|
/**
|
||||||
|
* [Data] representation that is trivially serializable.
|
||||||
|
*/
|
||||||
|
@Serializable
|
||||||
|
internal data class DataPrototype(
|
||||||
|
val meta: String,
|
||||||
|
val data: String,
|
||||||
|
) {
|
||||||
|
fun <T : Any> toData(type: KType, serializer: KSerializer<T>): Data<T> =
|
||||||
|
StaticData(
|
||||||
|
type = type,
|
||||||
|
value = Json.decodeFromString(serializer, data),
|
||||||
|
meta = Json.decodeFromString(MetaSerializer, meta),
|
||||||
|
)
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
suspend fun <T : Any> of(data: Data<T>, serializer: KSerializer<in T>): DataPrototype {
|
||||||
|
val meta = Json.encodeToString(MetaSerializer, data.meta)
|
||||||
|
val string = Json.encodeToString(serializer, data.await())
|
||||||
|
return DataPrototype(meta, string)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,57 @@
|
|||||||
|
package space.kscience.dataforge.distributed.serialization
|
||||||
|
|
||||||
|
import kotlinx.coroutines.async
|
||||||
|
import kotlinx.coroutines.coroutineScope
|
||||||
|
import kotlinx.serialization.KSerializer
|
||||||
|
import kotlinx.serialization.Serializable
|
||||||
|
import space.kscience.dataforge.data.Data
|
||||||
|
import space.kscience.dataforge.data.DataSet
|
||||||
|
import space.kscience.dataforge.data.NamedData
|
||||||
|
import space.kscience.dataforge.data.asIterable
|
||||||
|
import space.kscience.dataforge.data.component1
|
||||||
|
import space.kscience.dataforge.data.component2
|
||||||
|
import space.kscience.dataforge.data.named
|
||||||
|
import space.kscience.dataforge.meta.Meta
|
||||||
|
import space.kscience.dataforge.names.Name
|
||||||
|
import kotlin.reflect.KType
|
||||||
|
|
||||||
|
/**
|
||||||
|
* [DataSet] representation that is trivially serializable.
|
||||||
|
*/
|
||||||
|
@Serializable
|
||||||
|
internal data class DataSetPrototype(val meta: Meta, val data: Map<String, DataPrototype>) {
|
||||||
|
fun <T : Any> toDataSet(type: KType, serializer: KSerializer<T>): DataSet<T> {
|
||||||
|
val data = data
|
||||||
|
.mapKeys { (name, _) -> Name.of(name) }
|
||||||
|
.mapValues { (_, dataPrototype) -> dataPrototype.toData(type, serializer) }
|
||||||
|
return SimpleDataSet(type, data, meta)
|
||||||
|
}
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
suspend fun <T : Any> of(dataSet: DataSet<T>, serializer: KSerializer<T>): DataSetPrototype = coroutineScope {
|
||||||
|
val prototypes = dataSet.asIterable().map { (name, data) ->
|
||||||
|
name.toString() to async { DataPrototype.of(data, serializer) }
|
||||||
|
}
|
||||||
|
val map = prototypes.associate { (name, deferred) -> name to deferred.await() }
|
||||||
|
DataSetPrototype(dataSet.meta, map)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Trivial [DataSet] implementation.
|
||||||
|
*/
|
||||||
|
private class SimpleDataSet<T : Any>(
|
||||||
|
override val dataType: KType,
|
||||||
|
private val data: Map<Name, Data<T>>,
|
||||||
|
override val meta: Meta,
|
||||||
|
) : DataSet<T> {
|
||||||
|
|
||||||
|
override fun iterator(): Iterator<NamedData<T>> =
|
||||||
|
data
|
||||||
|
.asSequence()
|
||||||
|
.map { (name, data) -> data.named(name) }
|
||||||
|
.iterator()
|
||||||
|
|
||||||
|
override fun get(name: Name): Data<T>? = data[name]
|
||||||
|
}
|
@ -0,0 +1,61 @@
|
|||||||
|
package space.kscience.dataforge.distributed
|
||||||
|
|
||||||
|
import kotlinx.serialization.serializer
|
||||||
|
import space.kscience.dataforge.context.Context
|
||||||
|
import space.kscience.dataforge.context.PluginFactory
|
||||||
|
import space.kscience.dataforge.context.PluginTag
|
||||||
|
import space.kscience.dataforge.context.info
|
||||||
|
import space.kscience.dataforge.context.logger
|
||||||
|
import space.kscience.dataforge.data.data
|
||||||
|
import space.kscience.dataforge.data.getByType
|
||||||
|
import space.kscience.dataforge.data.map
|
||||||
|
import space.kscience.dataforge.meta.Meta
|
||||||
|
import space.kscience.dataforge.names.Name
|
||||||
|
import space.kscience.dataforge.names.asName
|
||||||
|
import space.kscience.dataforge.workspace.WorkspacePlugin
|
||||||
|
import space.kscience.dataforge.workspace.fromTask
|
||||||
|
import space.kscience.dataforge.workspace.task
|
||||||
|
import kotlin.reflect.KClass
|
||||||
|
|
||||||
|
internal class MyPlugin1 : WorkspacePlugin() {
|
||||||
|
override val tag: PluginTag
|
||||||
|
get() = Factory.tag
|
||||||
|
|
||||||
|
val task by task<Int>(serializer()) {
|
||||||
|
workspace.logger.info { "In ${tag.name}.task on ${workspace.context.name}" }
|
||||||
|
val myInt = workspace.data.getByType<Int>("int")!!
|
||||||
|
data("result", myInt.data.map { it + 1 })
|
||||||
|
}
|
||||||
|
|
||||||
|
companion object Factory : PluginFactory<MyPlugin1> {
|
||||||
|
override val tag: PluginTag
|
||||||
|
get() = PluginTag("Plg1")
|
||||||
|
|
||||||
|
override val type: KClass<out MyPlugin1>
|
||||||
|
get() = MyPlugin1::class
|
||||||
|
|
||||||
|
override fun build(context: Context, meta: Meta): MyPlugin1 = MyPlugin1()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal class MyPlugin2 : WorkspacePlugin() {
|
||||||
|
override val tag: PluginTag
|
||||||
|
get() = Factory.tag
|
||||||
|
|
||||||
|
val task by task<Int>(serializer()) {
|
||||||
|
workspace.logger.info { "In ${tag.name}.task on ${workspace.context.name}" }
|
||||||
|
val dataSet = fromTask<Int>(Name.of(MyPlugin1.tag.name, "task"))
|
||||||
|
val data = dataSet["result".asName()]!!
|
||||||
|
data("result", data.map { it + 1 })
|
||||||
|
}
|
||||||
|
|
||||||
|
companion object Factory : PluginFactory<MyPlugin2> {
|
||||||
|
override val tag: PluginTag
|
||||||
|
get() = PluginTag("Plg2")
|
||||||
|
|
||||||
|
override val type: KClass<out MyPlugin2>
|
||||||
|
get() = MyPlugin2::class
|
||||||
|
|
||||||
|
override fun build(context: Context, meta: Meta): MyPlugin2 = MyPlugin2()
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,90 @@
|
|||||||
|
package space.kscience.dataforge.distributed
|
||||||
|
|
||||||
|
import kotlinx.coroutines.runBlocking
|
||||||
|
import org.junit.jupiter.api.AfterAll
|
||||||
|
import org.junit.jupiter.api.BeforeAll
|
||||||
|
import org.junit.jupiter.api.Test
|
||||||
|
import org.junit.jupiter.api.TestInstance
|
||||||
|
import space.kscience.dataforge.context.Global
|
||||||
|
import space.kscience.dataforge.data.DataTree
|
||||||
|
import space.kscience.dataforge.data.await
|
||||||
|
import space.kscience.dataforge.data.get
|
||||||
|
import space.kscience.dataforge.data.static
|
||||||
|
import space.kscience.dataforge.meta.Meta
|
||||||
|
import space.kscience.dataforge.names.Name
|
||||||
|
import space.kscience.dataforge.names.asName
|
||||||
|
import space.kscience.dataforge.workspace.Workspace
|
||||||
|
import kotlin.test.assertEquals
|
||||||
|
|
||||||
|
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
|
||||||
|
internal class RemoteCallTest {
|
||||||
|
|
||||||
|
private lateinit var worker1: NodeWorkspace
|
||||||
|
private lateinit var worker2: NodeWorkspace
|
||||||
|
private lateinit var workspace: Workspace
|
||||||
|
|
||||||
|
@BeforeAll
|
||||||
|
fun before() = runBlocking {
|
||||||
|
worker1 = NodeWorkspace(
|
||||||
|
context = Global.buildContext("worker1".asName()) {
|
||||||
|
plugin(MyPlugin1)
|
||||||
|
},
|
||||||
|
data = DataTree<Any> {
|
||||||
|
static("int", 42)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
worker1.start()
|
||||||
|
|
||||||
|
worker2 = NodeWorkspace(
|
||||||
|
context = Global.buildContext("worker2".asName()) {
|
||||||
|
plugin(MyPlugin1)
|
||||||
|
plugin(MyPlugin2)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
worker2.start()
|
||||||
|
|
||||||
|
workspace = NodeWorkspace(
|
||||||
|
context = Global.buildContext {
|
||||||
|
plugin(MyPlugin1)
|
||||||
|
plugin(MyPlugin2)
|
||||||
|
properties {
|
||||||
|
endpoints {
|
||||||
|
Name.of(MyPlugin1.tag.name, "task") on "localhost:${worker1.port}"
|
||||||
|
Name.of(MyPlugin2.tag.name, "task") on "localhost:${worker2.port}"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterAll
|
||||||
|
fun after() {
|
||||||
|
worker1.shutdown()
|
||||||
|
worker2.shutdown()
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `local execution`() = runBlocking {
|
||||||
|
assertEquals(42, worker1.data["int".asName()]!!.await())
|
||||||
|
val res = worker1
|
||||||
|
.produce(Name.of(MyPlugin1.tag.name, "task"), Meta.EMPTY)["result"]!!
|
||||||
|
.await()
|
||||||
|
assertEquals(43, res)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `remote execution`() = runBlocking {
|
||||||
|
val remoteRes = workspace
|
||||||
|
.produce(Name.of(MyPlugin1.tag.name, "task"), Meta.EMPTY)["result"]!!
|
||||||
|
.await()
|
||||||
|
assertEquals(43, remoteRes)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `transitive execution`() = runBlocking {
|
||||||
|
val remoteRes = workspace
|
||||||
|
.produce(Name.of(MyPlugin2.tag.name, "task"), Meta.EMPTY)["result"]!!
|
||||||
|
.await()
|
||||||
|
assertEquals(44, remoteRes)
|
||||||
|
}
|
||||||
|
}
|
@ -4,8 +4,23 @@ import kotlinx.serialization.Serializable
|
|||||||
import kotlinx.serialization.json.Json
|
import kotlinx.serialization.json.Json
|
||||||
import space.kscience.dataforge.misc.Type
|
import space.kscience.dataforge.misc.Type
|
||||||
import space.kscience.dataforge.misc.unsafeCast
|
import space.kscience.dataforge.misc.unsafeCast
|
||||||
import space.kscience.dataforge.names.*
|
import space.kscience.dataforge.names.Name
|
||||||
import space.kscience.dataforge.values.*
|
import space.kscience.dataforge.names.NameToken
|
||||||
|
import space.kscience.dataforge.names.asName
|
||||||
|
import space.kscience.dataforge.names.cutFirst
|
||||||
|
import space.kscience.dataforge.names.cutLast
|
||||||
|
import space.kscience.dataforge.names.firstOrNull
|
||||||
|
import space.kscience.dataforge.names.isEmpty
|
||||||
|
import space.kscience.dataforge.names.lastOrNull
|
||||||
|
import space.kscience.dataforge.names.length
|
||||||
|
import space.kscience.dataforge.names.parseAsName
|
||||||
|
import space.kscience.dataforge.names.plus
|
||||||
|
import space.kscience.dataforge.values.EnumValue
|
||||||
|
import space.kscience.dataforge.values.Value
|
||||||
|
import space.kscience.dataforge.values.ValueProvider
|
||||||
|
import space.kscience.dataforge.values.boolean
|
||||||
|
import space.kscience.dataforge.values.numberOrNull
|
||||||
|
import space.kscience.dataforge.values.string
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -111,6 +126,11 @@ public operator fun Meta.get(name: Name): Meta? = this.getMeta(name)
|
|||||||
*/
|
*/
|
||||||
public operator fun Meta.get(key: String): Meta? = this.get(Name.parse(key))
|
public operator fun Meta.get(key: String): Meta? = this.get(Name.parse(key))
|
||||||
|
|
||||||
|
public fun Meta.extract(name: Name): Meta? {
|
||||||
|
val value = get(name) ?: return null
|
||||||
|
return Meta { name put value }
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get all items matching given name. The index of the last element, if present is used as a [Regex],
|
* Get all items matching given name. The index of the last element, if present is used as a [Regex],
|
||||||
* against which indexes of elements are matched.
|
* against which indexes of elements are matched.
|
||||||
|
@ -2,11 +2,25 @@ package space.kscience.dataforge.meta
|
|||||||
|
|
||||||
import kotlinx.serialization.Serializable
|
import kotlinx.serialization.Serializable
|
||||||
import space.kscience.dataforge.misc.DFExperimental
|
import space.kscience.dataforge.misc.DFExperimental
|
||||||
import space.kscience.dataforge.names.*
|
import space.kscience.dataforge.names.Name
|
||||||
|
import space.kscience.dataforge.names.NameToken
|
||||||
|
import space.kscience.dataforge.names.asName
|
||||||
|
import space.kscience.dataforge.names.cutFirst
|
||||||
|
import space.kscience.dataforge.names.cutLast
|
||||||
|
import space.kscience.dataforge.names.first
|
||||||
|
import space.kscience.dataforge.names.firstOrNull
|
||||||
|
import space.kscience.dataforge.names.isEmpty
|
||||||
|
import space.kscience.dataforge.names.lastOrNull
|
||||||
|
import space.kscience.dataforge.names.length
|
||||||
|
import space.kscience.dataforge.names.plus
|
||||||
|
import space.kscience.dataforge.names.withIndex
|
||||||
import space.kscience.dataforge.values.EnumValue
|
import space.kscience.dataforge.values.EnumValue
|
||||||
import space.kscience.dataforge.values.MutableValueProvider
|
import space.kscience.dataforge.values.MutableValueProvider
|
||||||
import space.kscience.dataforge.values.Value
|
import space.kscience.dataforge.values.Value
|
||||||
import space.kscience.dataforge.values.asValue
|
import space.kscience.dataforge.values.asValue
|
||||||
|
import kotlin.collections.component1
|
||||||
|
import kotlin.collections.component2
|
||||||
|
import kotlin.collections.set
|
||||||
import kotlin.js.JsName
|
import kotlin.js.JsName
|
||||||
import kotlin.jvm.Synchronized
|
import kotlin.jvm.Synchronized
|
||||||
|
|
||||||
@ -146,6 +160,14 @@ public interface MutableMeta : Meta, MutableMetaProvider {
|
|||||||
*/
|
*/
|
||||||
public operator fun MutableMeta.set(name: Name, meta: Meta): Unit = setMeta(name, meta)
|
public operator fun MutableMeta.set(name: Name, meta: Meta): Unit = setMeta(name, meta)
|
||||||
|
|
||||||
|
public fun MutableMeta.put(other: Meta) {
|
||||||
|
other.items.forEach { (name, meta) ->
|
||||||
|
name.asName() put meta
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public operator fun MutableMeta.plusAssign(meta: Meta): Unit = put(meta)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set or replace value at given [name]
|
* Set or replace value at given [name]
|
||||||
*/
|
*/
|
||||||
|
@ -1,6 +1,8 @@
|
|||||||
package space.kscience.dataforge.workspace
|
package space.kscience.dataforge.workspace
|
||||||
|
|
||||||
import kotlinx.coroutines.withContext
|
import kotlinx.coroutines.withContext
|
||||||
|
import kotlinx.serialization.KSerializer
|
||||||
|
import kotlinx.serialization.serializer
|
||||||
import space.kscience.dataforge.data.DataSetBuilder
|
import space.kscience.dataforge.data.DataSetBuilder
|
||||||
import space.kscience.dataforge.data.DataTree
|
import space.kscience.dataforge.data.DataTree
|
||||||
import space.kscience.dataforge.data.GoalExecutionRestriction
|
import space.kscience.dataforge.data.GoalExecutionRestriction
|
||||||
@ -9,6 +11,7 @@ import space.kscience.dataforge.meta.MetaRepr
|
|||||||
import space.kscience.dataforge.meta.Specification
|
import space.kscience.dataforge.meta.Specification
|
||||||
import space.kscience.dataforge.meta.descriptors.Described
|
import space.kscience.dataforge.meta.descriptors.Described
|
||||||
import space.kscience.dataforge.meta.descriptors.MetaDescriptor
|
import space.kscience.dataforge.meta.descriptors.MetaDescriptor
|
||||||
|
import space.kscience.dataforge.misc.DFInternal
|
||||||
import space.kscience.dataforge.misc.Type
|
import space.kscience.dataforge.misc.Type
|
||||||
import space.kscience.dataforge.names.Name
|
import space.kscience.dataforge.names.Name
|
||||||
import space.kscience.dataforge.workspace.Task.Companion.TYPE
|
import space.kscience.dataforge.workspace.Task.Companion.TYPE
|
||||||
@ -37,6 +40,12 @@ public interface Task<out T : Any> : Described {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Type(TYPE)
|
||||||
|
public interface SerializableResultTask<T : Any> : Task<T> {
|
||||||
|
public val resultType: KType
|
||||||
|
public val resultSerializer: KSerializer<T>
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A [Task] with [Specification] for wrapping and unwrapping task configuration
|
* A [Task] with [Specification] for wrapping and unwrapping task configuration
|
||||||
*/
|
*/
|
||||||
@ -93,12 +102,30 @@ public fun <T : Any> Task(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* [Task] that has [resultSerializer] to be able to cache or send its results
|
||||||
|
*/
|
||||||
|
@DFInternal
|
||||||
|
public class SerializableResultTaskImpl<T : Any>(
|
||||||
|
override val resultType: KType,
|
||||||
|
override val resultSerializer: KSerializer<T>,
|
||||||
|
descriptor: MetaDescriptor? = null,
|
||||||
|
builder: suspend TaskResultBuilder<T>.() -> Unit,
|
||||||
|
) : SerializableResultTask<T>, Task<T> by Task(resultType, descriptor, builder)
|
||||||
|
|
||||||
@Suppress("FunctionName")
|
@Suppress("FunctionName")
|
||||||
public inline fun <reified T : Any> Task(
|
public inline fun <reified T : Any> Task(
|
||||||
descriptor: MetaDescriptor? = null,
|
descriptor: MetaDescriptor? = null,
|
||||||
noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
|
noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
|
||||||
): Task<T> = Task(typeOf<T>(), descriptor, builder)
|
): Task<T> = Task(typeOf<T>(), descriptor, builder)
|
||||||
|
|
||||||
|
@OptIn(DFInternal::class)
|
||||||
|
@Suppress("FunctionName")
|
||||||
|
public inline fun <reified T : Any> SerializableResultTask(
|
||||||
|
resultSerializer: KSerializer<T> = serializer(),
|
||||||
|
descriptor: MetaDescriptor? = null,
|
||||||
|
noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
|
||||||
|
): Task<T> = SerializableResultTaskImpl(typeOf<T>(), resultSerializer, descriptor, builder)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a [Task] that composes a result using [builder]. Only data from the workspace could be used.
|
* Create a [Task] that composes a result using [builder]. Only data from the workspace could be used.
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
package space.kscience.dataforge.workspace
|
package space.kscience.dataforge.workspace
|
||||||
|
|
||||||
|
import kotlinx.serialization.KSerializer
|
||||||
import kotlinx.coroutines.CoroutineScope
|
import kotlinx.coroutines.CoroutineScope
|
||||||
import space.kscience.dataforge.context.Context
|
import space.kscience.dataforge.context.Context
|
||||||
import space.kscience.dataforge.context.ContextBuilder
|
import space.kscience.dataforge.context.ContextBuilder
|
||||||
@ -41,9 +42,15 @@ public interface TaskContainer {
|
|||||||
@Deprecated("use buildTask instead", ReplaceWith("buildTask(name, descriptorBuilder, builder)"))
|
@Deprecated("use buildTask instead", ReplaceWith("buildTask(name, descriptorBuilder, builder)"))
|
||||||
public inline fun <reified T : Any> TaskContainer.registerTask(
|
public inline fun <reified T : Any> TaskContainer.registerTask(
|
||||||
name: String,
|
name: String,
|
||||||
descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {},
|
resultSerializer: KSerializer<T>? = null,
|
||||||
|
noinline descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {},
|
||||||
noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
|
noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
|
||||||
): Unit = registerTask(Name.parse(name), Task(MetaDescriptor(descriptorBuilder), builder))
|
) {
|
||||||
|
val descriptor = MetaDescriptor(descriptorBuilder)
|
||||||
|
val task = if (resultSerializer == null) Task(descriptor, builder) else
|
||||||
|
SerializableResultTask(resultSerializer, descriptor, builder)
|
||||||
|
registerTask(Name.parse(name), task)
|
||||||
|
}
|
||||||
|
|
||||||
public inline fun <reified T : Any> TaskContainer.buildTask(
|
public inline fun <reified T : Any> TaskContainer.buildTask(
|
||||||
name: String,
|
name: String,
|
||||||
@ -59,10 +66,12 @@ public inline fun <reified T : Any> TaskContainer.buildTask(
|
|||||||
|
|
||||||
public inline fun <reified T : Any> TaskContainer.task(
|
public inline fun <reified T : Any> TaskContainer.task(
|
||||||
descriptor: MetaDescriptor,
|
descriptor: MetaDescriptor,
|
||||||
|
resultSerializer: KSerializer<T>? = null,
|
||||||
noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
|
noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
|
||||||
): PropertyDelegateProvider<Any?, ReadOnlyProperty<Any?, TaskReference<T>>> = PropertyDelegateProvider { _, property ->
|
): PropertyDelegateProvider<Any?, ReadOnlyProperty<Any?, TaskReference<T>>> = PropertyDelegateProvider { _, property ->
|
||||||
val taskName = Name.parse(property.name)
|
val taskName = Name.parse(property.name)
|
||||||
val task = Task(descriptor, builder)
|
val task = if (resultSerializer == null) Task(descriptor, builder) else
|
||||||
|
SerializableResultTask(resultSerializer, descriptor, builder)
|
||||||
registerTask(taskName, task)
|
registerTask(taskName, task)
|
||||||
ReadOnlyProperty { _, _ -> TaskReference(taskName, task) }
|
ReadOnlyProperty { _, _ -> TaskReference(taskName, task) }
|
||||||
}
|
}
|
||||||
@ -78,10 +87,11 @@ public inline fun <reified T : Any, C : MetaRepr> TaskContainer.task(
|
|||||||
}
|
}
|
||||||
|
|
||||||
public inline fun <reified T : Any> TaskContainer.task(
|
public inline fun <reified T : Any> TaskContainer.task(
|
||||||
|
resultSerializer: KSerializer<T>? = null,
|
||||||
noinline descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {},
|
noinline descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {},
|
||||||
noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
|
noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
|
||||||
): PropertyDelegateProvider<Any?, ReadOnlyProperty<Any?, TaskReference<T>>> =
|
): PropertyDelegateProvider<Any?, ReadOnlyProperty<Any?, TaskReference<T>>> =
|
||||||
task(MetaDescriptor(descriptorBuilder), builder)
|
task(MetaDescriptor(descriptorBuilder), resultSerializer, builder)
|
||||||
|
|
||||||
public class WorkspaceBuilder(private val parentContext: Context = Global) : TaskContainer {
|
public class WorkspaceBuilder(private val parentContext: Context = Global) : TaskContainer {
|
||||||
private var context: Context? = null
|
private var context: Context? = null
|
||||||
|
@ -46,5 +46,6 @@ include(
|
|||||||
":dataforge-context",
|
":dataforge-context",
|
||||||
":dataforge-data",
|
":dataforge-data",
|
||||||
":dataforge-workspace",
|
":dataforge-workspace",
|
||||||
":dataforge-scripting"
|
":dataforge-scripting",
|
||||||
|
":dataforge-distributed",
|
||||||
)
|
)
|
Loading…
Reference in New Issue
Block a user