@@ -46,10 +46,10 @@ class BusABC(metaclass=ABCMeta):
46
46
47
47
@abstractmethod
48
48
def __init__ (
49
- self ,
50
- channel : Any ,
51
- can_filters : Optional [can .typechecking .CanFilters ] = None ,
52
- ** kwargs : object
49
+ self ,
50
+ channel : Any ,
51
+ can_filters : Optional [can .typechecking .CanFilters ] = None ,
52
+ ** kwargs : object
53
53
):
54
54
"""Construct and open a CAN bus instance of the specified type.
55
55
@@ -114,7 +114,7 @@ def recv(self, timeout: Optional[float] = None) -> Optional[Message]:
114
114
return None
115
115
116
116
def _recv_internal (
117
- self , timeout : Optional [float ]
117
+ self , timeout : Optional [float ]
118
118
) -> Tuple [Optional [Message ], bool ]:
119
119
"""
120
120
Read a message from the bus and tell whether it was filtered.
@@ -176,11 +176,11 @@ def send(self, msg: Message, timeout: Optional[float] = None) -> None:
176
176
raise NotImplementedError ("Trying to write to a readonly bus?" )
177
177
178
178
def send_periodic (
179
- self ,
180
- msgs : Union [Message , Sequence [Message ]],
181
- period : float ,
182
- duration : Optional [float ] = None ,
183
- store_task : bool = True ,
179
+ self ,
180
+ msgs : Union [Message , Sequence [Message ]],
181
+ period : float ,
182
+ duration : Optional [float ] = None ,
183
+ store_task : bool = True ,
184
184
) -> can .broadcastmanager .CyclicSendTaskABC :
185
185
"""Start sending messages at a given period on this bus.
186
186
@@ -256,10 +256,10 @@ def wrapped_stop_method(remove_task: bool = True) -> None:
256
256
return task
257
257
258
258
def _send_periodic_internal (
259
- self ,
260
- msgs : Union [Sequence [Message ], Message ],
261
- period : float ,
262
- duration : Optional [float ] = None ,
259
+ self ,
260
+ msgs : Union [Sequence [Message ], Message ],
261
+ period : float ,
262
+ duration : Optional [float ] = None ,
263
263
) -> can .broadcastmanager .CyclicSendTaskABC :
264
264
"""Default implementation of periodic message sending using threading.
265
265
@@ -332,7 +332,7 @@ def filters(self, filters: Optional[can.typechecking.CanFilters]) -> None:
332
332
self .set_filters (filters )
333
333
334
334
def set_filters (
335
- self , filters : Optional [can .typechecking .CanFilters ] = None
335
+ self , filters : Optional [can .typechecking .CanFilters ] = None
336
336
) -> None :
337
337
"""Apply filtering to all messages received by this Bus.
338
338
@@ -449,6 +449,31 @@ def _detect_available_configs() -> List[can.typechecking.AutoDetectedConfig]:
449
449
def fileno (self ) -> int :
450
450
raise NotImplementedError ("fileno is not implemented using current CAN bus" )
451
451
452
+ def calc_load (self , interval : float = 1.0 ) -> int :
453
+ """
454
+ Calculate the load of the interface in the last 1s or specified interval of time
455
+
456
+ :return: a int representing the load of the interface in bytes
457
+ """
458
+ start_time = time ()
459
+ count = 0
460
+ temp_buffer = []
461
+ while time () < start_time + interval :
462
+ temp_buffer .append (self .recv (interval ))
463
+ for msg in temp_buffer :
464
+ if msg is not None and isinstance (can .Message , msg ):
465
+ count += len (msg .data )
466
+ return count
467
+
468
+ def calc_load_percentage (self , interval : float = 1.0 ) -> float :
469
+ """
470
+ Calculate the load of the interface in the last 1s or specified interval of time
471
+
472
+ :return: a float representing the load of the interface as a percentage
473
+ """
474
+ raise NotImplementedError ()
475
+ # return self.calc_load(interval) / self.bitrate
476
+
452
477
453
478
class _SelfRemovingCyclicTask (CyclicSendTaskABC , ABC ):
454
479
"""Removes itself from a bus.
@@ -457,6 +482,6 @@ class _SelfRemovingCyclicTask(CyclicSendTaskABC, ABC):
457
482
"""
458
483
459
484
def stop ( # pylint: disable=arguments-differ
460
- self , remove_task : bool = True
485
+ self , remove_task : bool = True
461
486
) -> None :
462
487
raise NotImplementedError ()
0 commit comments