Compare commits

...

25 Commits

Author SHA1 Message Date
Andrey Stoyan
50a9e7d314 Refactor serialization 2022-06-04 18:20:39 +03:00
Andrey Stoyan
ba3354b4a2 Fix transitive execution 2022-06-02 13:24:13 +03:00
Andrey Stoyan
7e13c3dec6 Endpoint info via meta 2022-06-01 23:31:51 +03:00
Andrey Stoyan
99efe3a456 Merge remote-tracking branch 'upstream/0.6' into distributed 2022-06-01 09:38:56 +03:00
Andrey Stoyan
196854429a Non-suspend data tree builder 2022-05-31 22:50:09 +03:00
Andrey Stoyan
9c55d26be5 Merge branch 'dev' into distributed 2022-05-25 14:08:56 +03:00
Andrey Stoyan
114d310fdc Replace custom json coders with default 2022-05-25 13:51:26 +03:00
Andrey Stoyan
77f8f045e6 Refactor remote execution model 2022-05-21 23:10:54 +03:00
Andrey Stoyan
230a3f1e22 Make dataset serialization non-blocking 2022-05-10 10:49:38 +03:00
Andrey Stoyan
07a0bd551b Await data in parallel 2022-05-10 10:31:26 +03:00
Andrey Stoyan
f62507e1b9 Create separatere gradle project 2022-05-09 21:32:40 +03:00
Andrey Stoyan
e9d4683f9b Send meta too 2022-05-09 20:22:17 +03:00
Andrey Stoyan
a4044c82a0 Call remote task of service workspace 2022-04-30 19:34:27 +03:00
Alexander Nozik
be8e971436
all platforms for macos publication 2021-12-12 11:13:35 +03:00
Alexander Nozik
9cc30b1f4e
disable sonatype publishing 2021-12-12 10:48:52 +03:00
Alexander Nozik
7414e60192
Update publish.yml
force disable github publishing
2021-11-30 14:42:00 +03:00
Alexander Nozik
8c0bc05a9a
Merge pull request #74 from mipt-npm/dev
0.5.2
2021-11-30 13:12:20 +03:00
Alexander Nozik
acfe9c2f74
Merge pull request #70 from mipt-npm/dev
0.5.0
2021-08-13 21:42:32 +03:00
Alexander Nozik
32b986fc47
Merge pull request #64 from mipt-npm/dev
0.4.0
2021-04-27 11:23:43 +03:00
Alexander Nozik
b86c6141cd
Merge pull request #62 from mipt-npm/dev
Publishing settings
2021-03-07 15:43:34 +03:00
Alexander Nozik
e13e3ab6bf
Merge pull request #61 from mipt-npm/dev
0.4.0-dev-2
2021-03-07 15:06:32 +03:00
Alexander Nozik
a7ee2f5922
Merge pull request #60 from mipt-npm/dev
0.3.0
2021-02-07 21:18:04 +03:00
Alexander Nozik
6912f26291
Merge pull request #58 from mipt-npm/dev
0.2.0 release
2020-11-28 10:29:09 +03:00
Alexander Nozik
d951668911
Merge pull request #52 from mipt-npm/dev
0.1.8
2020-07-05 15:32:13 +03:00
Alexander Nozik
f5b2b4c9e4
Merge pull request #51 from mipt-npm/dev
0.1.8-dev-4
2020-06-06 21:28:42 +03:00
18 changed files with 615 additions and 19 deletions

View File

@ -9,6 +9,8 @@ jobs:
publish: publish:
environment: environment:
name: publish name: publish
env:
publishing.github: false
strategy: strategy:
matrix: matrix:
os: [ macOS-latest, windows-latest ] os: [ macOS-latest, windows-latest ]
@ -48,6 +50,6 @@ jobs:
- name: Publish Mac Artifacts - name: Publish Mac Artifacts
if: matrix.os == 'macOS-latest' if: matrix.os == 'macOS-latest'
run: > run: >
./gradlew release --no-daemon --build-cache -Ppublishing.enabled=true -Ppublishing.platform=macosX64 ./gradlew release --no-daemon --build-cache -Ppublishing.enabled=true
-Ppublishing.space.user=${{ secrets.SPACE_APP_ID }} -Ppublishing.space.user=${{ secrets.SPACE_APP_ID }}
-Ppublishing.space.token=${{ secrets.SPACE_APP_SECRET }} -Ppublishing.space.token=${{ secrets.SPACE_APP_SECRET }}

View File

@ -32,4 +32,4 @@ public fun <T : Any> Data<T>.named(name: Name): NamedData<T> = if (this is Named
NamedDataImpl(name, this.data) NamedDataImpl(name, this.data)
} else { } else {
NamedDataImpl(name, this) NamedDataImpl(name, this)
} }

View File

@ -1,8 +1,19 @@
package space.kscience.dataforge.data package space.kscience.dataforge.data
import kotlinx.coroutines.coroutineScope
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.* import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.NameToken
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.cutFirst
import space.kscience.dataforge.names.cutLast
import space.kscience.dataforge.names.firstOrNull
import space.kscience.dataforge.names.isEmpty
import space.kscience.dataforge.names.lastOrNull
import space.kscience.dataforge.names.length
import space.kscience.dataforge.names.plus
import kotlin.collections.set
import kotlin.reflect.KType import kotlin.reflect.KType
import kotlin.reflect.typeOf import kotlin.reflect.typeOf
@ -73,10 +84,10 @@ public inline fun <T : Any> DataTree(
@Suppress("FunctionName") @Suppress("FunctionName")
public inline fun <reified T : Any> DataTree( public inline fun <reified T : Any> DataTree(
noinline block: DataSetBuilder<T>.() -> Unit, noinline block: DataSetBuilder<T>.() -> Unit = {},
): DataTree<T> = DataTree(typeOf<T>(), block) ): DataTree<T> = DataTree(typeOf<T>(), block)
@OptIn(DFExperimental::class) @OptIn(DFExperimental::class)
public fun <T : Any> DataSet<T>.seal(): DataTree<T> = DataTree(dataType) { public fun <T : Any> DataSet<T>.seal(): DataTree<T> = DataTree(dataType) {
populateFrom(this@seal) populateFrom(this@seal)
} }

View File

@ -0,0 +1,42 @@
plugins {
id("ru.mipt.npm.gradle.mpp")
}
kotlin {
sourceSets {
commonMain {
dependencies {
api(project(":dataforge-context"))
api(project(":dataforge-data"))
api(project(":dataforge-io"))
api(project(":dataforge-workspace"))
}
}
jvmMain {
dependencies {
// TODO include fat jar of lambdarpc
val path = "../../../lambdarpc/LambdaRPC.kt/lambdarpc-core/build/libs"
api(files("$path/lambdarpc-core-0.0.1-SNAPSHOT.jar"))
runtimeOnly("io.grpc:grpc-netty-shaded:1.44.0")
api("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0")
api("io.grpc:grpc-protobuf:1.44.0")
api("com.google.protobuf:protobuf-java-util:3.19.4")
api("com.google.protobuf:protobuf-kotlin:3.19.4")
api("io.grpc:grpc-kotlin-stub:1.2.1")
api("org.jetbrains.kotlinx:kotlinx-serialization-json:1.3.2")
api("org.slf4j:slf4j-simple:1.7.36")
api("io.github.microutils:kotlin-logging-jvm:2.1.21")
}
}
}
}
kscience {
useSerialization {
json()
}
}
readme {
maturity = ru.mipt.npm.gradle.Maturity.PROTOTYPE
}

View File

@ -0,0 +1,79 @@
package space.kscience.dataforge.distributed
import io.lambdarpc.dsl.LibService
import kotlinx.serialization.KSerializer
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Global
import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.distributed.ServiceWorkspace.Companion.execute
import space.kscience.dataforge.distributed.ServiceWorkspace.Companion.serviceId
import space.kscience.dataforge.distributed.serialization.DataSetPrototype
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.put
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.plus
import space.kscience.dataforge.workspace.SerializableResultTask
import space.kscience.dataforge.workspace.Workspace
/**
* Workspace that exposes its tasks for remote clients.
* @param port Port to start service on. Will be random if null.
*/
public class NodeWorkspace(
port: Int? = null,
context: Context = Global.buildContext("workspace".asName()),
private val dataSerializer: KSerializer<Any>? = null,
data: DataSet<*> = DataTree<Any>(),
targets: Map<String, Meta> = mapOf(),
) : Workspace by RemoteTaskWorkspace(context, data, targets), ServiceWorkspace {
private val _port: Int? = port
private val service = LibService(serviceId, port) {
execute of { name, meta, executionContext ->
if (name == Name.EMPTY) {
requireNotNull(dataSerializer) { "Data serializer is not provided on $port" }
DataSetPrototype.of(data, dataSerializer)
} else {
val proxyContext = context.buildContext(context.name + "proxy") {
properties {
put(executionContext)
}
}
val proxy = RemoteTaskWorkspace(context = proxyContext, data = data)
val task = tasks[name] ?: error("Task with name $name not found in the workspace")
require(task is SerializableResultTask)
// Local function to capture generic parameter
suspend fun <T : Any> execute(task: SerializableResultTask<T>): DataSetPrototype {
val result = task.execute(proxy, name, meta)
return DataSetPrototype.of(result, task.resultSerializer)
}
execute(task)
}
}
}
/**
* Port this workspace is available on.
*/
public override val port: Int
get() = _port ?: service.port.p
/**
* Start [NodeWorkspace] as a service.
*/
public override fun start(): Unit = service.start()
/**
* Await termination of the service.
*/
public override fun awaitTermination(): Unit = service.awaitTermination()
/**
* Shutdown service.
*/
public override fun shutdown(): Unit = service.shutdown()
override fun close(): Unit = shutdown()
}

View File

@ -0,0 +1,35 @@
package space.kscience.dataforge.distributed
import io.lambdarpc.context.ServiceDispatcher
import io.lambdarpc.utils.Endpoint
import kotlinx.coroutines.withContext
import kotlinx.serialization.KSerializer
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.descriptors.MetaDescriptor
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.workspace.SerializableResultTask
import space.kscience.dataforge.workspace.TaskResult
import space.kscience.dataforge.workspace.Workspace
import space.kscience.dataforge.workspace.wrapResult
import kotlin.reflect.KType
/**
* Proxy task that communicates with the corresponding remote task.
*/
internal class RemoteTask<T : Any>(
endpoint: String,
override val resultType: KType,
override val resultSerializer: KSerializer<T>,
override val descriptor: MetaDescriptor? = null,
private val executionContext: Meta = Meta.EMPTY,
) : SerializableResultTask<T> {
private val dispatcher = ServiceDispatcher(ServiceWorkspace.serviceId to Endpoint(endpoint))
override suspend fun execute(workspace: Workspace, taskName: Name, taskMeta: Meta): TaskResult<T> {
val result = withContext(dispatcher) {
ServiceWorkspace.execute(taskName, taskMeta, executionContext)
}
val dataSet = result.toDataSet(resultType, resultSerializer)
return workspace.wrapResult(dataSet, taskName, taskMeta)
}
}

View File

@ -0,0 +1,76 @@
package space.kscience.dataforge.distributed
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Global
import space.kscience.dataforge.context.gather
import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.meta.extract
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.values.string
import space.kscience.dataforge.workspace.SerializableResultTask
import space.kscience.dataforge.workspace.Task
import space.kscience.dataforge.workspace.TaskResult
import space.kscience.dataforge.workspace.Workspace
import space.kscience.dataforge.workspace.wrapResult
/**
* Workspace that returns [RemoteTask] if such task should be
* executed remotely according to the execution context.
*/
internal open class RemoteTaskWorkspace(
final override val context: Context = Global.buildContext("workspace".asName()),
data: DataSet<*> = DataTree<Any>(),
override val targets: Map<String, Meta> = mapOf(),
) : Workspace {
override val data: TaskResult<*> = wrapResult(data, Name.EMPTY, Meta.EMPTY)
private val _tasks: Map<Name, Task<*>> = context.gather(Task.TYPE)
override val tasks: Map<Name, Task<*>>
get() = object : AbstractMap<Name, Task<*>>() {
override val entries: Set<Map.Entry<Name, Task<*>>>
get() = _tasks.entries
override fun get(key: Name): Task<*>? {
val executionContext = context.properties.extract(EXECUTION_CONTEXT)
val endpoint = executionContext
?.get(EXECUTION_CONTEXT)?.get(ENDPOINTS)?.get(key)
?: return _tasks[key]
val string = endpoint.value?.string ?: error("Endpoint is expected to be a string")
val local = _tasks[key] ?: error("No task with name $key")
require(local is SerializableResultTask) { "Task $key result is not serializable" }
return RemoteTask(string, local.resultType, local.resultSerializer, local.descriptor, executionContext)
}
}
companion object {
val EXECUTION_CONTEXT = "execution".asName()
val ENDPOINTS = "endpoints".asName()
}
}
public fun MutableMeta.endpoints(block: EndpointsBuilder.() -> Unit) {
RemoteTaskWorkspace.EXECUTION_CONTEXT put {
RemoteTaskWorkspace.ENDPOINTS put EndpointsBuilder().apply(block).build()
}
}
public class EndpointsBuilder {
private val endpoints = mutableMapOf<Name, String>()
public infix fun Name.on(endpoint: String) {
endpoints[this] = endpoint
}
internal fun build(): Meta = Meta {
endpoints.forEach { (name, endpoint) ->
name put endpoint
}
}
}

View File

@ -0,0 +1,29 @@
package space.kscience.dataforge.distributed
import io.lambdarpc.dsl.def
import io.lambdarpc.dsl.j
import io.lambdarpc.utils.ServiceId
import space.kscience.dataforge.distributed.serialization.DataSetPrototype
import space.kscience.dataforge.meta.MetaSerializer
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.workspace.Workspace
import java.io.Closeable
/**
* [Workspace] that can expose its tasks to other workspaces as a service.
*/
public interface ServiceWorkspace : Workspace, Closeable {
public val port: Int
public fun start()
public fun awaitTermination()
public fun shutdown()
override fun close() {
shutdown()
}
public companion object {
internal val serviceId = ServiceId("d41b95b1-828b-4444-8ff0-6f9c92a79246")
internal val execute by serviceId.def(j<Name>(), j(MetaSerializer), j(MetaSerializer), j<DataSetPrototype>())
}
}

View File

@ -0,0 +1,34 @@
package space.kscience.dataforge.distributed.serialization
import kotlinx.serialization.KSerializer
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import space.kscience.dataforge.data.Data
import space.kscience.dataforge.data.StaticData
import space.kscience.dataforge.data.await
import space.kscience.dataforge.meta.MetaSerializer
import kotlin.reflect.KType
/**
* [Data] representation that is trivially serializable.
*/
@Serializable
internal data class DataPrototype(
val meta: String,
val data: String,
) {
fun <T : Any> toData(type: KType, serializer: KSerializer<T>): Data<T> =
StaticData(
type = type,
value = Json.decodeFromString(serializer, data),
meta = Json.decodeFromString(MetaSerializer, meta),
)
companion object {
suspend fun <T : Any> of(data: Data<T>, serializer: KSerializer<in T>): DataPrototype {
val meta = Json.encodeToString(MetaSerializer, data.meta)
val string = Json.encodeToString(serializer, data.await())
return DataPrototype(meta, string)
}
}
}

View File

@ -0,0 +1,57 @@
package space.kscience.dataforge.distributed.serialization
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.serialization.KSerializer
import kotlinx.serialization.Serializable
import space.kscience.dataforge.data.Data
import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.NamedData
import space.kscience.dataforge.data.asIterable
import space.kscience.dataforge.data.component1
import space.kscience.dataforge.data.component2
import space.kscience.dataforge.data.named
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import kotlin.reflect.KType
/**
* [DataSet] representation that is trivially serializable.
*/
@Serializable
internal data class DataSetPrototype(val meta: Meta, val data: Map<String, DataPrototype>) {
fun <T : Any> toDataSet(type: KType, serializer: KSerializer<T>): DataSet<T> {
val data = data
.mapKeys { (name, _) -> Name.of(name) }
.mapValues { (_, dataPrototype) -> dataPrototype.toData(type, serializer) }
return SimpleDataSet(type, data, meta)
}
companion object {
suspend fun <T : Any> of(dataSet: DataSet<T>, serializer: KSerializer<T>): DataSetPrototype = coroutineScope {
val prototypes = dataSet.asIterable().map { (name, data) ->
name.toString() to async { DataPrototype.of(data, serializer) }
}
val map = prototypes.associate { (name, deferred) -> name to deferred.await() }
DataSetPrototype(dataSet.meta, map)
}
}
}
/**
* Trivial [DataSet] implementation.
*/
private class SimpleDataSet<T : Any>(
override val dataType: KType,
private val data: Map<Name, Data<T>>,
override val meta: Meta,
) : DataSet<T> {
override fun iterator(): Iterator<NamedData<T>> =
data
.asSequence()
.map { (name, data) -> data.named(name) }
.iterator()
override fun get(name: Name): Data<T>? = data[name]
}

View File

@ -0,0 +1,61 @@
package space.kscience.dataforge.distributed
import kotlinx.serialization.serializer
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.PluginFactory
import space.kscience.dataforge.context.PluginTag
import space.kscience.dataforge.context.info
import space.kscience.dataforge.context.logger
import space.kscience.dataforge.data.data
import space.kscience.dataforge.data.getByType
import space.kscience.dataforge.data.map
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.workspace.WorkspacePlugin
import space.kscience.dataforge.workspace.fromTask
import space.kscience.dataforge.workspace.task
import kotlin.reflect.KClass
internal class MyPlugin1 : WorkspacePlugin() {
override val tag: PluginTag
get() = Factory.tag
val task by task<Int>(serializer()) {
workspace.logger.info { "In ${tag.name}.task on ${workspace.context.name}" }
val myInt = workspace.data.getByType<Int>("int")!!
data("result", myInt.data.map { it + 1 })
}
companion object Factory : PluginFactory<MyPlugin1> {
override val tag: PluginTag
get() = PluginTag("Plg1")
override val type: KClass<out MyPlugin1>
get() = MyPlugin1::class
override fun build(context: Context, meta: Meta): MyPlugin1 = MyPlugin1()
}
}
internal class MyPlugin2 : WorkspacePlugin() {
override val tag: PluginTag
get() = Factory.tag
val task by task<Int>(serializer()) {
workspace.logger.info { "In ${tag.name}.task on ${workspace.context.name}" }
val dataSet = fromTask<Int>(Name.of(MyPlugin1.tag.name, "task"))
val data = dataSet["result".asName()]!!
data("result", data.map { it + 1 })
}
companion object Factory : PluginFactory<MyPlugin2> {
override val tag: PluginTag
get() = PluginTag("Plg2")
override val type: KClass<out MyPlugin2>
get() = MyPlugin2::class
override fun build(context: Context, meta: Meta): MyPlugin2 = MyPlugin2()
}
}

View File

@ -0,0 +1,90 @@
package space.kscience.dataforge.distributed
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import space.kscience.dataforge.context.Global
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.await
import space.kscience.dataforge.data.get
import space.kscience.dataforge.data.static
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.workspace.Workspace
import kotlin.test.assertEquals
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
internal class RemoteCallTest {
private lateinit var worker1: NodeWorkspace
private lateinit var worker2: NodeWorkspace
private lateinit var workspace: Workspace
@BeforeAll
fun before() = runBlocking {
worker1 = NodeWorkspace(
context = Global.buildContext("worker1".asName()) {
plugin(MyPlugin1)
},
data = DataTree<Any> {
static("int", 42)
},
)
worker1.start()
worker2 = NodeWorkspace(
context = Global.buildContext("worker2".asName()) {
plugin(MyPlugin1)
plugin(MyPlugin2)
},
)
worker2.start()
workspace = NodeWorkspace(
context = Global.buildContext {
plugin(MyPlugin1)
plugin(MyPlugin2)
properties {
endpoints {
Name.of(MyPlugin1.tag.name, "task") on "localhost:${worker1.port}"
Name.of(MyPlugin2.tag.name, "task") on "localhost:${worker2.port}"
}
}
}
)
}
@AfterAll
fun after() {
worker1.shutdown()
worker2.shutdown()
}
@Test
fun `local execution`() = runBlocking {
assertEquals(42, worker1.data["int".asName()]!!.await())
val res = worker1
.produce(Name.of(MyPlugin1.tag.name, "task"), Meta.EMPTY)["result"]!!
.await()
assertEquals(43, res)
}
@Test
fun `remote execution`() = runBlocking {
val remoteRes = workspace
.produce(Name.of(MyPlugin1.tag.name, "task"), Meta.EMPTY)["result"]!!
.await()
assertEquals(43, remoteRes)
}
@Test
fun `transitive execution`() = runBlocking {
val remoteRes = workspace
.produce(Name.of(MyPlugin2.tag.name, "task"), Meta.EMPTY)["result"]!!
.await()
assertEquals(44, remoteRes)
}
}

View File

@ -4,8 +4,23 @@ import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
import space.kscience.dataforge.misc.Type import space.kscience.dataforge.misc.Type
import space.kscience.dataforge.misc.unsafeCast import space.kscience.dataforge.misc.unsafeCast
import space.kscience.dataforge.names.* import space.kscience.dataforge.names.Name
import space.kscience.dataforge.values.* import space.kscience.dataforge.names.NameToken
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.cutFirst
import space.kscience.dataforge.names.cutLast
import space.kscience.dataforge.names.firstOrNull
import space.kscience.dataforge.names.isEmpty
import space.kscience.dataforge.names.lastOrNull
import space.kscience.dataforge.names.length
import space.kscience.dataforge.names.parseAsName
import space.kscience.dataforge.names.plus
import space.kscience.dataforge.values.EnumValue
import space.kscience.dataforge.values.Value
import space.kscience.dataforge.values.ValueProvider
import space.kscience.dataforge.values.boolean
import space.kscience.dataforge.values.numberOrNull
import space.kscience.dataforge.values.string
/** /**
@ -111,6 +126,11 @@ public operator fun Meta.get(name: Name): Meta? = this.getMeta(name)
*/ */
public operator fun Meta.get(key: String): Meta? = this.get(Name.parse(key)) public operator fun Meta.get(key: String): Meta? = this.get(Name.parse(key))
public fun Meta.extract(name: Name): Meta? {
val value = get(name) ?: return null
return Meta { name put value }
}
/** /**
* Get all items matching given name. The index of the last element, if present is used as a [Regex], * Get all items matching given name. The index of the last element, if present is used as a [Regex],
* against which indexes of elements are matched. * against which indexes of elements are matched.
@ -135,7 +155,7 @@ public fun Meta.getIndexed(name: Name): Map<String?, Meta> {
} }
} }
public fun Meta.getIndexed(name: String): Map<String?, Meta> = getIndexed(name.parseAsName()) public fun Meta.getIndexed(name: String): Map<String?, Meta> = getIndexed(name.parseAsName())
/** /**
* A meta node that ensures that all of its descendants has at least the same type. * A meta node that ensures that all of its descendants has at least the same type.

View File

@ -2,11 +2,25 @@ package space.kscience.dataforge.meta
import kotlinx.serialization.Serializable import kotlinx.serialization.Serializable
import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.* import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.NameToken
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.cutFirst
import space.kscience.dataforge.names.cutLast
import space.kscience.dataforge.names.first
import space.kscience.dataforge.names.firstOrNull
import space.kscience.dataforge.names.isEmpty
import space.kscience.dataforge.names.lastOrNull
import space.kscience.dataforge.names.length
import space.kscience.dataforge.names.plus
import space.kscience.dataforge.names.withIndex
import space.kscience.dataforge.values.EnumValue import space.kscience.dataforge.values.EnumValue
import space.kscience.dataforge.values.MutableValueProvider import space.kscience.dataforge.values.MutableValueProvider
import space.kscience.dataforge.values.Value import space.kscience.dataforge.values.Value
import space.kscience.dataforge.values.asValue import space.kscience.dataforge.values.asValue
import kotlin.collections.component1
import kotlin.collections.component2
import kotlin.collections.set
import kotlin.js.JsName import kotlin.js.JsName
import kotlin.jvm.Synchronized import kotlin.jvm.Synchronized
@ -146,6 +160,14 @@ public interface MutableMeta : Meta, MutableMetaProvider {
*/ */
public operator fun MutableMeta.set(name: Name, meta: Meta): Unit = setMeta(name, meta) public operator fun MutableMeta.set(name: Name, meta: Meta): Unit = setMeta(name, meta)
public fun MutableMeta.put(other: Meta) {
other.items.forEach { (name, meta) ->
name.asName() put meta
}
}
public operator fun MutableMeta.plusAssign(meta: Meta): Unit = put(meta)
/** /**
* Set or replace value at given [name] * Set or replace value at given [name]
*/ */

View File

@ -5,7 +5,7 @@ plugins {
kotlin { kotlin {
sourceSets { sourceSets {
commonMain{ commonMain {
dependencies { dependencies {
api(project(":dataforge-context")) api(project(":dataforge-context"))
api(project(":dataforge-data")) api(project(":dataforge-data"))
@ -15,6 +15,6 @@ kotlin {
} }
} }
readme{ readme {
maturity = ru.mipt.npm.gradle.Maturity.EXPERIMENTAL maturity = ru.mipt.npm.gradle.Maturity.EXPERIMENTAL
} }

View File

@ -1,6 +1,8 @@
package space.kscience.dataforge.workspace package space.kscience.dataforge.workspace
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
import kotlinx.serialization.KSerializer
import kotlinx.serialization.serializer
import space.kscience.dataforge.data.DataSetBuilder import space.kscience.dataforge.data.DataSetBuilder
import space.kscience.dataforge.data.DataTree import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.GoalExecutionRestriction import space.kscience.dataforge.data.GoalExecutionRestriction
@ -9,6 +11,7 @@ import space.kscience.dataforge.meta.MetaRepr
import space.kscience.dataforge.meta.Specification import space.kscience.dataforge.meta.Specification
import space.kscience.dataforge.meta.descriptors.Described import space.kscience.dataforge.meta.descriptors.Described
import space.kscience.dataforge.meta.descriptors.MetaDescriptor import space.kscience.dataforge.meta.descriptors.MetaDescriptor
import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.misc.Type import space.kscience.dataforge.misc.Type
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
import space.kscience.dataforge.workspace.Task.Companion.TYPE import space.kscience.dataforge.workspace.Task.Companion.TYPE
@ -37,6 +40,12 @@ public interface Task<out T : Any> : Described {
} }
} }
@Type(TYPE)
public interface SerializableResultTask<T : Any> : Task<T> {
public val resultType: KType
public val resultSerializer: KSerializer<T>
}
/** /**
* A [Task] with [Specification] for wrapping and unwrapping task configuration * A [Task] with [Specification] for wrapping and unwrapping task configuration
*/ */
@ -93,12 +102,30 @@ public fun <T : Any> Task(
} }
} }
/**
* [Task] that has [resultSerializer] to be able to cache or send its results
*/
@DFInternal
public class SerializableResultTaskImpl<T : Any>(
override val resultType: KType,
override val resultSerializer: KSerializer<T>,
descriptor: MetaDescriptor? = null,
builder: suspend TaskResultBuilder<T>.() -> Unit,
) : SerializableResultTask<T>, Task<T> by Task(resultType, descriptor, builder)
@Suppress("FunctionName") @Suppress("FunctionName")
public inline fun <reified T : Any> Task( public inline fun <reified T : Any> Task(
descriptor: MetaDescriptor? = null, descriptor: MetaDescriptor? = null,
noinline builder: suspend TaskResultBuilder<T>.() -> Unit, noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
): Task<T> = Task(typeOf<T>(), descriptor, builder) ): Task<T> = Task(typeOf<T>(), descriptor, builder)
@OptIn(DFInternal::class)
@Suppress("FunctionName")
public inline fun <reified T : Any> SerializableResultTask(
resultSerializer: KSerializer<T> = serializer(),
descriptor: MetaDescriptor? = null,
noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
): Task<T> = SerializableResultTaskImpl(typeOf<T>(), resultSerializer, descriptor, builder)
/** /**
* Create a [Task] that composes a result using [builder]. Only data from the workspace could be used. * Create a [Task] that composes a result using [builder]. Only data from the workspace could be used.
@ -134,4 +161,4 @@ public fun <T : Any, C : MetaRepr> Task(
public inline fun <reified T : Any, C : MetaRepr> Task( public inline fun <reified T : Any, C : MetaRepr> Task(
specification: Specification<C>, specification: Specification<C>,
noinline builder: suspend TaskResultBuilder<T>.(C) -> Unit, noinline builder: suspend TaskResultBuilder<T>.(C) -> Unit,
): Task<T> = Task(typeOf<T>(), specification, builder) ): Task<T> = Task(typeOf<T>(), specification, builder)

View File

@ -1,5 +1,6 @@
package space.kscience.dataforge.workspace package space.kscience.dataforge.workspace
import kotlinx.serialization.KSerializer
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.ContextBuilder import space.kscience.dataforge.context.ContextBuilder
@ -41,9 +42,15 @@ public interface TaskContainer {
@Deprecated("use buildTask instead", ReplaceWith("buildTask(name, descriptorBuilder, builder)")) @Deprecated("use buildTask instead", ReplaceWith("buildTask(name, descriptorBuilder, builder)"))
public inline fun <reified T : Any> TaskContainer.registerTask( public inline fun <reified T : Any> TaskContainer.registerTask(
name: String, name: String,
descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {}, resultSerializer: KSerializer<T>? = null,
noinline descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {},
noinline builder: suspend TaskResultBuilder<T>.() -> Unit, noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
): Unit = registerTask(Name.parse(name), Task(MetaDescriptor(descriptorBuilder), builder)) ) {
val descriptor = MetaDescriptor(descriptorBuilder)
val task = if (resultSerializer == null) Task(descriptor, builder) else
SerializableResultTask(resultSerializer, descriptor, builder)
registerTask(Name.parse(name), task)
}
public inline fun <reified T : Any> TaskContainer.buildTask( public inline fun <reified T : Any> TaskContainer.buildTask(
name: String, name: String,
@ -59,10 +66,12 @@ public inline fun <reified T : Any> TaskContainer.buildTask(
public inline fun <reified T : Any> TaskContainer.task( public inline fun <reified T : Any> TaskContainer.task(
descriptor: MetaDescriptor, descriptor: MetaDescriptor,
resultSerializer: KSerializer<T>? = null,
noinline builder: suspend TaskResultBuilder<T>.() -> Unit, noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
): PropertyDelegateProvider<Any?, ReadOnlyProperty<Any?, TaskReference<T>>> = PropertyDelegateProvider { _, property -> ): PropertyDelegateProvider<Any?, ReadOnlyProperty<Any?, TaskReference<T>>> = PropertyDelegateProvider { _, property ->
val taskName = Name.parse(property.name) val taskName = Name.parse(property.name)
val task = Task(descriptor, builder) val task = if (resultSerializer == null) Task(descriptor, builder) else
SerializableResultTask(resultSerializer, descriptor, builder)
registerTask(taskName, task) registerTask(taskName, task)
ReadOnlyProperty { _, _ -> TaskReference(taskName, task) } ReadOnlyProperty { _, _ -> TaskReference(taskName, task) }
} }
@ -78,10 +87,11 @@ public inline fun <reified T : Any, C : MetaRepr> TaskContainer.task(
} }
public inline fun <reified T : Any> TaskContainer.task( public inline fun <reified T : Any> TaskContainer.task(
resultSerializer: KSerializer<T>? = null,
noinline descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {}, noinline descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {},
noinline builder: suspend TaskResultBuilder<T>.() -> Unit, noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
): PropertyDelegateProvider<Any?, ReadOnlyProperty<Any?, TaskReference<T>>> = ): PropertyDelegateProvider<Any?, ReadOnlyProperty<Any?, TaskReference<T>>> =
task(MetaDescriptor(descriptorBuilder), builder) task(MetaDescriptor(descriptorBuilder), resultSerializer, builder)
public class WorkspaceBuilder(private val parentContext: Context = Global) : TaskContainer { public class WorkspaceBuilder(private val parentContext: Context = Global) : TaskContainer {
private var context: Context? = null private var context: Context? = null

View File

@ -46,5 +46,6 @@ include(
":dataforge-context", ":dataforge-context",
":dataforge-data", ":dataforge-data",
":dataforge-workspace", ":dataforge-workspace",
":dataforge-scripting" ":dataforge-scripting",
) ":dataforge-distributed",
)