from typing import Union
from ..can import Message, MessageType
from . import hb as HBParser, \
pdo as PDOParser, \
sync as SYNCParser, \
emcy as EMCYParser, \
time as TIMEParser
from .sdo import SDOParser
from .utilities import FailedValidationError, format_bytes
[docs]class CANOpenParser:
"""
A convenience wrapper for the parse function
"""
def __init__(self, eds_configs: dict):
self.sdo_parser = SDOParser()
self.eds_configs = eds_configs
[docs] def get_name(self, message: Message) -> Union[str, None]:
# import ipdb; ipdb.set_trace()
parser = self.eds_configs.get(message.node_id)
return parser.device_commissioning.node_name \
if parser else hex(message.node_id)
[docs] def parse(self, message: Message) -> (str, str):
"""
Detect the type of the given message and return the parsed version
Arguments
---------
@:param: message: a Message object containing the message
Returns
-------
`str`: The parsed message
"""
node_id = message.node_id
eds_config = self.eds_configs.get(node_id) \
if node_id is not None else None
# Detect message type and select the appropriate parse function
if (message.type == MessageType.SYNC):
parse_function = SYNCParser.parse
elif (message.type == MessageType.EMER):
parse_function = EMCYParser.parse
elif (message.supertype == MessageType.PDO):
parse_function = PDOParser.parse
elif (message.supertype == MessageType.SDO):
if self.sdo_parser.is_complete:
self.sdo_parser = SDOParser()
parse_function = self.sdo_parser.parse
elif (message.type == MessageType.HEARTBEAT):
parse_function = HBParser.parse
elif (message.type == MessageType.TIME):
parse_function = TIMEParser.parse
else:
parse_function = None
# Call the parse function and save the result
# On error, return the message data
try:
parsed_message = parse_function(message.arb_id,
message.data,
eds_config)
error = ""
except (FailedValidationError, TypeError) as exception:
parsed_message = format_bytes(message.data)
error = str(exception)
return parsed_message, error