Call remote tasks of service workspace #75

.github/workflows/publish.yml (4 changes)
@@ -9,6 +9,8 @@ jobs:
  publish:
    environment:
      name: publish
    env:
      publishing.github: false
    strategy:
      matrix:
        os: [ macOS-latest, windows-latest ]
@@ -48,6 +50,6 @@ jobs:
      - name: Publish Mac Artifacts
        if: matrix.os == 'macOS-latest'
        run: >
-          ./gradlew release --no-daemon --build-cache -Ppublishing.enabled=true -Ppublishing.platform=macosX64
+          ./gradlew release --no-daemon --build-cache -Ppublishing.enabled=true
           -Ppublishing.space.user=${{ secrets.SPACE_APP_ID }}
           -Ppublishing.space.token=${{ secrets.SPACE_APP_SECRET }}

@@ -32,4 +32,4 @@ public fun <T : Any> Data<T>.named(name: Name): NamedData<T> = if (this is Named
    NamedDataImpl(name, this.data)
} else {
    NamedDataImpl(name, this)
}

@@ -1,8 +1,19 @@
package space.kscience.dataforge.data

import kotlinx.coroutines.coroutineScope
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFExperimental
-import space.kscience.dataforge.names.*
+import space.kscience.dataforge.names.Name
+import space.kscience.dataforge.names.NameToken
+import space.kscience.dataforge.names.asName
+import space.kscience.dataforge.names.cutFirst
+import space.kscience.dataforge.names.cutLast
+import space.kscience.dataforge.names.firstOrNull
+import space.kscience.dataforge.names.isEmpty
+import space.kscience.dataforge.names.lastOrNull
+import space.kscience.dataforge.names.length
+import space.kscience.dataforge.names.plus
import kotlin.collections.set
import kotlin.reflect.KType
import kotlin.reflect.typeOf

@@ -73,10 +84,10 @@ public inline fun <T : Any> DataTree(

@Suppress("FunctionName")
public inline fun <reified T : Any> DataTree(
-    noinline block: DataSetBuilder<T>.() -> Unit,
+    noinline block: DataSetBuilder<T>.() -> Unit = {},
): DataTree<T> = DataTree(typeOf<T>(), block)

@OptIn(DFExperimental::class)
public fun <T : Any> DataSet<T>.seal(): DataTree<T> = DataTree(dataType) {
    populateFrom(this@seal)
}
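
With the new default argument the builder block can be omitted entirely; the distributed module below depends on this for its `data` parameter defaults. A minimal sketch of what the change enables:

```kotlin
// Enabled by the new `= {}` default: an empty DataTree built inline,
// as used for the `data: DataSet<*> = DataTree<Any>()` defaults in the
// new dataforge-distributed module below.
val emptyTree = DataTree<Any>()
```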

dataforge-distributed/build.gradle.kts (new file, +42 lines)

@@ -0,0 +1,42 @@
plugins {
    id("ru.mipt.npm.gradle.mpp")
}

kotlin {
    sourceSets {
        commonMain {
            dependencies {
                api(project(":dataforge-context"))
                api(project(":dataforge-data"))
                api(project(":dataforge-io"))
                api(project(":dataforge-workspace"))
            }
        }
        jvmMain {
            dependencies {
                // TODO include fat jar of lambdarpc
                val path = "../../../lambdarpc/LambdaRPC.kt/lambdarpc-core/build/libs"
                api(files("$path/lambdarpc-core-0.0.1-SNAPSHOT.jar"))
                runtimeOnly("io.grpc:grpc-netty-shaded:1.44.0")
                api("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0")
                api("io.grpc:grpc-protobuf:1.44.0")
                api("com.google.protobuf:protobuf-java-util:3.19.4")
                api("com.google.protobuf:protobuf-kotlin:3.19.4")
                api("io.grpc:grpc-kotlin-stub:1.2.1")
                api("org.jetbrains.kotlinx:kotlinx-serialization-json:1.3.2")
                api("org.slf4j:slf4j-simple:1.7.36")
                api("io.github.microutils:kotlin-logging-jvm:2.1.21")
            }
        }
    }
}

kscience {
    useSerialization {
        json()
    }
}

readme {
    maturity = ru.mipt.npm.gradle.Maturity.PROTOTYPE
}

@@ -0,0 +1,79 @@
package space.kscience.dataforge.distributed

import io.lambdarpc.dsl.LibService
import kotlinx.serialization.KSerializer
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Global
import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.distributed.ServiceWorkspace.Companion.execute
import space.kscience.dataforge.distributed.ServiceWorkspace.Companion.serviceId
import space.kscience.dataforge.distributed.serialization.DataSetPrototype
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.put
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.plus
import space.kscience.dataforge.workspace.SerializableResultTask
import space.kscience.dataforge.workspace.Workspace

/**
 * Workspace that exposes its tasks for remote clients.
 * @param port Port to start service on. Will be random if null.
 */
public class NodeWorkspace(
    port: Int? = null,
    context: Context = Global.buildContext("workspace".asName()),
    private val dataSerializer: KSerializer<Any>? = null,
    data: DataSet<*> = DataTree<Any>(),
    targets: Map<String, Meta> = mapOf(),
) : Workspace by RemoteTaskWorkspace(context, data, targets), ServiceWorkspace {

    private val _port: Int? = port

    private val service = LibService(serviceId, port) {
        execute of { name, meta, executionContext ->
            if (name == Name.EMPTY) {
                requireNotNull(dataSerializer) { "Data serializer is not provided on $port" }
                DataSetPrototype.of(data, dataSerializer)
            } else {
                val proxyContext = context.buildContext(context.name + "proxy") {
                    properties {
                        put(executionContext)
                    }
                }
                val proxy = RemoteTaskWorkspace(context = proxyContext, data = data)
                val task = tasks[name] ?: error("Task with name $name not found in the workspace")
                require(task is SerializableResultTask)
                // Local function to capture generic parameter
                suspend fun <T : Any> execute(task: SerializableResultTask<T>): DataSetPrototype {
                    val result = task.execute(proxy, name, meta)
                    return DataSetPrototype.of(result, task.resultSerializer)
                }
                execute(task)
            }
        }
    }

    /**
     * Port this workspace is available on.
     */
    public override val port: Int
        get() = _port ?: service.port.p

    /**
     * Start [NodeWorkspace] as a service.
     */
    public override fun start(): Unit = service.start()

    /**
     * Await termination of the service.
     */
    public override fun awaitTermination(): Unit = service.awaitTermination()

    /**
     * Shutdown service.
     */
    public override fun shutdown(): Unit = service.shutdown()

    override fun close(): Unit = shutdown()
}
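
For orientation, a minimal server-side launch sketch; this is not part of the diff, the fixed port is a placeholder, and plugin/data wiring is omitted:

```kotlin
// Hypothetical launcher: start a NodeWorkspace and block until shutdown.
fun main() {
    val node = NodeWorkspace(port = 9999)  // port is a placeholder; null picks a random port
    node.start()                           // starts the lambdarpc LibService
    node.use { it.awaitTermination() }     // close() delegates to shutdown()
}
```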

@@ -0,0 +1,35 @@
package space.kscience.dataforge.distributed

import io.lambdarpc.context.ServiceDispatcher
import io.lambdarpc.utils.Endpoint
import kotlinx.coroutines.withContext
import kotlinx.serialization.KSerializer
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.descriptors.MetaDescriptor
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.workspace.SerializableResultTask
import space.kscience.dataforge.workspace.TaskResult
import space.kscience.dataforge.workspace.Workspace
import space.kscience.dataforge.workspace.wrapResult
import kotlin.reflect.KType

/**
 * Proxy task that communicates with the corresponding remote task.
 */
internal class RemoteTask<T : Any>(
    endpoint: String,
    override val resultType: KType,
    override val resultSerializer: KSerializer<T>,
    override val descriptor: MetaDescriptor? = null,
    private val executionContext: Meta = Meta.EMPTY,
) : SerializableResultTask<T> {
    private val dispatcher = ServiceDispatcher(ServiceWorkspace.serviceId to Endpoint(endpoint))

    override suspend fun execute(workspace: Workspace, taskName: Name, taskMeta: Meta): TaskResult<T> {
        val result = withContext(dispatcher) {
            ServiceWorkspace.execute(taskName, taskMeta, executionContext)
        }
        val dataSet = result.toDataSet(resultType, resultSerializer)
        return workspace.wrapResult(dataSet, taskName, taskMeta)
    }
}
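
The routing here is lambdarpc's context-based dispatch: the `ServiceDispatcher` placed into the coroutine context decides which endpoint serves `serviceId` when the declared `execute` stub is invoked. A standalone sketch of the same pattern (the address is a placeholder):

```kotlin
// Sketch mirroring RemoteTask.execute: route a call to the remote stub
// through a dispatcher carried in the coroutine context.
suspend fun callRemote(taskName: Name, taskMeta: Meta): DataSetPrototype {
    val dispatcher = ServiceDispatcher(ServiceWorkspace.serviceId to Endpoint("localhost:9999"))
    return withContext(dispatcher) {
        ServiceWorkspace.execute(taskName, taskMeta, Meta.EMPTY)
    }
}
```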

@@ -0,0 +1,76 @@
package space.kscience.dataforge.distributed

import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Global
import space.kscience.dataforge.context.gather
import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.meta.extract
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.values.string
import space.kscience.dataforge.workspace.SerializableResultTask
import space.kscience.dataforge.workspace.Task
import space.kscience.dataforge.workspace.TaskResult
import space.kscience.dataforge.workspace.Workspace
import space.kscience.dataforge.workspace.wrapResult

/**
 * Workspace that returns [RemoteTask] if such task should be
 * executed remotely according to the execution context.
 */
internal open class RemoteTaskWorkspace(
    final override val context: Context = Global.buildContext("workspace".asName()),
    data: DataSet<*> = DataTree<Any>(),
    override val targets: Map<String, Meta> = mapOf(),
) : Workspace {

    override val data: TaskResult<*> = wrapResult(data, Name.EMPTY, Meta.EMPTY)

    private val _tasks: Map<Name, Task<*>> = context.gather(Task.TYPE)

    override val tasks: Map<Name, Task<*>>
        get() = object : AbstractMap<Name, Task<*>>() {
            override val entries: Set<Map.Entry<Name, Task<*>>>
                get() = _tasks.entries

            override fun get(key: Name): Task<*>? {
                val executionContext = context.properties.extract(EXECUTION_CONTEXT)
                val endpoint = executionContext
                    ?.get(EXECUTION_CONTEXT)?.get(ENDPOINTS)?.get(key)
                    ?: return _tasks[key]
                val string = endpoint.value?.string ?: error("Endpoint is expected to be a string")
                val local = _tasks[key] ?: error("No task with name $key")
                require(local is SerializableResultTask) { "Task $key result is not serializable" }
                return RemoteTask(string, local.resultType, local.resultSerializer, local.descriptor, executionContext)
            }
        }

    companion object {
        val EXECUTION_CONTEXT = "execution".asName()
        val ENDPOINTS = "endpoints".asName()
    }
}

public fun MutableMeta.endpoints(block: EndpointsBuilder.() -> Unit) {
    RemoteTaskWorkspace.EXECUTION_CONTEXT put {
        RemoteTaskWorkspace.ENDPOINTS put EndpointsBuilder().apply(block).build()
    }
}

public class EndpointsBuilder {
    private val endpoints = mutableMapOf<Name, String>()

    public infix fun Name.on(endpoint: String) {
        endpoints[this] = endpoint
    }

    internal fun build(): Meta = Meta {
        endpoints.forEach { (name, endpoint) ->
            name put endpoint
        }
    }
}
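
To show how the DSL composes, a sketch building an execution-context meta; the task name and address are placeholders:

```kotlin
// Hypothetical client-side configuration. The resulting meta lands under
// "execution.endpoints", which RemoteTaskWorkspace.tasks inspects to decide
// whether to return a RemoteTask proxy instead of the local task.
val executionContext: Meta = Meta {
    endpoints {
        "mytask".asName() on "localhost:9999"
    }
}
```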

@@ -0,0 +1,29 @@
package space.kscience.dataforge.distributed

import io.lambdarpc.dsl.def
import io.lambdarpc.dsl.j
import io.lambdarpc.utils.ServiceId
import space.kscience.dataforge.distributed.serialization.DataSetPrototype
import space.kscience.dataforge.meta.MetaSerializer
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.workspace.Workspace
import java.io.Closeable

/**
 * [Workspace] that can expose its tasks to other workspaces as a service.
 */
public interface ServiceWorkspace : Workspace, Closeable {
    public val port: Int
    public fun start()
    public fun awaitTermination()
    public fun shutdown()

    override fun close() {
        shutdown()
    }

    public companion object {
        internal val serviceId = ServiceId("d41b95b1-828b-4444-8ff0-6f9c92a79246")
        internal val execute by serviceId.def(j<Name>(), j(MetaSerializer), j(MetaSerializer), j<DataSetPrototype>())
    }
}

Review comments:

- DataTree builder will be non-suspending in the next version, but it should be possible to create it without suspending now.
- Need to think about better naming.
  - I thought about `WorkspaceNode` or `WorkerWorkspace`. There is also `DistributedWorkspace`, but it is not truly distributed itself. Also, `ServiceWorkspace` is good enough, in my opinion: "service" here means that this workspace should be run on some endpoint to be available.
- Why not suspended?
  - Because it is blocking in the Google gRPC implementation. It can be made suspend and block a separate thread, but I do not see any reason to do so.
- I do not see why we need it in the API; on the server side, the connection should be closeable, not the workspace. Maybe you should rename it to connection and remove the inheritance from workspace?
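
Two sketches related to the threads above; neither is part of this PR. If the blocking wait ever needed a suspending form, it could park the gRPC call on the IO dispatcher:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext

// Sketch only: suspend the caller instead of blocking its thread.
public suspend fun ServiceWorkspace.awaitTerminationSuspending(): Unit =
    withContext(Dispatchers.IO) { awaitTermination() }
```

And the suggested split of the lifecycle into a separate closeable connection could look roughly like this (`ServiceConnection` is a hypothetical name):

```kotlin
import java.io.Closeable

// Sketch of the review suggestion: the lifecycle lives in a connection
// type, so the workspace itself no longer implements Closeable.
public interface ServiceConnection : Closeable {
    public val port: Int
    public fun start()
    public fun awaitTermination()
    public fun shutdown()
    override fun close(): Unit = shutdown()
}
```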

@@ -0,0 +1,34 @@
package space.kscience.dataforge.distributed.serialization

import kotlinx.serialization.KSerializer
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import space.kscience.dataforge.data.Data
import space.kscience.dataforge.data.StaticData
import space.kscience.dataforge.data.await
import space.kscience.dataforge.meta.MetaSerializer
import kotlin.reflect.KType

/**
 * [Data] representation that is trivially serializable.
 */
@Serializable
internal data class DataPrototype(
    val meta: String,
    val data: String,
) {
    fun <T : Any> toData(type: KType, serializer: KSerializer<T>): Data<T> =
        StaticData(
            type = type,
            value = Json.decodeFromString(serializer, data),
            meta = Json.decodeFromString(MetaSerializer, meta),
        )

    companion object {
        suspend fun <T : Any> of(data: Data<T>, serializer: KSerializer<in T>): DataPrototype {
            val meta = Json.encodeToString(MetaSerializer, data.meta)
            val string = Json.encodeToString(serializer, data.await())
            return DataPrototype(meta, string)
        }
    }
}

Review comments:

- There is an existing class for that.
- Is it used somewhere?
- This is unusable. I believe that it should be something like this:

  ```kotlin
  class SerializeableData<T>(val data: Data<T>, val serializer: KSerializer<T>) : Data<T> by data
  ```
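
A round-trip sketch for the prototype, assuming the `StaticData` constructor as used above and a plain `Int` payload (`roundTrip` is illustrative only):

```kotlin
import kotlinx.serialization.builtins.serializer
import space.kscience.dataforge.meta.Meta
import kotlin.reflect.typeOf

// Sketch: serialize a Data<Int> into a DataPrototype and restore it.
suspend fun roundTrip() {
    val original: Data<Int> = StaticData(type = typeOf<Int>(), value = 42, meta = Meta.EMPTY)
    val prototype = DataPrototype.of(original, Int.serializer())
    val restored: Data<Int> = prototype.toData(typeOf<Int>(), Int.serializer())
    check(restored.await() == 42)
}
```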
@ -0,0 +1,57 @@
|
||||
Again, you do not need a separate structure. All you need is a generic DataSet with serializer. Again, you do not need a separate structure. All you need is a generic DataSet with serializer.
Again, you do not need a separate structure. All you need is a generic DataSet with serializer. Again, you do not need a separate structure. All you need is a generic DataSet with serializer.
When When `ServiceWorkspace.execute` returns result to the client, it do not know yet about serializer for the `T`. Then `RemoteTask` uses serializer to deserialize DataSet from prototype.
When When `ServiceWorkspace.execute` returns result to the client, it do not know yet about serializer for the `T`. Then `RemoteTask` uses serializer to deserialize DataSet from prototype.
|
||||
package space.kscience.dataforge.distributed.serialization
|
||||
Again, you do not need a separate structure. All you need is a generic DataSet with serializer. Again, you do not need a separate structure. All you need is a generic DataSet with serializer.
When When `ServiceWorkspace.execute` returns result to the client, it do not know yet about serializer for the `T`. Then `RemoteTask` uses serializer to deserialize DataSet from prototype.
|
||||
|
||||
Again, you do not need a separate structure. All you need is a generic DataSet with serializer. Again, you do not need a separate structure. All you need is a generic DataSet with serializer.
When When `ServiceWorkspace.execute` returns result to the client, it do not know yet about serializer for the `T`. Then `RemoteTask` uses serializer to deserialize DataSet from prototype.
|
||||
import kotlinx.coroutines.async
|
||||
Again, you do not need a separate structure. All you need is a generic DataSet with serializer. Again, you do not need a separate structure. All you need is a generic DataSet with serializer.
When When `ServiceWorkspace.execute` returns result to the client, it do not know yet about serializer for the `T`. Then `RemoteTask` uses serializer to deserialize DataSet from prototype.
|
||||
import kotlinx.coroutines.coroutineScope
|
||||
Again, you do not need a separate structure. All you need is a generic DataSet with serializer. Again, you do not need a separate structure. All you need is a generic DataSet with serializer.
When When `ServiceWorkspace.execute` returns result to the client, it do not know yet about serializer for the `T`. Then `RemoteTask` uses serializer to deserialize DataSet from prototype.
|
||||
import kotlinx.serialization.KSerializer
|
||||
Again, you do not need a separate structure. All you need is a generic DataSet with serializer. Again, you do not need a separate structure. All you need is a generic DataSet with serializer.
When When `ServiceWorkspace.execute` returns result to the client, it do not know yet about serializer for the `T`. Then `RemoteTask` uses serializer to deserialize DataSet from prototype.
|
||||
import kotlinx.serialization.Serializable
|
||||
Again, you do not need a separate structure. All you need is a generic DataSet with serializer. Again, you do not need a separate structure. All you need is a generic DataSet with serializer.
When When `ServiceWorkspace.execute` returns result to the client, it do not know yet about serializer for the `T`. Then `RemoteTask` uses serializer to deserialize DataSet from prototype.
|
||||
import space.kscience.dataforge.data.Data
|
||||
Again, you do not need a separate structure. All you need is a generic DataSet with serializer. Again, you do not need a separate structure. All you need is a generic DataSet with serializer.
When When `ServiceWorkspace.execute` returns result to the client, it do not know yet about serializer for the `T`. Then `RemoteTask` uses serializer to deserialize DataSet from prototype.
|
||||
import space.kscience.dataforge.data.DataSet
|
||||
Again, you do not need a separate structure. All you need is a generic DataSet with serializer. Again, you do not need a separate structure. All you need is a generic DataSet with serializer.
When When `ServiceWorkspace.execute` returns result to the client, it do not know yet about serializer for the `T`. Then `RemoteTask` uses serializer to deserialize DataSet from prototype.
|
||||
import space.kscience.dataforge.data.NamedData
|
||||
Again, you do not need a separate structure. All you need is a generic DataSet with serializer. Again, you do not need a separate structure. All you need is a generic DataSet with serializer.
When When `ServiceWorkspace.execute` returns result to the client, it do not know yet about serializer for the `T`. Then `RemoteTask` uses serializer to deserialize DataSet from prototype.
|
||||
import space.kscience.dataforge.data.asIterable
|
||||
Again, you do not need a separate structure. All you need is a generic DataSet with serializer. Again, you do not need a separate structure. All you need is a generic DataSet with serializer.
When When `ServiceWorkspace.execute` returns result to the client, it do not know yet about serializer for the `T`. Then `RemoteTask` uses serializer to deserialize DataSet from prototype.
|
||||
import space.kscience.dataforge.data.component1
|
||||
Again, you do not need a separate structure. All you need is a generic DataSet with serializer. Again, you do not need a separate structure. All you need is a generic DataSet with serializer.
When When `ServiceWorkspace.execute` returns result to the client, it do not know yet about serializer for the `T`. Then `RemoteTask` uses serializer to deserialize DataSet from prototype.
|
||||
import space.kscience.dataforge.data.component2
|
||||
Again, you do not need a separate structure. All you need is a generic DataSet with serializer. Again, you do not need a separate structure. All you need is a generic DataSet with serializer.
When When `ServiceWorkspace.execute` returns result to the client, it do not know yet about serializer for the `T`. Then `RemoteTask` uses serializer to deserialize DataSet from prototype.
|
||||
import space.kscience.dataforge.data.named
|
||||
Again, you do not need a separate structure. All you need is a generic DataSet with serializer. Again, you do not need a separate structure. All you need is a generic DataSet with serializer.
When When `ServiceWorkspace.execute` returns result to the client, it do not know yet about serializer for the `T`. Then `RemoteTask` uses serializer to deserialize DataSet from prototype.
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
Again, you do not need a separate structure. All you need is a generic DataSet with serializer. Again, you do not need a separate structure. All you need is a generic DataSet with serializer.
When When `ServiceWorkspace.execute` returns result to the client, it do not know yet about serializer for the `T`. Then `RemoteTask` uses serializer to deserialize DataSet from prototype.
|
||||
import space.kscience.dataforge.names.Name
import kotlin.reflect.KType

/**
 * [DataSet] representation that is trivially serializable.
 */
@Serializable
internal data class DataSetPrototype(val meta: Meta, val data: Map<String, DataPrototype>) {
Do we need that?

    fun <T : Any> toDataSet(type: KType, serializer: KSerializer<T>): DataSet<T> {
        val data = data
            .mapKeys { (name, _) -> Name.of(name) }
            .mapValues { (_, dataPrototype) -> dataPrototype.toData(type, serializer) }
        return SimpleDataSet(type, data, meta)
    }

    companion object {
        suspend fun <T : Any> of(dataSet: DataSet<T>, serializer: KSerializer<T>): DataSetPrototype = coroutineScope {
            val prototypes = dataSet.asIterable().map { (name, data) ->
                name.toString() to async { DataPrototype.of(data, serializer) }
            }
            val map = prototypes.associate { (name, deferred) -> name to deferred.await() }
            DataSetPrototype(dataSet.meta, map)
        }
    }
}

/**
 * Trivial [DataSet] implementation.
 */
private class SimpleDataSet<T : Any>(
    override val dataType: KType,
    private val data: Map<Name, Data<T>>,
    override val meta: Meta,
) : DataSet<T> {

    override fun iterator(): Iterator<NamedData<T>> =
        data
            .asSequence()
            .map { (name, data) -> data.named(name) }
            .iterator()

    override fun get(name: Name): Data<T>? = data[name]
}
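To make the comment above concrete, a hedged round-trip sketch (not part of the PR; `Int.serializer()` stands in for whatever serializer the client resolves for `T`):

import kotlinx.serialization.builtins.serializer
import kotlinx.serialization.json.Json
import kotlin.reflect.typeOf

// The server encodes the prototype; the client decodes it and rebuilds a
// typed DataSet once it knows the serializer for T.
suspend fun roundTrip(original: DataSet<Int>): DataSet<Int> {
    val prototype = DataSetPrototype.of(original, Int.serializer())
    val wire = Json.encodeToString(DataSetPrototype.serializer(), prototype)
    val decoded = Json.decodeFromString(DataSetPrototype.serializer(), wire)
    return decoded.toDataSet(typeOf<Int>(), Int.serializer())
}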
@ -0,0 +1,61 @@
package space.kscience.dataforge.distributed

import kotlinx.serialization.serializer
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.PluginFactory
import space.kscience.dataforge.context.PluginTag
import space.kscience.dataforge.context.info
import space.kscience.dataforge.context.logger
import space.kscience.dataforge.data.data
import space.kscience.dataforge.data.getByType
import space.kscience.dataforge.data.map
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.workspace.WorkspacePlugin
import space.kscience.dataforge.workspace.fromTask
import space.kscience.dataforge.workspace.task
import kotlin.reflect.KClass

internal class MyPlugin1 : WorkspacePlugin() {
    override val tag: PluginTag
        get() = Factory.tag

    val task by task<Int>(serializer()) {
        workspace.logger.info { "In ${tag.name}.task on ${workspace.context.name}" }
        val myInt = workspace.data.getByType<Int>("int")!!
        data("result", myInt.data.map { it + 1 })
    }

    companion object Factory : PluginFactory<MyPlugin1> {
        override val tag: PluginTag
            get() = PluginTag("Plg1")

        override val type: KClass<out MyPlugin1>
            get() = MyPlugin1::class

        override fun build(context: Context, meta: Meta): MyPlugin1 = MyPlugin1()
    }
}

internal class MyPlugin2 : WorkspacePlugin() {
    override val tag: PluginTag
        get() = Factory.tag

    val task by task<Int>(serializer()) {
        workspace.logger.info { "In ${tag.name}.task on ${workspace.context.name}" }
        val dataSet = fromTask<Int>(Name.of(MyPlugin1.tag.name, "task"))
        val data = dataSet["result".asName()]!!
        data("result", data.map { it + 1 })
    }

    companion object Factory : PluginFactory<MyPlugin2> {
        override val tag: PluginTag
            get() = PluginTag("Plg2")

        override val type: KClass<out MyPlugin2>
            get() = MyPlugin2::class

        override fun build(context: Context, meta: Meta): MyPlugin2 = MyPlugin2()
    }
}
@ -0,0 +1,90 @@
Switch to `runTest`.
This function is not available in dataforge; shall I add a dependency to the project?
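For reference, `runTest` lives in the kotlinx-coroutines-test artifact, so the switch would indeed need a new test dependency. A hedged sketch (artifact version assumed to match the coroutines version the module already uses):

// build.gradle.kts, jvmTest dependencies (assumed):
// implementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.6.0")

import kotlinx.coroutines.test.runTest

@Test
fun `local execution`() = runTest {
    // Same body as the runBlocking version below; runTest additionally fails
    // the test on hung coroutines instead of blocking the thread forever.
    assertEquals(42, worker1.data["int".asName()]!!.await())
}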
package space.kscience.dataforge.distributed

import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import space.kscience.dataforge.context.Global
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.await
import space.kscience.dataforge.data.get
import space.kscience.dataforge.data.static
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.workspace.Workspace
import kotlin.test.assertEquals

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
internal class RemoteCallTest {

    private lateinit var worker1: NodeWorkspace
    private lateinit var worker2: NodeWorkspace
    private lateinit var workspace: Workspace

    @BeforeAll
    fun before() = runBlocking {
        worker1 = NodeWorkspace(
            context = Global.buildContext("worker1".asName()) {
This is outdated, use `Context()` factory function instead.
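                // Hedged note on the comment above: newer dataforge versions expose a
                // top-level Context() factory, so this could presumably become
                //   context = Context("worker1") { plugin(MyPlugin1) }
                // (the exact signature depends on the dataforge version in use)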
                plugin(MyPlugin1)
            },
            data = DataTree<Any> {
                static("int", 42)
            },
        )
        worker1.start()

        worker2 = NodeWorkspace(
            context = Global.buildContext("worker2".asName()) {
                plugin(MyPlugin1)
                plugin(MyPlugin2)
            },
        )
        worker2.start()

        workspace = NodeWorkspace(
            context = Global.buildContext {
                plugin(MyPlugin1)
                plugin(MyPlugin2)
                properties {
                    endpoints {
                        Name.of(MyPlugin1.tag.name, "task") on "localhost:${worker1.port}"
                        Name.of(MyPlugin2.tag.name, "task") on "localhost:${worker2.port}"
                    }
                }
            }
        )
    }

    @AfterAll
    fun after() {
        worker1.shutdown()
        worker2.shutdown()
    }

    @Test
    fun `local execution`() = runBlocking {
        assertEquals(42, worker1.data["int".asName()]!!.await())
        val res = worker1
            .produce(Name.of(MyPlugin1.tag.name, "task"), Meta.EMPTY)["result"]!!
            .await()
        assertEquals(43, res)
    }

    @Test
    fun `remote execution`() = runBlocking {
        val remoteRes = workspace
            .produce(Name.of(MyPlugin1.tag.name, "task"), Meta.EMPTY)["result"]!!
            .await()
        assertEquals(43, remoteRes)
    }

    @Test
    fun `transitive execution`() = runBlocking {
        val remoteRes = workspace
            .produce(Name.of(MyPlugin2.tag.name, "task"), Meta.EMPTY)["result"]!!
            .await()
        assertEquals(44, remoteRes)
    }
}
@ -4,8 +4,23 @@ import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import space.kscience.dataforge.misc.Type
import space.kscience.dataforge.misc.unsafeCast
import space.kscience.dataforge.names.*
import space.kscience.dataforge.values.*
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.NameToken
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.cutFirst
import space.kscience.dataforge.names.cutLast
import space.kscience.dataforge.names.firstOrNull
import space.kscience.dataforge.names.isEmpty
import space.kscience.dataforge.names.lastOrNull
import space.kscience.dataforge.names.length
import space.kscience.dataforge.names.parseAsName
import space.kscience.dataforge.names.plus
import space.kscience.dataforge.values.EnumValue
import space.kscience.dataforge.values.Value
import space.kscience.dataforge.values.ValueProvider
import space.kscience.dataforge.values.boolean
import space.kscience.dataforge.values.numberOrNull
import space.kscience.dataforge.values.string

/**
@ -111,6 +126,11 @@ public operator fun Meta.get(name: Name): Meta? = this.getMeta(name)
 */
public operator fun Meta.get(key: String): Meta? = this.get(Name.parse(key))

public fun Meta.extract(name: Name): Meta? {
    val value = get(name) ?: return null
    return Meta { name put value }
}
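Unlike `get`, which returns the re-rooted node, the new `extract` preserves the full path to it. A hedged illustration (keys invented):

val meta = Meta { "a.b" put 42 }
meta["a.b"]                      // the node holding 42, re-rooted
meta.extract(Name.parse("a.b"))  // Meta { "a.b" put 42 }, the path is kept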

/**
 * Get all items matching given name. The index of the last element, if present is used as a [Regex],
 * against which indexes of elements are matched.
@ -135,7 +155,7 @@ public fun Meta.getIndexed(name: Name): Map<String?, Meta> {
    }
}

public fun Meta.getIndexed(name: String): Map<String?, Meta> = getIndexed(name.parseAsName())
public fun Meta.getIndexed(name: String): Map<String?, Meta> = getIndexed(name.parseAsName())
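A hedged illustration of the Regex index matching described above (keys invented, and the bracketed-index builder syntax is assumed to parse as shown):

val meta = Meta {
    "point[1]" put 1
    "point[2]" put 2
    "point[12]" put 12
}
// The last token's index "1.*" is used as a Regex over the children's indexes,
// so this returns the entries with indexes "1" and "12" but not "2".
val matched: Map<String?, Meta> = meta.getIndexed("point[1.*]")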
/**
 * A meta node that ensures that all of its descendants has at least the same type.
@ -2,11 +2,25 @@ package space.kscience.dataforge.meta

import kotlinx.serialization.Serializable
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.*
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.NameToken
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.cutFirst
import space.kscience.dataforge.names.cutLast
import space.kscience.dataforge.names.first
import space.kscience.dataforge.names.firstOrNull
import space.kscience.dataforge.names.isEmpty
import space.kscience.dataforge.names.lastOrNull
import space.kscience.dataforge.names.length
import space.kscience.dataforge.names.plus
import space.kscience.dataforge.names.withIndex
import space.kscience.dataforge.values.EnumValue
import space.kscience.dataforge.values.MutableValueProvider
import space.kscience.dataforge.values.Value
import space.kscience.dataforge.values.asValue
import kotlin.collections.component1
import kotlin.collections.component2
import kotlin.collections.set
import kotlin.js.JsName
import kotlin.jvm.Synchronized
@ -146,6 +160,14 @@ public interface MutableMeta : Meta, MutableMetaProvider {
 */
public operator fun MutableMeta.set(name: Name, meta: Meta): Unit = setMeta(name, meta)

public fun MutableMeta.put(other: Meta) {
    other.items.forEach { (name, meta) ->
        name.asName() put meta
    }
}

public operator fun MutableMeta.plusAssign(meta: Meta): Unit = put(meta)

/**
 * Set or replace value at given [name]
 */
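A hedged sketch of the new merge helpers (keys invented; the MutableMeta {} builder factory is assumed to be available in this version):

val target = MutableMeta { "a" put 1 }
val overlay = Meta { "b" put 2 }
target += overlay   // same as target.put(overlay): copies overlay's top-level items
// target now holds both "a" = 1 and "b" = 2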
@ -5,7 +5,7 @@ plugins {

kotlin {
    sourceSets {
        commonMain{
        commonMain {
            dependencies {
                api(project(":dataforge-context"))
                api(project(":dataforge-data"))
@ -15,6 +15,6 @@ kotlin {
        }
    }
}

readme{
readme {
    maturity = ru.mipt.npm.gradle.Maturity.EXPERIMENTAL
}
}
@ -1,6 +1,8 @@
package space.kscience.dataforge.workspace

import kotlinx.coroutines.withContext
import kotlinx.serialization.KSerializer
import kotlinx.serialization.serializer
import space.kscience.dataforge.data.DataSetBuilder
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.GoalExecutionRestriction
@ -9,6 +11,7 @@ import space.kscience.dataforge.meta.MetaRepr
import space.kscience.dataforge.meta.Specification
import space.kscience.dataforge.meta.descriptors.Described
import space.kscience.dataforge.meta.descriptors.MetaDescriptor
import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.misc.Type
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.workspace.Task.Companion.TYPE
@ -37,6 +40,12 @@ public interface Task<out T : Any> : Described {
    }
}

@Type(TYPE)
public interface SerializableResultTask<T : Any> : Task<T> {
    public val resultType: KType
    public val resultSerializer: KSerializer<T>
}

/**
 * A [Task] with [Specification] for wrapping and unwrapping task configuration
 */
@ -93,12 +102,30 @@ public fun <T : Any> Task(
    }
}

/**
 * [Task] that has [resultSerializer] to be able to cache or send its results
 */
@DFInternal
public class SerializableResultTaskImpl<T : Any>(
    override val resultType: KType,
    override val resultSerializer: KSerializer<T>,
    descriptor: MetaDescriptor? = null,
    builder: suspend TaskResultBuilder<T>.() -> Unit,
) : SerializableResultTask<T>, Task<T> by Task(resultType, descriptor, builder)

@Suppress("FunctionName")
public inline fun <reified T : Any> Task(
    descriptor: MetaDescriptor? = null,
    noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
): Task<T> = Task(typeOf<T>(), descriptor, builder)

@OptIn(DFInternal::class)
@Suppress("FunctionName")
public inline fun <reified T : Any> SerializableResultTask(
    resultSerializer: KSerializer<T> = serializer(),
    descriptor: MetaDescriptor? = null,
    noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
): Task<T> = SerializableResultTaskImpl(typeOf<T>(), resultSerializer, descriptor, builder)
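To show why the serializer travels with the task, a hedged sketch of what a remote endpoint could do with it (the helper and the use of DataSetPrototype here are illustrative, not part of this file):

// Hypothetical server-side helper: encode a finished task result for the wire
// using the serializer carried by the task itself.
suspend fun <T : Any> encodeResult(task: SerializableResultTask<T>, result: DataSet<T>): String {
    val prototype = DataSetPrototype.of(result, task.resultSerializer)
    return Json.encodeToString(DataSetPrototype.serializer(), prototype)
}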
/**
 * Create a [Task] that composes a result using [builder]. Only data from the workspace could be used.
@ -134,4 +161,4 @@ public fun <T : Any, C : MetaRepr> Task(
public inline fun <reified T : Any, C : MetaRepr> Task(
    specification: Specification<C>,
    noinline builder: suspend TaskResultBuilder<T>.(C) -> Unit,
): Task<T> = Task(typeOf<T>(), specification, builder)
): Task<T> = Task(typeOf<T>(), specification, builder)
@ -1,5 +1,6 @@
package space.kscience.dataforge.workspace

import kotlinx.serialization.KSerializer
import kotlinx.coroutines.CoroutineScope
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.ContextBuilder
@ -41,9 +42,15 @@ public interface TaskContainer {
@Deprecated("use buildTask instead", ReplaceWith("buildTask(name, descriptorBuilder, builder)"))
public inline fun <reified T : Any> TaskContainer.registerTask(
    name: String,
    descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {},
    resultSerializer: KSerializer<T>? = null,
    noinline descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {},
    noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
): Unit = registerTask(Name.parse(name), Task(MetaDescriptor(descriptorBuilder), builder))
) {
    val descriptor = MetaDescriptor(descriptorBuilder)
    val task = if (resultSerializer == null) Task(descriptor, builder) else
        SerializableResultTask(resultSerializer, descriptor, builder)
    registerTask(Name.parse(name), task)
}

public inline fun <reified T : Any> TaskContainer.buildTask(
    name: String,
@ -59,10 +66,12 @@ public inline fun <reified T : Any> TaskContainer.buildTask(

public inline fun <reified T : Any> TaskContainer.task(
    descriptor: MetaDescriptor,
    resultSerializer: KSerializer<T>? = null,
    noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
): PropertyDelegateProvider<Any?, ReadOnlyProperty<Any?, TaskReference<T>>> = PropertyDelegateProvider { _, property ->
    val taskName = Name.parse(property.name)
    val task = Task(descriptor, builder)
    val task = if (resultSerializer == null) Task(descriptor, builder) else
        SerializableResultTask(resultSerializer, descriptor, builder)
    registerTask(taskName, task)
    ReadOnlyProperty { _, _ -> TaskReference(taskName, task) }
}
@ -78,10 +87,11 @@ public inline fun <reified T : Any, C : MetaRepr> TaskContainer.task(
}

public inline fun <reified T : Any> TaskContainer.task(
    resultSerializer: KSerializer<T>? = null,
    noinline descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {},
    noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
): PropertyDelegateProvider<Any?, ReadOnlyProperty<Any?, TaskReference<T>>> =
    task(MetaDescriptor(descriptorBuilder), builder)
    task(MetaDescriptor(descriptorBuilder), resultSerializer, builder)

public class WorkspaceBuilder(private val parentContext: Context = Global) : TaskContainer {
    private var context: Context? = null
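A hedged usage contrast for the extended builders (bodies invented; written as property delegates inside a WorkspacePlugin or other TaskContainer):

val plain by task<Int> {                   // resultSerializer = null, plain Task
    data("result", workspace.data.getByType<Int>("int")!!.data.map { it * 2 })
}

val shippable by task<Int>(serializer()) { // registered as SerializableResultTask<Int>
    data("result", workspace.data.getByType<Int>("int")!!.data.map { it * 2 })
}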
@ -46,5 +46,6 @@ include(
    ":dataforge-context",
    ":dataforge-data",
    ":dataforge-workspace",
    ":dataforge-scripting"
)
    ":dataforge-scripting",
    ":dataforge-distributed",
)
Why do we need such a complex construct? Isn't it possible to add a simple wrapper by adding a connection and a serializer resolver? Something like this for the server:
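(The reviewer's snippet is not preserved in this capture; the following is a hedged reconstruction of the kind of wrapper presumably meant, with invented names.)

// Hypothetical server-side wrapper: expose an existing Workspace over a
// connection, with a resolver mapping task names to result serializers.
class WorkspaceServer(
    private val workspace: Workspace,
    private val serializerFor: (Name) -> KSerializer<Any>,
) {
    suspend fun execute(taskName: Name, meta: Meta): String {
        val result = workspace.produce(taskName, meta)
        @Suppress("UNCHECKED_CAST")
        val prototype = DataSetPrototype.of(result as DataSet<Any>, serializerFor(taskName))
        return Json.encodeToString(DataSetPrototype.serializer(), prototype)
    }
}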
And for the client:
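(Again a hedged reconstruction, not the original snippet; Connection is an assumed transport abstraction.)

// Hypothetical client-side counterpart: forwards produce() over the connection,
// deserializes the prototype, and owns the connection lifecycle via close().
class WorkspaceClient(
    private val connection: Connection,
    private val serializerFor: (Name) -> KSerializer<Any>,
) : AutoCloseable {
    suspend fun produce(taskName: Name, meta: Meta, type: KType): DataSet<Any> {
        val payload = connection.call(taskName, meta) // assumed remote call
        val prototype = Json.decodeFromString(DataSetPrototype.serializer(), payload)
        return prototype.toDataSet(type, serializerFor(taskName))
    }

    override fun close() = connection.close()
}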
We can add a `close` method to a generic Workspace to handle the lifecycle.