A lot of small changes.

This commit is contained in:
Alexander Nozik 2020-09-21 21:34:40 +03:00
parent bf213c9814
commit e4705b8239
19 changed files with 382 additions and 74 deletions

View File

@ -3,7 +3,8 @@ plugins{
kotlin("js") version "1.4.0" apply false kotlin("js") version "1.4.0" apply false
} }
val dataforgeVersion by extra("0.1.9-dev-2") val dataforgeVersion: String by extra("0.1.9-dev-2")
val ktorVersion: String by extra("1.4.0")
allprojects { allprojects {
repositories { repositories {

View File

@ -3,7 +3,7 @@ plugins {
id("ru.mipt.npm.publish") id("ru.mipt.npm.publish")
} }
val ktorVersion: String by extra("1.4.0") val ktorVersion: String by rootProject.extra
kotlin { kotlin {
sourceSets { sourceSets {
@ -15,14 +15,12 @@ kotlin {
} }
jvmMain { jvmMain {
dependencies { dependencies {
implementation("io.ktor:ktor-client-cio:$ktorVersion")
} }
} }
jsMain { jsMain {
dependencies { dependencies {
implementation("io.ktor:ktor-client-js:$ktorVersion")
implementation(npm("text-encoding", "0.7.0"))
implementation(npm("abort-controller", "3.0.0"))
} }
} }
} }

View File

@ -0,0 +1,13 @@
package hep.dataforge.control.client
import io.ktor.client.HttpClient
import io.ktor.client.call.receive
import io.ktor.client.request.get
import io.ktor.client.statement.HttpResponse
import io.ktor.client.statement.HttpStatement
import io.ktor.utils.io.ByteReadChannel
suspend fun HttpClient.sse(address: String) = get<HttpStatement>(address).execute { response: HttpResponse ->
// Response is not downloaded here.
val channel = response.receive<ByteReadChannel>()
}

View File

@ -23,7 +23,7 @@ import kotlinx.io.Binary
public class DeviceController( public class DeviceController(
public val device: Device, public val device: Device,
public val deviceTarget: String, public val deviceTarget: String,
public val scope: CoroutineScope = device.scope public val scope: CoroutineScope = device.scope,
) : Responder, Consumer, DeviceListener { ) : Responder, Consumer, DeviceListener {
init { init {
@ -32,7 +32,8 @@ public class DeviceController(
private val outputChannel = Channel<Envelope>(Channel.CONFLATED) private val outputChannel = Channel<Envelope>(Channel.CONFLATED)
public suspend fun respondMessage(message: DeviceMessage): DeviceMessage = respondMessage(device, deviceTarget, message) public suspend fun respondMessage(message: DeviceMessage): DeviceMessage =
respondMessage(device, deviceTarget, message)
override suspend fun respond(request: Envelope): Envelope = respond(device, deviceTarget, request) override suspend fun respond(request: Envelope): Envelope = respond(device, deviceTarget, request)
@ -87,16 +88,14 @@ public class DeviceController(
return response.seal() return response.seal()
} }
} catch (ex: Exception) { } catch (ex: Exception) {
DeviceMessage.fail { DeviceMessage.fail(cause = ex).wrap()
comment = ex.message
}.wrap()
} }
} }
internal suspend fun respondMessage( internal suspend fun respondMessage(
device: Device, device: Device,
deviceTarget: String, deviceTarget: String,
request: DeviceMessage request: DeviceMessage,
): DeviceMessage { ): DeviceMessage {
return try { return try {
val result: List<MessageData> = when (val action = request.type) { val result: List<MessageData> = when (val action = request.type) {
@ -157,9 +156,7 @@ public class DeviceController(
data = result data = result
} }
} catch (ex: Exception) { } catch (ex: Exception) {
DeviceMessage.fail { DeviceMessage.fail(request, cause = ex)
comment = ex.message
}
} }
} }
} }
@ -172,8 +169,6 @@ public suspend fun DeviceHub.respondMessage(request: DeviceMessage): DeviceMessa
val device = this[targetName] ?: error("The device with name $targetName not found in $this") val device = this[targetName] ?: error("The device with name $targetName not found in $this")
DeviceController.respondMessage(device, targetName.toString(), request) DeviceController.respondMessage(device, targetName.toString(), request)
} catch (ex: Exception) { } catch (ex: Exception) {
DeviceMessage.fail { DeviceMessage.fail(request, cause = ex)
comment = ex.message
}
} }
} }

View File

@ -47,12 +47,22 @@ public class DeviceMessage : Scheme() {
public inline fun fail( public inline fun fail(
request: DeviceMessage? = null, request: DeviceMessage? = null,
cause: Throwable? = null,
block: DeviceMessage.() -> Unit = {} block: DeviceMessage.() -> Unit = {}
): DeviceMessage = DeviceMessage { ): DeviceMessage = DeviceMessage {
target = request?.source target = request?.source
status = RESPONSE_FAIL_STATUS status = RESPONSE_FAIL_STATUS
if(cause!=null){
configure {
set("error.type", cause::class.simpleName)
set("error.message", cause.message)
//set("error.trace", ex.stackTraceToString())
}
comment = cause.message
}
}.apply(block) }.apply(block)
override val descriptor: SerialDescriptor = MetaSerializer.descriptor override val descriptor: SerialDescriptor = MetaSerializer.descriptor
override fun deserialize(decoder: Decoder): DeviceMessage { override fun deserialize(decoder: Decoder): DeviceMessage {

View File

@ -18,9 +18,9 @@ public interface Port: Closeable, ContextAware {
public typealias PortFactory = Factory<Port> public typealias PortFactory = Factory<Port>
public abstract class AbstractPort(override val context: Context, parentContext: CoroutineContext = context.coroutineContext) : Port { public abstract class AbstractPort(override val context: Context, coroutineContext: CoroutineContext = context.coroutineContext) : Port {
protected val scope: CoroutineScope = CoroutineScope(parentContext + SupervisorJob(parentContext[Job])) protected val scope: CoroutineScope = CoroutineScope(coroutineContext + SupervisorJob(coroutineContext[Job]))
private val outgoing = Channel<ByteArray>(100) private val outgoing = Channel<ByteArray>(100)
private val incoming = Channel<ByteArray>(Channel.CONFLATED) private val incoming = Channel<ByteArray>(Channel.CONFLATED)
@ -41,7 +41,7 @@ public abstract class AbstractPort(override val context: Context, parentContext:
*/ */
protected fun receive(data: ByteArray) { protected fun receive(data: ByteArray) {
scope.launch { scope.launch {
logger.debug { "RECEIVED: ${data.decodeToString()}" } logger.debug { "[${this@AbstractPort}] RECEIVED: ${data.decodeToString()}" }
incoming.send(data) incoming.send(data)
} }
} }
@ -50,7 +50,7 @@ public abstract class AbstractPort(override val context: Context, parentContext:
for (data in outgoing) { for (data in outgoing) {
try { try {
write(data) write(data)
logger.debug { "SENT: ${data.decodeToString()}" } logger.debug { "[${this@AbstractPort}] SENT: ${data.decodeToString()}" }
} catch (ex: Exception) { } catch (ex: Exception) {
if (ex is CancellationException) throw ex if (ex is CancellationException) throw ex
logger.error(ex) { "Error while writing data to the port" } logger.error(ex) { "Error while writing data to the port" }

View File

@ -23,8 +23,8 @@ public class KtorTcpPort internal constructor(
context: Context, context: Context,
public val host: String, public val host: String,
public val port: Int, public val port: Int,
parentContext: CoroutineContext = context.coroutineContext, coroutineContext: CoroutineContext = context.coroutineContext,
) : AbstractPort(context, parentContext), AutoCloseable { ) : AbstractPort(context, coroutineContext), AutoCloseable {
override fun toString(): String = "port[tcp:$host:$port]" override fun toString(): String = "port[tcp:$host:$port]"

View File

@ -23,8 +23,8 @@ public class TcpPort private constructor(
context: Context, context: Context,
public val host: String, public val host: String,
public val port: Int, public val port: Int,
parentContext: CoroutineContext = context.coroutineContext, coroutineContext: CoroutineContext = context.coroutineContext,
) : AbstractPort(context, parentContext), AutoCloseable { ) : AbstractPort(context, coroutineContext), AutoCloseable {
override fun toString(): String = "port[tcp:$host:$port]" override fun toString(): String = "port[tcp:$host:$port]"

View File

@ -18,8 +18,8 @@ import jssc.SerialPort as JSSCPort
public class SerialPort private constructor( public class SerialPort private constructor(
context: Context, context: Context,
private val jssc: JSSCPort, private val jssc: JSSCPort,
parentContext: CoroutineContext = context.coroutineContext, coroutineContext: CoroutineContext = context.coroutineContext,
) : AbstractPort(context, parentContext) { ) : AbstractPort(context, coroutineContext) {
override fun toString(): String = "port[${jssc.portName}]" override fun toString(): String = "port[${jssc.portName}]"

View File

@ -8,9 +8,10 @@ kscience {
} }
val dataforgeVersion: String by rootProject.extra val dataforgeVersion: String by rootProject.extra
val ktorVersion: String by extra("1.4.0") val ktorVersion: String by rootProject.extra
dependencies{ dependencies{
implementation(project(":ktor-sse"))
implementation(project(":dataforge-device-core")) implementation(project(":dataforge-device-core"))
implementation("io.ktor:ktor-server-cio:$ktorVersion") implementation("io.ktor:ktor-server-cio:$ktorVersion")
implementation("io.ktor:ktor-websockets:$ktorVersion") implementation("io.ktor:ktor-websockets:$ktorVersion")

View File

@ -1,41 +0,0 @@
package hep.dataforge.control.server
import io.ktor.application.ApplicationCall
import io.ktor.http.CacheControl
import io.ktor.http.ContentType
import io.ktor.response.cacheControl
import io.ktor.response.respondTextWriter
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
/**
* The data class representing a SSE Event that will be sent to the client.
*/
public data class SseEvent(val data: String, val event: String? = null, val id: String? = null)
/**
* Method that responds an [ApplicationCall] by reading all the [SseEvent]s from the specified [events] [ReceiveChannel]
* and serializing them in a way that is compatible with the Server-Sent Events specification.
*
* You can read more about it here: https://www.html5rocks.com/en/tutorials/eventsource/basics/
*/
@Suppress("BlockingMethodInNonBlockingContext")
public suspend fun ApplicationCall.respondSse(events: Flow<SseEvent>) {
response.cacheControl(CacheControl.NoCache(null))
respondTextWriter(contentType = ContentType.Text.EventStream) {
events.collect { event->
if (event.id != null) {
write("id: ${event.id}\n")
}
if (event.event != null) {
write("event: ${event.event}\n")
}
for (dataLine in event.data.lines()) {
write("data: $dataLine\n")
}
write("\n")
flush()
}
}
}

View File

@ -0,0 +1,26 @@
package hep.dataforge.control.server
import io.ktor.application.ApplicationCall
import io.ktor.http.CacheControl
import io.ktor.http.ContentType
import io.ktor.response.cacheControl
import io.ktor.response.respondBytesWriter
import io.ktor.util.KtorExperimentalAPI
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.Flow
import ru.mipt.npm.io.sse.SseEvent
import ru.mipt.npm.io.sse.writeSseFlow
/**
* Method that responds an [ApplicationCall] by reading all the [SseEvent]s from the specified [events] [ReceiveChannel]
* and serializing them in a way that is compatible with the Server-Sent Events specification.
*
* You can read more about it here: https://www.html5rocks.com/en/tutorials/eventsource/basics/
*/
@OptIn(KtorExperimentalAPI::class)
public suspend fun ApplicationCall.respondSse(events: Flow<SseEvent>) {
response.cacheControl(CacheControl.NoCache(null))
respondBytesWriter(contentType = ContentType.Text.EventStream) {
writeSseFlow(events)
}
}

28
ktor-sse/build.gradle.kts Normal file
View File

@ -0,0 +1,28 @@
plugins {
id("ru.mipt.npm.mpp")
}
group = "ru.mipt.npm"
val ktorVersion: String by rootProject.extra
kscience{
useCoroutines()
}
kotlin {
sourceSets {
commonMain {
dependencies {
api("io.ktor:ktor-io:$ktorVersion")
}
}
jvmTest{
dependencies{
implementation("io.ktor:ktor-server-cio:$ktorVersion")
implementation("io.ktor:ktor-client-cio:$ktorVersion")
implementation("ch.qos.logback:logback-classic:1.2.3")
}
}
}
}

View File

@ -0,0 +1,60 @@
package ru.mipt.npm.io.sse
import io.ktor.utils.io.*
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.isActive
/**
* The data class representing a SSE Event that will be sent to the client.
*/
public data class SseEvent(val data: String, val event: String? = null, val id: String? = null)
public suspend fun ByteWriteChannel.writeSseFlow(events: Flow<SseEvent>): Unit = events.collect { event ->
if (event.id != null) {
writeStringUtf8("id: ${event.id}\n")
}
if (event.event != null) {
writeStringUtf8("event: ${event.event}\n")
}
for (dataLine in event.data.lines()) {
writeStringUtf8("data: $dataLine\n")
}
writeStringUtf8("\n")
flush()
}
@OptIn(ExperimentalCoroutinesApi::class)
public suspend fun ByteReadChannel.readSseFlow(): Flow<SseEvent> = channelFlow {
while (isActive) {
//val lines = ArrayList<String>()
val builder = StringBuilder()
var id: String? = null
var event: String? = null
//read lines until blank line or the end of stream
do{
val line = readUTF8Line()
if (line != null && !line.isBlank()) {
val key = line.substringBefore(":")
val value = line.substringAfter(": ")
when (key) {
"id" -> id = value
"event" -> event = value
"data" -> builder.append(value)
else -> error("Unrecognized event-stream key $key")
}
}
} while (line?.isBlank() != true)
if(builder.isNotBlank()) {
send(SseEvent(builder.toString(), event, id))
}
}
awaitClose {
this@readSseFlow.cancel()
}
}

View File

@ -0,0 +1,74 @@
package ru.mipt.npm.io.sse
import io.ktor.application.ApplicationCall
import io.ktor.application.call
import io.ktor.client.HttpClient
import io.ktor.client.call.receive
import io.ktor.client.request.get
import io.ktor.client.statement.HttpResponse
import io.ktor.client.statement.HttpStatement
import io.ktor.http.CacheControl
import io.ktor.http.ContentType
import io.ktor.response.cacheControl
import io.ktor.response.respondBytesWriter
import io.ktor.routing.get
import io.ktor.routing.routing
import io.ktor.server.cio.CIO
import io.ktor.server.engine.embeddedServer
import io.ktor.util.KtorExperimentalAPI
import io.ktor.utils.io.ByteReadChannel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
@OptIn(KtorExperimentalAPI::class)
suspend fun ApplicationCall.respondSse(events: Flow<SseEvent>) {
response.cacheControl(CacheControl.NoCache(null))
respondBytesWriter(contentType = ContentType.Text.EventStream) {
writeSseFlow(events)
}
}
suspend fun HttpClient.readSse(address: String, block: suspend (SseEvent) -> Unit): Unit =
get<HttpStatement>(address).execute { response: HttpResponse ->
// Response is not downloaded here.
val channel = response.receive<ByteReadChannel>()
val flow = channel.readSseFlow()
flow.collect(block)
}
class SseTest {
@OptIn(KtorExperimentalAPI::class)
@Test
fun testSseIntegration() {
runBlocking {
val server = embeddedServer(CIO, 12080) {
routing {
get("/") {
val flow = flow {
repeat(5) {
delay(300)
emit(it)
}
}.map {
SseEvent(data = it.toString(), id = it.toString())
}
call.respondSse(flow)
}
}
}
server.start(wait = false)
delay(1000)
val client = HttpClient(io.ktor.client.engine.cio.CIO)
client.readSse("http://localhost:12080") {
println(it)
}
client.close()
server.stop(1000, 1000)
}
}
}

View File

@ -48,6 +48,10 @@ public class PiMotionMasterDevice(
private val mutex = Mutex() private val mutex = Mutex()
private suspend fun dispatchError(errorCode: Int){
}
private suspend fun sendCommandInternal(command: String, vararg arguments: String) { private suspend fun sendCommandInternal(command: String, vararg arguments: String) {
val joinedArguments = if (arguments.isEmpty()) { val joinedArguments = if (arguments.isEmpty()) {
"" ""
@ -58,14 +62,31 @@ public class PiMotionMasterDevice(
connector.send(stringToSend) connector.send(stringToSend)
} }
public suspend fun getErrorCode(): Int = mutex.withLock{
withTimeout(timeoutValue) {
sendCommandInternal("ERR?")
val errorString = connector.receiving().withDelimiter("\n").first()
errorString.toInt()
}
}
/** /**
* Send a synchronous request and receive a list of lines as a response * Send a synchronous request and receive a list of lines as a response
*/ */
private suspend fun request(command: String, vararg arguments: String): List<String> = mutex.withLock { private suspend fun request(command: String, vararg arguments: String): List<String> = mutex.withLock {
withTimeout(timeoutValue) { try {
sendCommandInternal(command, *arguments) withTimeout(timeoutValue) {
val phrases = connector.receiving().withDelimiter("\n") sendCommandInternal(command, *arguments)
phrases.takeWhile { it.endsWith(" \n") }.toList() + phrases.first() val phrases = connector.receiving().withDelimiter("\n")
phrases.takeWhile { it.endsWith(" \n") }.toList() + phrases.first()
}
} catch (ex: Throwable){
logger.warn { "Error during PIMotionMaster request. Requesting error code." }
val errorCode = getErrorCode()
dispatchError(errorCode)
logger.warn { "Error code $errorCode" }
error("Error code $errorCode")
} }
} }
@ -87,6 +108,7 @@ public class PiMotionMasterDevice(
} }
} }
public val initialize: Action by acting { public val initialize: Action by acting {
send("INI") send("INI")
} }

View File

@ -0,0 +1,45 @@
package ru.mipt.npm.devices.pimotionmaster
import hep.dataforge.context.Context
import hep.dataforge.control.ports.AbstractPort
abstract class VirtualPort(context: Context) : AbstractPort(context)
class PiMotionMasterVirtualPort(context: Context) : VirtualPort(context) {
init {
//add asynchronous send logic here
}
private val axisID = "0"
private var errorCode: Int = 0
private var velocity: Float = 1.0f
private var position: Float = 0.0f
private var servoMode: Int = 1
private var targetPosition: Float = 0.0f
private fun receive(str: String) = receive((str + "\n").toByteArray())
override suspend fun write(data: ByteArray) {
assert(data.last() == '\n'.toByte())
val string = data.decodeToString().substringBefore("\n")
val parts = string.split(' ')
val command = parts.firstOrNull() ?: error("Command not present")
when (command) {
"XXX" -> receive("WAT?")
"VER?" -> receive("test")
"ERR?" -> receive(errorCode.toString())
"SVO?" -> receive("$axisID=$servoMode")
"SVO" ->{
val requestAxis = parts[1]
if(requestAxis == axisID) {
servoMode = parts[2].toInt()
}
}
else -> errorCode = 2 // do not send anything. Ser error code
}
}
}

View File

@ -0,0 +1,75 @@
package ru.mipt.npm.devices.pimotionmaster
import hep.dataforge.context.Global
import hep.dataforge.control.ports.Port
import io.ktor.network.selector.ActorSelectorManager
import io.ktor.network.sockets.aSocket
import io.ktor.network.sockets.openReadChannel
import io.ktor.network.sockets.openWriteChannel
import io.ktor.util.InternalAPI
import io.ktor.util.KtorExperimentalAPI
import io.ktor.util.moveToByteArray
import io.ktor.utils.io.readUntilDelimiter
import io.ktor.utils.io.writeAvailable
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.collect
import java.net.InetSocketAddress
import java.nio.ByteBuffer
private val delimeter = ByteBuffer.wrap("\n".encodeToByteArray())
@OptIn(KtorExperimentalAPI::class, InternalAPI::class)
fun CoroutineScope.launchPiDebugServer(port: Int, virtualPort: Port): Job = launch {
val server = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().bind(InetSocketAddress("localhost", port))
println("Started virtual port server at ${server.localAddress}")
while (isActive) {
val socket = try {
server.accept()
} catch (ex: Exception) {
server.close()
return@launch
}
launch {
println("Socket accepted: ${socket.remoteAddress}")
try {
val input = socket.openReadChannel()
val output = socket.openWriteChannel(autoFlush = true)
val buffer = ByteBuffer.allocate(1024)
launch {
virtualPort.receiving().collect {
println("Sending: ${it.decodeToString()}")
output.writeAvailable(it)
}
}
while (isActive) {
buffer.rewind()
val read = input.readUntilDelimiter(delimeter, buffer)
if (read > 0) {
buffer.flip()
val array = buffer.moveToByteArray()
println("Received: ${array.decodeToString()}")
virtualPort.send(array)
}
}
} catch (ex: Exception) {
cancel()
} finally {
socket.close()
}
}
}
}
fun main() {
val port = 10024
val virtualPort = PiMotionMasterVirtualPort(Global)
runBlocking(Dispatchers.Default) {
val serverJob = launchPiDebugServer(port, virtualPort)
readLine()
serverJob.cancel()
}
}

View File

@ -26,6 +26,7 @@ pluginManagement {
rootProject.name = "dataforge-control" rootProject.name = "dataforge-control"
include( include(
":ktor-sse",
":dataforge-device-core", ":dataforge-device-core",
":dataforge-device-serial", ":dataforge-device-serial",
":dataforge-device-server", ":dataforge-device-server",