refactor file reading

This commit is contained in:
Alexander Nozik 2022-05-21 11:08:59 +03:00
parent 6bd8a7acbc
commit 4833128857
No known key found for this signature in database
GPG Key ID: F7FCF2DD25C71357
5 changed files with 111 additions and 97 deletions

View File

@ -15,7 +15,7 @@ import kotlin.reflect.typeOf
* A data element characterized by its meta
*/
@Type(Data.TYPE)
public interface Data<out T : Any> : Goal<T>, MetaRepr {
public interface Data<out T> : Goal<T>, MetaRepr {
/**
* Type marker for the data. The type is known before the calculation takes place so it could be checked.
*/

View File

@ -27,7 +27,7 @@ public interface IOReader<out T> {
public fun readObject(input: Input): T
}
public inline fun <reified T : Any> IOReader(crossinline read: Input.() -> T): IOReader<T> = object : IOReader<T> {
public inline fun <reified T> IOReader(crossinline read: Input.() -> T): IOReader<T> = object : IOReader<T> {
override val type: KType = typeOf<T>()
override fun readObject(input: Input): T = input.read()
@ -41,7 +41,7 @@ public fun interface IOWriter<in T> {
/**
* An interface for reading objects from and writing objects to IO streams
*/
public interface IOFormat<T : Any> : IOReader<T>, IOWriter<T>
public interface IOFormat<T> : IOReader<T>, IOWriter<T>
/**
 * Read a single object from this [Input] using the given [format].
 *
 * The type parameter is intentionally unbounded, matching [IOReader], so readers of nullable types
 * can be used with this helper as well.
 *
 * @param format the reader whose [IOReader.readObject] is invoked with this input.
 * @return the object produced by [format].
 */
public fun <T> Input.readObject(format: IOReader<T>): T = format.readObject(this)

View File

@ -1,6 +1,5 @@
package space.kscience.dataforge.workspace
import io.ktor.utils.io.streams.asOutput
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
@ -17,11 +16,13 @@ import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.NameToken
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.plus
import java.nio.file.*
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardWatchEventKinds
import java.nio.file.WatchEvent
import java.nio.file.attribute.BasicFileAttributes
import java.nio.file.spi.FileSystemProvider
import java.util.zip.ZipEntry
import java.util.zip.ZipOutputStream
import java.time.Instant
import kotlin.io.path.extension
import kotlin.io.path.nameWithoutExtension
import kotlin.io.path.readAttributes
@ -34,34 +35,20 @@ import kotlin.streams.toList
/**
 * A function that selects the [IOReader] used to decode the content of a data file.
 *
 * It receives the `path` of the file and a `meta` (file readers in this module pass the meta of the
 * envelope read from that file) and returns the reader producing values of type [T].
 */
public typealias FileFormatResolver<T> = (path: Path, meta: Meta) -> IOReader<T>
/**
 * Names of the meta nodes that describe the file a data item was read from.
 */
public object FileData {
// Root name under which all file-related meta keys below are composed.
public val META_FILE_KEY: Name = "file".asName()
// Key for the file path, stored as the string form of the path.
public val META_FILE_PATH_KEY: Name = META_FILE_KEY + "path"
// Key for the file extension.
public val META_FILE_EXTENSION_KEY: Name = META_FILE_KEY + "extension"
// Key for the file creation time, stored as the string form of an Instant.
public val META_FILE_CREATE_TIME_KEY: Name = META_FILE_KEY + "created"
// Key for the file last-modified time, stored as the string form of an Instant.
public val META_FILE_UPDATE_TIME_KEY: Name = META_FILE_KEY + "update"
}
public class FileData<T> internal constructor(private val data: Data<T>) : Data<T> by data {
public val path: String? get() = meta[META_FILE_PATH_KEY].string
public val extension: String? get() = meta[META_FILE_EXTENSION_KEY].string
@DFInternal
@DFExperimental
public fun <T : Any> IOPlugin.readDataFile(
type: KType,
path: Path,
formatResolver: FileFormatResolver<T>,
): Data<T> {
val envelope = readEnvelopeFile(path, true)
val format = formatResolver(path, envelope.meta)
val updatedMeta = envelope.meta.copy {
FileData.META_FILE_PATH_KEY put path.toString()
FileData.META_FILE_EXTENSION_KEY put path.extension
public val createdTime: Instant? get() = meta[META_FILE_CREATE_TIME_KEY].string?.let { Instant.parse(it) }
public val updatedTime: Instant? get() = meta[META_FILE_UPDATE_TIME_KEY].string?.let { Instant.parse(it) }
val attributes = path.readAttributes<BasicFileAttributes>()
FileData.META_FILE_UPDATE_TIME_KEY put attributes.lastModifiedTime().toInstant().toString()
FileData.META_FILE_CREATE_TIME_KEY put attributes.creationTime().toInstant().toString()
}
return Data(type, updatedMeta) {
envelope.data?.readWith(format) ?: error("Can't convert envelope without content to Data")
public companion object {
public val META_FILE_KEY: Name = "file".asName()
public val META_FILE_PATH_KEY: Name = META_FILE_KEY + "path"
public val META_FILE_EXTENSION_KEY: Name = META_FILE_KEY + "extension"
public val META_FILE_CREATE_TIME_KEY: Name = META_FILE_KEY + "created"
public val META_FILE_UPDATE_TIME_KEY: Name = META_FILE_KEY + "update"
}
}
@ -72,10 +59,25 @@ public fun <T : Any> IOPlugin.readDataFile(
*/
@OptIn(DFInternal::class)
@DFExperimental
public inline fun <reified T : Any> IOPlugin.readDataFile(
public fun <T : Any> IOPlugin.readDataFile(
path: Path,
noinline formatResolver: FileFormatResolver<T>,
): Data<T> = readDataFile(typeOf<T>(), path, formatResolver)
formatResolver: FileFormatResolver<T>,
): FileData<T> {
val envelope = readEnvelopeFile(path, true)
val format = formatResolver(path, envelope.meta)
val updatedMeta = envelope.meta.copy {
FileData.META_FILE_PATH_KEY put path.toString()
FileData.META_FILE_EXTENSION_KEY put path.extension
val attributes = path.readAttributes<BasicFileAttributes>()
FileData.META_FILE_UPDATE_TIME_KEY put attributes.lastModifiedTime().toInstant().toString()
FileData.META_FILE_CREATE_TIME_KEY put attributes.creationTime().toInstant().toString()
}
return FileData(Data(format.type, updatedMeta) {
envelope.data?.readWith(format) ?: error("Can't convert envelope without content to Data")
})
}
context(IOPlugin) @DFExperimental
private fun <T : Any> DataSetBuilder<T>.directory(path: Path, formatResolver: FileFormatResolver<T>) {
@ -122,7 +124,6 @@ public inline fun <reified T : Any> IOPlugin.readDataDirectory(
): DataTree<Any> = readDataDirectory(typeOf<T>(), path, formatResolver)
/**
 * Convert this path to a [Name]: every path element becomes one token, parsed from the
 * element's file name without its extension.
 */
@OptIn(DFExperimental::class)
private fun Path.toName() = Name(
    map { segment ->
        val bareName = segment.nameWithoutExtension
        NameToken.parse(bareName)
    }
)
@ -219,65 +220,6 @@ public suspend fun <T : Any> IOPlugin.writeDataDirectory(
}
}
/**
 * Recursively write [treeItem] into this zip stream.
 *
 * - A [DataTreeItem.Leaf] is converted to an envelope with [dataFormat] and written as a single
 *   zip entry called [name] using [envelopeFormat].
 * - A [DataTreeItem.Node] produces an empty entry called `"$name/"` and then one recursive call
 *   per child, each child entry being called `"$name/$token"`.
 *
 * The work is performed inside `withContext(Dispatchers.IO)`.
 *
 * @param name zip entry name for [treeItem].
 * @param treeItem the item (leaf or node) to write.
 * @param dataFormat format used to turn leaf data into an envelope.
 * @param envelopeFormat format used to serialize the envelope into the zip entry.
 */
@Suppress("BlockingMethodInNonBlockingContext")
private suspend fun <T : Any> ZipOutputStream.writeNode(
    name: String,
    treeItem: DataTreeItem<T>,
    dataFormat: IOFormat<T>,
    envelopeFormat: EnvelopeFormat = TaggedEnvelopeFormat,
) {
    withContext(Dispatchers.IO) {
        when (treeItem) {
            is DataTreeItem.Leaf -> {
                //TODO add directory-based envelope writer
                val envelope = treeItem.data.toEnvelope(dataFormat)
                val entry = ZipEntry(name)
                putNextEntry(entry)
                // The output wrapper is only flushed, never closed.
                // NOTE(review): presumably closing it would close the whole zip stream - confirm.
                asOutput().run {
                    envelopeFormat.writeEnvelope(this, envelope)
                    flush()
                }
                // Finish the entry explicitly, as the Node branch does, instead of relying on
                // the next putNextEntry() or the final close to do it implicitly.
                closeEntry()
            }

            is DataTreeItem.Node -> {
                val entry = ZipEntry("$name/")
                putNextEntry(entry)
                closeEntry()
                treeItem.tree.items.forEach { (token, item) ->
                    val childName = "$name/$token"
                    writeNode(childName, item, dataFormat, envelopeFormat)
                }
            }
        }
    }
}
/**
 * Write [tree] into a zip archive.
 *
 * If the string form of [path] does not end with `.zip`, the archive is written to a sibling
 * file with `.zip` appended to the file name. The target file is created if missing and
 * truncated if it already exists.
 *
 * @param path requested location of the archive.
 * @param tree the data tree to store; it becomes the root node of the archive.
 * @param format format used to serialize each data item.
 * @param envelopeFormat format used to serialize the envelopes inside the archive.
 */
@DFExperimental
public suspend fun <T : Any> FileData.writeZip(
    path: Path,
    tree: DataTree<T>,
    format: IOFormat<T>,
    envelopeFormat: EnvelopeFormat = TaggedEnvelopeFormat,
) {
    withContext(Dispatchers.IO) {
        val target = when {
            path.toString().endsWith(".zip") -> path
            else -> path.resolveSibling(path.fileName.toString() + ".zip")
        }
        val fileStream = Files.newOutputStream(
            target,
            StandardOpenOption.WRITE,
            StandardOpenOption.CREATE,
            StandardOpenOption.TRUNCATE_EXISTING
        )
        ZipOutputStream(fileStream).use { archive ->
            archive.writeNode("", DataTreeItem.Node(tree), format, envelopeFormat)
        }
    }
}
/**
* Add file/directory-based data tree item
*/
@ -289,7 +231,7 @@ public fun <T : Any> DataSetBuilder<T>.file(
) {
//If path is a single file or a special directory, read it as a single datum
if (!Files.isDirectory(path) || Files.list(path).allMatch { it.fileName.toString().startsWith("@") }) {
val data = readDataFile(dataType, path, formatResolver)
val data = readDataFile(path, formatResolver)
val name = data.meta[Envelope.ENVELOPE_NAME_KEY].string ?: path.nameWithoutExtension
data(name, data)
} else {

View File

@ -0,0 +1,72 @@
package space.kscience.dataforge.workspace
import io.ktor.utils.io.streams.asOutput
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.DataTreeItem
import space.kscience.dataforge.io.EnvelopeFormat
import space.kscience.dataforge.io.IOFormat
import space.kscience.dataforge.io.TaggedEnvelopeFormat
import space.kscience.dataforge.misc.DFExperimental
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardOpenOption
import java.util.zip.ZipEntry
import java.util.zip.ZipOutputStream
/**
 * Recursively write [treeItem] into this zip stream.
 *
 * - A [DataTreeItem.Leaf] is converted to an envelope with [dataFormat] and written as a single
 *   zip entry called [name] using [envelopeFormat].
 * - A [DataTreeItem.Node] produces an empty entry called `"$name/"` and then one recursive call
 *   per child, each child entry being called `"$name/$token"`.
 *
 * The work is performed inside `withContext(Dispatchers.IO)`.
 *
 * @param name zip entry name for [treeItem].
 * @param treeItem the item (leaf or node) to write.
 * @param dataFormat format used to turn leaf data into an envelope.
 * @param envelopeFormat format used to serialize the envelope into the zip entry.
 */
private suspend fun <T : Any> ZipOutputStream.writeNode(
    name: String,
    treeItem: DataTreeItem<T>,
    dataFormat: IOFormat<T>,
    envelopeFormat: EnvelopeFormat = TaggedEnvelopeFormat,
): Unit = withContext(Dispatchers.IO) {
    when (treeItem) {
        is DataTreeItem.Leaf -> {
            //TODO add directory-based envelope writer
            val envelope = treeItem.data.toEnvelope(dataFormat)
            val entry = ZipEntry(name)
            putNextEntry(entry)
            // The output wrapper is only flushed, never closed.
            // NOTE(review): presumably closing it would close the whole zip stream - confirm.
            asOutput().run {
                envelopeFormat.writeEnvelope(this, envelope)
                flush()
            }
            // Finish the entry explicitly, as the Node branch does, instead of relying on
            // the next putNextEntry() or the final close to do it implicitly.
            closeEntry()
        }

        is DataTreeItem.Node -> {
            val entry = ZipEntry("$name/")
            putNextEntry(entry)
            closeEntry()
            treeItem.tree.items.forEach { (token, item) ->
                val childName = "$name/$token"
                writeNode(childName, item, dataFormat, envelopeFormat)
            }
        }
    }
}
/**
 * Store this [DataTree] in a zip archive.
 *
 * If the string form of [path] does not end with `.zip`, the archive is written to a sibling
 * file with `.zip` appended to the file name. The target file is created if missing and
 * truncated if it already exists.
 *
 * @param path requested location of the archive.
 * @param format format used to serialize each data item.
 * @param envelopeFormat format used to serialize the envelopes inside the archive.
 */
@DFExperimental
public suspend fun <T : Any> DataTree<T>.writeZip(
    path: Path,
    format: IOFormat<T>,
    envelopeFormat: EnvelopeFormat = TaggedEnvelopeFormat,
): Unit = withContext(Dispatchers.IO) {
    val target = when {
        path.toString().endsWith(".zip") -> path
        else -> path.resolveSibling(path.fileName.toString() + ".zip")
    }
    val fileStream = Files.newOutputStream(
        target,
        StandardOpenOption.WRITE,
        StandardOpenOption.CREATE,
        StandardOpenOption.TRUNCATE_EXISTING
    )
    ZipOutputStream(fileStream).use { archive ->
        archive.writeNode("", DataTreeItem.Node(this@writeZip), format, envelopeFormat)
    }
}

View File

@ -64,7 +64,7 @@ class FileDataTest {
Global.io.run {
val zip = Files.createTempFile("df_data_node", ".zip")
runBlocking {
FileData.writeZip(zip, dataNode, StringIOFormat)
dataNode.writeZip(zip, StringIOFormat)
println(zip.toUri().toString())
val reconstructed = readDataDirectory(zip) { _, _ -> StringIOFormat }
assertEquals(dataNode["dir.a"]?.meta?.get("content"), reconstructed["dir.a"]?.meta?.get("content"))