Migrate to io-dev-6
This commit is contained in:
parent
56e7d55450
commit
7efa19920b
@ -1,6 +1,6 @@
|
||||
|
||||
plugins {
|
||||
val toolsVersion = "0.4.0"
|
||||
val toolsVersion = "0.4.1"
|
||||
id("scientifik.mpp") version toolsVersion apply false
|
||||
id("scientifik.jvm") version toolsVersion apply false
|
||||
id("scientifik.publish") version toolsVersion apply false
|
||||
|
@ -11,7 +11,7 @@ serialization(sourceSet = TEST){
|
||||
cbor()
|
||||
}
|
||||
|
||||
val ioVersion by rootProject.extra("0.2.0-npm-dev-4")
|
||||
val ioVersion by rootProject.extra("0.2.0-npm-dev-6")
|
||||
|
||||
kotlin {
|
||||
sourceSets {
|
||||
|
@ -7,7 +7,6 @@ import hep.dataforge.meta.EmptyMeta
|
||||
import hep.dataforge.meta.Meta
|
||||
import kotlinx.io.*
|
||||
import kotlinx.io.text.readUtf8Line
|
||||
import kotlinx.io.text.writeRawString
|
||||
import kotlinx.io.text.writeUtf8String
|
||||
import kotlinx.serialization.toUtf8Bytes
|
||||
|
||||
@ -29,7 +28,8 @@ class FrontMatterEnvelopeFormat(
|
||||
metaTypeRegex.matchEntire(line)?.groupValues?.first()
|
||||
?.let { io.metaFormat(it) } ?: YamlMetaFormat
|
||||
|
||||
val meta = buildBytes {
|
||||
//TODO replace by preview
|
||||
val meta = Binary {
|
||||
do {
|
||||
line = readUtf8Line()
|
||||
writeUtf8String(line + "\r\n")
|
||||
@ -53,7 +53,7 @@ class FrontMatterEnvelopeFormat(
|
||||
metaTypeRegex.matchEntire(line)?.groupValues?.first()
|
||||
?.let { io.metaFormat(it) } ?: YamlMetaFormat
|
||||
|
||||
val meta = buildBytes {
|
||||
val meta = Binary {
|
||||
do {
|
||||
writeUtf8String(readUtf8Line() + "\r\n")
|
||||
} while (!line.startsWith(SEPARATOR))
|
||||
@ -62,7 +62,7 @@ class FrontMatterEnvelopeFormat(
|
||||
readMeta()
|
||||
}
|
||||
}
|
||||
val bytes = readRemaining()
|
||||
val bytes = readByteArray()
|
||||
val data = bytes.asBinary()
|
||||
return SimpleEnvelope(meta, data)
|
||||
}
|
||||
|
@ -10,24 +10,12 @@ import hep.dataforge.meta.toMap
|
||||
import hep.dataforge.meta.toMeta
|
||||
import kotlinx.io.Input
|
||||
import kotlinx.io.Output
|
||||
import kotlinx.io.asInputStream
|
||||
import kotlinx.io.readUByte
|
||||
import kotlinx.io.text.writeUtf8String
|
||||
import org.yaml.snakeyaml.Yaml
|
||||
import java.io.InputStream
|
||||
|
||||
private class InputAsStream(val input: Input) : InputStream() {
|
||||
override fun read(): Int {
|
||||
if (input.eof()) return -1
|
||||
return input.readUByte().toInt()
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
input.close()
|
||||
}
|
||||
}
|
||||
|
||||
private fun Input.asStream() = InputAsStream(this)
|
||||
|
||||
@DFExperimental
|
||||
class YamlMetaFormat(val meta: Meta) : MetaFormat {
|
||||
private val yaml = Yaml()
|
||||
@ -38,7 +26,7 @@ class YamlMetaFormat(val meta: Meta) : MetaFormat {
|
||||
}
|
||||
|
||||
override fun Input.readMeta(descriptor: NodeDescriptor?): Meta {
|
||||
val map: Map<String, Any?> = yaml.load(asStream())
|
||||
val map: Map<String, Any?> = yaml.load(asInputStream())
|
||||
return map.toMeta(descriptor)
|
||||
}
|
||||
|
||||
|
@ -1,10 +1,7 @@
|
||||
package hep.dataforge.io
|
||||
|
||||
import hep.dataforge.meta.*
|
||||
import kotlinx.io.ArrayBinary
|
||||
import kotlinx.io.Binary
|
||||
import kotlinx.io.ExperimentalIoApi
|
||||
import kotlinx.io.Output
|
||||
import kotlinx.io.*
|
||||
|
||||
class EnvelopeBuilder {
|
||||
private val metaBuilder = MetaBuilder()
|
||||
@ -36,7 +33,9 @@ class EnvelopeBuilder {
|
||||
*/
|
||||
@OptIn(ExperimentalIoApi::class)
|
||||
fun data(block: Output.() -> Unit) {
|
||||
data = ArrayBinary.write(builder = block)
|
||||
val arrayBuilder = ByteArrayOutput()
|
||||
arrayBuilder.block()
|
||||
data = arrayBuilder.toByteArray().asBinary()
|
||||
}
|
||||
|
||||
fun build() = SimpleEnvelope(metaBuilder.seal(), data)
|
||||
|
@ -11,8 +11,6 @@ import hep.dataforge.meta.*
|
||||
import hep.dataforge.names.asName
|
||||
import hep.dataforge.names.plus
|
||||
import hep.dataforge.names.toName
|
||||
import kotlinx.io.text.readRawString
|
||||
import kotlinx.io.text.writeRawString
|
||||
|
||||
object EnvelopeParts {
|
||||
val MULTIPART_KEY = "multipart".asName()
|
||||
|
@ -68,11 +68,7 @@ interface IOFormatFactory<T : Any> : Factory<IOFormat<T>>, Named {
|
||||
}
|
||||
}
|
||||
|
||||
fun <T : Any> IOFormat<T>.writeBytes(obj: T): Bytes = buildBytes { writeObject(obj) }
|
||||
|
||||
|
||||
fun <T : Any> IOFormat<T>.writeByteArray(obj: T): ByteArray = buildBytes { writeObject(obj) }.toByteArray()
|
||||
fun <T : Any> IOFormat<T>.readByteArray(array: ByteArray): T = array.asBinary().read { readObject() }
|
||||
fun <T : Any> IOFormat<T>.toBinary(obj: T): Binary = Binary { writeObject(obj) }
|
||||
|
||||
object DoubleIOFormat : IOFormat<Double>, IOFormatFactory<Double> {
|
||||
override fun invoke(meta: Meta, context: Context): IOFormat<Double> = this
|
||||
|
@ -11,6 +11,7 @@ import hep.dataforge.meta.toJson
|
||||
import hep.dataforge.meta.toMetaItem
|
||||
import kotlinx.io.Input
|
||||
import kotlinx.io.Output
|
||||
import kotlinx.io.readByteArray
|
||||
import kotlinx.io.text.readUtf8String
|
||||
import kotlinx.io.text.writeUtf8String
|
||||
import kotlinx.serialization.UnstableDefault
|
||||
@ -26,7 +27,7 @@ class JsonMetaFormat(private val json: Json = DEFAULT_JSON) : MetaFormat {
|
||||
}
|
||||
|
||||
override fun Input.readMeta(descriptor: NodeDescriptor?): Meta {
|
||||
val str = readUtf8String()
|
||||
val str = readByteArray().decodeToString()
|
||||
val jsonElement = json.parseJson(str)
|
||||
val item = jsonElement.toMetaItem(descriptor)
|
||||
return item.node ?: Meta.EMPTY
|
||||
|
@ -1,14 +1,17 @@
|
||||
package hep.dataforge.io
|
||||
|
||||
import hep.dataforge.context.Context
|
||||
import hep.dataforge.meta.descriptors.NodeDescriptor
|
||||
import hep.dataforge.io.MetaFormatFactory.Companion.META_FORMAT_TYPE
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.meta.descriptors.NodeDescriptor
|
||||
import hep.dataforge.names.Name
|
||||
import hep.dataforge.names.asName
|
||||
import hep.dataforge.names.plus
|
||||
import hep.dataforge.provider.Type
|
||||
import kotlinx.io.*
|
||||
import kotlinx.io.ByteArrayInput
|
||||
import kotlinx.io.Input
|
||||
import kotlinx.io.Output
|
||||
import kotlinx.io.use
|
||||
import kotlin.reflect.KClass
|
||||
|
||||
/**
|
||||
@ -44,14 +47,14 @@ interface MetaFormatFactory : IOFormatFactory<Meta>, MetaFormat {
|
||||
}
|
||||
}
|
||||
|
||||
fun Meta.toString(format: MetaFormat): String = buildBytes {
|
||||
fun Meta.toString(format: MetaFormat): String = buildByteArray {
|
||||
format.run { writeObject(this@toString) }
|
||||
}.toByteArray().decodeToString()
|
||||
}.decodeToString()
|
||||
|
||||
fun Meta.toString(formatFactory: MetaFormatFactory): String = toString(formatFactory())
|
||||
|
||||
fun MetaFormat.parse(str: String): Meta {
|
||||
return str.encodeToByteArray().read { readObject() }
|
||||
return ByteArrayInput(str.encodeToByteArray()).use { it.readObject() }
|
||||
}
|
||||
|
||||
fun MetaFormatFactory.parse(str: String, formatMeta: Meta): Meta = invoke(formatMeta).parse(str)
|
||||
|
@ -8,8 +8,6 @@ import hep.dataforge.names.Name
|
||||
import hep.dataforge.names.plus
|
||||
import hep.dataforge.names.toName
|
||||
import kotlinx.io.*
|
||||
import kotlinx.io.text.readRawString
|
||||
import kotlinx.io.text.writeRawString
|
||||
|
||||
@ExperimentalIoApi
|
||||
class TaggedEnvelopeFormat(
|
||||
@ -21,7 +19,7 @@ class TaggedEnvelopeFormat(
|
||||
// ?: error("Meta format with key $metaFormatKey could not be resolved in $io")
|
||||
|
||||
|
||||
private fun Tag.toBytes() = buildBytes(24) {
|
||||
private fun Tag.toBinary() = Binary(24) {
|
||||
writeRawString(START_SEQUENCE)
|
||||
writeRawString(version.name)
|
||||
writeShort(metaFormatKey)
|
||||
@ -39,14 +37,10 @@ class TaggedEnvelopeFormat(
|
||||
|
||||
override fun Output.writeEnvelope(envelope: Envelope, metaFormatFactory: MetaFormatFactory, formatMeta: Meta) {
|
||||
val metaFormat = metaFormatFactory.invoke(formatMeta, io.context)
|
||||
val metaBytes = metaFormat.writeBytes(envelope.meta)
|
||||
val actualSize: ULong = if (envelope.data == null) {
|
||||
0
|
||||
} else {
|
||||
envelope.data?.size ?: Binary.INFINITE
|
||||
}.toULong()
|
||||
val metaBytes = metaFormat.toBinary(envelope.meta)
|
||||
val actualSize: ULong = (envelope.data?.size ?: 0).toULong()
|
||||
val tag = Tag(metaFormatFactory.key, metaBytes.size.toUInt() + 2u, actualSize)
|
||||
writeBinary(tag.toBytes())
|
||||
writeBinary(tag.toBinary())
|
||||
writeBinary(metaBytes)
|
||||
writeRawString("\r\n")
|
||||
envelope.data?.let {
|
||||
@ -73,7 +67,7 @@ class TaggedEnvelopeFormat(
|
||||
}
|
||||
}
|
||||
|
||||
val data = ByteArray(tag.dataSize.toInt()).also { readArray(it) }.asBinary()
|
||||
val data = ByteArray(tag.dataSize.toInt()).also { readByteArray(it) }.asBinary()
|
||||
|
||||
return SimpleEnvelope(meta, data)
|
||||
}
|
||||
|
@ -4,9 +4,7 @@ import hep.dataforge.context.Context
|
||||
import hep.dataforge.meta.*
|
||||
import hep.dataforge.names.asName
|
||||
import kotlinx.io.*
|
||||
import kotlinx.io.text.readRawString
|
||||
import kotlinx.io.text.readUtf8Line
|
||||
import kotlinx.io.text.writeRawString
|
||||
import kotlinx.io.text.writeUtf8String
|
||||
|
||||
@ExperimentalIoApi
|
||||
@ -31,17 +29,13 @@ class TaglessEnvelopeFormat(
|
||||
//printing all properties
|
||||
writeProperty(META_TYPE_PROPERTY, metaFormatFactory.shortName)
|
||||
//TODO add optional metaFormat properties
|
||||
val actualSize: Int = if (envelope.data == null) {
|
||||
0
|
||||
} else {
|
||||
envelope.data?.size ?: Binary.INFINITE
|
||||
}
|
||||
val actualSize: Int = envelope.data?.size ?: 0
|
||||
|
||||
writeProperty(DATA_LENGTH_PROPERTY, actualSize)
|
||||
|
||||
//Printing meta
|
||||
if (!envelope.meta.isEmpty()) {
|
||||
val metaBytes = metaFormat.writeBytes(envelope.meta)
|
||||
val metaBytes = metaFormat.toBinary(envelope.meta)
|
||||
writeProperty(META_LENGTH_PROPERTY, metaBytes.size + 2)
|
||||
writeUtf8String(metaStart + "\r\n")
|
||||
writeBinary(metaBytes)
|
||||
@ -71,7 +65,7 @@ class TaglessEnvelopeFormat(
|
||||
properties[key] = value
|
||||
}
|
||||
//If can't read line, return envelope without data
|
||||
if (eof()) return SimpleEnvelope(Meta.EMPTY, null)
|
||||
if (exhausted()) return SimpleEnvelope(Meta.EMPTY, null)
|
||||
line = readUtf8Line()
|
||||
}
|
||||
|
||||
@ -102,11 +96,11 @@ class TaglessEnvelopeFormat(
|
||||
|
||||
val data: Binary? = if (properties.containsKey(DATA_LENGTH_PROPERTY)) {
|
||||
val bytes = ByteArray(properties[DATA_LENGTH_PROPERTY]!!.toInt())
|
||||
readArray(bytes)
|
||||
readByteArray(bytes)
|
||||
bytes.asBinary()
|
||||
} else {
|
||||
ArrayBinary.write {
|
||||
writeInput(this@readObject)
|
||||
Binary {
|
||||
copyTo(this)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,22 @@
|
||||
package hep.dataforge.io
|
||||
|
||||
import kotlinx.io.*
|
||||
|
||||
fun Output.writeRawString(str: String) {
|
||||
str.forEach { writeByte(it.toByte()) }
|
||||
}
|
||||
|
||||
fun Input.readRawString(size: Int): String {
|
||||
val array = CharArray(size) { readByte().toChar() }
|
||||
return String(array)
|
||||
}
|
||||
|
||||
inline fun buildByteArray(expectedSize: Int = 16, block: Output.() -> Unit): ByteArray =
|
||||
ByteArrayOutput(expectedSize).apply(block).toByteArray()
|
||||
|
||||
@Suppress("FunctionName")
|
||||
inline fun Binary(expectedSize: Int = 16, block: Output.() -> Unit): Binary =
|
||||
buildByteArray(expectedSize, block).asBinary()
|
||||
|
||||
@Deprecated("To be replaced by Binary.EMPTY",level = DeprecationLevel.WARNING)
|
||||
val EmptyBinary = ByteArrayBinary(ByteArray(0))
|
@ -23,7 +23,7 @@ class EnvelopeFormatTest {
|
||||
@Test
|
||||
fun testTaggedFormat(){
|
||||
TaggedEnvelopeFormat.run {
|
||||
val byteArray = this.writeByteArray(envelope)
|
||||
val byteArray = this.toByteArray(envelope)
|
||||
//println(byteArray.decodeToString())
|
||||
val res = readByteArray(byteArray)
|
||||
assertEquals(envelope.meta,res.meta)
|
||||
@ -37,7 +37,7 @@ class EnvelopeFormatTest {
|
||||
@Test
|
||||
fun testTaglessFormat(){
|
||||
TaglessEnvelopeFormat.run {
|
||||
val byteArray = writeByteArray(envelope)
|
||||
val byteArray = toByteArray(envelope)
|
||||
//println(byteArray.decodeToString())
|
||||
val res = readByteArray(byteArray)
|
||||
assertEquals(envelope.meta,res.meta)
|
||||
|
@ -1,20 +1,19 @@
|
||||
package hep.dataforge.io
|
||||
|
||||
import hep.dataforge.meta.*
|
||||
import kotlinx.io.Bytes
|
||||
import kotlinx.io.buildBytes
|
||||
import kotlinx.io.asBinary
|
||||
import kotlinx.serialization.json.JsonPrimitive
|
||||
import kotlinx.serialization.json.json
|
||||
import kotlinx.serialization.json.jsonArray
|
||||
import kotlin.test.Test
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
fun Meta.toBytes(format: MetaFormat = JsonMetaFormat): Bytes = buildBytes {
|
||||
format.run { writeObject(this@toBytes) }
|
||||
fun Meta.toByteArray(format: MetaFormat = JsonMetaFormat) = buildByteArray {
|
||||
format.run { writeObject(this@toByteArray) }
|
||||
}
|
||||
|
||||
fun MetaFormat.fromBytes(packet: Bytes): Meta {
|
||||
return packet.read { readObject() }
|
||||
fun MetaFormat.fromByteArray(packet: ByteArray): Meta {
|
||||
return packet.asBinary().read { readObject() }
|
||||
}
|
||||
|
||||
class MetaFormatTest {
|
||||
@ -28,8 +27,8 @@ class MetaFormatTest {
|
||||
"array" put doubleArrayOf(1.0, 2.0, 3.0)
|
||||
}
|
||||
}
|
||||
val bytes = meta.toBytes(BinaryMetaFormat)
|
||||
val result = BinaryMetaFormat.fromBytes(bytes)
|
||||
val bytes = meta.toByteArray(BinaryMetaFormat)
|
||||
val result = BinaryMetaFormat.fromByteArray(bytes)
|
||||
assertEquals(meta, result)
|
||||
}
|
||||
|
||||
|
@ -3,7 +3,6 @@ package hep.dataforge.io
|
||||
import hep.dataforge.meta.DFExperimental
|
||||
import hep.dataforge.meta.get
|
||||
import hep.dataforge.meta.int
|
||||
import kotlinx.io.text.writeRawString
|
||||
import kotlinx.io.text.writeUtf8String
|
||||
|
||||
import kotlin.test.Test
|
||||
@ -33,9 +32,9 @@ class MultipartTest {
|
||||
@Test
|
||||
fun testParts() {
|
||||
TaggedEnvelopeFormat.run {
|
||||
val singleEnvelopeData = writeBytes(envelopes[0])
|
||||
val singleEnvelopeData = toBinary(envelopes[0])
|
||||
val singleEnvelopeSize = singleEnvelopeData.size
|
||||
val bytes = writeBytes(partsEnvelope)
|
||||
val bytes = toBinary(partsEnvelope)
|
||||
assertTrue(5*singleEnvelopeSize < bytes.size)
|
||||
val reconstructed = bytes.readWith(this)
|
||||
val parts = reconstructed.parts()?.toList() ?: emptyList()
|
||||
|
@ -0,0 +1,7 @@
|
||||
package hep.dataforge.io
|
||||
|
||||
import kotlinx.io.ByteArrayInput
|
||||
import kotlinx.io.use
|
||||
|
||||
fun <T : Any> IOFormat<T>.toByteArray(obj: T): ByteArray = buildByteArray { writeObject(obj) }
|
||||
fun <T : Any> IOFormat<T>.readByteArray(array: ByteArray): T = ByteArrayInput(array).use { it.readObject() }
|
@ -1,22 +0,0 @@
|
||||
package hep.dataforge.io
|
||||
|
||||
import hep.dataforge.meta.Meta
|
||||
import kotlinx.io.Binary
|
||||
import kotlinx.io.ExperimentalIoApi
|
||||
import kotlinx.io.FileBinary
|
||||
import kotlinx.io.read
|
||||
import java.nio.file.Path
|
||||
|
||||
@ExperimentalIoApi
|
||||
class FileEnvelope internal constructor(val path: Path, val format: EnvelopeFormat) : Envelope {
|
||||
//TODO do not like this constructor. Hope to replace it later
|
||||
|
||||
private val partialEnvelope: PartialEnvelope = path.read {
|
||||
format.run { readPartial() }
|
||||
}
|
||||
|
||||
override val meta: Meta get() = partialEnvelope.meta
|
||||
|
||||
override val data: Binary? = FileBinary(path, partialEnvelope.dataOffset.toInt(), partialEnvelope.dataSize?.toInt())
|
||||
}
|
||||
|
@ -8,9 +8,52 @@ import hep.dataforge.meta.isEmpty
|
||||
import kotlinx.io.*
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.StandardOpenOption
|
||||
import kotlin.reflect.full.isSuperclassOf
|
||||
import kotlin.streams.asSequence
|
||||
|
||||
fun <R> Path.read(block: Input.() -> R): R = asBinary().read(block = block)
|
||||
|
||||
/**
|
||||
* Write a live output to a newly created file. If file does not exist, throws error
|
||||
*/
|
||||
fun Path.write(block: Output.() -> Unit): Unit {
|
||||
val stream = Files.newOutputStream(this, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW)
|
||||
stream.asOutput().use(block)
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new file or append to exiting one with given output [block]
|
||||
*/
|
||||
fun Path.append(block: Output.() -> Unit): Unit {
|
||||
val stream = Files.newOutputStream(
|
||||
this,
|
||||
StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE
|
||||
)
|
||||
stream.asOutput().use(block)
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new file or replace existing one using given output [block]
|
||||
*/
|
||||
fun Path.rewrite(block: Output.() -> Unit): Unit {
|
||||
val stream = Files.newOutputStream(
|
||||
this,
|
||||
StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE
|
||||
)
|
||||
stream.asOutput().use(block)
|
||||
}
|
||||
|
||||
fun Path.readEnvelope(format: EnvelopeFormat): Envelope {
|
||||
val partialEnvelope: PartialEnvelope = asBinary().read {
|
||||
format.run { readPartial() }
|
||||
}
|
||||
val offset: Int = partialEnvelope.dataOffset.toInt()
|
||||
val size: Int = partialEnvelope.dataSize?.toInt() ?: (Files.size(this).toInt() - offset)
|
||||
val binary = FileBinary(this, offset, size)
|
||||
return SimpleEnvelope(partialEnvelope.meta, binary)
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve IOFormat based on type
|
||||
*/
|
||||
@ -131,7 +174,7 @@ fun IOPlugin.readEnvelopeFile(
|
||||
}
|
||||
|
||||
return formatPeeker(path)?.let { format ->
|
||||
FileEnvelope(path, format)
|
||||
path.readEnvelope(format)
|
||||
} ?: if (readNonEnvelopes) { // if no format accepts file, read it as binary
|
||||
SimpleEnvelope(Meta.EMPTY, path.asBinary())
|
||||
} else null
|
||||
@ -156,7 +199,7 @@ fun IOPlugin.writeEnvelopeFile(
|
||||
envelopeFormat: EnvelopeFormat = TaggedEnvelopeFormat,
|
||||
metaFormat: MetaFormatFactory? = null
|
||||
) {
|
||||
path.write {
|
||||
path.rewrite {
|
||||
with(envelopeFormat) {
|
||||
writeEnvelope(envelope, metaFormat ?: envelopeFormat.defaultMetaFormat)
|
||||
}
|
||||
@ -184,8 +227,8 @@ fun IOPlugin.writeEnvelopeDirectory(
|
||||
val dataFile = path.resolve(IOPlugin.DATA_FILE_NAME)
|
||||
dataFile.write {
|
||||
envelope.data?.read {
|
||||
val copied = writeInput(this)
|
||||
if (envelope.data?.size != Binary.INFINITE && copied != envelope.data?.size) {
|
||||
val copied = copyTo(this@write)
|
||||
if (copied != envelope.data?.size) {
|
||||
error("The number of copied bytes does not equal data size")
|
||||
}
|
||||
}
|
||||
|
@ -1,62 +1,45 @@
|
||||
package hep.dataforge.io.tcp
|
||||
|
||||
import kotlinx.io.Input
|
||||
import kotlinx.io.Output
|
||||
import kotlinx.io.asBinary
|
||||
import kotlinx.io.*
|
||||
import kotlinx.io.buffer.Buffer
|
||||
import kotlinx.io.buffer.get
|
||||
import kotlinx.io.buffer.set
|
||||
import java.io.InputStream
|
||||
import java.io.OutputStream
|
||||
|
||||
private class InputStreamInput(val source: InputStream, val waitForInput: Boolean = false) : Input() {
|
||||
private class BlockingStreamInput(val source: InputStream) : Input() {
|
||||
override fun closeSource() {
|
||||
source.close()
|
||||
}
|
||||
|
||||
override fun fill(buffer: Buffer): Int {
|
||||
if (waitForInput) {
|
||||
override fun fill(buffer: Buffer, startIndex: Int, endIndex: Int): Int {
|
||||
while (source.available() == 0) {
|
||||
//block until input is available
|
||||
}
|
||||
// Zero-copy attempt
|
||||
if (buffer.buffer.hasArray()) {
|
||||
val result = source.read(buffer.buffer.array(), startIndex, endIndex - startIndex)
|
||||
return result.coerceAtLeast(0) // -1 when IS is closed
|
||||
}
|
||||
var bufferPos = 0
|
||||
do {
|
||||
|
||||
for (i in startIndex until endIndex) {
|
||||
val byte = source.read()
|
||||
buffer[bufferPos] = byte.toByte()
|
||||
bufferPos++
|
||||
} while (byte > 0 && bufferPos < buffer.size && source.available() > 0)
|
||||
return bufferPos
|
||||
if (byte == -1) return (i - startIndex)
|
||||
buffer[i] = byte.toByte()
|
||||
}
|
||||
return endIndex - startIndex
|
||||
}
|
||||
}
|
||||
|
||||
private class OutputStreamOutput(val out: OutputStream) : Output() {
|
||||
override fun flush(source: Buffer, length: Int) {
|
||||
for (i in 0..length) {
|
||||
out.write(source[i].toInt())
|
||||
}
|
||||
out.flush()
|
||||
}
|
||||
|
||||
override fun closeSource() {
|
||||
out.flush()
|
||||
out.close()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
fun <R> InputStream.read(size: Int, block: Input.() -> R): R {
|
||||
val buffer = ByteArray(size)
|
||||
read(buffer)
|
||||
return buffer.asBinary().read(block)
|
||||
return buffer.asBinary().read(block = block)
|
||||
}
|
||||
|
||||
fun <R> InputStream.read(block: Input.() -> R): R =
|
||||
InputStreamInput(this, false).block()
|
||||
fun <R> InputStream.read(block: Input.() -> R): R = asInput().block()
|
||||
|
||||
fun <R> InputStream.readBlocking(block: Input.() -> R): R =
|
||||
InputStreamInput(this, true).block()
|
||||
fun <R> InputStream.readBlocking(block: Input.() -> R): R = BlockingStreamInput(this).block()
|
||||
|
||||
fun OutputStream.write(block: Output.() -> Unit) {
|
||||
OutputStreamOutput(this).block()
|
||||
inline fun OutputStream.write(block: Output.() -> Unit) {
|
||||
asOutput().block()
|
||||
}
|
@ -4,7 +4,7 @@ import hep.dataforge.context.Global
|
||||
import hep.dataforge.io.Envelope
|
||||
import hep.dataforge.io.Responder
|
||||
import hep.dataforge.io.TaggedEnvelopeFormat
|
||||
import hep.dataforge.io.writeByteArray
|
||||
import hep.dataforge.io.toByteArray
|
||||
import kotlinx.coroutines.GlobalScope
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import kotlinx.io.writeDouble
|
||||
@ -17,7 +17,7 @@ import kotlin.time.ExperimentalTime
|
||||
@ExperimentalStdlibApi
|
||||
object EchoResponder : Responder {
|
||||
override suspend fun respond(request: Envelope): Envelope {
|
||||
val string = TaggedEnvelopeFormat().run { writeByteArray(request).decodeToString() }
|
||||
val string = TaggedEnvelopeFormat().run { toByteArray(request).decodeToString() }
|
||||
println("ECHO:")
|
||||
println(string)
|
||||
return request
|
||||
|
@ -23,9 +23,6 @@ open class Scheme() : Configurable, Described, MetaRepr {
|
||||
var defaultProvider: (Name) -> MetaItem<*>? = { null }
|
||||
internal set
|
||||
|
||||
final override var descriptor: NodeDescriptor? = null
|
||||
internal set
|
||||
|
||||
override fun getDefaultItem(name: Name): MetaItem<*>? {
|
||||
return defaultProvider(name) ?: descriptor?.get(name)?.defaultItem()
|
||||
}
|
||||
@ -78,15 +75,10 @@ open class SchemeSpec<T : Scheme>(val builder: () -> T) :
|
||||
*/
|
||||
open class MetaScheme(
|
||||
val meta: Meta,
|
||||
descriptor: NodeDescriptor? = null,
|
||||
override val descriptor: NodeDescriptor? = null,
|
||||
config: Config = Config()
|
||||
) : Scheme(config, meta::get) {
|
||||
init {
|
||||
this.descriptor = descriptor
|
||||
}
|
||||
|
||||
override val defaultLayer: Meta
|
||||
get() = Laminate(meta, descriptor?.defaultItem().node)
|
||||
override val defaultLayer: Meta get() = Laminate(meta, descriptor?.defaultItem().node)
|
||||
}
|
||||
|
||||
fun Meta.asScheme() =
|
||||
|
@ -9,7 +9,6 @@ import kotlinx.coroutines.flow.toList
|
||||
import kotlinx.io.Binary
|
||||
import kotlinx.io.ExperimentalIoApi
|
||||
import kotlinx.io.Output
|
||||
import kotlinx.io.RandomAccessBinary
|
||||
import kotlinx.io.text.forEachUtf8Line
|
||||
import kotlinx.io.text.readUtf8Line
|
||||
import kotlinx.io.text.readUtf8StringUntilDelimiter
|
||||
@ -76,7 +75,7 @@ suspend fun TextRows.buildRowIndex(): List<Int> = indexFlow().toList()
|
||||
@ExperimentalIoApi
|
||||
class TextTable(
|
||||
override val header: ValueTableHeader,
|
||||
val binary: RandomAccessBinary,
|
||||
val binary: Binary,
|
||||
val index: List<Int>
|
||||
) : Table<Value> {
|
||||
|
||||
@ -99,7 +98,7 @@ class TextTable(
|
||||
}
|
||||
|
||||
companion object {
|
||||
suspend operator fun invoke(header: ValueTableHeader, binary: RandomAccessBinary): TextTable {
|
||||
suspend operator fun invoke(header: ValueTableHeader, binary: Binary): TextTable {
|
||||
val index = TextRows(header, binary).buildRowIndex()
|
||||
return TextTable(header, binary, index)
|
||||
}
|
||||
|
@ -1,12 +1,13 @@
|
||||
package hep.dataforge.tables.io
|
||||
|
||||
import hep.dataforge.io.Binary
|
||||
import hep.dataforge.io.EmptyBinary
|
||||
import hep.dataforge.io.Envelope
|
||||
import hep.dataforge.meta.*
|
||||
import hep.dataforge.tables.SimpleColumnHeader
|
||||
import hep.dataforge.tables.Table
|
||||
import hep.dataforge.values.Value
|
||||
import kotlinx.io.ByteArrayOutput
|
||||
import kotlinx.io.EmptyBinary
|
||||
import kotlinx.io.ExperimentalIoApi
|
||||
import kotlinx.io.asBinary
|
||||
|
||||
|
@ -2,11 +2,7 @@ package hep.dataforge.workspace
|
||||
|
||||
import hep.dataforge.data.Data
|
||||
import hep.dataforge.data.await
|
||||
import hep.dataforge.io.Envelope
|
||||
import hep.dataforge.io.IOFormat
|
||||
import hep.dataforge.io.SimpleEnvelope
|
||||
import hep.dataforge.io.readWith
|
||||
import kotlinx.io.ArrayBinary
|
||||
import hep.dataforge.io.*
|
||||
import kotlin.reflect.KClass
|
||||
|
||||
/**
|
||||
@ -18,8 +14,6 @@ fun <T : Any> Envelope.toData(type: KClass<out T>, format: IOFormat<T>): Data<T>
|
||||
|
||||
suspend fun <T : Any> Data<T>.toEnvelope(format: IOFormat<T>): Envelope {
|
||||
val obj = await()
|
||||
val binary = ArrayBinary.write {
|
||||
format.run { writeObject(obj) }
|
||||
}
|
||||
val binary = format.toBinary(obj)
|
||||
return SimpleEnvelope(meta, binary)
|
||||
}
|
Loading…
Reference in New Issue
Block a user