diff --git a/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/Responder.kt b/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/Responder.kt index e3545782..c0214e7b 100644 --- a/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/Responder.kt +++ b/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/Responder.kt @@ -1,8 +1,21 @@ package hep.dataforge.io +import hep.dataforge.meta.DFExperimental + +/** + * An object that could respond to external messages asynchronously + */ interface Responder { /** * Send a request and wait for response for this specific request */ suspend fun respond(request: Envelope): Envelope +} + +/** + * A fire-and-forget consumer of messages + */ +@DFExperimental +interface Consumer { + fun consume(message: Envelope): Unit } \ No newline at end of file diff --git a/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/functions/FunctionServer.kt b/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/functions/FunctionServer.kt deleted file mode 100644 index 2eca605c..00000000 --- a/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/functions/FunctionServer.kt +++ /dev/null @@ -1,75 +0,0 @@ -package hep.dataforge.io.functions - -import hep.dataforge.context.ContextAware -import hep.dataforge.io.IOFormat -import hep.dataforge.io.IOPlugin -import hep.dataforge.meta.Meta -import hep.dataforge.meta.get -import hep.dataforge.names.asName -import hep.dataforge.names.plus -import kotlin.reflect.KClass - - -/** - * A server that could produce asynchronous function values - */ -interface FunctionServer : ContextAware { - /** - * Call a function with given name and descriptor - */ - suspend fun call(meta: Meta, arg: T, inputType: KClass, outputType: KClass): R - - suspend fun callMany( - meta: Meta, - arg: List, - inputType: KClass, - outputType: KClass - ): List = List(arg.size) { - call(meta, arg[it], inputType, outputType) - } - - /** - * Get a generic suspended function with given name and descriptor - */ - fun function( - meta: Meta, - inputType: KClass, - outputType: KClass - ): (suspend (T) -> R) = { call(meta, it, inputType, outputType) } - - companion object { - const val FUNCTION_NAME_KEY = "function" - val FORMAT_KEY = "format".asName() - val INPUT_FORMAT_KEY = FORMAT_KEY + "input" - val OUTPUT_FORMAT_KEY = FORMAT_KEY + "output" - } -} - -suspend inline fun FunctionServer.call(meta: Meta, arg: T) = - call(meta, arg, T::class, R::class) - -suspend inline fun FunctionServer.callMany(meta: Meta, arg: List) = - callMany(meta, arg, T::class, R::class) - -inline fun FunctionServer.function(meta: Meta) = - function(meta, T::class, R::class) - -fun IOPlugin.getInputFormat(meta: Meta, type: KClass): IOFormat { - return meta[FunctionServer.INPUT_FORMAT_KEY]?.let { - resolveIOFormat(it, type) - } ?: error("Input format not resolved") -} - -fun IOPlugin.getOutputFormat(meta: Meta, type: KClass): IOFormat { - return meta[FunctionServer.OUTPUT_FORMAT_KEY]?.let { - resolveIOFormat(it, type) - } ?: error("Input format not resolved") -} - -inline fun IOPlugin.getInputFormat(meta: Meta): IOFormat = - getInputFormat(meta, T::class) - -inline fun IOPlugin.getOutputFormat(meta: Meta): IOFormat = - getOutputFormat(meta, R::class) - - diff --git a/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/functions/RemoteFunctionClient.kt b/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/functions/RemoteFunctionClient.kt deleted file mode 100644 index 7c294891..00000000 --- a/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/functions/RemoteFunctionClient.kt +++ /dev/null @@ -1,98 +0,0 @@ -package hep.dataforge.io.functions - -import hep.dataforge.context.Context -import hep.dataforge.context.ContextAware -import hep.dataforge.io.* -import hep.dataforge.meta.Meta -import hep.dataforge.meta.get -import hep.dataforge.meta.int -import kotlin.reflect.KClass - -class RemoteFunctionClient(override val context: Context, val responder: Responder) : FunctionServer, ContextAware { - - private fun IOPlugin.encodeOne( - meta: Meta, - value: T, - valueType: KClass = value::class - ): Envelope = Envelope.invoke { - meta(meta) - type = REQUEST_TYPE - data { - val inputFormat: IOFormat = getInputFormat(meta, valueType) - inputFormat.run { - writeObject(value) - } - } - } - - private fun IOPlugin.encodeMany( - meta: Meta, - values: List, - valueType: KClass - ): Envelope = Envelope.invoke { - meta(meta) - type = REQUEST_TYPE - meta { - SIZE_KEY put values.size - } - data { - val inputFormat: IOFormat = getInputFormat(meta, valueType) - inputFormat.run { - values.forEach { - writeObject(it) - } - } - } - } - - private fun IOPlugin.decode(envelope: Envelope, valueType: KClass): List { - require(envelope.type == RESPONSE_TYPE) { "Unexpected message type: ${envelope.type}" } - val size = envelope.meta[SIZE_KEY].int ?: 1 - - return if (size == 0) { - emptyList() - } else { - val outputFormat: IOFormat = getOutputFormat(envelope.meta, valueType) - envelope.data?.read { - List(size) { - outputFormat.run { - readObject() - } - } - } ?: error("Message does not contain data") - } - } - - private val plugin by lazy { - context.plugins.load(IOPlugin) - } - - override suspend fun call( - meta: Meta, - arg: T, - inputType: KClass, - outputType: KClass - ): R = plugin.run { - val request = encodeOne(meta, arg) - val response = responder.respond(request) - return decode(response, outputType).first() - } - - override suspend fun callMany( - meta: Meta, - arg: List, - inputType: KClass, - outputType: KClass - ): List = plugin.run { - val request = encodeMany(meta, arg, inputType) - val response = responder.respond(request) - return decode(response, outputType) - } - - companion object { - const val REQUEST_TYPE = "function.request" - const val RESPONSE_TYPE = "function.response" - - const val SIZE_KEY = "size" - } -} \ No newline at end of file diff --git a/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/functions/RemoteFunctionServer.kt b/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/functions/RemoteFunctionServer.kt deleted file mode 100644 index 8252b1d3..00000000 --- a/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/functions/RemoteFunctionServer.kt +++ /dev/null @@ -1,58 +0,0 @@ -package hep.dataforge.io.functions - -import hep.dataforge.context.Context -import hep.dataforge.context.ContextAware -import hep.dataforge.io.Envelope -import hep.dataforge.io.IOPlugin -import hep.dataforge.io.Responder -import hep.dataforge.io.type -import hep.dataforge.meta.get -import hep.dataforge.meta.int - -class RemoteFunctionServer( - override val context: Context, - val functionServer: FunctionServer -) : ContextAware, Responder { - - private val plugin by lazy { - context.plugins.load(IOPlugin) - } - - - override suspend fun respond(request: Envelope): Envelope { - require(request.type == RemoteFunctionClient.REQUEST_TYPE) { "Unexpected message type: ${request.type}" } - - val inputFormat = plugin.getInputFormat(request.meta) - val outputFormat = plugin.getOutputFormat(request.meta) - - val size = request.meta[RemoteFunctionClient.SIZE_KEY].int ?: 1 - - val input = request.data?.read { - inputFormat.run { - List(size) { - readObject() - } - } - } ?: error("Input is empty") - - val output = functionServer.callMany( - request.meta, - input - ) - - return Envelope.invoke { - meta { - meta(request.meta) - } - type = RemoteFunctionClient.RESPONSE_TYPE - data { - outputFormat.run { - output.forEach { - writeObject(it) - } - } - } - - } - } -} \ No newline at end of file diff --git a/dataforge-io/src/jvmMain/kotlin/hep/dataforge/io/functionsJVM.kt b/dataforge-io/src/jvmMain/kotlin/hep/dataforge/io/functionsJVM.kt deleted file mode 100644 index ccd57c55..00000000 --- a/dataforge-io/src/jvmMain/kotlin/hep/dataforge/io/functionsJVM.kt +++ /dev/null @@ -1,28 +0,0 @@ -package hep.dataforge.io - -import hep.dataforge.io.functions.FunctionServer -import hep.dataforge.io.functions.function -import hep.dataforge.meta.Meta -import hep.dataforge.names.Name -import kotlin.reflect.KClass -import kotlin.reflect.full.isSuperclassOf - - -fun IOPlugin.resolveIOFormatName(type: KClass<*>): Name { - return ioFormatFactories.find { it.type.isSuperclassOf(type) }?.name - ?: error("Can't resolve IOFormat for type $type") -} - -inline fun IOPlugin.generateFunctionMeta(functionName: String): Meta = Meta { - FunctionServer.FUNCTION_NAME_KEY put functionName - FunctionServer.INPUT_FORMAT_KEY put resolveIOFormatName(T::class).toString() - FunctionServer.OUTPUT_FORMAT_KEY put resolveIOFormatName(R::class).toString() -} - -inline fun FunctionServer.function( - functionName: String -): (suspend (T) -> R) { - val plugin = context.plugins.get() ?: error("IO plugin not loaded") - val meta = plugin.generateFunctionMeta(functionName) - return function(meta) -} \ No newline at end of file