Create separatere gradle project
This commit is contained in:
parent
e9d4683f9b
commit
f62507e1b9
41
dataforge-distributed/build.gradle.kts
Normal file
41
dataforge-distributed/build.gradle.kts
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
plugins {
|
||||||
|
id("ru.mipt.npm.gradle.mpp")
|
||||||
|
}
|
||||||
|
|
||||||
|
kotlin {
|
||||||
|
sourceSets {
|
||||||
|
commonMain {
|
||||||
|
dependencies {
|
||||||
|
api(project(":dataforge-context"))
|
||||||
|
api(project(":dataforge-data"))
|
||||||
|
api(project(":dataforge-io"))
|
||||||
|
api(project(":dataforge-workspace"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
jvmMain {
|
||||||
|
dependencies {
|
||||||
|
// TODO include fat jar of lambdarpc
|
||||||
|
api(files("lambdarpc-core-0.0.1.jar"))
|
||||||
|
runtimeOnly("io.grpc:grpc-netty-shaded:1.44.0")
|
||||||
|
api("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0")
|
||||||
|
api("io.grpc:grpc-protobuf:1.44.0")
|
||||||
|
api("com.google.protobuf:protobuf-java-util:3.19.4")
|
||||||
|
api("com.google.protobuf:protobuf-kotlin:3.19.4")
|
||||||
|
api("io.grpc:grpc-kotlin-stub:1.2.1")
|
||||||
|
api("org.jetbrains.kotlinx:kotlinx-serialization-json:1.3.2")
|
||||||
|
api("org.slf4j:slf4j-simple:1.7.36")
|
||||||
|
api("io.github.microutils:kotlin-logging-jvm:2.1.21")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
kscience {
|
||||||
|
useSerialization {
|
||||||
|
json()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
readme {
|
||||||
|
maturity = ru.mipt.npm.gradle.Maturity.PROTOTYPE
|
||||||
|
}
|
@ -0,0 +1,37 @@
|
|||||||
|
package space.kscience.dataforge.distributed
|
||||||
|
|
||||||
|
import io.lambdarpc.utils.Endpoint
|
||||||
|
import space.kscience.dataforge.context.AbstractPlugin
|
||||||
|
import space.kscience.dataforge.context.PluginTag
|
||||||
|
import space.kscience.dataforge.names.Name
|
||||||
|
import space.kscience.dataforge.workspace.Task
|
||||||
|
import kotlin.reflect.KType
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Plugin that purpose is to communicate with remote plugins.
|
||||||
|
* @param endpoint Endpoint of the remote plugin.
|
||||||
|
*/
|
||||||
|
public abstract class ClientWorkspacePlugin(endpoint: Endpoint) : AbstractPlugin() {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tag og the [ClientWorkspacePlugin] should be equal to the tag of the corresponding remote plugin.
|
||||||
|
*/
|
||||||
|
abstract override val tag: PluginTag
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Enumeration of names of remote tasks and their result types.
|
||||||
|
*/
|
||||||
|
public abstract val tasks: Map<Name, KType>
|
||||||
|
|
||||||
|
private val _tasks: Map<Name, Task<*>> by lazy {
|
||||||
|
tasks.mapValues { (_, type) ->
|
||||||
|
RemoteTask<Any>(endpoint, type)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun content(target: String): Map<Name, Any> =
|
||||||
|
when (target) {
|
||||||
|
Task.TYPE -> _tasks
|
||||||
|
else -> emptyMap()
|
||||||
|
}
|
||||||
|
}
|
@ -1,4 +1,4 @@
|
|||||||
package space.kscience.dataforge.workspace.distributed
|
package space.kscience.dataforge.distributed
|
||||||
|
|
||||||
import io.lambdarpc.dsl.ServiceDispatcher
|
import io.lambdarpc.dsl.ServiceDispatcher
|
||||||
import io.lambdarpc.utils.Endpoint
|
import io.lambdarpc.utils.Endpoint
|
@ -0,0 +1,81 @@
|
|||||||
|
package space.kscience.dataforge.distributed
|
||||||
|
|
||||||
|
import io.ktor.utils.io.core.*
|
||||||
|
import io.lambdarpc.dsl.LibService
|
||||||
|
import io.lambdarpc.dsl.def
|
||||||
|
import io.lambdarpc.utils.Address
|
||||||
|
import io.lambdarpc.utils.Port
|
||||||
|
import io.lambdarpc.utils.toSid
|
||||||
|
import kotlinx.coroutines.runBlocking
|
||||||
|
import space.kscience.dataforge.context.Context
|
||||||
|
import space.kscience.dataforge.context.Global
|
||||||
|
import space.kscience.dataforge.context.gather
|
||||||
|
import space.kscience.dataforge.data.DataSet
|
||||||
|
import space.kscience.dataforge.data.DataTree
|
||||||
|
import space.kscience.dataforge.distributed.serialization.DataSetCoder
|
||||||
|
import space.kscience.dataforge.distributed.serialization.NameCoder
|
||||||
|
import space.kscience.dataforge.distributed.serialization.SerializableDataSetAdapter
|
||||||
|
import space.kscience.dataforge.meta.Meta
|
||||||
|
import space.kscience.dataforge.names.Name
|
||||||
|
import space.kscience.dataforge.names.asName
|
||||||
|
import space.kscience.dataforge.workspace.Task
|
||||||
|
import space.kscience.dataforge.workspace.TaskResult
|
||||||
|
import space.kscience.dataforge.workspace.Workspace
|
||||||
|
import space.kscience.dataforge.workspace.wrapResult
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Workspace that exposes its tasks for remote clients.
|
||||||
|
*/
|
||||||
|
public class ServiceWorkspace(
|
||||||
|
address: String = "localhost",
|
||||||
|
port: Int? = null,
|
||||||
|
override val context: Context = Global.buildContext("workspace".asName()),
|
||||||
|
data: DataSet<*> = runBlocking { DataTree<Any> {} },
|
||||||
|
override val targets: Map<String, Meta> = mapOf(),
|
||||||
|
) : Workspace, Closeable {
|
||||||
|
|
||||||
|
override val data: TaskResult<*> = wrapResult(data, Name.EMPTY, Meta.EMPTY)
|
||||||
|
|
||||||
|
override val tasks: Map<Name, Task<*>>
|
||||||
|
get() = context.gather(Task.TYPE)
|
||||||
|
|
||||||
|
private val service = LibService(serviceId, address, port) {
|
||||||
|
execute of { name ->
|
||||||
|
val res = produce(name, Meta.EMPTY)
|
||||||
|
SerializableDataSetAdapter(res)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Address this workspace is available on.
|
||||||
|
*/
|
||||||
|
public val address: Address = Address(address)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Port this workspace is available on.
|
||||||
|
*/
|
||||||
|
public val port: Port
|
||||||
|
get() = service.port
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start [ServiceWorkspace] as a service.
|
||||||
|
*/
|
||||||
|
public fun start(): Unit = service.start()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Await termination of the service.
|
||||||
|
*/
|
||||||
|
public fun awaitTermination(): Unit = service.awaitTermination()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Shutdown service.
|
||||||
|
*/
|
||||||
|
public fun shutdown(): Unit = service.shutdown()
|
||||||
|
|
||||||
|
override fun close(): Unit = service.shutdown()
|
||||||
|
|
||||||
|
public companion object {
|
||||||
|
internal val serviceId = "d41b95b1-828b-4444-8ff0-6f9c92a79246".toSid()
|
||||||
|
internal val execute by serviceId.def(NameCoder, DataSetCoder)
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,22 @@
|
|||||||
|
package space.kscience.dataforge.distributed.serialization
|
||||||
|
|
||||||
|
import io.lambdarpc.coding.Coder
|
||||||
|
import io.lambdarpc.coding.CodingContext
|
||||||
|
import io.lambdarpc.transport.grpc.Entity
|
||||||
|
import io.lambdarpc.transport.serialization.Entity
|
||||||
|
import io.lambdarpc.transport.serialization.RawData
|
||||||
|
import java.nio.charset.Charset
|
||||||
|
|
||||||
|
internal object DataSetCoder : Coder<SerializableDataSet<Any>> {
|
||||||
|
override fun decode(entity: Entity, context: CodingContext): SerializableDataSet<Any> {
|
||||||
|
val string = entity.data.toString(Charset.defaultCharset())
|
||||||
|
val prototype = DataSetPrototype.fromJson(string)
|
||||||
|
return prototype.toDataSet()
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun encode(value: SerializableDataSet<Any>, context: CodingContext): Entity {
|
||||||
|
val prototype = DataSetPrototype.of(value)
|
||||||
|
val string = prototype.toJson()
|
||||||
|
return Entity(RawData.copyFrom(string, Charset.defaultCharset()))
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,20 @@
|
|||||||
|
package space.kscience.dataforge.distributed.serialization
|
||||||
|
|
||||||
|
import io.lambdarpc.coding.Coder
|
||||||
|
import io.lambdarpc.coding.CodingContext
|
||||||
|
import io.lambdarpc.transport.grpc.Entity
|
||||||
|
import io.lambdarpc.transport.serialization.Entity
|
||||||
|
import io.lambdarpc.transport.serialization.RawData
|
||||||
|
import space.kscience.dataforge.names.Name
|
||||||
|
import java.nio.charset.Charset
|
||||||
|
|
||||||
|
internal object NameCoder : Coder<Name> {
|
||||||
|
override fun decode(entity: Entity, context: CodingContext): Name {
|
||||||
|
require(entity.hasData()) { "Entity should contain data" }
|
||||||
|
val string = entity.data.toString(Charset.defaultCharset())
|
||||||
|
return Name.parse(string)
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun encode(value: Name, context: CodingContext): Entity =
|
||||||
|
Entity(RawData.copyFrom(value.toString(), Charset.defaultCharset()))
|
||||||
|
}
|
@ -0,0 +1,16 @@
|
|||||||
|
package space.kscience.dataforge.distributed.serialization
|
||||||
|
|
||||||
|
import space.kscience.dataforge.data.DataSet
|
||||||
|
import kotlin.reflect.KType
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents [DataSet] that should be initialized before usage.
|
||||||
|
*/
|
||||||
|
internal interface SerializableDataSet<T : Any> : DataSet<T> {
|
||||||
|
fun finishDecoding(type: KType)
|
||||||
|
}
|
||||||
|
|
||||||
|
internal class SerializableDataSetAdapter<T : Any>(dataSet: DataSet<T>) :
|
||||||
|
SerializableDataSet<T>, DataSet<T> by dataSet {
|
||||||
|
override fun finishDecoding(type: KType) = Unit
|
||||||
|
}
|
@ -0,0 +1,56 @@
|
|||||||
|
package space.kscience.dataforge.distributed.serialization
|
||||||
|
|
||||||
|
import kotlinx.coroutines.CompletableDeferred
|
||||||
|
import kotlinx.coroutines.CoroutineScope
|
||||||
|
import kotlinx.coroutines.Deferred
|
||||||
|
import kotlinx.serialization.KSerializer
|
||||||
|
import kotlinx.serialization.Serializable
|
||||||
|
import kotlinx.serialization.json.Json
|
||||||
|
import space.kscience.dataforge.data.Data
|
||||||
|
import space.kscience.dataforge.data.Goal
|
||||||
|
import space.kscience.dataforge.data.await
|
||||||
|
import space.kscience.dataforge.meta.Meta
|
||||||
|
import space.kscience.dataforge.meta.MetaSerializer
|
||||||
|
import kotlin.reflect.KType
|
||||||
|
|
||||||
|
/**
|
||||||
|
* [Data] representation that is trivially serializable.
|
||||||
|
*/
|
||||||
|
@Serializable
|
||||||
|
internal data class DataPrototype(
|
||||||
|
val meta: String,
|
||||||
|
val data: String,
|
||||||
|
) {
|
||||||
|
fun toData(type: KType): Data<Any> =
|
||||||
|
SimpleData(
|
||||||
|
type = type,
|
||||||
|
meta = Json.decodeFromString(MetaSerializer, meta),
|
||||||
|
data = Json.decodeFromString(kotlinx.serialization.serializer(type), data)!!
|
||||||
|
)
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
suspend fun <T : Any> of(data: Data<T>, serializer: KSerializer<in T>): DataPrototype {
|
||||||
|
val meta = Json.encodeToString(MetaSerializer, data.meta)
|
||||||
|
val string = Json.encodeToString(serializer, data.await())
|
||||||
|
return DataPrototype(meta, string)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Trivial [Data] implementation.
|
||||||
|
*/
|
||||||
|
private class SimpleData<T : Any>(
|
||||||
|
override val type: KType,
|
||||||
|
override val meta: Meta,
|
||||||
|
val data: T,
|
||||||
|
) : Data<T> {
|
||||||
|
override val dependencies: Collection<Goal<*>>
|
||||||
|
get() = emptyList()
|
||||||
|
|
||||||
|
override val deferred: Deferred<T>
|
||||||
|
get() = CompletableDeferred(data)
|
||||||
|
|
||||||
|
override fun async(coroutineScope: CoroutineScope): Deferred<T> = deferred
|
||||||
|
override fun reset() = Unit
|
||||||
|
}
|
@ -0,0 +1,73 @@
|
|||||||
|
package space.kscience.dataforge.distributed.serialization
|
||||||
|
|
||||||
|
import kotlinx.coroutines.flow.Flow
|
||||||
|
import kotlinx.coroutines.flow.asFlow
|
||||||
|
import kotlinx.coroutines.flow.map
|
||||||
|
import kotlinx.coroutines.flow.toList
|
||||||
|
import kotlinx.coroutines.runBlocking
|
||||||
|
import kotlinx.serialization.Serializable
|
||||||
|
import kotlinx.serialization.json.Json
|
||||||
|
import kotlinx.serialization.serializer
|
||||||
|
import space.kscience.dataforge.data.Data
|
||||||
|
import space.kscience.dataforge.data.DataSet
|
||||||
|
import space.kscience.dataforge.data.NamedData
|
||||||
|
import space.kscience.dataforge.data.component1
|
||||||
|
import space.kscience.dataforge.data.component2
|
||||||
|
import space.kscience.dataforge.names.Name
|
||||||
|
import kotlin.reflect.KType
|
||||||
|
|
||||||
|
/**
|
||||||
|
* [DataSet] representation that is trivially serializable.
|
||||||
|
*/
|
||||||
|
@Serializable
|
||||||
|
internal data class DataSetPrototype(val data: Map<String, DataPrototype>) {
|
||||||
|
fun toDataSet(): SerializableDataSet<Any> =
|
||||||
|
SerializableDataSetImpl(this)
|
||||||
|
|
||||||
|
fun toJson(): String = Json.encodeToString(serializer(), this)
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
fun <T : Any> of(dataSet: DataSet<T>): DataSetPrototype = runBlocking {
|
||||||
|
val serializer = serializer(dataSet.dataType)
|
||||||
|
val map = mutableListOf<Pair<String, DataPrototype>>()
|
||||||
|
dataSet.flowData().map { (name, data) ->
|
||||||
|
name.toString() to DataPrototype.of(data, serializer)
|
||||||
|
}.toList(map)
|
||||||
|
DataSetPrototype(map.associate { it })
|
||||||
|
}
|
||||||
|
|
||||||
|
fun fromJson(string: String): DataSetPrototype = Json.decodeFromString(serializer(), string)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Trivial [SerializableDataSet] implementation.
|
||||||
|
*/
|
||||||
|
private class SerializableDataSetImpl(private val prototype: DataSetPrototype) : SerializableDataSet<Any> {
|
||||||
|
|
||||||
|
private lateinit var type: KType
|
||||||
|
private lateinit var data: Map<Name, Data<Any>>
|
||||||
|
|
||||||
|
override fun finishDecoding(type: KType) {
|
||||||
|
this.type = type
|
||||||
|
this.data = prototype.data
|
||||||
|
.mapKeys { (name, _) -> Name.of(name) }
|
||||||
|
.mapValues { (_, dataPrototype) -> dataPrototype.toData(type) }
|
||||||
|
}
|
||||||
|
|
||||||
|
override val dataType: KType
|
||||||
|
get() = type
|
||||||
|
|
||||||
|
override fun flowData(): Flow<NamedData<Any>> =
|
||||||
|
data.map { (name, data) -> SimpleNamedData(name, data) }.asFlow()
|
||||||
|
|
||||||
|
override suspend fun getData(name: Name): Data<Any>? = data[name]
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Trivial named data implementation.
|
||||||
|
*/
|
||||||
|
private class SimpleNamedData(
|
||||||
|
override val name: Name,
|
||||||
|
override val data: Data<Any>,
|
||||||
|
) : NamedData<Any>, Data<Any> by data
|
||||||
|
}
|
@ -1,4 +1,4 @@
|
|||||||
package space.kscience.dataforge.workspace
|
package space.kscience.dataforge.distributed
|
||||||
|
|
||||||
import io.lambdarpc.utils.Endpoint
|
import io.lambdarpc.utils.Endpoint
|
||||||
import kotlinx.coroutines.runBlocking
|
import kotlinx.coroutines.runBlocking
|
||||||
@ -19,9 +19,11 @@ import space.kscience.dataforge.data.static
|
|||||||
import space.kscience.dataforge.meta.Meta
|
import space.kscience.dataforge.meta.Meta
|
||||||
import space.kscience.dataforge.names.Name
|
import space.kscience.dataforge.names.Name
|
||||||
import space.kscience.dataforge.names.asName
|
import space.kscience.dataforge.names.asName
|
||||||
import space.kscience.dataforge.workspace.distributed.ClientWorkspacePlugin
|
import space.kscience.dataforge.workspace.Workspace
|
||||||
import space.kscience.dataforge.workspace.distributed.ServiceWorkspace
|
import space.kscience.dataforge.workspace.WorkspacePlugin
|
||||||
|
import space.kscience.dataforge.workspace.task
|
||||||
import kotlin.reflect.KClass
|
import kotlin.reflect.KClass
|
||||||
|
import kotlin.reflect.KType
|
||||||
import kotlin.reflect.typeOf
|
import kotlin.reflect.typeOf
|
||||||
import kotlin.test.assertEquals
|
import kotlin.test.assertEquals
|
||||||
|
|
||||||
@ -46,11 +48,15 @@ private class MyPlugin : WorkspacePlugin() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private class RemoteMyPlugin(endpoint: Endpoint) : ClientWorkspacePlugin(
|
private class RemoteMyPlugin(endpoint: Endpoint) : ClientWorkspacePlugin(endpoint) {
|
||||||
MyPlugin.tag,
|
override val tag: PluginTag
|
||||||
endpoint,
|
get() = MyPlugin.tag
|
||||||
|
|
||||||
|
override val tasks: Map<Name, KType>
|
||||||
|
get() = mapOf(
|
||||||
"task".asName() to typeOf<Int>()
|
"task".asName() to typeOf<Int>()
|
||||||
)
|
)
|
||||||
|
}
|
||||||
|
|
||||||
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
|
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
|
||||||
class ServiceWorkspaceTest {
|
class ServiceWorkspaceTest {
|
@ -12,27 +12,6 @@ kotlin {
|
|||||||
api(project(":dataforge-io"))
|
api(project(":dataforge-io"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
jvmMain {
|
|
||||||
dependencies {
|
|
||||||
// TODO include fat jar of lambdarpc
|
|
||||||
api(files("lambdarpc-core-0.0.1.jar"))
|
|
||||||
runtimeOnly("io.grpc:grpc-netty-shaded:1.44.0")
|
|
||||||
api("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0")
|
|
||||||
api("io.grpc:grpc-protobuf:1.44.0")
|
|
||||||
api("com.google.protobuf:protobuf-java-util:3.19.4")
|
|
||||||
api("com.google.protobuf:protobuf-kotlin:3.19.4")
|
|
||||||
api("io.grpc:grpc-kotlin-stub:1.2.1")
|
|
||||||
api("org.jetbrains.kotlinx:kotlinx-serialization-json:1.3.2")
|
|
||||||
api("org.slf4j:slf4j-simple:1.7.36")
|
|
||||||
api("io.github.microutils:kotlin-logging-jvm:2.1.21")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
kscience {
|
|
||||||
useSerialization {
|
|
||||||
json()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,31 +0,0 @@
|
|||||||
package space.kscience.dataforge.workspace.distributed
|
|
||||||
|
|
||||||
import io.lambdarpc.utils.Endpoint
|
|
||||||
import space.kscience.dataforge.context.AbstractPlugin
|
|
||||||
import space.kscience.dataforge.context.PluginTag
|
|
||||||
import space.kscience.dataforge.names.Name
|
|
||||||
import space.kscience.dataforge.workspace.Task
|
|
||||||
import kotlin.reflect.KType
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Plugin that purpose is to communicate with remote plugins.
|
|
||||||
* @param tag Tag og the [ClientWorkspacePlugin] should be equal to the tag of the corresponding remote plugin.
|
|
||||||
* @param endpoint Endpoint of the remote plugin.
|
|
||||||
* @param tasks Enumeration of names of remote tasks and their result types.
|
|
||||||
*/
|
|
||||||
public abstract class ClientWorkspacePlugin(
|
|
||||||
override val tag: PluginTag,
|
|
||||||
endpoint: Endpoint,
|
|
||||||
vararg tasks: Pair<Name, KType>,
|
|
||||||
) : AbstractPlugin() {
|
|
||||||
|
|
||||||
private val tasks: Map<Name, Task<*>> = tasks.associate { (name, type) ->
|
|
||||||
name to RemoteTask<Any>(endpoint, type)
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun content(target: String): Map<Name, Any> =
|
|
||||||
when (target) {
|
|
||||||
Task.TYPE -> tasks
|
|
||||||
else -> emptyMap()
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,237 +0,0 @@
|
|||||||
package space.kscience.dataforge.workspace.distributed
|
|
||||||
|
|
||||||
import io.ktor.utils.io.core.*
|
|
||||||
import io.lambdarpc.coding.Coder
|
|
||||||
import io.lambdarpc.coding.CodingContext
|
|
||||||
import io.lambdarpc.dsl.LibService
|
|
||||||
import io.lambdarpc.dsl.def
|
|
||||||
import io.lambdarpc.transport.grpc.Entity
|
|
||||||
import io.lambdarpc.transport.serialization.Entity
|
|
||||||
import io.lambdarpc.transport.serialization.RawData
|
|
||||||
import io.lambdarpc.utils.Address
|
|
||||||
import io.lambdarpc.utils.Port
|
|
||||||
import io.lambdarpc.utils.toSid
|
|
||||||
import kotlinx.coroutines.CompletableDeferred
|
|
||||||
import kotlinx.coroutines.CoroutineScope
|
|
||||||
import kotlinx.coroutines.Deferred
|
|
||||||
import kotlinx.coroutines.flow.Flow
|
|
||||||
import kotlinx.coroutines.flow.asFlow
|
|
||||||
import kotlinx.coroutines.flow.map
|
|
||||||
import kotlinx.coroutines.flow.toList
|
|
||||||
import kotlinx.coroutines.runBlocking
|
|
||||||
import kotlinx.serialization.KSerializer
|
|
||||||
import kotlinx.serialization.Serializable
|
|
||||||
import kotlinx.serialization.json.Json
|
|
||||||
import kotlinx.serialization.serializer
|
|
||||||
import space.kscience.dataforge.context.Context
|
|
||||||
import space.kscience.dataforge.context.Global
|
|
||||||
import space.kscience.dataforge.context.gather
|
|
||||||
import space.kscience.dataforge.data.Data
|
|
||||||
import space.kscience.dataforge.data.DataSet
|
|
||||||
import space.kscience.dataforge.data.DataTree
|
|
||||||
import space.kscience.dataforge.data.Goal
|
|
||||||
import space.kscience.dataforge.data.NamedData
|
|
||||||
import space.kscience.dataforge.data.await
|
|
||||||
import space.kscience.dataforge.data.component1
|
|
||||||
import space.kscience.dataforge.data.component2
|
|
||||||
import space.kscience.dataforge.meta.Meta
|
|
||||||
import space.kscience.dataforge.meta.MetaSerializer
|
|
||||||
import space.kscience.dataforge.names.Name
|
|
||||||
import space.kscience.dataforge.names.asName
|
|
||||||
import space.kscience.dataforge.workspace.Task
|
|
||||||
import space.kscience.dataforge.workspace.TaskResult
|
|
||||||
import space.kscience.dataforge.workspace.Workspace
|
|
||||||
import space.kscience.dataforge.workspace.wrapResult
|
|
||||||
import java.nio.charset.Charset
|
|
||||||
import kotlin.reflect.KType
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Workspace that exposes its tasks for remote clients.
|
|
||||||
*/
|
|
||||||
public class ServiceWorkspace(
|
|
||||||
address: String = "localhost",
|
|
||||||
port: Int? = null,
|
|
||||||
override val context: Context = Global.buildContext("workspace".asName()),
|
|
||||||
data: DataSet<*> = runBlocking { DataTree<Any> {} },
|
|
||||||
override val targets: Map<String, Meta> = mapOf(),
|
|
||||||
) : Workspace, Closeable {
|
|
||||||
|
|
||||||
override val data: TaskResult<*> = wrapResult(data, Name.EMPTY, Meta.EMPTY)
|
|
||||||
|
|
||||||
override val tasks: Map<Name, Task<*>>
|
|
||||||
get() = context.gather(Task.TYPE)
|
|
||||||
|
|
||||||
private val service = LibService(serviceId, address, port) {
|
|
||||||
execute of { name ->
|
|
||||||
val res = produce(name, Meta.EMPTY)
|
|
||||||
LazyDecodableDataSetAdapter(res)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Address this workspace is available on.
|
|
||||||
*/
|
|
||||||
public val address: Address = Address(address)
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Port this workspace is available on.
|
|
||||||
*/
|
|
||||||
public val port: Port
|
|
||||||
get() = service.port
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Start [ServiceWorkspace] as a service.
|
|
||||||
*/
|
|
||||||
public fun start(): Unit = service.start()
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Await termination of the service.
|
|
||||||
*/
|
|
||||||
public fun awaitTermination(): Unit = service.awaitTermination()
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Shutdown service.
|
|
||||||
*/
|
|
||||||
public fun shutdown(): Unit = service.shutdown()
|
|
||||||
|
|
||||||
override fun close(): Unit = service.shutdown()
|
|
||||||
|
|
||||||
public companion object {
|
|
||||||
internal val serviceId = "d41b95b1-828b-4444-8ff0-6f9c92a79246".toSid()
|
|
||||||
internal val execute by serviceId.def(NameCoder, DataSetCoder)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private object NameCoder : Coder<Name> {
|
|
||||||
override fun decode(entity: Entity, context: CodingContext): Name {
|
|
||||||
require(entity.hasData()) { "Entity should contain data" }
|
|
||||||
val string = entity.data.toString(Charset.defaultCharset())
|
|
||||||
return Name.parse(string)
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun encode(value: Name, context: CodingContext): Entity =
|
|
||||||
Entity(RawData.copyFrom(value.toString(), Charset.defaultCharset()))
|
|
||||||
}
|
|
||||||
|
|
||||||
@Serializable
|
|
||||||
private data class DataPrototype(
|
|
||||||
val meta: String,
|
|
||||||
val data: String,
|
|
||||||
) {
|
|
||||||
companion object {
|
|
||||||
suspend fun <T : Any> of(data: Data<T>, serializer: KSerializer<in T>): DataPrototype {
|
|
||||||
val meta = Json.encodeToString(MetaSerializer, data.meta)
|
|
||||||
val string = Json.encodeToString(serializer, data.await())
|
|
||||||
return DataPrototype(meta, string)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Data class that represents serializable [DataSet].
|
|
||||||
*/
|
|
||||||
@Serializable
|
|
||||||
private data class DataSetPrototype(
|
|
||||||
val data: Map<String, DataPrototype>,
|
|
||||||
)
|
|
||||||
|
|
||||||
/**
|
|
||||||
* [DataSetPrototype] builder.
|
|
||||||
*/
|
|
||||||
private fun <T : Any> DataSet<T>.toPrototype(): DataSetPrototype = runBlocking {
|
|
||||||
val serializer = serializer(dataType)
|
|
||||||
val map = mutableListOf<Pair<String, DataPrototype>>()
|
|
||||||
flowData().map { (name, data) ->
|
|
||||||
name.toString() to DataPrototype.of(data, serializer)
|
|
||||||
}.toList(map)
|
|
||||||
DataSetPrototype(map.associate { it })
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Trivial [Data] implementation.
|
|
||||||
*/
|
|
||||||
private class SimpleData(
|
|
||||||
override val type: KType,
|
|
||||||
override val meta: Meta,
|
|
||||||
val data: Any,
|
|
||||||
) : Data<Any> {
|
|
||||||
override val dependencies: Collection<Goal<*>>
|
|
||||||
get() = emptyList()
|
|
||||||
|
|
||||||
override val deferred: Deferred<Any>
|
|
||||||
get() = CompletableDeferred(data)
|
|
||||||
|
|
||||||
override fun async(coroutineScope: CoroutineScope): Deferred<Any> = deferred
|
|
||||||
override fun reset() = Unit
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Trivial named data implementation.
|
|
||||||
*/
|
|
||||||
private class SimpleNamedData(
|
|
||||||
override val name: Name,
|
|
||||||
override val data: Data<Any>,
|
|
||||||
) : NamedData<Any>, Data<Any> by data
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Represents [DataSet] that should be initialized before usage.
|
|
||||||
*/
|
|
||||||
internal interface LazyDecodableDataSet<T : Any> : DataSet<T> {
|
|
||||||
fun finishDecoding(type: KType)
|
|
||||||
}
|
|
||||||
|
|
||||||
private class LazyDecodableDataSetAdapter<T : Any>(val dataSet: DataSet<T>) :
|
|
||||||
LazyDecodableDataSet<T>, DataSet<T> by dataSet {
|
|
||||||
override fun finishDecoding(type: KType) = Unit
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Trivial [LazyDecodableDataSet] implementation.
|
|
||||||
*/
|
|
||||||
private class SimpleDataSet(private val prototype: DataSetPrototype) : LazyDecodableDataSet<Any> {
|
|
||||||
|
|
||||||
lateinit var type: KType
|
|
||||||
lateinit var data: Map<Name, Pair<Meta, Any>>
|
|
||||||
|
|
||||||
override fun finishDecoding(type: KType) {
|
|
||||||
this.type = type
|
|
||||||
val serializer = serializer(type)
|
|
||||||
this.data = prototype.data
|
|
||||||
.mapKeys { (name, _) -> Name.of(name) }
|
|
||||||
.mapValues { (_, pair) ->
|
|
||||||
val (meta, data) = pair
|
|
||||||
Pair(
|
|
||||||
Json.decodeFromString(MetaSerializer, meta),
|
|
||||||
Json.decodeFromString(serializer, data)!!
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
override val dataType: KType
|
|
||||||
get() = type
|
|
||||||
|
|
||||||
override fun flowData(): Flow<NamedData<Any>> =
|
|
||||||
data.map { (name, pair) ->
|
|
||||||
val (meta, data) = pair
|
|
||||||
val wrapped = SimpleData(dataType, meta, data)
|
|
||||||
SimpleNamedData(name, wrapped)
|
|
||||||
}.asFlow()
|
|
||||||
|
|
||||||
override suspend fun getData(name: Name): Data<Any>? = data[name]?.let { (meta, data) ->
|
|
||||||
SimpleData(dataType, meta, data)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private object DataSetCoder : Coder<LazyDecodableDataSet<Any>> {
|
|
||||||
override fun decode(entity: Entity, context: CodingContext): LazyDecodableDataSet<Any> {
|
|
||||||
val string = entity.data.toString(Charset.defaultCharset())
|
|
||||||
val prototype = Json.decodeFromString(serializer<DataSetPrototype>(), string)
|
|
||||||
return SimpleDataSet(prototype)
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun encode(value: LazyDecodableDataSet<Any>, context: CodingContext): Entity {
|
|
||||||
val prototype = value.toPrototype()
|
|
||||||
val string = Json.encodeToString(serializer(), prototype)
|
|
||||||
return Entity(RawData.copyFrom(string, Charset.defaultCharset()))
|
|
||||||
}
|
|
||||||
}
|
|
@ -22,5 +22,6 @@ include(
|
|||||||
":dataforge-context",
|
":dataforge-context",
|
||||||
":dataforge-data",
|
":dataforge-data",
|
||||||
":dataforge-workspace",
|
":dataforge-workspace",
|
||||||
":dataforge-scripting"
|
":dataforge-scripting",
|
||||||
|
":dataforge-distributed",
|
||||||
)
|
)
|
Loading…
Reference in New Issue
Block a user