diff --git a/numass-client/build.gradle b/numass-client/build.gradle index ee95af75..205b4ab5 100644 --- a/numass-client/build.gradle +++ b/numass-client/build.gradle @@ -8,6 +8,7 @@ mainClassName = mainClass dependencies { compile project(':numass-core') - compile 'commons-cli:commons-cli:1.3.1' - compile 'org.zeroturnaround:zt-zip:1.9' + compile 'commons-cli:commons-cli:1.4' + compile "hep.dataforge:dataforge-messages" + compile 'org.zeroturnaround:zt-zip:1.13' } \ No newline at end of file diff --git a/numass-client/src/main/java/inr/numass/client/NumassClient.java b/numass-client/src/main/java/inr/numass/client/NumassClient.java index f163b3b1..e34aea29 100644 --- a/numass-client/src/main/java/inr/numass/client/NumassClient.java +++ b/numass-client/src/main/java/inr/numass/client/NumassClient.java @@ -19,7 +19,7 @@ import hep.dataforge.io.envelopes.DefaultEnvelopeReader; import hep.dataforge.io.envelopes.DefaultEnvelopeType; import hep.dataforge.io.envelopes.Envelope; import hep.dataforge.io.envelopes.EnvelopeBuilder; -import hep.dataforge.io.messages.Responder; +import hep.dataforge.messages.Responder; import hep.dataforge.meta.Meta; import hep.dataforge.meta.MetaBuilder; import hep.dataforge.values.Value; @@ -39,9 +39,8 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.CompletableFuture; -import static hep.dataforge.io.messages.MessagesKt.*; +import static hep.dataforge.messages.MessagesKt.*; /** * @author darksnake @@ -68,8 +67,9 @@ public class NumassClient implements AutoCloseable, Responder { socket.close(); } + @NotNull @Override - public Envelope respond(Envelope message) { + public Envelope respond(@NotNull Envelope message) { try { write(message, socket.getOutputStream()); return read(socket.getInputStream()); @@ -255,10 +255,4 @@ public class NumassClient implements AutoCloseable, Responder { public Envelope sendDataPoints(String shelf, String loaderName, Collection points) { throw new UnsupportedOperationException(); } - - @NotNull - @Override - public CompletableFuture respondInFuture(@NotNull Envelope message) { - return CompletableFuture.supplyAsync(() -> respond(message)); - } } diff --git a/numass-core/src/main/kotlin/inr/numass/data/storage/NumassDataLoader.kt b/numass-core/src/main/kotlin/inr/numass/data/storage/NumassDataLoader.kt index 638d6ad0..0cdc5ff8 100644 --- a/numass-core/src/main/kotlin/inr/numass/data/storage/NumassDataLoader.kt +++ b/numass-core/src/main/kotlin/inr/numass/data/storage/NumassDataLoader.kt @@ -113,10 +113,6 @@ class NumassDataLoader( TODO() } - override fun respond(message: Envelope): Envelope { - throw TODO("Not supported yet.") - } - override val startTime: Instant get() = meta.optValue("start_time").map { it.time }.orElseGet { super.startTime } diff --git a/numass-server/build.gradle b/numass-server/build.gradle index c4c17a1b..c39b86a8 100644 --- a/numass-server/build.gradle +++ b/numass-server/build.gradle @@ -19,6 +19,7 @@ mainClassName = "inr.numass.server.ServerRunner" dependencies { compile project(':numass-core') compile "hep.dataforge:storage-server" // project(':dataforge-storage:storage-servlet') + compile "hep.dataforge:dataforge-messages" compile 'commons-daemon:commons-daemon:1.+' } diff --git a/numass-server/src/main/java/inr/numass/server/AbstractNetworkListener.kt b/numass-server/src/main/java/inr/numass/server/AbstractNetworkListener.kt new file mode 100644 index 00000000..f848ad07 --- /dev/null +++ b/numass-server/src/main/java/inr/numass/server/AbstractNetworkListener.kt @@ -0,0 +1,180 @@ +/* + * Copyright 2018 Alexander Nozik. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package inr.numass.server + +import hep.dataforge.io.envelopes.DefaultEnvelopeReader +import hep.dataforge.io.envelopes.DefaultEnvelopeType +import hep.dataforge.io.envelopes.Envelope +import hep.dataforge.messages.errorResponseBase +import hep.dataforge.messages.isTerminator +import hep.dataforge.messages.terminator +import hep.dataforge.meta.Meta +import hep.dataforge.meta.Metoid +import org.slf4j.LoggerFactory +import java.io.IOException +import java.io.InputStream +import java.io.OutputStream +import java.net.ServerSocket +import java.net.Socket + +/** + * Abstract network listener for envelopes + * + * @author Darksnake + */ +abstract class AbstractNetworkListener(listnerConfig: Meta?) : Metoid, AutoCloseable { + + @Volatile + private var finishflag = false + private var serverSocket: ServerSocket? = null + final override val meta: Meta + + private val port: Int + get() = meta.getInt("port", 8335) + + init { + if (listnerConfig == null) { + this.meta = Meta.buildEmpty("listener") + } else { + this.meta = listnerConfig + } + } + + @Throws(IOException::class, InterruptedException::class) + override fun close() { + finishflag = true + if (serverSocket != null) { + serverSocket!!.close() + } + logger.info("Closing listner...") + } + + abstract fun respond(message: Envelope): Envelope + + @Throws(Exception::class) + open fun open() { + val clientGroup = ThreadGroup("clients") + Thread({ + try { + ServerSocket(port).use { ss -> + serverSocket = ss + logger.info("Starting to listning to the port {}", port) + while (!finishflag) { + //FIXME add timeout + val s = ss.accept() + logger.info("Client accepted from {}", s.remoteSocketAddress.toString()) + // new SocketProcessor(s).run(); + val socketProcessor = SocketProcessor(s) + Thread(clientGroup, socketProcessor).start() + } + } + } catch (ex: IOException) { + if (!finishflag) { + logger.error("Connection exception", ex) + } + } + + logger.info("Listener closed") + serverSocket = null + }, "listner").start() + } + + /** + * Decide to accept envelope + * + * @param envelope + * @return + */ + protected fun accept(envelope: Envelope): Boolean { + return true + } + + private inner class SocketProcessor (private val socket: Socket) : Runnable { + private val inputStream: InputStream = socket.getInputStream() + private val outputStream: OutputStream = socket.getOutputStream() + + override fun run() { + logger.info("Starting client processing from {}", socket.remoteSocketAddress.toString()) + while (!finishflag) { + if (socket.isClosed) { + finishflag = true + logger.debug("Socket {} closed by client", socket.remoteSocketAddress.toString()) + break + } + try { + val request = read() + //Breaking connection on terminator + if (isTerminator(request)) { + logger.info("Recieved terminator message from {}", socket.remoteSocketAddress.toString()) + break + } + if (accept(request)) { + var response: Envelope? + try { + response = respond(request) + } catch (ex: Exception) { + logger.error("Uncatched exception during response evaluation", ex) + response = errorResponseBase("", ex).build() + } + + //Null respnses are ignored + if (response != null) { + write(response) + } + } + } catch (ex: IOException) { + logger.error("IO exception during envelope evaluation", ex) + finishflag = true + } + + } + + logger.info("Client processing finished for {}", socket.remoteSocketAddress.toString()) + if (!socket.isClosed) { + try { + write(terminator)//Sending additional terminator to notify client that server is closing connection + } catch (ex: IOException) { + logger.error("Terminator send failed", ex) + } + + try { + socket.close() + } catch (ex: IOException) { + logger.error("Can't close the socket", ex) + } + + } + } + + @Throws(IOException::class) + private fun read(): Envelope { + return DefaultEnvelopeReader().readWithData(inputStream) + } + + @Throws(IOException::class) + private fun write(envelope: Envelope) { + DefaultEnvelopeType.INSTANCE.writer.write(outputStream, envelope) + outputStream.flush() + } + + + } + + companion object { + + private val logger = LoggerFactory.getLogger("LISTENER") + } +} diff --git a/numass-server/src/main/java/inr/numass/server/NumassRun.java b/numass-server/src/main/java/inr/numass/server/NumassRun.java index 43c767cd..5a2d6fcd 100644 --- a/numass-server/src/main/java/inr/numass/server/NumassRun.java +++ b/numass-server/src/main/java/inr/numass/server/NumassRun.java @@ -18,7 +18,7 @@ package inr.numass.server; import hep.dataforge.exceptions.StorageException; import hep.dataforge.io.envelopes.Envelope; import hep.dataforge.io.envelopes.EnvelopeBuilder; -import hep.dataforge.io.messages.Responder; +import hep.dataforge.messages.Responder; import hep.dataforge.meta.Meta; import hep.dataforge.meta.Metoid; import hep.dataforge.storage.api.ObjectLoader; @@ -36,8 +36,8 @@ import java.time.Instant; import java.util.concurrent.CompletableFuture; import java.util.stream.Stream; -import static hep.dataforge.io.messages.MessagesKt.errorResponseBase; -import static hep.dataforge.io.messages.MessagesKt.okResponseBase; +import static hep.dataforge.messages.MessagesKt.errorResponseBase; +import static hep.dataforge.messages.MessagesKt.okResponseBase; import static inr.numass.server.NumassServerUtils.getNotes; /** @@ -100,8 +100,8 @@ public class NumassRun implements Metoid, Responder { String type = meta.getString("type", "numass.run.state"); String action = meta.getString("action"); switch (type) { - case "numass.run.state": - return states.respond(message); +// case "numass.run.state": +// return states.respond(message); case "numass.data": switch (action) { case "push": diff --git a/numass-server/src/main/java/inr/numass/server/NumassServer.java b/numass-server/src/main/java/inr/numass/server/NumassServer.java index b4a660d5..9d939885 100644 --- a/numass-server/src/main/java/inr/numass/server/NumassServer.java +++ b/numass-server/src/main/java/inr/numass/server/NumassServer.java @@ -23,7 +23,6 @@ import hep.dataforge.meta.Meta; import hep.dataforge.meta.MetaBuilder; import hep.dataforge.storage.api.StateLoader; import hep.dataforge.storage.api.Storage; -import hep.dataforge.storage.commons.AbstractNetworkListener; import hep.dataforge.storage.commons.LoaderFactory; import hep.dataforge.storage.commons.StorageManager; import hep.dataforge.storage.commons.StorageUtils; @@ -35,8 +34,8 @@ import ratpack.server.RatpackServer; import java.io.IOException; -import static hep.dataforge.io.messages.MessagesKt.errorResponseBase; -import static hep.dataforge.io.messages.MessagesKt.responseBase; +import static hep.dataforge.messages.MessagesKt.errorResponseBase; +import static hep.dataforge.messages.MessagesKt.responseBase; /** * @author darksnake @@ -120,10 +119,10 @@ public class NumassServer extends AbstractNetworkListener implements ContextAwar //switch message type String type = meta.getString("type", "numass.state"); switch (type) { - case "numass.storage": - return getRun().getStorage().respond(message); - case "numass.state": - return getRootState().respond(message); +// case "numass.storage": +// return getRun().getStorage().respond(message); +// case "numass.state": +// return getRootState().respond(message); case "numass.data": case "numass.notes": case "numass.run.state":