Yet another change to storage2 API. Maybe final?
This commit is contained in:
parent
286a76960c
commit
0d1c027820
@ -3,10 +3,8 @@ package inr.numass.data
|
|||||||
import hep.dataforge.io.envelopes.Envelope
|
import hep.dataforge.io.envelopes.Envelope
|
||||||
import hep.dataforge.meta.Meta
|
import hep.dataforge.meta.Meta
|
||||||
import hep.dataforge.meta.MetaBuilder
|
import hep.dataforge.meta.MetaBuilder
|
||||||
import hep.dataforge.tables.Table
|
|
||||||
import inr.numass.data.api.*
|
import inr.numass.data.api.*
|
||||||
import inr.numass.data.storage.ProtoBlock
|
import inr.numass.data.storage.ProtoBlock
|
||||||
import kotlinx.coroutines.experimental.Deferred
|
|
||||||
import java.io.ByteArrayInputStream
|
import java.io.ByteArrayInputStream
|
||||||
import java.io.ByteArrayOutputStream
|
import java.io.ByteArrayOutputStream
|
||||||
import java.io.InputStream
|
import java.io.InputStream
|
||||||
@ -22,8 +20,7 @@ import kotlin.streams.toList
|
|||||||
object NumassDataUtils {
|
object NumassDataUtils {
|
||||||
fun join(setName: String, sets: Collection<NumassSet>): NumassSet {
|
fun join(setName: String, sets: Collection<NumassSet>): NumassSet {
|
||||||
return object : NumassSet {
|
return object : NumassSet {
|
||||||
override val hvData: Deferred<Table?>
|
override suspend fun getHvData() = TODO()
|
||||||
get() = TODO("Join hv tables")
|
|
||||||
|
|
||||||
override val points: List<NumassPoint> by lazy {
|
override val points: List<NumassPoint> by lazy {
|
||||||
val points = sets.stream().flatMap<NumassPoint> { it.points.stream() }
|
val points = sets.stream().flatMap<NumassPoint> { it.points.stream() }
|
||||||
|
@ -12,7 +12,6 @@ import hep.dataforge.providers.Provider
|
|||||||
import hep.dataforge.providers.Provides
|
import hep.dataforge.providers.Provides
|
||||||
import hep.dataforge.providers.ProvidesNames
|
import hep.dataforge.providers.ProvidesNames
|
||||||
import hep.dataforge.tables.Table
|
import hep.dataforge.tables.Table
|
||||||
import kotlinx.coroutines.experimental.Deferred
|
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import java.util.*
|
import java.util.*
|
||||||
|
|
||||||
@ -41,7 +40,7 @@ interface NumassSet : Named, Metoid, Iterable<NumassPoint>, Provider {
|
|||||||
val startTime: Instant
|
val startTime: Instant
|
||||||
get() = meta.optValue(NumassPoint.START_TIME_KEY).map<Instant> { it.time }.orElseGet { firstPoint.startTime }
|
get() = meta.optValue(NumassPoint.START_TIME_KEY).map<Instant> { it.time }.orElseGet { firstPoint.startTime }
|
||||||
|
|
||||||
val hvData: Deferred<Table?>
|
suspend fun getHvData(): Table?
|
||||||
|
|
||||||
override fun iterator(): Iterator<NumassPoint> {
|
override fun iterator(): Iterator<NumassPoint> {
|
||||||
return points.iterator()
|
return points.iterator()
|
||||||
|
@ -5,9 +5,6 @@ import hep.dataforge.meta.MetaBuilder
|
|||||||
import hep.dataforge.tables.Table
|
import hep.dataforge.tables.Table
|
||||||
import inr.numass.data.api.*
|
import inr.numass.data.api.*
|
||||||
import inr.numass.data.api.NumassPoint.Companion.HV_KEY
|
import inr.numass.data.api.NumassPoint.Companion.HV_KEY
|
||||||
import kotlinx.coroutines.experimental.CompletableDeferred
|
|
||||||
import kotlinx.coroutines.experimental.Deferred
|
|
||||||
import org.apache.commons.io.FilenameUtils
|
|
||||||
import java.io.IOException
|
import java.io.IOException
|
||||||
import java.nio.ByteBuffer
|
import java.nio.ByteBuffer
|
||||||
import java.nio.ByteOrder
|
import java.nio.ByteOrder
|
||||||
@ -26,7 +23,8 @@ import java.util.*
|
|||||||
*/
|
*/
|
||||||
class NumassDatFile @Throws(IOException::class)
|
class NumassDatFile @Throws(IOException::class)
|
||||||
constructor(override val name: String, private val path: Path, meta: Meta) : NumassSet {
|
constructor(override val name: String, private val path: Path, meta: Meta) : NumassSet {
|
||||||
override val hvData: Deferred<Table?> = CompletableDeferred(null)
|
|
||||||
|
override suspend fun getHvData(): Table? = null
|
||||||
|
|
||||||
override val meta: Meta
|
override val meta: Meta
|
||||||
|
|
||||||
@ -49,10 +47,6 @@ constructor(override val name: String, private val path: Path, meta: Meta) : Num
|
|||||||
throw RuntimeException(ex)
|
throw RuntimeException(ex)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Throws(IOException::class)
|
|
||||||
constructor(path: Path, meta: Meta) : this(FilenameUtils.getBaseName(path.fileName.toString()), path, meta) {
|
|
||||||
}
|
|
||||||
|
|
||||||
init {
|
init {
|
||||||
val head = readHead(path)//2048
|
val head = readHead(path)//2048
|
||||||
this.meta = MetaBuilder(meta)
|
this.meta = MetaBuilder(meta)
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
package inr.numass.data.legacy
|
package inr.numass.data.legacy
|
||||||
|
|
||||||
import hep.dataforge.io.envelopes.EnvelopeTag
|
import hep.dataforge.io.envelopes.EnvelopeTag
|
||||||
import hep.dataforge.storage.filestorage.FileEnvelope
|
import hep.dataforge.storage.files.FileEnvelope
|
||||||
import inr.numass.NumassEnvelopeType
|
import inr.numass.NumassEnvelopeType
|
||||||
import java.io.IOException
|
import java.io.IOException
|
||||||
import java.nio.ByteBuffer
|
import java.nio.ByteBuffer
|
||||||
|
@ -4,8 +4,7 @@ import hep.dataforge.context.Context
|
|||||||
import hep.dataforge.data.DataFactory
|
import hep.dataforge.data.DataFactory
|
||||||
import hep.dataforge.data.DataNodeEditor
|
import hep.dataforge.data.DataNodeEditor
|
||||||
import hep.dataforge.meta.Meta
|
import hep.dataforge.meta.Meta
|
||||||
import hep.dataforge.storage.commons.StorageManager
|
import hep.dataforge.storage.StorageManager
|
||||||
import hep.dataforge.storage.commons.StorageUtils
|
|
||||||
import inr.numass.data.api.NumassSet
|
import inr.numass.data.api.NumassSet
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -15,33 +15,26 @@
|
|||||||
*/
|
*/
|
||||||
package inr.numass.data.storage
|
package inr.numass.data.storage
|
||||||
|
|
||||||
|
import hep.dataforge.connections.ConnectionHelper
|
||||||
import hep.dataforge.context.Context
|
import hep.dataforge.context.Context
|
||||||
import hep.dataforge.exceptions.StorageException
|
|
||||||
import hep.dataforge.io.ColumnedDataReader
|
import hep.dataforge.io.ColumnedDataReader
|
||||||
import hep.dataforge.io.envelopes.Envelope
|
import hep.dataforge.io.envelopes.Envelope
|
||||||
|
import hep.dataforge.io.envelopes.EnvelopeReader
|
||||||
import hep.dataforge.meta.Meta
|
import hep.dataforge.meta.Meta
|
||||||
import hep.dataforge.meta.MetaBuilder
|
|
||||||
import hep.dataforge.providers.Provider
|
import hep.dataforge.providers.Provider
|
||||||
import hep.dataforge.storage.api.ObjectLoader
|
import hep.dataforge.storage.Loader
|
||||||
import hep.dataforge.storage.api.Storage
|
import hep.dataforge.storage.StorageElement
|
||||||
import hep.dataforge.storage.commons.DummyStorage
|
import hep.dataforge.storage.files.FileStorage
|
||||||
import hep.dataforge.storage.filestorage.FileStorage
|
import hep.dataforge.storage.files.FileStorageElement
|
||||||
import hep.dataforge.storage.loaders.AbstractLoader
|
|
||||||
import hep.dataforge.tables.Table
|
import hep.dataforge.tables.Table
|
||||||
import inr.numass.data.api.NumassPoint
|
import inr.numass.data.api.NumassPoint
|
||||||
import inr.numass.data.api.NumassSet
|
import inr.numass.data.api.NumassSet
|
||||||
import inr.numass.data.legacy.NumassFileEnvelope
|
|
||||||
import kotlinx.coroutines.experimental.CoroutineStart
|
|
||||||
import kotlinx.coroutines.experimental.Deferred
|
|
||||||
import kotlinx.coroutines.experimental.async
|
|
||||||
import org.slf4j.LoggerFactory
|
import org.slf4j.LoggerFactory
|
||||||
import java.io.IOException
|
import java.io.IOException
|
||||||
import java.nio.file.Files
|
import java.nio.file.Files
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import java.util.*
|
import kotlin.reflect.KClass
|
||||||
import java.util.function.Supplier
|
|
||||||
import java.util.stream.Stream
|
|
||||||
import kotlin.streams.toList
|
import kotlin.streams.toList
|
||||||
|
|
||||||
|
|
||||||
@ -51,41 +44,28 @@ import kotlin.streams.toList
|
|||||||
* @author darksnake
|
* @author darksnake
|
||||||
*/
|
*/
|
||||||
class NumassDataLoader(
|
class NumassDataLoader(
|
||||||
storage: Storage,
|
override val context: Context,
|
||||||
name: String,
|
override val parent: StorageElement?,
|
||||||
meta: Meta,
|
override val name: String,
|
||||||
private val items: Map<String, Supplier<out Envelope>>,
|
override val path: Path
|
||||||
override var isReadOnly: Boolean = true
|
) : Loader<NumassPoint>, NumassSet, Provider, FileStorageElement {
|
||||||
) : AbstractLoader(storage, name, meta), ObjectLoader<Envelope>, NumassSet, Provider {
|
|
||||||
|
|
||||||
override val meta: Meta = items[META_FRAGMENT_NAME]?.get()?.meta ?: Meta.empty()
|
override val type: KClass<NumassPoint> = NumassPoint::class
|
||||||
|
|
||||||
private val hvEnvelope: Envelope?
|
private val _connectionHelper = ConnectionHelper(this)
|
||||||
get() = items[HV_FRAGMENT_NAME]?.get()
|
|
||||||
|
|
||||||
private val pointEnvelopes: Stream<Envelope>
|
override fun getConnectionHelper(): ConnectionHelper =_connectionHelper
|
||||||
get() = items.entries.stream()
|
|
||||||
.filter { entry -> entry.key.startsWith(POINT_FRAGMENT_NAME) }
|
|
||||||
.map { entry -> entry.value.get() }
|
|
||||||
.sorted(Comparator.comparing<Envelope, Int> { t -> t.meta.getInt("external_meta.point_index", -1) })
|
|
||||||
|
|
||||||
val isReversed: Boolean
|
|
||||||
get() = this.meta.getBoolean("iteration_info.reverse", false)
|
|
||||||
|
|
||||||
override val isEmpty: Boolean
|
override val meta: Meta by lazy {
|
||||||
get() = items.isEmpty()
|
FileStorage.resolveMeta(path) ?: Meta.empty()
|
||||||
|
|
||||||
override val description: String = this.meta.getString("description", "").replace("\\n", "\n")
|
|
||||||
|
|
||||||
override fun fragmentNames(): Collection<String> {
|
|
||||||
return items.keys
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override val hvData: Deferred<Table?>
|
override suspend fun getHvData(): Table? {
|
||||||
get() = async(start = CoroutineStart.LAZY) {
|
val hvEnvelope = path.resolve(HV_FRAGMENT_NAME)?.let { EnvelopeReader.readFile(it) }
|
||||||
hvEnvelope?.let { hvEnvelope ->
|
return hvEnvelope?.let {
|
||||||
try {
|
try {
|
||||||
ColumnedDataReader(hvEnvelope.data.stream, "timestamp", "block", "value").toTable()
|
ColumnedDataReader(it.data.stream, "timestamp", "block", "value").toTable()
|
||||||
} catch (ex: IOException) {
|
} catch (ex: IOException) {
|
||||||
LoggerFactory.getLogger(javaClass).error("Failed to load HV data from file", ex)
|
LoggerFactory.getLogger(javaClass).error("Failed to load HV data from file", ex)
|
||||||
null
|
null
|
||||||
@ -94,30 +74,30 @@ class NumassDataLoader(
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private val pointEnvelopes: List<Envelope>
|
||||||
|
get() = Files.list(path)
|
||||||
|
.filter { it.fileName.toString().startsWith(POINT_FRAGMENT_NAME) }
|
||||||
|
.map { EnvelopeReader.readFile(it) }.toList()
|
||||||
|
|
||||||
|
val isReversed: Boolean
|
||||||
|
get() = this.meta.getBoolean("iteration_info.reverse", false)
|
||||||
|
|
||||||
|
val description: String
|
||||||
|
get() = this.meta.getString("description", "").replace("\\n", "\n")
|
||||||
|
|
||||||
|
|
||||||
override val points: List<NumassPoint>
|
override val points: List<NumassPoint>
|
||||||
get() {
|
get() = pointEnvelopes.map {
|
||||||
return pointEnvelopes.map {
|
|
||||||
NumassPoint.read(it)
|
NumassPoint.read(it)
|
||||||
}.toList()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun pull(fragmentName: String): Envelope {
|
|
||||||
//PENDING read data to memory?
|
|
||||||
return items[fragmentName]?.get()
|
|
||||||
?: throw StorageException("The fragment with name $fragmentName is not found in the loader $name")
|
|
||||||
}
|
|
||||||
|
|
||||||
@Throws(StorageException::class)
|
|
||||||
override fun push(fragmentName: String, data: Envelope) {
|
|
||||||
tryPush()
|
|
||||||
TODO()
|
|
||||||
}
|
|
||||||
|
|
||||||
override val startTime: Instant
|
override val startTime: Instant
|
||||||
get() = meta.optValue("start_time").map<Instant> { it.time }.orElseGet { super.startTime }
|
get() = meta.optValue("start_time").map<Instant> { it.time }.orElseGet { super.startTime }
|
||||||
|
|
||||||
override val isOpen: Boolean
|
override suspend fun open() {
|
||||||
get() = true
|
|
||||||
|
}
|
||||||
|
|
||||||
override fun close() {
|
override fun close() {
|
||||||
//do nothing
|
//do nothing
|
||||||
@ -125,85 +105,86 @@ class NumassDataLoader(
|
|||||||
|
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
|
//
|
||||||
@Throws(IOException::class)
|
// @Throws(IOException::class)
|
||||||
fun fromFile(storage: Storage, zipFile: Path): NumassDataLoader {
|
// fun fromFile(storage: Storage, zipFile: Path): NumassDataLoader {
|
||||||
throw UnsupportedOperationException("TODO")
|
// throw UnsupportedOperationException("TODO")
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
|
//
|
||||||
/**
|
// /**
|
||||||
* Construct numass loader from directory
|
// * Construct numass loader from directory
|
||||||
*
|
// *
|
||||||
* @param storage
|
// * @param storage
|
||||||
* @param directory
|
// * @param directory
|
||||||
* @return
|
// * @return
|
||||||
* @throws IOException
|
// * @throws IOException
|
||||||
*/
|
// */
|
||||||
@Throws(IOException::class)
|
// @Throws(IOException::class)
|
||||||
fun fromDir(storage: Storage, directory: Path, name: String = FileStorage.entryName(directory)): NumassDataLoader {
|
// fun fromDir(storage: Storage, directory: Path, name: String = FileStorage.entryName(directory)): NumassDataLoader {
|
||||||
if (!Files.isDirectory(directory)) {
|
// if (!Files.isDirectory(directory)) {
|
||||||
throw IllegalArgumentException("Numass data directory required")
|
// throw IllegalArgumentException("Numass data directory required")
|
||||||
}
|
// }
|
||||||
val annotation = MetaBuilder("loader")
|
// val annotation = MetaBuilder("loader")
|
||||||
.putValue("type", "numass")
|
// .putValue("type", "numass")
|
||||||
.putValue("numass.loaderFormat", "dir")
|
// .putValue("numass.loaderFormat", "dir")
|
||||||
// .setValue("file.timeCreated", Instant.ofEpochMilli(directory.getContent().getLastModifiedTime()))
|
// // .setValue("file.timeCreated", Instant.ofEpochMilli(directory.getContent().getLastModifiedTime()))
|
||||||
.build()
|
// .build()
|
||||||
|
//
|
||||||
//FIXME envelopes are lazy do we need to do additional lazy evaluations here?
|
// //FIXME envelopes are lazy do we need to do additional lazy evaluations here?
|
||||||
val items = LinkedHashMap<String, Supplier<out Envelope>>()
|
// val items = LinkedHashMap<String, Supplier<out Envelope>>()
|
||||||
|
//
|
||||||
Files.list(directory).filter { file ->
|
// Files.list(directory).filter { file ->
|
||||||
val fileName = file.fileName.toString()
|
// val fileName = file.fileName.toString()
|
||||||
(fileName == META_FRAGMENT_NAME
|
// (fileName == META_FRAGMENT_NAME
|
||||||
|| fileName == HV_FRAGMENT_NAME
|
// || fileName == HV_FRAGMENT_NAME
|
||||||
|| fileName.startsWith(POINT_FRAGMENT_NAME))
|
// || fileName.startsWith(POINT_FRAGMENT_NAME))
|
||||||
}.forEach { file ->
|
// }.forEach { file ->
|
||||||
try {
|
// try {
|
||||||
items[FileStorage.entryName(file)] = Supplier { NumassFileEnvelope.open(file, true) }
|
// items[FileStorage.entryName(file)] = Supplier { NumassFileEnvelope.open(file, true) }
|
||||||
} catch (ex: Exception) {
|
// } catch (ex: Exception) {
|
||||||
LoggerFactory.getLogger(NumassDataLoader::class.java)
|
// LoggerFactory.getLogger(NumassDataLoader::class.java)
|
||||||
.error("Can't load numass data directory " + FileStorage.entryName(directory), ex)
|
// .error("Can't load numass data directory " + FileStorage.entryName(directory), ex)
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
return NumassDataLoader(storage, name, annotation, items)
|
// return NumassDataLoader(storage, name, annotation, items)
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
fun fromDir(context: Context, directory: Path, name: String = FileStorage.entryName(directory)): NumassDataLoader {
|
// fun fromDir(context: Context, directory: Path, name: String = FileStorage.entryName(directory)): NumassDataLoader {
|
||||||
return fromDir(DummyStorage(context), directory, name)
|
// return fromDir(DummyStorage(context), directory, name)
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
/**
|
// /**
|
||||||
* "start_time": "2016-04-20T04:08:50",
|
// * "start_time": "2016-04-20T04:08:50",
|
||||||
*
|
// *
|
||||||
* @param meta
|
// * @param meta
|
||||||
* @return
|
// * @return
|
||||||
*/
|
// */
|
||||||
private fun readTime(meta: Meta): Instant {
|
// private fun readTime(meta: Meta): Instant {
|
||||||
return if (meta.hasValue("start_time")) {
|
// return if (meta.hasValue("start_time")) {
|
||||||
meta.getValue("start_time").time
|
// meta.getValue("start_time").time
|
||||||
} else {
|
// } else {
|
||||||
Instant.EPOCH
|
// Instant.EPOCH
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The name of informational meta file in numass data directory
|
* The name of informational meta file in numass data directory
|
||||||
*/
|
*/
|
||||||
val META_FRAGMENT_NAME = "meta"
|
const val META_FRAGMENT_NAME = "meta"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The beginning of point fragment name
|
* The beginning of point fragment name
|
||||||
*/
|
*/
|
||||||
val POINT_FRAGMENT_NAME = "p"
|
const val POINT_FRAGMENT_NAME = "p"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The beginning of hv fragment name
|
* The beginning of hv fragment name
|
||||||
*/
|
*/
|
||||||
val HV_FRAGMENT_NAME = "voltage"
|
const val HV_FRAGMENT_NAME = "voltage"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -15,155 +15,51 @@
|
|||||||
*/
|
*/
|
||||||
package inr.numass.data.storage
|
package inr.numass.data.storage
|
||||||
|
|
||||||
import hep.dataforge.context.Context
|
|
||||||
import hep.dataforge.events.Event
|
import hep.dataforge.events.Event
|
||||||
import hep.dataforge.events.EventBuilder
|
import hep.dataforge.events.EventBuilder
|
||||||
import hep.dataforge.exceptions.StorageException
|
import hep.dataforge.io.envelopes.TaglessEnvelopeType
|
||||||
import hep.dataforge.meta.Meta
|
import hep.dataforge.meta.Meta
|
||||||
import hep.dataforge.storage.filestorage.FileStorage
|
import hep.dataforge.nullable
|
||||||
import inr.numass.data.api.NumassSet
|
import hep.dataforge.storage.files.FileStorage
|
||||||
import inr.numass.data.legacy.NumassDatFile
|
import hep.dataforge.storage.files.FileStorageElement
|
||||||
import org.slf4j.LoggerFactory
|
import hep.dataforge.storage.files.FileStorageElementType
|
||||||
import java.io.IOException
|
|
||||||
import java.nio.ByteBuffer
|
|
||||||
import java.nio.file.Files
|
import java.nio.file.Files
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
import java.nio.file.StandardOpenOption.CREATE
|
import java.nio.file.StandardOpenOption
|
||||||
import java.nio.file.StandardOpenOption.WRITE
|
|
||||||
import java.util.*
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The file storage containing numass data directories or zips.
|
* Numass storage directory. Works as a normal directory, but creates a numass loader from each directory with meta
|
||||||
*
|
|
||||||
*
|
|
||||||
* Any subdirectory is treated as numass data directory. Any zip must have
|
|
||||||
* `NUMASS_ZIP_EXTENSION` extension to be recognized. Any other files are
|
|
||||||
* ignored.
|
|
||||||
*
|
|
||||||
*
|
|
||||||
* @author Alexander Nozik
|
|
||||||
*/
|
*/
|
||||||
class NumassStorage : FileStorage {
|
class NumassDirectory : FileStorageElementType {
|
||||||
|
override val name: String = "inr.numass.storage.directory"
|
||||||
|
|
||||||
val description: String
|
|
||||||
get() = meta.getString("description", "")
|
|
||||||
|
|
||||||
private constructor(parent: FileStorage, config: Meta, shelf: String) : super(parent, config, shelf)
|
|
||||||
|
|
||||||
constructor(context: Context, config: Meta, path: Path) : super(context, config, path)
|
//TODO create mutable loader
|
||||||
|
override suspend fun create(parent: FileStorage, meta: Meta): FileStorageElement {
|
||||||
init {
|
val fileName = meta.getString("name")
|
||||||
refresh()
|
val path: Path = parent.path.resolve(fileName)
|
||||||
|
Files.createDirectory(path)
|
||||||
|
//writing meta to directory
|
||||||
|
val metaFile = path.resolve("meta.df")
|
||||||
|
Files.newOutputStream(metaFile, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE).use {
|
||||||
|
TaglessEnvelopeType.INSTANCE.writer.write(it, FileStorage.createMetaEnvelope(meta))
|
||||||
|
}
|
||||||
|
return FileStorage(parent.context, meta, path, parent, this)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun refresh() {
|
override suspend fun read(parent: FileStorage, path: Path): FileStorageElement? {
|
||||||
try {
|
val meta = FileStorage.resolveMeta(path)
|
||||||
this.shelves.clear()
|
val type = meta?.optString("type").nullable?.let { type -> parent.types.find { it.name == type } }
|
||||||
this.loaders.clear()
|
return when {
|
||||||
Files.list(dataDir).forEach { file ->
|
type == this || Files.isDirectory(path) && meta != null -> NumassDataLoader(parent.context, parent, path.fileName.toString(), path)
|
||||||
try {
|
Files.isDirectory(path) -> FileStorage(parent.context, meta ?: Meta.empty(), path, parent, this)
|
||||||
if (Files.isDirectory(file)) {
|
else -> type?.read(parent, path)
|
||||||
val metaFile = file.resolve(NumassDataLoader.META_FRAGMENT_NAME)
|
|
||||||
if (Files.exists(metaFile)) {
|
|
||||||
this.loaders[entryName(file)] = NumassDataLoader.fromDir(this, file)
|
|
||||||
} else {
|
|
||||||
this.shelves[entryName(file)] = NumassStorage(this, meta, entryName(file))
|
|
||||||
}
|
|
||||||
} else if (file.fileName.endsWith(NUMASS_ZIP_EXTENSION)) {
|
|
||||||
this.loaders[entryName(file)] = NumassDataLoader.fromFile(this, file)
|
|
||||||
} else {
|
|
||||||
//updating non-numass loader files
|
|
||||||
updateFile(file)
|
|
||||||
}
|
|
||||||
} catch (ex: IOException) {
|
|
||||||
LoggerFactory.getLogger(javaClass).error("Error while creating numass loader", ex)
|
|
||||||
} catch (ex: StorageException) {
|
|
||||||
LoggerFactory.getLogger(javaClass).error("Error while creating numass group", ex)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (ex: IOException) {
|
}
|
||||||
throw RuntimeException(ex)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
class NumassDataPointEvent(meta: Meta) : Event(meta) {
|
||||||
|
|
||||||
@Throws(StorageException::class)
|
|
||||||
fun pushNumassData(path: String?, fileName: String, data: ByteBuffer) {
|
|
||||||
if (path == null || path.isEmpty()) {
|
|
||||||
pushNumassData(fileName, data)
|
|
||||||
} else {
|
|
||||||
val st = buildShelf(path) as NumassStorage
|
|
||||||
st.pushNumassData(fileName, data)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Read nm.zip content and write it as a new nm.zip file
|
|
||||||
*
|
|
||||||
* @param fileName
|
|
||||||
*/
|
|
||||||
@Throws(StorageException::class)
|
|
||||||
fun pushNumassData(fileName: String, data: ByteBuffer) {
|
|
||||||
//FIXME move zip to internal
|
|
||||||
try {
|
|
||||||
val nmFile = dataDir.resolve(fileName + NUMASS_ZIP_EXTENSION)
|
|
||||||
if (Files.exists(nmFile)) {
|
|
||||||
LoggerFactory.getLogger(javaClass).warn("Trying to rewrite existing numass data file {}", nmFile.toString())
|
|
||||||
}
|
|
||||||
Files.newByteChannel(nmFile, CREATE, WRITE).use { channel -> channel.write(data) }
|
|
||||||
|
|
||||||
dispatchEvent(NumassDataPointEvent.build(name, fileName, Files.size(nmFile).toInt()))
|
|
||||||
} catch (ex: IOException) {
|
|
||||||
throw StorageException(ex)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Throws(StorageException::class)
|
|
||||||
override fun createShelf(shelfConfiguration: Meta, shelfName: String): NumassStorage {
|
|
||||||
return NumassStorage(this, shelfConfiguration, shelfName)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A list of legacy DAT files in the directory
|
|
||||||
*
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
fun legacyFiles(): List<NumassSet> {
|
|
||||||
try {
|
|
||||||
val files = ArrayList<NumassSet>()
|
|
||||||
Files.list(dataDir).forEach { file ->
|
|
||||||
if (Files.isRegularFile(file) && file.fileName.toString().toLowerCase().endsWith(".dat")) {
|
|
||||||
//val name = file.fileName.toString()
|
|
||||||
try {
|
|
||||||
files.add(NumassDatFile(file, Meta.empty()))
|
|
||||||
} catch (ex: Exception) {
|
|
||||||
LoggerFactory.getLogger(javaClass).error("Error while reading legacy numass file " + file.fileName, ex)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return files
|
|
||||||
} catch (ex: IOException) {
|
|
||||||
throw RuntimeException(ex)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Throws(Exception::class)
|
|
||||||
override fun close() {
|
|
||||||
super.close()
|
|
||||||
//close remote file system after use
|
|
||||||
try {
|
|
||||||
dataDir.fileSystem.close()
|
|
||||||
} catch (ex: UnsupportedOperationException) {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
class NumassDataPointEvent(meta: Meta) : Event(meta) {
|
|
||||||
|
|
||||||
val fileSize: Int = meta.getInt(FILE_SIZE_KEY, 0)
|
val fileSize: Int = meta.getInt(FILE_SIZE_KEY, 0)
|
||||||
|
|
||||||
@ -191,12 +87,145 @@ class NumassStorage : FileStorage {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
companion object {
|
|
||||||
|
|
||||||
const val NUMASS_ZIP_EXTENSION = ".nm.zip"
|
|
||||||
const val NUMASS_DATA_LOADER_TYPE = "numassData"
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
///**
|
||||||
|
// * The file storage containing numass data directories or zips.
|
||||||
|
// *
|
||||||
|
// *
|
||||||
|
// * Any subdirectory is treated as numass data directory. Any zip must have
|
||||||
|
// * `NUMASS_ZIP_EXTENSION` extension to be recognized. Any other files are
|
||||||
|
// * ignored.
|
||||||
|
// *
|
||||||
|
// *
|
||||||
|
// * @author Alexander Nozik
|
||||||
|
// */
|
||||||
|
//class NumassStorage : FileStorage {
|
||||||
|
//
|
||||||
|
// val description: String
|
||||||
|
// get() = meta.getString("description", "")
|
||||||
|
//
|
||||||
|
// private constructor(parent: FileStorage, config: Meta, shelf: String) : super(parent, config, shelf)
|
||||||
|
//
|
||||||
|
// constructor(context: Context, config: Meta, path: Path) : super(context, config, path)
|
||||||
|
//
|
||||||
|
// init {
|
||||||
|
// refresh()
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// override fun refresh() {
|
||||||
|
// try {
|
||||||
|
// this.shelves.clear()
|
||||||
|
// this.loaders.clear()
|
||||||
|
// Files.list(dataDir).forEach { file ->
|
||||||
|
// try {
|
||||||
|
// if (Files.isDirectory(file)) {
|
||||||
|
// val metaFile = file.resolve(NumassDataLoader.META_FRAGMENT_NAME)
|
||||||
|
// if (Files.exists(metaFile)) {
|
||||||
|
// this.loaders[entryName(file)] = NumassDataLoader.fromDir(this, file)
|
||||||
|
// } else {
|
||||||
|
// this.shelves[entryName(file)] = NumassStorage(this, meta, entryName(file))
|
||||||
|
// }
|
||||||
|
// } else if (file.fileName.endsWith(NUMASS_ZIP_EXTENSION)) {
|
||||||
|
// this.loaders[entryName(file)] = NumassDataLoader.fromFile(this, file)
|
||||||
|
// } else {
|
||||||
|
// //updating non-numass loader files
|
||||||
|
// updateFile(file)
|
||||||
|
// }
|
||||||
|
// } catch (ex: IOException) {
|
||||||
|
// LoggerFactory.getLogger(javaClass).error("Error while creating numass loader", ex)
|
||||||
|
// } catch (ex: StorageException) {
|
||||||
|
// LoggerFactory.getLogger(javaClass).error("Error while creating numass group", ex)
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// } catch (ex: IOException) {
|
||||||
|
// throw RuntimeException(ex)
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// @Throws(StorageException::class)
|
||||||
|
// fun pushNumassData(path: String?, fileName: String, data: ByteBuffer) {
|
||||||
|
// if (path == null || path.isEmpty()) {
|
||||||
|
// pushNumassData(fileName, data)
|
||||||
|
// } else {
|
||||||
|
// val st = buildShelf(path) as NumassStorage
|
||||||
|
// st.pushNumassData(fileName, data)
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// /**
|
||||||
|
// * Read nm.zip content and write it as a new nm.zip file
|
||||||
|
// *
|
||||||
|
// * @param fileName
|
||||||
|
// */
|
||||||
|
// @Throws(StorageException::class)
|
||||||
|
// fun pushNumassData(fileName: String, data: ByteBuffer) {
|
||||||
|
// //FIXME move zip to internal
|
||||||
|
// try {
|
||||||
|
// val nmFile = dataDir.resolve(fileName + NUMASS_ZIP_EXTENSION)
|
||||||
|
// if (Files.exists(nmFile)) {
|
||||||
|
// LoggerFactory.getLogger(javaClass).warn("Trying to rewrite existing numass data file {}", nmFile.toString())
|
||||||
|
// }
|
||||||
|
// Files.newByteChannel(nmFile, CREATE, WRITE).use { channel -> channel.write(data) }
|
||||||
|
//
|
||||||
|
// dispatchEvent(NumassDataPointEvent.build(name, fileName, Files.size(nmFile).toInt()))
|
||||||
|
// } catch (ex: IOException) {
|
||||||
|
// throw StorageException(ex)
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// @Throws(StorageException::class)
|
||||||
|
// override fun createShelf(shelfConfiguration: Meta, shelfName: String): NumassStorage {
|
||||||
|
// return NumassStorage(this, shelfConfiguration, shelfName)
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// /**
|
||||||
|
// * A list of legacy DAT files in the directory
|
||||||
|
// *
|
||||||
|
// * @return
|
||||||
|
// */
|
||||||
|
// fun legacyFiles(): List<NumassSet> {
|
||||||
|
// try {
|
||||||
|
// val files = ArrayList<NumassSet>()
|
||||||
|
// Files.list(dataDir).forEach { file ->
|
||||||
|
// if (Files.isRegularFile(file) && file.fileName.toString().toLowerCase().endsWith(".dat")) {
|
||||||
|
// //val name = file.fileName.toString()
|
||||||
|
// try {
|
||||||
|
// files.add(NumassDatFile(file, Meta.empty()))
|
||||||
|
// } catch (ex: Exception) {
|
||||||
|
// LoggerFactory.getLogger(javaClass).error("Error while reading legacy numass file " + file.fileName, ex)
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// return files
|
||||||
|
// } catch (ex: IOException) {
|
||||||
|
// throw RuntimeException(ex)
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// @Throws(Exception::class)
|
||||||
|
// override fun close() {
|
||||||
|
// super.close()
|
||||||
|
// //close remote file system after use
|
||||||
|
// try {
|
||||||
|
// dataDir.fileSystem.close()
|
||||||
|
// } catch (ex: UnsupportedOperationException) {
|
||||||
|
//
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
|
||||||
|
//
|
||||||
|
// companion object {
|
||||||
|
//
|
||||||
|
// const val NUMASS_ZIP_EXTENSION = ".nm.zip"
|
||||||
|
// const val NUMASS_DATA_LOADER_TYPE = "numassData"
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
//}
|
||||||
|
@ -156,7 +156,7 @@ object NumassUtils {
|
|||||||
builder.putData(pointName, point, pointMeta)
|
builder.putData(pointName, point, pointMeta)
|
||||||
}
|
}
|
||||||
runBlocking {
|
runBlocking {
|
||||||
set.hvData.await()?.let { hv -> builder.putData("hv", hv, Meta.empty()) }
|
set.getHvData().await()?.let { hv -> builder.putData("hv", hv, Meta.empty()) }
|
||||||
}
|
}
|
||||||
return builder.build()
|
return builder.build()
|
||||||
}
|
}
|
||||||
|
@ -4,7 +4,6 @@ import hep.dataforge.configure
|
|||||||
import hep.dataforge.fx.dfIcon
|
import hep.dataforge.fx.dfIcon
|
||||||
import hep.dataforge.fx.plots.PlotContainer
|
import hep.dataforge.fx.plots.PlotContainer
|
||||||
import hep.dataforge.fx.runGoal
|
import hep.dataforge.fx.runGoal
|
||||||
import hep.dataforge.fx.ui
|
|
||||||
import hep.dataforge.plots.PlotFrame
|
import hep.dataforge.plots.PlotFrame
|
||||||
import hep.dataforge.plots.data.DataPlot
|
import hep.dataforge.plots.data.DataPlot
|
||||||
import hep.dataforge.plots.data.TimePlot
|
import hep.dataforge.plots.data.TimePlot
|
||||||
@ -54,7 +53,7 @@ class HVView : View(title = "High voltage time plot", icon = ImageView(dfIcon))
|
|||||||
if (change.wasAdded()) {
|
if (change.wasAdded()) {
|
||||||
runLater { container.progress = -1.0 }
|
runLater { container.progress = -1.0 }
|
||||||
runGoal("hvData[${change.key}]") {
|
runGoal("hvData[${change.key}]") {
|
||||||
change.valueAdded.hvData.await()
|
change.valueAdded.getHvData().await()
|
||||||
} ui { hvData ->
|
} ui { hvData ->
|
||||||
hvData?.let {
|
hvData?.let {
|
||||||
for (dp in it) {
|
for (dp in it) {
|
||||||
|
Loading…
Reference in New Issue
Block a user