return type to IOReader

Alexander Nozik 2022-05-21 10:38:53 +03:00
parent f5d32ba511
commit 6bd8a7acbc
11 changed files with 225 additions and 69 deletions

View File

@@ -6,7 +6,11 @@ plugins {
allprojects {
group = "space.kscience"
version = "0.6.0-dev-8"
version = "0.6.0-dev-9"
}
subprojects {
apply(plugin = "maven-publish")
tasks.withType<KotlinCompile>{
kotlinOptions{
@@ -15,10 +19,6 @@ allprojects {
}
}
subprojects {
apply(plugin = "maven-publish")
}
readme {
readmeTemplate = file("docs/templates/README-TEMPLATE.md")
}

View File

@@ -1,3 +1,5 @@
@file:OptIn(DFExperimental::class)
package space.kscience.dataforge.io.yaml
import space.kscience.dataforge.context.Context
@@ -6,6 +8,7 @@ import space.kscience.dataforge.io.readEnvelope
import space.kscience.dataforge.io.toByteArray
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string
import space.kscience.dataforge.misc.DFExperimental
import kotlin.test.Test
import kotlin.test.assertEquals

View File

@@ -17,6 +17,9 @@ import kotlin.reflect.typeOf
public data class PartialEnvelope(val meta: Meta, val dataOffset: Int, val dataSize: ULong?)
public interface EnvelopeFormat : IOFormat<Envelope> {
override val type: KType get() = typeOf<Envelope>()
public val defaultMetaFormat: MetaFormatFactory get() = JsonMetaFormat
public fun readPartial(input: Input): PartialEnvelope
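A minimal usage sketch (not part of the commit) for the partial-read path declared above: readPartial parses only the meta header and reports where the body begins, so a caller can inspect an envelope without loading its payload. The printHeader helper and the Input import path are assumptions for illustration; only readPartial and the PartialEnvelope fields come from the interface itself.

import io.ktor.utils.io.core.Input // assumed import path, based on the ktor-io usage elsewhere in this commit
import space.kscience.dataforge.io.EnvelopeFormat
import space.kscience.dataforge.io.PartialEnvelope

fun printHeader(format: EnvelopeFormat, input: Input) {
    // Reads the meta header only; the body is left in the input, starting at dataOffset.
    val partial: PartialEnvelope = format.readPartial(input)
    println("meta = ${partial.meta}")
    println("body offset = ${partial.dataOffset}, size = ${partial.dataSize ?: "unknown"}")
}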

View File

@@ -15,12 +15,25 @@ import space.kscience.dataforge.names.asName
import kotlin.reflect.KType
import kotlin.reflect.typeOf
public fun interface IOReader<out T : Any> {
/**
* Reader of a custom object from input
*/
public interface IOReader<out T> {
/**
* The type of object being read
*/
public val type: KType
public fun readObject(input: Input): T
}
public fun interface IOWriter<in T : Any> {
public inline fun <reified T : Any> IOReader(crossinline read: Input.() -> T): IOReader<T> = object : IOReader<T> {
override val type: KType = typeOf<T>()
override fun readObject(input: Input): T = input.read()
}
public fun interface IOWriter<in T> {
public fun writeObject(output: Output, obj: T)
}
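The factory above captures the reified type, so ad hoc readers stay one-liners while still exposing a runtime KType. A short usage sketch (not part of the commit); readUtf8String is borrowed from the test changes later in this commit, and the import paths are assumptions.

import space.kscience.dataforge.io.IOReader
import space.kscience.dataforge.io.readUtf8String

// Build a reader from a lambda; the KType for String is filled in by the inline factory.
val stringReader: IOReader<String> = IOReader { readUtf8String() }

// Generic call sites no longer need a reified parameter: the reader carries its own type.
fun describe(reader: IOReader<*>): String = "reads ${reader.type}"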

View File

@@ -21,6 +21,8 @@ import kotlin.reflect.typeOf
*/
public interface MetaFormat : IOFormat<Meta> {
override val type: KType get() = typeOf<Meta>()
override fun writeObject(output: Output, obj: Meta) {
writeMeta(output, obj, null)
}

View File

@@ -102,12 +102,14 @@ public operator fun Meta.get(token: NameToken): Meta? = items[token]
*
* If [name] is empty return current [Meta]
*/
public operator fun Meta.get(name: Name): Meta? = getMeta(name)
public operator fun Meta.get(name: Name): Meta? = this.getMeta(name)
//TODO allow nullable receivers after Kotlin 1.7
/**
* Parse [Name] from [key] using full name notation and pass it to [Meta.get]
*/
public operator fun Meta.get(key: String): Meta? = this[Name.parse(key)]
public operator fun Meta.get(key: String): Meta? = this.get(Name.parse(key))
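A usage sketch (not part of the commit) for the two accessors above: the String overload parses the key with full name notation and delegates to the Name overload. The Meta builder and the string accessor are assumptions based on other hunks in this commit.

import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string
import space.kscience.dataforge.names.Name

val meta = Meta { Name.parse("file.path") put "/tmp/data.df" } // hypothetical content
val byKey: String? = meta["file.path"].string                  // String key, parsed via Name.parse
val byName: String? = meta[Name.parse("file.path")].string     // equivalent explicit Name lookup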
/**
* Get all items matching given name. The index of the last element, if present is used as a [Regex],

View File

@@ -1,6 +1,7 @@
package space.kscience.dataforge.names
import kotlinx.serialization.Serializable
import space.kscience.dataforge.misc.DFExperimental
/**
* A single name token. Body is not allowed to be empty.
@@ -25,6 +26,20 @@ public data class NameToken(val body: String, val index: String? = null) {
} else {
body.escape()
}
public companion object {
/**
* Parse name token from a string
*/
@DFExperimental
public fun parse(string: String): NameToken {
val body = string.substringBefore('[')
val index = string.substringAfter('[', "")
if (index.isNotEmpty() && !index.endsWith(']')) error("NameToken with index must end with ']'")
return NameToken(body, index.removeSuffix("]"))
}
}
}
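A quick sketch (not part of the commit) of the parser above: an index, when present, is written in square brackets after the body, and the call requires opting in to DFExperimental.

import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.NameToken

@OptIn(DFExperimental::class)
fun parseTokens() {
    val plain = NameToken.parse("detector")        // body = "detector", index = "" (no brackets)
    val indexed = NameToken.parse("detector[42]")  // body = "detector", index = "42"
    // NameToken.parse("detector[42") fails: an opened index must be closed with ']'
    println("$plain, $indexed")
}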
/**

View File

@@ -4,15 +4,20 @@ import space.kscience.dataforge.data.Data
import space.kscience.dataforge.data.await
import space.kscience.dataforge.io.*
import space.kscience.dataforge.misc.DFInternal
import kotlin.reflect.KType
import kotlin.reflect.typeOf
@DFInternal
public fun <T : Any> Envelope.toData(type: KType, format: IOReader<T>): Data<T> = Data(type, meta) {
data?.readWith(format) ?: error("Can't convert envelope without data to Data")
}
/**
* Convert an [Envelope] to a data via given format. The actual parsing is done lazily.
*/
@OptIn(DFInternal::class)
public inline fun <reified T : Any> Envelope.toData(format: IOReader<T>): Data<T> = Data(typeOf<T>(), meta) {
data?.readWith(format) ?: error("Can't convert envelope without data to Data")
}
public inline fun <reified T : Any> Envelope.toData(format: IOReader<T>): Data<T> = toData(typeOf<T>(), format)
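A usage sketch (not part of the commit): combined with the new IOReader factory, the envelope body is wrapped lazily and only decoded when the Data is awaited. The UTF-8 reading and the import paths are assumptions borrowed from the test changes below; the import of the toData extension itself is omitted.

import space.kscience.dataforge.data.Data
import space.kscience.dataforge.data.await
import space.kscience.dataforge.io.Envelope
import space.kscience.dataforge.io.IOReader
import space.kscience.dataforge.io.readUtf8String

suspend fun readBody(envelope: Envelope): String {
    val data: Data<String> = envelope.toData(IOReader { readUtf8String() })
    return data.await() // the body is read and decoded only here
}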
public suspend fun <T : Any> Data<T>.toEnvelope(format: IOWriter<T>): Envelope {
val obj = await()

View File

@@ -2,67 +2,90 @@ package space.kscience.dataforge.workspace
import io.ktor.utils.io.streams.asOutput
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import space.kscience.dataforge.data.*
import space.kscience.dataforge.io.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.copy
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string
import space.kscience.dataforge.misc.DFExperimental
import java.nio.file.FileSystem
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardOpenOption
import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.NameToken
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.plus
import java.nio.file.*
import java.nio.file.attribute.BasicFileAttributes
import java.nio.file.spi.FileSystemProvider
import java.util.zip.ZipEntry
import java.util.zip.ZipOutputStream
import kotlin.io.path.extension
import kotlin.io.path.nameWithoutExtension
import kotlin.io.path.readAttributes
import kotlin.reflect.KType
import kotlin.reflect.typeOf
import kotlin.streams.toList
//public typealias FileFormatResolver<T> = (Path, Meta) -> IOFormat<T>
public typealias FileFormatResolver<T> = (path: Path, meta: Meta) -> IOReader<T>
private fun newZFS(path: Path): FileSystem {
val fsProvider = FileSystemProvider.installedProviders().find { it.scheme == "jar" }
?: error("Zip file system provider not found")
return fsProvider.newFileSystem(path, mapOf("create" to "true"))
public object FileData {
public val META_FILE_KEY: Name = "file".asName()
public val META_FILE_PATH_KEY: Name = META_FILE_KEY + "path"
public val META_FILE_EXTENSION_KEY: Name = META_FILE_KEY + "extension"
public val META_FILE_CREATE_TIME_KEY: Name = META_FILE_KEY + "created"
public val META_FILE_UPDATE_TIME_KEY: Name = META_FILE_KEY + "update"
}
/**
* Read data with supported envelope format and binary format. If envelope format is null, then read binary directly from file.
* The operation is blocking since it must read meta header. The reading of envelope body is lazy
*/
@DFInternal
@DFExperimental
public inline fun <reified T : Any> IOPlugin.readDataFile(
public fun <T : Any> IOPlugin.readDataFile(
type: KType,
path: Path,
formatResolver: FileFormatResolver<T>,
): Data<T> {
val envelope = readEnvelopeFile(path, true)
val format = formatResolver(path, envelope.meta)
return envelope.toData(format)
val updatedMeta = envelope.meta.copy {
FileData.META_FILE_PATH_KEY put path.toString()
FileData.META_FILE_EXTENSION_KEY put path.extension
val attributes = path.readAttributes<BasicFileAttributes>()
FileData.META_FILE_UPDATE_TIME_KEY put attributes.lastModifiedTime().toInstant().toString()
FileData.META_FILE_CREATE_TIME_KEY put attributes.creationTime().toInstant().toString()
}
return Data(type, updatedMeta) {
envelope.data?.readWith(format) ?: error("Can't convert envelope without content to Data")
}
}
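A usage sketch (not part of the commit) for the typed readDataFile above, called through the reified overload defined just below. Global.io and StringIOFormat are borrowed from FileDataTest at the end of this commit; the file name is hypothetical and the workspace-package import of readDataFile is omitted.

import java.nio.file.Path
import space.kscience.dataforge.context.Global
import space.kscience.dataforge.data.Data
import space.kscience.dataforge.io.io
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.misc.DFExperimental

@OptIn(DFExperimental::class)
fun readOne(): Data<String> {
    val data = Global.io.readDataFile<String>(Path.of("sample.df")) { _, _ -> StringIOFormat }
    // File attributes are now merged into the meta next to the envelope header:
    println(data.meta[FileData.META_FILE_PATH_KEY])
    println(data.meta[FileData.META_FILE_UPDATE_TIME_KEY])
    return data
}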
/**
* Add file/directory-based data tree item
* Read data with supported envelope format and binary format. If envelope format is null, then read binary directly from file.
* The operation is blocking since it must read meta header. The reading of envelope body is lazy
*/
context(IOPlugin) @DFExperimental
public fun DataSetBuilder<Any>.file(
@OptIn(DFInternal::class)
@DFExperimental
public inline fun <reified T : Any> IOPlugin.readDataFile(
path: Path,
formatResolver: FileFormatResolver<Any>,
) {
//If path is a single file or a special directory, read it as single datum
if (!Files.isDirectory(path) || Files.list(path).allMatch { it.fileName.toString().startsWith("@") }) {
val data = readDataFile(path, formatResolver)
val name = data.meta[Envelope.ENVELOPE_NAME_KEY].string
?: path.fileName.toString().replace(".df", "")
data(name, data)
} else {
//otherwise, read as directory
val data = readDataDirectory(path, formatResolver)
val name = data.meta[Envelope.ENVELOPE_NAME_KEY].string
?: path.fileName.toString().replace(".df", "")
node(name, data)
noinline formatResolver: FileFormatResolver<T>,
): Data<T> = readDataFile(typeOf<T>(), path, formatResolver)
context(IOPlugin) @DFExperimental
private fun <T : Any> DataSetBuilder<T>.directory(path: Path, formatResolver: FileFormatResolver<T>) {
Files.list(path).toList().forEach { childPath ->
val fileName = childPath.fileName.toString()
if (fileName.startsWith(IOPlugin.META_FILE_NAME)) {
meta(readMetaFile(childPath))
} else if (!fileName.startsWith("@")) {
file(childPath, formatResolver)
}
}
}
@@ -70,29 +93,94 @@ public fun DataSetBuilder<Any>.file(
* Read the directory as a data node. If [path] is a zip archive, read it as directory
*/
@DFExperimental
public fun IOPlugin.readDataDirectory(
@DFInternal
public fun <T : Any> IOPlugin.readDataDirectory(
type: KType,
path: Path,
formatResolver: FileFormatResolver<Any>,
): DataTree<Any> {
formatResolver: FileFormatResolver<T>,
): DataTree<T> {
//read zipped data node
if (path.fileName != null && path.fileName.toString().endsWith(".zip")) {
//Using explicit Zip file system to avoid bizarre compatibility bugs
val fs = newZFS(path)
return readDataDirectory(fs.rootDirectories.first(), formatResolver)
val fsProvider = FileSystemProvider.installedProviders().find { it.scheme == "jar" }
?: error("Zip file system provider not found")
val fs = fsProvider.newFileSystem(path, mapOf("create" to "true"))
return readDataDirectory(type, fs.rootDirectories.first(), formatResolver)
}
if (!Files.isDirectory(path)) error("Provided path $path is not a directory")
return DataTree<Any> {
Files.list(path).toList().forEach { path ->
val fileName = path.fileName.toString()
if (fileName.startsWith(IOPlugin.META_FILE_NAME)) {
meta(readMetaFile(path))
} else if (!fileName.startsWith("@")) {
file(path, formatResolver)
return DataTree(type) {
directory(path, formatResolver)
}
}
@OptIn(DFInternal::class)
@DFExperimental
public inline fun <reified T : Any> IOPlugin.readDataDirectory(
path: Path,
noinline formatResolver: FileFormatResolver<T>,
): DataTree<Any> = readDataDirectory(typeOf<T>(), path, formatResolver)
@OptIn(DFExperimental::class)
private fun Path.toName() = Name(map { NameToken.parse(it.nameWithoutExtension) })
@DFInternal
@DFExperimental
public fun <T : Any> IOPlugin.monitorDataDirectory(
type: KType,
path: Path,
formatResolver: FileFormatResolver<T>,
): DataSource<T> {
if (path.fileName.toString().endsWith(".zip")) error("Monitoring not supported for ZipFS")
if (!Files.isDirectory(path)) error("Provided path $path is not a directory")
return DataSource(type, context) {
directory(path, formatResolver)
launch(Dispatchers.IO) {
val watchService = path.fileSystem.newWatchService()
path.register(
watchService,
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY,
StandardWatchEventKinds.ENTRY_CREATE
)
do {
val key = watchService.take()
if (key != null) {
for (event: WatchEvent<*> in key.pollEvents()) {
val eventPath = event.context() as Path
if (event.kind() == StandardWatchEventKinds.ENTRY_DELETE) {
remove(eventPath.toName())
} else {
val fileName = eventPath.fileName.toString()
if (fileName.startsWith(IOPlugin.META_FILE_NAME)) {
meta(readMetaFile(eventPath))
} else if (!fileName.startsWith("@")) {
file(eventPath, formatResolver)
}
}
}
key.reset()
}
} while (isActive && key != null)
}
}
}
/**
* Start monitoring given directory ([path]) as a [DataSource].
*/
@OptIn(DFInternal::class)
@DFExperimental
public inline fun <reified T : Any> IOPlugin.monitorDataDirectory(
path: Path,
noinline formatResolver: FileFormatResolver<T>,
): DataSource<T> = monitorDataDirectory(typeOf<T>(), path, formatResolver)
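A usage sketch (not part of the commit): the reified overload above turns a directory into a live DataSource; the watch-service loop adds new or modified files to the tree and removes deleted ones. The directory name and StringIOFormat are assumptions for the example, and the workspace-package import of monitorDataDirectory is omitted.

import java.nio.file.Path
import space.kscience.dataforge.context.Global
import space.kscience.dataforge.data.DataSource
import space.kscience.dataforge.io.io
import space.kscience.dataforge.misc.DFExperimental

@OptIn(DFExperimental::class)
fun watchDirectory(): DataSource<String> =
    Global.io.monitorDataDirectory<String>(Path.of("data")) { _, _ -> StringIOFormat }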
/**
* Write data tree to existing directory or create a new one using default [java.nio.file.FileSystem] provider
*/
@@ -164,9 +252,8 @@ private suspend fun <T : Any> ZipOutputStream.writeNode(
}
}
@Suppress("BlockingMethodInNonBlockingContext")
@DFExperimental
public suspend fun <T : Any> IOPlugin.writeZip(
public suspend fun <T : Any> FileData.writeZip(
path: Path,
tree: DataTree<T>,
format: IOFormat<T>,
@@ -178,10 +265,12 @@ public suspend fun <T : Any> IOPlugin.writeZip(
} else {
path.resolveSibling(path.fileName.toString() + ".zip")
}
val fos = Files.newOutputStream(actualFile,
val fos = Files.newOutputStream(
actualFile,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING)
StandardOpenOption.TRUNCATE_EXISTING
)
val zos = ZipOutputStream(fos)
zos.use {
it.writeNode("", DataTreeItem.Node(tree), format, envelopeFormat)
@@ -189,3 +278,25 @@ public suspend fun <T : Any> IOPlugin.writeZip(
}
}
/**
* Add file/directory-based data tree item
*/
context(IOPlugin) @OptIn(DFInternal::class)
@DFExperimental
public fun <T : Any> DataSetBuilder<T>.file(
path: Path,
formatResolver: FileFormatResolver<out T>,
) {
//If path is a single file or a special directory, read it as single datum
if (!Files.isDirectory(path) || Files.list(path).allMatch { it.fileName.toString().startsWith("@") }) {
val data = readDataFile(dataType, path, formatResolver)
val name = data.meta[Envelope.ENVELOPE_NAME_KEY].string ?: path.nameWithoutExtension
data(name, data)
} else {
//otherwise, read as directory
val data = readDataDirectory(dataType, path, formatResolver)
val name = data.meta[Envelope.ENVELOPE_NAME_KEY].string ?: path.nameWithoutExtension
node(name, data)
}
}

View File

@@ -9,8 +9,11 @@ import space.kscience.dataforge.io.IOFormat
import space.kscience.dataforge.io.io
import space.kscience.dataforge.io.readUtf8String
import space.kscience.dataforge.io.writeUtf8String
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.misc.DFExperimental
import java.nio.file.Files
import kotlin.reflect.KType
import kotlin.reflect.typeOf
import kotlin.test.Test
import kotlin.test.assertEquals
@@ -30,15 +33,13 @@ class FileDataTest {
object StringIOFormat : IOFormat<String> {
override val type: KType get() = typeOf<String>()
override fun writeObject(output: Output, obj: String) {
output.writeUtf8String(obj)
}
override fun readObject(input: Input): String {
return input.readUtf8String()
}
override fun readObject(input: Input): String = input.readUtf8String()
}
@Test
@@ -50,7 +51,7 @@ class FileDataTest {
writeDataDirectory(dir, dataNode, StringIOFormat)
println(dir.toUri().toString())
val reconstructed = readDataDirectory(dir) { _, _ -> StringIOFormat }
assertEquals(dataNode["dir.a"]?.meta, reconstructed["dir.a"]?.meta)
assertEquals(dataNode["dir.a"]?.meta?.get("content"), reconstructed["dir.a"]?.meta?.get("content"))
assertEquals(dataNode["b"]?.await(), reconstructed["b"]?.await())
}
}
@@ -63,10 +64,10 @@ class FileDataTest {
Global.io.run {
val zip = Files.createTempFile("df_data_node", ".zip")
runBlocking {
writeZip(zip, dataNode, StringIOFormat)
FileData.writeZip(zip, dataNode, StringIOFormat)
println(zip.toUri().toString())
val reconstructed = readDataDirectory(zip) { _, _ -> StringIOFormat }
assertEquals(dataNode["dir.a"]?.meta, reconstructed["dir.a"]?.meta)
assertEquals(dataNode["dir.a"]?.meta?.get("content"), reconstructed["dir.a"]?.meta?.get("content"))
assertEquals(dataNode["b"]?.await(), reconstructed["b"]?.await())
}
}

View File

@@ -1,7 +1,8 @@
org.gradle.parallel=true
org.gradle.jvmargs=-Xmx4096m
kotlin.code.style=official
kotlin.mpp.stability.nowarn=true
#kotlin.incremental.js.ir=true
toolsVersion=0.11.5-kotlin-1.6.21