Front matter envelope format

This commit is contained in:
Alexander Nozik 2019-10-28 19:37:15 +03:00
parent 6cfaac74ae
commit 728da96eff
19 changed files with 254 additions and 75 deletions
dataforge-io
dataforge-meta/src/commonMain/kotlin/hep/dataforge/meta
dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace
settings.gradle.kts

@ -0,0 +1,12 @@
plugins {
id("scientifik.jvm")
}
description = "YAML meta IO"
dependencies {
api(project(":dataforge-io"))
api("org.yaml:snakeyaml:1.25")
testImplementation(kotlin("test"))
testImplementation(kotlin("test-junit"))
}

@ -0,0 +1,92 @@
package hep.dataforge.io.yaml
import hep.dataforge.context.Context
import hep.dataforge.io.*
import hep.dataforge.meta.EmptyMeta
import hep.dataforge.meta.Meta
import hep.dataforge.meta.get
import hep.dataforge.meta.string
import kotlinx.io.core.*
import kotlinx.serialization.toUtf8Bytes
class FrontMatterEnvelopeFormat(
val io: IOPlugin,
val metaType: String = YamlMetaFormat.name.toString(),
meta: Meta = EmptyMeta
) : EnvelopeFormat {
val metaFormat = io.metaFormat(metaType, meta)
?: error("Meta format with type $metaType could not be resolved in $io")
override fun Input.readPartial(): PartialEnvelope {
var line: String = ""
var offset = 0u
do {
line = readUTF8Line() ?: error("Input does not contain front matter separator")
offset += line.toUtf8Bytes().size.toUInt()
} while (!line.startsWith(SEPARATOR))
val readMetaFormat =
metaTypeRegex.matchEntire(line)?.groupValues?.first()
?.let { io.metaFormat(it) } ?: YamlMetaFormat.default
val metaBlock = buildPacket {
do {
line = readUTF8Line() ?: error("Input does not contain closing front matter separator")
appendln(line)
offset += line.toUtf8Bytes().size.toUInt()
} while (!line.startsWith(SEPARATOR))
}
val meta = readMetaFormat.fromBytes(metaBlock)
return PartialEnvelope(meta, offset, null)
}
override fun Input.readObject(): Envelope {
var line: String = ""
do {
line = readUTF8Line() ?: error("Input does not contain front matter separator")
} while (!line.startsWith(SEPARATOR))
val readMetaFormat =
metaTypeRegex.matchEntire(line)?.groupValues?.first()
?.let { io.metaFormat(it) } ?: YamlMetaFormat.default
val metaBlock = buildPacket {
do {
appendln(readUTF8Line() ?: error("Input does not contain closing front matter separator"))
} while (!line.startsWith(SEPARATOR))
}
val meta = readMetaFormat.fromBytes(metaBlock)
val bytes = readBytes()
val data = bytes.asBinary()
return SimpleEnvelope(meta, data)
}
override fun Output.writeObject(obj: Envelope) {
writeText("$SEPARATOR\r\n")
metaFormat.run { writeObject(obj.meta) }
writeText("$SEPARATOR\r\n")
obj.data?.read { copyTo(this@writeObject) }
}
companion object : EnvelopeFormatFactory {
const val SEPARATOR = "---"
private val metaTypeRegex = "---(\\w*)\\s*".toRegex()
override fun invoke(meta: Meta, context: Context): EnvelopeFormat {
val metaFormatName: String = meta["name"].string ?: YamlMetaFormat.name.toString()
return FrontMatterEnvelopeFormat(context.io, metaFormatName, meta)
}
override fun peekFormat(io: IOPlugin, input: Input): EnvelopeFormat? {
val line = input.readUTF8Line(3, 30)
return if (line != null && line.startsWith("---")) {
invoke()
} else {
null
}
}
}
}

@ -0,0 +1,54 @@
package hep.dataforge.io.yaml
import hep.dataforge.context.Context
import hep.dataforge.descriptors.NodeDescriptor
import hep.dataforge.io.MetaFormat
import hep.dataforge.io.MetaFormatFactory
import hep.dataforge.meta.Meta
import hep.dataforge.meta.toMap
import hep.dataforge.meta.toMeta
import hep.dataforge.names.Name
import hep.dataforge.names.plus
import kotlinx.io.core.Input
import kotlinx.io.core.Output
import kotlinx.io.core.readUByte
import kotlinx.io.core.writeText
import org.yaml.snakeyaml.Yaml
import java.io.InputStream
private class InputAsStream(val input: Input) : InputStream() {
override fun read(): Int {
if (input.endOfInput) return -1
return input.readUByte().toInt()
}
override fun close() {
input.close()
}
}
private fun Input.asStream() = InputAsStream(this)
class YamlMetaFormat(val meta: Meta) : MetaFormat {
private val yaml = Yaml()
override fun Output.writeMeta(meta: Meta, descriptor: NodeDescriptor?) {
val string = yaml.dump(meta.toMap(descriptor))
writeText(string)
}
override fun Input.readMeta(descriptor: NodeDescriptor?): Meta {
val map: Map<String, Any?> = yaml.load(asStream())
return map.toMeta(descriptor)
}
companion object : MetaFormatFactory {
val default = YamlMetaFormat()
override fun invoke(meta: Meta, context: Context): MetaFormat = YamlMetaFormat(meta)
override val name: Name = super.name + "yaml"
override val key: Short = 0x594d //YM
}
}

@ -0,0 +1,43 @@
package hep.dataforge.io.yaml
import hep.dataforge.io.parse
import hep.dataforge.io.toString
import hep.dataforge.meta.Meta
import hep.dataforge.meta.buildMeta
import hep.dataforge.meta.get
import hep.dataforge.meta.seal
import org.junit.Test
import kotlin.test.assertEquals
class YamlMetaFormatTest{
@Test
fun testYamlMetaFormat(){
val meta = buildMeta {
"a" to 22
"node" to {
"b" to "DDD"
"c" to 11.1
"d" to {
"d1" to {
"d11" to "aaa"
"d12" to "bbb"
}
"d2" to 2
}
"array" to doubleArrayOf(1.0, 2.0, 3.0)
}
}
val string = meta.toString(YamlMetaFormat)
println(string)
val result = YamlMetaFormat.parse(string)
assertEquals<Meta>(meta, meta.seal())
meta.items.keys.forEach {
if (meta[it] != result[it]) error("${meta[it]} != ${result[it]}")
}
assertEquals(meta, result)
}
}

@ -76,13 +76,13 @@ fun ByteArray.asBinary() = ArrayBinary(this)
*/
fun <T : Any> Binary.readWith(format: IOFormat<T>): T = format.run {
read {
readThis()
readObject()
}
}
fun <T : Any> IOFormat<T>.writeBinary(obj: T): Binary {
val packet = buildPacket {
writeThis(obj)
writeObject(obj)
}
return ArrayBinary(packet.readBytes())
}

@ -85,7 +85,7 @@ object BinaryMetaFormat : MetaFormat, MetaFormatFactory {
writeValue(item.value)
}
is MetaItem.NodeItem -> {
writeThis(item.node)
writeObject(item.node)
}
}
}

@ -20,9 +20,9 @@ data class PartialEnvelope(val meta: Meta, val dataOffset: UInt, val dataSize: U
interface EnvelopeFormat : IOFormat<Envelope> {
fun Input.readPartial(): PartialEnvelope
override fun Input.readThis(): Envelope
override fun Input.readObject(): Envelope
override fun Output.writeThis(obj: Envelope)
override fun Output.writeObject(obj: Envelope)
}
@Type(ENVELOPE_FORMAT_TYPE)

@ -23,27 +23,27 @@ import kotlin.reflect.KClass
* And interface for reading and writing objects into with IO streams
*/
interface IOFormat<T : Any> {
fun Output.writeThis(obj: T)
fun Input.readThis(): T
fun Output.writeObject(obj: T)
fun Input.readObject(): T
}
fun <T : Any> Input.readWith(format: IOFormat<T>): T = format.run { readThis() }
fun <T : Any> Output.writeWith(format: IOFormat<T>, obj: T) = format.run { writeThis(obj) }
fun <T : Any> Input.readWith(format: IOFormat<T>): T = format.run { readObject() }
fun <T : Any> Output.writeWith(format: IOFormat<T>, obj: T) = format.run { writeObject(obj) }
class ListIOFormat<T : Any>(val format: IOFormat<T>) : IOFormat<List<T>> {
override fun Output.writeThis(obj: List<T>) {
override fun Output.writeObject(obj: List<T>) {
writeInt(obj.size)
format.run {
obj.forEach {
writeThis(it)
writeObject(it)
}
}
}
override fun Input.readThis(): List<T> {
override fun Input.readObject(): List<T> {
val size = readInt()
return format.run {
List(size) { readThis() }
List(size) { readObject() }
}
}
}
@ -79,32 +79,8 @@ inline fun buildPacketWithoutPool(headerSizeHint: Int = 0, block: BytePacketBuil
return builder.build()
}
//@Suppress("EXPERIMENTAL_API_USAGE", "EXPERIMENTAL_OVERRIDE")
//internal fun <R> Input.useAtMost(most: Int, reader: Input.() -> R): R {
// val limitedInput: Input = object : AbstractInput(
// IoBuffer.Pool.borrow(),
// remaining = most.toLong(),
// pool = IoBuffer.Pool
// ) {
// var read = 0
// override fun closeSource() {
// this@useAtMost.close()
// }
//
// override fun fill(): IoBuffer? {
// if (read >= most) return null
// return IoBuffer.Pool.fill {
// reserveEndGap(IoBuffer.ReservedSize)
// read += this@useAtMost.peekTo(this, max = most - read)
// }
// }
//
// }
// return limitedInput.reader()
//}
fun <T : Any> IOFormat<T>.writePacket(obj: T): ByteReadPacket = buildPacket { writeThis(obj) }
fun <T : Any> IOFormat<T>.writeBytes(obj: T): ByteArray = buildPacket { writeThis(obj) }.readBytes()
fun <T : Any> IOFormat<T>.writePacket(obj: T): ByteReadPacket = buildPacket { writeObject(obj) }
fun <T : Any> IOFormat<T>.writeBytes(obj: T): ByteArray = buildPacket { writeObject(obj) }.readBytes()
fun <T : Any> IOFormat<T>.readBytes(array: ByteArray): T {
//= ByteReadPacket(array).readThis()
val byteArrayInput: Input = object : AbstractInput(
@ -129,7 +105,7 @@ fun <T : Any> IOFormat<T>.readBytes(array: ByteArray): T {
}
}
return byteArrayInput.readThis()
return byteArrayInput.readObject()
}
object DoubleIOFormat : IOFormat<Double>, IOFormatFactory<Double> {
@ -139,11 +115,11 @@ object DoubleIOFormat : IOFormat<Double>, IOFormatFactory<Double> {
override val type: KClass<out Double> get() = Double::class
override fun Output.writeThis(obj: Double) {
override fun Output.writeObject(obj: Double) {
writeDouble(obj)
}
override fun Input.readThis(): Double = readDouble()
override fun Input.readObject(): Double = readDouble()
}
object ValueIOFormat : IOFormat<Value>, IOFormatFactory<Value> {
@ -153,11 +129,11 @@ object ValueIOFormat : IOFormat<Value>, IOFormatFactory<Value> {
override val type: KClass<out Value> get() = Value::class
override fun Output.writeThis(obj: Value) {
override fun Output.writeObject(obj: Value) {
BinaryMetaFormat.run { writeValue(obj) }
}
override fun Input.readThis(): Value {
override fun Input.readObject(): Value {
return (BinaryMetaFormat.run { readMetaItem() } as? MetaItem.ValueItem)?.value
?: error("The item is not a value")
}
@ -175,12 +151,12 @@ class SerializerIOFormat<T : Any>(
//override val name: Name = type.simpleName?.toName() ?: EmptyName
override fun Output.writeThis(obj: T) {
override fun Output.writeObject(obj: T) {
val bytes = Cbor.plain.dump(serializer, obj)
writeFully(bytes)
}
override fun Input.readThis(): T {
override fun Input.readObject(): T {
//FIXME reads the whole input
val bytes = readBytes()
return Cbor.plain.load(serializer, bytes)

@ -16,11 +16,11 @@ import kotlin.reflect.KClass
interface MetaFormat : IOFormat<Meta> {
override fun Output.writeThis(obj: Meta) {
override fun Output.writeObject(obj: Meta) {
writeMeta(obj, null)
}
override fun Input.readThis(): Meta = readMeta()
override fun Input.readObject(): Meta = readMeta()
fun Output.writeMeta(meta: Meta, descriptor: NodeDescriptor? = null)
fun Input.readMeta(descriptor: NodeDescriptor? = null): Meta
@ -42,23 +42,23 @@ interface MetaFormatFactory : IOFormatFactory<Meta> {
}
fun Meta.toString(format: MetaFormat): String = buildPacket {
format.run { writeThis(this@toString) }
format.run { writeObject(this@toString) }
}.readText()
fun Meta.toString(formatFactory: MetaFormatFactory): String = toString(formatFactory())
fun Meta.toBytes(format: MetaFormat = JsonMetaFormat.default): ByteReadPacket = buildPacket {
format.run { writeThis(this@toBytes) }
format.run { writeObject(this@toBytes) }
}
fun MetaFormat.parse(str: String): Meta {
return ByteReadPacket(str.toByteArray()).readThis()
return ByteReadPacket(str.toByteArray()).readObject()
}
fun MetaFormatFactory.parse(str: String): Meta = invoke().parse(str)
fun MetaFormat.fromBytes(packet: ByteReadPacket): Meta {
return packet.readThis()
return packet.readObject()
}

@ -28,12 +28,12 @@ class TaggedEnvelopeFormat(
writeText(END_SEQUENCE)
}
override fun Output.writeThis(obj: Envelope) {
override fun Output.writeObject(obj: Envelope) {
val metaBytes = metaFormat.writeBytes(obj.meta)
val tag = Tag(metaFormatKey, metaBytes.size.toUInt(), obj.data?.size ?: 0.toULong())
writePacket(tag.toBytes())
writeFully(metaBytes)
obj.data?.read { copyTo(this@writeThis) }
obj.data?.read { copyTo(this@writeObject) }
}
/**
@ -42,7 +42,7 @@ class TaggedEnvelopeFormat(
* @param input an input to read from
* @param formats a collection of meta formats to resolve
*/
override fun Input.readThis(): Envelope {
override fun Input.readObject(): Envelope {
val tag = readTag()
val metaFormat = io.metaFormat(tag.metaFormatKey)
@ -51,7 +51,7 @@ class TaggedEnvelopeFormat(
val metaPacket = ByteReadPacket(readBytes(tag.metaSize.toInt()))
val dataBytes = readBytes(tag.dataSize.toInt())
val meta = metaFormat.run { metaPacket.readThis() }
val meta = metaFormat.run { metaPacket.readObject() }
return SimpleEnvelope(meta, ArrayBinary(dataBytes))
}
@ -62,7 +62,7 @@ class TaggedEnvelopeFormat(
?: error("Meta format with key ${tag.metaFormatKey} not found")
val metaPacket = ByteReadPacket(readBytes(tag.metaSize.toInt()))
val meta = metaFormat.run { metaPacket.readThis() }
val meta = metaFormat.run { metaPacket.readObject() }
return PartialEnvelope(meta, TAG_SIZE + tag.metaSize, tag.dataSize)
}

@ -22,7 +22,7 @@ class TaglessEnvelopeFormat(
writeText("#? $key: $value;\r\n")
}
override fun Output.writeThis(obj: Envelope) {
override fun Output.writeObject(obj: Envelope) {
//printing header
writeText(TAGLESS_ENVELOPE_HEADER + "\r\n")
@ -48,7 +48,7 @@ class TaglessEnvelopeFormat(
}
}
override fun Input.readThis(): Envelope {
override fun Input.readObject(): Envelope {
var line: String = ""
do {
line = readUTF8Line() ?: error("Input does not contain tagless envelope header")
@ -73,10 +73,10 @@ class TaglessEnvelopeFormat(
val metaSize = properties.get(META_LENGTH_PROPERTY)?.toInt()
meta = if (metaSize != null) {
val metaPacket = ByteReadPacket(readBytes(metaSize))
metaFormat.run { metaPacket.readThis() }
metaFormat.run { metaPacket.readObject() }
} else {
metaFormat.run {
readThis()
readObject()
}
}
}
@ -128,7 +128,7 @@ class TaglessEnvelopeFormat(
meta = if (metaSize != null) {
val metaPacket = ByteReadPacket(readBytes(metaSize))
offset += metaSize.toUInt()
metaFormat.run { metaPacket.readThis() }
metaFormat.run { metaPacket.readObject() }
} else {
error("Can't partially read an envelope with undefined meta size")
}

@ -20,7 +20,7 @@ class RemoteFunctionClient(override val context: Context, val responder: Respond
data {
val inputFormat: IOFormat<T> = getInputFormat(meta, valueType)
inputFormat.run {
writeThis(value)
writeObject(value)
}
}
}
@ -39,7 +39,7 @@ class RemoteFunctionClient(override val context: Context, val responder: Respond
val inputFormat: IOFormat<T> = getInputFormat(meta, valueType)
inputFormat.run {
values.forEach {
writeThis(it)
writeObject(it)
}
}
}
@ -56,7 +56,7 @@ class RemoteFunctionClient(override val context: Context, val responder: Respond
envelope.data?.read {
List<R>(size) {
outputFormat.run {
readThis()
readObject()
}
}
} ?: error("Message does not contain data")

@ -30,7 +30,7 @@ class RemoteFunctionServer(
val input = request.data?.read {
inputFormat.run {
List(size) {
readThis()
readObject()
}
}
} ?: error("Input is empty")
@ -48,7 +48,7 @@ class RemoteFunctionServer(
data {
outputFormat.run {
output.forEach {
writeThis(it)
writeObject(it)
}
}
}

@ -46,7 +46,7 @@ fun IOPlugin.writeEnvelopeFile(
).asOutput()
with(formatFactory(formatMeta, context)) {
output.writeThis(envelope)
output.writeObject(envelope)
}
}

@ -56,10 +56,10 @@ class EnvelopeClient(
val output = socket.getOutputStream()
format.run {
output.writePacket {
writeThis(request)
writeObject(request)
}
logger.debug { "Sent request with type ${request.type} to ${socket.remoteSocketAddress}" }
val res = input.readThis()
val res = input.readObject()
logger.debug { "Received response with type ${res.type} from ${socket.remoteSocketAddress}" }
return@withContext res
}

@ -75,7 +75,7 @@ class EnvelopeServer(
val outputStream = socket.getOutputStream()
format.run {
while (socket.isConnected) {
val request = input.readThis()
val request = input.readObject()
logger.debug { "Accepted request with type ${request.type} from ${socket.remoteSocketAddress}" }
if (request.type == SHUTDOWN_ENVELOPE_TYPE) {
//Echo shutdown command
@ -87,7 +87,7 @@ class EnvelopeServer(
runBlocking {
val response = responder.respond(request)
outputStream.writePacket {
writeThis(response)
writeObject(response)
}
logger.debug { "Sent response with type ${response.type} to ${socket.remoteSocketAddress}" }
}

@ -1,5 +1,6 @@
package hep.dataforge.meta
import hep.dataforge.descriptors.NodeDescriptor
import hep.dataforge.values.Value
///**
@ -13,7 +14,7 @@ import hep.dataforge.values.Value
/**
* Convert meta to map of maps
*/
fun Meta.toMap(): Map<String, Any?> {
fun Meta.toMap(descriptor: NodeDescriptor? = null): Map<String, Any?> {
return items.entries.associate { (token, item) ->
token.toString() to when (item) {
is MetaItem.NodeItem -> item.node.toMap()
@ -25,7 +26,7 @@ fun Meta.toMap(): Map<String, Any?> {
/**
* Convert map of maps to meta
*/
fun Map<String, Any?>.toMeta(): Meta = buildMeta {
fun Map<String, Any?>.toMeta(descriptor: NodeDescriptor? = null): Meta = buildMeta {
entries.forEach { (key, value) ->
@Suppress("UNCHECKED_CAST")
when (value) {

@ -63,7 +63,7 @@ fun <T : Any> IOPlugin.readData(
dataFormat.run {
Files.newByteChannel(path, StandardOpenOption.READ)
.asInput()
.readThis()
.readObject()
}
}
}

@ -24,6 +24,7 @@ enableFeaturePreview("GRADLE_METADATA")
include(
":dataforge-meta",
":dataforge-io",
":dataforge-io:dataforge-io-yaml",
":dataforge-context",
":dataforge-data",
":dataforge-output",