Changeset - c1a9e8637b35
[Not reviewed]
default
0 4 0
John Kemp - 4 years ago 2020-09-27 16:45:40

Minor code and documentation updates/fixes.
Vehicle spec identifiers now use enums for IO type and value type.
Vehicle spec signals now use an enum for signal type.
4 files changed with 71 insertions and 60 deletions:
0 comments (0 inline, 0 general)
examples/create_network_spec.py
Show inline comments
...
 
@@ -8,8 +8,8 @@ networks = {NETID_MSCAN: {'bitrate': Bit
 
hvac_ecu = ECU(NETID_MSCAN, 0x456, 'HVAC')
 

 
hvac_identifiers = [
 
    Identifier('Blower Speed Output',          0x9601, [Signal(units='%', m=100./255.)], READ_WRITE),
 
    Identifier('External Ambient Temperature', 0x9628, [Signal(units='Deg C', m=0.25)], READ_WRITE),
 
    Identifier('Blower Speed Output',          0x9601, [Signal(units='%', m=100./255.)], IOType.ReadWrite),
 
    Identifier('External Ambient Temperature', 0x9628, [Signal(units='Deg C', m=0.25)], IOType.ReadWrite),
 
    Identifier('Left Solar Radiation Sensor',  0x9734, [Signal(units='W', m=10, max_val=1250)]),
 
    Identifier('Cabin Temperature',            0x97A5, [Signal(units='Deg C', length=16, m=0.01, b=-100, min_val=-50, max_val=100)]),
 
]
neovi/neodevice.py
Show inline comments
...
 
@@ -130,6 +130,7 @@ class NeoDevice:
 
    def get_settings(self):
 
        """
 
        Get the current device configuration.
 

 
        :return:
 
        """
 
        raise NotImplementedError
...
 
@@ -137,6 +138,7 @@ class NeoDevice:
 
    def set_settings(self, settings, save_to_eeprom=False):
 
        """
 
        Set the device configuration.
 

 
        :param settings: A device-specific configuration.
 
        :param save_to_eeprom: Determines if the configuration will be persisted across device power cycles.
 
        :return:
neovi/neovi.py
Show inline comments
...
 
@@ -90,13 +90,14 @@ apilib = None
 

 

 
# noinspection PyUnusedLocal
 
def update_result(result, func, args):
 
def _update_result(result, func, args):
 
    """
 
    Helper to convert an API call result into an error code.
 
    Helper to convert an API call result into an error code. This is used internally.
 

 
    :param int result:
 
    :param func:
 
    :param tuple args:
 
    :return: Either SUCCESS or an error code.
 
    :return: Either SUCCESS or an error code returned by the Intrepid API.
 
    :rtype: int
 
    """
 
    if result == 0:
...
 
@@ -116,34 +117,34 @@ def _set_api_hints():
 
    apilib.icsneoGetErrorMessages.argtypes = [c_int, POINTER(c_int), POINTER(c_int)]
 

 
    apilib.icsneoGetMessages.argtypes = [c_int, POINTER(structures.icsSpyMessage), POINTER(c_int), POINTER(c_int)]
 
    apilib.icsneoGetMessages.errcheck = update_result
 
    apilib.icsneoGetMessages.errcheck = _update_result
 
    apilib.icsneoTxMessages.argtypes = [c_int, POINTER(structures.icsSpyMessage), c_int, c_int]
 
    apilib.icsneoTxMessages.errcheck = update_result
 
    apilib.icsneoTxMessages.errcheck = _update_result
 
    apilib.icsneoWaitForRxMessagesWithTimeOut.argtypes = [c_int, c_uint]
 
    apilib.icsneoGetTimeStampForMsg.argtypes = [c_int, POINTER(structures.icsSpyMessage), POINTER(c_double)]
 
    apilib.icsneoGetTimeStampForMsg.errcheck = update_result
 
    apilib.icsneoGetTimeStampForMsg.errcheck = _update_result
 
    apilib.icsneoEnableNetworkRXQueue.argtypes = [c_int, c_int]
 
    apilib.icsneoEnableNetworkRXQueue.errcheck = update_result
 
    apilib.icsneoEnableNetworkRXQueue.errcheck = _update_result
 
    apilib.icsneoGetISO15765Status.argtypes = [c_int, c_int, c_int, c_int, POINTER(c_int), POINTER(c_int)]
 
    apilib.icsneoGetISO15765Status.restype = None
 
    apilib.icsneoSetISO15765RxParameters.argtypes = [c_int, c_int, c_int, POINTER(structures.spyFilterLong), POINTER(structures.icsSpyMessage), c_int, c_int, c_int, c_int]
 
    apilib.icsneoSetISO15765RxParameters.restype = None
 

 
    apilib.icsneoGetConfiguration.argtypes = (c_int, POINTER(c_ubyte), POINTER(c_int))
 
    apilib.icsneoGetConfiguration.errcheck = update_result
 
    apilib.icsneoGetConfiguration.errcheck = _update_result
 
    apilib.icsneoSendConfiguration.argtypes = (c_int, POINTER(c_ubyte), c_int)
 
    apilib.icsneoSendConfiguration.errcheck = update_result
 
    apilib.icsneoSendConfiguration.errcheck = _update_result
 
    apilib.icsneoGetFireSettings.argtypes = (c_int, POINTER(structures.SFireSettings), c_int)
 
    apilib.icsneoGetFireSettings.errcheck = update_result
 
    apilib.icsneoGetFireSettings.errcheck = _update_result
 
    apilib.icsneoSetFireSettings.argtypes = (c_int, POINTER(structures.SFireSettings), c_int, c_int)
 
    apilib.icsneoSetFireSettings.errcheck = update_result
 
    apilib.icsneoSetFireSettings.errcheck = _update_result
 
    apilib.icsneoGetVCAN3Settings.argtypes = (c_int, POINTER(structures.SVCAN3Settings), c_int)
 
    apilib.icsneoGetVCAN3Settings.errcheck = update_result
 
    apilib.icsneoGetVCAN3Settings.errcheck = _update_result
 
    apilib.icsneoSetVCAN3Settings.argtypes = (c_int, POINTER(structures.SVCAN3Settings), c_int, c_int)
 
    apilib.icsneoSetVCAN3Settings.errcheck = update_result
 
    apilib.icsneoSetVCAN3Settings.errcheck = _update_result
 

 
    apilib.icsneoSetBitRate.argtypes = (c_int,  c_int, c_int)
 
    apilib.icsneoSetBitRate.errcheck = update_result
 
    apilib.icsneoSetBitRate.errcheck = _update_result
 

 
    apilib.icsneoGetHWFirmwareInfo.argtypes = (c_int, POINTER(structures.APIFirmwareInfo))
 
    apilib.icsneoGetDLLFirmwareInfo.argtypes = (c_int, POINTER(structures.APIFirmwareInfo))
...
 
@@ -291,13 +292,13 @@ class BitRate(enum.IntEnum):
 
    BR_800000 = 9
 
    BR_1000000 = 10
 

 

 
DEFAULT_BIT_RATE = BitRate.BR_500000
 
    DEFAULT = BR_500000
 

 

 
def network_name(nid):
 
    """
 
    Lookup the friendly name for a network ID.
 

 
    :param int nid: The network ID to look up.
 
    :return: The friendly name for the network.
 
    :rtype: str
neovi/spec.py
Show inline comments
...
 
@@ -16,20 +16,22 @@ ECU provides four identifiers, each made
 

 
"""
 

 

 
import enum
 
import json
 

 

 
# Identifier signal types
 
ANALOG = 0
 
class SignalType(enum.Enum):
 
    Analog = 0
 

 

 
# Identifier value types
 
UNSIGNED_INT = 0
 
class ValueType(enum.Enum):
 
    UnsignedInt = 0
 

 

 
# Signal IO type
 
READ = 0
 
WRITE = 1
 
READ_WRITE = 2
 
class IOType(enum.Enum):
 
    Read = 0
 
    Write = 1
 
    ReadWrite = 2
 

 

 
class NonByteBoundarySignalError(Exception):
...
 
@@ -44,7 +46,7 @@ class UnsupportedValueTypeError(Exceptio
 
    pass
 

 

 
def prepend_zeroes(values, final_length):
 
def _prepend_zeroes(values, final_length):
 
    return ([0] * (final_length - len(values))) + values
 

 

...
 
@@ -75,22 +77,24 @@ class Signal:
 
    """
 
    __init__(self, name='value', signal_type=ANALOG, value_type=UNSIGNED_INT, start=0, length=8, min=None, max=None, units='', m=1, b=0, description='')
 
    
 
    Represents a signal. An :py:class:`.spec.Identifier` will contain one or
 
    Represents a signal. An :py:class:`.Identifier` will contain one or
 
    more signals.
 
    
 
    :param str name: The name of this signal.
 
    :param int signal_type: The type of signal. Currently, only
 
        :py:data:`.spec.ANALOG` is supported.
 
    :param int value_type: The type of value returned for the signal.
 
        Currently only :py:data:`.spec.UNSIGNED_INT` is supported.
 
    :param signal_type: The type of signal. Currently, only
 
        :py:attr:`.SignalType.Analog` is supported.
 
    :type signal_type: :py:class:`.SignalType`
 
    :param value_type: The type of value returned for the signal.
 
        Currently only :py:attr:`.ValueType.UnsignedInt` is supported.
 
    :type value_type: :py:class:`.ValueType`
 
    :param int start: The bit within the identifier's aggregate value at
 
        which this signal starts.
 
    :param int length: The length in bits of this signal.
 
    :param float min_val: The minimum value for this signal. This is not enforced
 
        and is for reference only. If not specified then b is used as the
 
        and is for reference only. If not specified then *b* is used as the
 
        default value.
 
    :param float max_val:  The minimum value for this signal. This is not enforced
 
        and is for reference only. If not specified then (255. * m) + b is
 
        and is for reference only. If not specified then *(255. * m) + b* is
 
        used as the default value.
 
    :param str units: The units of the signal once converted.
 
    :param float m: The slope for conversion of the signal into engineering
...
 
@@ -100,25 +104,25 @@ class Signal:
 
    :param str description: A human-readable description of the signal.
 
   
 
    :raises NonByteBoundarySignalError: If length or start are not integer
 
        multiples of 8.
 
        multiples of 8. This is a limitation of the current implementation.
 
    """
 
    def __init__(self, name='value', signal_type=ANALOG, value_type=UNSIGNED_INT, start=0, length=8, min_val=None, max_val=None, units='', m=1, b=0, description=''):
 
    def __init__(self, name='value', signal_type=SignalType.Analog, value_type=ValueType.UnsignedInt, start=0, length=8, min_val=None, max_val=None, units='', m=1, b=0, description=''):
 
        self.name = name
 
        self.signal_type = signal_type
 
        self.value_type = value_type
 
        self.start = start
 
        self.length = length
 
        if min_val is None:
 
            self.min = b
 
            self.min = float(b)
 
        else:
 
            self.min = min_val
 
            self.min = float(min_val)
 
        if max_val is None:
 
            self.max = (255. * m) + b
 
        else:
 
            self.max = max_val
 
            self.max = float(max_val)
 
        self.units = units
 
        self.m = m
 
        self.b = b
 
        self.m = float(m)
 
        self.b = float(b)
 
        self.description = description
 
        
 
        if length % 8 != 0 or start % 8 != 0:
...
 
@@ -130,12 +134,12 @@ class Signal:
 
        
 
        :param signal_bytes: The bytes received from the ECU.
 
        :type signal_bytes: list of ints
 
        :returns: Tuple of final value and the units of the signal.
 
        :returns: Tuple of final value (float) and the units of the signal (str).
 
        
 
        :raises UnsupportedValueTypeError: If the signal is of a type that is
 
            currently unsupported.
 
        """
 
        if self.signal_type == ANALOG and self.value_type == UNSIGNED_INT:
 
        if self.signal_type == SignalType.Analog and self.value_type == ValueType.UnsignedInt:
 
            value = sum(x << (i*8) for i, x in enumerate(reversed(signal_bytes[self.start:self.start + (self.length / 8)])))
 
            value = self.m * value + self.b
 
        else:
...
 
@@ -155,12 +159,12 @@ class Signal:
 
        :raises ValueTooLargeError: If the value does not fit into the number
 
            of bits specified for the signal.
 
        """
 
        if self.signal_type == ANALOG and self.value_type == UNSIGNED_INT:
 
        if self.signal_type == SignalType.Analog and self.value_type == ValueType.UnsignedInt:
 
            data_bytes = []
 
            value = int(round((value - self.b) / self.m))
 

 
            if value == 0:
 
                return prepend_zeroes([0], self.length / 8)
 
                return _prepend_zeroes([0], self.length / 8)
 
            
 
            while value > 0:
 
                data_bytes.append(value & 0xFF)
...
 
@@ -169,18 +173,18 @@ class Signal:
 
            if len(data_bytes) > (self.length / 8):
 
                raise ValueTooLargeError
 

 
            return prepend_zeroes([x for x in reversed(data_bytes)], self.length / 8)
 
            return _prepend_zeroes([x for x in reversed(data_bytes)], self.length / 8)
 
        else:
 
            raise UnsupportedValueTypeError
 
    
 
    def __repr__(self):
 
        return 'Signal(name="%(name)s", signal_type=%(signal_type)u, value_type=%(value_type)u, start=%(start)u, length=%(length)u, min=%(min)s, max=%(max)s, units="%(units)s", m=%(m)f, b=%(b)f)' % self.__dict__
 
        return 'Signal(name="%(name)s", signal_type=%(signal_type)s, value_type=%(value_type)s, start=%(start)u, length=%(length)u, min=%(min)s, max=%(max)s, units="%(units)s", m=%(m)f, b=%(b)f)' % self.__dict__
 

 

 
# TODO: Add method to encode all signal values into a final value to send
 
class Identifier:
 
    """
 
    __init__(self, name, data_id, signals = [], io_type = READ)
 
    __init__(self, name, data_id, signals, io_type = IOType.Read)
 
    
 
    Represents an identifier (an example of which would be "Blower Speed Output"
 
    in a HVAC ECU). The value of an identifier can consist of multiple
...
 
@@ -191,11 +195,10 @@ class Identifier:
 
    :param int data_id: The address (internal to the ECU) with which this
 
        identifier is associated.
 
    :param signals: The signals that form this identifier.
 
    :type signals: list of spec.Signal
 
    :param int io_type: Allowed IO for this identifier: :py:data:`.spec.READ`,
 
        :py:data:`.spec.WRITE`, :py:data:`.spec.READ_WRITE`.
 
    :type signals: list of :py:class:`.Signal`
 
    :param IOType io_type: Allowed IO for this identifier.
 
    """
 
    def __init__(self, name, data_id, signals=(), io_type=READ):
 
    def __init__(self, name, data_id, signals, io_type=IOType.Read):
 
        self.name = name
 
        self.data_id = data_id
 
        self.data_id_bytes = [(data_id & 0xFF00) >> 8, data_id & 0x00FF]
...
 
@@ -256,11 +259,11 @@ class PyNeoViJSONEncoder(json.JSONEncode
 
    """
 
    def default(self, o):
 
        if o.__class__.__name__ == 'Identifier':
 
            return 'PyNeoVi.Identifier', o.name, o.data_id, o.signals, o.io_type
 
            return 'PyNeoVi.Identifier', o.name, o.data_id, o.signals, o.io_type.name
 
        elif o.__class__.__name__ == 'ECU':
 
            return 'PyNeoVi.ECU', o.network, o.request_id, o.short_desc, o.long_desc
 
        elif o.__class__.__name__ == 'Signal':
 
            return 'PyNeoVi.Signal', o.name, o.signal_type, o.value_type, o.start, o.length, o.min, o.max, o.units, o.m, o.b, o.description
 
            return 'PyNeoVi.Signal', o.name, o.signal_type.name, o.value_type.name, o.start, o.length, o.min, o.max, o.units, o.m, o.b, o.description
 
        else:
 
            return json.JSONEncoder.default(self, o)
 

...
 
@@ -271,6 +274,14 @@ def from_json(json_object):
 
    additional object types defined here.
 
    """
 
    output = {}
 

 
    def parse_signal(signal):
 
        _, name, signal_type, value_type, start, length, min, max, units, m, b, desc = signal
 
        return Signal(name, SignalType[signal_type], ValueType[value_type], start, length, min, max, units, m, b, desc)
 

 
    def parse_identifier(identifier):
 
        signals = [parse_signal(signal_info) for signal_info in identifier[3]]
 
        return Identifier(identifier[1], identifier[2], signals, IOType[identifier[4]])
 
    
 
    for k, v in json_object.items():
 
        if type(v) != list:
...
 
@@ -281,12 +292,9 @@ def from_json(json_object):
 
        elif v[0] == 'PyNeoVi.ECU':
 
            output[k] = ECU(*v[1:])
 
        elif v[0] == 'PyNeoVi.Identifier':
 
            output[k] = Identifier(*v[1:])
 
            output[k] = parse_identifier(v)
 
        elif k == 'identifiers':
 
            new_idents = []
 
            for identifier in v:
 
                signals = [Signal(*signal_info[1:]) for signal_info in identifier[3]]
 
                new_idents.append(Identifier(identifier[1], identifier[2], signals))
 
            new_idents = [parse_identifier(identifier) for identifier in v]
 
            output[k] = _IdentifierList(new_idents)
 
        else:
 
            print('%s %s' % (k, v))
0 comments (0 inline, 0 general)