Communication

This chapter describes the modules for the asynchronous communication framework.

Pluggable Notifier

The htf.communication.PluggableNotifier is an observer that lets subjects register tuples of name, filterExpression and a callback.

When data is input all registered subjects are used the following way. First the filterExpression is called with the data as a parameter. If True is returned the callback is called with the data as a parameter.

With the htf.communication.PluggableNotifier you can create an asynchronous communication framework easily.

digraph inheritance3b3afe1ce1 { rankdir=LR; size="8.0, 12.0"; "htf.communication._pluggable_notifier.PluggableNotifier" [fontname="Vera Sans, DejaVu Sans, Liberation Sans, Arial, Helvetica, sans",fontsize=10,height=0.25,shape=box,style="setlinewidth(0.5)",tooltip="``PluggableNotifier`` implements an observer pattern."]; "threading.Thread" -> "htf.communication._pluggable_notifier.PluggableNotifier" [arrowsize=0.5,style="setlinewidth(0.5)"]; "htf.communication.pluggable_notifier.PluggableNotifier" [fontname="Vera Sans, DejaVu Sans, Liberation Sans, Arial, Helvetica, sans",fontsize=10,height=0.25,shape=box,style="setlinewidth(0.5)",tooltip="``PluggableNotifier`` implements an observer pattern."]; "htf.communication._pluggable_notifier.PluggableNotifier" -> "htf.communication.pluggable_notifier.PluggableNotifier" [arrowsize=0.5,style="setlinewidth(0.5)"]; "threading.Thread" [fontname="Vera Sans, DejaVu Sans, Liberation Sans, Arial, Helvetica, sans",fontsize=10,height=0.25,shape=box,style="setlinewidth(0.5)",tooltip="A class that represents a thread of control."]; }
class htf.communication.PluggableNotifier(verbosity=0, raiseExceptions=False, filterExceptionHandler=None, callbackExceptionHandler=None)

PluggableNotifier implements an observer pattern. Subjects can register tuples consisting of executable filter expressions and callbacks to be notified about events.

There is a method to enqueue new items and a thread to apply filters on items. Communication between method and thread is done via a queue.

PluggableNotififer extends threading.Thread.

Parameters
  • verbosity (int) – if set to 1 debug messages are printed to stdout

  • raiseExceptions (bool) – if set to True exceptions are raised if a filter expression or a callback raises an exception. This may be used for debugging. In case of an exception stop() is called so no further items can be put in.

  • filterExceptionHandler (None or callable) – a callback that is called to handle a raised exception in a filter expression. If the filterExceptionHandler returns True the exception is not raised afterwards. Else the exception handler raises the catched exception and stopps the thread. The supplied handler musst accept name and item arguments. name is the callback name supplied by register() and item is the supplied item from input().

  • callbackExceptionHandler (None or callable) – a callback that is called to handle a raised exception in a callback. If the callbackExceptionHandler returns True the exception is not raised afterwards. Else the exception handler raises the catched exception and stopps the thread. The supplied handler musst accept name and item arguments. name is the callback name supplied by register() and item is the supplied item from input().

getFilterNames()

Get the filter names.

Returns

a sorted list containing the filter names.

Return type

list of str

input(item)

Input an item to be used with all registered subjects.

Parameters

item – an item to be processed.

Raises
isAlive()

Return whether the thread is alive.

This method returns True just before the run() method starts until just after the run() method terminates. The module function enumerate() returns a list of all alive threads.

is_alive()

Return whether the thread is alive.

This method returns True just before the run() method starts until just after the run() method terminates. The module function enumerate() returns a list of all alive threads.

join(timeout=1.0)

Wait until the thread ends.

Parameters

timeout (float) – the timeout in seconds. Default: 1.0 seconds.

register(name, filterExpression, callback)

Register a tuple of name, filterExpression and a callback.

Parameters
  • name (str) – a name for the tuple (for debugging, see verbosity).

  • filterExpression (callable) – a callable filter expression (see htf.filters for more information) or a method that is called. filterExpression is called with the item from input() as a parameter. If filterExpression returns True the callback is called with the same item.

  • callback (callable) – a callable object that is called in case filterExpression(item) returns True.

run()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

start()

Start the thread’s activity.

It must be called at most once per thread object. It arranges for the object’s run() method to be invoked in a separate thread of control.

This method will raise a RuntimeError if called more than once on the same thread object.

stop()

Stop the thread.

unregister(name)

Unregister a callback with a given name.

Parameters

name (str) – the name of the callback to be unregistered.

Raises

KeyError – if name was not found in notifications.

Notification Callbacks

Notification callbacks can be used in htf.communication.PluggableNotifier as the callback.

For example you can enqueue an item into a queue the callee just created to asynchronously receive data.

htf.communication.notification_callbacks.enqueueItem(queue)

Enqueues item into queue. This way asynchronous communication can be set up easily.

Parameters

queue (queue.Queue) – a queue to be used.

Returns

an anonymous method that can be called with an item that is put into queue.

Return type

callable

Example:

from htf.communication import PluggableNotifier, enqueueItem
import queue # or Queue in Python 2

pn = PluggableNotifier()
q = queue.Queue()
pn.register("subject", filterExpression=lambda *args, **kwargs: kwargs["item"]=="example item", callback=enqueueItem(q))
pn.input("example item") # is passed
item = q.get(block=True, timeout=1)
print(item) # "example item"
pn.input("another item") # is filtered out
item = q.get(block=True, timeout=1) # raises Queue.Empty

SSH Client

With the SSH Client you can run commands remotely via SSH.

class htf.communication.SSHClient

SSHClient executes commands remotely using ssh.

close()

Close current ssh connection.

connect(hostname, username=None, password=None)

Establish an ssh connection.

Parameters
  • hostname (str) – the hostname or ip address

  • username=None (str) – The username to be used. If set to None the current username is used instead of using the ssh keys in your system.

  • password=None (str) – The password for username. If set to None the password for username or the current username is used.

execute(*command)

Execute a command remotely via ssh.

Parameters

*command (tuple of str) – the command to be executed.

Returns

a tuple containing exitcode, stdout and stderr of the executed command.

Return type

tuple (exitcode, stdout, stderr)

SFTP Client

With the SFTP Client it is possible to do file operations remotely via SSH.

class htf.communication.SFTPClient

SFTPClient lets you access files via sftp protocol.

close()

Close current sftp connection.

connect(hostname, username=None, password=None)

Establish an sftp connection.

Parameters
  • hostname (str) – the hostname or ip address

  • username=None (str) – The username to be used. If set to None the current username is used instead of using the ssh keys in your system.

  • password=None (str) – The password for username. If set to None the password for username or the current username is used.

execute(*command)

Execute a command remotely via ssh.

Parameters

*command (tuple of str) – the command to be executed.

Returns

a tuple containing exitcode, stdout and stderr of the executed command.

Return type

tuple (exitcode, stdout, stderr)

get(remotepath, localpath)

Copies file identified by remotepath to localpath using sftp protocol.

Parameters
  • remotepath (str) – the remote file to be copied

  • localpath (str) – the local path where the field is copied to

put(localpath, remotepath)

Copies file identified by localpath to remotepath using sftp protocol.

Parameters
  • localpath (str) – the local path to be copied

  • remotepath (str) – the remote file where the field is copied to

Returns

the result of os.stat

Return type

SFTPAttributes

remove(remotepath)

Removes a file identified by remotepath.

Parameters

remotepath (str) – the path for the file to be removed

Raises

IOError – if the path is not a file

rename(oldpath, newpath)

Remotely renames a file or a folder.

Parameters
  • oldpath (str) – the file or folder to be renamed

  • newpath (str) – the new file- or foldername

Raises

IOError – if something goes wrong

Removes a file identified by remotepath.

Parameters

remotepath (str) – the path for the file to be removed

Raises

IOError – if the path is not a file

SLIP — Serial Line Internet Protocol

SLIP contains a mixin to be used to implement SLIP-based communications.

If you don’t know about SLIP you should first read RFC 1055.

The htf.communication.SlipMixin contains two methods for encoding and decoding SLIP-encoded communication frames.

class htf.communication.SlipMixin

The SlipMixin helps to encode and decode frames transferred in the Serial Line Internet Protocol.

It cannot be used standalone and must be mixed into a class that implements the htf.communication.SlipInterface.

slip_receive_frame(timeouts=None, max_length=1024)

Receive a frame encoded in the Serial Line Internet Protocol and return the decoded frame.

Parameters
  • timeouts=None (int) – the number of timeouts until None is returned. The timeout depends on the mixing class and underlying communication system.

  • max_length=1024 (int) – maximum number of bytes that will be received, not including ESC. Prevents the method from perpetually blocking.

Returns

the decoded frame if a frame was received or

None if no frame was received.

Return type

bytes or None

slip_send_frame(frame)

Send a frame encoded in the Serial Line Internet Protocol.

Parameters

frame (bytes) – the frame to be sent.

A class can subclass htf.communication.SlipMixin and must implement the htf.communication.SlipInterface.

class htf.communication.SlipInterface
receive_byte()

Receive a single byte. If no byte was received, None is returned to tell about a timeout.

Returns

a single byte that was received or None if a

timeout occurred.

Return type

bytes or None

send_byte(byte)

Send a single byte.

Parameters

byte (bytes) – a single byte to be sent.

In the following example one-way SLIP communication is established with a client and a server.

SLIP-Client:

from __future__ import \
    absolute_import, division, print_function, unicode_literals

from htf.communication import SlipMixin
import serial


class SlipClient(SlipMixin):
    def __init__(self, comport):
        self._serial = serial.Serial(comport, 115200,
                                     timeout=1.0, rtscts=False)

    def send_byte(self, byte):
        self._serial.write(byte)

    def receive_byte(self):
        return self._serial.read(1)

    def send(self, message):
        if not isinstance(message, bytes):
            message = message.encode()
        self.slip_send_frame(message)

if __name__ == "__main__":
    s = SlipClient("/dev/ttyUSB1")
    message = "Hello World!"
    s.send(message)

SLIP-Server:

from __future__ import \
    absolute_import, division, print_function, unicode_literals

from htf.communication import SlipMixin
import serial


class SlipServer(SlipMixin):
    def __init__(self, comport):
        self._serial = serial.Serial(comport, 115200,
                                     timeout=1.0, rtscts=False)

    def send_byte(self, byte):
        self._serial.write(byte)

    def receive_byte(self):
        return self._serial.read(1)

    def send(self, message):
        if not isinstance(message, bytes):
            message = message.encode()
        self.slip_send_frame(message)

    def receive(self):
        message = self.slip_receive_frame(timeouts=3)
        print("Received message:", message)

if __name__ == "__main__":
    s = SlipServer("/dev/ttyUSB2")
    while True:
        s.receive()