# Source code for stepseries.server

"""Manages IO with multiple STEP-series devices."""


import atexit
from threading import Thread
from typing import Any, List, Tuple

from pythonosc.dispatcher import Dispatcher
from pythonosc.osc_server import ThreadingOSCUDPServer
from pythonosc.udp_client import SimpleUDPClient

from stepseries.commands import OSCCommand
from stepseries.exceptions import ClientNotFoundError

# Workaround for the circular import with stepseries.stepXXX: keeps the
# STEPXXX annotations usable while editing without failing at runtime.
try:
    from stepseries.stepXXX import STEPXXX
except ImportError:
    STEPXXX = Any


class Manager:
    """Route OSC traffic between STEP-series devices and local UDP servers.

    Every registered device gets its own ``SimpleUDPClient`` for outgoing
    messages. Incoming messages are received by a ``ThreadingOSCUDPServer``
    whose serve loop runs on a daemon thread; a device whose
    ``(server_address, server_port)`` equals the address of an already
    running server is bound to that server instead of creating a new one.
    """

    # One entry per registered device: (device, client, server, thread).
    # ``thread`` is ``None`` for entries that reuse a server which was
    # created for an earlier device.
    _bound_devices: List[
        Tuple[STEPXXX, SimpleUDPClient, ThreadingOSCUDPServer, Thread]
    ]

    def __init__(self) -> None:
        self._bound_devices = []

        # Shutdown hook to ensure servers are properly closed
        atexit.register(self.shutdown)

    def _handle_incoming_message(
        self, client_address: Tuple[str, int], message_address: str, *osc_args: Any
    ) -> None:
        """Forward a received OSC message to every matching device.

        Installed as the dispatcher's default handler with the reply
        address enabled, so the sender's ``(host, port)`` arrives first.

        Args:
            client_address: ``(host, port)`` the message was sent from.
            message_address: OSC address pattern of the message.
            *osc_args: The OSC arguments of the message.
        """
        # Only the host part identifies the device; the source port is ignored.
        sender_host, _ = client_address
        for device, _, _, _ in self._bound_devices:
            if device.address == sender_host:
                device._handle_incoming_message(message_address, *osc_args)

    @staticmethod
    def _stop_server(server: ThreadingOSCUDPServer) -> None:
        """Stop `server`'s serve loop and release its socket."""
        # shutdown() only ends serve_forever(); server_close() is what
        # actually closes the socket so the address can be bound again.
        server.shutdown()
        server.server_close()

    def add_device(self, device: STEPXXX) -> None:
        """
        For internal use only.

        Add a device to send data to when it is received.

        A client targeting ``(device.address, device.port)`` is always
        created. If a tracked server already listens on
        ``(device.server_address, device.server_port)`` the device is bound
        to it; otherwise a new server is created and started on a daemon
        thread.
        """
        client = SimpleUDPClient(device.address, device.port)
        wanted_address = (device.server_address, device.server_port)

        # Check if the device wants to bind to a pre-existing server
        # Bind the device to that server if needed
        for _, _, server, _ in self._bound_devices:
            if wanted_address == server.server_address:
                self._bound_devices.append((device, client, server, None))
                return

        # Create a new server and bind the device to it
        dispatcher = Dispatcher()
        dispatcher.set_default_handler(self._handle_incoming_message, True)
        server = ThreadingOSCUDPServer(wanted_address, dispatcher)
        thread = Thread(target=server.serve_forever, daemon=True)
        thread.start()

        # Only add the thread to this list to keep a reference to it
        self._bound_devices.append((device, client, server, thread))

    def remove_device(self, device: STEPXXX) -> None:
        """
        For internal use only.

        Remove a tracked device to stop sending data to it.

        The device is marked closed (``_is_closed = True``). Its server is
        stopped only when no other tracked device is still bound to it, so
        devices sharing a server keep receiving messages. Does nothing if
        the device is not tracked.
        """
        for index, (bound, _, server, _) in enumerate(self._bound_devices):
            if bound == device:
                break
        else:
            return

        self._bound_devices.pop(index)
        bound._is_closed = True

        # Servers may be shared between devices (see add_device); stopping
        # one that is still in use would silence the remaining devices.
        still_in_use = any(
            other is server for _, _, other, _ in self._bound_devices
        )
        if not still_in_use:
            self._stop_server(server)

    def shutdown(self) -> None:
        """Shuts down all tracked servers."""
        bound, self._bound_devices = self._bound_devices, []

        # Several entries can reference the same server; stop each one once.
        unique_servers = {id(server): server for _, _, server, _ in bound}
        for server in unique_servers.values():
            self._stop_server(server)

    def send(self, device: STEPXXX, message: OSCCommand) -> None:
        """Send `message` to the `device`.

        Raises:
            ClientNotFoundError: If `device` has not been registered with
                this manager.
        """
        # Get the client bound to this device
        for bound, client, _, _ in self._bound_devices:
            if bound == device:
                client.send(message.build())
                return

        raise ClientNotFoundError("device is not registered with a server")
# Module-level Manager instance, created when this module is imported.
DEFAULT_SERVER = Manager()