HART

This section deals with device communication using the HART industrial automation protocol. Familiarity with this protocol and general topology is required to understand the examples given. The usefulness of this section to a general audience will be limited.

You can find more information in the HART documentation.

Warning

This HART demo is not usable with the htf-community version. Please contact us for a demo license.

Basic HART Communication

For a simple example of how sending and receiving HART commands works see hart/test_basic_hart_communication.py.

Before running the demo, edit the file to fit your needs: enter a valid value for COM_PORT.

#
# Copyright (c) 2023, HILSTER - https://hilster.io
# All rights reserved.
#

from typing import Generator

import htf
from htf.hart import HartDeviceCommunication, HartFrame

COM_PORT = "ENTER_VALID_COM_PORT"  # Replace this with the COM port your HART modem is connected to.


@htf.fixture("test")
def hart_device() -> Generator[HartDeviceCommunication, None, None]:
    """
    Open a HART connection on COM_PORT, find the device and hand the connection to the test.

    The connection is closed in a ``finally`` clause, so a failing ``find_device()`` (or an exception raised into
    the fixture while it is suspended) does not leave the port open.
    """
    com = HartDeviceCommunication(COM_PORT)
    try:
        com.find_device()
        yield com
    finally:
        com.close()


"""
These tests show different ways to send and receive HART commands. They are only intended to be a high-level
introduction to how communication can be achieved. Please check the documentation if lower-level access is needed.
"""


def test_query(hart_device: HartDeviceCommunication) -> None:
    """Send HART command 1 over the connection provided by the fixture and print the reply."""
    read_primary_variable = HartFrame(1)  # Command 1 - Read Primary Variable
    reply = hart_device.query(read_primary_variable)
    print("Response:")
    print(reply)


def test_query_context() -> None:
    """Send HART command 1 using HartDeviceCommunication as a context manager and print the reply."""
    with HartDeviceCommunication(COM_PORT) as connection:
        read_primary_variable = HartFrame(1)  # Command 1 - Read Primary Variable
        reply = connection.query(read_primary_variable)
        print("Response:")
        print(reply)


def test_request_payload(hart_device: HartDeviceCommunication) -> None:
    """Set a payload field on a request (the tag of command 18), send it and print request and reply."""
    write_tag_request = HartFrame(18)  # Command 18 - Write Tag, Descriptor, Date
    write_tag_request.payload.tag.set(b"TESTTAG")
    print("Request:")
    print(write_tag_request)

    reply = hart_device.query(write_tag_request)
    print("Response:")
    print(reply)


# When this module is executed directly as a script, hand control to htf.main().
if __name__ == "__main__":
    htf.main()

To execute the demo, run

htf -o hart/test_basic_hart_communication.py

Device Specific Commands

In order to use device specific or custom HART commands, a specialized HartFrame is needed. See hart/test_custom_commands.py for an example.

#
# Copyright (c) 2023, HILSTER - https://hilster.io
# All rights reserved.
#

from typing import Generator, Tuple

import htf
import oser  # type: ignore
from htf.hart import HartFrame, ResponseCode, FieldDeviceStatus
from htf.hart.data_types import HartResponse

# Let's define a custom command to read random bytes from a device's memory
#
# The command number shall be 123 and have the following payloads:
# Request
#         - one unsigned 8-bit Integer specifying the number of bytes to read
# Response
#         - one unsigned 8-bit Integer to echo the number of requested bytes
#         - a byte string of variable length, depending on the number of requested bytes
#
#         Possible error response codes: passed parameter too large (to disallow requests of over 127 bytes)
#                                        too few data bytes received (when the number of bytes parameter is missing)


# Defining request and response data bytes using the oser package


class Command123_ReadRandomBytesRequest(oser.ByteStruct):  # noqa: N801
    """Request data bytes of custom command 123: a single unsigned 8-bit integer."""

    def __init__(self) -> None:
        super().__init__()
        # number of bytes the device is asked to read
        self.number_of_bytes = oser.UBInt8()


class Command123_ReadRandomBytesResponse(HartResponse):  # noqa: N801
    """Response data bytes of custom command 123: response code, device status, byte count and the bytes read."""

    def __init__(self) -> None:
        super().__init__()
        # command-specific error codes: 3 = passed parameter too large, 5 = too few data bytes received
        self.response_code = ResponseCode(
            error_passed_parameter_too_large=3,
            error_too_few_data_bytes_received=5,
        )
        self.field_device_status = FieldDeviceStatus()

        # echo of the requested number of bytes
        self.number_of_bytes = oser.UBInt8()
        # variable-length byte string; its length is taken from number_of_bytes
        self.read_bytes = oser.Data(length=lambda self: self.number_of_bytes.get())


# To use these payloads a custom HartFrame is required to map request and response data bytes to specific commands.
# Universal and common practice commands are supported by the HartFrame base class, so only additional commands
# need to be added here.


class MyDeviceHartFrame(HartFrame):
    """HartFrame that additionally maps command 123 to its custom request and response data bytes."""

    def requests_generator(self) -> Generator[Tuple[int, oser.Lazy], None, None]:
        """
        Yield (command number, request data bytes) pairs for the additional commands
        """
        # Wrapping the class with oser.Lazy() is not required, but can improve performance when a large number of
        # commands are defined.
        # NOTE(review): the original comment was cut off after "a large number of"; the ending is presumed.
        lazy_request = oser.Lazy(Command123_ReadRandomBytesRequest)
        yield 123, lazy_request

    def responses_generator(self) -> Generator[Tuple[int, oser.Lazy], None, None]:
        """
        Yield (command number, response data bytes) pairs for the additional commands
        """
        lazy_response = oser.Lazy(Command123_ReadRandomBytesResponse)
        yield 123, lazy_response


# The new command is now ready to be used and can be passed to HART communication functions as the 'decoder_generator'
# argument.
#
# Example:
#     with HartDeviceCommunication(comport='COM1', decoder_generator=MyDeviceHartFrame) as com:
#         response = com.query(MyDeviceHartFrame(123))
#         print(response)


def test_custom_command() -> None:
    """Build a request frame and an "ack" response frame for custom command 123 and print both."""
    request_frame = MyDeviceHartFrame(123)
    print("Request:")
    print(request_frame)

    # the same frame class is used for the response; only the frame type differs
    response_frame = MyDeviceHartFrame(123)
    response_frame.delimiter.frame_type.set("ack")
    print("Response:")
    print(response_frame)


# When this module is executed directly as a script, hand control to htf.main().
if __name__ == "__main__":
    htf.main()

To execute the demo, run

htf -o hart/test_custom_commands.py

HART Slave Device Simulation

When testing HART master devices it can be difficult to replicate certain test conditions, for example when a slave device is misbehaving or faulty or when a large number of slave devices are required. In these cases it is often easier to use device simulators instead of physical devices. See hart/test_slave_simulator.py for an example.

#
# Copyright (c) 2023, HILSTER - https://hilster.io
# All rights reserved.
#

from typing import Optional

import htf
from htf.hart import HartFrame
from htf.hart.slave_simulator import HartSlaveSimulator
from hart.test_custom_commands import MyDeviceHartFrame
from random import randint
from unittest.mock import patch

# This HART slave simulator will respond to the custom command 123 from the previous example


class SlaveSimulator(HartSlaveSimulator):
    """HART slave simulator that answers the custom command 123 with bytes from a simulated device memory."""

    def __init__(self, comport: str) -> None:
        """Create the simulator on ``comport`` and fill the simulated device memory."""
        super(SlaveSimulator, self).__init__(
            comport=comport, decoder_generator=MyDeviceHartFrame, device_type=0x1234, device_id=0x56
        )
        self.device_memory = b"This is my memory. There's lots of data in it. " * 50

    def get_random_bytes(self, number_of_bytes: int) -> bytes:
        """
        Return exactly ``number_of_bytes`` consecutive bytes taken from a random position of the device memory.

        ``randint`` raises ``ValueError`` if ``number_of_bytes`` exceeds the size of the device memory.
        """
        start = randint(0, len(self.device_memory) - number_of_bytes)
        # A slice excludes its end index, so ``start + number_of_bytes`` (without ``+ 1``) gives exactly the
        # requested length.
        return self.device_memory[start : start + number_of_bytes]

    def handle_command123(self, request: HartFrame) -> Optional[HartFrame]:
        """
        Build the response for command 123.

        Requests for more than 127 bytes are answered with ``error_passed_parameter_too_large``; otherwise the
        requested byte count is echoed and that many bytes are read from the device memory.
        """
        response = self._create_response(request)
        number_of_bytes_requested = request.payload.number_of_bytes.get()
        if number_of_bytes_requested > 127:  # only 127 bytes are allowed to be read at one time
            response.payload.response_code.set("error_passed_parameter_too_large")
        else:
            response.payload.number_of_bytes.set(number_of_bytes_requested)  # echo number of bytes
            random_bytes = self.get_random_bytes(number_of_bytes_requested)  # read random bytes from memory
            response.payload.read_bytes.set(random_bytes)
        return response


"""
These tests start the slave simulator with a mocked interface to simulate a request.
"""


def test_start_simulator_success() -> None:
    """Request 25 bytes with command 123 from the simulator over a mocked interface and expect a success reply."""
    with patch("htf.hart.slave_simulator.HartInterface") as mock_interface_class:
        mock_interface = mock_interface_class.return_value

        request = MyDeviceHartFrame(123)
        request.address.address.set(0x1234000056)  # the slave will only respond to commands addressed to it
        request.payload.number_of_bytes.set(25)  # read 25 bytes
        mock_interface.read.return_value = request.encode()  # set the request for the slave to read
        print("Request:")
        print(request)

        sim = SlaveSimulator(comport="")
        try:
            sim.run(1)  # run until one request has been answered
            response = MyDeviceHartFrame()
            # decode the response written to the mocked interface
            response.decode(mock_interface.write.call_args[0][0])
            print("Response:")
            print(response)

            htf.assert_equal(response.payload.response_code.get(), "success")  # expect a successful response
            htf.assert_equal(response.payload.number_of_bytes.get(), 25)  # number of requested bytes should be echoed
            htf.assert_equal(len(response.payload.read_bytes.get()), 25)  # the actual length of read bytes should match
        finally:
            # close the simulator even when running it or one of the assertions fails
            sim.close()


def test_start_simulator_error_response() -> None:
    """Request 128 bytes with command 123 from the simulator over a mocked interface and expect an error reply."""
    with patch("htf.hart.slave_simulator.HartInterface") as mock_interface_class:
        mock_interface = mock_interface_class.return_value
        request = MyDeviceHartFrame(123)
        request.address.address.set(0x1234000056)  # the slave will only respond to commands addressed to it
        request.payload.number_of_bytes.set(128)  # read 128 bytes, violating the threshold
        mock_interface.read.return_value = request.encode()
        print("Request:")
        print(request)

        sim = SlaveSimulator(comport="")
        try:
            sim.run(1)  # run until one request has been answered
            response = MyDeviceHartFrame()
            # decode the response written to the mocked interface
            response.decode(mock_interface.write.call_args[0][0])
            print("Response:")
            print(response)

            # expect an error response for requesting more than 127 bytes
            htf.assert_equal(response.payload.response_code.get(), "error_passed_parameter_too_large")
        finally:
            # close the simulator even when running it or the assertion fails
            sim.close()


# When this module is executed directly as a script, hand control to htf.main().
if __name__ == "__main__":
    htf.main()

To execute the demo, run

htf -o hart/test_slave_simulator.py

HART Simulation/Hardware Tests

When writing tests to check device behavior you may want to run them against a simulator and your device without having to develop the same test twice. This can be accomplished by using a combination of fixtures and tags. Once the test is written you can select the target by passing the desired fixture tag to the runner.

See hart/test_read_primary_variable.py for an example.

#
# Copyright (c) 2023, HILSTER - https://hilster.io
# All rights reserved.
#

from typing import Generator, Any, Optional

import htf
from unittest.mock import patch
from htf.hart import HartFrame, HartDeviceCommunication
from htf.hart.slave_simulator import HartSlaveSimulator


COM_PORT = "/dev/ttyUSB0"  # !!! Change this to the port your device is connected to


class PrimaryVariableSimulator(HartSlaveSimulator):
    """HART slave simulator that answers command 1 and can be switched into a broken state."""

    def __init__(self, comport: str) -> None:
        super().__init__(comport=comport, decoder_generator=HartFrame, device_type=0x1234, device_id=0x56)
        # tests set this flag to make command 1 report a device specific error
        self.broken = False

    def handle_command1(self, request: HartFrame) -> Optional[HartFrame]:
        """Answer command 1 with a primary variable of 123.45; report an error code while ``broken`` is set."""
        reply = self._create_response(request)
        reply.payload.primary_variable.set(123.45)
        if not self.broken:
            return reply
        reply.payload.response_code.set("error_device_specific_command_error")
        return reply


@htf.tags("simulator")
@htf.fixture(scope="test", name="device")
def device_simulator() -> Generator[PrimaryVariableSimulator, None, None]:
    """
    Provide a PrimaryVariableSimulator whose HART interface class is replaced by a mock.

    The mock stays active for as long as the fixture is suspended, and the simulator is closed afterwards.
    """
    # patch() only creates a patcher object; it has to be entered (or started) to actually replace HartInterface.
    with patch("htf.hart.slave_simulator.HartInterface"):
        sim = PrimaryVariableSimulator("/dev/ttyUSB0")
        try:
            # NOTE(review): run(2) is kept from the original; how it behaves with a bare mock interface should be
            # confirmed against the HartSlaveSimulator documentation.
            sim.run(2)
            yield sim
        finally:
            sim.close()


@htf.tags("hardware")
@htf.fixture(scope="test", name="device")
def device_hardware() -> Generator[Any, None, None]:
    """Provide the "device" fixture for the hardware target; currently only a placeholder object."""
    placeholder = object()  # Replace this with an object controlling your device
    yield placeholder


@htf.tags("simulator")
@htf.fixture(scope="test", name="interface")
def interface_simulator(device: Any) -> Generator["SimulatorInterface", None, None]:  # type: ignore  # noqa: F821
    """Provide the "interface" fixture for the simulator target: queries go straight to the device's handler."""

    class SimulatorInterface:
        """Minimal query interface that forwards each request to the simulated device."""

        def query(self, request: HartFrame) -> Optional[HartFrame]:
            handler_response = device._get_response_from_handler(request)  # type: ignore
            return handler_response

    simulator_interface = SimulatorInterface()
    yield simulator_interface


@htf.tags("hardware")
@htf.fixture(scope="test", name="interface")
def interface_hardware(device: Any) -> Generator[HartDeviceCommunication, None, None]:
    """
    Provide the "interface" fixture for the hardware target: a HART connection on COM_PORT.

    The connection is closed once the fixture is finished, so the port is not left open between tests.
    """
    com = HartDeviceCommunication(COM_PORT)
    try:
        yield com
    finally:
        com.close()


def test_read_primary_variable(
    device: Any,
    interface: "SimulatorInterface",  # type: ignore  # noqa: F821
    step: htf.fixtures.step,
) -> None:
    """
    Read the primary variable twice with command 1: once from the working device and once after setting
    ``device.broken``, expecting "success" first and a device specific error afterwards.
    """
    with step("Read primary variable"):
        read_request = HartFrame(1)
        read_request.address.address.set(0x1234000056)  # the slave will only respond to commands addressed to it
        print("Request:")
        print(read_request)

        first_response = interface.query(read_request)
        print("Response:")
        print(first_response)

    with step("Validate primary variable and response code"):
        htf.assert_equal(first_response.payload.response_code.get(), "success")  # expect a successful response
        # the primary variable should match the value set by the simulator
        htf.assert_almost_equal(first_response.payload.primary_variable.get(), 123.45)

    with step("Break device"):
        device.broken = True

    with step("Read primary variable again"):
        # the same request frame is sent a second time
        print("Request:")
        print(read_request)

        second_response = interface.query(read_request)
        print("Response:")
        print(second_response)

    with step("Validate response code indicates device specific error"):
        # expect an error
        htf.assert_equal(second_response.payload.response_code.get(), "error_device_specific_command_error")

To execute the demo against the simulator, run

htf -o -F simulator hart/test_read_primary_variable.py

Likewise, to use a real device as the target, run

htf -o -F hardware hart/test_read_primary_variable.py

Attention: You will need to change the ``COM_PORT`` variable at the top of the test module to the port your device is connected to.