Compare commits

..

No commits in common. "master" and "feature/device-collective-demo" have entirely different histories.

97 changed files with 390 additions and 1248 deletions
.space.ktsCHANGELOG.mdREADME.mdbuild.gradle.kts
controls-constructor
README.mdbuild.gradle.kts
src
commonMain/kotlin/space/kscience/controls/constructor
commonTest/kotlin/space/kscience/controls/constructor
controls-core
controls-jupyter
controls-magix
README.mdbuild.gradle.kts
src/commonMain/kotlin/space/kscience/controls/client
controls-modbus
controls-opcua
controls-pi
README.md
src/jvmMain/kotlin/space/kscience/controls/pi
controls-plc4x
controls-ports-ktor
README.md
src/jvmMain/kotlin/space/kscience/controls/ports
controls-serial
controls-server
controls-storage
README.md
controls-xodus
controls-vision
controls-visualisation-compose
demo
all-things/src/main/kotlin/space/kscience/controls/demo
car
constructor/src/jvmMain/kotlin
device-collective
echo
magix-demo/src/main/kotlin
many-devices
mks-pdr900/src/main/kotlin/center/sciprog/devices/mks
motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster
gradle
magix
magix-api
magix-java-endpoint
magix-mqtt
magix-rabbit
README.md
src/main/kotlin/space/kscience/magix/rabbit
magix-rsocket
magix-server
magix-storage
magix-utils
magix-zmq
settings.gradle.kts
simulation-kt

45
.space.kts Normal file

@ -0,0 +1,45 @@
import kotlin.io.path.readText
job("Build") {
gradlew("spc.registry.jetbrains.space/p/sci/containers/kotlin-ci:1.0.3", "build")
}
job("Publish") {
startOn {
gitPush { enabled = false }
}
container("spc.registry.jetbrains.space/p/sci/containers/kotlin-ci:1.0.3") {
env["SPACE_USER"] = "{{ project:space_user }}"
env["SPACE_TOKEN"] = "{{ project:space_token }}"
kotlinScript { api ->
val spaceUser = System.getenv("SPACE_USER")
val spaceToken = System.getenv("SPACE_TOKEN")
// write the version to the build directory
api.gradlew("version")
//read the version from build file
val version = java.nio.file.Path.of("build/project-version.txt").readText()
val revisionSuffix = if (version.endsWith("SNAPSHOT")) {
"-" + api.gitRevision().take(7)
} else {
""
}
api.space().projects.automation.deployments.start(
project = api.projectIdentifier(),
targetIdentifier = TargetIdentifier.Key("maps-kt"),
version = version+revisionSuffix,
// automatically update deployment status based on the status of a job
syncWithAutomationJob = true
)
api.gradlew(
"publishAllPublicationsToSpaceRepository",
"-Ppublishing.space.user=\"$spaceUser\"",
"-Ppublishing.space.token=\"$spaceToken\"",
)
}
}
}

@ -8,8 +8,6 @@
- Shortcuts to access all Controls devices in a magix network.
- `DeviceClient` properly evaluates lifecycle and logs
- `PeerConnection` API for direct device-device binary sharing
- DeviceDrawable2D intermediate visualization implementation
- New interface `WithLifeCycle`. Change Port API to adhere to it.
### Changed
- Constructor properties return `DeviceState` in order to be able to subscribe to them
@ -17,8 +15,6 @@
- `DeviceClient` now initializes property and action descriptors eagerly.
- `DeviceHub` now works with `Name` instead of `NameToken`. Tree-like structure is made using `Path`. Device messages no longer have access to sub-devices.
- Add some utility methods to ports. Synchronous port response could be now consumed as `Source`.
- `DeviceLifecycleState` is replaced by `LifecycleState`.
### Deprecated

@ -147,15 +147,6 @@ Automatically checks consistency.
>
> **Maturity**: EXPERIMENTAL
### [simulation-kt](simulation-kt)
> A framework for combination of asynchronous simulations.
>
> **Maturity**: PROTOTYPE
>
> **Features:**
> - [timeline](simulation-kt/#) : Timeline is an ordered discrete history containing TimeLineEvent
### [controls-storage/controls-xodus](controls-storage/controls-xodus)
> An implementation of controls-storage on top of JetBrains Xodus.
>

@ -3,12 +3,11 @@ import space.kscience.gradle.useSPCTeam
plugins {
id("space.kscience.gradle.project")
alias(libs.plugins.versions)
}
allprojects {
group = "space.kscience"
version = "0.4.0-dev-7"
version = "0.4.0-dev-4"
repositories{
google()
}

@ -6,7 +6,7 @@ A low-code constructor for composite devices simulation
## Artifact:
The Maven coordinates of this project are `space.kscience:controls-constructor:0.4.0-dev-7`.
The Maven coordinates of this project are `space.kscience:controls-constructor:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:controls-constructor:0.4.0-dev-7")
implementation("space.kscience:controls-constructor:0.4.0-dev-4")
}
```

@ -10,8 +10,6 @@ description = """
kscience{
jvm()
js()
native()
wasm()
useCoroutines()
useSerialization()
commonMain {

@ -3,7 +3,7 @@ package space.kscience.controls.constructor
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import space.kscience.controls.api.*
import space.kscience.controls.api.LifecycleState.*
import space.kscience.controls.api.DeviceLifecycleState.*
import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.manager.install
import space.kscience.controls.spec.DevicePropertySpec
@ -165,11 +165,11 @@ public open class DeviceGroup(
return action(argument)
}
final override var lifecycleState: LifecycleState = LifecycleState.STOPPED
final override var lifecycleState: DeviceLifecycleState = DeviceLifecycleState.STOPPED
private set
private suspend fun setLifecycleState(lifecycleState: LifecycleState) {
private suspend fun setLifecycleState(lifecycleState: DeviceLifecycleState) {
this.lifecycleState = lifecycleState
sharedMessageFlow.emit(
DeviceLifeCycleMessage(lifecycleState)

@ -14,7 +14,7 @@ import space.kscience.dataforge.context.Context
/**
* A device that detects if a motor hits the end of its range
* Virtual [LimitSwitch]
*/
public class LimitSwitch(
context: Context,

@ -21,26 +21,23 @@ import kotlin.time.DurationUnit
*/
public class StepDrive(
context: Context,
ticksPerSecond: Double,
position: MutableDeviceState<Long> = MutableDeviceState(0),
ticksPerSecond: MutableDeviceState<Double>,
target: MutableDeviceState<Long> = MutableDeviceState(0),
private val writeTicks: suspend (ticks: Long, speed: Double) -> Unit = { _, _ -> },
) : DeviceConstructor(context) {
public val target: MutableDeviceState<Long> by property(
MetaConverter.long,
MutableDeviceState<Long>(position.value)
)
public val target: MutableDeviceState<Long> by property(MetaConverter.long, target)
public val speed: MutableDeviceState<Double> by property(
MetaConverter.double,
MutableDeviceState<Double>(ticksPerSecond)
)
public val speed: MutableDeviceState<Double> by property(MetaConverter.double, ticksPerSecond)
private val positionState = stateOf(target.value)
public val position: DeviceState<Long> by property(MetaConverter.long, positionState)
public val position: DeviceState<Long> by property(MetaConverter.long, position)
//FIXME round to zero problem
private val ticker = onTimer(reads = setOf(target, position), writes = setOf(position)) { prev, next ->
val tickSpeed = speed.value
val tickSpeed = ticksPerSecond.value
val timeDelta = (next - prev).toDouble(DurationUnit.SECONDS)
val ticksDelta: Long = target.value - position.value
val steps: Long = when {
@ -49,7 +46,7 @@ public class StepDrive(
else -> return@onTimer
}
writeTicks(steps, tickSpeed)
position.value += steps
positionState.value += steps
}
}

@ -11,7 +11,7 @@ import kotlinx.coroutines.flow.emptyFlow
*/
private class VirtualDeviceState<T>(
initialValue: T,
private val callback: (T) -> Unit = {}
private val callback: (T) -> Unit = {},
) : MutableDeviceState<T> {
private val flow = MutableStateFlow(initialValue)
override val valueFlow: Flow<T> get() = flow
@ -34,7 +34,7 @@ private class VirtualDeviceState<T>(
*/
public fun <T> MutableDeviceState(
initialValue: T,
callback: (T) -> Unit = {}
callback: (T) -> Unit = {},
): MutableDeviceState<T> = VirtualDeviceState(initialValue, callback)

@ -4,7 +4,7 @@ import kotlinx.coroutines.flow.first
import kotlinx.coroutines.test.runTest
import space.kscience.controls.api.Device
import space.kscience.controls.api.DeviceLifeCycleMessage
import space.kscience.controls.api.LifecycleState
import space.kscience.controls.api.DeviceLifecycleState
import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.manager.install
import space.kscience.controls.spec.doRecurring
@ -37,7 +37,7 @@ class DeviceGroupTest {
}
error("Error!")
}
testDevice.messageFlow.first { it is DeviceLifeCycleMessage && it.state == LifecycleState.STOPPED }
testDevice.messageFlow.first { it is DeviceLifeCycleMessage && it.state == DeviceLifecycleState.STOPPED }
println("stopped")
}
}

@ -16,7 +16,7 @@ Core interfaces for building a device server
## Artifact:
The Maven coordinates of this project are `space.kscience:controls-core:0.4.0-dev-7`.
The Maven coordinates of this project are `space.kscience:controls-core:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -26,6 +26,6 @@ repositories {
}
dependencies {
implementation("space.kscience:controls-core:0.4.0-dev-7")
implementation("space.kscience:controls-core:0.4.0-dev-4")
}
```

@ -13,7 +13,6 @@ kscience {
jvm()
js()
native()
wasm()
useCoroutines()
useSerialization{
json()

@ -5,7 +5,7 @@ import kotlinx.coroutines.flow.Flow
/**
* A generic bidirectional asynchronous sender/receiver object
*/
public interface AsynchronousSocket<T> : WithLifeCycle {
public interface AsynchronousSocket<T> : AutoCloseable {
/**
* Send an object to the socket
*/
@ -15,6 +15,16 @@ public interface AsynchronousSocket<T> : WithLifeCycle {
* Flow of objects received from socket
*/
public fun subscribe(): Flow<T>
/**
* Start socket operation
*/
public fun open()
/**
* Check if this socket is open
*/
public val isOpen: Boolean
}
/**

@ -4,6 +4,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.*
import kotlinx.serialization.Serializable
import space.kscience.controls.api.Device.Companion.DEVICE_TARGET
import space.kscience.dataforge.context.ContextAware
import space.kscience.dataforge.context.info
@ -14,13 +15,40 @@ import space.kscience.dataforge.meta.string
import space.kscience.dataforge.misc.DfType
import space.kscience.dataforge.names.parseAsName
/**
* A lifecycle state of a device
*/
@Serializable
public enum class DeviceLifecycleState {
/**
* Device is initializing
*/
STARTING,
/**
* The Device is initialized and running
*/
STARTED,
/**
* The Device is closed
*/
STOPPED,
/**
* The device encountered irrecoverable error
*/
ERROR
}
/**
* General interface describing a managed Device.
* [Device] is a supervisor scope encompassing all operations on a device.
* When canceled, cancels all running processes.
*/
@DfType(DEVICE_TARGET)
public interface Device : ContextAware, WithLifeCycle, CoroutineScope {
public interface Device : ContextAware, CoroutineScope {
/**
* Initial configuration meta for the device
@ -66,16 +94,18 @@ public interface Device : ContextAware, WithLifeCycle, CoroutineScope {
* Initialize the device. This function suspends until the device is finished initialization.
* Does nothing if the device is started or is starting
*/
override suspend fun start(): Unit = Unit
public suspend fun start(): Unit = Unit
/**
* Close and terminate the device. This function does not wait for the device to be closed.
*/
override suspend fun stop() {
public suspend fun stop() {
coroutineContext[Job]?.cancel("The device is closed")
logger.info { "Device $this is closed" }
}
public val lifecycleState: DeviceLifecycleState
public companion object {
public const val DEVICE_TARGET: String = "device"
}
@ -84,7 +114,7 @@ public interface Device : ContextAware, WithLifeCycle, CoroutineScope {
/**
* Inner id of a device. Not necessary corresponds to the name in the parent container
*/
public val Device.id: String get() = meta["id"].string ?: "device[${hashCode().toString(16)}]"
public val Device.id: String get() = meta["id"].string?: "device[${hashCode().toString(16)}]"
/**
* Device that caches properties values
@ -137,12 +167,3 @@ public fun Device.onPropertyChange(
public fun Device.propertyMessageFlow(propertyName: String): Flow<PropertyChangedMessage> = messageFlow
.filterIsInstance<PropertyChangedMessage>()
.filter { it.property == propertyName }
/**
* React on device lifecycle events
*/
public fun Device.onLifecycleEvent(
block: suspend (LifecycleState) -> Unit
): Job = messageFlow.filterIsInstance<DeviceLifeCycleMessage>().onEach {
block(it.state)
}.launchIn(this)

@ -240,7 +240,7 @@ public data class DeviceErrorMessage(
@Serializable
@SerialName("lifecycle")
public data class DeviceLifeCycleMessage(
val state: LifecycleState,
val state: DeviceLifecycleState,
override val sourceDevice: Name = Name.EMPTY,
override val targetDevice: Name? = null,
override val comment: String? = null,

@ -1,59 +0,0 @@
package space.kscience.controls.api
import kotlinx.serialization.Serializable
/**
* A lifecycle state of a device
*/
@Serializable
public enum class LifecycleState {
/**
* Device is initializing
*/
STARTING,
/**
* The Device is initialized and running
*/
STARTED,
/**
* The Device is closed
*/
STOPPED,
/**
* The device encountered irrecoverable error
*/
ERROR
}
/**
* An object that could be started or stopped functioning
*/
public interface WithLifeCycle {
public suspend fun start()
public suspend fun stop()
public val lifecycleState: LifecycleState
}
/**
* Bind this object lifecycle to a device lifecycle
*
* The starting and stopping are done in device scope
*/
public fun WithLifeCycle.bindToDeviceLifecycle(device: Device){
device.onLifecycleEvent {
when(it){
LifecycleState.STARTING -> start()
LifecycleState.STARTED -> {/*ignore*/}
LifecycleState.STOPPED -> stop()
LifecycleState.ERROR -> stop()
}
}
}

@ -6,7 +6,6 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.io.Source
import space.kscience.controls.api.AsynchronousSocket
import space.kscience.controls.api.LifecycleState
import space.kscience.dataforge.context.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.get
@ -66,8 +65,8 @@ public abstract class AbstractAsynchronousPort(
protected abstract fun onOpen()
final override suspend fun start() {
if (lifecycleState == LifecycleState.STOPPED) {
final override fun open() {
if (!isOpen) {
sendJob = scope.launch {
for (data in outgoing) {
try {
@ -81,7 +80,7 @@ public abstract class AbstractAsynchronousPort(
}
onOpen()
} else {
logger.warn { "$this already started" }
logger.warn { "$this already opened" }
}
}
@ -90,7 +89,7 @@ public abstract class AbstractAsynchronousPort(
* Send a data packet via the port
*/
override suspend fun send(data: ByteArray) {
check(lifecycleState == LifecycleState.STARTED) { "The port is not opened" }
check(isOpen) { "The port is not opened" }
outgoing.send(data)
}
@ -101,7 +100,7 @@ public abstract class AbstractAsynchronousPort(
*/
override fun subscribe(): Flow<ByteArray> = incoming.receiveAsFlow()
override suspend fun stop() {
override fun close() {
outgoing.close()
incoming.close()
sendJob?.cancel()

@ -7,8 +7,6 @@ import kotlinx.coroutines.sync.withLock
import kotlinx.io.Buffer
import kotlinx.io.Source
import kotlinx.io.readByteArray
import space.kscience.controls.api.LifecycleState
import space.kscience.controls.api.WithLifeCycle
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.ContextAware
@ -16,7 +14,11 @@ import space.kscience.dataforge.context.ContextAware
* A port handler for synchronous (request-response) communication with a port.
* Only one request could be active at a time (others are suspended).
*/
public interface SynchronousPort : ContextAware, WithLifeCycle {
public interface SynchronousPort : ContextAware, AutoCloseable {
public fun open()
public val isOpen: Boolean
/**
* Send a single message and wait for the flow of response chunks.
@ -69,14 +71,14 @@ private class SynchronousOverAsynchronousPort(
override val context: Context get() = port.context
override suspend fun start() {
if (port.lifecycleState == LifecycleState.STOPPED) port.start()
override fun open() {
if (!port.isOpen) port.open()
}
override val lifecycleState: LifecycleState get() = port.lifecycleState
override val isOpen: Boolean get() = port.isOpen
override suspend fun stop() {
if (port.lifecycleState == LifecycleState.STARTED) port.stop()
override fun close() {
if (port.isOpen) port.close()
}
override suspend fun <R> respond(

@ -72,6 +72,7 @@ public abstract class DeviceBase<D : Device>(
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
@OptIn(ExperimentalCoroutinesApi::class)
override val coroutineContext: CoroutineContext = context.newCoroutineContext(
SupervisorJob(context.coroutineContext[Job]) +
CoroutineName("Device $id") +
@ -187,11 +188,11 @@ public abstract class DeviceBase<D : Device>(
return spec.executeWithMeta(self, argument ?: Meta.EMPTY)
}
final override var lifecycleState: LifecycleState = LifecycleState.STOPPED
final override var lifecycleState: DeviceLifecycleState = DeviceLifecycleState.STOPPED
private set
private suspend fun setLifecycleState(lifecycleState: LifecycleState) {
private suspend fun setLifecycleState(lifecycleState: DeviceLifecycleState) {
this.lifecycleState = lifecycleState
sharedMessageFlow.emit(
DeviceLifeCycleMessage(lifecycleState)
@ -203,11 +204,11 @@ public abstract class DeviceBase<D : Device>(
}
final override suspend fun start() {
if (lifecycleState == LifecycleState.STOPPED) {
if (lifecycleState == DeviceLifecycleState.STOPPED) {
super.start()
setLifecycleState(LifecycleState.STARTING)
setLifecycleState(DeviceLifecycleState.STARTING)
onStart()
setLifecycleState(LifecycleState.STARTED)
setLifecycleState(DeviceLifecycleState.STARTED)
} else {
logger.debug { "Device $this is already started" }
}
@ -219,7 +220,7 @@ public abstract class DeviceBase<D : Device>(
final override suspend fun stop() {
onStop()
setLifecycleState(LifecycleState.STOPPED)
setLifecycleState(DeviceLifecycleState.STOPPED)
super.stop()
}

@ -33,7 +33,7 @@ public abstract class DeviceSpec<D : Device> {
public open suspend fun D.onOpen() {
}
public open suspend fun D.onClose() {
public open fun D.onClose() {
}

@ -41,24 +41,25 @@ public fun <T, D : Device> DeviceSpec<D>.mutableProperty(
write = { _, value: T -> readWriteProperty.set(this, value) }
)
//read only delegates
/**
* Register a read-only logical property
* (without a corresponding physical state or with a state that is updated asynchronously) for a device
* Register a mutable logical property (without a corresponding physical state) for a device
*/
public fun <T, D : DeviceBase<D>> DeviceSpec<D>.property(
public fun <T, D : DeviceBase<D>> DeviceSpec<D>.logicalProperty(
converter: MetaConverter<T>,
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
name: String? = null,
): PropertyDelegateProvider<DeviceSpec<D>, ReadOnlyProperty<DeviceSpec<D>, DevicePropertySpec<D, T>>> =
property(
): PropertyDelegateProvider<DeviceSpec<D>, ReadOnlyProperty<DeviceSpec<D>, MutableDevicePropertySpec<D, T>>> =
mutableProperty(
converter,
descriptorBuilder,
name,
read = { propertyName -> getProperty(propertyName)?.let(converter::readOrNull) },
write = { propertyName, value -> writeProperty(propertyName, converter.convert(value)) }
)
//read only delegates
public fun <D : Device> DeviceSpec<D>.booleanProperty(
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
name: String? = null,
@ -140,25 +141,7 @@ public fun <D : Device> DeviceSpec<D>.metaProperty(
//read-write delegates
/**
* Register a mutable logical property
* (without a corresponding physical state or with a state that is updated asynchronously) for a device
*/
public fun <T, D : DeviceBase<D>> DeviceSpec<D>.mutableProperty(
converter: MetaConverter<T>,
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
name: String? = null,
): PropertyDelegateProvider<DeviceSpec<D>, ReadOnlyProperty<DeviceSpec<D>, MutableDevicePropertySpec<D, T>>> =
mutableProperty(
converter,
descriptorBuilder,
name,
read = { propertyName -> getProperty(propertyName)?.let(converter::readOrNull) },
write = { propertyName, value -> writeProperty(propertyName, converter.convert(value)) }
)
public fun <D : Device> DeviceSpec<D>.mutableBooleanProperty(
public fun <D : Device> DeviceSpec<D>.booleanProperty(
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
name: String? = null,
read: suspend D.(propertyName: String) -> Boolean?,
@ -178,7 +161,7 @@ public fun <D : Device> DeviceSpec<D>.mutableBooleanProperty(
)
public fun <D : Device> DeviceSpec<D>.mutableNumberProperty(
public fun <D : Device> DeviceSpec<D>.numberProperty(
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
name: String? = null,
read: suspend D.(propertyName: String) -> Number,
@ -186,7 +169,7 @@ public fun <D : Device> DeviceSpec<D>.mutableNumberProperty(
): PropertyDelegateProvider<DeviceSpec<D>, ReadOnlyProperty<DeviceSpec<D>, MutableDevicePropertySpec<D, Number>>> =
mutableProperty(MetaConverter.number, numberDescriptor(descriptorBuilder), name, read, write)
public fun <D : Device> DeviceSpec<D>.mutableDoubleProperty(
public fun <D : Device> DeviceSpec<D>.doubleProperty(
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
name: String? = null,
read: suspend D.(propertyName: String) -> Double,
@ -194,7 +177,7 @@ public fun <D : Device> DeviceSpec<D>.mutableDoubleProperty(
): PropertyDelegateProvider<DeviceSpec<D>, ReadOnlyProperty<DeviceSpec<D>, MutableDevicePropertySpec<D, Double>>> =
mutableProperty(MetaConverter.double, numberDescriptor(descriptorBuilder), name, read, write)
public fun <D : Device> DeviceSpec<D>.mutableStringProperty(
public fun <D : Device> DeviceSpec<D>.stringProperty(
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
name: String? = null,
read: suspend D.(propertyName: String) -> String,
@ -202,7 +185,7 @@ public fun <D : Device> DeviceSpec<D>.mutableStringProperty(
): PropertyDelegateProvider<DeviceSpec<D>, ReadOnlyProperty<DeviceSpec<D>, MutableDevicePropertySpec<D, String>>> =
mutableProperty(MetaConverter.string, descriptorBuilder, name, read, write)
public fun <D : Device> DeviceSpec<D>.mutableMetaProperty(
public fun <D : Device> DeviceSpec<D>.metaProperty(
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
name: String? = null,
read: suspend D.(propertyName: String) -> Meta,

@ -1,7 +1,6 @@
package space.kscience.controls.ports
import kotlinx.coroutines.*
import space.kscience.controls.api.LifecycleState
import space.kscience.dataforge.context.*
import space.kscience.dataforge.meta.*
import java.net.InetSocketAddress
@ -31,7 +30,7 @@ public class ChannelPort(
meta: Meta,
coroutineContext: CoroutineContext = context.coroutineContext,
channelBuilder: suspend () -> ByteChannel,
) : AbstractAsynchronousPort(context, meta, coroutineContext) {
) : AbstractAsynchronousPort(context, meta, coroutineContext), AutoCloseable {
/**
* A handler to await port connection
@ -42,8 +41,7 @@ public class ChannelPort(
private var listenerJob: Job? = null
override val lifecycleState: LifecycleState
get() = if(listenerJob?.isActive == true) LifecycleState.STARTED else LifecycleState.STOPPED
override val isOpen: Boolean get() = listenerJob?.isActive == true
override fun onOpen() {
listenerJob = scope.launch(Dispatchers.IO) {
@ -73,12 +71,12 @@ public class ChannelPort(
}
@OptIn(ExperimentalCoroutinesApi::class)
override suspend fun stop() {
override fun close() {
listenerJob?.cancel()
if (futureChannel.isCompleted) {
futureChannel.getCompleted().close()
}
super.stop()
super.close()
}
}
@ -107,12 +105,12 @@ public object TcpPort : Factory<AsynchronousPort> {
/**
* Create and open TCP port
*/
public suspend fun start(
public fun open(
context: Context,
host: String,
port: Int,
coroutineContext: CoroutineContext = context.coroutineContext,
): ChannelPort = build(context, host, port, coroutineContext).apply { start() }
): ChannelPort = build(context, host, port, coroutineContext).apply { open() }
override fun build(context: Context, meta: Meta): ChannelPort {
val host = meta["host"].string ?: "localhost"
@ -158,13 +156,13 @@ public object UdpPort : Factory<AsynchronousPort> {
/**
* Connect a datagram channel to a remote host/port. If [localPort] is provided, it is used to bind local port for receiving messages.
*/
public suspend fun start(
public fun open(
context: Context,
remoteHost: String,
remotePort: Int,
localPort: Int? = null,
localHost: String = "localhost",
): ChannelPort = build(context, remoteHost, remotePort, localPort, localHost).apply { start() }
): ChannelPort = build(context, remoteHost, remotePort, localPort, localHost).apply { open() }
override fun build(context: Context, meta: Meta): ChannelPort {

@ -1,7 +1,6 @@
package space.kscience.controls.ports
import kotlinx.coroutines.*
import space.kscience.controls.api.LifecycleState
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.meta.Meta
import java.net.DatagramPacket
@ -40,13 +39,13 @@ public class UdpSocketPort(
}
}
override suspend fun stop() {
override fun close() {
listenerJob?.cancel()
super.stop()
super.close()
}
override val lifecycleState: LifecycleState
get() = if(listenerJob?.isActive == true) LifecycleState.STARTED else LifecycleState.STOPPED
override val isOpen: Boolean get() = listenerJob?.isActive == true
override suspend fun write(data: ByteArray): Unit = withContext(Dispatchers.IO) {
val packet = DatagramPacket(

@ -29,8 +29,8 @@ internal class AsynchronousPortIOTest {
@Test
fun testUdpCommunication() = runTest {
val receiver = UdpPort.start(Global, "localhost", 8811, localPort = 8812)
val sender = UdpPort.start(Global, "localhost", 8812, localPort = 8811)
val receiver = UdpPort.open(Global, "localhost", 8811, localPort = 8812)
val sender = UdpPort.open(Global, "localhost", 8812, localPort = 8811)
delay(30)
repeat(10) {
@ -44,7 +44,7 @@ internal class AsynchronousPortIOTest {
.toList()
assertEquals("Line number 3", res[3].trim())
receiver.stop()
sender.stop()
receiver.close()
sender.close()
}
}

@ -1,9 +0,0 @@
package space.kscience.controls.spec
import space.kscience.controls.api.ActionDescriptor
import space.kscience.controls.api.PropertyDescriptor
import kotlin.reflect.KProperty
internal actual fun PropertyDescriptor.fromSpec(property: KProperty<*>){}
internal actual fun ActionDescriptor.fromSpec(property: KProperty<*>){}

@ -6,7 +6,7 @@
## Artifact:
The Maven coordinates of this project are `space.kscience:controls-jupyter:0.4.0-dev-7`.
The Maven coordinates of this project are `space.kscience:controls-jupyter:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:controls-jupyter:0.4.0-dev-7")
implementation("space.kscience:controls-jupyter:0.4.0-dev-4")
}
```

@ -12,7 +12,7 @@ Magix service for binding controls devices (both as RPC client and server)
## Artifact:
The Maven coordinates of this project are `space.kscience:controls-magix:0.4.0-dev-7`.
The Maven coordinates of this project are `space.kscience:controls-magix:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -22,6 +22,6 @@ repositories {
}
dependencies {
implementation("space.kscience:controls-magix:0.4.0-dev-7")
implementation("space.kscience:controls-magix:0.4.0-dev-4")
}
```

@ -13,7 +13,6 @@ kscience {
jvm()
js()
native()
wasm()
useCoroutines()
useSerialization {
json()

@ -99,10 +99,10 @@ public class DeviceClient internal constructor(
}
private val lifecycleStateFlow = messageFlow.filterIsInstance<DeviceLifeCycleMessage>()
.map { it.state }.stateIn(this, started = SharingStarted.Eagerly, LifecycleState.STARTED)
.map { it.state }.stateIn(this, started = SharingStarted.Eagerly, DeviceLifecycleState.STARTED)
@DFExperimental
override val lifecycleState: LifecycleState get() = lifecycleStateFlow.value
override val lifecycleState: DeviceLifecycleState get() = lifecycleStateFlow.value
}
/**

@ -14,7 +14,7 @@ Automatically checks consistency.
## Artifact:
The Maven coordinates of this project are `space.kscience:controls-modbus:0.4.0-dev-7`.
The Maven coordinates of this project are `space.kscience:controls-modbus:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -24,6 +24,6 @@ repositories {
}
dependencies {
implementation("space.kscience:controls-modbus:0.4.0-dev-7")
implementation("space.kscience:controls-modbus:0.4.0-dev-4")
}
```

@ -12,7 +12,7 @@ A client and server connectors for OPC-UA via Eclipse Milo
## Artifact:
The Maven coordinates of this project are `space.kscience:controls-opcua:0.4.0-dev-7`.
The Maven coordinates of this project are `space.kscience:controls-opcua:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -22,6 +22,6 @@ repositories {
}
dependencies {
implementation("space.kscience:controls-opcua:0.4.0-dev-7")
implementation("space.kscience:controls-opcua:0.4.0-dev-4")
}
```

@ -6,7 +6,7 @@ Utils to work with controls-kt on Raspberry pi
## Artifact:
The Maven coordinates of this project are `space.kscience:controls-pi:0.4.0-dev-7`.
The Maven coordinates of this project are `space.kscience:controls-pi:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:controls-pi:0.4.0-dev-7")
implementation("space.kscience:controls-pi:0.4.0-dev-4")
}
```

@ -5,7 +5,6 @@ import com.pi4j.io.serial.Serial
import com.pi4j.io.serial.SerialConfigBuilder
import com.pi4j.ktx.io.serial
import kotlinx.coroutines.*
import space.kscience.controls.api.LifecycleState
import space.kscience.controls.ports.AbstractAsynchronousPort
import space.kscience.controls.ports.AsynchronousPort
import space.kscience.controls.ports.copyToArray
@ -50,10 +49,9 @@ public class AsynchronousPiPort(
}
override val lifecycleState: LifecycleState
get() = if(listenerJob?.isActive == true) LifecycleState.STARTED else LifecycleState.STOPPED
override val isOpen: Boolean get() = listenerJob?.isActive == true
override suspend fun stop() {
override fun close() {
listenerJob?.cancel()
serial.close()
}
@ -76,11 +74,11 @@ public class AsynchronousPiPort(
return AsynchronousPiPort(context, meta, serial)
}
public suspend fun start(
public fun open(
context: Context,
device: String,
block: SerialConfigBuilder.() -> Unit,
): AsynchronousPiPort = build(context, device, block).apply { start() }
): AsynchronousPiPort = build(context, device, block).apply { open() }
override fun build(context: Context, meta: Meta): AsynchronousPort {
val device: String = meta["device"].string ?: error("Device name not defined")

@ -10,7 +10,6 @@ import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runInterruptible
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import space.kscience.controls.api.LifecycleState
import space.kscience.controls.ports.SynchronousPort
import space.kscience.controls.ports.copyToArray
import space.kscience.dataforge.context.*
@ -28,13 +27,11 @@ public class SynchronousPiPort(
) : SynchronousPort {
private val pi = context.request(PiPlugin)
override suspend fun start() {
override fun open() {
serial.open()
}
override val lifecycleState: LifecycleState
get() = if(serial.isOpen) LifecycleState.STARTED else LifecycleState.STOPPED
override val isOpen: Boolean get() = serial.isOpen
override suspend fun <R> respond(
request: ByteArray,
@ -44,7 +41,7 @@ public class SynchronousPiPort(
serial.write(request)
flow<ByteArray> {
val buffer = ByteBuffer.allocate(1024)
while (serial.isOpen) {
while (isOpen) {
try {
val num = serial.read(buffer)
if (num > 0) {
@ -67,7 +64,7 @@ public class SynchronousPiPort(
}
}
override suspend fun stop() {
override fun close() {
serial.close()
}
@ -89,11 +86,11 @@ public class SynchronousPiPort(
return SynchronousPiPort(context, meta, serial)
}
public suspend fun start(
public fun open(
context: Context,
device: String,
block: SerialConfigBuilder.() -> Unit,
): SynchronousPiPort = build(context, device, block).apply { start() }
): SynchronousPiPort = build(context, device, block).apply { open() }
override fun build(context: Context, meta: Meta): SynchronousPiPort {
val device: String = meta["device"].string ?: error("Device name not defined")

@ -1,21 +0,0 @@
# Module controls-plc4x
A plugin for Controls-kt device server on top of plc4x library
## Usage
## Artifact:
The Maven coordinates of this project are `space.kscience:controls-plc4x:0.4.0-dev-7`.
**Gradle Kotlin DSL:**
```kotlin
repositories {
maven("https://repo.kotlin.link")
mavenCentral()
}
dependencies {
implementation("space.kscience:controls-plc4x:0.4.0-dev-7")
}
```

@ -6,7 +6,7 @@ Implementation of byte ports on top os ktor-io asynchronous API
## Artifact:
The Maven coordinates of this project are `space.kscience:controls-ports-ktor:0.4.0-dev-7`.
The Maven coordinates of this project are `space.kscience:controls-ports-ktor:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:controls-ports-ktor:0.4.0-dev-7")
implementation("space.kscience:controls-ports-ktor:0.4.0-dev-4")
}
```

@ -6,9 +6,9 @@ import io.ktor.network.sockets.aSocket
import io.ktor.network.sockets.openReadChannel
import io.ktor.network.sockets.openWriteChannel
import io.ktor.utils.io.consumeEachBufferRange
import io.ktor.utils.io.core.Closeable
import io.ktor.utils.io.writeAvailable
import kotlinx.coroutines.*
import space.kscience.controls.api.LifecycleState
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Factory
import space.kscience.dataforge.meta.Meta
@ -25,7 +25,7 @@ public class KtorTcpPort internal constructor(
public val port: Int,
coroutineContext: CoroutineContext = context.coroutineContext,
socketOptions: SocketOptions.TCPClientSocketOptions.() -> Unit = {},
) : AbstractAsynchronousPort(context, meta, coroutineContext) {
) : AbstractAsynchronousPort(context, meta, coroutineContext), Closeable {
override fun toString(): String = "port[tcp:$host:$port]"
@ -55,13 +55,13 @@ public class KtorTcpPort internal constructor(
writeChannel.await().writeAvailable(data)
}
override val lifecycleState: LifecycleState
get() = if(listenerJob?.isActive == true) LifecycleState.STARTED else LifecycleState.STOPPED
override val isOpen: Boolean
get() = listenerJob?.isActive == true
override suspend fun stop() {
override fun close() {
listenerJob?.cancel()
futureSocket.cancel()
super.stop()
super.close()
}
public companion object : Factory<AsynchronousPort> {
@ -82,13 +82,13 @@ public class KtorTcpPort internal constructor(
return KtorTcpPort(context, meta, host, port, coroutineContext, socketOptions)
}
public suspend fun start(
public fun open(
context: Context,
host: String,
port: Int,
coroutineContext: CoroutineContext = context.coroutineContext,
socketOptions: SocketOptions.TCPClientSocketOptions.() -> Unit = {},
): KtorTcpPort = build(context, host, port, coroutineContext, socketOptions).apply { start() }
): KtorTcpPort = build(context, host, port, coroutineContext, socketOptions).apply { open() }
override fun build(context: Context, meta: Meta): AsynchronousPort {
val host = meta["host"].string ?: "localhost"

@ -4,9 +4,9 @@ import io.ktor.network.selector.ActorSelectorManager
import io.ktor.network.sockets.*
import io.ktor.utils.io.ByteWriteChannel
import io.ktor.utils.io.consumeEachBufferRange
import io.ktor.utils.io.core.Closeable
import io.ktor.utils.io.writeAvailable
import kotlinx.coroutines.*
import space.kscience.controls.api.LifecycleState
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Factory
import space.kscience.dataforge.meta.Meta
@ -24,7 +24,7 @@ public class KtorUdpPort internal constructor(
public val localHost: String = "localhost",
coroutineContext: CoroutineContext = context.coroutineContext,
socketOptions: SocketOptions.UDPSocketOptions.() -> Unit = {},
) : AbstractAsynchronousPort(context, meta, coroutineContext) {
) : AbstractAsynchronousPort(context, meta, coroutineContext), Closeable {
override fun toString(): String = "port[udp:$remoteHost:$remotePort]"
@ -58,13 +58,13 @@ public class KtorUdpPort internal constructor(
writeChannel.await().writeAvailable(data)
}
override val lifecycleState: LifecycleState
get() = if(listenerJob?.isActive == true) LifecycleState.STARTED else LifecycleState.STOPPED
override val isOpen: Boolean
get() = listenerJob?.isActive == true
override suspend fun stop() {
override fun close() {
listenerJob?.cancel()
futureSocket.cancel()
super.stop()
super.close()
}
public companion object : Factory<AsynchronousPort> {
@ -101,7 +101,7 @@ public class KtorUdpPort internal constructor(
/**
* Create and open UDP port
*/
public suspend fun start(
public fun open(
context: Context,
remoteHost: String,
remotePort: Int,
@ -117,7 +117,7 @@ public class KtorUdpPort internal constructor(
localHost,
coroutineContext,
socketOptions
).apply { start() }
).apply { open() }
override fun build(context: Context, meta: Meta): AsynchronousPort {
val remoteHost by meta.string { error("Remote host is not specified") }

@ -6,7 +6,7 @@ Implementation of direct serial port communication with JSerialComm
## Artifact:
The Maven coordinates of this project are `space.kscience:controls-serial:0.4.0-dev-7`.
The Maven coordinates of this project are `space.kscience:controls-serial:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:controls-serial:0.4.0-dev-7")
implementation("space.kscience:controls-serial:0.4.0-dev-4")
}
```

@ -5,7 +5,6 @@ import com.fazecast.jSerialComm.SerialPortDataListener
import com.fazecast.jSerialComm.SerialPortEvent
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import space.kscience.controls.api.LifecycleState
import space.kscience.controls.ports.AbstractAsynchronousPort
import space.kscience.controls.ports.AsynchronousPort
import space.kscience.dataforge.context.Context
@ -29,7 +28,7 @@ public class AsynchronousSerialPort(
private val serialPortListener = object : SerialPortDataListener {
override fun getListeningEvents(): Int =
SerialPort.LISTENING_EVENT_DATA_RECEIVED or SerialPort.LISTENING_EVENT_DATA_AVAILABLE
SerialPort.LISTENING_EVENT_DATA_RECEIVED and SerialPort.LISTENING_EVENT_DATA_AVAILABLE
override fun serialEvent(event: SerialPortEvent) {
when (event.eventType) {
@ -56,20 +55,18 @@ public class AsynchronousSerialPort(
comPort.addDataListener(serialPortListener)
}
override val lifecycleState: LifecycleState
get() = if(comPort.isOpen) LifecycleState.STARTED else LifecycleState.STOPPED
override val isOpen: Boolean get() = comPort.isOpen
override suspend fun write(data: ByteArray) {
comPort.writeBytes(data, data.size)
}
override suspend fun stop() {
override fun close() {
comPort.removeDataListener()
if (comPort.isOpen) {
comPort.closePort()
}
super.stop()
super.close()
}
public companion object : Factory<AsynchronousPort> {
@ -103,7 +100,7 @@ public class AsynchronousSerialPort(
/**
* Construct ComPort with given parameters
*/
public suspend fun start(
public fun open(
context: Context,
portName: String,
baudRate: Int = 9600,
@ -121,7 +118,7 @@ public class AsynchronousSerialPort(
parity = parity,
coroutineContext = coroutineContext,
additionalConfig = additionalConfig
).apply { start() }
).apply { open() }
override fun build(context: Context, meta: Meta): AsynchronousPort {

@ -11,8 +11,6 @@ import space.kscience.dataforge.names.asName
public class SerialPortPlugin : AbstractPlugin() {
public val ports: Ports by require(Ports)
override val tag: PluginTag get() = Companion.tag
override fun content(target: String): Map<Name, Any> = when (target) {

@ -7,7 +7,6 @@ import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runInterruptible
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import space.kscience.controls.api.LifecycleState
import space.kscience.controls.ports.SynchronousPort
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Factory
@ -29,17 +28,16 @@ public class SynchronousSerialPort(
override fun toString(): String = "port[${comPort.descriptivePortName}]"
override suspend fun start() {
if (!comPort.isOpen) {
override fun open() {
if (!isOpen) {
comPort.openPort()
}
}
override val lifecycleState: LifecycleState
get() = if(comPort.isOpen) LifecycleState.STARTED else LifecycleState.STOPPED
override val isOpen: Boolean get() = comPort.isOpen
override suspend fun stop() {
override fun close() {
if (comPort.isOpen) {
comPort.closePort()
}
@ -54,7 +52,7 @@ public class SynchronousSerialPort(
comPort.flushIOBuffers()
comPort.writeBytes(request, request.size)
flow<ByteArray> {
while (comPort.isOpen) {
while (isOpen) {
try {
val available = comPort.bytesAvailable()
if (available > 0) {
@ -110,7 +108,7 @@ public class SynchronousSerialPort(
/**
* Construct ComPort with given parameters
*/
public suspend fun start(
public fun open(
context: Context,
portName: String,
baudRate: Int = 9600,
@ -126,7 +124,7 @@ public class SynchronousSerialPort(
stopBits = stopBits,
parity = parity,
additionalConfig = additionalConfig
).apply { start() }
).apply { open() }
override fun build(context: Context, meta: Meta): SynchronousPort {

@ -6,7 +6,7 @@ A combined Magix event loop server with web server for visualization.
## Artifact:
The Maven coordinates of this project are `space.kscience:controls-server:0.4.0-dev-7`.
The Maven coordinates of this project are `space.kscience:controls-server:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:controls-server:0.4.0-dev-7")
implementation("space.kscience:controls-server:0.4.0-dev-4")
}
```

@ -6,7 +6,7 @@ An API for stand-alone Controls-kt device or a hub.
## Artifact:
The Maven coordinates of this project are `space.kscience:controls-storage:0.4.0-dev-7`.
The Maven coordinates of this project are `space.kscience:controls-storage:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:controls-storage:0.4.0-dev-7")
implementation("space.kscience:controls-storage:0.4.0-dev-4")
}
```

@ -6,7 +6,7 @@ An implementation of controls-storage on top of JetBrains Xodus.
## Artifact:
The Maven coordinates of this project are `space.kscience:controls-xodus:0.4.0-dev-7`.
The Maven coordinates of this project are `space.kscience:controls-xodus:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:controls-xodus:0.4.0-dev-7")
implementation("space.kscience:controls-xodus:0.4.0-dev-4")
}
```

@ -6,7 +6,7 @@ Dashboard and visualization extensions for devices
## Artifact:
The Maven coordinates of this project are `space.kscience:controls-vision:0.4.0-dev-7`.
The Maven coordinates of this project are `space.kscience:controls-vision:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:controls-vision:0.4.0-dev-7")
implementation("space.kscience:controls-vision:0.4.0-dev-4")
}
```

@ -1,21 +0,0 @@
# Module controls-visualisation-compose
Visualisation extension using compose-multiplatform
## Usage
## Artifact:
The Maven coordinates of this project are `space.kscience:controls-visualisation-compose:0.4.0-dev-7`.
**Gradle Kotlin DSL:**
```kotlin
repositories {
maven("https://repo.kotlin.link")
mavenCentral()
}
dependencies {
implementation("space.kscience:controls-visualisation-compose:0.4.0-dev-7")
}
```

@ -18,11 +18,25 @@ kscience {
useContextReceivers()
commonMain {
api(projects.controlsConstructor)
api(libs.koala.plots)
api(compose.foundation)
api(compose.material3)
@OptIn(ExperimentalComposeLibrary::class)
api(compose.desktop.components.splitPane)
api("io.github.koalaplot:koalaplot-core:0.6.0")
}
}
kotlin {
sourceSets {
commonMain {
dependencies {
api(compose.foundation)
api(compose.material3)
@OptIn(ExperimentalComposeLibrary::class)
api(compose.desktop.components.splitPane)
}
}
// jvmMain {
// dependencies {
// implementation(compose.desktop.currentOs)
// }
// }
}
}

@ -1,64 +0,0 @@
package space.kscience.controls.compose
import androidx.compose.runtime.Immutable
import androidx.compose.ui.geometry.Offset
import androidx.compose.ui.geometry.Size
import androidx.compose.ui.graphics.Color
import androidx.compose.ui.graphics.drawscope.DrawScope
import androidx.compose.ui.graphics.drawscope.rotate
/**
* A single 2D drawable
*/
@Immutable
public sealed interface DeviceDrawable2D {
public fun DrawScope.draw()
override fun equals(other: Any?): Boolean
}
@Immutable
public data class CircleDrawable2D(val position: Offset, val radius: Float, val color: Color) : DeviceDrawable2D {
override fun DrawScope.draw() {
drawCircle(color, radius = radius, center = position)
}
}
@Drawable2DBuilder
public fun DeviceDrawable2DStore.circle(id: String, position: Offset, radius: Float, color: Color) {
emit(id, CircleDrawable2D(position, radius, color))
}
@Immutable
public data class RectangleDrawable2D(
val position: Offset,
val rectangleSize: Size,
val color: Color,
val rotateDegrees: Float = 0f,
) : DeviceDrawable2D {
override fun DrawScope.draw() {
rotate(rotateDegrees) {
drawRect(
color = color,
topLeft = Offset(
(position.x - rectangleSize.width / 2),
(position.y - rectangleSize.height / 2)
),
size = Size(rectangleSize.width, rectangleSize.height)
)
}
}
}
@Drawable2DBuilder
public fun DeviceDrawable2DStore.rectangle(
id: String,
position: Offset,
rectangleSize: Size,
color: Color,
rotateDegrees: Float = 0f,
) {
emit(id, RectangleDrawable2D(position, rectangleSize, color, rotateDegrees))
}

@ -1,94 +0,0 @@
package space.kscience.controls.compose
import androidx.compose.foundation.Canvas
import androidx.compose.runtime.*
import androidx.compose.ui.Modifier
import androidx.compose.ui.geometry.Size
import androidx.compose.ui.graphics.drawscope.DrawScope
import androidx.compose.ui.graphics.drawscope.clipRect
import androidx.compose.ui.layout.onGloballyPositioned
import androidx.compose.ui.unit.toSize
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import space.kscience.controls.api.Device
import space.kscience.controls.constructor.DeviceState
import space.kscience.controls.spec.DevicePropertySpec
import space.kscience.controls.spec.propertyFlow
@DslMarker
public annotation class Drawable2DBuilder
@Drawable2DBuilder
public class DeviceDrawable2DStore(public val scope: CoroutineScope, public val size: Size) {
public val drawableFlow: MutableStateFlow<Map<String, DeviceDrawable2D>> = MutableStateFlow(emptyMap())
}
public fun DeviceDrawable2DStore.emit(id: String, drawable2D: DeviceDrawable2D) {
drawableFlow.value += (id to drawable2D)
}
public fun DeviceDrawable2DStore.emitAll(drawables: Map<String, DeviceDrawable2D>) {
drawableFlow.value += drawables
}
/**
* Fill drawables from a flow
*/
public fun DeviceDrawable2DStore.observe(id: String, flow: Flow<DeviceDrawable2D>): Job = flow.onEach {
drawableFlow.value += (id to it)
}.launchIn(scope)
/**
* Observe single [DeviceState]
*/
public fun <T> DeviceDrawable2DStore.observeState(
state: DeviceState<T>,
id: String = state.toString(),
transform: suspend DeviceDrawable2DStore.(T) -> DeviceDrawable2D,
): Job = observe(id, state.valueFlow.map { transform(this, it) })
/**
* Observe a single [Device] property
*/
public fun <T, D : Device, P : DevicePropertySpec<D, T>> DeviceDrawable2DStore.observeProperty(
device: D,
devicePropertySpec: DevicePropertySpec<D, T>,
id: String = devicePropertySpec.toString(),
transform: suspend DeviceDrawable2DStore.(T) -> DeviceDrawable2D,
): Job = observe(id, device.propertyFlow(devicePropertySpec).map { transform(this, it) })
@Composable
public fun Device2DCanvas(
modifier: Modifier = Modifier,
onDraw: DrawScope.() -> Unit = {},
flowBuilder: suspend DeviceDrawable2DStore.() -> Unit,
) {
val coroutineScope = rememberCoroutineScope()
var canvasSize by remember { mutableStateOf(Size(100f, 100f)) }
val store = remember(canvasSize) {
DeviceDrawable2DStore(coroutineScope, canvasSize).apply {
coroutineScope.launch {
flowBuilder()
}
}
}
val drawables by store.drawableFlow.collectAsState()
key(store) {
Canvas(modifier.onGloballyPositioned {
canvasSize = it.size.toSize()
}) {
clipRect {
drawables.values.forEach {
with(it) { draw() }
}
onDraw()
}
}
}
}

@ -82,24 +82,22 @@ class DemoController : ContextAware {
opcUaServer.startup()
opcUaServer.serveDevices(deviceManager)
//create a remote listener endpoint
val listenerEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost")
// subscribe remote endpoint
listenerEndpoint.subscribe(DeviceManager.magixFormat).onEach { (magixMessage, deviceMessage) ->
// print all messages that are not property change message
if (deviceMessage !is PropertyChangedMessage) {
println(">> ${json.encodeToString(MagixMessage.serializer(), magixMessage)}")
}
}.launchIn(this)
// send description request
listenerEndpoint.send(
format = DeviceManager.magixFormat,
payload = GetDescriptionMessage(),
source = "listener",
// target = "demoDevice"
)
}
fun shutdown(): Job = context.launch {

@ -37,8 +37,8 @@ kotlin{
}
tasks.withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile>().configureEach {
compilerOptions {
freeCompilerArgs.addAll("-Xjvm-default=all")
kotlinOptions {
freeCompilerArgs = freeCompilerArgs + listOf("-Xjvm-default=all", "-Xopt-in=kotlin.RequiresOptIn")
}
}

@ -1,77 +0,0 @@
package space.kscience.controls.demo.constructor
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.ensureActive
import space.kscience.controls.constructor.DeviceConstructor
import space.kscience.controls.constructor.collectValuesIn
import space.kscience.controls.constructor.device
import space.kscience.controls.constructor.devices.LimitSwitch
import space.kscience.controls.constructor.devices.StepDrive
import space.kscience.controls.constructor.models.MutableRangeState
import space.kscience.controls.manager.DeviceManager
import space.kscience.dataforge.context.Context
import kotlin.time.Duration.Companion.seconds
private val ticksPerSecond = 3000.0
class LinearStepDrive(
context: Context,
drive: StepDrive,
atStart: LimitSwitch,
atEnd: LimitSwitch,
) : DeviceConstructor(context) {
val drive by device(drive)
val atStart by device(atStart)
val atEnd by device(atEnd)
}
fun LinearStepDrive(
context: Context,
position: MutableRangeState<Long>,
): LinearStepDrive = LinearStepDrive(
context = context,
drive = StepDrive(context, ticksPerSecond, position),
atStart = LimitSwitch(context, position.atStart),
atEnd = LimitSwitch(context, position.atEnd)
)
suspend fun LinearStepDrive.calibrate(step: Long = 10): ClosedRange<Long> = coroutineScope {
do {
ensureActive()
drive.target.value -= step
delay((step / ticksPerSecond).seconds)
} while (!atStart.locked.value)
val start = drive.position.value
do {
ensureActive()
drive.target.value += step
delay((step / ticksPerSecond).seconds)
} while (!atEnd.locked.value)
val end = drive.position.value
return@coroutineScope start..end
}
suspend fun main() = coroutineScope {
val context = Context {
plugin(DeviceManager)
}
val positionModel = MutableRangeState<Long>(0L, -1000L..1012L)
val linearStepDrive = LinearStepDrive(context, positionModel)
val printJob = linearStepDrive.drive.target.collectValuesIn(this){
println("Move to $it")
}
println(linearStepDrive.calibrate())
printJob.cancel()
}

@ -19,13 +19,10 @@ import io.github.koalaplot.core.util.toString
import io.github.koalaplot.core.xygraph.XYGraph
import io.github.koalaplot.core.xygraph.rememberDoubleLinearAxisModel
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.datetime.Instant
import org.jetbrains.compose.splitpane.ExperimentalSplitPaneApi
import org.jetbrains.compose.splitpane.HorizontalSplitPane
import space.kscience.controls.api.PropertyChangedMessage
import space.kscience.controls.compose.NumberTextField
import space.kscience.controls.compose.PlotNumericState
import space.kscience.controls.compose.TimeAxisModel
@ -42,9 +39,11 @@ import space.kscience.controls.constructor.onTimer
import space.kscience.controls.constructor.units.Kilograms
import space.kscience.controls.constructor.units.Meters
import space.kscience.controls.constructor.units.NumericalValue
import space.kscience.controls.manager.*
import space.kscience.controls.manager.ClockManager
import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.manager.clock
import space.kscience.controls.manager.install
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.request
import java.awt.Dimension
import kotlin.math.PI
import kotlin.math.sin
@ -166,15 +165,6 @@ fun main() = application {
//bind pid parameters
LaunchedEffect(Unit) {
// start listening to local device hub
context.request(DeviceManager).hubMessageFlow()
.filterIsInstance<PropertyChangedMessage>() // filter only property change messages
//.filter { it.sourceDevice == "linearDrive".asName()} //optionally filter by device name
.onEach {
println("${it.sourceDevice} >> ${it.property} changed to ${it.value}")
}.launchIn(this)
snapshotFlow {
pidParameters
}.onEach {

@ -1,29 +1,19 @@
package space.kscience.controls.demo.constructor
import androidx.compose.foundation.layout.Column
import androidx.compose.foundation.layout.fillMaxHeight
import androidx.compose.foundation.Canvas
import androidx.compose.foundation.layout.fillMaxSize
import androidx.compose.foundation.layout.fillMaxWidth
import androidx.compose.material3.Button
import androidx.compose.material3.MaterialTheme
import androidx.compose.material3.Text
import androidx.compose.material.MaterialTheme
import androidx.compose.runtime.*
import androidx.compose.ui.Modifier
import androidx.compose.ui.geometry.Offset
import androidx.compose.ui.geometry.Size
import androidx.compose.ui.graphics.Color
import androidx.compose.ui.unit.dp
import androidx.compose.ui.window.Window
import androidx.compose.ui.window.application
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import org.jetbrains.compose.splitpane.ExperimentalSplitPaneApi
import org.jetbrains.compose.splitpane.HorizontalSplitPane
import space.kscience.controls.compose.*
import space.kscience.controls.constructor.*
import space.kscience.controls.constructor.devices.LimitSwitch
import space.kscience.controls.constructor.devices.StepDrive
@ -102,7 +92,7 @@ private suspend fun Plotter.square(xRange: IntRange, yRange: IntRange) {
private val xRange = NumericalValue<Meters>(-0.5)..NumericalValue<Meters>(0.5)
private val yRange = NumericalValue<Meters>(-0.5)..NumericalValue<Meters>(0.5)
private const val ticksPerSecond = 3000.0
private val ticksPerSecond = MutableDeviceState(3000.0)
private val step = NumericalValue<Degrees>(1.8)
@ -131,30 +121,24 @@ private class PlotterModel(
context = context,
xDrive = xDrive,
yDrive = yDrive,
xStartLimit = LimitSwitch(context, x.atStart),
xEndLimit = LimitSwitch(context, x.atEnd),
yStartLimit = LimitSwitch(context, x.atStart),
yEndLimit = LimitSwitch(context, x.atEnd),
xStartLimit = LimitSwitch(context,x.atStart),
xEndLimit = LimitSwitch(context,x.atEnd),
yStartLimit = LimitSwitch(context,x.atStart),
yEndLimit = LimitSwitch(context,x.atEnd),
) { color ->
println("Point X: ${x.value.value}, Y: ${y.value.value}, color: $color")
callback(PlotterPoint(x.value, y.value, color))
}
}
private val range = -1000..1000
@OptIn(ExperimentalSplitPaneApi::class)
suspend fun main() = application {
Window(title = "Pid regulator simulator", onCloseRequest = ::exitApplication) {
window.minimumSize = Dimension(400, 400)
val scope = rememberCoroutineScope()
val points = remember { mutableStateListOf<PlotterPoint>() }
var position by remember { mutableStateOf(XY<Meters>(0, 0)) }
var updateJob: Job? = remember { null }
var points by remember { mutableStateOf<List<PlotterPoint>>(emptyList()) }
val plotterModel = remember {
LaunchedEffect(Unit) {
val context = Context {
plugin(DeviceManager)
plugin(ClockManager)
@ -162,81 +146,53 @@ suspend fun main() = application {
/* Here goes the device definition block */
PlotterModel(context) { plotterPoint ->
points += plotterPoint
val plotterModel = PlotterModel(context) { plotterPoint ->
points.add(plotterPoint)
}
/* Start visualization program */
plotterModel.xy.valueFlow.onEach {
position = it
}.launchIn(this)
/* run program */
val range = -1000..1000
// plotterModel.plotter.modernArt(range, range)
plotterModel.plotter.square(range, range)
}
/* Here goes the visualization block */
MaterialTheme {
HorizontalSplitPane {
first(200.dp) {
Column(modifier = Modifier.fillMaxHeight()) {
Button({
updateJob?.cancel()
updateJob = scope.launch {
plotterModel.plotter.square(range, range)
}
}, modifier = Modifier.fillMaxWidth()) {
Text("Rectangle")
}
Button({
updateJob?.cancel()
updateJob = scope.launch {
plotterModel.plotter.modernArt(range, range)
}
}, modifier = Modifier.fillMaxWidth()) {
Text("Modern Art")
}
Button({
updateJob?.cancel()
}, modifier = Modifier.fillMaxWidth()) {
Text("Stop")
}
}
Canvas(modifier = Modifier.fillMaxSize()) {
fun toOffset(x: NumericalValue<Meters>, y: NumericalValue<Meters>): Offset {
val canvasX = (x - xRange.start) / (xRange.endInclusive - xRange.start) * size.width
val canvasY = (y - yRange.start) / (yRange.endInclusive - yRange.start) * size.height
return Offset(canvasX.toFloat(), canvasY.toFloat())
}
second {
Device2DCanvas(modifier = Modifier.fillMaxSize()) {
fun xToPx(x: NumericalValue<Meters>): Float =
((x - xRange.start) / (xRange.endInclusive - xRange.start) * size.width).toFloat()
fun yToPx(y: NumericalValue<Meters>): Float =
((y - yRange.start) / (yRange.endInclusive - yRange.start) * size.height).toFloat()
val center = toOffset(position.x, position.y)
fun toOffset(xy: XY<Meters>): Offset = Offset(xToPx(xy.x), yToPx(xy.y))
drawRect(
Color.LightGray,
topLeft = Offset(0f, center.y - 5f),
size = Size(size.width, 10f)
)
observeState(plotterModel.y, "beam") { y ->
RectangleDrawable2D(
position = Offset(size.width / 2, yToPx(y)),
rectangleSize = Size(size.width, 10f),
color = Color.LightGray
)
}
drawCircle(Color.Black, radius = 10f, center = center)
observeState(plotterModel.xy, "head") { xy ->
CircleDrawable2D(
position = toOffset(xy),
radius = 10f,
color = Color.Black
)
}
snapshotFlow { points }.onEach {
it.forEachIndexed { index, plotterPoint ->
circle(
"point[$index]",
Offset(xToPx(plotterPoint.x), yToPx(plotterPoint.y)),
radius = 5f,
color = plotterPoint.color
)
}
}.launchIn(scope)
}
points.forEach {
drawCircle(it.color, radius = 2f, center = toOffset(it.x, it.y))
}
}
}
}
}

@ -1,4 +1,14 @@
# Module device-collective
# Running demo from gradle
1. Install JDK 17-21 in the system (for example, from https://sdkman.io/jdks or https://github.com/ScoopInstaller/Java).
2. Clone the repository with Git.
3. Run `./gradlew :demo:device-collective:run` from the project root directory.
# Install distribution
1. Install JDK 17-21 in the system (for example, from https://sdkman.io/jdks or https://github.com/ScoopInstaller/Java).
2. Clone the repository with Git.
3. Run `./gradlew :demo:device-collective:packageUberJarForCurrentOS` from the project root directory.
4. Go to `build/compose/jars/device-collective-<your OS>-<version>.jar`. You can copy it and run with `java -jar <full-name>.jar`

@ -21,8 +21,8 @@ kotlin{
}
tasks.withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile>().configureEach {
compilerOptions {
freeCompilerArgs.addAll("-Xjvm-default=all")
kotlinOptions {
freeCompilerArgs = freeCompilerArgs + listOf("-Xjvm-default=all", "-Xopt-in=kotlin.RequiresOptIn")
}
}

@ -26,8 +26,8 @@ kotlin{
tasks.withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile>().configureEach {
compilerOptions {
freeCompilerArgs.addAll("-Xjvm-default=all")
kotlinOptions {
freeCompilerArgs = freeCompilerArgs + listOf("-Xjvm-default=all", "-Xopt-in=kotlin.RequiresOptIn")
}
}

@ -88,20 +88,20 @@ class MksPdr900Device(context: Context, meta: Meta) : DeviceBySpec<MksPdr900Devi
override fun build(context: Context, meta: Meta): MksPdr900Device = MksPdr900Device(context, meta)
val powerOn by mutableBooleanProperty(read = { readPowerOn() }, write = { _, value -> writePowerOn(value) })
val powerOn by booleanProperty(read = { readPowerOn() }, write = { _, value -> writePowerOn(value) })
val channel by property(MetaConverter.int)
val channel by logicalProperty(MetaConverter.int)
val value by doubleProperty(read = {
readChannelData(getOrRead(channel))
})
val error by property(MetaConverter.string)
val error by logicalProperty(MetaConverter.string)
override suspend fun MksPdr900Device.onClose() {
override fun MksPdr900Device.onClose() {
if (portDelegate.isInitialized()) {
port.stop()
port.close()
}
}
}

@ -23,6 +23,8 @@ import space.kscience.dataforge.context.*
import space.kscience.dataforge.meta.*
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.parseAsName
import kotlin.collections.component1
import kotlin.collections.component2
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
@ -166,7 +168,7 @@ class PiMotionMasterDevice(
}
//Update port
//address = portSpec.node
port = portFactory(portSpec, context).apply { start() }
port = portFactory(portSpec, context).apply { open() }
// connector.open()
//Initialize axes
val idn = read(identity)
@ -188,7 +190,7 @@ class PiMotionMasterDevice(
}) {
port?.let {
execute(stop)
it.stop()
it.close()
}
port = null
propertyChanged(connected, false)
@ -236,7 +238,7 @@ class PiMotionMasterDevice(
private fun axisBooleanProperty(
command: String,
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
) = mutableBooleanProperty(
) = booleanProperty(
read = {
readAxisBoolean("$command?")
},
@ -249,7 +251,7 @@ class PiMotionMasterDevice(
private fun axisNumberProperty(
command: String,
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
) = mutableDoubleProperty(
) = doubleProperty(
read = {
mm.requestAndParse("$command?", axisId)[axisId]?.toDoubleOrNull()
?: error("Malformed $command response. Should include float value for $axisId")

@ -6,7 +6,6 @@ import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import space.kscience.controls.api.AsynchronousSocket
import space.kscience.controls.api.LifecycleState
import space.kscience.controls.ports.AbstractAsynchronousPort
import space.kscience.controls.ports.withDelimiter
import space.kscience.dataforge.context.*
@ -49,10 +48,10 @@ abstract class VirtualDevice(val scope: CoroutineScope) : AsynchronousSocket<Byt
respond(response())
}
override val lifecycleState: LifecycleState
get() = if(scope.isActive) LifecycleState.STARTED else LifecycleState.STOPPED
override val isOpen: Boolean
get() = scope.isActive
override suspend fun stop() = scope.cancel()
override fun close() = scope.cancel()
}
class VirtualPort(private val device: VirtualDevice, context: Context) : AbstractAsynchronousPort(context, Meta.EMPTY) {
@ -73,12 +72,12 @@ class VirtualPort(private val device: VirtualDevice, context: Context) : Abstrac
device.send(data)
}
override val lifecycleState: LifecycleState
get() = if(respondJob?.isActive == true) LifecycleState.STARTED else LifecycleState.STOPPED
override val isOpen: Boolean
get() = respondJob?.isActive == true
override suspend fun stop() {
override fun close() {
respondJob?.cancel()
super.stop()
super.close()
}
}
@ -89,7 +88,7 @@ class PiMotionMasterVirtualDevice(
scope: CoroutineScope = context,
) : VirtualDevice(scope), ContextAware {
override suspend fun start() {
override fun open() {
//add asynchronous send logic here
}

@ -31,6 +31,8 @@ plc4j = "0.12.0"
visionforge = "0.4.2"
versions = "0.51.0"
[libraries]
dataforge-io = { module = "space.kscience:dataforge-io", version.ref = "dataforge" }
@ -79,13 +81,10 @@ visionforge-markdown = { module = "space.kscience:visionforge-markdown", version
visionforge-server = { module = "space.kscience:visionforge-server", version.ref = "visionforge" }
visionforge-compose-html = { module = "space.kscience:visionforge-compose-html", version.ref = "visionforge" }
sciprog-maps-compose = { module = "space.kscience:maps-kt-compose", version = "0.3.0" }
koala-plots = { module = "io.github.koalaplot:koalaplot-core", version = "0.6.1" }
sciprog-maps-compose = "space.kscience:maps-kt-compose:0.3.0"
# Buildscript
[plugins]
versions = "com.github.ben-manes.versions:0.51.0"
versions-update = "nl.littlerobots.version-catalog-update:0.8.4"
versions = { id = "com.github.ben-manes.versions", version.ref = "versions" }

@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.11.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists

@ -6,7 +6,7 @@ A kotlin API for magix standard and some zero-dependency magix services
## Artifact:
The Maven coordinates of this project are `space.kscience:magix-api:0.4.0-dev-7`.
The Maven coordinates of this project are `space.kscience:magix-api:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:magix-api:0.4.0-dev-7")
implementation("space.kscience:magix-api:0.4.0-dev-4")
}
```

@ -13,7 +13,6 @@ kscience {
jvm()
js()
native()
wasm()
useCoroutines()
useSerialization{
json()

@ -6,7 +6,7 @@ Java API to work with magix endpoints without Kotlin
## Artifact:
The Maven coordinates of this project are `space.kscience:magix-java-endpoint:0.4.0-dev-7`.
The Maven coordinates of this project are `space.kscience:magix-java-endpoint:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:magix-java-endpoint:0.4.0-dev-7")
implementation("space.kscience:magix-java-endpoint:0.4.0-dev-4")
}
```

@ -1,3 +1,5 @@
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
import space.kscience.gradle.KScienceVersions
import space.kscience.gradle.Maturity
plugins {
@ -15,6 +17,19 @@ dependencies {
implementation(spclibs.kotlinx.coroutines.jdk9)
}
readme {
//java {
// sourceCompatibility = KScienceVersions.JVM_TARGET
// targetCompatibility = KScienceVersions.JVM_TARGET
//}
//FIXME https://youtrack.jetbrains.com/issue/KT-52815/Compiler-option-Xjdk-release-fails-to-compile-mixed-projects
tasks.withType<KotlinCompile>{
kotlinOptions {
freeCompilerArgs -= "-Xjdk-release=11"
}
}
readme{
maturity = Maturity.EXPERIMENTAL
}

@ -6,7 +6,7 @@ MQTT client magix endpoint
## Artifact:
The Maven coordinates of this project are `space.kscience:magix-mqtt:0.4.0-dev-7`.
The Maven coordinates of this project are `space.kscience:magix-mqtt:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:magix-mqtt:0.4.0-dev-7")
implementation("space.kscience:magix-mqtt:0.4.0-dev-4")
}
```

@ -1,5 +1,5 @@
plugins {
id("space.kscience.gradle.mpp")
id("space.kscience.gradle.jvm")
`maven-publish`
}
@ -7,15 +7,12 @@ description = """
MQTT client magix endpoint
""".trimIndent()
kscience {
jvm()
jvmMain {
api(projects.magix.magixApi)
implementation(libs.hivemq.mqtt.client)
implementation(spclibs.kotlinx.coroutines.jdk8)
}
dependencies {
api(projects.magix.magixApi)
implementation(libs.hivemq.mqtt.client)
implementation(spclibs.kotlinx.coroutines.jdk8)
}
readme {
readme{
maturity = space.kscience.gradle.Maturity.PROTOTYPE
}

@ -6,7 +6,7 @@ RabbitMQ client magix endpoint
## Artifact:
The Maven coordinates of this project are `space.kscience:magix-rabbit:0.4.0-dev-7`.
The Maven coordinates of this project are `space.kscience:magix-rabbit:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:magix-rabbit:0.4.0-dev-7")
implementation("space.kscience:magix-rabbit:0.4.0-dev-4")
}
```

@ -6,7 +6,7 @@ Magix endpoint (client) based on RSocket
## Artifact:
The Maven coordinates of this project are `space.kscience:magix-rsocket:0.4.0-dev-7`.
The Maven coordinates of this project are `space.kscience:magix-rsocket:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:magix-rsocket:0.4.0-dev-7")
implementation("space.kscience:magix-rsocket:0.4.0-dev-4")
}
```

@ -6,7 +6,7 @@ A magix event loop implementation in Kotlin. Includes HTTP/SSE and RSocket route
## Artifact:
The Maven coordinates of this project are `space.kscience:magix-server:0.4.0-dev-7`.
The Maven coordinates of this project are `space.kscience:magix-server:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:magix-server:0.4.0-dev-7")
implementation("space.kscience:magix-server:0.4.0-dev-4")
}
```

@ -6,7 +6,7 @@ Magix history database API
## Artifact:
The Maven coordinates of this project are `space.kscience:magix-storage:0.4.0-dev-7`.
The Maven coordinates of this project are `space.kscience:magix-storage:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:magix-storage:0.4.0-dev-7")
implementation("space.kscience:magix-storage:0.4.0-dev-4")
}
```

@ -6,7 +6,7 @@
## Artifact:
The Maven coordinates of this project are `space.kscience:magix-storage-xodus:0.4.0-dev-7`.
The Maven coordinates of this project are `space.kscience:magix-storage-xodus:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:magix-storage-xodus:0.4.0-dev-7")
implementation("space.kscience:magix-storage-xodus:0.4.0-dev-4")
}
```

@ -1,23 +1,19 @@
plugins {
id("space.kscience.gradle.mpp")
id("space.kscience.gradle.jvm")
`maven-publish`
}
kscience {
jvm()
useCoroutines()
jvmMain {
api(projects.magix.magixStorage)
implementation(libs.xodus.entity.store)
// implementation("org.jetbrains.xodus:dnq:2.0.0")
}
jvmTest{
implementation(spclibs.kotlinx.coroutines.test)
}
}
dependencies {
api(projects.magix.magixStorage)
implementation(libs.xodus.entity.store)
// implementation("org.jetbrains.xodus:dnq:2.0.0")
testImplementation(spclibs.kotlinx.coroutines.test)
}
readme {
maturity = space.kscience.gradle.Maturity.PROTOTYPE

@ -1,21 +0,0 @@
# Module magix-utils
Common utilities and services for Magix endpoints.
## Usage
## Artifact:
The Maven coordinates of this project are `space.kscience:magix-utils:0.4.0-dev-7`.
**Gradle Kotlin DSL:**
```kotlin
repositories {
maven("https://repo.kotlin.link")
mavenCentral()
}
dependencies {
implementation("space.kscience:magix-utils:0.4.0-dev-7")
}
```

@ -6,7 +6,7 @@ ZMQ client endpoint for Magix
## Artifact:
The Maven coordinates of this project are `space.kscience:magix-zmq:0.4.0-dev-7`.
The Maven coordinates of this project are `space.kscience:magix-zmq:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:magix-zmq:0.4.0-dev-7")
implementation("space.kscience:magix-zmq:0.4.0-dev-4")
}
```

@ -1,7 +1,7 @@
import space.kscience.gradle.Maturity
plugins {
id("space.kscience.gradle.mpp")
id("space.kscience.gradle.jvm")
`maven-publish`
}
@ -9,13 +9,10 @@ description = """
ZMQ client endpoint for Magix
""".trimIndent()
kscience {
jvm()
jvmMain {
api(projects.magix.magixApi)
api("org.slf4j:slf4j-api:2.0.6")
api("org.zeromq:jeromq:0.5.3")
}
dependencies {
api(projects.magix.magixApi)
api("org.slf4j:slf4j-api:2.0.6")
api("org.zeromq:jeromq:0.5.3")
}
readme {

@ -4,6 +4,7 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.encodeToString
import org.slf4j.LoggerFactory
import org.zeromq.SocketType

@ -21,10 +21,6 @@ pluginManagement {
}
}
plugins {
id("org.gradle.toolchains.foojay-resolver-convention") version "0.8.0"
}
dependencyResolutionManagement {
val toolsVersion: String by extra
@ -56,7 +52,6 @@ dependencyResolutionManagement {
}
include(
":simulation-kt",
":controls-core",
":controls-ports-ktor",
":controls-serial",

@ -1,26 +0,0 @@
# Module simulation-kt
## Features
- [timeline](#) : Timeline is an ordered discrete history containing TimeLineEvent
## Usage
## Artifact:
The Maven coordinates of this project are `space.kscience:simulation-kt:0.4.0-dev-7`.
**Gradle Kotlin DSL:**
```kotlin
repositories {
maven("https://repo.kotlin.link")
mavenCentral()
}
dependencies {
implementation("space.kscience:simulation-kt:0.4.0-dev-7")
}
```

@ -1,33 +0,0 @@
import space.kscience.gradle.Maturity
plugins {
id("space.kscience.gradle.mpp")
`maven-publish`
}
kscience {
jvm()
js()
native()
wasm()
useCoroutines()
useContextReceivers()
commonMain {
api(spclibs.kotlinx.datetime)
}
jvmTest {
implementation(spclibs.logback.classic)
}
}
readme {
maturity = Maturity.PROTOTYPE
description = """
A framework for combination of asynchronous simulations.
""".trimIndent()
feature("timeline") { "Timeline is an ordered discrete history containing TimeLineEvent" }
}

@ -1,66 +0,0 @@
package space.kscience.simulation
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.datetime.Instant
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.time.Duration
/**
* Suspend the collection of this [Flow] until event time is lower that threshold
*/
public fun <E : TimelineEvent> Flow<E>.withTimeThreshold(
threshold: Flow<Instant>
): Flow<E> = transform { event ->
threshold.first { it > event.time }
emit(event)
}
/**
* @param lookaheadInterval an interval for generated events ahead of the last observed event.
*/
public class GeneratingTimeline<E : TimelineEvent>(
origin: E,
private val lookaheadInterval: Duration,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
private val generator: suspend FlowCollector<E>.(E) -> Unit
) : ProducerTimeline<E>(origin.time, coroutineContext) {
private val startEventFlow = MutableStateFlow(origin)
private data class EventWithOrigin<E : TimelineEvent>(val origin: E, val event: E) : TimelineEvent {
override val time: Instant get() = event.time
}
private val events: SharedFlow<E> = flow {
coroutineScope {
startEventFlow.collect { startEvent ->
emitAll(
flow { generator(startEvent) }.takeWhile { startEvent == startEventFlow.value }.map {
EventWithOrigin(startEvent, it)
}
)
}
}
}.withTimeThreshold(
threshold = time.map { it + lookaheadInterval }
).buffer(Channel.UNLIMITED).mapNotNull {
//a barrier to avoid leaking stale events after interruption from buffer
it.takeIf { it.origin == startEventFlow.value }?.event
}.shareIn(
scope = timelineScope,
started = SharingStarted.Lazily,
)
override fun events(): Flow<E> = events
public suspend fun interrupt(newStart: E) {
check(newStart.time >= time.value) {
"Can't interrupt generating timeline after observed event"
}
startTime = newStart.time
startEventFlow.emit(newStart)
}
}

@ -1,82 +0,0 @@
package space.kscience.simulation
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Instant
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
public class MergedTimeline<E : TimelineEvent>(
private val timelines: List<Timeline<E>>,
coroutineContext: CoroutineContext = EmptyCoroutineContext
) : Timeline<E> {
protected val timelineScope: CoroutineScope = CoroutineScope(
coroutineContext +
SupervisorJob(coroutineContext[Job]) +
CoroutineExceptionHandler{ _, throwable -> throwable.printStackTrace() } +
CoroutineName("MergedTimeline")
)
override val time: StateFlow<Instant> = combine(timelines.map { it.time }){ array->
array.max()
}.stateIn(timelineScope, SharingStarted.Lazily, timelines.maxOf { it.time.value })
override suspend fun advance(toTime: Instant) {
observers.forEach {
it.collect(toTime)
}
}
private val observers: MutableSet<TimelineObserver> = mutableSetOf()
override suspend fun observe(collector: suspend Flow<E>.() -> Unit): TimelineObserver {
val context = currentCoroutineContext()
val buffer = mutableListOf<E>()
val timelineObservers = timelines.map {
it.observeEach { event ->
buffer.add(event)
}
}
val observer = object : TimelineObserver {
private val channel = Channel<E>()
override val time = MutableStateFlow(this@MergedTimeline.time.value)
private val collectJob = timelineScope.launch(context) {
channel.consumeAsFlow().onEach {
time.emit(it.time)
}.collector()
}
private val mutex = Mutex()
override suspend fun collect(upTo: Instant) = mutex.withLock{
timelineObservers.forEach {
it.collect(upTo)
}
buffer.sortedBy { it.time }.forEach {
channel.send(it)
buffer.remove(it)
}
}
override fun close() {
collectJob.cancel()
observers.remove(this)
}
}
observers.add(observer)
return observer
}
}

@ -1,83 +0,0 @@
package space.kscience.simulation
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Instant
import kotlin.coroutines.CoroutineContext
public abstract class ProducerTimeline<E : TimelineEvent>(
protected var startTime: Instant,
coroutineContext: CoroutineContext
) : Timeline<E>, AutoCloseable {
protected val timelineScope: CoroutineScope = CoroutineScope(
coroutineContext +
SupervisorJob(coroutineContext[Job]) +
CoroutineExceptionHandler{ _, throwable -> throwable.printStackTrace() } +
CoroutineName("Timeline")
)
private val observers: MutableSet<TimelineObserver> = mutableSetOf()
private val feedbackChannel = Channel<Unit>(onBufferOverflow = BufferOverflow.DROP_OLDEST)
override val time: StateFlow<Instant> = feedbackChannel.consumeAsFlow().map {
maxOf(startTime,observers.maxOfOrNull { it.time.value } ?: startTime)
}.stateIn(timelineScope, SharingStarted.Lazily, startTime)
override suspend fun advance(toTime: Instant) {
observers.forEach {
it.collect(toTime)
}
}
/**
* Flow unobserved events starting at [time]. The flow could be interrupted if timeline changes
*/
protected abstract fun events(): Flow<E>
override suspend fun observe(collector: suspend Flow<E>.() -> Unit): TimelineObserver {
val context = currentCoroutineContext()
val observer = object : TimelineObserver {
// observed time
override val time = MutableStateFlow(startTime)
private val channel = Channel<E>()
private val collectJob = timelineScope.launch(context) {
channel.consumeAsFlow().onEach {
time.emit(it.time)
feedbackChannel.send(Unit)
}.collector()
}
private val mutex = Mutex()
override suspend fun collect(upTo: Instant) = mutex.withLock {
require(upTo >= time.value) { "Requested time $upTo is lower than observed ${time.value}" }
events().takeWhile {
it.time <= upTo
}.collect {
channel.send(it)
}
}
override fun close() {
collectJob.cancel()
observers.remove(this)
}
}
observers.add(observer)
return observer
}
override fun close() {
observers.forEach { it.close() }
timelineScope.cancel()
}
}

@ -1,31 +0,0 @@
package space.kscience.simulation
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.datetime.Instant
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
/**
* A manually mutable [Timeline] that could be modified via [emit] method by multiple
*/
public class SharedTimeline<E : TimelineEvent>(
startTime: Instant,
coroutineContext: CoroutineContext = EmptyCoroutineContext
) : ProducerTimeline<E>(startTime, coroutineContext) {
private val events = MutableSharedFlow<E>(replay = Channel.UNLIMITED)
override fun events(): Flow<E> = events
/**
* Emit new event to the timeline
*/
public suspend fun emit(event: E) {
if (event.time < (events.replayCache.lastOrNull()?.time ?: time.value)) {
error("Can't emit event $event because timeline monotony is broken")
}
events.emit(event)
}
}

@ -1,81 +0,0 @@
package space.kscience.simulation
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.datetime.Instant
import kotlin.time.Duration
public interface TimelineEvent {
public val time: Instant
}
public interface TimelineInterval : TimelineEvent {
public val startTime: Instant
public val duration: Duration
override val time: Instant
get() = startTime + duration
}
public data class SimpleTimelineEvent<T>(override val time: Instant, val value: T) : TimelineEvent
public interface TimelineObserver : AutoCloseable {
/**
* The subjective time of this observer (last observed time)
*/
public val time: StateFlow<Instant>
/**
* Collect all uncollected events from [time] to [upTo].
*
* By default, collects all events.
*/
public suspend fun collect(upTo: Instant = Instant.DISTANT_FUTURE)
}
/**
* Collect events for a fixed [duration] since last observed time
*/
public suspend fun TimelineObserver.collect(duration: Duration): Unit = collect(time.value + duration)
/**
* A time-ordered sequence of events of type [E]. There time of events is strictly monotonic, meaning that the time of
* the next event is greater than the previous event time.
*
* Timeline guarantees that all collectors could read all events when they need. Meaning that all unread events are cached.
*
* Timeline guarantees that already read events won't change, but unread events could change.
*/
public interface Timeline<E : TimelineEvent> {
/**
* A subjective time of this timeline. The subjective time is the last observed time.
*/
public val time: StateFlow<Instant>
/**
* Attach observer to this [Timeline]. The observer collection is not triggered right away, but only on demand.
*
* Each collection shifts [TimelineObserver.time] for this observer.
*/
public suspend fun observe(
collector: suspend Flow<E>.() -> Unit
): TimelineObserver
/**
* Advance simulation time to [toTime]. This method forces all observers to collect all events in the given range.
*
* This method suspends until all advancement is done
*/
public suspend fun advance(toTime: Instant)
}
/**
* Perform [collector] action on each event
*/
public suspend fun <E : TimelineEvent> Timeline<E>.observeEach(
collector: suspend (E) -> Unit
): TimelineObserver = observe {
collect(collector)
}

@ -1,35 +0,0 @@
package space.kscience.simulation
internal inline fun <T, R : Comparable<R>> Iterable<T>.minOfNotNullOrNull(selector: (T) -> R?): R? {
val iterator = iterator()
if (!iterator.hasNext()) return null
var minValue = selector(iterator.next())
while (iterator.hasNext()) {
val v = selector(iterator.next())
when {
minValue == null -> minValue = v
v == null -> {/*do nothing*/}
minValue > v -> {
minValue = v
}
}
}
return minValue
}
internal inline fun <T, R : Comparable<R>> Iterable<T>.maxOfNotNullOrNull(selector: (T) -> R?): R? {
val iterator = iterator()
if (!iterator.hasNext()) return null
var maxValue = selector(iterator.next())
while (iterator.hasNext()) {
val v = selector(iterator.next())
when {
maxValue == null -> maxValue = v
v == null -> {/*do nothing*/}
maxValue < v -> {
maxValue = v
}
}
}
return maxValue
}

@ -1,48 +0,0 @@
package space.kscience.simulation
import kotlinx.coroutines.isActive
import kotlinx.coroutines.test.runTest
import kotlinx.datetime.Instant
import kotlin.test.Test
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
class TimelineTests {
@Test
fun testGeneration() = runTest(timeout = 1.seconds) {
val startTime = Instant.parse("2020-01-01T00:00:00.000Z")
val generation = GeneratingTimeline(
origin = SimpleTimelineEvent(startTime, Unit),
lookaheadInterval = 1.seconds
) { event ->
var time = event.time
while (isActive) {
time += 0.1.seconds
println("Emit: ${time - startTime}")
emit(SimpleTimelineEvent(time, Unit))
}
}
val result = mutableListOf<Duration>()
val observer = generation.observeEach {
println("Consume: ${it.time - startTime}")
result.add(it.time - startTime)
}
observer.collect(2.seconds)
println("First collection complete")
observer.collect(2.seconds)
println("Second collection complete")
println("Interrupt")
generation.interrupt(SimpleTimelineEvent(startTime + 6.seconds, Unit))
println("Collecting after interruption")
observer.collect(startTime + 6.seconds + 2.5.seconds)
println(result)
generation.close()
}
}