diff --git a/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/Binary.kt b/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/Binary.kt index 67c962dd..811986b6 100644 --- a/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/Binary.kt +++ b/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/Binary.kt @@ -64,6 +64,16 @@ class ArrayBinary(val array: ByteArray) : RandomAccessBinary { } } +class PacketBinary(val packet: ByteReadPacket): Binary{ + override val size: ULong + get() = TODO("not implemented") //To change initializer of created properties use File | Settings | File Templates. + + override fun read(block: Input.() -> R): R { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + +} + /** * Read given binary as object using given format */ diff --git a/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/Envelope.kt b/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/Envelope.kt index c2abca21..606a5ffb 100644 --- a/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/Envelope.kt +++ b/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/Envelope.kt @@ -1,9 +1,9 @@ package hep.dataforge.io -import hep.dataforge.meta.Laminate -import hep.dataforge.meta.Meta -import hep.dataforge.meta.get -import hep.dataforge.meta.string +import hep.dataforge.meta.* +import kotlinx.io.core.Output +import kotlinx.io.core.buildPacket +import kotlinx.io.core.readBytes interface Envelope { val meta: Meta @@ -20,6 +20,10 @@ interface Envelope { const val ENVELOPE_DESCRIPTION_KEY = "$ENVELOPE_NODE.description" //const val ENVELOPE_TIME_KEY = "@envelope.time" + /** + * Build a static envelope using provided builder + */ + fun build(block: EnvelopeBuilder.() -> Unit) = EnvelopeBuilder().apply(block).build() } } @@ -63,4 +67,29 @@ fun Envelope.withMetaLayers(vararg layers: Meta): Envelope { this is ProxyEnvelope -> ProxyEnvelope(source, *layers, *this.meta.layers.toTypedArray()) else -> ProxyEnvelope(this, *layers) } +} + +class EnvelopeBuilder { + private val metaBuilder = MetaBuilder() + var data: Binary? = null + + fun meta(block: MetaBuilder.() -> Unit) { + metaBuilder.apply(block) + } + + var type by metaBuilder.string(key = Envelope.ENVELOPE_TYPE_KEY) + var dataType by metaBuilder.string(key = Envelope.ENVELOPE_DATA_TYPE_KEY) + var description by metaBuilder.string(key = Envelope.ENVELOPE_DESCRIPTION_KEY) + + /** + * Construct a binary and transform it into byte-array based buffer + */ + fun data(block: Output.() -> Unit) { + val bytes = buildPacket { + block() + } + data = ArrayBinary(bytes.readBytes()) + } + + internal fun build() = SimpleEnvelope(metaBuilder.seal(), data) } \ No newline at end of file diff --git a/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/FunctionServer.kt b/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/FunctionServer.kt deleted file mode 100644 index 2b254716..00000000 --- a/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/FunctionServer.kt +++ /dev/null @@ -1,38 +0,0 @@ -package hep.dataforge.io - -import kotlin.reflect.KClass - -/** - * A descriptor for specific type of functions - */ -interface FunctionSpec { - val inputType: KClass - val outputType: KClass -} - -/** - * A server that could produce asynchronous function values - */ -interface FunctionServer { - /** - * Call a function with given name and descriptor - */ - suspend fun > call(name: String, descriptor: D, arg: T): R - - /** - * Resolve a function descriptor for given types - */ - fun resolveType(inputType: KClass, outputType: KClass): FunctionSpec - - /** - * Get a generic suspended function with given name and descriptor - */ - operator fun > get(name: String, descriptor: D): (suspend (T) -> R) = - { call(name, descriptor, it) } -} - -suspend inline fun FunctionServer.call(name: String, arg: T): R = - call(name, resolveType(T::class, R::class), arg) - -inline operator fun FunctionServer.get(name: String): (suspend (T) -> R) = - get(name, resolveType(T::class, R::class)) diff --git a/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/IOFormat.kt b/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/IOFormat.kt index 9cc9a584..faa110f7 100644 --- a/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/IOFormat.kt +++ b/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/IOFormat.kt @@ -1,14 +1,47 @@ package hep.dataforge.io +import hep.dataforge.io.IOFormat.Companion.TYPE +import hep.dataforge.provider.Type import kotlinx.io.core.* +import kotlinx.serialization.KSerializer +import kotlinx.serialization.cbor.Cbor /** * And interface for serialization facilities */ +@Type(TYPE) interface IOFormat { fun Output.writeThis(obj: T) fun Input.readThis(): T + + companion object { + const val TYPE = "ioFormat" + } } fun IOFormat.writePacket(obj: T): ByteReadPacket = buildPacket { writeThis(obj) } -fun IOFormat.writeBytes(obj: T): ByteArray = buildPacket { writeThis(obj) }.readBytes() \ No newline at end of file +fun IOFormat.writeBytes(obj: T): ByteArray = buildPacket { writeThis(obj) }.readBytes() + + +object DoubleIOFormat: IOFormat{ + override fun Output.writeThis(obj: Double) { + writeDouble(obj) + } + override fun Input.readThis(): Double = readDouble() +} + +/** + * Experimental + */ +class SerializerIOFormat(val serializer: KSerializer) : IOFormat { + override fun Output.writeThis(obj: T) { + val bytes = Cbor.plain.dump(serializer, obj) + writeFully(bytes) + } + + override fun Input.readThis(): T { + //FIXME reads the whole input + val bytes = readBytes() + return Cbor.plain.load(serializer, bytes) + } +} \ No newline at end of file diff --git a/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/Responder.kt b/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/Responder.kt new file mode 100644 index 00000000..e3545782 --- /dev/null +++ b/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/Responder.kt @@ -0,0 +1,8 @@ +package hep.dataforge.io + +interface Responder { + /** + * Send a request and wait for response for this specific request + */ + suspend fun respond(request: Envelope): Envelope +} \ No newline at end of file diff --git a/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/functions/FunctionServer.kt b/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/functions/FunctionServer.kt new file mode 100644 index 00000000..5fd81711 --- /dev/null +++ b/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/functions/FunctionServer.kt @@ -0,0 +1,34 @@ +package hep.dataforge.io + +import hep.dataforge.context.ContextAware +import hep.dataforge.io.functions.FunctionSpec + + +/** + * A server that could produce asynchronous function values + */ +interface FunctionServer : ContextAware { + /** + * Call a function with given name and descriptor + */ + suspend fun call(name: String, spec: FunctionSpec, arg: T): R + + suspend fun callMany( + name: String, + spec: FunctionSpec, + arg: List + ): List = List(arg.size) { + call(name, spec, arg[it]) + } + + /** + * Get a generic suspended function with given name and descriptor + */ + fun get( + name: String, + spec: FunctionSpec + ): (suspend (T) -> R) = + { call(name, spec, it) } +} + + diff --git a/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/functions/FunctionSpec.kt b/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/functions/FunctionSpec.kt new file mode 100644 index 00000000..3ed6f18e --- /dev/null +++ b/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/functions/FunctionSpec.kt @@ -0,0 +1,9 @@ +package hep.dataforge.io.functions + +import hep.dataforge.io.IOFormat +import hep.dataforge.meta.MetaRepr + +interface FunctionSpec: MetaRepr { + val inputFormat: IOFormat + val outputFormat: IOFormat +} \ No newline at end of file diff --git a/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/functions/FunctionsPlugin.kt b/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/functions/FunctionsPlugin.kt new file mode 100644 index 00000000..495a4c47 --- /dev/null +++ b/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/functions/FunctionsPlugin.kt @@ -0,0 +1,58 @@ +package hep.dataforge.io.functions + +import hep.dataforge.context.AbstractPlugin +import hep.dataforge.context.PluginFactory +import hep.dataforge.context.PluginTag +import hep.dataforge.io.DoubleIOFormat +import hep.dataforge.io.IOFormat +import hep.dataforge.io.IOPlugin +import hep.dataforge.meta.Meta +import hep.dataforge.meta.buildMeta +import kotlin.reflect.KClass + +class FunctionsPlugin(meta: Meta) : AbstractPlugin(meta) { + override val tag: PluginTag get() = Companion.tag + + override fun dependsOn(): List> = listOf(IOPlugin) + + private val specs: Collection> = listOf( + DoubleToDoubleFunctionSpec + ) + + fun resolve(meta: Meta): FunctionSpec<*, *>? { + return specs.find { it.toMeta() == meta } + } + +// fun resolve(inputType: KClass, outputType: KClass): FunctionSpec { +// +// } + + companion object : PluginFactory { + + override val tag: PluginTag = PluginTag("io.functions", group = PluginTag.DATAFORGE_GROUP) + override val type: KClass = FunctionsPlugin::class + override fun invoke(meta: Meta): FunctionsPlugin = FunctionsPlugin(meta) + } +} + +object DoubleToDoubleFunctionSpec : FunctionSpec { + override val inputFormat: IOFormat get() = DoubleIOFormat + override val outputFormat: IOFormat get() = DoubleIOFormat + + override fun toMeta(): Meta = buildMeta { + "input" to "Double" + "output" to "Double" + } +} + +//suspend inline fun FunctionServer.call(name: String, arg: T): R { +// val plugin = context.plugins.load(FunctionsPlugin) +// val spec = plugin.resolve(T::class, R::class) +// return call(name, spec, arg) +//} +// +//inline operator fun FunctionServer.get(name: String): (suspend (T) -> R) { +// val plugin = context.plugins.load(FunctionsPlugin) +// val spec = plugin.resolve(T::class, R::class) +// return get(name, spec) +//} \ No newline at end of file diff --git a/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/functions/RemoteFunctionClient.kt b/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/functions/RemoteFunctionClient.kt new file mode 100644 index 00000000..b285e80d --- /dev/null +++ b/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/functions/RemoteFunctionClient.kt @@ -0,0 +1,100 @@ +package hep.dataforge.io + +import hep.dataforge.context.Context +import hep.dataforge.context.ContextAware +import hep.dataforge.io.functions.FunctionSpec +import hep.dataforge.io.functions.FunctionsPlugin +import hep.dataforge.meta.get +import hep.dataforge.meta.int +import kotlin.reflect.KClass + +class RemoteFunctionClient(override val context: Context, val responder: Responder) : FunctionServer, ContextAware { + + private fun encodeOne(name: String, spec: FunctionSpec, value: T): Envelope = + Envelope.build { + type = REQUEST_TYPE + meta { + FUNCTION_NAME_KEY to name + FUNCTION_SPEC_KEY to spec.toMeta() + } + data { + spec.inputFormat.run { + writeThis(value) + } + } + } + + private fun encodeMany(name: String, spec: FunctionSpec, values: List): Envelope = + Envelope.build { + type = REQUEST_TYPE + meta { + FUNCTION_NAME_KEY to name + FUNCTION_SPEC_KEY to spec.toMeta() + SIZE_KEY to values.size + } + data { + spec.inputFormat.run { + values.forEach { + writeThis(it) + } + } + } + } + + private fun decode(spec: FunctionSpec<*, R>, envelope: Envelope): List { + require(envelope.type == RESPONSE_TYPE) { "Unexpected message type: ${envelope.type}" } + val size = envelope.meta[SIZE_KEY].int ?: 1 + + return if (size == 0) { + emptyList() + } else { + envelope.data?.read { + List(size) { + spec.outputFormat.run { + readThis() + } + } + } ?: error("Message does not contain data") + } + } + + override suspend fun call( + name: String, + spec: FunctionSpec, + arg: T + ): R { + val request = encodeOne(name, spec, arg) + val response = responder.respond(request) + return decode(spec, response).first() + } + + override suspend fun callMany( + name: String, + spec: FunctionSpec, + arg: List + ): List { + val request = encodeMany(name, spec, arg) + val response = responder.respond(request) + return decode(spec, response) + } + + private val plugin by lazy { + context.plugins.load(FunctionsPlugin) + } + + fun resolveSpec( + inputType: KClass, + outputType: KClass + ): FunctionSpec { + return plugin.resolve(inputType, outputType) + } + + companion object { + const val REQUEST_TYPE = "function.request" + const val RESPONSE_TYPE = "function.response" + + const val FUNCTION_NAME_KEY = "function" + const val SIZE_KEY = "size" + const val FUNCTION_SPEC_KEY = "spec" + } +} \ No newline at end of file diff --git a/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/functions/RemoteFunctionServer.kt b/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/functions/RemoteFunctionServer.kt new file mode 100644 index 00000000..ba93c0a0 --- /dev/null +++ b/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/functions/RemoteFunctionServer.kt @@ -0,0 +1,58 @@ +package hep.dataforge.io.functions + +import hep.dataforge.context.Context +import hep.dataforge.context.ContextAware +import hep.dataforge.io.* +import hep.dataforge.meta.get +import hep.dataforge.meta.int +import hep.dataforge.meta.node +import hep.dataforge.meta.string + +class RemoteFunctionServer( + override val context: Context, + val functionServer: FunctionServer +) : ContextAware, Responder { + + private val plugin by lazy { + context.plugins.load(FunctionsPlugin) + } + + override suspend fun respond(request: Envelope): Envelope { + require(request.type == RemoteFunctionClient.REQUEST_TYPE) { "Unexpected message type: ${request.type}" } + val functionName = request.meta[RemoteFunctionClient.FUNCTION_NAME_KEY].string ?: "" + + @Suppress("UNCHECKED_CAST") val spec = request.meta[RemoteFunctionClient.FUNCTION_SPEC_KEY].node?.let { + plugin.resolve(it) as FunctionSpec + } ?: error("Function specification not found") + + val size = request + .meta[RemoteFunctionClient.SIZE_KEY].int ?: 1 + + val input = request.data?.read { + spec.inputFormat.run { + List(size) { + readThis() + } + } + } ?: error("Input is empty") + + val output = functionServer.callMany(functionName, spec, input) + + return Envelope.build { + type = RemoteFunctionClient.RESPONSE_TYPE + meta { + RemoteFunctionClient.FUNCTION_NAME_KEY to functionName + RemoteFunctionClient.FUNCTION_SPEC_KEY to spec.toMeta() + RemoteFunctionClient.SIZE_KEY to output.size + } + data { + spec.outputFormat.run { + output.forEach { + writeThis(it) + } + } + } + + } + } +} \ No newline at end of file diff --git a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Dependency.kt b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Dependency.kt index 24c012ea..cb3e790e 100644 --- a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Dependency.kt +++ b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Dependency.kt @@ -6,10 +6,7 @@ import hep.dataforge.data.filter import hep.dataforge.meta.Meta import hep.dataforge.meta.MetaRepr import hep.dataforge.meta.buildMeta -import hep.dataforge.names.EmptyName -import hep.dataforge.names.Name -import hep.dataforge.names.get -import hep.dataforge.names.isEmpty +import hep.dataforge.names.* /** * A dependency of the task which allows to lazily create a data tree for single dependency @@ -45,12 +42,18 @@ class AllDataDependency(val placement: Name = EmptyName) : Dependency() { "data" to "*" "to" to placement } - } -class TaskModelDependency(val name: String, val meta: Meta, val placement: Name = EmptyName) : Dependency() { +abstract class TaskDependency(val meta: Meta, val placement: Name = EmptyName) : Dependency() { + abstract fun resolveTask(workspace: Workspace): Task<*> + + /** + * A name of the dependency for logging and serialization + */ + abstract val name: Name + override fun apply(workspace: Workspace): DataNode { - val task = workspace.tasks[name] ?: error("Task with name $name is not found in the workspace") + val task = resolveTask(workspace) if (task.isTerminal) TODO("Support terminal task") val result = workspace.run(task, meta) return if (placement.isEmpty()) { @@ -65,4 +68,19 @@ class TaskModelDependency(val name: String, val meta: Meta, val placement: Name "meta" to meta "to" to placement } +} + +class DirectTaskDependency(val task: Task<*>, meta: Meta, placement: Name) : TaskDependency(meta, placement) { + override fun resolveTask(workspace: Workspace): Task<*> = task + + override val name: Name get() = DIRECT_TASK_NAME + task.name + + companion object { + val DIRECT_TASK_NAME = "@direct".asName() + } +} + +class WorkspaceTaskDependency(override val name: Name, meta: Meta, placement: Name) : TaskDependency(meta, placement) { + override fun resolveTask(workspace: Workspace): Task<*> = + workspace.tasks[name] ?: error("Task with name $name is not found in the workspace") } \ No newline at end of file diff --git a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/TaskModel.kt b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/TaskModel.kt index b6c4b2b6..07fb6638 100644 --- a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/TaskModel.kt +++ b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/TaskModel.kt @@ -35,9 +35,9 @@ data class TaskModel( "meta" to meta "dependsOn" to { val dataDependencies = dependencies.filterIsInstance() - val taskDependencies = dependencies.filterIsInstance() + val taskDependencies = dependencies.filterIsInstance() setIndexed("data".toName(), dataDependencies.map { it.toMeta() }) - setIndexed("task".toName(), taskDependencies.map { it.toMeta() }) { taskDependencies[it].name } + setIndexed("task".toName(), taskDependencies.map { it.toMeta() }) { taskDependencies[it].name.toString() } //TODO ensure all dependencies are listed } } @@ -76,14 +76,15 @@ class TaskModelBuilder(val name: String, meta: Meta = EmptyMeta) { var target: String by this.meta.string(key = MODEL_TARGET_KEY, default = "") /** - * Add dependency for + * Add dependency for a task defined in a workspace and resolved by */ fun dependsOn(name: String, meta: Meta = this.meta, placement: Name = EmptyName) { - dependencies.add(TaskModelDependency(name, meta, placement)) + dependencies.add(WorkspaceTaskDependency(name.toName(), meta, placement)) } - fun dependsOn(task: Task<*>, meta: Meta = this.meta, placement: Name = EmptyName) = - dependsOn(task.name, meta, placement) + fun dependsOn(task: Task<*>, meta: Meta = this.meta, placement: Name = EmptyName) { + dependencies.add(DirectTaskDependency(task, meta, placement)) + } fun dependsOn(task: Task<*>, placement: Name = EmptyName, metaBuilder: MetaBuilder.() -> Unit) = dependsOn(task.name, buildMeta(metaBuilder), placement) diff --git a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/TaskBuilder.kt b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/TaskBuilder.kt index 04e1efcc..c35003d3 100644 --- a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/TaskBuilder.kt +++ b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/TaskBuilder.kt @@ -81,13 +81,13 @@ class TaskBuilder(val name: String) { inline fun customPipe( from: String = "", to: String = "", - crossinline block: PipeBuilder.(Context) -> Unit + crossinline block: PipeBuilder.(TaskEnv) -> Unit ) { action(from, to) { PipeAction( inputType = T::class, outputType = R::class - ) { block(context) } + ) { block(this@action) } } } @@ -118,13 +118,13 @@ class TaskBuilder(val name: String) { inline fun joinByGroup( from: String = "", to: String = "", - crossinline block: JoinGroupBuilder.(Context) -> Unit + crossinline block: JoinGroupBuilder.(TaskEnv) -> Unit ) { action(from, to) { JoinAction( inputType = T::class, outputType = R::class - ) { block(context) } + ) { block(this@action) } } } @@ -157,13 +157,13 @@ class TaskBuilder(val name: String) { inline fun split( from: String = "", to: String = "", - crossinline block: SplitBuilder.(Context) -> Unit + crossinline block: SplitBuilder.(TaskEnv) -> Unit ) { action(from, to) { SplitAction( inputType = T::class, outputType = R::class - ) { block(context) } + ) { block(this@action) } } } diff --git a/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/SimpleWorkspaceTest.kt b/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/SimpleWorkspaceTest.kt index 51471525..b64ee467 100644 --- a/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/SimpleWorkspaceTest.kt +++ b/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/SimpleWorkspaceTest.kt @@ -60,16 +60,16 @@ class SimpleWorkspaceTest { model { allData() } - joinByGroup { context -> + joinByGroup {env-> group("even", filter = { name, _ -> name.toString().toInt() % 2 == 0 }) { result { data -> - context.logger.info { "Starting even" } + env.context.logger.info { "Starting even" } data.values.average() } } group("odd", filter = { name, _ -> name.toString().toInt() % 2 == 1 }) { result { data -> - context.logger.info { "Starting odd" } + env.context.logger.info { "Starting odd" } data.values.average() } }