Usage

The main class for communication in the stateful API provided by this package is ICOsystem. Depending on the current state of objects of this class you will be able to use different coroutines to interact with the system.

State Diagram

        stateDiagram-v2
    disconnected: Disconnected
    stu_connected: STU Connected
    sensor_node_connected: Sensor Node Connected
    measurement: Measurement

    disconnected --> stu_connected: connect_stu

    stu_connected --> disconnected: disconnect_stu
    stu_connected --> stu_connected: get_adc_configuration, rename, reset_stu, set_adc_configuration
    stu_connected --> sensor_node_connected: connect_sensor_node_mac

    sensor_node_connected --> stu_connected: disconnect_sensor_node
    sensor_node_connected --> sensor_node_connected: get_adc_configuration, get_sensor_configuration, rename, set_adc_configuration, set_sensor_configuration
    sensor_node_connected --> measurement: start_measurement

    measurement --> sensor_node_connected: stop_measurement
    

In addition to coroutines that label the edges of the state diagram above you can also:

STU

Connecting to STU

Before you work with the ICOtronic system you need to set up the CAN connection to the STU (Stationary Transceiver Unit), which you can do using the coroutine ICOsystem.connect_stu(). After you are done working working with the STU you also need to disconnect the connection using the coroutine ICOsystem.disconnect_stu().

>>> from asyncio import run
>>> from icostate import ICOsystem

>>> async def connect_disconnect_stu(icosystem: ICOsystem):
...     await icosystem.connect_stu()
...     await icosystem.disconnect_stu()

>>> run(connect_disconnect_stu(ICOsystem()))

Resetting STU

In case you have the ICOtronic system does not react as you expect you can reset it using the coroutine ICOsystem.reset_stu().

>>> from asyncio import run
>>> from icostate import ICOsystem

>>> async def reset_stu(icosystem: ICOsystem):
...     await icosystem.connect_stu()
...     await icosystem.reset_stu()
...     await icosystem.disconnect_stu()

>>> run(reset_stu(ICOsystem()))

Finding Available Sensor Nodes

To retrieve information about available sensor nodes use the coroutine ICOsystem.collect_sensor_nodes().

>>> from asyncio import run
>>> from netaddr import EUI
>>> from icostate import ICOsystem, SensorNodeInfo

>>> async def get_sensor_nodes(icosystem: ICOsystem) -> list[SensorNodeInfo]:
...     await icosystem.connect_stu()
...     sensor_nodes = await icosystem.collect_sensor_nodes()
...     await icosystem.disconnect_stu()
...     return sensor_nodes

>>> sensor_nodes = run(get_sensor_nodes(ICOsystem()))
>>> # We assume that at least one sensor node is available
>>> len(sensor_nodes) >= 1
True
>>> node_info = sensor_nodes[0]
>>> # Each list entry contains information about name, MAC address and RSSI
>>> isinstance(node_info.name, str)
True
>>> len(node_info.name) <= 8
True
>>> isinstance(node_info.mac_address, EUI)
True
>>> -100 < node_info.rssi < 0
True

Sensor Node

Connecting to Sensor Node

Before you start a measurement you need to connect to a sensor node. To do that use the coroutine ICOsystem.connect_sensor_node_mac(). Please do not forget to disconnect from the node with the coroutine ICOsystem.disconnect_sensor_node() afterwards.

>>> from asyncio import run
>>> from icostate import ICOsystem
>>> from icostate.config import settings

>>> async def connect_disconnect_sensor_node(icosystem: ICOsystem,
...                                          mac_address: str):
...     await icosystem.connect_stu()
...     print(f"Connected: {await icosystem.is_sensor_node_connected()}")
...     await icosystem.connect_sensor_node_mac(mac_address)
...     print(f"Connected: {await icosystem.is_sensor_node_connected()}")
...     await icosystem.disconnect_sensor_node()
...     print(f"Connected: {await icosystem.is_sensor_node_connected()}")
...     await icosystem.disconnect_stu()

>>> mac_address = settings.sensor_node.eui # Change to MAC address of your sensor node
>>> run(connect_disconnect_sensor_node(ICOsystem(), mac_address))
Connected: False
Connected: True
Connected: False

Rename a Sensor Node

To rename a sensor node use the coroutine ICOsystem.rename(), which requires the new name of the sensor node as parameter. If the system is currently not connected to a sensor node, then the coroutine also requires the MAC address of the sensor node. After using the coroutine successfully the system will switch back to the state it was in before renaming the sensor node (either “STU Connected” or “Sensor Node Connected”).

>>> from asyncio import run
>>> from icostate import ICOsystem
>>> from icostate.config import settings

>>> async def rename_disconnected(icosystem: ICOsystem,
...                               mac_address: str,
...                               new_name: str):
...     await icosystem.connect_stu()
...     print(f"State Before: {icosystem.state}")
...     await icosystem.rename(new_name, mac_address)
...     print(f"State After: {icosystem.state}")
...     await icosystem.disconnect_stu()

>>> mac_address = settings.sensor_node.eui # Change to MAC address of your sensor node
>>> name = "Test-STH"
>>> run(rename_disconnected(ICOsystem(), mac_address, name))
State Before: STU Connected
State After: STU Connected

Events

Objects of the ICOsystem provides an event based API (based on pyee) you can use to react to changes to the system. Currently the following events are supported:

  • sensor_node_name: Called when the name of the current sensor node changes

  • sensor_node_mac_address: Called when the MAC address of a sensor node changes

  • sensor_node_adc_configuration: Called when the ADC configuration of a sensor node is updated

  • sensor_node_measurement_data: Called when new streaming data is available

The example below shows how you can react to changes of the sensor node name:

>>> from asyncio import sleep, run
>>> from icostate import ICOsystem
>>> from icostate.config import settings

>>> async def react_sensor_node_name(icosystem: ICOsystem, mac_address: str):
...
...     @icosystem.on("sensor_node_name")
...     async def name_changed(name: str):
...         print(f"Name of sensor node: {name}")
...
...     await icosystem.connect_stu()
...     await icosystem.connect_sensor_node_mac(mac_address)
...     await sleep(0)  # Allow scheduler to trigger event coroutine
...     await icosystem.disconnect_sensor_node()
...     await icosystem.disconnect_stu()

>>> mac_address = settings.sensor_node.eui # Change to MAC address of your sensor node
>>> run(react_sensor_node_name(ICOsystem(), mac_address))
Name of sensor node: Test-STH

Measurement (Streaming)

To start a measurement use the function start_measurement and provide a StreamingConfiguration, which specifies the measurement channels that should be active. To retrieve the measurement data (objects of the class MeasurementData) use the pyee event sensor_node_measurement_data. The example below shows you how to do that:

>>> from asyncio import run, sleep
>>> from time import time
>>> from icostate import ICOsystem, MeasurementData, StreamingConfiguration
>>> from icostate.config import settings

>>> async def measure_data(icosystem: ICOsystem, mac_address: str) -> None:
...     await icosystem.connect_stu()
...     await icosystem.connect_sensor_node_mac(mac_address)
...
...     data = None
...     measurement_time = None
...
...     @icosystem.on("sensor_node_measurement_data")
...     async def measurement_data_changed(measurement_data:
...                                        MeasurementData) -> None:
...         nonlocal measurement_time
...         measurement_time = time()
...         nonlocal data
...         data = measurement_data
...
...     await icosystem.start_measurement(StreamingConfiguration(first=True))
...     # Wait until one measurement data object is ready
...     while data is None:
...         await sleep(0.01)
...
...     await icosystem.stop_measurement()
...
...     # Use methods `first`, `second`, and `third` to access different
...     # channels
...     first_channel = data.first()
...     # By default measurement data is saved as raw 16 bit ADC value
...     assert all((
...         True if 0 <= datapoint.value <= 2**16 else False
...         for datapoint in first_channel
...     ))
...     # Timestamps are stored in seconds from the epoch
...     assert all((
...         (
...             True
...             if measurement_time - 1
...             <= datapoint.timestamp
...             <= measurement_time + 1
...             else False
...         )
...         for datapoint in first_channel
...     )), "Offset of timestamp of measurement data too large"
...
...     # You can also access the measurement counters
...     # (cyclic value between 0 - 255)
...     assert all((
...         True if 0 <= datapoint.counter <= 255 else False
...         for datapoint in first_channel
...     ))
...     # To get a sense of the quality of the signal you can use the method
...     # dataloss, which will return a value between 0 (no data loss) and
...     # 1 (all data lost).
...     assert 0 <= data.dataloss() <= 1
...
...     await icosystem.disconnect_sensor_node()
...     await icosystem.disconnect_stu()

>>> mac_address = settings.sensor_node.eui # Change to MAC address of your sensor node
>>> run(measure_data(ICOsystem(), mac_address))

For more information on how to work with measurement data, please take a look at the links below.

Additional documentation about measurement data

Topic

Links

Conversion

Storing Measurement Data