Fixed function server
This commit is contained in:
parent
6d52db311d
commit
3ff3a5d09e
@ -3,7 +3,7 @@ plugins {
|
|||||||
id("scientifik.publish") version "0.2.0" apply false
|
id("scientifik.publish") version "0.2.0" apply false
|
||||||
}
|
}
|
||||||
|
|
||||||
val dataforgeVersion by extra("0.1.4-dev-3")
|
val dataforgeVersion by extra("0.1.4-dev-4")
|
||||||
|
|
||||||
val bintrayRepo by extra("dataforge")
|
val bintrayRepo by extra("dataforge")
|
||||||
val githubProject by extra("dataforge-core")
|
val githubProject by extra("dataforge-core")
|
||||||
|
@ -31,9 +31,13 @@ class IOPlugin(meta: Meta) : AbstractPlugin(meta) {
|
|||||||
context.content<IOFormat<*>>(IOFormat.TYPE)
|
context.content<IOFormat<*>>(IOFormat.TYPE)
|
||||||
}
|
}
|
||||||
|
|
||||||
fun resolveIOFormat(item: MetaItem<*>): IOFormat<*>? {
|
fun <T: Any> resolveIOFormat(item: MetaItem<*>, type: KClass<out T>): IOFormat<T>? {
|
||||||
val key = item.string ?: error("Not a string value!")
|
val key = item.string ?: error("Not a string value!")
|
||||||
return ioFormats[key]
|
return ioFormats[key]?.let {
|
||||||
|
@Suppress("UNCHECKED_CAST")
|
||||||
|
if(it.type!= type) error("Format type ${it.type} is not the same as requested type ${type}")
|
||||||
|
else it as IOFormat<T>
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
companion object : PluginFactory<IOPlugin> {
|
companion object : PluginFactory<IOPlugin> {
|
||||||
|
@ -7,6 +7,7 @@ import hep.dataforge.meta.Meta
|
|||||||
import hep.dataforge.meta.get
|
import hep.dataforge.meta.get
|
||||||
import hep.dataforge.names.asName
|
import hep.dataforge.names.asName
|
||||||
import hep.dataforge.names.plus
|
import hep.dataforge.names.plus
|
||||||
|
import kotlin.reflect.KClass
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -16,21 +17,25 @@ interface FunctionServer : ContextAware {
|
|||||||
/**
|
/**
|
||||||
* Call a function with given name and descriptor
|
* Call a function with given name and descriptor
|
||||||
*/
|
*/
|
||||||
suspend fun <T : Any, R : Any> call(meta: Meta, arg: T): R
|
suspend fun <T : Any, R : Any> call(meta: Meta, arg: T, inputType: KClass<out T>, outputType: KClass<out R>): R
|
||||||
|
|
||||||
suspend fun <T : Any, R : Any> callMany(
|
suspend fun <T : Any, R : Any> callMany(
|
||||||
meta: Meta,
|
meta: Meta,
|
||||||
arg: List<T>
|
arg: List<T>,
|
||||||
|
inputType: KClass<out T>,
|
||||||
|
outputType: KClass<out R>
|
||||||
): List<R> = List(arg.size) {
|
): List<R> = List(arg.size) {
|
||||||
call<T, R>(meta, arg[it])
|
call<T, R>(meta, arg[it], inputType, outputType)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a generic suspended function with given name and descriptor
|
* Get a generic suspended function with given name and descriptor
|
||||||
*/
|
*/
|
||||||
fun <T : Any, R : Any> function(
|
fun <T : Any, R : Any> function(
|
||||||
meta: Meta
|
meta: Meta,
|
||||||
): (suspend (T) -> R) = { call(meta, it) }
|
inputType: KClass<out T>,
|
||||||
|
outputType: KClass<out R>
|
||||||
|
): (suspend (T) -> R) = { call(meta, it, inputType, outputType) }
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
const val FUNCTION_NAME_KEY = "function"
|
const val FUNCTION_NAME_KEY = "function"
|
||||||
@ -40,16 +45,31 @@ interface FunctionServer : ContextAware {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fun <T : Any> IOPlugin.getInputFormat(meta: Meta): IOFormat<T> {
|
suspend inline fun <reified T : Any, reified R : Any> FunctionServer.call(meta: Meta, arg: T) =
|
||||||
|
call(meta, arg, T::class, R::class)
|
||||||
|
|
||||||
|
suspend inline fun <reified T : Any, reified R : Any> FunctionServer.callMany(meta: Meta, arg: List<T>) =
|
||||||
|
callMany(meta, arg, T::class, R::class)
|
||||||
|
|
||||||
|
inline fun <reified T : Any, reified R : Any> FunctionServer.function(meta: Meta) =
|
||||||
|
function(meta, T::class, R::class)
|
||||||
|
|
||||||
|
fun <T : Any> IOPlugin.getInputFormat(meta: Meta, type: KClass<out T>): IOFormat<T> {
|
||||||
return meta[FunctionServer.INPUT_FORMAT_KEY]?.let {
|
return meta[FunctionServer.INPUT_FORMAT_KEY]?.let {
|
||||||
resolveIOFormat(it) as IOFormat<T>
|
resolveIOFormat<T>(it, type)
|
||||||
} ?: error("Input format not resolved")
|
} ?: error("Input format not resolved")
|
||||||
}
|
}
|
||||||
|
|
||||||
fun <R : Any> IOPlugin.getOutputFormat(meta: Meta): IOFormat<R> {
|
fun <R : Any> IOPlugin.getOutputFormat(meta: Meta, type: KClass<out R>): IOFormat<R> {
|
||||||
return meta[FunctionServer.OUTPUT_FORMAT_KEY]?.let {
|
return meta[FunctionServer.OUTPUT_FORMAT_KEY]?.let {
|
||||||
resolveIOFormat(it) as IOFormat<R>
|
resolveIOFormat<R>(it, type)
|
||||||
} ?: error("Input format not resolved")
|
} ?: error("Input format not resolved")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline fun <reified T : Any> IOPlugin.getInputFormat(meta: Meta): IOFormat<T> =
|
||||||
|
getInputFormat(meta, T::class)
|
||||||
|
|
||||||
|
inline fun <reified R : Any> IOPlugin.getOutputFormat(meta: Meta): IOFormat<R> =
|
||||||
|
getOutputFormat(meta, R::class)
|
||||||
|
|
||||||
|
|
||||||
|
@ -6,17 +6,19 @@ import hep.dataforge.io.*
|
|||||||
import hep.dataforge.meta.Meta
|
import hep.dataforge.meta.Meta
|
||||||
import hep.dataforge.meta.get
|
import hep.dataforge.meta.get
|
||||||
import hep.dataforge.meta.int
|
import hep.dataforge.meta.int
|
||||||
|
import kotlin.reflect.KClass
|
||||||
|
|
||||||
class RemoteFunctionClient(override val context: Context, val responder: Responder) : FunctionServer, ContextAware {
|
class RemoteFunctionClient(override val context: Context, val responder: Responder) : FunctionServer, ContextAware {
|
||||||
|
|
||||||
private fun <T : Any> IOPlugin.encodeOne(
|
private fun <T : Any> IOPlugin.encodeOne(
|
||||||
meta: Meta,
|
meta: Meta,
|
||||||
value: T
|
value: T,
|
||||||
|
valueType: KClass<out T> = value::class
|
||||||
): Envelope = Envelope.invoke {
|
): Envelope = Envelope.invoke {
|
||||||
meta(meta)
|
meta(meta)
|
||||||
type = REQUEST_TYPE
|
type = REQUEST_TYPE
|
||||||
data {
|
data {
|
||||||
val inputFormat: IOFormat<T> = getInputFormat<T>(meta)
|
val inputFormat: IOFormat<T> = getInputFormat(meta, valueType)
|
||||||
inputFormat.run {
|
inputFormat.run {
|
||||||
writeThis(value)
|
writeThis(value)
|
||||||
}
|
}
|
||||||
@ -25,7 +27,8 @@ class RemoteFunctionClient(override val context: Context, val responder: Respond
|
|||||||
|
|
||||||
private fun <T : Any> IOPlugin.encodeMany(
|
private fun <T : Any> IOPlugin.encodeMany(
|
||||||
meta: Meta,
|
meta: Meta,
|
||||||
values: List<T>
|
values: List<T>,
|
||||||
|
valueType: KClass<out T>
|
||||||
): Envelope = Envelope.invoke {
|
): Envelope = Envelope.invoke {
|
||||||
meta(meta)
|
meta(meta)
|
||||||
type = REQUEST_TYPE
|
type = REQUEST_TYPE
|
||||||
@ -33,7 +36,7 @@ class RemoteFunctionClient(override val context: Context, val responder: Respond
|
|||||||
SIZE_KEY to values.size
|
SIZE_KEY to values.size
|
||||||
}
|
}
|
||||||
data {
|
data {
|
||||||
val inputFormat: IOFormat<T> = getInputFormat<T>(meta)
|
val inputFormat: IOFormat<T> = getInputFormat(meta, valueType)
|
||||||
inputFormat.run {
|
inputFormat.run {
|
||||||
values.forEach {
|
values.forEach {
|
||||||
writeThis(it)
|
writeThis(it)
|
||||||
@ -42,14 +45,14 @@ class RemoteFunctionClient(override val context: Context, val responder: Respond
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun <R : Any> IOPlugin.decode(envelope: Envelope): List<R> {
|
private fun <R : Any> IOPlugin.decode(envelope: Envelope, valueType: KClass<out R>): List<R> {
|
||||||
require(envelope.type == RESPONSE_TYPE) { "Unexpected message type: ${envelope.type}" }
|
require(envelope.type == RESPONSE_TYPE) { "Unexpected message type: ${envelope.type}" }
|
||||||
val size = envelope.meta[SIZE_KEY].int ?: 1
|
val size = envelope.meta[SIZE_KEY].int ?: 1
|
||||||
|
|
||||||
return if (size == 0) {
|
return if (size == 0) {
|
||||||
emptyList()
|
emptyList()
|
||||||
} else {
|
} else {
|
||||||
val outputFormat: IOFormat<R> = getOutputFormat<R>(envelope.meta)
|
val outputFormat: IOFormat<R> = getOutputFormat(envelope.meta, valueType)
|
||||||
envelope.data?.read {
|
envelope.data?.read {
|
||||||
List<R>(size) {
|
List<R>(size) {
|
||||||
outputFormat.run {
|
outputFormat.run {
|
||||||
@ -66,20 +69,24 @@ class RemoteFunctionClient(override val context: Context, val responder: Respond
|
|||||||
|
|
||||||
override suspend fun <T : Any, R : Any> call(
|
override suspend fun <T : Any, R : Any> call(
|
||||||
meta: Meta,
|
meta: Meta,
|
||||||
arg: T
|
arg: T,
|
||||||
|
inputType: KClass<out T>,
|
||||||
|
outputType: KClass<out R>
|
||||||
): R = plugin.run {
|
): R = plugin.run {
|
||||||
val request = encodeOne(meta, arg)
|
val request = encodeOne(meta, arg)
|
||||||
val response = responder.respond(request)
|
val response = responder.respond(request)
|
||||||
return decode<R>(response).first()
|
return decode<R>(response, outputType).first()
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun <T : Any, R : Any> callMany(
|
override suspend fun <T : Any, R : Any> callMany(
|
||||||
meta: Meta,
|
meta: Meta,
|
||||||
arg: List<T>
|
arg: List<T>,
|
||||||
|
inputType: KClass<out T>,
|
||||||
|
outputType: KClass<out R>
|
||||||
): List<R> = plugin.run {
|
): List<R> = plugin.run {
|
||||||
val request = encodeMany(meta, arg)
|
val request = encodeMany(meta, arg, inputType)
|
||||||
val response = responder.respond(request)
|
val response = responder.respond(request)
|
||||||
return decode<R>(response)
|
return decode<R>(response, outputType)
|
||||||
}
|
}
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
|
@ -23,7 +23,7 @@ class RemoteFunctionServer(
|
|||||||
require(request.type == RemoteFunctionClient.REQUEST_TYPE) { "Unexpected message type: ${request.type}" }
|
require(request.type == RemoteFunctionClient.REQUEST_TYPE) { "Unexpected message type: ${request.type}" }
|
||||||
|
|
||||||
val inputFormat = plugin.getInputFormat<Any>(request.meta)
|
val inputFormat = plugin.getInputFormat<Any>(request.meta)
|
||||||
val outputFormat = plugin.getOutputFormat<Any>(request.meta)
|
val outputFormat = plugin.getOutputFormat<Any>(request.meta)
|
||||||
|
|
||||||
val size = request.meta[RemoteFunctionClient.SIZE_KEY].int ?: 1
|
val size = request.meta[RemoteFunctionClient.SIZE_KEY].int ?: 1
|
||||||
|
|
||||||
|
@ -5,6 +5,7 @@ import hep.dataforge.io.functions.FunctionServer
|
|||||||
import hep.dataforge.io.functions.FunctionServer.Companion.FUNCTION_NAME_KEY
|
import hep.dataforge.io.functions.FunctionServer.Companion.FUNCTION_NAME_KEY
|
||||||
import hep.dataforge.io.functions.FunctionServer.Companion.INPUT_FORMAT_KEY
|
import hep.dataforge.io.functions.FunctionServer.Companion.INPUT_FORMAT_KEY
|
||||||
import hep.dataforge.io.functions.FunctionServer.Companion.OUTPUT_FORMAT_KEY
|
import hep.dataforge.io.functions.FunctionServer.Companion.OUTPUT_FORMAT_KEY
|
||||||
|
import hep.dataforge.io.functions.function
|
||||||
import hep.dataforge.meta.Meta
|
import hep.dataforge.meta.Meta
|
||||||
import hep.dataforge.meta.buildMeta
|
import hep.dataforge.meta.buildMeta
|
||||||
import hep.dataforge.names.Name
|
import hep.dataforge.names.Name
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package hep.dataforge.meta
|
package hep.dataforge.meta
|
||||||
|
|
||||||
import hep.dataforge.names.Name
|
import hep.dataforge.names.Name
|
||||||
|
import kotlin.jvm.JvmName
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Marker interface for classes with specifications
|
* Marker interface for classes with specifications
|
||||||
@ -64,4 +65,9 @@ fun <C : Specific, S : Specification<C>> S.createStyle(action: C.() -> Unit): Me
|
|||||||
fun <C : Specific> Specific.spec(
|
fun <C : Specific> Specific.spec(
|
||||||
spec: Specification<C>,
|
spec: Specification<C>,
|
||||||
key: Name? = null
|
key: Name? = null
|
||||||
): MutableMorphDelegate<Config, C> = MutableMorphDelegate(config, key) { spec.wrap(it) }
|
): MutableMorphDelegate<Config, C> = MutableMorphDelegate(config, key) { spec.wrap(it) }
|
||||||
|
|
||||||
|
fun <T: Specific> MetaItem<*>.spec(spec: Specification<T>): T? = node?.let { spec.wrap(it)}
|
||||||
|
|
||||||
|
@JvmName("configSpec")
|
||||||
|
fun <T: Specific> MetaItem<Config>.spec(spec: Specification<T>): T? = node?.let { spec.wrap(it)}
|
Loading…
Reference in New Issue
Block a user