Usage
The main class for communication in the stateful API provided by this package is ICOsystem. Depending on the current state of objects of this class you will be able to use different coroutines to interact with the system.
State Diagram
stateDiagram-v2
disconnected: Disconnected
stu_connected: STU Connected
sensor_node_connected: Sensor Node Connected
measurement: Measurement
disconnected --> stu_connected: connect_stu
stu_connected --> disconnected: disconnect_stu
stu_connected --> stu_connected: get_adc_configuration, rename, reset_stu, set_adc_configuration
stu_connected --> sensor_node_connected: connect_sensor_node_mac
sensor_node_connected --> stu_connected: disconnect_sensor_node
sensor_node_connected --> sensor_node_connected: get_adc_configuration, get_sensor_configuration, rename, set_adc_configuration, set_sensor_configuration
sensor_node_connected --> measurement: start_measurement
measurement --> sensor_node_connected: stop_measurement
In addition to coroutines that label the edges of the state diagram above you can also:
Use the coroutine
ICOsystem.is_sensor_node_connected(), which works in any state.Use the coroutines:
which works in all states except
State.Disconnected.
STU
Connecting to STU
Before you work with the ICOtronic system you need to set up the CAN connection to the STU (Stationary Transceiver Unit), which you can do using the coroutine ICOsystem.connect_stu(). After you are done working working with the STU you also need to disconnect the connection using the coroutine ICOsystem.disconnect_stu().
>>> from asyncio import run
>>> from icostate import ICOsystem
>>> async def connect_disconnect_stu(icosystem: ICOsystem):
... await icosystem.connect_stu()
... await icosystem.disconnect_stu()
>>> run(connect_disconnect_stu(ICOsystem()))
Resetting STU
In case you have the ICOtronic system does not react as you expect you can reset it using the coroutine ICOsystem.reset_stu().
>>> from asyncio import run
>>> from icostate import ICOsystem
>>> async def reset_stu(icosystem: ICOsystem):
... await icosystem.connect_stu()
... await icosystem.reset_stu()
... await icosystem.disconnect_stu()
>>> run(reset_stu(ICOsystem()))
Finding Available Sensor Nodes
To retrieve information about available sensor nodes use the coroutine ICOsystem.collect_sensor_nodes().
>>> from asyncio import run
>>> from netaddr import EUI
>>> from icostate import ICOsystem, SensorNodeInfo
>>> async def get_sensor_nodes(icosystem: ICOsystem) -> list[SensorNodeInfo]:
... await icosystem.connect_stu()
... sensor_nodes = await icosystem.collect_sensor_nodes()
... await icosystem.disconnect_stu()
... return sensor_nodes
>>> sensor_nodes = run(get_sensor_nodes(ICOsystem()))
>>> # We assume that at least one sensor node is available
>>> len(sensor_nodes) >= 1
True
>>> node_info = sensor_nodes[0]
>>> # Each list entry contains information about name, MAC address and RSSI
>>> isinstance(node_info.name, str)
True
>>> len(node_info.name) <= 8
True
>>> isinstance(node_info.mac_address, EUI)
True
>>> -100 < node_info.rssi < 0
True
Sensor Node
Connecting to Sensor Node
Before you start a measurement you need to connect to a sensor node. To do that use the coroutine ICOsystem.connect_sensor_node_mac(). Please do not forget to disconnect from the node with the coroutine ICOsystem.disconnect_sensor_node() afterwards.
>>> from asyncio import run
>>> from icostate import ICOsystem
>>> from icostate.config import settings
>>> async def connect_disconnect_sensor_node(icosystem: ICOsystem,
... mac_address: str):
... await icosystem.connect_stu()
... print(f"Connected: {await icosystem.is_sensor_node_connected()}")
... await icosystem.connect_sensor_node_mac(mac_address)
... print(f"Connected: {await icosystem.is_sensor_node_connected()}")
... await icosystem.disconnect_sensor_node()
... print(f"Connected: {await icosystem.is_sensor_node_connected()}")
... await icosystem.disconnect_stu()
>>> mac_address = settings.sensor_node.eui # Change to MAC address of your sensor node
>>> run(connect_disconnect_sensor_node(ICOsystem(), mac_address))
Connected: False
Connected: True
Connected: False
Rename a Sensor Node
To rename a sensor node use the coroutine ICOsystem.rename(), which requires the new name of the sensor node as parameter. If the system is currently not connected to a sensor node, then the coroutine also requires the MAC address of the sensor node. After using the coroutine successfully the system will switch back to the state it was in before renaming the sensor node (either “STU Connected” or “Sensor Node Connected”).
>>> from asyncio import run
>>> from icostate import ICOsystem
>>> from icostate.config import settings
>>> async def rename_disconnected(icosystem: ICOsystem,
... mac_address: str,
... new_name: str):
... await icosystem.connect_stu()
... print(f"State Before: {icosystem.state}")
... await icosystem.rename(new_name, mac_address)
... print(f"State After: {icosystem.state}")
... await icosystem.disconnect_stu()
>>> mac_address = settings.sensor_node.eui # Change to MAC address of your sensor node
>>> name = "Test-STH"
>>> run(rename_disconnected(ICOsystem(), mac_address, name))
State Before: STU Connected
State After: STU Connected
Events
Objects of the ICOsystem provides an event based API (based on pyee) you can use to react to changes to the system. Currently the following events are supported:
sensor_node_name: Called when the name of the current sensor node changessensor_node_mac_address: Called when the MAC address of a sensor node changessensor_node_adc_configuration: Called when the ADC configuration of a sensor node is updatedsensor_node_measurement_data: Called when new streaming data is available
The example below shows how you can react to changes of the sensor node name:
>>> from asyncio import sleep, run
>>> from icostate import ICOsystem
>>> from icostate.config import settings
>>> async def react_sensor_node_name(icosystem: ICOsystem, mac_address: str):
...
... @icosystem.on("sensor_node_name")
... async def name_changed(name: str):
... print(f"Name of sensor node: {name}")
...
... await icosystem.connect_stu()
... await icosystem.connect_sensor_node_mac(mac_address)
... await sleep(0) # Allow scheduler to trigger event coroutine
... await icosystem.disconnect_sensor_node()
... await icosystem.disconnect_stu()
>>> mac_address = settings.sensor_node.eui # Change to MAC address of your sensor node
>>> run(react_sensor_node_name(ICOsystem(), mac_address))
Name of sensor node: Test-STH
Measurement (Streaming)
To start a measurement use the function start_measurement and provide a StreamingConfiguration, which specifies the measurement channels that should be active. To retrieve the measurement data (objects of the class MeasurementData) use the pyee event sensor_node_measurement_data. The example below shows you how to do that:
>>> from asyncio import run, sleep
>>> from time import time
>>> from icostate import ICOsystem, MeasurementData, StreamingConfiguration
>>> from icostate.config import settings
>>> async def measure_data(icosystem: ICOsystem, mac_address: str) -> None:
... await icosystem.connect_stu()
... await icosystem.connect_sensor_node_mac(mac_address)
...
... data = None
... measurement_time = None
...
... @icosystem.on("sensor_node_measurement_data")
... async def measurement_data_changed(measurement_data:
... MeasurementData) -> None:
... nonlocal measurement_time
... measurement_time = time()
... nonlocal data
... data = measurement_data
...
... await icosystem.start_measurement(StreamingConfiguration(first=True))
... # Wait until one measurement data object is ready
... while data is None:
... await sleep(0.01)
...
... await icosystem.stop_measurement()
...
... # Use methods `first`, `second`, and `third` to access different
... # channels
... first_channel = data.first()
... # By default measurement data is saved as raw 16 bit ADC value
... assert all((
... True if 0 <= datapoint.value <= 2**16 else False
... for datapoint in first_channel
... ))
... # Timestamps are stored in seconds from the epoch
... assert all((
... (
... True
... if measurement_time - 1
... <= datapoint.timestamp
... <= measurement_time + 1
... else False
... )
... for datapoint in first_channel
... )), "Offset of timestamp of measurement data too large"
...
... # You can also access the measurement counters
... # (cyclic value between 0 - 255)
... assert all((
... True if 0 <= datapoint.counter <= 255 else False
... for datapoint in first_channel
... ))
... # To get a sense of the quality of the signal you can use the method
... # dataloss, which will return a value between 0 (no data loss) and
... # 1 (all data lost).
... assert 0 <= data.dataloss() <= 1
...
... await icosystem.disconnect_sensor_node()
... await icosystem.disconnect_stu()
>>> mac_address = settings.sensor_node.eui # Change to MAC address of your sensor node
>>> run(measure_data(ICOsystem(), mac_address))
For more information on how to work with measurement data, please take a look at the links below.
Topic |
Links |
Conversion |
|
Storing Measurement Data |
|