Remove kotlinx-io

This commit is contained in:
Alexander Nozik 2021-02-28 22:03:40 +03:00
parent c98ffd1eb4
commit f3d43cd40a
36 changed files with 401 additions and 254 deletions

View File

@ -2,6 +2,7 @@
## [Unreleased]
### Added
- LogManager plugin
### Changed
- Kotlin-logging moved from common to JVM and JS. Replaced by console for native.
@ -9,6 +10,8 @@
### Deprecated
### Removed
- Common dependency on Kotlin-logging
- Kotlinx-io fork dependency. Replaced by Ktor-io.
### Fixed

View File

@ -110,14 +110,13 @@ public final class hep/dataforge/context/KLoggingManager$Companion : hep/datafor
public synthetic fun invoke (Lhep/dataforge/meta/Meta;Lhep/dataforge/context/Context;)Ljava/lang/Object;
}
public abstract interface class hep/dataforge/context/LogManager : hep/dataforge/context/Plugin {
public abstract interface class hep/dataforge/context/LogManager : hep/dataforge/context/Logable, hep/dataforge/context/Plugin {
public static final field Companion Lhep/dataforge/context/LogManager$Companion;
public static final field DEBUG Ljava/lang/String;
public static final field ERROR Ljava/lang/String;
public static final field INFO Ljava/lang/String;
public static final field TRACE Ljava/lang/String;
public static final field WARNING Ljava/lang/String;
public abstract fun log (Lhep/dataforge/names/Name;Ljava/lang/String;Lkotlin/jvm/functions/Function0;)V
}
public final class hep/dataforge/context/LogManager$Companion {
@ -137,9 +136,24 @@ public final class hep/dataforge/context/LogManager$DefaultImpls {
}
public final class hep/dataforge/context/LogManagerKt {
public static final fun debug (Lhep/dataforge/context/Logable;Lhep/dataforge/names/Name;Lkotlin/jvm/functions/Function0;)V
public static synthetic fun debug$default (Lhep/dataforge/context/Logable;Lhep/dataforge/names/Name;Lkotlin/jvm/functions/Function0;ILjava/lang/Object;)V
public static final fun error (Lhep/dataforge/context/Logable;Lhep/dataforge/names/Name;Lkotlin/jvm/functions/Function0;)V
public static final fun error (Lhep/dataforge/context/Logable;Ljava/lang/Throwable;Lhep/dataforge/names/Name;Lkotlin/jvm/functions/Function0;)V
public static synthetic fun error$default (Lhep/dataforge/context/Logable;Lhep/dataforge/names/Name;Lkotlin/jvm/functions/Function0;ILjava/lang/Object;)V
public static synthetic fun error$default (Lhep/dataforge/context/Logable;Ljava/lang/Throwable;Lhep/dataforge/names/Name;Lkotlin/jvm/functions/Function0;ILjava/lang/Object;)V
public static final fun getLogger (Lhep/dataforge/context/Context;)Lhep/dataforge/context/LogManager;
public static final fun info (Lhep/dataforge/context/LogManager;Lhep/dataforge/names/Name;Lkotlin/jvm/functions/Function0;)V
public static synthetic fun info$default (Lhep/dataforge/context/LogManager;Lhep/dataforge/names/Name;Lkotlin/jvm/functions/Function0;ILjava/lang/Object;)V
public static final fun getLogger (Lhep/dataforge/context/ContextAware;)Lhep/dataforge/context/Logable;
public static final fun info (Lhep/dataforge/context/Logable;Lhep/dataforge/names/Name;Lkotlin/jvm/functions/Function0;)V
public static synthetic fun info$default (Lhep/dataforge/context/Logable;Lhep/dataforge/names/Name;Lkotlin/jvm/functions/Function0;ILjava/lang/Object;)V
public static final fun trace (Lhep/dataforge/context/Logable;Lhep/dataforge/names/Name;Lkotlin/jvm/functions/Function0;)V
public static synthetic fun trace$default (Lhep/dataforge/context/Logable;Lhep/dataforge/names/Name;Lkotlin/jvm/functions/Function0;ILjava/lang/Object;)V
public static final fun warn (Lhep/dataforge/context/Logable;Lhep/dataforge/names/Name;Lkotlin/jvm/functions/Function0;)V
public static synthetic fun warn$default (Lhep/dataforge/context/Logable;Lhep/dataforge/names/Name;Lkotlin/jvm/functions/Function0;ILjava/lang/Object;)V
}
public abstract interface class hep/dataforge/context/Logable {
public abstract fun log (Lhep/dataforge/names/Name;Ljava/lang/String;Lkotlin/jvm/functions/Function0;)V
}
public abstract interface class hep/dataforge/context/Plugin : hep/dataforge/context/ContextAware, hep/dataforge/meta/MetaRepr, hep/dataforge/misc/Named, hep/dataforge/provider/Provider {

View File

@ -7,7 +7,6 @@ import kotlinx.coroutines.SupervisorJob
import kotlin.coroutines.CoroutineContext
import kotlin.native.concurrent.ThreadLocal
internal expect val globalLogger: LogManager
/**

View File

@ -1,10 +1,14 @@
package hep.dataforge.context
import hep.dataforge.misc.Named
import hep.dataforge.names.Name
import hep.dataforge.names.plus
public interface LogManager : Plugin {
public interface Logable{
public fun log(name: Name, tag: String, body: () -> String)
}
public interface LogManager : Plugin, Logable {
public companion object {
public const val TRACE: String = "TRACE"
@ -15,24 +19,40 @@ public interface LogManager : Plugin {
}
}
public fun LogManager.info(name: Name = Name.EMPTY, body: () -> String): Unit = log(name,LogManager.INFO,body)
public fun Logable.trace(name: Name = Name.EMPTY, body: () -> String): Unit = log(name, LogManager.TRACE, body)
public fun Logable.info(name: Name = Name.EMPTY, body: () -> String): Unit = log(name, LogManager.INFO, body)
public fun Logable.debug(name: Name = Name.EMPTY, body: () -> String): Unit = log(name, LogManager.DEBUG, body)
public fun Logable.warn(name: Name = Name.EMPTY, body: () -> String): Unit = log(name, LogManager.WARNING, body)
public fun Logable.error(name: Name = Name.EMPTY, body: () -> String): Unit = log(name, LogManager.ERROR, body)
public fun Logable.error(throwable: Throwable?, name: Name = Name.EMPTY, body: () -> String): Unit =
log(name, LogManager.ERROR) {
buildString {
appendLine(body())
throwable?.let { appendLine(throwable.stackTraceToString())}
}
}
/**
* Context log manager inherited from parent
*/
public val Context.logger: LogManager
get() = plugins.find(inherit = true) { it is LogManager } as? LogManager ?: Global.logger
///**
// * The logger specific to this context
// */
//public val Context.logger: Logger get() = buildLogger(name.toString())
//
///**
// * The logger
// */
//public val ContextAware.logger: Logger
// get() = if (this is Named) {
// context.buildLogger(Path(context.name, this.name).toString())
// } else {
// context.logger
// }
/**
* The named proxy logger for a context member
*/
public val ContextAware.logger: Logable
get() = if (this is Named) {
object :Logable{
val contextLog = context.logger
override fun log(name: Name, tag: String, body: () -> String) {
contextLog.log(this@logger.name + name,tag, body)
}
}
} else {
context.logger
}

View File

@ -11,14 +11,14 @@ kscience {
}
}
val ioVersion by rootProject.extra("0.2.0-npm-dev-11")
//val ioVersion by rootProject.extra("0.2.0-npm-dev-11")
kotlin {
sourceSets {
commonMain {
dependencies {
api(project(":dataforge-context"))
api("org.jetbrains.kotlinx:kotlinx-io:$ioVersion")
api("io.ktor:ktor-io:${ru.mipt.npm.gradle.KScienceVersions.ktorVersion}")
}
}
}

View File

@ -6,9 +6,10 @@ import hep.dataforge.io.IOFormat.Companion.META_KEY
import hep.dataforge.io.IOFormat.Companion.NAME_KEY
import hep.dataforge.meta.Meta
import hep.dataforge.misc.DFExperimental
import kotlinx.io.*
import kotlinx.io.text.readUtf8Line
import kotlinx.io.text.writeUtf8String
import io.ktor.utils.io.core.Input
import io.ktor.utils.io.core.Output
import io.ktor.utils.io.core.readBytes
import io.ktor.utils.io.core.readUTF8Line
@DFExperimental
public class FrontMatterEnvelopeFormat(
@ -20,7 +21,7 @@ public class FrontMatterEnvelopeFormat(
var line: String
var offset = 0u
do {
line = input.readUtf8Line() //?: error("Input does not contain front matter separator")
line = input.readUTF8Line() ?: error("Input does not contain front matter separator")
offset += line.encodeToByteArray().size.toUInt()
} while (!line.startsWith(SEPARATOR))
@ -31,7 +32,7 @@ public class FrontMatterEnvelopeFormat(
//TODO replace by preview
val meta = Binary {
do {
line = input.readUtf8Line()
line = input.readSafeUtf8Line()
writeUtf8String(line + "\r\n")
offset += line.encodeToByteArray().size.toUInt()
} while (!line.startsWith(SEPARATOR))
@ -45,7 +46,7 @@ public class FrontMatterEnvelopeFormat(
override fun readObject(input: Input): Envelope {
var line: String
do {
line = input.readUtf8Line() //?: error("Input does not contain front matter separator")
line = input.readSafeUtf8Line() //?: error("Input does not contain front matter separator")
} while (!line.startsWith(SEPARATOR))
val readMetaFormat =
@ -54,12 +55,12 @@ public class FrontMatterEnvelopeFormat(
val meta = Binary {
do {
writeUtf8String(input.readUtf8Line() + "\r\n")
writeUtf8String(input.readSafeUtf8Line() + "\r\n")
} while (!line.startsWith(SEPARATOR))
}.read {
readMetaFormat.readMeta(input)
}
val bytes = input.readByteArray()
val bytes = input.readBytes()
val data = bytes.asBinary()
return SimpleEnvelope(meta, data)
}
@ -94,14 +95,12 @@ public class FrontMatterEnvelopeFormat(
return FrontMatterEnvelopeFormat(context.io, meta)
}
override fun peekFormat(io: IOPlugin, input: Input): EnvelopeFormat? {
return input.preview {
val line = readUtf8Line()
return@preview if (line.startsWith("---")) {
invoke()
} else {
null
}
override fun peekFormat(io: IOPlugin, binary: Binary): EnvelopeFormat? = binary.read {
val line = readSafeUtf8Line()
return@read if (line.startsWith("---")) {
invoke()
} else {
null
}
}

View File

@ -5,6 +5,8 @@ import hep.dataforge.io.IOFormat.Companion.META_KEY
import hep.dataforge.io.IOFormat.Companion.NAME_KEY
import hep.dataforge.io.MetaFormat
import hep.dataforge.io.MetaFormatFactory
import hep.dataforge.io.readUtf8String
import hep.dataforge.io.writeUtf8String
import hep.dataforge.meta.*
import hep.dataforge.meta.descriptors.ItemDescriptor
import hep.dataforge.meta.descriptors.NodeDescriptor
@ -14,10 +16,8 @@ import hep.dataforge.names.withIndex
import hep.dataforge.values.ListValue
import hep.dataforge.values.Null
import hep.dataforge.values.parseValue
import kotlinx.io.Input
import kotlinx.io.Output
import kotlinx.io.text.readUtf8String
import kotlinx.io.text.writeUtf8String
import io.ktor.utils.io.core.Input
import io.ktor.utils.io.core.Output
import net.mamoe.yamlkt.*
public fun Meta.toYaml(): YamlMap {

View File

@ -0,0 +1,74 @@
package hep.dataforge.io
import io.ktor.utils.io.core.*
import kotlin.math.min
/**
* [Binary] represents a fixed-size multi-read byte block, which is not attached to the Input which was used to create it.
* The binary could be associated with a resource, but it should guarantee that when someone is trying to read the binary,
* this resource is re-acquired.
*/
public interface Binary {
public val size: Int
/**
* Read maximum of [atMost] bytes as input from the binary, starting at [offset]. The generated input is always closed
* when leaving scope, so it could not be leaked outside of scope of [block].
*/
public fun <R> read(offset: Int = 0, atMost: Int = size - offset, block: Input.() -> R): R
public companion object {
public val EMPTY: Binary = ByteArrayBinary(ByteArray(0))
}
}
internal class ByteArrayBinary(
internal val array: ByteArray,
internal val start: Int = 0,
override val size: Int = array.size - start,
) : Binary {
override fun <R> read(offset: Int, atMost: Int, block: Input.() -> R): R {
require(offset >= 0) { "Offset must be positive" }
require(offset < array.size) { "Offset $offset is larger than array size" }
val input = ByteReadPacket(
array,
offset + start,
min(atMost, size - offset)
)
return input.use(block)
}
}
public fun ByteArray.asBinary(): Binary = ByteArrayBinary(this)
/**
* Produce a [buildByteArray] representing an exact copy of this [Binary]
*/
public fun Binary.toByteArray(): ByteArray = if (this is ByteArrayBinary) {
array.copyOf() // TODO do we need to ensure data safety here?
} else {
read {
readBytes()
}
}
public fun Input.readBinary(size: Int): Binary {
val array = readBytes(size)
return ByteArrayBinary(array)
}
/**
* Direct write of binary to the output. Returns the number of bytes written
*/
public fun Output.writeBinary(binary: Binary): Int {
return if (binary is ByteArrayBinary) {
writeFully(binary.array, binary.start, binary.start + binary.size)
binary.size
} else {
binary.read {
copyTo(this@writeBinary).toInt()
}
}
}

View File

@ -4,9 +4,7 @@ import hep.dataforge.context.Context
import hep.dataforge.meta.*
import hep.dataforge.meta.descriptors.NodeDescriptor
import hep.dataforge.values.*
import kotlinx.io.*
import kotlinx.io.text.readUtf8String
import kotlinx.io.text.writeUtf8String
import io.ktor.utils.io.core.*
/**
* A DataForge-specific simplified binary format for meta
@ -26,7 +24,7 @@ public object BinaryMetaFormat : MetaFormat, MetaFormatFactory {
private fun Output.writeString(str: String) {
writeInt(str.length)
writeUtf8String(str)
writeFully(str.encodeToByteArray())
}
public fun Output.writeValue(value: Value): Unit = when (value.type) {
@ -76,8 +74,8 @@ public object BinaryMetaFormat : MetaFormat, MetaFormatFactory {
}
override fun writeMeta(
output: kotlinx.io.Output,
meta: hep.dataforge.meta.Meta,
output: Output,
meta: Meta,
descriptor: hep.dataforge.meta.descriptors.NodeDescriptor?,
) {
output.writeChar('M')
@ -97,7 +95,8 @@ public object BinaryMetaFormat : MetaFormat, MetaFormatFactory {
private fun Input.readString(): String {
val length = readInt()
return readUtf8String(length)
val array = readBytes(length)
return array.decodeToString()
}
@Suppress("UNCHECKED_CAST")

View File

@ -7,7 +7,6 @@ import hep.dataforge.meta.string
import hep.dataforge.names.Name
import hep.dataforge.names.asName
import hep.dataforge.names.plus
import kotlinx.io.Binary
public interface Envelope {
public val meta: Meta

View File

@ -1,7 +1,7 @@
package hep.dataforge.io
import hep.dataforge.meta.*
import kotlinx.io.*
import io.ktor.utils.io.core.Output
public class EnvelopeBuilder : Envelope {
private val metaBuilder = MetaBuilder()
@ -33,11 +33,8 @@ public class EnvelopeBuilder : Envelope {
/**
* Construct a data binary from given builder
*/
@OptIn(ExperimentalIoApi::class)
public fun data(block: Output.() -> Unit) {
val arrayBuilder = ByteArrayOutput()
arrayBuilder.block()
data = arrayBuilder.toByteArray().asBinary()
data = buildByteArray { block() }.asBinary()
}
public fun seal(): Envelope = SimpleEnvelope(metaBuilder.seal(), data)

View File

@ -6,8 +6,8 @@ import hep.dataforge.meta.Meta
import hep.dataforge.misc.Type
import hep.dataforge.names.Name
import hep.dataforge.names.asName
import kotlinx.io.Input
import kotlinx.io.Output
import io.ktor.utils.io.core.Input
import io.ktor.utils.io.core.Output
import kotlin.reflect.KType
import kotlin.reflect.typeOf
@ -48,7 +48,7 @@ public interface EnvelopeFormatFactory : IOFormatFactory<Envelope>, EnvelopeForm
* Try to infer specific format from input and return null if the attempt is failed.
* This method does **not** return Input into initial state.
*/
public fun peekFormat(io: IOPlugin, input: Input): EnvelopeFormat?
public fun peekFormat(io: IOPlugin, binary: Binary): EnvelopeFormat?
public companion object {
public const val ENVELOPE_FORMAT_TYPE: String = "io.format.envelope"

View File

@ -11,8 +11,6 @@ import hep.dataforge.meta.*
import hep.dataforge.names.asName
import hep.dataforge.names.plus
import hep.dataforge.names.toName
import kotlinx.io.Binary
import kotlinx.io.writeBinary
private class PartDescriptor : Scheme() {
var offset by int(0)

View File

@ -12,9 +12,7 @@ import hep.dataforge.misc.Type
import hep.dataforge.names.Name
import hep.dataforge.names.asName
import hep.dataforge.values.Value
import kotlinx.io.*
import kotlinx.io.buffer.Buffer
import kotlinx.io.pool.ObjectPool
import io.ktor.utils.io.core.*
import kotlin.reflect.KType
import kotlin.reflect.typeOf
@ -35,6 +33,10 @@ public interface IOFormat<T : Any> : MetaRepr {
public fun <T : Any> Input.readWith(format: IOFormat<T>): T = format.readObject(this@readWith)
public fun <T: Any> IOFormat<T>.readObject(binary: Binary): T = binary.read {
readObject(this)
}
/**
* Read given binary as object using given format
*/
@ -73,15 +75,15 @@ public inline fun <reified T : Any> IOFormat.Companion.listOf(
}
public fun ObjectPool<Buffer>.fill(block: Buffer.() -> Unit): Buffer {
val buffer = borrow()
return try {
buffer.apply(block)
} catch (ex: Exception) {
//recycle(buffer)
throw ex
}
}
//public fun ObjectPool<Buffer>.fill(block: Buffer.() -> Unit): Buffer {
// val buffer = borrow()
// return try {
// buffer.apply(block)
// } catch (ex: Exception) {
// //recycle(buffer)
// throw ex
// }
//}
@Type(IO_FORMAT_TYPE)
public interface IOFormatFactory<T : Any> : Factory<IOFormat<T>>, Named, MetaRepr {

View File

@ -10,10 +10,9 @@ import hep.dataforge.meta.descriptors.NodeDescriptor
import hep.dataforge.meta.node
import hep.dataforge.meta.toJson
import hep.dataforge.meta.toMetaItem
import kotlinx.io.Input
import kotlinx.io.Output
import kotlinx.io.text.readUtf8String
import kotlinx.io.text.writeUtf8String
import io.ktor.utils.io.core.Input
import io.ktor.utils.io.core.Output
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonObject
import kotlin.reflect.KType

View File

@ -8,10 +8,11 @@ import hep.dataforge.misc.Type
import hep.dataforge.names.Name
import hep.dataforge.names.asName
import hep.dataforge.names.plus
import kotlinx.io.ByteArrayInput
import kotlinx.io.Input
import kotlinx.io.Output
import kotlinx.io.use
import io.ktor.utils.io.core.ByteReadPacket
import io.ktor.utils.io.core.Input
import io.ktor.utils.io.core.Output
import io.ktor.utils.io.core.use
import kotlin.reflect.KType
import kotlin.reflect.typeOf
@ -54,13 +55,15 @@ public interface MetaFormatFactory : IOFormatFactory<Meta>, MetaFormat {
}
public fun Meta.toString(format: MetaFormat): String = buildByteArray {
format.run { writeObject(this@buildByteArray, this@toString) }
format.run {
writeObject(this@buildByteArray, this@toString)
}
}.decodeToString()
public fun Meta.toString(formatFactory: MetaFormatFactory): String = toString(formatFactory())
public fun MetaFormat.parse(str: String): Meta {
return ByteArrayInput(str.encodeToByteArray()).use { readObject(it) }
return ByteReadPacket(str.encodeToByteArray()).use { readObject(it) }
}
public fun MetaFormatFactory.parse(str: String, formatMeta: Meta): Meta = invoke(formatMeta).parse(str)

View File

@ -10,7 +10,7 @@ import hep.dataforge.meta.string
import hep.dataforge.names.Name
import hep.dataforge.names.plus
import hep.dataforge.names.toName
import kotlinx.io.*
import io.ktor.utils.io.core.*
/**
* A streaming-friendly envelope format with a short binary tag.
@ -57,7 +57,6 @@ public class TaggedEnvelopeFormat(
envelope.data?.let {
output.writeBinary(it)
}
output.flush()
}
/**
@ -72,7 +71,9 @@ public class TaggedEnvelopeFormat(
val metaFormat = io.resolveMetaFormat(tag.metaFormatKey)
?: error("Meta format with key ${tag.metaFormatKey} not found")
val meta: Meta = metaFormat.readObject(input.limit(tag.metaSize.toInt()))
val metaBinary = input.readBinary(tag.metaSize.toInt())
val meta: Meta = metaFormat.readObject(metaBinary)
val data = input.readBinary(tag.dataSize.toInt())
@ -85,7 +86,9 @@ public class TaggedEnvelopeFormat(
val metaFormat = io.resolveMetaFormat(tag.metaFormatKey)
?: error("Meta format with key ${tag.metaFormatKey} not found")
val meta: Meta = metaFormat.readObject(input.limit(tag.metaSize.toInt()))
val metaBinary = input.readBinary(tag.metaSize.toInt())
val meta: Meta = metaFormat.readObject(metaBinary)
return PartialEnvelope(meta, version.tagSize + tag.metaSize, tag.dataSize)
@ -143,11 +146,11 @@ public class TaggedEnvelopeFormat(
return Tag(metaFormatKey, metaLength, dataLength)
}
override fun peekFormat(io: IOPlugin, input: Input): EnvelopeFormat? {
override fun peekFormat(io: IOPlugin, binary: Binary): EnvelopeFormat? {
return try {
input.preview {
binary.read{
val header = readRawString(6)
return@preview when (header.substring(2..5)) {
return@read when (header.substring(2..5)) {
VERSION.DF02.name -> TaggedEnvelopeFormat(io, VERSION.DF02)
VERSION.DF03.name -> TaggedEnvelopeFormat(io, VERSION.DF03)
else -> null

View File

@ -9,9 +9,7 @@ import hep.dataforge.meta.isEmpty
import hep.dataforge.meta.string
import hep.dataforge.names.Name
import hep.dataforge.names.asName
import kotlinx.io.*
import kotlinx.io.text.readUtf8Line
import kotlinx.io.text.writeUtf8String
import io.ktor.utils.io.core.*
import kotlin.collections.set
/**
@ -27,7 +25,7 @@ public class TaglessEnvelopeFormat(
private val dataStart = meta[DATA_START_PROPERTY].string ?: DEFAULT_DATA_START
private fun Output.writeProperty(key: String, value: Any) {
writeUtf8String("#? $key: $value;\r\n")
writeFully("#? $key: $value;\r\n".encodeToByteArray())
}
override fun writeEnvelope(
@ -69,7 +67,7 @@ public class TaglessEnvelopeFormat(
override fun readObject(input: Input): Envelope {
var line: String
do {
line = input.readUtf8Line() // ?: error("Input does not contain tagless envelope header")
line = input.readSafeUtf8Line() // ?: error("Input does not contain tagless envelope header")
} while (!line.startsWith(TAGLESS_ENVELOPE_HEADER))
val properties = HashMap<String, String>()
@ -82,8 +80,8 @@ public class TaglessEnvelopeFormat(
properties[key] = value
}
//If can't read line, return envelope without data
if (input.exhausted()) return SimpleEnvelope(Meta.EMPTY, null)
line = input.readUtf8Line()
if (input.endOfInput) return SimpleEnvelope(Meta.EMPTY, null)
line = input.readSafeUtf8Line()
}
var meta: Meta = Meta.EMPTY
@ -92,7 +90,7 @@ public class TaglessEnvelopeFormat(
val metaFormat = properties[META_TYPE_PROPERTY]?.let { io.resolveMetaFormat(it) } ?: JsonMetaFormat
val metaSize = properties[META_LENGTH_PROPERTY]?.toInt()
meta = if (metaSize != null) {
metaFormat.readObject(input.limit(metaSize))
metaFormat.readObject(input.readBinary(metaSize))
} else {
metaFormat.readObject(input)
}
@ -100,14 +98,14 @@ public class TaglessEnvelopeFormat(
do {
try {
line = input.readUtf8Line()
line = input.readSafeUtf8Line()
} catch (ex: EOFException) {
//returning an Envelope without data if end of input is reached
return SimpleEnvelope(meta, null)
}
} while (!line.startsWith(dataStart))
val data: Binary? = if (properties.containsKey(DATA_LENGTH_PROPERTY)) {
val data: Binary = if (properties.containsKey(DATA_LENGTH_PROPERTY)) {
input.readBinary(properties[DATA_LENGTH_PROPERTY]!!.toInt())
// val bytes = ByteArray(properties[DATA_LENGTH_PROPERTY]!!.toInt())
// readByteArray(bytes)
@ -125,7 +123,7 @@ public class TaglessEnvelopeFormat(
var offset = 0u
var line: String
do {
line = input.readUtf8Line()// ?: error("Input does not contain tagless envelope header")
line = input.readSafeUtf8Line()// ?: error("Input does not contain tagless envelope header")
offset += line.encodeToByteArray().size.toUInt()
} while (!line.startsWith(TAGLESS_ENVELOPE_HEADER))
val properties = HashMap<String, String>()
@ -139,7 +137,7 @@ public class TaglessEnvelopeFormat(
properties[key] = value
}
try {
line = input.readUtf8Line()
line = input.readSafeUtf8Line()
offset += line.encodeToByteArray().size.toUInt()
} catch (ex: EOFException) {
return PartialEnvelope(Meta.EMPTY, offset.toUInt(), 0.toULong())
@ -153,14 +151,14 @@ public class TaglessEnvelopeFormat(
val metaSize = properties[META_LENGTH_PROPERTY]?.toInt()
meta = if (metaSize != null) {
offset += metaSize.toUInt()
metaFormat.readObject(input.limit(metaSize))
metaFormat.readObject(input.readBinary(metaSize))
} else {
error("Can't partially read an envelope with undefined meta size")
}
}
do {
line = input.readUtf8Line() //?: return PartialEnvelope(Meta.EMPTY, offset.toUInt(), 0.toULong())
line = input.readSafeUtf8Line() //?: return PartialEnvelope(Meta.EMPTY, offset.toUInt(), 0.toULong())
offset += line.encodeToByteArray().size.toUInt()
//returning an Envelope without data if end of input is reached
} while (!line.startsWith(dataStart))
@ -220,11 +218,11 @@ public class TaglessEnvelopeFormat(
override fun readObject(input: Input): Envelope = default.readObject(input)
override fun peekFormat(io: IOPlugin, input: Input): EnvelopeFormat? {
override fun peekFormat(io: IOPlugin, binary: Binary): EnvelopeFormat? {
return try {
input.preview {
binary.read {
val string = readRawString(TAGLESS_ENVELOPE_HEADER.length)
return@preview if (string == TAGLESS_ENVELOPE_HEADER) {
return@read if (string == TAGLESS_ENVELOPE_HEADER) {
TaglessEnvelopeFormat(io)
} else {
null

View File

@ -1,21 +1,33 @@
package hep.dataforge.io
import kotlinx.io.*
import io.ktor.utils.io.charsets.Charsets
import io.ktor.utils.io.charsets.decodeExactBytes
import io.ktor.utils.io.core.*
import kotlin.math.min
public fun Output.writeRawString(str: String) {
str.forEach { writeByte(it.toByte()) }
writeFully(str.toByteArray(Charsets.ISO_8859_1))
}
public fun Output.writeUtf8String(str: String) {
writeFully(str.encodeToByteArray())
}
@OptIn(ExperimentalIoApi::class)
public fun Input.readRawString(size: Int): String {
val array = CharArray(size) { readByte().toChar() }
return array.concatToString()
return Charsets.ISO_8859_1.newDecoder().decodeExactBytes(this, size)
}
public inline fun buildByteArray(expectedSize: Int = 16, block: Output.() -> Unit): ByteArray =
ByteArrayOutput(expectedSize).apply(block).toByteArray()
public fun Input.readUtf8String(): String = readBytes().decodeToString()
public fun Input.readSafeUtf8Line(): String = readUTF8Line() ?: error("Line not found")
public inline fun buildByteArray(expectedSize: Int = 16, block: Output.() -> Unit): ByteArray {
val builder = BytePacketBuilder(expectedSize)
builder.block()
return builder.build().readBytes()
}
@Suppress("FunctionName")
public inline fun Binary(expectedSize: Int = 16, block: Output.() -> Unit): Binary =
buildByteArray(expectedSize, block).asBinary()

View File

@ -1,8 +1,6 @@
package hep.dataforge.io
import kotlinx.io.asBinary
import kotlinx.io.readByte
import kotlinx.io.readInt
import io.ktor.utils.io.core.readInt
import kotlin.test.Test
import kotlin.test.assertEquals

View File

@ -1,7 +1,7 @@
package hep.dataforge.io
import kotlinx.io.readDouble
import kotlinx.io.writeDouble
import io.ktor.utils.io.core.readDouble
import io.ktor.utils.io.core.writeDouble
import kotlin.test.Test
import kotlin.test.assertEquals

View File

@ -0,0 +1,17 @@
package hep.dataforge.io
import io.ktor.utils.io.core.ByteReadPacket
import io.ktor.utils.io.core.readBytes
import kotlin.test.Test
import kotlin.test.assertEquals
class IOTest {
@Test
fun readBytes() {
val bytes = ByteArray(8) { it.toByte() }
val input = ByteReadPacket(bytes)
val first = input.readBytes(4)
val second = input.readBytes(4)
assertEquals(4.toByte(), second[0])
}
}

View File

@ -4,7 +4,7 @@ import hep.dataforge.meta.*
import hep.dataforge.meta.JsonMeta.Companion.JSON_ARRAY_KEY
import hep.dataforge.values.ListValue
import hep.dataforge.values.number
import kotlinx.io.asBinary
import kotlinx.serialization.json.*
import kotlin.test.Test
import kotlin.test.assertEquals

View File

@ -4,7 +4,6 @@ import hep.dataforge.context.Global
import hep.dataforge.meta.get
import hep.dataforge.meta.int
import hep.dataforge.misc.DFExperimental
import kotlinx.io.text.writeUtf8String
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertTrue

View File

@ -1,7 +1,12 @@
package hep.dataforge.io
import kotlinx.io.ByteArrayInput
import kotlinx.io.use
import io.ktor.utils.io.core.ByteReadPacket
import io.ktor.utils.io.core.use
fun <T : Any> IOFormat<T>.writeToByteArray(obj: T): ByteArray = buildByteArray { writeObject(this, obj) }
fun <T : Any> IOFormat<T>.readFromByteArray(array: ByteArray): T = ByteArrayInput(array).use { readObject(it) }
fun <T : Any> IOFormat<T>.writeToByteArray(obj: T): ByteArray = buildByteArray {
writeObject(this, obj)
}
fun <T : Any> IOFormat<T>.readFromByteArray(array: ByteArray): T = ByteReadPacket(array).use {
readObject(it)
}

View File

@ -4,14 +4,39 @@ import hep.dataforge.meta.Meta
import hep.dataforge.meta.descriptors.NodeDescriptor
import hep.dataforge.meta.isEmpty
import hep.dataforge.misc.DFExperimental
import kotlinx.io.*
import io.ktor.utils.io.core.*
import io.ktor.utils.io.streams.asOutput
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardOpenOption
import kotlin.io.path.ExperimentalPathApi
import kotlin.io.path.inputStream
import kotlin.math.min
import kotlin.reflect.full.isSupertypeOf
import kotlin.reflect.typeOf
import kotlin.streams.asSequence
internal class PathBinary(
private val path: Path,
private val fileOffset: Int = 0,
override val size: Int = Files.size(path).toInt() - fileOffset,
) : Binary {
@OptIn(ExperimentalPathApi::class)
override fun <R> read(offset: Int, atMost: Int, block: Input.() -> R): R {
val actualOffset = offset + fileOffset
val actualSize = min(atMost, size - offset)
val array = path.inputStream().use {
it.skip(actualOffset.toLong())
it.readNBytes(actualSize)
}
return ByteReadPacket(array).block()
}
}
public fun Path.asBinary(): Binary = PathBinary(this)
public fun <R> Path.read(block: Input.() -> R): R = asBinary().read(block = block)
/**
@ -46,11 +71,13 @@ public fun Path.rewrite(block: Output.() -> Unit): Unit {
public fun Path.readEnvelope(format: EnvelopeFormat): Envelope {
val partialEnvelope: PartialEnvelope = asBinary().read {
format.run { readPartial(this@read) }
format.run {
readPartial(this@read)
}
}
val offset: Int = partialEnvelope.dataOffset.toInt()
val size: Int = partialEnvelope.dataSize?.toInt() ?: (Files.size(this).toInt() - offset)
val binary = FileBinary(this, offset, size)
val binary = PathBinary(this, offset, size)
return SimpleEnvelope(partialEnvelope.meta, binary)
}
@ -60,7 +87,7 @@ public fun Path.readEnvelope(format: EnvelopeFormat): Envelope {
@Suppress("UNCHECKED_CAST")
@DFExperimental
public inline fun <reified T : Any> IOPlugin.resolveIOFormat(): IOFormat<T>? {
return ioFormatFactories.find { it.type.isSupertypeOf(typeOf<T>())} as IOFormat<T>?
return ioFormatFactories.find { it.type.isSupertypeOf(typeOf<T>()) } as IOFormat<T>?
}
/**
@ -119,9 +146,7 @@ public fun IOPlugin.writeMetaFile(
public fun IOPlugin.peekFileEnvelopeFormat(path: Path): EnvelopeFormat? {
val binary = path.asBinary()
val formats = envelopeFormatFactories.mapNotNull { factory ->
binary.read {
factory.peekFormat(this@peekFileEnvelopeFormat, this@read)
}
factory.peekFormat(this@peekFileEnvelopeFormat, binary)
}
return when (formats.size) {
@ -231,7 +256,7 @@ public fun IOPlugin.writeEnvelopeDirectory(
dataFile.write {
envelope.data?.read {
val copied = copyTo(this@write)
if (copied != envelope.data?.size) {
if (copied != envelope.data?.size?.toLong()) {
error("The number of copied bytes does not equal data size")
}
}

View File

@ -1,45 +1,40 @@
package hep.dataforge.io
import kotlinx.io.*
import kotlinx.io.buffer.Buffer
import kotlinx.io.buffer.set
import java.io.InputStream
import java.io.OutputStream
private class BlockingStreamInput(val source: InputStream) : Input() {
override fun closeSource() {
source.close()
}
override fun fill(buffer: Buffer, startIndex: Int, endIndex: Int): Int {
while (source.available() == 0) {
//block until input is available
}
// Zero-copy attempt
if (buffer.buffer.hasArray()) {
val result = source.read(buffer.buffer.array(), startIndex, endIndex - startIndex)
return result.coerceAtLeast(0) // -1 when IS is closed
}
for (i in startIndex until endIndex) {
val byte = source.read()
if (byte == -1) return (i - startIndex)
buffer[i] = byte.toByte()
}
return endIndex - startIndex
}
}
public fun <R> InputStream.read(size: Int, block: Input.() -> R): R {
val buffer = ByteArray(size)
read(buffer)
return buffer.asBinary().read(block = block)
}
public fun <R> InputStream.read(block: Input.() -> R): R = asInput().block()
public fun <R> InputStream.readBlocking(block: Input.() -> R): R = BlockingStreamInput(this).block()
public inline fun OutputStream.write(block: Output.() -> Unit) {
asOutput().block()
}
//
//private class BlockingStreamInput(val source: InputStream) : AbstractInput() {
// override fun closeSource() {
// source.close()
// }
//
// override fun fill(destination: Memory, offset: Int, length: Int): Int {
// while (source.available() == 0) {
// //block until input is available
// }
// // Zero-copy attempt
// if (buffer.buffer.hasArray()) {
// val result = source.read(buffer.buffer.array(), startIndex, endIndex - startIndex)
// return result.coerceAtLeast(0) // -1 when IS is closed
// }
//
// for (i in startIndex until endIndex) {
// val byte = source.read()
// if (byte == -1) return (i - startIndex)
// buffer[i] = byte.toByte()
// }
// return endIndex - startIndex
// }
//}
//
//public fun <R> InputStream.read(size: Int, block: Input.() -> R): R {
// val buffer = ByteArray(size)
// read(buffer)
// return buffer.asBinary().read(block = block)
//}
//
//public fun <R> InputStream.read(block: Input.() -> R): R = asInput().block()
//
//public fun <R> InputStream.readBlocking(block: Input.() -> R): R = BlockingStreamInput(this).block()
//
//public inline fun OutputStream.write(block: Output.() -> Unit) {
// asOutput().block()
//}

View File

@ -2,9 +2,7 @@ package hep.dataforge.io
import hep.dataforge.context.Global
import hep.dataforge.misc.DFExperimental
import kotlinx.io.asBinary
import kotlinx.io.toByteArray
import kotlinx.io.writeDouble
import io.ktor.utils.io.core.writeDouble
import java.nio.file.Files
import kotlin.test.Test
import kotlin.test.assertEquals

View File

@ -2,7 +2,7 @@ package hep.dataforge.io
import hep.dataforge.context.Global
import hep.dataforge.misc.DFExperimental
import kotlinx.io.writeDouble
import io.ktor.utils.io.core.writeDouble
import java.nio.file.Files
import kotlin.test.Test
import kotlin.test.assertTrue
@ -19,7 +19,6 @@ class FileEnvelopeTest {
dataID = "myData" // добавил только что
data {
writeDouble(16.7)
}
}

View File

@ -1,8 +1,6 @@
package hep.dataforge.scripting
import hep.dataforge.context.Context
import hep.dataforge.context.Global
import hep.dataforge.context.logger
import hep.dataforge.context.*
import hep.dataforge.workspace.Workspace
import hep.dataforge.workspace.WorkspaceBuilder
import java.io.File
@ -44,7 +42,7 @@ public object Builders {
error(scriptDiagnostic.toString())
}
ScriptDiagnostic.Severity.WARNING -> context.logger.warn { scriptDiagnostic.toString() }
ScriptDiagnostic.Severity.INFO -> context.logger.info { scriptDiagnostic.toString() }
ScriptDiagnostic.Severity.INFO -> context.logger.info { scriptDiagnostic.toString() }
ScriptDiagnostic.Severity.DEBUG -> context.logger.debug { scriptDiagnostic.toString() }
}
}

View File

@ -1,23 +1,18 @@
package hep.dataforge.tables.io
import hep.dataforge.io.Binary
import hep.dataforge.io.readSafeUtf8Line
import hep.dataforge.io.writeUtf8String
import hep.dataforge.tables.*
import hep.dataforge.values.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.io.Binary
import kotlinx.io.ExperimentalIoApi
import kotlinx.io.Output
import kotlinx.io.text.forEachUtf8Line
import kotlinx.io.text.readUtf8Line
import kotlinx.io.text.readUtf8StringUntilDelimiter
import kotlinx.io.text.writeUtf8String
import io.ktor.utils.io.core.Output
import io.ktor.utils.io.core.readBytes
import kotlinx.coroutines.flow.*
/**
* Read a lin as a fixed width [Row]
*/
private fun readLine(header: ValueTableHeader, line: String): Row<Value> {
private fun readRow(header: ValueTableHeader, line: String): Row<Value> {
val values = line.trim().split("\\s+".toRegex()).map { it.lazyParseValue() }
if (values.size == header.size) {
@ -31,32 +26,45 @@ private fun readLine(header: ValueTableHeader, line: String): Row<Value> {
/**
* Finite or infinite [Rows] created from a fixed width text binary
*/
@ExperimentalIoApi
public class TextRows(override val headers: ValueTableHeader, private val binary: Binary) : Rows<Value> {
/**
* A flow of indexes of string start offsets ignoring empty strings
*/
public fun indexFlow(): Flow<Int> = binary.read {
var counter: Int = 0
flow {
val string = readUtf8StringUntilDelimiter('\n')
counter += string.length
if (!string.isBlank()) {
emit(counter)
}
}
//TODO replace by line reader
val text = readBytes().decodeToString()
text.lineSequence()
.map { it.trim() }
.filter { it.isNotEmpty() }
.scan(0) { acc, str -> acc + str.length }.asFlow()
// var counter: Int = 0
// flow {
// do {
// val line = readUTF8Line()
// counter += line?.length ?: 0
// if (!line.isNullOrBlank()) {
// emit(counter)
// }
// } while (!endOfInput)
// }
}
override fun rowFlow(): Flow<Row<Value>> = binary.read {
flow {
forEachUtf8Line { line ->
if (line.isNotBlank()) {
val row = readLine(headers, line)
emit(row)
}
}
}
val text = readBytes().decodeToString()
text.lineSequence()
.map { it.trim() }
.filter { it.isNotEmpty() }
.map { readRow(headers, it) }.asFlow()
// flow {
// do {
// val line = readUTF8Line()
// if (!line.isNullOrBlank()) {
// val row = readRow(headers, line)
// emit(row)
// }
// } while (!endOfInput)
// }
}
public companion object
@ -65,17 +73,15 @@ public class TextRows(override val headers: ValueTableHeader, private val binary
/**
* Create a row offset index for [TextRows]
*/
@ExperimentalIoApi
public suspend fun TextRows.buildRowIndex(): List<Int> = indexFlow().toList()
/**
* Finite table created from [RandomAccessBinary] with fixed width text table
*/
@ExperimentalIoApi
public class TextTable(
override val headers: ValueTableHeader,
private val binary: Binary,
public val index: List<Int>
public val index: List<Int>,
) : Table<Value> {
override val columns: Collection<Column<Value>> get() = headers.map { RowTableColumn(this, it) }
@ -86,8 +92,8 @@ public class TextTable(
private fun readAt(offset: Int): Row<Value> {
return binary.read(offset) {
val line = readUtf8Line()
return@read readLine(headers, line)
val line = readSafeUtf8Line()
return@read readRow(headers, line)
}
}
@ -139,6 +145,7 @@ public suspend fun Output.writeRows(rows: Rows<Value>) {
rows.headers.forEachIndexed { index, columnHeader ->
writeValue(row[columnHeader] ?: Null, widths[index])
}
// appendLine()
writeUtf8String("\r\n")
}
}

View File

@ -1,19 +1,17 @@
package hep.dataforge.tables.io
import hep.dataforge.io.Binary
import hep.dataforge.io.Envelope
import hep.dataforge.io.asBinary
import hep.dataforge.io.buildByteArray
import hep.dataforge.meta.*
import hep.dataforge.misc.DFExperimental
import hep.dataforge.tables.SimpleColumnHeader
import hep.dataforge.tables.Table
import hep.dataforge.values.Value
import kotlinx.io.Binary
import kotlinx.io.ByteArrayOutput
import kotlinx.io.ExperimentalIoApi
import kotlinx.io.asBinary
import kotlin.reflect.typeOf
@ExperimentalIoApi
public suspend fun Table<Value>.toEnvelope(): Envelope = Envelope {
meta {
headers.forEachIndexed { index, columnHeader ->
@ -29,11 +27,12 @@ public suspend fun Table<Value>.toEnvelope(): Envelope = Envelope {
type = "table.value"
dataID = "valueTable[${this@toEnvelope.hashCode()}]"
data = ByteArrayOutput().apply { writeRows(this@toEnvelope) }.toByteArray().asBinary()
data = buildByteArray {
writeRows(this@toEnvelope)
}.asBinary()
}
@DFExperimental
@ExperimentalIoApi
public fun TextRows.Companion.readEnvelope(envelope: Envelope): TextRows {
val header = envelope.meta.getIndexed("column")
.entries.sortedBy { it.key?.toInt() }

View File

@ -1,5 +1,6 @@
package hep.dataforge.tables.io
import hep.dataforge.io.toByteArray
import hep.dataforge.misc.DFExperimental
import hep.dataforge.tables.Table
import hep.dataforge.tables.row
@ -8,14 +9,11 @@ import hep.dataforge.values.int
import hep.dataforge.values.string
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.io.ExperimentalIoApi
import kotlinx.io.toByteArray
import kotlin.test.Test
import kotlin.test.assertEquals
@DFExperimental
@ExperimentalIoApi
class TextRowsTest {
val table = Table<Value> {
val a by column<Value>()

View File

@ -1,6 +1,7 @@
package hep.dataforge.workspace
import hep.dataforge.context.Context
import hep.dataforge.context.info
import hep.dataforge.context.logger
import hep.dataforge.data.GoalLogger
import kotlinx.coroutines.launch

View File

@ -5,10 +5,10 @@ import hep.dataforge.data.*
import hep.dataforge.io.*
import hep.dataforge.meta.*
import hep.dataforge.misc.DFExperimental
import io.ktor.utils.io.streams.asOutput
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlinx.io.asOutput
import java.nio.file.FileSystem
import java.nio.file.Files
import java.nio.file.Path
@ -166,6 +166,7 @@ public suspend fun <T : Any> IOPlugin.writeDataDirectory(
}
@Suppress("BlockingMethodInNonBlockingContext")
private suspend fun <T : Any> ZipOutputStream.writeNode(
name: String,
treeItem: DataTreeItem<T>,
@ -180,7 +181,10 @@ private suspend fun <T : Any> ZipOutputStream.writeNode(
val entry = ZipEntry(name)
putNextEntry(entry)
envelopeFormat.run {
writeObject(asOutput(), envelope)
asOutput().run {
writeEnvelope(this, envelope)
flush()
}
}
}
is DataTreeItem.Node -> {
@ -196,8 +200,9 @@ private suspend fun <T : Any> ZipOutputStream.writeNode(
}
}
@Suppress("BlockingMethodInNonBlockingContext")
@DFExperimental
suspend fun <T : Any> IOPlugin.writeZip(
public suspend fun <T : Any> IOPlugin.writeZip(
path: Path,
tree: DataTree<T>,
format: IOFormat<T>,
@ -217,22 +222,6 @@ suspend fun <T : Any> IOPlugin.writeZip(
zos.use {
it.writeNode("", DataTreeItem.Node(tree), format, envelopeFormat)
}
// if (Files.exists(actualFile) && Files.size(path) == 0.toLong()) {
// Files.delete(path)
// }
// //Files.createFile(actualFile)
// newZFS(actualFile).use { zipfs ->
// val zipRootPath = zipfs.getPath("/")
// Files.createDirectories(zipRootPath)
// val tmp = Files.createTempDirectory("df_zip")
// writeDataDirectory(tmp, node, format, envelopeFormat, metaFormat)
// Files.list(tmp).forEach { sourcePath ->
// val targetPath = sourcePath.fileName.toString()
// val internalTargetPath = zipRootPath.resolve(targetPath)
// Files.copy(sourcePath, internalTargetPath, StandardCopyOption.REPLACE_EXISTING)
// }
// }
}
}

View File

@ -4,13 +4,13 @@ import hep.dataforge.context.Global
import hep.dataforge.data.*
import hep.dataforge.io.IOFormat
import hep.dataforge.io.io
import hep.dataforge.io.readUtf8String
import hep.dataforge.io.writeUtf8String
import hep.dataforge.meta.Meta
import hep.dataforge.misc.DFExperimental
import io.ktor.utils.io.core.Input
import io.ktor.utils.io.core.Output
import kotlinx.coroutines.runBlocking
import kotlinx.io.Input
import kotlinx.io.Output
import kotlinx.io.text.readUtf8String
import kotlinx.io.text.writeUtf8String
import java.nio.file.Files
import java.nio.file.Path
import kotlin.reflect.KType