Named return Name instead of String

This commit is contained in:
Alexander Nozik 2019-09-08 15:11:46 +03:00
parent 8bac218715
commit 10b8385324
34 changed files with 523 additions and 208 deletions

View File

@ -3,10 +3,13 @@ package hep.dataforge.context
import hep.dataforge.meta.EmptyMeta
import hep.dataforge.meta.Meta
import hep.dataforge.names.Name
import hep.dataforge.names.toName
import kotlin.properties.ReadOnlyProperty
import kotlin.reflect.KClass
import kotlin.reflect.KProperty
abstract class AbstractPlugin(override val meta: Meta = EmptyMeta) : Plugin {
private var _context: Context? = null
private val dependencies = ArrayList<PluginFactory<*>>()
override val context: Context
get() = _context ?: error("Plugin $tag is not attached")
@ -19,9 +22,23 @@ abstract class AbstractPlugin(override val meta: Meta = EmptyMeta) : Plugin {
this._context = null
}
override fun provideTop(target: String): Map<Name, Any> = emptyMap()
final override fun dependsOn(): List<PluginFactory<*>> = dependencies
companion object{
fun <T: Named> Collection<T>.toMap(): Map<Name, T> = associate { it.name.toName() to it }
/**
* Register plugin dependency and return a delegate which provides lazily initialized reference to dependent plugin
*/
protected fun <P : Plugin> require(factory: PluginFactory<P>): ReadOnlyProperty<AbstractPlugin, P> {
dependencies.add(factory)
return PluginDependencyDelegate(factory.type)
}
override fun provideTop(target: String): Map<Name, Any> = emptyMap()
}
fun <T : Named> Collection<T>.toMap(): Map<Name, T> = associate { it.name to it }
private class PluginDependencyDelegate<P : Plugin>(val type: KClass<out P>) : ReadOnlyProperty<AbstractPlugin, P> {
override fun getValue(thisRef: AbstractPlugin, property: KProperty<*>): P {
return thisRef.context.plugins[type] ?: error("Plugin with type $type not found")
}
}

View File

@ -2,8 +2,8 @@ package hep.dataforge.context
import hep.dataforge.meta.*
import hep.dataforge.names.Name
import hep.dataforge.names.appendLeft
import hep.dataforge.names.toName
import hep.dataforge.names.asName
import hep.dataforge.names.plus
import hep.dataforge.provider.Provider
import hep.dataforge.provider.top
import hep.dataforge.values.Value
@ -27,7 +27,7 @@ import kotlin.jvm.JvmName
* @author Alexander Nozik
*/
open class Context(
final override val name: String,
final override val name: Name,
val parent: Context? = Global
) : Named, MetaRepr, Provider, CoroutineScope {
@ -45,7 +45,7 @@ open class Context(
/**
* Context logger
*/
val logger: KLogger = KotlinLogging.logger(name)
val logger: KLogger = KotlinLogging.logger(name.toString())
/**
* A [PluginManager] for current context
@ -64,7 +64,7 @@ open class Context(
override fun provideTop(target: String): Map<Name, Any> {
return when (target) {
Value.TYPE -> properties.sequence().toMap()
Plugin.PLUGIN_TARGET -> plugins.sequence(true).associateBy { it.name.toName() }
Plugin.PLUGIN_TARGET -> plugins.sequence(true).associateBy { it.name }
else -> emptyMap()
}
}
@ -118,14 +118,14 @@ fun Context.content(target: String): Map<Name, Any> = content<Any>(target)
@JvmName("typedContent")
inline fun <reified T : Any> Context.content(target: String): Map<Name, T> =
plugins.flatMap { plugin ->
plugin.top<T>(target).entries.map { (it.key.appendLeft(plugin.name)) to it.value }
plugin.top<T>(target).entries.map { (plugin.name + it.key) to it.value }
}.associate { it }
/**
* A global root context. Closing [Global] terminates the framework.
*/
object Global : Context("GLOBAL", null) {
object Global : Context("GLOBAL".asName(), null) {
/**
* Closing all contexts
*
@ -173,7 +173,7 @@ interface ContextAware {
val logger: KLogger
get() = if (this is Named) {
KotlinLogging.logger(context.name + "." + (this as Named).name)
KotlinLogging.logger((context.name + this.name).toString())
} else {
context.logger
}

View File

@ -2,11 +2,12 @@ package hep.dataforge.context
import hep.dataforge.meta.MetaBuilder
import hep.dataforge.meta.buildMeta
import hep.dataforge.names.toName
/**
* A convenience builder for context
*/
class ContextBuilder(var name: String = "@anonimous", val parent: Context = Global) {
class ContextBuilder(var name: String = "@anonymous", val parent: Context = Global) {
private val plugins = ArrayList<Plugin>()
private var meta = MetaBuilder()
@ -31,7 +32,7 @@ class ContextBuilder(var name: String = "@anonimous", val parent: Context = Glob
}
fun build(): Context {
return Context(name, parent).apply {
return Context(name.toName(), parent).apply {
this@ContextBuilder.plugins.forEach {
plugins.load(it)
}

View File

@ -15,6 +15,10 @@
*/
package hep.dataforge.context
import hep.dataforge.names.Name
import hep.dataforge.names.asName
import hep.dataforge.names.isEmpty
/**
* Any object that have name
*
@ -27,10 +31,9 @@ interface Named {
*
* @return
*/
val name: String
val name: Name
companion object {
const val ANONYMOUS = ""
/**
* Get the name of given object. If object is Named its name is used,
@ -39,11 +42,11 @@ interface Named {
* @param obj
* @return
*/
fun nameOf(obj: Any): String {
fun nameOf(obj: Any): Name {
return if (obj is Named) {
obj.name
} else {
obj.toString()
obj.toString().asName()
}
}
}
@ -54,4 +57,4 @@ interface Named {
* @return
*/
val Named.isAnonymous: Boolean
get() = this.name == Named.ANONYMOUS
get() = this.name.isEmpty()

View File

@ -3,6 +3,8 @@ package hep.dataforge.context
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaRepr
import hep.dataforge.meta.buildMeta
import hep.dataforge.names.Name
import hep.dataforge.names.toName
import hep.dataforge.provider.Provider
/**
@ -37,7 +39,7 @@ interface Plugin : Named, ContextAware, Provider, MetaRepr {
*
* @return
*/
override val name: String get() = tag.name
override val name: Name get() = tag.name.toName()
/**
* Plugin dependencies which are required to attach this plugin. Plugin
@ -46,7 +48,7 @@ interface Plugin : Named, ContextAware, Provider, MetaRepr {
*
* @return
*/
fun dependsOn(): List<PluginFactory<*>> = emptyList()
fun dependsOn(): Collection<PluginFactory<*>>
/**
* Start this plugin and attach registration info to the context. This method

View File

@ -5,7 +5,6 @@ import hep.dataforge.names.appendLeft
import hep.dataforge.names.toName
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertTrue
class ContextTest {
@ -26,7 +25,7 @@ class ContextTest {
val members = Global.content<Name>("test")
assertEquals(3, members.count())
members.forEach {
assertTrue{it.key == it.value.appendLeft("test")}
assertEquals(it.key, it.value.appendLeft("test"))
}
}

View File

@ -4,6 +4,7 @@ import kotlinx.io.core.ByteReadPacket
import kotlinx.io.core.Input
import kotlinx.io.core.buildPacket
import kotlinx.io.core.readBytes
import kotlin.math.min
/**
* A source of binary data
@ -30,6 +31,8 @@ interface RandomAccessBinary : Binary {
/**
* Read at most [size] of bytes starting at [from] offset from the beginning of the binary.
* This method could be called multiple times simultaneously.
*
* If size
*/
fun <R> read(from: UInt, size: UInt = UInt.MAX_VALUE, block: Input.() -> R): R
@ -60,7 +63,8 @@ class ArrayBinary(val array: ByteArray) : RandomAccessBinary {
override val size: ULong get() = array.size.toULong()
override fun <R> read(from: UInt, size: UInt, block: Input.() -> R): R {
return ByteReadPacket(array, from.toInt(), size.toInt()).block()
val theSize = min(size, array.size.toUInt() - from)
return ByteReadPacket(array, from.toInt(), theSize.toInt()).block()
}
}

View File

@ -2,6 +2,8 @@ package hep.dataforge.io
import hep.dataforge.descriptors.NodeDescriptor
import hep.dataforge.meta.*
import hep.dataforge.names.Name
import hep.dataforge.names.plus
import hep.dataforge.values.*
import kotlinx.io.core.Input
import kotlinx.io.core.Output
@ -9,7 +11,7 @@ import kotlinx.io.core.readText
import kotlinx.io.core.writeText
object BinaryMetaFormat : MetaFormat {
override val name: String = "bin"
override val name: Name = super.name + "bin"
override val key: Short = 0x4249//BI
override fun Input.readMeta(descriptor: NodeDescriptor?): Meta {
@ -23,7 +25,7 @@ object BinaryMetaFormat : MetaFormat {
writeText(str)
}
private fun Output.writeValue(value: Value) {
fun Output.writeValue(value: Value) {
if (value.isList()) {
writeChar('L')
writeInt(value.list.size)
@ -92,7 +94,7 @@ object BinaryMetaFormat : MetaFormat {
}
@Suppress("UNCHECKED_CAST")
private fun Input.readMetaItem(): MetaItem<MetaBuilder> {
fun Input.readMetaItem(): MetaItem<MetaBuilder> {
return when (val keyChar = readByte().toChar()) {
'S' -> MetaItem.ValueItem(StringValue(readString()))
'N' -> MetaItem.ValueItem(Null)

View File

@ -77,6 +77,10 @@ class EnvelopeBuilder {
metaBuilder.apply(block)
}
fun meta(meta: Meta) {
metaBuilder.update(meta)
}
var type by metaBuilder.string(key = Envelope.ENVELOPE_TYPE_KEY)
var dataType by metaBuilder.string(key = Envelope.ENVELOPE_DATA_TYPE_KEY)
var description by metaBuilder.string(key = Envelope.ENVELOPE_DESCRIPTION_KEY)

View File

@ -3,9 +3,12 @@ package hep.dataforge.io
import hep.dataforge.context.Named
import hep.dataforge.io.EnvelopeFormat.Companion.ENVELOPE_FORMAT_TYPE
import hep.dataforge.meta.Meta
import hep.dataforge.names.Name
import hep.dataforge.names.asName
import hep.dataforge.provider.Type
import kotlinx.io.core.Input
import kotlinx.io.core.Output
import kotlin.reflect.KClass
/**
* A partially read envelope with meta, but without data
@ -15,6 +18,9 @@ data class PartialEnvelope(val meta: Meta, val dataOffset: UInt, val dataSize: U
@Type(ENVELOPE_FORMAT_TYPE)
interface EnvelopeFormat : IOFormat<Envelope>, Named {
override val name: Name get() = "envelope".asName()
override val type: KClass<out Envelope> get() = Envelope::class
fun Input.readPartial(formats: Collection<MetaFormat> = IOPlugin.defaultMetaFormats): PartialEnvelope
fun Input.readEnvelope(formats: Collection<MetaFormat> = IOPlugin.defaultMetaFormats): Envelope

View File

@ -1,16 +1,31 @@
package hep.dataforge.io
import hep.dataforge.context.Named
import hep.dataforge.io.IOFormat.Companion.TYPE
import hep.dataforge.meta.MetaItem
import hep.dataforge.names.EmptyName
import hep.dataforge.names.Name
import hep.dataforge.names.asName
import hep.dataforge.names.toName
import hep.dataforge.provider.Type
import hep.dataforge.values.Value
import kotlinx.io.core.*
import kotlinx.serialization.ImplicitReflectionSerializer
import kotlinx.serialization.KSerializer
import kotlinx.serialization.cbor.Cbor
import kotlinx.serialization.serializer
import kotlin.reflect.KClass
/**
* And interface for serialization facilities
*/
@Type(TYPE)
interface IOFormat<T : Any> {
interface IOFormat<T : Any> : Named {
/**
* Explicit type for dynamic type checks
*/
val type: KClass<out T>
fun Output.writeThis(obj: T)
fun Input.readThis(): T
@ -24,16 +39,46 @@ fun <T : Any> IOFormat<T>.writeBytes(obj: T): ByteArray = buildPacket { writeThi
object DoubleIOFormat : IOFormat<Double> {
override val name: Name = "double".asName()
override val type: KClass<out Double> get() = Double::class
override fun Output.writeThis(obj: Double) {
writeDouble(obj)
}
override fun Input.readThis(): Double = readDouble()
}
object ValueIOFormat : IOFormat<Value> {
override val name: Name = "value".asName()
override val type: KClass<out Value> get() = Value::class
override fun Output.writeThis(obj: Value) {
BinaryMetaFormat.run { writeValue(obj) }
}
override fun Input.readThis(): Value {
return (BinaryMetaFormat.run { readMetaItem() } as? MetaItem.ValueItem)?.value
?: error("The item is not a value")
}
}
/**
* Experimental
*/
class SerializerIOFormat<T : Any>(val serializer: KSerializer<T>) : IOFormat<T> {
@ImplicitReflectionSerializer
class SerializerIOFormat<T : Any>(
override val type: KClass<T>,
val serializer: KSerializer<T> = type.serializer()
) : IOFormat<T> {
override val name: Name = type.simpleName?.toName() ?: EmptyName
override fun Output.writeThis(obj: T) {
val bytes = Cbor.plain.dump(serializer, obj)
writeFully(bytes)

View File

@ -1,11 +1,11 @@
package hep.dataforge.io
import hep.dataforge.context.AbstractPlugin
import hep.dataforge.context.PluginFactory
import hep.dataforge.context.PluginTag
import hep.dataforge.context.content
import hep.dataforge.context.*
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaItem
import hep.dataforge.meta.string
import hep.dataforge.names.Name
import hep.dataforge.names.get
import kotlin.reflect.KClass
class IOPlugin(meta: Meta) : AbstractPlugin(meta) {
@ -16,20 +16,36 @@ class IOPlugin(meta: Meta) : AbstractPlugin(meta) {
}
fun metaFormat(key: Short): MetaFormat? = metaFormats.find { it.key == key }
fun metaFormat(name: String): MetaFormat? = metaFormats.find { it.name == name }
fun metaFormat(name: String): MetaFormat? = metaFormats.find { it.name.toString() == name }
override fun provideTop(target: String): Map<Name, Any> {
return when (target) {
MetaFormat.META_FORMAT_TYPE -> defaultMetaFormats.toMap()
EnvelopeFormat.ENVELOPE_FORMAT_TYPE -> defaultEnvelopeFormats.toMap()
IOFormat.TYPE -> defaultIOFormats.toMap()
else -> super.provideTop(target)
}
}
val ioFormats: Map<Name, IOFormat<*>> by lazy {
context.content<IOFormat<*>>(IOFormat.TYPE)
}
fun resolveIOFormat(item: MetaItem<*>): IOFormat<*>? {
val key = item.string ?: error("Not a string value!")
return ioFormats[key]
}
companion object : PluginFactory<IOPlugin> {
val defaultMetaFormats: List<MetaFormat> = listOf(JsonMetaFormat, BinaryMetaFormat)
val defaultEnvelopeFormats = listOf(TaggedEnvelopeFormat)
val defaultIOFormats = listOf(
DoubleIOFormat,
ValueIOFormat,
BinaryMetaFormat
)
override val tag: PluginTag = PluginTag("io", group = PluginTag.DATAFORGE_GROUP)
override val type: KClass<out IOPlugin> = IOPlugin::class
override fun invoke(meta: Meta): IOPlugin = IOPlugin(meta)

View File

@ -6,7 +6,9 @@ import hep.dataforge.descriptors.ValueDescriptor
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaBase
import hep.dataforge.meta.MetaItem
import hep.dataforge.names.Name
import hep.dataforge.names.NameToken
import hep.dataforge.names.plus
import hep.dataforge.names.toName
import hep.dataforge.values.*
import kotlinx.io.core.Input
@ -21,7 +23,7 @@ import kotlin.collections.set
object JsonMetaFormat : MetaFormat {
override val name: String = "json"
override val name: Name = super.name + "json"
override val key: Short = 0x4a53//"JS"
override fun Output.writeMeta(meta: Meta, descriptor: NodeDescriptor?) {

View File

@ -4,17 +4,22 @@ import hep.dataforge.context.Named
import hep.dataforge.descriptors.NodeDescriptor
import hep.dataforge.io.MetaFormat.Companion.META_FORMAT_TYPE
import hep.dataforge.meta.Meta
import hep.dataforge.names.Name
import hep.dataforge.names.asName
import hep.dataforge.provider.Type
import kotlinx.io.core.*
import kotlin.reflect.KClass
/**
* A format for meta serialization
*/
@Type(META_FORMAT_TYPE)
interface MetaFormat : IOFormat<Meta>, Named {
override val name: String
override val name: Name get() = "meta".asName()
val key: Short
override val type: KClass<out Meta> get() = Meta::class
override fun Output.writeThis(obj: Meta) {
writeMeta(obj, null)
}

View File

@ -1,5 +1,7 @@
package hep.dataforge.io
import hep.dataforge.names.Name
import hep.dataforge.names.plus
import kotlinx.io.core.*
@ -10,7 +12,7 @@ object TaggedEnvelopeFormat : EnvelopeFormat {
private const val END_SEQUENCE = "~#\r\n"
private const val TAG_SIZE = 26u
override val name: String get() = VERSION
override val name: Name = super.name + VERSION
private fun Tag.toBytes(): ByteReadPacket = buildPacket(24) {
writeText(START_SEQUENCE)

View File

@ -1,7 +1,12 @@
package hep.dataforge.io
package hep.dataforge.io.functions
import hep.dataforge.context.ContextAware
import hep.dataforge.io.functions.FunctionSpec
import hep.dataforge.io.IOFormat
import hep.dataforge.io.IOPlugin
import hep.dataforge.meta.Meta
import hep.dataforge.meta.get
import hep.dataforge.names.asName
import hep.dataforge.names.plus
/**
@ -11,24 +16,40 @@ interface FunctionServer : ContextAware {
/**
* Call a function with given name and descriptor
*/
suspend fun <T : Any, R : Any> call(name: String, spec: FunctionSpec<T, R>, arg: T): R
suspend fun <T : Any, R : Any> call(meta: Meta, arg: T): R
suspend fun <T : Any, R : Any> callMany(
name: String,
spec: FunctionSpec<T, R>,
meta: Meta,
arg: List<T>
): List<R> = List(arg.size) {
call(name, spec, arg[it])
call<T, R>(meta, arg[it])
}
/**
* Get a generic suspended function with given name and descriptor
*/
fun <T : Any, R : Any> get(
name: String,
spec: FunctionSpec<T, R>
): (suspend (T) -> R) =
{ call(name, spec, it) }
fun <T : Any, R : Any> function(
meta: Meta
): (suspend (T) -> R) = { call(meta, it) }
companion object {
const val FUNCTION_NAME_KEY = "function"
val FORMAT_KEY = "format".asName()
val INPUT_FORMAT_KEY = FORMAT_KEY + "input"
val OUTPUT_FORMAT_KEY = FORMAT_KEY + "output"
}
}
fun <T : Any> IOPlugin.getInputFormat(meta: Meta): IOFormat<T> {
return meta[FunctionServer.INPUT_FORMAT_KEY]?.let {
resolveIOFormat(it) as IOFormat<T>
} ?: error("Input format not resolved")
}
fun <R : Any> IOPlugin.getOutputFormat(meta: Meta): IOFormat<R> {
return meta[FunctionServer.OUTPUT_FORMAT_KEY]?.let {
resolveIOFormat(it) as IOFormat<R>
} ?: error("Input format not resolved")
}

View File

@ -1,9 +0,0 @@
package hep.dataforge.io.functions
import hep.dataforge.io.IOFormat
import hep.dataforge.meta.MetaRepr
interface FunctionSpec<T : Any, R : Any>: MetaRepr {
val inputFormat: IOFormat<T>
val outputFormat: IOFormat<R>
}

View File

@ -1,58 +0,0 @@
package hep.dataforge.io.functions
import hep.dataforge.context.AbstractPlugin
import hep.dataforge.context.PluginFactory
import hep.dataforge.context.PluginTag
import hep.dataforge.io.DoubleIOFormat
import hep.dataforge.io.IOFormat
import hep.dataforge.io.IOPlugin
import hep.dataforge.meta.Meta
import hep.dataforge.meta.buildMeta
import kotlin.reflect.KClass
class FunctionsPlugin(meta: Meta) : AbstractPlugin(meta) {
override val tag: PluginTag get() = Companion.tag
override fun dependsOn(): List<PluginFactory<*>> = listOf(IOPlugin)
private val specs: Collection<FunctionSpec<out Any, out Any>> = listOf(
DoubleToDoubleFunctionSpec
)
fun resolve(meta: Meta): FunctionSpec<*, *>? {
return specs.find { it.toMeta() == meta }
}
// fun <T : Any, R : Any> resolve(inputType: KClass<out T>, outputType: KClass<out R>): FunctionSpec<T, R> {
//
// }
companion object : PluginFactory<FunctionsPlugin> {
override val tag: PluginTag = PluginTag("io.functions", group = PluginTag.DATAFORGE_GROUP)
override val type: KClass<out FunctionsPlugin> = FunctionsPlugin::class
override fun invoke(meta: Meta): FunctionsPlugin = FunctionsPlugin(meta)
}
}
object DoubleToDoubleFunctionSpec : FunctionSpec<Double, Double> {
override val inputFormat: IOFormat<Double> get() = DoubleIOFormat
override val outputFormat: IOFormat<Double> get() = DoubleIOFormat
override fun toMeta(): Meta = buildMeta {
"input" to "Double"
"output" to "Double"
}
}
//suspend inline fun <reified T : Any, reified R : Any> FunctionServer.call(name: String, arg: T): R {
// val plugin = context.plugins.load(FunctionsPlugin)
// val spec = plugin.resolve(T::class, R::class)
// return call(name, spec, arg)
//}
//
//inline operator fun <reified T : Any, reified R : Any> FunctionServer.get(name: String): (suspend (T) -> R) {
// val plugin = context.plugins.load(FunctionsPlugin)
// val spec = plugin.resolve(T::class, R::class)
// return get(name, spec)
//}

View File

@ -1,39 +1,40 @@
package hep.dataforge.io
package hep.dataforge.io.functions
import hep.dataforge.context.Context
import hep.dataforge.context.ContextAware
import hep.dataforge.io.functions.FunctionSpec
import hep.dataforge.io.functions.FunctionsPlugin
import hep.dataforge.io.*
import hep.dataforge.meta.Meta
import hep.dataforge.meta.get
import hep.dataforge.meta.int
import kotlin.reflect.KClass
class RemoteFunctionClient(override val context: Context, val responder: Responder) : FunctionServer, ContextAware {
private fun <T : Any> encodeOne(name: String, spec: FunctionSpec<T, *>, value: T): Envelope =
Envelope.build {
private fun <T : Any> IOPlugin.encodeOne(
meta: Meta,
value: T
): Envelope = Envelope.build {
meta(meta)
type = REQUEST_TYPE
meta {
FUNCTION_NAME_KEY to name
FUNCTION_SPEC_KEY to spec.toMeta()
}
data {
spec.inputFormat.run {
val inputFormat: IOFormat<T> = getInputFormat<T>(meta)
inputFormat.run {
writeThis(value)
}
}
}
private fun <T : Any> encodeMany(name: String, spec: FunctionSpec<T, *>, values: List<T>): Envelope =
Envelope.build {
private fun <T : Any> IOPlugin.encodeMany(
meta: Meta,
values: List<T>
): Envelope = Envelope.build {
meta(meta)
type = REQUEST_TYPE
meta {
FUNCTION_NAME_KEY to name
FUNCTION_SPEC_KEY to spec.toMeta()
SIZE_KEY to values.size
}
data {
spec.inputFormat.run {
val inputFormat: IOFormat<T> = getInputFormat<T>(meta)
inputFormat.run {
values.forEach {
writeThis(it)
}
@ -41,16 +42,17 @@ class RemoteFunctionClient(override val context: Context, val responder: Respond
}
}
private fun <R : Any> decode(spec: FunctionSpec<*, R>, envelope: Envelope): List<R> {
private fun <R : Any> IOPlugin.decode(envelope: Envelope): List<R> {
require(envelope.type == RESPONSE_TYPE) { "Unexpected message type: ${envelope.type}" }
val size = envelope.meta[SIZE_KEY].int ?: 1
return if (size == 0) {
emptyList()
} else {
val outputFormat: IOFormat<R> = getOutputFormat<R>(envelope.meta)
envelope.data?.read {
List<R>(size) {
spec.outputFormat.run {
outputFormat.run {
readThis()
}
}
@ -58,43 +60,32 @@ class RemoteFunctionClient(override val context: Context, val responder: Respond
}
}
private val plugin by lazy {
context.plugins.load(IOPlugin)
}
override suspend fun <T : Any, R : Any> call(
name: String,
spec: FunctionSpec<T, R>,
meta: Meta,
arg: T
): R {
val request = encodeOne(name, spec, arg)
): R = plugin.run {
val request = encodeOne(meta, arg)
val response = responder.respond(request)
return decode(spec, response).first()
return decode<R>(response).first()
}
override suspend fun <T : Any, R : Any> callMany(
name: String,
spec: FunctionSpec<T, R>,
meta: Meta,
arg: List<T>
): List<R> {
val request = encodeMany(name, spec, arg)
): List<R> = plugin.run {
val request = encodeMany(meta, arg)
val response = responder.respond(request)
return decode(spec, response)
}
private val plugin by lazy {
context.plugins.load(FunctionsPlugin)
}
fun <T : Any, R : Any> resolveSpec(
inputType: KClass<out T>,
outputType: KClass<out R>
): FunctionSpec<T, R> {
return plugin.resolve(inputType, outputType)
return decode<R>(response)
}
companion object {
const val REQUEST_TYPE = "function.request"
const val RESPONSE_TYPE = "function.response"
const val FUNCTION_NAME_KEY = "function"
const val SIZE_KEY = "size"
const val FUNCTION_SPEC_KEY = "spec"
}
}

View File

@ -2,11 +2,12 @@ package hep.dataforge.io.functions
import hep.dataforge.context.Context
import hep.dataforge.context.ContextAware
import hep.dataforge.io.*
import hep.dataforge.io.Envelope
import hep.dataforge.io.IOPlugin
import hep.dataforge.io.Responder
import hep.dataforge.io.type
import hep.dataforge.meta.get
import hep.dataforge.meta.int
import hep.dataforge.meta.node
import hep.dataforge.meta.string
class RemoteFunctionServer(
override val context: Context,
@ -14,39 +15,38 @@ class RemoteFunctionServer(
) : ContextAware, Responder {
private val plugin by lazy {
context.plugins.load(FunctionsPlugin)
context.plugins.load(IOPlugin)
}
override suspend fun respond(request: Envelope): Envelope {
require(request.type == RemoteFunctionClient.REQUEST_TYPE) { "Unexpected message type: ${request.type}" }
val functionName = request.meta[RemoteFunctionClient.FUNCTION_NAME_KEY].string ?: ""
@Suppress("UNCHECKED_CAST") val spec = request.meta[RemoteFunctionClient.FUNCTION_SPEC_KEY].node?.let {
plugin.resolve(it) as FunctionSpec<Any, Any>
} ?: error("Function specification not found")
val inputFormat = plugin.getInputFormat<Any>(request.meta)
val outputFormat = plugin.getOutputFormat<Any>(request.meta)
val size = request
.meta[RemoteFunctionClient.SIZE_KEY].int ?: 1
val size = request.meta[RemoteFunctionClient.SIZE_KEY].int ?: 1
val input = request.data?.read {
spec.inputFormat.run {
inputFormat.run {
List(size) {
readThis()
}
}
} ?: error("Input is empty")
val output = functionServer.callMany<Any, Any>(functionName, spec, input)
val output = functionServer.callMany<Any, Any>(
request.meta,
input
)
return Envelope.build {
type = RemoteFunctionClient.RESPONSE_TYPE
meta {
RemoteFunctionClient.FUNCTION_NAME_KEY to functionName
RemoteFunctionClient.FUNCTION_SPEC_KEY to spec.toMeta()
RemoteFunctionClient.SIZE_KEY to output.size
meta(request.meta)
}
type = RemoteFunctionClient.RESPONSE_TYPE
data {
spec.outputFormat.run {
outputFormat.run {
output.forEach {
writeThis(it)
}

View File

@ -6,6 +6,7 @@ import java.nio.channels.FileChannel
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardOpenOption
import kotlin.math.min
@ExperimentalUnsignedTypes
class FileBinary(val path: Path, private val offset: UInt = 0u, size: ULong? = null) : RandomAccessBinary {
@ -14,7 +15,8 @@ class FileBinary(val path: Path, private val offset: UInt = 0u, size: ULong? = n
override fun <R> read(from: UInt, size: UInt, block: Input.() -> R): R {
FileChannel.open(path, StandardOpenOption.READ).use {
val buffer = it.map(FileChannel.MapMode.READ_ONLY, (from + offset).toLong(), size.toLong())
val theSize: UInt = min(size, Files.size(path).toUInt() - offset)
val buffer = it.map(FileChannel.MapMode.READ_ONLY, (from + offset).toLong(), theSize.toLong())
return ByteReadPacket(buffer).block()
}
}

View File

@ -0,0 +1,34 @@
package hep.dataforge.io
import hep.dataforge.io.functions.FunctionServer
import hep.dataforge.io.functions.FunctionServer.Companion.FUNCTION_NAME_KEY
import hep.dataforge.io.functions.FunctionServer.Companion.INPUT_FORMAT_KEY
import hep.dataforge.io.functions.FunctionServer.Companion.OUTPUT_FORMAT_KEY
import hep.dataforge.meta.Meta
import hep.dataforge.meta.buildMeta
import hep.dataforge.names.Name
import kotlin.reflect.KClass
import kotlin.reflect.full.isSuperclassOf
inline fun <reified T : Any> IOPlugin.resolveIOFormat(): IOFormat<T>? {
return ioFormats.values.find { it.type.isSuperclassOf(T::class) } as IOFormat<T>?
}
fun IOPlugin.resolveIOFormatName(type: KClass<*>): Name {
return ioFormats.entries.find { it.value.type.isSuperclassOf(type) }?.key
?: error("Can't resolve IOFormat for type $type")
}
inline fun <reified T : Any, reified R : Any> IOPlugin.generateFunctionMeta(functionName: String): Meta = buildMeta {
FUNCTION_NAME_KEY to functionName
INPUT_FORMAT_KEY to resolveIOFormatName(T::class)
OUTPUT_FORMAT_KEY to resolveIOFormatName(R::class)
}
inline fun <reified T : Any, reified R : Any> FunctionServer.function(
functionName: String
): (suspend (T) -> R) {
val plugin = context.plugins.get<IOPlugin>() ?: error("IO plugin not loaded")
val meta = plugin.generateFunctionMeta<T, R>(functionName)
return function(meta)
}

View File

@ -0,0 +1,61 @@
package hep.dataforge.io.tcp
import hep.dataforge.context.Context
import hep.dataforge.context.ContextAware
import hep.dataforge.io.*
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeout
import kotlinx.io.streams.asInput
import kotlinx.io.streams.asOutput
import java.net.Socket
import java.util.concurrent.Executors
import kotlin.time.Duration
import kotlin.time.ExperimentalTime
import kotlin.time.seconds
@ExperimentalTime
class EnvelopeClient(
override val context: Context,
val host: String,
val port: Int,
val timeout: Duration = 2.seconds,
val format: EnvelopeFormat = TaggedEnvelopeFormat
) : Responder, ContextAware {
private val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
private var socket: Socket? = null
private fun getSocket(): Socket {
val socket = socket ?: Socket(host, port).also { this.socket = it }
return if (socket.isConnected) {
socket
} else {
Socket(host, port).also { this.socket = it }
}
}
suspend fun close() {
respond(
Envelope.build {
type = EnvelopeServer.SHUTDOWN_ENVELOPE_TYPE
}
)
}
override suspend fun respond(request: Envelope): Envelope = withContext(dispatcher) {
withTimeout(timeout.toLongMilliseconds()) {
val socket = getSocket()
val input = socket.getInputStream().asInput()
val output = socket.getOutputStream().asOutput()
format.run {
output.writeThis(request)
logger.debug { "Sent request with type ${request.type} to ${socket.remoteSocketAddress}" }
val res = input.readThis()
logger.debug { "Received response with type ${res.type} from ${socket.remoteSocketAddress}" }
return@withTimeout res
}
}
}
}

View File

@ -0,0 +1,70 @@
package hep.dataforge.io.tcp
import hep.dataforge.context.Context
import hep.dataforge.context.ContextAware
import hep.dataforge.io.EnvelopeFormat
import hep.dataforge.io.Responder
import hep.dataforge.io.TaggedEnvelopeFormat
import hep.dataforge.io.type
import kotlinx.coroutines.*
import kotlinx.io.streams.asInput
import kotlinx.io.streams.asOutput
import java.net.ServerSocket
import java.net.Socket
class EnvelopeServer(
override val context: Context,
val port: Int,
val responder: Responder,
val scope: CoroutineScope,
val format: EnvelopeFormat = TaggedEnvelopeFormat
) : ContextAware {
private var job: Job? = null
fun start() {
if (job == null) {
logger.info { "Starting envelope server on port $port" }
val job = scope.launch(Dispatchers.IO) {
val serverSocket = ServerSocket(port)
//TODO add handshake and format negotiation
while (!serverSocket.isClosed) {
val socket = serverSocket.accept()
logger.info { "Accepted connection from ${socket.remoteSocketAddress}" }
readSocket(socket)
}
}
}
}
fun stop() {
logger.info { "Stopping envelope server on port $port" }
job?.cancel()
job = null
}
private fun CoroutineScope.readSocket(socket: Socket) {
val input = socket.getInputStream().asInput()
val output = socket.getOutputStream().asOutput()
format.run {
launch {
while (isActive && socket.isConnected) {
val request = input.readThis()
logger.debug { "Accepted request with type ${request.type} from ${socket.remoteSocketAddress}" }
if (request.type == SHUTDOWN_ENVELOPE_TYPE) {
//Echo shutdown command
logger.info { "Accepted graceful shutdown signal from ${socket.inetAddress}" }
socket.close()
cancel("Graceful connection shutdown requested by client")
}
val response = responder.respond(request)
output.writeThis(response)
}
}
}
}
companion object {
const val SHUTDOWN_ENVELOPE_TYPE = "@shutdown"
}
}

View File

@ -0,0 +1,58 @@
package hep.dataforge.io.tcp
import hep.dataforge.context.Global
import hep.dataforge.io.Envelope
import hep.dataforge.io.Responder
import kotlinx.coroutines.GlobalScope
import org.junit.AfterClass
import org.junit.BeforeClass
import kotlin.time.ExperimentalTime
object EchoResponder : Responder {
override suspend fun respond(request: Envelope): Envelope = request
}
@ExperimentalTime
class EnvelopeServerTest {
companion object {
@JvmStatic
val echoEnvelopeServer = EnvelopeServer(Global, 7778, EchoResponder, GlobalScope)
@BeforeClass
@JvmStatic
fun start() {
echoEnvelopeServer.start()
}
@AfterClass
@JvmStatic
fun close() {
echoEnvelopeServer.stop()
}
}
// @Test
// fun doEchoTest() {
// val client = EnvelopeClient(Global, host = "localhost", port = 7778)
// val request = Envelope.build {
// type = "test.echo"
// meta {
// "test.value" to 22
// }
// data {
// writeDouble(22.7)
// }
// }
// val response = runBlocking {
// client.respond(request)
// }
//
// assertEquals(request.meta, response.meta)
// assertEquals(request.data, response.data)
//
// runBlocking {
// client.close()
// }
// }
}

View File

@ -1,17 +1,18 @@
package hep.dataforge.workspace
import hep.dataforge.data.*
import hep.dataforge.data.DataNode
import hep.dataforge.descriptors.NodeDescriptor
import hep.dataforge.meta.Meta
import hep.dataforge.meta.get
import hep.dataforge.meta.node
import hep.dataforge.names.Name
import kotlin.reflect.KClass
//data class TaskEnv(val workspace: Workspace, val model: TaskModel)
class GenericTask<R : Any>(
override val name: String,
override val name: Name,
override val type: KClass<out R>,
override val descriptor: NodeDescriptor,
private val modelTransform: TaskModelBuilder.(Meta) -> Unit,

View File

@ -3,10 +3,10 @@ package hep.dataforge.workspace
import hep.dataforge.context.Context
import hep.dataforge.context.Global
import hep.dataforge.context.content
import hep.dataforge.context.toMap
import hep.dataforge.data.DataNode
import hep.dataforge.meta.Meta
import hep.dataforge.names.Name
import hep.dataforge.names.toName
/**
@ -20,7 +20,7 @@ class SimpleWorkspace(
) : Workspace {
override val tasks: Map<Name, Task<*>> by lazy {
context.content<Task<*>>(Task.TYPE) + tasks.associateBy { it.name.toName() }
context.content<Task<*>>(Task.TYPE) + tasks.toMap()
}
companion object {

View File

@ -23,7 +23,7 @@ import hep.dataforge.workspace.TaskModel.Companion.MODEL_TARGET_KEY
* @param dependencies a list of direct dependencies for this task
*/
data class TaskModel(
val name: String,
val name: Name,
val meta: Meta,
val dependencies: Collection<Dependency>
) : MetaRepr {
@ -66,7 +66,7 @@ annotation class TaskBuildScope
* A builder for [TaskModel]
*/
@TaskBuildScope
class TaskModelBuilder(val name: String, meta: Meta = EmptyMeta) {
class TaskModelBuilder(val name: Name, meta: Meta = EmptyMeta) {
/**
* Meta for current task. By default uses the whole input meta
*/
@ -78,10 +78,13 @@ class TaskModelBuilder(val name: String, meta: Meta = EmptyMeta) {
/**
* Add dependency for a task defined in a workspace and resolved by
*/
fun dependsOn(name: String, meta: Meta = this.meta, placement: Name = EmptyName) {
dependencies.add(WorkspaceTaskDependency(name.toName(), meta, placement))
fun dependsOn(name: Name, meta: Meta = this.meta, placement: Name = EmptyName) {
dependencies.add(WorkspaceTaskDependency(name, meta, placement))
}
fun dependsOn(name: String, meta: Meta = this.meta, placement: Name = EmptyName) =
dependsOn(name.toName(),meta,placement)
fun dependsOn(task: Task<*>, meta: Meta = this.meta, placement: Name = EmptyName) {
dependencies.add(DirectTaskDependency(task, meta, placement))
}

View File

@ -1,8 +1,8 @@
package hep.dataforge.workspace
import hep.dataforge.context.AbstractPlugin
import hep.dataforge.context.toMap
import hep.dataforge.names.Name
import hep.dataforge.names.toName
/**
* An abstract plugin with some additional boilerplate to effectively work with workspace context
@ -12,7 +12,7 @@ abstract class WorkspacePlugin : AbstractPlugin() {
override fun provideTop(target: String): Map<Name, Any> {
return when(target){
Task.TYPE -> tasks.associateBy { it.name.toName() }
Task.TYPE -> tasks.toMap()
else -> emptyMap()
}
}

View File

@ -8,6 +8,7 @@ import hep.dataforge.meta.get
import hep.dataforge.meta.string
import hep.dataforge.names.EmptyName
import hep.dataforge.names.Name
import hep.dataforge.names.asName
import hep.dataforge.names.toName
import kotlin.reflect.KClass
@ -176,7 +177,7 @@ class TaskBuilder(val name: String) {
internal fun build(): GenericTask<Any> =
GenericTask(
name,
name.asName(),
Any::class,
descriptor ?: NodeDescriptor.empty(),
modelTransform

Binary file not shown.

View File

@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-5.1.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-5.6.2-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists

22
gradlew vendored
View File

@ -1,5 +1,21 @@
#!/usr/bin/env sh
#
# Copyright 2015 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
##############################################################################
##
## Gradle start up script for UN*X
@ -28,7 +44,7 @@ APP_NAME="Gradle"
APP_BASE_NAME=`basename "$0"`
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS='"-Xmx64m"'
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"
@ -109,8 +125,8 @@ if $darwin; then
GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
fi
# For Cygwin, switch paths to Windows format before running java
if $cygwin ; then
# For Cygwin or MSYS, switch paths to Windows format before running java
if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then
APP_HOME=`cygpath --path --mixed "$APP_HOME"`
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
JAVACMD=`cygpath --unix "$JAVACMD"`

18
gradlew.bat vendored
View File

@ -1,3 +1,19 @@
@rem
@rem Copyright 2015 the original author or authors.
@rem
@rem Licensed under the Apache License, Version 2.0 (the "License");
@rem you may not use this file except in compliance with the License.
@rem You may obtain a copy of the License at
@rem
@rem https://www.apache.org/licenses/LICENSE-2.0
@rem
@rem Unless required by applicable law or agreed to in writing, software
@rem distributed under the License is distributed on an "AS IS" BASIS,
@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@rem See the License for the specific language governing permissions and
@rem limitations under the License.
@rem
@if "%DEBUG%" == "" @echo off
@rem ##########################################################################
@rem
@ -14,7 +30,7 @@ set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS="-Xmx64m"
set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"
@rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome