Remove function clients and servers
This commit is contained in:
parent
b72f73f75b
commit
34110f6be1
@ -1,8 +1,21 @@
|
|||||||
package hep.dataforge.io
|
package hep.dataforge.io
|
||||||
|
|
||||||
|
import hep.dataforge.meta.DFExperimental
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An object that could respond to external messages asynchronously
|
||||||
|
*/
|
||||||
interface Responder {
|
interface Responder {
|
||||||
/**
|
/**
|
||||||
* Send a request and wait for response for this specific request
|
* Send a request and wait for response for this specific request
|
||||||
*/
|
*/
|
||||||
suspend fun respond(request: Envelope): Envelope
|
suspend fun respond(request: Envelope): Envelope
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A fire-and-forget consumer of messages
|
||||||
|
*/
|
||||||
|
@DFExperimental
|
||||||
|
interface Consumer {
|
||||||
|
fun consume(message: Envelope): Unit
|
||||||
|
}
|
@ -1,75 +0,0 @@
|
|||||||
package hep.dataforge.io.functions
|
|
||||||
|
|
||||||
import hep.dataforge.context.ContextAware
|
|
||||||
import hep.dataforge.io.IOFormat
|
|
||||||
import hep.dataforge.io.IOPlugin
|
|
||||||
import hep.dataforge.meta.Meta
|
|
||||||
import hep.dataforge.meta.get
|
|
||||||
import hep.dataforge.names.asName
|
|
||||||
import hep.dataforge.names.plus
|
|
||||||
import kotlin.reflect.KClass
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A server that could produce asynchronous function values
|
|
||||||
*/
|
|
||||||
interface FunctionServer : ContextAware {
|
|
||||||
/**
|
|
||||||
* Call a function with given name and descriptor
|
|
||||||
*/
|
|
||||||
suspend fun <T : Any, R : Any> call(meta: Meta, arg: T, inputType: KClass<out T>, outputType: KClass<out R>): R
|
|
||||||
|
|
||||||
suspend fun <T : Any, R : Any> callMany(
|
|
||||||
meta: Meta,
|
|
||||||
arg: List<T>,
|
|
||||||
inputType: KClass<out T>,
|
|
||||||
outputType: KClass<out R>
|
|
||||||
): List<R> = List(arg.size) {
|
|
||||||
call<T, R>(meta, arg[it], inputType, outputType)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get a generic suspended function with given name and descriptor
|
|
||||||
*/
|
|
||||||
fun <T : Any, R : Any> function(
|
|
||||||
meta: Meta,
|
|
||||||
inputType: KClass<out T>,
|
|
||||||
outputType: KClass<out R>
|
|
||||||
): (suspend (T) -> R) = { call(meta, it, inputType, outputType) }
|
|
||||||
|
|
||||||
companion object {
|
|
||||||
const val FUNCTION_NAME_KEY = "function"
|
|
||||||
val FORMAT_KEY = "format".asName()
|
|
||||||
val INPUT_FORMAT_KEY = FORMAT_KEY + "input"
|
|
||||||
val OUTPUT_FORMAT_KEY = FORMAT_KEY + "output"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
suspend inline fun <reified T : Any, reified R : Any> FunctionServer.call(meta: Meta, arg: T) =
|
|
||||||
call(meta, arg, T::class, R::class)
|
|
||||||
|
|
||||||
suspend inline fun <reified T : Any, reified R : Any> FunctionServer.callMany(meta: Meta, arg: List<T>) =
|
|
||||||
callMany(meta, arg, T::class, R::class)
|
|
||||||
|
|
||||||
inline fun <reified T : Any, reified R : Any> FunctionServer.function(meta: Meta) =
|
|
||||||
function(meta, T::class, R::class)
|
|
||||||
|
|
||||||
fun <T : Any> IOPlugin.getInputFormat(meta: Meta, type: KClass<out T>): IOFormat<T> {
|
|
||||||
return meta[FunctionServer.INPUT_FORMAT_KEY]?.let {
|
|
||||||
resolveIOFormat<T>(it, type)
|
|
||||||
} ?: error("Input format not resolved")
|
|
||||||
}
|
|
||||||
|
|
||||||
fun <R : Any> IOPlugin.getOutputFormat(meta: Meta, type: KClass<out R>): IOFormat<R> {
|
|
||||||
return meta[FunctionServer.OUTPUT_FORMAT_KEY]?.let {
|
|
||||||
resolveIOFormat<R>(it, type)
|
|
||||||
} ?: error("Input format not resolved")
|
|
||||||
}
|
|
||||||
|
|
||||||
inline fun <reified T : Any> IOPlugin.getInputFormat(meta: Meta): IOFormat<T> =
|
|
||||||
getInputFormat(meta, T::class)
|
|
||||||
|
|
||||||
inline fun <reified R : Any> IOPlugin.getOutputFormat(meta: Meta): IOFormat<R> =
|
|
||||||
getOutputFormat(meta, R::class)
|
|
||||||
|
|
||||||
|
|
@ -1,98 +0,0 @@
|
|||||||
package hep.dataforge.io.functions
|
|
||||||
|
|
||||||
import hep.dataforge.context.Context
|
|
||||||
import hep.dataforge.context.ContextAware
|
|
||||||
import hep.dataforge.io.*
|
|
||||||
import hep.dataforge.meta.Meta
|
|
||||||
import hep.dataforge.meta.get
|
|
||||||
import hep.dataforge.meta.int
|
|
||||||
import kotlin.reflect.KClass
|
|
||||||
|
|
||||||
class RemoteFunctionClient(override val context: Context, val responder: Responder) : FunctionServer, ContextAware {
|
|
||||||
|
|
||||||
private fun <T : Any> IOPlugin.encodeOne(
|
|
||||||
meta: Meta,
|
|
||||||
value: T,
|
|
||||||
valueType: KClass<out T> = value::class
|
|
||||||
): Envelope = Envelope.invoke {
|
|
||||||
meta(meta)
|
|
||||||
type = REQUEST_TYPE
|
|
||||||
data {
|
|
||||||
val inputFormat: IOFormat<T> = getInputFormat(meta, valueType)
|
|
||||||
inputFormat.run {
|
|
||||||
writeObject(value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun <T : Any> IOPlugin.encodeMany(
|
|
||||||
meta: Meta,
|
|
||||||
values: List<T>,
|
|
||||||
valueType: KClass<out T>
|
|
||||||
): Envelope = Envelope.invoke {
|
|
||||||
meta(meta)
|
|
||||||
type = REQUEST_TYPE
|
|
||||||
meta {
|
|
||||||
SIZE_KEY put values.size
|
|
||||||
}
|
|
||||||
data {
|
|
||||||
val inputFormat: IOFormat<T> = getInputFormat(meta, valueType)
|
|
||||||
inputFormat.run {
|
|
||||||
values.forEach {
|
|
||||||
writeObject(it)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun <R : Any> IOPlugin.decode(envelope: Envelope, valueType: KClass<out R>): List<R> {
|
|
||||||
require(envelope.type == RESPONSE_TYPE) { "Unexpected message type: ${envelope.type}" }
|
|
||||||
val size = envelope.meta[SIZE_KEY].int ?: 1
|
|
||||||
|
|
||||||
return if (size == 0) {
|
|
||||||
emptyList()
|
|
||||||
} else {
|
|
||||||
val outputFormat: IOFormat<R> = getOutputFormat(envelope.meta, valueType)
|
|
||||||
envelope.data?.read {
|
|
||||||
List<R>(size) {
|
|
||||||
outputFormat.run {
|
|
||||||
readObject()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} ?: error("Message does not contain data")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private val plugin by lazy {
|
|
||||||
context.plugins.load(IOPlugin)
|
|
||||||
}
|
|
||||||
|
|
||||||
override suspend fun <T : Any, R : Any> call(
|
|
||||||
meta: Meta,
|
|
||||||
arg: T,
|
|
||||||
inputType: KClass<out T>,
|
|
||||||
outputType: KClass<out R>
|
|
||||||
): R = plugin.run {
|
|
||||||
val request = encodeOne(meta, arg)
|
|
||||||
val response = responder.respond(request)
|
|
||||||
return decode<R>(response, outputType).first()
|
|
||||||
}
|
|
||||||
|
|
||||||
override suspend fun <T : Any, R : Any> callMany(
|
|
||||||
meta: Meta,
|
|
||||||
arg: List<T>,
|
|
||||||
inputType: KClass<out T>,
|
|
||||||
outputType: KClass<out R>
|
|
||||||
): List<R> = plugin.run {
|
|
||||||
val request = encodeMany(meta, arg, inputType)
|
|
||||||
val response = responder.respond(request)
|
|
||||||
return decode<R>(response, outputType)
|
|
||||||
}
|
|
||||||
|
|
||||||
companion object {
|
|
||||||
const val REQUEST_TYPE = "function.request"
|
|
||||||
const val RESPONSE_TYPE = "function.response"
|
|
||||||
|
|
||||||
const val SIZE_KEY = "size"
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,58 +0,0 @@
|
|||||||
package hep.dataforge.io.functions
|
|
||||||
|
|
||||||
import hep.dataforge.context.Context
|
|
||||||
import hep.dataforge.context.ContextAware
|
|
||||||
import hep.dataforge.io.Envelope
|
|
||||||
import hep.dataforge.io.IOPlugin
|
|
||||||
import hep.dataforge.io.Responder
|
|
||||||
import hep.dataforge.io.type
|
|
||||||
import hep.dataforge.meta.get
|
|
||||||
import hep.dataforge.meta.int
|
|
||||||
|
|
||||||
class RemoteFunctionServer(
|
|
||||||
override val context: Context,
|
|
||||||
val functionServer: FunctionServer
|
|
||||||
) : ContextAware, Responder {
|
|
||||||
|
|
||||||
private val plugin by lazy {
|
|
||||||
context.plugins.load(IOPlugin)
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
override suspend fun respond(request: Envelope): Envelope {
|
|
||||||
require(request.type == RemoteFunctionClient.REQUEST_TYPE) { "Unexpected message type: ${request.type}" }
|
|
||||||
|
|
||||||
val inputFormat = plugin.getInputFormat<Any>(request.meta)
|
|
||||||
val outputFormat = plugin.getOutputFormat<Any>(request.meta)
|
|
||||||
|
|
||||||
val size = request.meta[RemoteFunctionClient.SIZE_KEY].int ?: 1
|
|
||||||
|
|
||||||
val input = request.data?.read {
|
|
||||||
inputFormat.run {
|
|
||||||
List(size) {
|
|
||||||
readObject()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} ?: error("Input is empty")
|
|
||||||
|
|
||||||
val output = functionServer.callMany<Any, Any>(
|
|
||||||
request.meta,
|
|
||||||
input
|
|
||||||
)
|
|
||||||
|
|
||||||
return Envelope.invoke {
|
|
||||||
meta {
|
|
||||||
meta(request.meta)
|
|
||||||
}
|
|
||||||
type = RemoteFunctionClient.RESPONSE_TYPE
|
|
||||||
data {
|
|
||||||
outputFormat.run {
|
|
||||||
output.forEach {
|
|
||||||
writeObject(it)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,28 +0,0 @@
|
|||||||
package hep.dataforge.io
|
|
||||||
|
|
||||||
import hep.dataforge.io.functions.FunctionServer
|
|
||||||
import hep.dataforge.io.functions.function
|
|
||||||
import hep.dataforge.meta.Meta
|
|
||||||
import hep.dataforge.names.Name
|
|
||||||
import kotlin.reflect.KClass
|
|
||||||
import kotlin.reflect.full.isSuperclassOf
|
|
||||||
|
|
||||||
|
|
||||||
fun IOPlugin.resolveIOFormatName(type: KClass<*>): Name {
|
|
||||||
return ioFormatFactories.find { it.type.isSuperclassOf(type) }?.name
|
|
||||||
?: error("Can't resolve IOFormat for type $type")
|
|
||||||
}
|
|
||||||
|
|
||||||
inline fun <reified T : Any, reified R : Any> IOPlugin.generateFunctionMeta(functionName: String): Meta = Meta {
|
|
||||||
FunctionServer.FUNCTION_NAME_KEY put functionName
|
|
||||||
FunctionServer.INPUT_FORMAT_KEY put resolveIOFormatName(T::class).toString()
|
|
||||||
FunctionServer.OUTPUT_FORMAT_KEY put resolveIOFormatName(R::class).toString()
|
|
||||||
}
|
|
||||||
|
|
||||||
inline fun <reified T : Any, reified R : Any> FunctionServer.function(
|
|
||||||
functionName: String
|
|
||||||
): (suspend (T) -> R) {
|
|
||||||
val plugin = context.plugins.get<IOPlugin>() ?: error("IO plugin not loaded")
|
|
||||||
val meta = plugin.generateFunctionMeta<T, R>(functionName)
|
|
||||||
return function(meta)
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user