Changeset - 7f3f681e5406
[Not reviewed]
default
0 7 1
Kemp - 6 years ago 2018-04-09 11:04:11

Updated setup.py and README.txt ready for packaging for PyPI. Added message type constants for Pending DTCs and Permanent DTCs. Started adding unit tests. Other minor edits.
8 files changed with 121 insertions and 16 deletions:
0 comments (0 inline, 0 general)
README.txt
Show inline comments
 
pyneovi is a wrapper around the API provided by Intrepid Control Systems for
 
communicating with their NeoVI range of devices.
 
pyneovi is a wrapper around the API provided by Intrepid Control Systems for communicating with their NeoVI range of devices.
 

 
The focus of the library is on communicating with the ECU in the context of a
 
diagnostic session. Currently, authentication via Security Access commands
 
(usually required for setting output values) is not handled and must be
 
performed by a separate tool.
 
The focus of the library is on communicating with the ECU in the context of a diagnostic session. Currently, authentication via Security Access commands (usually required for setting output values) is not handled and must be performed by a separate tool.
 

 
The project homepage is http://kempj.co.uk/projects/pyneovi/
 

 
Documentation can be found at http://pyneovi.readthedocs.org/en/latest/
...
 
\ No newline at end of file
 
The project sourcecode repository is at https://bitbucket.org/Kemp_J/pyneovi
 

 
Documentation can be found at http://pyneovi.readthedocs.org/en/latest/
neovi/can.py
Show inline comments
...
 
@@ -12,7 +12,9 @@ DIAG_CURRENT_DATA          = 0x01
 
DIAG_FREEZE_FRAME_DATA     = 0x02
 
DIAG_STORED_DTCS           = 0x03
 
DIAG_CLEAR_DTCS            = 0x04
 
DIAG_PENDING_DTCS          = 0x07
 
DIAG_VEHICLE_INFORMATION   = 0x09
 
DIAG_PERMANENT_DTCS        = 0x0A
 

 
DIAGNOSTIC_SESSION_CONTROL = 0x10
 
ECU_RESET                  = 0x11
neovi/ecu.py
Show inline comments
...
 
@@ -18,7 +18,7 @@ from __future__ import absolute_import, 
 
import threading
 
import logging
 

 
from neovi.structures import format_array, get_status_bits_set, array_equal
 
from neovi.structures import format_array, get_status_bits_set, array_equal, icsSpyMessage
 
import neovi.can as can
 

 

...
 
@@ -123,7 +123,7 @@ class ECU:
 
        self.msg_lock.release()
 

 
        # msg (as passed around in subs_key) should have been modified by the thread handling the incoming messages.
 
        msg = msg[0]
 
        msg = msg[0]    # type: icsSpyMessage
 

 
        if msg.Data[1] == 0x7F:
 
            raise CommandError(msg.Data[3])
neovi/neodevice.py
Show inline comments
...
 
@@ -200,7 +200,7 @@ class NeoDevice:
 
        self.tx_id += 1
 
        return self.tx_raw_message(msg, network_id), self.tx_id - 1
 
        
 
    def subscribe_to(self, callback, network=None, address=None, msg_type=None, additional_bytes=(), auto_remove=False, user_data=None):
 
    def subscribe_to(self, callback, network=None, address=None, msg_type=None, additional_bytes=None, auto_remove=False, user_data=None):
 
        """
 
        Set a callback function to be called upon reception of a defined subset
 
        of messages. Note that this will only occur if the message thread has
...
 
@@ -221,7 +221,8 @@ class NeoDevice:
 
        :param int msg_type: The message type of interest (see :py:mod:`.can`).
 
            If None (the default) then it will be ignored for filtering.
 
        :param additional_bytes: Additional payload bytes to use for filtering.
 
            May be an empty list (the default).
 
            May be an empty list. A value of None (the default) will be converted
 
            to an empty list automatically.
 
        :type additional_bytes: list of ints
 
        :param bool auto_remove: If True then the subscription is removed once
 
            the first message matching the provided criteria has been received.
...
 
@@ -230,7 +231,7 @@ class NeoDevice:
 
            the received message.
 
        """
 
        self.subscriptions_lock.acquire()
 
        self.subscriptions.append((callback, network, address, msg_type, additional_bytes, auto_remove, user_data))
 
        self.subscriptions.append((callback, network, address, msg_type, [] if additional_bytes is None else additional_bytes, auto_remove, user_data))
 
        self.subscriptions_lock.release()
 
    
 
    def subscribe_to_all(self, callback):
neovi/spec.py
Show inline comments
...
 
@@ -164,7 +164,7 @@ class Signal:
 
            
 
            while value > 0:
 
                data_bytes.append(value & 0xFF)
 
                value = value >> 8
 
                value >>= 8
 
            
 
            if len(data_bytes) > (self.length / 8):
 
                raise ValueTooLargeError
neovi/structures.py
Show inline comments
...
 
@@ -125,8 +125,8 @@ class icsSpyMessage(Structure):
 
    
 
    def __repr__(self):
 
        fields = ['StatusBitField', 'StatusBitField2', 'NodeID', 'DescriptionID', 'ArbIDOrHeader', 'Value']
 
        field_values = ['%s=%s' % (k, v) for k, v in [(row[0], getattr(self, row[0])) for row in fields]]
 
        data_values = ['%s=(%X %X %X %X %X)' % (k, v.Data[0], v.Data[1], v.Data[2], v.Data[3], v.Data[4]) for k, v in [(row[0], getattr(self, row[0])) for row in ['Data', 'AckBytes']]]
 
        field_values = ['%s=%s' % (k, v) for k, v in [(row, getattr(self, row)) for row in fields]]
 
        data_values = ['%s=(%X %X %X %X %X %X %X %X)' % (k, v[0], v[1], v[2], v[3], v[4], v[5], v[6], v[7]) for k, v in [(row, getattr(self, row)) for row in ['Data', 'AckBytes']]]
 
        return 'icsSpyMessage(%s)' % ', '.join(field_values + data_values)
 
                
 

setup.py
Show inline comments
 
from setuptools import setup
 

 
readme_file = open('README.txt', 'r')
 
long_description = '\n'.join(readme_file.readlines())
 
readme_file.close()
 

 
setup(name='pyneovi',
 
      packages=['neovi'],
 
      version='1.0',
 
      description='A wrapper around the API provided by Intrepid Control Systems for communicating with their NeoVI range of devices',
 
      long_description=long_description,
 
      author='John Kemp',
 
      author_email='kemp@kempj.co.uk',
 
      url='http://kempj.co.uk',
 
      url='http://kempj.co.uk/projects/pyneovi/',
 
      license='MIT',
 
      classifiers=[
 
        'Intended Audience :: Developers',
tests/test_neodevice.py
Show inline comments
 
new file 100644
 
from overrides import overrides
 
from typing import List
 
import threading
 

 
from neovi.neodevice import NeoDevice
 
from neovi.neovi import SUCCESS
 
from neovi.structures import icsSpyMessage
 

 

 
class OfflineNeoDevice(NeoDevice):
 
    def __init__(self, messages: List[icsSpyMessage]) -> None:
 
        super().__init__(None, False, False)
 
        self.messages = messages
 

 
    @overrides
 
    def get_messages(self):
 
        return (SUCCESS, self.messages, [])
 

 

 
class TestSubscriptions:
 
    def do_subscription_test(self, messages: List[icsSpyMessage], expected_output: List[icsSpyMessage], network_filter=None, address_filter=None, msg_type_filter=None, additional_bytes_filter=None) -> None:
 
        remaining = expected_output[:]
 
        unexpected = []
 

 
        def callback(msg, user_data):
 
            if msg in remaining:
 
                remaining.remove(msg)
 
            else:
 
                unexpected.append(msg)
 

 
        neodevice = OfflineNeoDevice(messages)
 
        neodevice.subscribe_to(callback, network_filter, address_filter, msg_type_filter, additional_bytes_filter)
 
        neodevice._process_msg_queue()
 
        assert len(remaining) == 0
 
        assert len(unexpected) == 0
 

 

 
    def test_subscription(self) -> None:
 
        messages = [icsSpyMessage()]
 
        self.do_subscription_test(messages, messages)
 

 

 
    def test_subscription_network_filter(self) -> None:
 
        correct_network = icsSpyMessage(NetworkID=1)
 
        incorrect_network = icsSpyMessage(NetworkID=2)
 
        self.do_subscription_test([correct_network, incorrect_network], [correct_network], network_filter=1)
 

 

 
    def test_subscription_address_filter(self) -> None:
 
        correct_address = icsSpyMessage(ArbIDOrHeader=1)
 
        incorrect_address = icsSpyMessage(ArbIDOrHeader=2)
 
        self.do_subscription_test([correct_address, incorrect_address], [correct_address], address_filter=1)
 

 

 
    def test_subscription_msg_type_filter(self) -> None:
 
        correct_msg_type = icsSpyMessage()
 
        correct_msg_type.Data[1] = 1
 
        incorrect_msg_type = icsSpyMessage()
 
        incorrect_msg_type.Data[1] = 2
 
        self.do_subscription_test([correct_msg_type, incorrect_msg_type], [correct_msg_type], msg_type_filter=1)
 

 

 
    def test_subscription_additional_bytes_filter(self) -> None:
 
        correct_additional_bytes = icsSpyMessage()
 
        correct_additional_bytes.Data[2] = 2
 
        correct_additional_bytes.Data[3] = 5
 
        incorrect_additional_bytes = icsSpyMessage()
 
        incorrect_additional_bytes.Data[2] = 1
 
        incorrect_additional_bytes.Data[3] = 2
 
        self.do_subscription_test([correct_additional_bytes, incorrect_additional_bytes], [correct_additional_bytes], additional_bytes_filter=[2, 5])
 

 

 
    def test_subscription_auto_remove(self) -> None:
 
        def callback(msg, user_data):
 
            pass
 

 
        neodevice = OfflineNeoDevice([icsSpyMessage()])
 

 
        neodevice.subscribe_to(callback, auto_remove=True)
 
        neodevice._process_msg_queue()
 
        assert len(neodevice.subscriptions) == 0
 

 
        neodevice.subscribe_to(callback, auto_remove=False)
 
        neodevice._process_msg_queue()
 
        assert len(neodevice.subscriptions) == 1
 

 

 
    def test_subscription_user_data(self) -> None:
 
        submitted_user_data = "test string"
 
        success = threading.Event()
 

 
        def callback(msg, user_data):
 
            if submitted_user_data == user_data:
 
                success.set()
 

 
        neodevice = OfflineNeoDevice([icsSpyMessage()])
 
        neodevice.subscribe_to(callback, user_data=submitted_user_data)
 
        neodevice._process_msg_queue()
 
        assert success.is_set()
0 comments (0 inline, 0 general)