Source code for stepseries.stepXXX

"""Stepper motor driver with an Ethernet interface."""


from queue import Empty, Queue
from typing import Any, Callable, Dict, List, Tuple, Union

from stepseries.commands import (
    OSCGetCommand,
    OSCSetCommand,
    ReportError,
    ResetDevice,
    SetDestIP,
)
from stepseries.exceptions import ClientClosedError, ParseError, StepSeriesException
from stepseries.responses import DestIP, ErrorCommand, ErrorOSC, OSCResponse
from stepseries.server import DEFAULT_SERVER


[docs]class STEPXXX(object): """Send and receive data from a STEP-series motor driver. Args: id (`int`): The id set by the DIP switches on the device. address (`str`): The ip address of the device. Defaults to `10.0.0.100`. port (`int`): The local port the device is listening on. Defaults to `50000`. server_address (`str`): The ip address of the server (this machine). Should always be `0.0.0.0`. Defaults to `0.0.0.0`. server_port (`int`): The port the server is listening on. Defaults to `50100`. """ _id: int _address: str _port: int _server_address: str _server_port: int _boards_to_n_motors: Dict[str, int] _registered_callbacks: Dict[ Union[OSCResponse, None], List[Callable[[OSCResponse], None]] ] _get_request: OSCResponse _get_with_callback: bool _get_queue: Queue _is_closed: bool _is_multiple_response: bool _multiple_responses: List[OSCResponse] def __init__( self, id: int, address: str = "10.0.0.100", port: int = 50000, server_address: str = "0.0.0.0", server_port: int = 50100, ) -> None: self._id = id self._address = address self._port = port self._server_address = server_address self._server_port = server_port self._boards_to_n_motors = {"STEP400": 4, "STEP800": 8} self._registered_callbacks = dict() self._get_request = None self._get_with_callback = True self._get_queue = Queue() self._is_closed = True self._is_multiple_response = False self._multiple_responses = list() # Bind this device DEFAULT_SERVER.add_device(self) @property def address(self) -> str: """The local IP address of the client.""" return self._address @property def port(self) -> int: """The local port on the client.""" return self._port @property def server_address(self) -> str: """The remote IP address of the server.""" return self._server_address @property def server_port(self) -> int: """The remote port on the server.""" return self._server_port @property def is_closed(self) -> bool: """Is the connection to the device closed.""" return self._is_closed def _handle_incoming_message( self, message_address: str, *osc_args: Tuple[Any] ) -> None: # Reconstruct message as an object resp = None raw_resp = message_address + " " + " ".join([str(x) for x in osc_args]) for cls in OSCResponse.__subclasses__(): if cls.address == message_address: try: resp = cls(raw_resp) except (IndexError, TypeError) as exc: resp = ParseError("parsing failed to deconstruct response") resp.response = raw_resp resp.original_exc = exc break else: resp = ParseError("no response object matched this message") resp.response = raw_resp # Set the flag that the connection is open if isinstance(resp, DestIP): self._is_closed = False # Support multiple responses if self._is_multiple_response: if not isinstance(resp, Exception): if isinstance(resp, self._get_request): self._multiple_responses.append(resp) if ( len(self._multiple_responses) < self._boards_to_n_motors[self.__class__.__name__] ): return else: if self._get_request: if isinstance(resp, self._get_request): self._get_queue.put(resp) self._get_queue.join() # Send the message to all required callbacks # TODO: Look at thread pooling this process if ( not isinstance(resp, self._get_request or type(None)) or self._get_with_callback or isinstance(resp, Exception) ): for resp_type, callbacks in self._registered_callbacks.items(): if resp.__class__ == resp_type or resp_type is None: for callback in callbacks: if self._multiple_responses: callback(self._multiple_responses) else: callback(resp) # Return the get request if self._get_request: if isinstance(resp, self._get_request) or isinstance(resp, Exception): if self._multiple_responses: self._get_queue.put(self._multiple_responses) else: self._get_queue.put(resp) self._get_queue.join() def _check_status(self) -> None: if self.is_closed: raise ClientClosedError( "the connection to this client is closed. " "Send the command 'SetDestIP' to open the connection " "or check your configurations" )
[docs] def close(self) -> None: """Close the connection to the stepseries device. Note: No other command after this one should be called on the device. """ DEFAULT_SERVER.remove_device(self) self._is_closed = True
[docs] def on( self, message_type: Union[OSCResponse, None], fn: Callable[[OSCResponse], None] ) -> None: """Register `fn` to be executed when `message_type` is received. Args: message_type (`OSCResponse`, `None`): The message type to filter for. If `None`, then all messages received will be sent to `fn`. Note multiple `fn`s can be registered to the same type or multiple types. fn (`callable`): The callable to be executed when `message_type` is received. Note: `fn` should accept one and only one argument being the message received. Raises: `TypeError`: `message_type` is not an `OSCResponse`. `fn` is not a callable. """ if message_type is not None and OSCResponse not in message_type.__bases__: raise TypeError( "argument 'message_type' expected to be 'OSCResponse', " f"'{type(message_type).__name__}' found" ) if not callable(fn): raise TypeError( "argument 'fn' expected to be callable, " f"'{type(fn).__name__}' found" ) try: if fn not in self._registered_callbacks[message_type]: self._registered_callbacks[message_type].append(fn) except KeyError: self._registered_callbacks[message_type] = [fn]
[docs] def remove(self, fn: Callable[[OSCResponse], None]) -> None: """Remove `fn` from the registered callbacks.""" for k, callbacks in self._registered_callbacks.items(): for callback in callbacks: if callback == fn: self._registered_callbacks[k].remove(fn)
[docs] def reset(self) -> None: """Resets the device. Note: This function may return before the device is ready. """ self._check_status() self.set(ResetDevice())
[docs] def get( self, command: OSCGetCommand, with_callback: bool = True, wait: bool = True ) -> Union[OSCResponse, List[OSCResponse]]: """Send a 'get' command to the device and return the response. Note: The responses are also sent to each applicable callback. If a `ParseError` is received, then it will be raised. The raw response can be retrieved via the `response` attribute of the error. Args: command (`OSCGetCommand`): The completed command template (`stepseries.commands`). with_callback (`bool`): Send the response to callbacks as well (defaults to `True`). wait (`bool`): Wait for a response from the device if `True`, otherwise return without waiting for a response (defaults to `True`). Raises: `TypeError`: `command` is not an `OSCSetCommand`. """ if not isinstance(command, OSCGetCommand): raise TypeError( "argument 'command' expected to be 'OSCGetCommand', " f"'{type(command).__name__}' found" ) if not isinstance(command, SetDestIP): self._check_status() # Prepare for get request if wait: self._get_request = command.response_cls self._get_with_callback = with_callback if hasattr(command, "motorID"): if command.motorID == 255: self._is_multiple_response = True # Send the request DEFAULT_SERVER.send(self, command) # Wait for data and reset try: if wait: resp = self._get_queue.get(timeout=2) self._get_queue.task_done() else: resp = None except Empty: raise TimeoutError("timed-out waiting for a response from the device") finally: self._get_request = None self._get_with_callback = True self._multiple_responses = list() self._is_multiple_response = False if isinstance(resp, Exception): if isinstance(resp, StepSeriesException): if resp.original_exc is not None: raise resp from resp.original_exc raise resp return resp
[docs] def set(self, command: OSCSetCommand) -> None: """Send a 'set' command to the device. Args: command (`OSCCommand`): The completed command template (`stepseries.commands`). Raises: `TypeError`: `command` is not an `OSCSetCommand`. """ # Because SetDestIP has "set" in the name, allow this method # support it if isinstance(command, SetDestIP): self.get(command) return if not isinstance(command, OSCSetCommand): raise TypeError( "argument 'command' expected to be 'OSCSetCommand', " f"'{type(command).__name__}' found" ) self._check_status() if isinstance(command, ResetDevice): self._is_closed = True if command.__dict__.get("callback", None): if isinstance(command, ReportError): self.on(ErrorCommand, command.callback) self.on(ErrorOSC, command.callback) else: self.on(command.response_cls, command.callback) DEFAULT_SERVER.send(self, command)