1
- import fcntl as fnctl
1
+ import fcntl
2
2
import math
3
3
import os
4
- import time
5
4
from ctypes import *
6
- from typing import Dict , List , Union
5
+ from typing import Any , Dict , List , Union
7
6
8
7
from ioctl_opt import IO , IOW , IOWR
9
8
10
9
from plin .enums import *
11
10
11
+ PLIN_USB_FILTER_LEN = 8
12
12
PLIN_DAT_LEN = 8
13
+ PLIN_EMPTY_DATA = b'\xff ' * PLIN_DAT_LEN
13
14
14
15
15
16
class PLINMessage (Structure ):
@@ -29,13 +30,12 @@ class PLINMessage(Structure):
29
30
("reserved" , c_uint8 * 8 )
30
31
]
31
32
32
- def __setattr__ (self , name , value ) :
33
+ def __setattr__ (self , name : str , value : Any ) -> None :
33
34
if name == "data" :
34
35
buf = (c_uint8 * PLIN_DAT_LEN )(* value )
35
- super ().__setattr__ ("len" , len (value ))
36
- super ().__setattr__ (name , buf )
36
+ return super ().__setattr__ (name , buf )
37
37
else :
38
- super ().__setattr__ (name , value )
38
+ return super ().__setattr__ (name , value )
39
39
40
40
def __repr__ (self ) -> str :
41
41
return str (self ._asdict ())
@@ -70,12 +70,12 @@ class PLINUSBFrameEntry(Structure):
70
70
("d" , c_uint8 * PLIN_DAT_LEN )
71
71
]
72
72
73
- def __setattr__ (self , name , value ) :
73
+ def __setattr__ (self , name : str , value : Any ) -> None :
74
74
if name == "d" :
75
75
buf = (c_uint8 * PLIN_DAT_LEN )(* value )
76
- super ().__setattr__ (name , buf )
76
+ return super ().__setattr__ (name , buf )
77
77
else :
78
- super ().__setattr__ (name , value )
78
+ return super ().__setattr__ (name , value )
79
79
80
80
def __repr__ (self ) -> str :
81
81
return str (self ._asdict ())
@@ -108,7 +108,7 @@ class PLINUSBGetBaudrate(Structure):
108
108
109
109
class PLINUSBIDFilter (Structure ):
110
110
_fields_ = [
111
- ("id_mask" , c_uint8 * 8 )
111
+ ("id_mask" , c_uint8 * PLIN_USB_FILTER_LEN )
112
112
]
113
113
114
114
@@ -247,10 +247,10 @@ def _asdict(self) -> dict:
247
247
for field , _ in self ._fields_ }
248
248
result ["mode" ] = PLINMode (self .mode )
249
249
if self .usb_filter == 0 :
250
- result ["usb_filter" ] = bytearray ([0 ] * 8 )
250
+ result ["usb_filter" ] = bytearray ([0 ] * PLIN_USB_FILTER_LEN )
251
251
else :
252
252
result ["usb_filter" ] = bytearray .fromhex (
253
- f"{ self .usb_filter :x} " ).ljust (8 , b'\x00 ' )
253
+ f"{ self .usb_filter :x} " ).ljust (PLIN_USB_FILTER_LEN , b'\x00 ' )
254
254
result ["bus_state" ] = PLINBusState (self .bus_state )
255
255
del result ["unused" ]
256
256
return result
@@ -265,12 +265,12 @@ class PLINUSBUpdateData(Structure):
265
265
("d" , c_uint8 * PLIN_DAT_LEN ) # new data bytes
266
266
]
267
267
268
- def __setattr__ (self , name , value ) :
268
+ def __setattr__ (self , name : str , value : Any ) -> None :
269
269
if name == "d" :
270
270
buf = (c_uint8 * PLIN_DAT_LEN )(* value )
271
- super ().__setattr__ (name , buf )
271
+ return super ().__setattr__ (name , buf )
272
272
else :
273
- super ().__setattr__ (name , value )
273
+ return super ().__setattr__ (name , value )
274
274
275
275
def __repr__ (self ) -> str :
276
276
return str (self ._asdict ())
@@ -337,15 +337,25 @@ class PLIN:
337
337
def __init__ (self , interface : str ):
338
338
self .interface = interface
339
339
self .response_remap = [- 1 ] * PLIN_USB_RSP_REMAP_ID_LEN
340
- self .reset ()
340
+ self .fd = None
341
341
342
342
def _ioctl (self , * args , ** kwargs ):
343
343
'''
344
344
Generic ioctl function to wrap open/closing the file descriptor.
345
345
'''
346
- fd = os .open (self .interface , os .O_RDWR )
347
- fnctl .ioctl (fd , * args , ** kwargs )
348
- os .close (fd )
346
+ if self .fd :
347
+ try :
348
+ fcntl .ioctl (self .fd , * args , ** kwargs )
349
+ except :
350
+ print ("File descriptor busy!" )
351
+ else :
352
+ raise Exception ("PLIN not connected!" )
353
+
354
+ def reset (self ):
355
+ '''
356
+ Resets the PLIN device.
357
+ '''
358
+ self ._ioctl (PLIORSTHW )
349
359
350
360
def start (self , mode : PLINMode , baudrate : int = 19200 ):
351
361
'''
@@ -354,14 +364,19 @@ def start(self, mode: PLINMode, baudrate: int = 19200):
354
364
self .mode = mode
355
365
self .baudrate = baudrate
356
366
367
+ if not self .fd :
368
+ self .fd = os .open (self .interface , os .O_RDWR )
369
+
370
+ self .reset ()
357
371
buffer = PLINUSBInitHardware (self .baudrate , self .mode , 0 )
358
372
self ._ioctl (PLIOHWINIT , buffer )
359
373
360
- def reset (self ):
374
+ def stop (self ):
361
375
'''
362
- Resets the PLIN device.
376
+ Disconnects from the PLIN device by closing the file descriptor .
363
377
'''
364
- self ._ioctl (PLIORSTHW )
378
+ if self .fd :
379
+ os .close (self .fd )
365
380
366
381
def set_frame_entry (self ,
367
382
id : int ,
@@ -424,7 +439,8 @@ def set_id_filter(self, filter: bytearray):
424
439
Sets the ID filter.
425
440
'''
426
441
buffer = PLINUSBIDFilter ()
427
- buffer .id_mask = (c_ubyte * 8 )(* filter .ljust (8 , b'\x00 ' ))
442
+ buffer .id_mask = (c_ubyte * PLIN_USB_FILTER_LEN )(*
443
+ filter .ljust (PLIN_USB_FILTER_LEN , b'\x00 ' ))
428
444
self ._ioctl (PLIOSETIDFILTER , buffer )
429
445
430
446
def get_id_filter (self ) -> bytearray :
@@ -435,6 +451,41 @@ def get_id_filter(self) -> bytearray:
435
451
self ._ioctl (PLIOGETIDFILTER , buffer )
436
452
return bytearray (buffer .id_mask )
437
453
454
+ def block_id (self , id : int ):
455
+ '''
456
+ Add ID to filter (block ID).
457
+ '''
458
+ if id > PLINFrameID .MAX or id < PLINFrameID .MIN :
459
+ raise ValueError (
460
+ f"ID { id } out of range [{ PLINFrameID .MIN } ..{ PLINFrameID .MAX } ]." )
461
+ current_filter = int .from_bytes (self .get_id_filter (), 'little' )
462
+ mask = ~ (1 << id )
463
+ current_filter &= mask
464
+ self .set_id_filter (current_filter .to_bytes (
465
+ PLIN_USB_FILTER_LEN , 'little' ))
466
+
467
+ def register_id (self , id : int ):
468
+ '''
469
+ Remove ID from filter (allow ID through).
470
+ '''
471
+ if id > PLINFrameID .MAX or id < PLINFrameID .MIN :
472
+ raise ValueError (
473
+ f"ID { id } out of range [{ PLINFrameID .MIN } ..{ PLINFrameID .MAX } ]." )
474
+ current_filter = int .from_bytes (self .get_id_filter (), 'little' )
475
+ mask = (1 << id )
476
+ current_filter |= mask
477
+ self .set_id_filter (current_filter .to_bytes (
478
+ PLIN_USB_FILTER_LEN , 'little' ))
479
+
480
+ def clear_id_filter (self , allow_all = True ):
481
+ '''
482
+ Clear ID filter to either allow all IDs or disallow all IDs.
483
+ '''
484
+ if allow_all :
485
+ self .set_id_filter (bytearray ([0xff ] * PLIN_USB_FILTER_LEN ))
486
+ else :
487
+ self .set_id_filter (bytearray ([0 ] * PLIN_USB_FILTER_LEN ))
488
+
438
489
def get_mode (self ) -> PLINMode :
439
490
'''
440
491
Gets the mode.
@@ -744,44 +795,36 @@ def set_led_state(self, enable: bool):
744
795
buffer = PLINUSBLEDState (on_off = int (enable ))
745
796
self ._ioctl (PLIOSETLEDSTATE , buffer )
746
797
747
- def read (self , timeout = 0 ) -> Union [PLINMessage , None ]:
798
+ def read (self , block = True ) -> Union [PLINMessage , None ]:
748
799
'''
749
- Reads a PLINMessage from the LIN bus with an optional timeout in seconds .
750
-
800
+ Reads a PLINMessage from the LIN bus with an optional timeout in milliseconds .
801
+
751
802
A timeout value of 0 blocks until data is read. If the timeout is reached before data is read, None is returned.
752
803
'''
753
- fd = os .open (self .interface , os .O_RDONLY )
754
-
755
- if timeout > 0 :
756
- os .set_blocking (fd , False )
757
- timeout_start = time .time ()
758
- while True :
759
- time .sleep (0.1 )
760
- if time .time () >= timeout_start + timeout :
761
- result = None
762
- break
763
- else :
764
- try :
765
- result = os .read (fd , PLINMessage .buffer_length )
766
- break
767
- except :
768
- pass
804
+ if self .fd :
805
+ blocking = os .get_blocking (self .fd )
806
+ os .set_blocking (self .fd , block )
807
+ try :
808
+ result = os .read (self .fd , PLINMessage .buffer_length )
809
+ message = PLINMessage .from_buffer_copy (result )
810
+ # If bytes read was invalid.
811
+ if bytes (message .data ) == PLIN_EMPTY_DATA :
812
+ message = None
813
+ except :
814
+ message = None
815
+ os .set_blocking (self .fd , blocking )
816
+ return message
769
817
else :
770
- result = os .read (fd , PLINMessage .buffer_length )
771
-
772
- os .close (fd )
773
-
774
- if result is None :
775
- return None
776
- else :
777
- return PLINMessage .from_buffer_copy (result )
818
+ raise Exception ("PLIN not connected!" )
778
819
779
820
def write (self , message : PLINMessage ):
780
821
'''
781
822
Writes a PLINMessage to the LIN bus.
782
823
'''
783
- buffer = bytearray (message )
784
-
785
- fd = os .open (self .interface , os .O_WRONLY )
786
- os .write (fd , buffer )
787
- os .close (fd )
824
+ if self .fd :
825
+ if message .dir == PLINFrameDirection .PUBLISHER :
826
+ self .block_id (message .id )
827
+ buffer = bytearray (message )
828
+ os .write (self .fd , buffer )
829
+ else :
830
+ raise Exception ("PLIN not connected!" )
0 commit comments