Source code for canopen_monitor.can.interface

from __future__ import annotations
import time
import psutil
import socket
import logging
import datetime as dt
from .message import Message
from pyvit.hw.socketcan import SocketCanDev


_SOCK_TIMEOUT = 0.1
_STALE_INTERFACE = dt.timedelta(minutes=1)


[docs]class Interface(SocketCanDev): """This is a model of a POSIX interface Used to manage a singular interface and any encoded messages streaming across it :param name: Name of the interface bound to :type name: str :param last_activity: Timestamp of the last activity on the interface :type last_activity: datetime.datetime """ def __init__(self: Interface, if_name: str): """Interface constructor :param if_name: The name of the interface to bind to :type if_name: str """ super().__init__(if_name) self.name = if_name self.last_activity = dt.datetime.now() self.socket.settimeout(_SOCK_TIMEOUT) def __enter__(self: Interface) -> Interface: """The entry point of an `Interface` in a `with` statement This binds to the socket interface name specified. .. warning:: This block-waits until the provided interface comes up before binding to the socket. :returns: Itself :rtype: Interface :Example: >>> with canopen_monitor.Interface('vcan0') as dev: >>> print(f'Message: {dev.recv()}') """ self.start() return self def __exit__(self: Interface, etype, evalue, traceback) -> None: """The exit point of an `Interface` in a `with` statement :param etype: The type of event :type etype: str :param evalue: The event :type evalue: str :param traceback: The traceback of the previously exited block :type traceback: TracebackException """ self.stop()
[docs] def start(self: Interface, block_wait: bool = True) -> None: """A wrapper for `pyvit.hw.SocketCanDev.start()` If block-waiting is enabled, then instead of imediately binding to the interface, it waits for the state to change to `UP` first before binding. :param block_wait: Enables block-waiting :type block_wait: bool """ logging.info(f'Binding to socket {self.name} with last activity at {self.last_activity}') while(block_wait and not self.is_up): time.sleep(0.01) self.socket = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) super().start()
[docs] def stop(self: Interface) -> None: """A wrapper for `pyvit.hw.SocketCanDev.stop()` """ logging.info(f'Closing interface {self.name}') super().stop() self.socket.close() self.running = False
[docs] def restart(self: Interface) -> None: """A macro-fuction for restarting the interface connection This is the same as doing: >>> iface.stop() >>> iface.start() """ logging.warn(f'Restarting interface {self.name}') self.stop() self.start(False)
[docs] def recv(self: Interface) -> Message: """A wrapper for `pyvit.hw.SocketCanDev.recv()` Instead of returning a `can.Frame`, it intercepts the `recv()` and converts it to a `canopen_monitor.Message` at the last minute. :return: A loaded `canopen_monitor.Message` from the interface if a message is recieved within the configured SOCKET_TIMEOUT (default is 0.3 seconds), otherwise returns None :rtype: Message, None """ try: frame = super().recv() self.last_activity = dt.datetime.now() msg = Message(frame.arb_id, data=list(frame.data), frame_type=frame.frame_type, interface=self.name, timestamp=dt.datetime.now(), extended=frame.is_extended_id) logging.debug(f'Received from {self.name}: {msg}') return msg except socket.timeout: return None
@property def is_up(self: Interface) -> bool: """Determines if the interface is in the `UP` state :returns: `True` if in the `UP` state `False` if in the `DOWN` state :rtype: bool """ if_dev = psutil.net_if_stats().get(self.name) if(if_dev is not None): return if_dev.isup return False @property def duplex(self: Interface) -> int: """Determines the duplex, if there is any :returns: Duplex value :rtype: int """ val = Interface.__get_if_data(self.name) return val.duplex if val is not None else None @property def speed(self: Interface) -> int: """Determines the Baud Rate of the bus, if any .. warning:: This will appear as `0` for virtual can interfaces. :return: Baud rate :rtype: int """ val = Interface.__get_if_data(self.name) return val.speed if val is not None else None @property def mtu(self: Interface) -> int: """Maximum Transmission Unit :return: Maximum size of a packet :rtype: int """ val = Interface.__get_if_data(self.name) return val.mtu if val is not None else None @property def age(self: Interface) -> dt.timedelta: """Deterimes the age of the message, since it was received :return: Age of the message :rtype: datetime.timedelta """ return dt.datetime.now() - self.last_activity def __str__(self: Interface) -> str: return self.name