Make remote device connection ask for descriptors before start
This commit is contained in:
parent
ee83f81a04
commit
e5088ac8e4
@ -100,8 +100,8 @@ public interface Device : ContextAware, CoroutineScope {
|
||||
* Close and terminate the device. This function does not wait for the device to be closed.
|
||||
*/
|
||||
public suspend fun stop() {
|
||||
coroutineContext[Job]?.cancel("The device is closed")
|
||||
logger.info { "Device $this is closed" }
|
||||
cancel("The device is closed")
|
||||
}
|
||||
|
||||
public val lifecycleState: DeviceLifecycleState
|
||||
|
@ -17,11 +17,15 @@ kscience {
|
||||
useSerialization {
|
||||
json()
|
||||
}
|
||||
dependencies {
|
||||
commonMain {
|
||||
api(projects.magix.magixApi)
|
||||
api(projects.controlsCore)
|
||||
api(libs.uuid)
|
||||
}
|
||||
|
||||
jvmTest{
|
||||
implementation(spclibs.logback.classic)
|
||||
}
|
||||
}
|
||||
|
||||
readme {
|
||||
|
@ -1,8 +1,11 @@
|
||||
package space.kscience.controls.client
|
||||
|
||||
import com.benasher44.uuid.uuid4
|
||||
import kotlinx.coroutines.CompletableDeferred
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.coroutineScope
|
||||
import kotlinx.coroutines.flow.*
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import space.kscience.controls.api.*
|
||||
@ -23,9 +26,11 @@ private fun stringUID() = uuid4().leastSignificantBits.toString(16)
|
||||
/**
|
||||
* A remote accessible device that relies on connection via Magix
|
||||
*/
|
||||
public class DeviceClient(
|
||||
public class DeviceClient internal constructor(
|
||||
override val context: Context,
|
||||
private val deviceName: Name,
|
||||
override val propertyDescriptors: Collection<PropertyDescriptor>,
|
||||
override val actionDescriptors: Collection<ActionDescriptor>,
|
||||
incomingFlow: Flow<DeviceMessage>,
|
||||
private val send: suspend (DeviceMessage) -> Unit,
|
||||
) : CachingDevice {
|
||||
@ -37,12 +42,6 @@ public class DeviceClient(
|
||||
|
||||
private val propertyCache = HashMap<String, Meta>()
|
||||
|
||||
override var propertyDescriptors: Collection<PropertyDescriptor> = emptyList()
|
||||
private set
|
||||
|
||||
override var actionDescriptors: Collection<ActionDescriptor> = emptyList()
|
||||
private set
|
||||
|
||||
private val flowInternal = incomingFlow.filter {
|
||||
it.sourceDevice == deviceName
|
||||
}.shareIn(this, started = SharingStarted.Eagerly).also {
|
||||
@ -52,11 +51,6 @@ public class DeviceClient(
|
||||
propertyCache[message.property] = message.value
|
||||
}
|
||||
|
||||
is DescriptionMessage -> mutex.withLock {
|
||||
propertyDescriptors = message.properties
|
||||
actionDescriptors = message.actions
|
||||
}
|
||||
|
||||
else -> {
|
||||
//ignore
|
||||
}
|
||||
@ -112,14 +106,38 @@ public class DeviceClient(
|
||||
* @param targetEndpointName the name of endpoint in Magix to connect to
|
||||
* @param deviceName the name of device within endpoint
|
||||
*/
|
||||
public fun MagixEndpoint.remoteDevice(
|
||||
public suspend fun MagixEndpoint.remoteDevice(
|
||||
context: Context,
|
||||
sourceEndpointName: String,
|
||||
targetEndpointName: String,
|
||||
deviceName: Name,
|
||||
): DeviceClient {
|
||||
): DeviceClient = coroutineScope{
|
||||
val subscription = subscribe(DeviceManager.magixFormat, originFilter = listOf(targetEndpointName)).map { it.second }
|
||||
return DeviceClient(context, deviceName, subscription) {
|
||||
|
||||
val deferredDescriptorMessage = CompletableDeferred<DescriptionMessage>()
|
||||
|
||||
launch {
|
||||
deferredDescriptorMessage.complete(subscription.filterIsInstance<DescriptionMessage>().first())
|
||||
}
|
||||
|
||||
send(
|
||||
format = DeviceManager.magixFormat,
|
||||
payload = GetDescriptionMessage(targetDevice = deviceName),
|
||||
source = sourceEndpointName,
|
||||
target = targetEndpointName,
|
||||
id = stringUID()
|
||||
)
|
||||
|
||||
|
||||
val descriptionMessage = deferredDescriptorMessage.await()
|
||||
|
||||
DeviceClient(
|
||||
context = context,
|
||||
deviceName = deviceName,
|
||||
propertyDescriptors = descriptionMessage.properties,
|
||||
actionDescriptors = descriptionMessage.actions,
|
||||
incomingFlow = subscription
|
||||
) {
|
||||
send(
|
||||
format = DeviceManager.magixFormat,
|
||||
payload = it,
|
||||
@ -130,6 +148,8 @@ public fun MagixEndpoint.remoteDevice(
|
||||
}
|
||||
}
|
||||
|
||||
//public fun MagixEndpoint.remoteDeviceHub()
|
||||
|
||||
/**
|
||||
* Subscribe on specific property of a device without creating a device
|
||||
*/
|
||||
|
@ -1,5 +1,6 @@
|
||||
package space.kscience.controls.client
|
||||
|
||||
import com.benasher44.uuid.uuid4
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.flow.catch
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
@ -29,7 +30,7 @@ public val DeviceManager.Companion.magixFormat: MagixFormat<DeviceMessage> get()
|
||||
internal fun generateId(request: MagixMessage): String = if (request.id != null) {
|
||||
"${request.id}.response"
|
||||
} else {
|
||||
"controls[${request.payload.hashCode().toUInt().toString(16)}]"
|
||||
uuid4().leastSignificantBits.toULong().toString(16)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1,9 +1,12 @@
|
||||
package space.kscience.controls.client
|
||||
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.flow.merge
|
||||
import kotlinx.coroutines.test.runTest
|
||||
import kotlinx.serialization.json.Json
|
||||
import space.kscience.controls.api.DeviceMessage
|
||||
import space.kscience.controls.manager.DeviceManager
|
||||
import space.kscience.controls.manager.install
|
||||
import space.kscience.controls.manager.respondMessage
|
||||
@ -46,7 +49,7 @@ internal class RemoteDeviceConnect {
|
||||
}
|
||||
|
||||
@Test
|
||||
fun wrapper() = runTest {
|
||||
fun deviceClient() = runTest {
|
||||
val context = Context {
|
||||
plugin(DeviceManager)
|
||||
}
|
||||
@ -56,11 +59,15 @@ internal class RemoteDeviceConnect {
|
||||
val virtualMagixEndpoint = object : MagixEndpoint {
|
||||
|
||||
|
||||
override fun subscribe(filter: MagixMessageFilter): Flow<MagixMessage> = device.messageFlow.map {
|
||||
private val additionalMessages = MutableSharedFlow<DeviceMessage>(10)
|
||||
|
||||
override fun subscribe(
|
||||
filter: MagixMessageFilter,
|
||||
): Flow<MagixMessage> = merge(device.messageFlow, additionalMessages).map {
|
||||
MagixMessage(
|
||||
format = DeviceManager.magixFormat.defaultFormat,
|
||||
payload = MagixEndpoint.magixJson.encodeToJsonElement(DeviceManager.magixFormat.serializer, it),
|
||||
sourceEndpoint = "source",
|
||||
sourceEndpoint = "device",
|
||||
)
|
||||
}
|
||||
|
||||
@ -68,16 +75,18 @@ internal class RemoteDeviceConnect {
|
||||
device.respondMessage(
|
||||
Name.EMPTY,
|
||||
Json.decodeFromJsonElement(DeviceManager.magixFormat.serializer, message.payload)
|
||||
)
|
||||
)?.let {
|
||||
additionalMessages.emit(it)
|
||||
}
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
//
|
||||
}
|
||||
}
|
||||
|
||||
val remoteDevice = virtualMagixEndpoint.remoteDevice(context, "source", "target", Name.EMPTY)
|
||||
val remoteDevice = virtualMagixEndpoint.remoteDevice(context, "client", "device", Name.EMPTY)
|
||||
|
||||
assertContains(0.0..1.0, remoteDevice.read(TestDevice.value))
|
||||
|
||||
}
|
||||
}
|
@ -56,6 +56,6 @@ public suspend fun <T> MagixEndpoint.send(
|
||||
parentId = parentId,
|
||||
user = user
|
||||
)
|
||||
broadcast(message)
|
||||
send(message)
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user