-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathadafruit_ble_heart_rate.py
199 lines (154 loc) · 6.83 KB
/
adafruit_ble_heart_rate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# SPDX-FileCopyrightText: 2020 Dan Halbert for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
`adafruit_ble_heart_rate`
================================================================================
BLE Heart Rate Service
* Author(s): Dan Halbert for Adafruit Industries
The Heart Rate Service is specified here:
https://www.bluetooth.com/wp-content/uploads/Sitecore-Media-Library/Gatt/Xml/Services/org.bluetooth.service.heart_rate.xml
Implementation Notes
--------------------
**Hardware:**
* Adafruit CircuitPython firmware for the supported boards:
https://github.com/adafruit/circuitpython/releases
* Adafruit's BLE library: https://github.com/adafruit/Adafruit_CircuitPython_BLE
"""
import struct
from collections import namedtuple
import _bleio
from adafruit_ble.services import Service
from adafruit_ble.uuid import StandardUUID
from adafruit_ble.characteristics import Characteristic, ComplexCharacteristic
from adafruit_ble.characteristics.int import Uint8Characteristic
try:
from typing import Optional
except ImportError:
pass
# Module metadata: version string and source repository URL.
# NOTE(review): the host in __repo__ is not the usual GitHub host; it looks like
# a mirror/proxy address — confirm against the upstream project.
__version__ = "0.0.0+auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_BLE_Heart_Rate.git"
HeartRateMeasurementValues = namedtuple(
    "HeartRateMeasurementValues",
    ("heart_rate", "contact", "energy_expended", "rr_intervals"),
)
"""Namedtuple carrying the values decoded from one heart rate measurement.

* `HeartRateMeasurementValues.heart_rate`

    Heart rate (int), in beats per minute.

* `HeartRateMeasurementValues.contact`

    ``True`` when the device is in contact with the body, ``False`` when it
    is not, and ``None`` when the device does not support contact detection.

* `HeartRateMeasurementValues.energy_expended`

    Energy expended (int), in kilojoules, or ``None`` when no value is reported.

* `HeartRateMeasurementValues.rr_intervals`

    Sequence of RR intervals (the time between beats), oldest first. Each
    entry is an int in units of 1/1024 of a second. The sequence is empty
    when the device does not report intervals.
    *Caution:* inexpensive heart rate monitors may not measure this
    accurately. Do not use for diagnosis.

For example::

    bpm = svc.measurement_values.heart_rate
"""
class _HeartRateMeasurement(ComplexCharacteristic):
    """Heart Rate Measurement characteristic (UUID 0x2A37), declared notify-only.

    Binding it to a service produces a packet buffer for the streamed data
    instead of a plain value.
    """

    uuid = StandardUUID(0x2A37)

    def __init__(self) -> None:
        super().__init__(properties=Characteristic.NOTIFY)

    def bind(self, service: "HeartRateService") -> _bleio.PacketBuffer:
        """Bind this characteristic to ``service`` and enable notifications.

        :param service: the HeartRateService to bind to.
        :return: a ``_bleio.PacketBuffer`` wrapping the bound characteristic.
        """
        characteristic = super().bind(service)
        # Turn notifications on through the characteristic's CCCD.
        characteristic.set_cccd(notify=True)
        # A buffer_size of 1 is enough: only one HRM packet is held at a time.
        return _bleio.PacketBuffer(characteristic, buffer_size=1)
class HeartRateService(Service):
    """Service for reading from a Heart Rate sensor."""

    # 0x180D is the standard HRM 16-bit, on top of standard base UUID
    uuid = StandardUUID(0x180D)

    # Layout of a Heart Rate Measurement packet:
    #
    # uint8: flags
    #  bit 0 = 0: Heart Rate Value is uint8
    #  bit 0 = 1: Heart Rate Value is uint16
    #  bits 2:1 = 0 or 1: Sensor Contact Feature not supported
    #  bits 2:1 = 2: Sensor Contact Feature supported, contact is not detected
    #  bits 2:1 = 3: Sensor Contact Feature supported, contact is detected
    #  bit 3 = 0: Energy Expended field is not present
    #  bit 3 = 1: Energy Expended field is present. Units: kilojoules
    #  bit 4 = 0: RR-Interval values are not present
    #  bit 4 = 1: One or more RR-Interval values are present
    #
    # next uint8 or uint16: Heart Rate Value
    # next uint16: Energy Expended, if present
    # next uint16 (multiple): RR-Interval values, resolution of 1/1024 second
    #   in order of oldest to newest

    # Mandatory for Heart Rate Service
    heart_rate_measurement = _HeartRateMeasurement()
    # Optional for Heart Rate Service.
    body_sensor_location = Uint8Characteristic(
        uuid=StandardUUID(0x2A38), properties=Characteristic.READ
    )
    # Mandatory only if the Energy Expended feature is supported.
    heart_rate_control_point = Uint8Characteristic(
        uuid=StandardUUID(0x2A39), properties=Characteristic.WRITE
    )

    # Names for body_sensor_location values, indexed by the value read.
    _BODY_LOCATIONS = ("Other", "Chest", "Wrist", "Finger", "Hand", "Ear Lobe", "Foot")

    def __init__(self, service: Optional["HeartRateService"] = None) -> None:
        """Create the service wrapper.

        :param service: passed through unchanged to the ``Service`` base class.
        """
        super().__init__(service=service)
        # Defer creating buffer until needed.
        self._measurement_buf = None

    @property
    def measurement_values(self) -> Optional[HeartRateMeasurementValues]:
        """All the measurement values, returned as a HeartRateMeasurementValues
        namedtuple.

        Return ``None`` if no packet has been read yet.
        """
        # pylint: disable=no-member
        if self._measurement_buf is None:
            # Allocate once, sized to the packet length reported by the buffer,
            # and reuse it for every later read.
            self._measurement_buf = bytearray(
                self.heart_rate_measurement.incoming_packet_length
            )
        buf = self._measurement_buf
        packet_length = self.heart_rate_measurement.readinto(buf)
        # pylint: enable=no-member
        if packet_length == 0:
            return None

        flags = buf[0]
        next_byte = 1

        # Bit 0 selects the width of the heart rate value.
        if flags & 0x1:
            bpm = struct.unpack_from("<H", buf, next_byte)[0]
            next_byte += 2
        else:
            bpm = struct.unpack_from("<B", buf, next_byte)[0]
            next_byte += 1

        if flags & 0x4:
            # True or False if Sensor Contact Feature is supported.
            contact = bool(flags & 0x2)
        else:
            # None (meaning we don't know) if Sensor Contact Feature is not supported.
            contact = None

        if flags & 0x8:
            energy_expended = struct.unpack_from("<H", buf, next_byte)[0]
            next_byte += 2
        else:
            energy_expended = None

        rr_values = []
        if flags & 0x10:
            # Each interval needs two bytes inside the received packet. Stopping
            # at packet_length - 1 keeps a dangling odd byte from being unpacked
            # together with a byte that lies beyond the packet.
            rr_values = [
                struct.unpack_from("<H", buf, offset)[0]
                for offset in range(next_byte, packet_length - 1, 2)
            ]

        return HeartRateMeasurementValues(bpm, contact, energy_expended, rr_values)

    @property
    def location(self) -> str:
        """The location of the sensor on the human body, as a string.

        Note that the specification describes a limited number of locations.
        But the sensor manufacturer may specify using a non-standard location.
        For instance, some armbands are meant to be worn just below the inner elbow,
        but that is not a prescribed location. So the sensor will report something
        else, such as "Wrist".

        Possible values are:
        "Other", "Chest", "Wrist", "Finger", "Hand", "Ear Lobe", "Foot", and
        "InvalidLocation" (if value returned does not match the specification).
        """
        try:
            return self._BODY_LOCATIONS[self.body_sensor_location]
        except IndexError:
            return "InvalidLocation"