Fixed write performance
This commit is contained in:
parent
11552b6a6f
commit
6e4606066e
@ -24,19 +24,18 @@ import hep.dataforge.meta.MetaBuilder;
|
||||
import hep.dataforge.storage.api.ObjectLoader;
|
||||
import hep.dataforge.storage.api.Storage;
|
||||
import hep.dataforge.storage.filestorage.FileEnvelope;
|
||||
import hep.dataforge.storage.filestorage.FileStorage;
|
||||
import hep.dataforge.storage.loaders.AbstractLoader;
|
||||
import hep.dataforge.tables.Table;
|
||||
import inr.numass.data.*;
|
||||
import org.apache.commons.vfs2.FileObject;
|
||||
import org.apache.commons.vfs2.VFS;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URL;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.nio.channels.ReadableByteChannel;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Instant;
|
||||
import java.util.*;
|
||||
import java.util.function.Function;
|
||||
@ -44,7 +43,6 @@ import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.apache.commons.vfs2.FileType.FOLDER;
|
||||
|
||||
/**
|
||||
* The reader for numass main detector data directory or zip format;
|
||||
@ -53,6 +51,84 @@ import static org.apache.commons.vfs2.FileType.FOLDER;
|
||||
*/
|
||||
public class NumassDataLoader extends AbstractLoader implements ObjectLoader<Envelope>, NumassData {
|
||||
|
||||
|
||||
public static NumassDataLoader fromFile(Storage storage, Path zipFile) throws IOException {
|
||||
throw new UnsupportedOperationException("TODO");
|
||||
// FileObject zipRoot = VFS.getManager().createFileSystem(zipFile);
|
||||
// return fromDir(storage, zipRoot, zipFile.getName().getBaseName());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Construct numass loader from directory
|
||||
*
|
||||
* @param storage
|
||||
* @param directory
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
public static NumassDataLoader fromDir(Storage storage, Path directory, String name) throws IOException {
|
||||
if (!Files.isDirectory(directory)) {
|
||||
throw new IllegalArgumentException("Numass data directory required");
|
||||
}
|
||||
Meta annotation = new MetaBuilder("loader")
|
||||
.putValue("type", "numass")
|
||||
.putValue("numass.loaderFormat", "dir")
|
||||
// .putValue("file.timeCreated", Instant.ofEpochMilli(directory.getContent().getLastModifiedTime()))
|
||||
.build();
|
||||
|
||||
if (name == null || name.isEmpty()) {
|
||||
name = FileStorage.entryName(directory);
|
||||
}
|
||||
|
||||
//FIXME envelopes are lazy do we need to do additional lazy evaluations here?
|
||||
Map<String, Supplier<Envelope>> items = new LinkedHashMap<>();
|
||||
|
||||
Files.list(directory).filter(file -> {
|
||||
String fileName = file.getFileName().toString();
|
||||
return fileName.equals(META_FRAGMENT_NAME)
|
||||
|| fileName.equals(HV_FRAGMENT_NAME)
|
||||
|| fileName.startsWith(POINT_FRAGMENT_NAME);
|
||||
}).forEach(file -> {
|
||||
try {
|
||||
items.put(FileStorage.entryName(file), () -> FileEnvelope.open(file, true));
|
||||
} catch (Exception ex) {
|
||||
LoggerFactory.getLogger(NumassDataLoader.class)
|
||||
.error("Can't load numass data directory " + FileStorage.entryName(directory), ex);
|
||||
}
|
||||
});
|
||||
|
||||
return new NumassDataLoader(storage, name, annotation, items);
|
||||
}
|
||||
|
||||
|
||||
// private static Envelope readFile(Path file) {
|
||||
// String fileName = file.getFileName().toString();
|
||||
// if (fileName.equals(META_FRAGMENT_NAME)
|
||||
// || fileName.equals(HV_FRAGMENT_NAME)
|
||||
// || fileName.startsWith(POINT_FRAGMENT_NAME)) {
|
||||
// return FileEnvelope.open(file, true);
|
||||
// } else {
|
||||
// return null;
|
||||
// }
|
||||
// //}
|
||||
// }
|
||||
|
||||
/**
|
||||
* "start_time": "2016-04-20T04:08:50",
|
||||
*
|
||||
* @param meta
|
||||
* @return
|
||||
*/
|
||||
private static Instant readTime(Meta meta) {
|
||||
if (meta.hasValue("start_time")) {
|
||||
return meta.getValue("start_time").timeValue();
|
||||
} else {
|
||||
return Instant.EPOCH;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* The name of informational meta file in numass data directory
|
||||
*/
|
||||
@ -75,90 +151,12 @@ public class NumassDataLoader extends AbstractLoader implements ObjectLoader<Env
|
||||
readOnly = true;
|
||||
}
|
||||
|
||||
private NumassDataLoader(Storage storage, String name, Meta annotation, Map<String, Supplier<Envelope>> items) {
|
||||
super(storage, name, annotation);
|
||||
private NumassDataLoader(Storage storage, String name, Meta meta, Map<String, Supplier<Envelope>> items) {
|
||||
super(storage, name, meta);
|
||||
this.itemsProvider = items;
|
||||
readOnly = true;
|
||||
}
|
||||
|
||||
public static NumassDataLoader fromLocalDir(Storage storage, File directory) throws IOException {
|
||||
return fromDir(storage, VFS.getManager().toFileObject(directory), null);
|
||||
}
|
||||
|
||||
public static NumassDataLoader fromZip(Storage storage, FileObject zipFile) throws IOException {
|
||||
FileObject zipRoot = VFS.getManager().createFileSystem(zipFile);
|
||||
return fromDir(storage, zipRoot, zipFile.getName().getBaseName());
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct numass loader from directory
|
||||
*
|
||||
* @param storage
|
||||
* @param directory
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
public static NumassDataLoader fromDir(Storage storage, FileObject directory, String name) throws IOException {
|
||||
if (directory.getType() != FOLDER || !directory.exists()) {
|
||||
throw new IllegalArgumentException("numass data directory reuired");
|
||||
}
|
||||
Meta annotation = new MetaBuilder("loader")
|
||||
.putValue("type", "numass")
|
||||
.putValue("numass.loaderFormat", "dir")
|
||||
.putValue("file.timeCreated", Instant.ofEpochMilli(directory.getContent().getLastModifiedTime()))
|
||||
.build();
|
||||
|
||||
if (name == null || name.isEmpty()) {
|
||||
name = directory.getName().getBaseName();
|
||||
}
|
||||
|
||||
URL url = directory.getURL();
|
||||
|
||||
//FIXME envelopes are lazy do we need to do additional lazy evaluations here?
|
||||
Map<String, Supplier<Envelope>> items = new LinkedHashMap<>();
|
||||
|
||||
try (FileObject dir = VFS.getManager().resolveFile(url.toString())) {
|
||||
|
||||
for (FileObject it : dir.getChildren()) {
|
||||
items.put(it.getName().getBaseName(), () -> readFile(it));
|
||||
}
|
||||
|
||||
} catch (Exception ex) {
|
||||
LoggerFactory.getLogger(NumassDataLoader.class)
|
||||
.error("Can't load numass data directory " + directory.getName().getBaseName(), ex);
|
||||
return null;
|
||||
}
|
||||
|
||||
return new NumassDataLoader(storage, name, annotation, items);
|
||||
}
|
||||
|
||||
private static Envelope readFile(FileObject file) {
|
||||
//VFS file reading seems to work badly in parallel
|
||||
//synchronized (Global.instance()) {
|
||||
String fileName = file.getName().getBaseName();
|
||||
if (fileName.equals(META_FRAGMENT_NAME)
|
||||
|| fileName.equals(HV_FRAGMENT_NAME)
|
||||
|| fileName.startsWith(POINT_FRAGMENT_NAME)) {
|
||||
return new FileEnvelope(file.getPublicURIString(), true);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
//}
|
||||
}
|
||||
|
||||
/**
|
||||
* "start_time": "2016-04-20T04:08:50",
|
||||
*
|
||||
* @param meta
|
||||
* @return
|
||||
*/
|
||||
private static Instant readTime(Meta meta) {
|
||||
if (meta.hasValue("start_time")) {
|
||||
return meta.getValue("start_time").timeValue();
|
||||
} else {
|
||||
return Instant.EPOCH;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Read numass point from envelope and apply transformation (e.g. debuncing)
|
||||
|
@ -20,24 +20,21 @@ import hep.dataforge.events.Event;
|
||||
import hep.dataforge.events.EventBuilder;
|
||||
import hep.dataforge.exceptions.StorageException;
|
||||
import hep.dataforge.meta.Meta;
|
||||
import hep.dataforge.storage.filestorage.FilePointLoader;
|
||||
import hep.dataforge.storage.filestorage.FileStorage;
|
||||
import inr.numass.data.NMFile;
|
||||
import inr.numass.data.NumassData;
|
||||
import org.apache.commons.io.FilenameUtils;
|
||||
import org.apache.commons.vfs2.FileObject;
|
||||
import org.apache.commons.vfs2.FileSystemException;
|
||||
import org.apache.commons.vfs2.FileType;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.ByteChannel;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.commons.vfs2.FileType.FOLDER;
|
||||
import static java.nio.file.StandardOpenOption.*;
|
||||
|
||||
|
||||
/**
|
||||
* The file storage containing numass data directories or zips.
|
||||
@ -68,26 +65,26 @@ public class NumassStorage extends FileStorage {
|
||||
protected void updateDirectoryLoaders() {
|
||||
try {
|
||||
this.loaders.clear();
|
||||
for (FileObject file : getDataDir().getChildren()) {
|
||||
Files.list(getDataDir()).forEach( file -> {
|
||||
try {
|
||||
if (file.getType() == FOLDER) {
|
||||
FileObject meta = file.resolveFile(NumassDataLoader.META_FRAGMENT_NAME);
|
||||
if (meta.exists()) {
|
||||
this.loaders.put(file.getName().getBaseName(),
|
||||
if (Files.isDirectory(file)) {
|
||||
Path metaFile = file.resolve(NumassDataLoader.META_FRAGMENT_NAME);
|
||||
if (Files.exists(metaFile)) {
|
||||
this.loaders.put(entryName(file),
|
||||
NumassDataLoader.fromDir(this, file, null));
|
||||
} else {
|
||||
this.shelves.put(file.getName().getBaseName(),
|
||||
new NumassStorage(this, file.getName().getBaseName(), meta()));
|
||||
}
|
||||
} else if (file.getName().toString().endsWith(NUMASS_ZIP_EXTENSION)) {
|
||||
this.loaders.put(file.getName().getBaseName(), NumassDataLoader.fromZip(this, file));
|
||||
} else if (file.getName().toString().endsWith(".points")) {
|
||||
try {
|
||||
loaders.put(FilenameUtils.getBaseName(file.getName().getBaseName()),
|
||||
FilePointLoader.fromFile(this, file, true));
|
||||
} catch (Exception ex) {
|
||||
getLogger().error("Failed to build numass point loader from file {}", file.getName());
|
||||
this.shelves.put(entryName(file),
|
||||
new NumassStorage(this, entryName(file), meta()));
|
||||
}
|
||||
} else if (file.getFileName().endsWith(NUMASS_ZIP_EXTENSION)) {
|
||||
this.loaders.put(entryName(file), NumassDataLoader.fromFile(this, file));
|
||||
// } else if (file.getFileName().endsWith(".points")) {
|
||||
// try {
|
||||
// loaders.put(getFileName(file),
|
||||
// FilePointLoader.fromFile(this, file, true));
|
||||
// } catch (Exception ex) {
|
||||
// getLogger().error("Failed to build numass point loader from file {}", file.getName());
|
||||
// }
|
||||
} else {
|
||||
//updating non-numass loader files
|
||||
updateFile(file);
|
||||
@ -97,8 +94,8 @@ public class NumassStorage extends FileStorage {
|
||||
} catch (StorageException ex) {
|
||||
LoggerFactory.getLogger(getClass()).error("Error while creating numass group", ex);
|
||||
}
|
||||
}
|
||||
} catch (FileSystemException ex) {
|
||||
});
|
||||
} catch (IOException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
@ -121,16 +118,15 @@ public class NumassStorage extends FileStorage {
|
||||
public void pushNumassData(String fileName, ByteBuffer data) throws StorageException {
|
||||
//FIXME move zip to internal
|
||||
try {
|
||||
FileObject nmFile = getDataDir().resolveFile(fileName + NUMASS_ZIP_EXTENSION);
|
||||
if (!nmFile.exists()) {
|
||||
nmFile.createFile();
|
||||
} else {
|
||||
Path nmFile = getDataDir().resolve(fileName + NUMASS_ZIP_EXTENSION);
|
||||
if (Files.exists(nmFile)) {
|
||||
LoggerFactory.getLogger(getClass()).warn("Trying to rewrite existing numass data file {}", nmFile.toString());
|
||||
}
|
||||
try (OutputStream os = nmFile.getContent().getOutputStream(false)) {
|
||||
os.write(data.array());
|
||||
try (ByteChannel channel = Files.newByteChannel(nmFile, CREATE, WRITE)) {
|
||||
channel.write(data);
|
||||
}
|
||||
dispatchEvent(NumassDataPointEvent.build(getName(), fileName, (int) nmFile.getContent().getSize()));
|
||||
|
||||
dispatchEvent(NumassDataPointEvent.build(getName(), fileName, (int) Files.size(nmFile)));
|
||||
} catch (IOException ex) {
|
||||
throw new StorageException(ex);
|
||||
}
|
||||
@ -149,19 +145,18 @@ public class NumassStorage extends FileStorage {
|
||||
public List<NumassData> legacyFiles() {
|
||||
try {
|
||||
List<NumassData> files = new ArrayList<>();
|
||||
for (FileObject file : getDataDir().getChildren()) {
|
||||
if (file.getType() == FileType.FILE && file.getName().getExtension().equalsIgnoreCase("dat")) {
|
||||
InputStream is = file.getContent().getInputStream();
|
||||
String name = file.getName().getBaseName();
|
||||
Files.list(getDataDir()).forEach(file -> {
|
||||
if (Files.isRegularFile(file) && file.getFileName().toString().toLowerCase().endsWith(".dat")) {
|
||||
String name = file.getFileName().toString();
|
||||
try {
|
||||
files.add(NMFile.readStream(is, name, Meta.buildEmpty("numassData")));
|
||||
files.add(NMFile.readStream(Files.newInputStream(file, READ), name, Meta.buildEmpty("numassData")));
|
||||
} catch (Exception ex) {
|
||||
LoggerFactory.getLogger(getClass()).error("Error while reading legacy numass file " + file.getName().getBaseName(), ex);
|
||||
}
|
||||
LoggerFactory.getLogger(getClass()).error("Error while reading legacy numass file " + file.getFileName(), ex);
|
||||
}
|
||||
}
|
||||
});
|
||||
return files;
|
||||
} catch (FileSystemException ex) {
|
||||
} catch (IOException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
|
@ -21,11 +21,10 @@ import javafx.application.Application;
|
||||
import javafx.scene.Scene;
|
||||
import javafx.stage.Stage;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Paths;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author darksnake
|
||||
*/
|
||||
public class TestDirectoryViewer extends Application {
|
||||
@ -34,7 +33,7 @@ public class TestDirectoryViewer extends Application {
|
||||
public void start(Stage stage) throws IOException {
|
||||
new StorageManager().startGlobal();
|
||||
|
||||
NumassDataLoader reader = NumassDataLoader.fromLocalDir(null, new File("C:\\Users\\darksnake\\Dropbox\\PlayGround\\data-test\\20150703143643_1\\"));
|
||||
NumassDataLoader reader = NumassDataLoader.fromDir(null, Paths.get("C:\\Users\\darksnake\\Dropbox\\PlayGround\\data-test\\20150703143643_1\\"), null);
|
||||
// NumassLoader reader = NumassLoader.fromZip(null, new File("C:\\Users\\darksnake\\Dropbox\\PlayGround\\data-test\\20150703143643_1.zip"));
|
||||
|
||||
NumassLoaderView comp = new NumassLoaderView();
|
||||
|
Loading…
Reference in New Issue
Block a user