Update to stable DataForge

This commit is contained in:
Alexander Nozik 2023-05-09 22:50:35 +03:00
parent aabf2b85a4
commit 1d7403ef30
16 changed files with 58 additions and 49 deletions

View File

@ -6,7 +6,7 @@ plugins {
id("space.kscience.gradle.project")
}
val dataforgeVersion: String by extra("0.6.1-dev-4")
val dataforgeVersion: String by extra("0.6.1")
val ktorVersion: String by extra(space.kscience.gradle.KScienceVersions.ktorVersion)
val rsocketVersion by extra("0.15.4")
val xodusVersion by extra("2.0.1")

View File

@ -11,7 +11,6 @@ import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.NameToken
import kotlin.collections.set
import kotlin.properties.ReadOnlyProperty
import kotlin.reflect.KClass
/**
* DataForge Context plugin that allows to manage devices locally
@ -33,7 +32,6 @@ public class DeviceManager : AbstractPlugin(), DeviceHub {
public companion object : PluginFactory<DeviceManager> {
override val tag: PluginTag = PluginTag("devices", group = PluginTag.DATAFORGE_GROUP)
override val type: KClass<out DeviceManager> = DeviceManager::class
override fun build(context: Context, meta: Meta): DeviceManager = DeviceManager()
}

View File

@ -3,7 +3,6 @@ package space.kscience.controls.ports
import space.kscience.dataforge.context.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.string
import kotlin.reflect.KClass
/**
* A DataForge plugin for managing ports
@ -32,8 +31,6 @@ public class Ports : AbstractPlugin() {
override val tag: PluginTag = PluginTag("controls.ports", group = PluginTag.DATAFORGE_GROUP)
override val type: KClass<out Ports> = Ports::class
override fun build(context: Context, meta: Meta): Ports = Ports()
}

View File

@ -0,0 +1,18 @@
package space.kscience.controls.api
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import space.kscience.controls.spec.asMeta
import kotlin.test.Test
import kotlin.test.assertEquals
class MessageTest {
@Test
fun messageSerialization() {
val changedMessage = PropertyChangedMessage("test", 22.0.asMeta())
val json = Json.encodeToString(changedMessage)
val reconstructed: PropertyChangedMessage = Json.decodeFromString(json)
assertEquals(changedMessage.time, reconstructed.time)
}
}

View File

@ -6,7 +6,6 @@ import space.kscience.dataforge.context.PluginFactory
import space.kscience.dataforge.context.PluginTag
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import kotlin.reflect.KClass
public class TcpPortPlugin : AbstractPlugin() {
@ -21,8 +20,6 @@ public class TcpPortPlugin : AbstractPlugin() {
override val tag: PluginTag = PluginTag("controls.ports.tcp", group = PluginTag.DATAFORGE_GROUP)
override val type: KClass<out TcpPortPlugin> = TcpPortPlugin::class
override fun build(context: Context, meta: Meta): TcpPortPlugin = TcpPortPlugin()
}

View File

@ -6,7 +6,6 @@ import space.kscience.dataforge.context.PluginFactory
import space.kscience.dataforge.context.PluginTag
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import kotlin.reflect.KClass
public class KtorTcpPortPlugin : AbstractPlugin() {
@ -21,8 +20,6 @@ public class KtorTcpPortPlugin : AbstractPlugin() {
override val tag: PluginTag = PluginTag("controls.ports.serial", group = PluginTag.DATAFORGE_GROUP)
override val type: KClass<out KtorTcpPortPlugin> = KtorTcpPortPlugin::class
override fun build(context: Context, meta: Meta): KtorTcpPortPlugin = KtorTcpPortPlugin()
}

View File

@ -7,7 +7,6 @@ import space.kscience.dataforge.context.PluginFactory
import space.kscience.dataforge.context.PluginTag
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import kotlin.reflect.KClass
public class SerialPortPlugin : AbstractPlugin() {
@ -22,8 +21,6 @@ public class SerialPortPlugin : AbstractPlugin() {
override val tag: PluginTag = PluginTag("controls.ports.serial", group = PluginTag.DATAFORGE_GROUP)
override val type: KClass<out SerialPortPlugin> = SerialPortPlugin::class
override fun build(context: Context, meta: Meta): SerialPortPlugin = SerialPortPlugin()
}

View File

@ -16,6 +16,7 @@ dependencies {
implementation(projects.magix.magixServer)
implementation(projects.controlsMagixClient)
implementation(projects.magix.magixRsocket)
implementation(projects.magix.magixZmq)
implementation("io.ktor:ktor-client-cio:$ktorVersion")
implementation("space.kscience:plotlykt-server:0.5.3")
@ -35,5 +36,5 @@ tasks.withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile>().configureEach
application {
mainClass.set("space.kscience.controls.demo.DemoControllerViewKt")
mainClass.set("space.kscience.controls.demo.MassDeviceKt")
}

View File

@ -21,14 +21,13 @@ import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.int
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.subscribe
import space.kscience.magix.rsocket.rSocketWithTcp
import space.kscience.magix.rsocket.rSocketWithWebSockets
import space.kscience.magix.rsocket.rSocketStreamWithTcp
import space.kscience.magix.server.RSocketMagixFlowPlugin
import space.kscience.magix.server.startMagixServer
import space.kscience.plotly.Plotly
import space.kscience.plotly.bar
import space.kscience.plotly.layout
import space.kscience.plotly.plot
import space.kscience.plotly.scatter
import space.kscience.plotly.server.PlotlyUpdateMode
import space.kscience.plotly.server.serve
import space.kscience.plotly.server.show
@ -49,7 +48,7 @@ class MassDevice(context: Context, meta: Meta) : DeviceBySpec<MassDevice>(MassDe
val value by doubleProperty { randomValue }
override suspend fun MassDevice.onOpen() {
doRecurring(2.milliseconds) {
doRecurring(100.milliseconds) {
read(value)
}
}
@ -60,11 +59,13 @@ fun main() {
val context = Context("Mass")
context.startMagixServer(
RSocketMagixFlowPlugin()
RSocketMagixFlowPlugin(),
// ZmqMagixFlowPlugin()
)
val numDevices = 100
context.launch(Dispatchers.IO) {
repeat(numDevices) {
val deviceContext = Context("Device${it}") {
plugin(DeviceManager)
@ -74,9 +75,9 @@ fun main() {
deviceManager.install("device$it", MassDevice)
deviceContext.launch(Dispatchers.Default) {
val deviceEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost")
deviceManager.connectToMagix(deviceEndpoint, "device$it")
val endpointId = "device$it"
val deviceEndpoint = MagixEndpoint.rSocketStreamWithTcp("localhost")
deviceManager.connectToMagix(deviceEndpoint, endpointId)
}
}
@ -88,9 +89,9 @@ fun main() {
layout {
title = "Latest event"
}
scatter {
launch(Dispatchers.Default) {
val monitorEndpoint = MagixEndpoint.rSocketWithTcp("localhost")
bar {
launch(Dispatchers.Default){
val monitorEndpoint = MagixEndpoint.rSocketStreamWithTcp("localhost")
val latest = ConcurrentHashMap<String, Instant>()
@ -101,8 +102,9 @@ fun main() {
while (isActive) {
delay(200)
val now = Clock.System.now()
x.strings = latest.keys
y.numbers = latest.values.map { now.minus(it).inWholeMilliseconds / 1000.0 }
val sorted = latest.mapKeys { it.key.substring(6).toInt() }.toSortedMap()
x.numbers = sorted.keys
y.numbers = sorted.values.map { now.minus(it).inWholeMilliseconds / 1000.0 }
}
}
}

View File

@ -143,5 +143,5 @@ val res = device.read(DemoDevice.sin)
## Other ways to create a device
It is not obligatory to use `DeviceBySpec` to define a `Device`. One could directly implement the `Device` interface or use intermediate abstraction `DeviceBase`, which uses properties schema but allows to define it manually.
It is not obligatory to use `DeviceBySpec` to define a `Device`. One could directly implement the `Device` interface or use intermediate abstraction `DeviceBase`, which uses properties' schema but allows to define it manually.

View File

@ -26,7 +26,7 @@ import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
/**
* RSocket endpoint based on an established channel. This way it works a bit faster than [RSocketMagixEndpoint]
* RSocket endpoint based on an established channel. This way it works a lot faster than [RSocketMagixEndpoint]
* for sending and receiving, but less flexible in terms of filters. One general [streamFilter] could be set
* in constructor and applied on the loop side. Filters in [subscribe] are applied on the endpoint side on top
* of received data.
@ -78,6 +78,7 @@ public suspend fun MagixEndpoint.Companion.rSocketStreamWithWebSockets(
host: String,
port: Int = DEFAULT_MAGIX_HTTP_PORT,
path: String = "/rsocket",
filter: MagixMessageFilter = MagixMessageFilter.ALL,
rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {},
): RSocketStreamMagixEndpoint {
val client = HttpClient {
@ -89,10 +90,10 @@ public suspend fun MagixEndpoint.Companion.rSocketStreamWithWebSockets(
val rSocket = client.rSocket(host, port, path)
//Ensure client is closed after rSocket if finished
//Ensure the client is closed after rSocket if finished
rSocket.coroutineContext[Job]?.invokeOnCompletion {
client.close()
}
return RSocketStreamMagixEndpoint(rSocket, coroutineContext)
return RSocketStreamMagixEndpoint(rSocket, coroutineContext, filter)
}

View File

@ -4,6 +4,7 @@ import io.ktor.network.sockets.SocketOptions
import io.rsocket.kotlin.core.RSocketConnectorBuilder
import io.rsocket.kotlin.transport.ktor.tcp.TcpClientTransport
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.MagixMessageFilter
import kotlin.coroutines.coroutineContext
@ -30,6 +31,7 @@ public suspend fun MagixEndpoint.Companion.rSocketWithTcp(
public suspend fun MagixEndpoint.Companion.rSocketStreamWithTcp(
host: String,
port: Int = DEFAULT_MAGIX_RAW_PORT,
filter: MagixMessageFilter = MagixMessageFilter.ALL,
tcpConfig: SocketOptions.TCPClientSocketOptions.() -> Unit = {},
rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {},
): RSocketStreamMagixEndpoint {
@ -40,5 +42,5 @@ public suspend fun MagixEndpoint.Companion.rSocketStreamWithTcp(
)
val rSocket = buildConnector(rSocketConfig).connect(transport)
return RSocketStreamMagixEndpoint(rSocket, coroutineContext)
return RSocketStreamMagixEndpoint(rSocket, coroutineContext, filter)
}

View File

@ -6,7 +6,6 @@ import io.rsocket.kotlin.transport.ktor.tcp.TcpClientTransport
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.rsocket.RSocketMagixEndpoint
import space.kscience.magix.rsocket.buildConnector
import kotlin.coroutines.coroutineContext
/**
@ -25,5 +24,5 @@ public suspend fun MagixEndpoint.Companion.rSocketWithTcp(
)
val rSocket = buildConnector(rSocketConfig).connect(transport)
return RSocketMagixEndpoint(rSocket, coroutineContext)
return RSocketMagixEndpoint(rSocket)
}

View File

@ -49,7 +49,7 @@ public class RSocketMagixFlowPlugin(public val port: Int = DEFAULT_MAGIX_RAW_POR
val message = MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), request.data.readText())
magixFlow.emit(message)
}
// bidirectional connection, not covered by a standard
// bidirectional connection, used for streaming connection
requestChannel { request: Payload, input: Flow<Payload> ->
input.onEach {
magixFlow.emit(MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), it.data.readText()))

View File

@ -13,7 +13,6 @@ import space.kscience.magix.api.MagixMessage
import space.kscience.magix.api.MagixMessageFilter
import space.kscience.magix.api.filter
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
public class ZmqMagixEndpoint(
private val host: String,
@ -71,7 +70,8 @@ public class ZmqMagixEndpoint(
}
}
public suspend fun MagixEndpoint.Companion.zmq(
public fun MagixEndpoint.Companion.zmq(
scope: CoroutineScope,
host: String,
protocol: String = "tcp",
pubPort: Int = DEFAULT_MAGIX_ZMQ_PUB_PORT,
@ -81,5 +81,5 @@ public suspend fun MagixEndpoint.Companion.zmq(
protocol,
pubPort,
pullPort,
coroutineContext = coroutineContext
coroutineContext = scope.coroutineContext
)

View File

@ -24,13 +24,13 @@ public class ZmqMagixFlowPlugin(
val logger = LoggerFactory.getLogger("magix-server-zmq")
ZContext().use { context ->
//launch publishing job
//launch the publishing job
val pubSocket = context.createSocket(SocketType.PUB)
pubSocket.bind("$localHost:$zmqPubSocketPort")
magixFlow.onEach { message ->
val string = MagixEndpoint.magixJson.encodeToString(message)
pubSocket.send(string)
logger.debug("Published: $string")
logger.trace("Published: $string")
}.launchIn(this)
//launch pulling job
@ -41,7 +41,7 @@ public class ZmqMagixFlowPlugin(
while (isActive) {
val string: String? = pullSocket.recvStr()
if (string != null) {
logger.debug("Received: $string")
logger.trace("Received: $string")
val message = MagixEndpoint.magixJson.decodeFromString<MagixMessage>(string)
magixFlow.emit(message)
}