1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/walkline-esp32-ble

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
ble_scanner.py 6.9 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
walkline Отправлено 5 лет назад c6d7136
import ubluetooth as bt
from utime import sleep
from ble_advertising import decode_name, decode_services, decode_field
from hid_services import HIDConst
import _thread
import gc
class BLE_PERIPHERALS(object):
def __init__(self):
self.__clients = []
def append(self, data):
self.__clients.append(BLE_PERIPHERAL(data))
def clear(self):
self.__clients.clear()
def find(self, addr=None, conn_handle=None):
for client in self.__clients:
if client.addr is not None and client.addr == addr or client.conn_handle is not None and client.conn_handle == conn_handle:
return client
return None
def __iter__(self):
return iter(self.__clients)
def __len__(self):
return len(self.__clients)
class BLE_SERVICES(object):
def __init__(self):
self.__services = []
def append(self, data):
self.__services.append(BLE_SERVICE(data))
class BLE_SERVICE(object):
def __init__(self, data):
conn_handle, start_handle, end_handle, uuid = data
self.__start_handle = start_handle
self.__end_handle = end_handle
self.__uuid = uuid
class BLE_PERIPHERAL(object):
def __init__(self, data):
addr_type, addr, connectable, rssi, adv_data = data
self.__addr_type = addr_type
self.__addr = bytes(addr)
self.__name = decode_name(adv_data) or self.__get_bt_mac(addr)
self.__connectable = connectable
self.__rssi = rssi
self.__adv_data = bytes(adv_data)
self.__conn_handle = None
self.__value_handle = None
self.__services = BLE_SERVICES()
@property
def adv_data(self):
return self.__adv_data
@property
def addr_type(self):
return self.__addr_type
@property
def addr(self):
return self.__addr
@property
def name(self):
return self.__name
@property
def connectable(self):
return self.__connectable
@property
def conn_handle(self):
return self.__conn_handle
@conn_handle.setter
def conn_handle(self, value):
self.__conn_handle = value
@property
def value_handle(self):
return self.__value_handle
@value_handle.setter
def value_handle(self, value):
self.__value_handle = value
def append_service(self, value):
self.__services.append(value)
def __get_bt_mac(self, value):
assert isinstance(value, bytes) and len(value) == 6, ValueError("mac address value error")
return ":".join(['%02X' % byte for byte in value])
class BLE_CENTRAL(object):
def __init__(self, ble, scan_cb=None, pconnect_cb=None, services_cb=None):
self.__ble = ble
self.__devices = BLE_PERIPHERALS()
self.__addrs = []
self.__scan_cb = scan_cb
self.__pconnect_cb = pconnect_cb
self.__service_cb = services_cb
self.__ble.active(False)
print("activating ble...")
self.__ble.active(True)
print("ble activated")
self.__ble.irq(self.__irq)
def scan(self, senconds=30):
self.__devices.clear()
self.__addrs.clear()
sleep(1)
print("scaning for {} second(s)...".format(senconds))
self.__ble.gap_scan(senconds * 1000, 50000, 50000)
def __irq(self, event, data):
if event == HIDConst.IRQ_SCAN_RESULT:
addr_type, addr, connectable, rssi, adv_data = data
if not bytes(addr) in self.__addrs:
self.__addrs.append(bytes(addr))
self.__devices.append(data)
elif event == HIDConst.IRQ_SCAN_COMPLETE:
print("scan completed, {} device(s) found".format(len(self.__devices)))
if self.__scan_cb is not None:
self.__scan_cb(self.__ble, self.__devices)
# for device in self.__devices:
# print("- [{}]: {}".format(device.name, device.adv_data))
# for device in self.__devices:
# if device.connectable and device.name == "A4:C1:38:0C:15:FD": # "6B:8C:E7:2B:C7:AD": # "5B:D2:5A:DB:CD:28": # "73:C4:57:22:84:8D": # "41:0D:27:39:37:4D": # "SCNB-DX-V104":
# print("connecting to [{}]".format(device.name))
# self.__ble.gap_connect(device.addr_type, device.addr)
elif event == HIDConst.IRQ_PERIPHERAL_CONNECT:
conn_handle, addr_type, addr = data
device = self.__devices.find(addr=bytes(addr))
# if device:
# print("[{}] connected".format(device.name))
device.conn_handle = conn_handle
if self.__pconnect_cb is not None:
self.__pconnect_cb(self.__ble, device)
# self.__ble.gattc_discover_services(conn_handle)
elif event == HIDConst.IRQ_PERIPHERAL_DISCONNECT:
conn_handle, addr_type, addr = data
device = self.__devices.find(conn_handle=conn_handle)
# if device:
print("[{}] disconnected".format(device.name))
device.conn_handle = None
elif event == HIDConst.IRQ_GATTC_NOTIFY:
conn_handle, value_handle, notify_data = data
device = self.__devices.find(conn_handle=conn_handle)
if device:
print("[{}] notify: {}".format(device.name, notify_data))
device.value_handle = value_handle
elif event == HIDConst.IRQ_GATTC_INDICATE:
conn_handle, value_handle, notify_data = data
device = self.__devices.find(conn_handle=conn_handle)
if device:
print("[{}] indicated: {}".format(device.name, notify_data))
elif event == HIDConst.IRQ_GATTC_SERVICE_RESULT:
conn_handle, start_handle, end_handle, uuid = data
device = self.__devices.find(conn_handle=conn_handle)
# if device:
# print("[{}] service result: {}".format(device.name, uuid))
device.append_service(data)
if self.__service_cb is not None:
self.__service_cb(self.__ble, device.name, conn_handle, start_handle, end_handle, uuid)
# self.__ble.gattc_discover_characteristics(conn_handle, start_handle, end_handle)
elif event == HIDConst.IRQ_GATTC_CHARACTERISTIC_RESULT:
print("characteristic result:", data)
conn_handle, def_handle, value_handle, properties, uuid = data
device = self.__devices.find(conn_handle=conn_handle)
if device:
print("[{}] characteristic result: {}".format(device.name, uuid))
else:
print("event: {}, data: {}".format(event, data))
gc.collect()
def get_peripheral(self):
return self.__devices
forever_loop = True
central = None
def init_button():
global central
from machine import Pin
def button_click_cb(timer):
central.scan()
button = Pin(0, Pin.IN, Pin.PULL_UP)
button.irq(button_click_cb, Pin.IRQ_RISING)
print("button initialized")
def service_cb(ble:bt.BLE, name, conn_handle, start_handle, end_handle, uuid):
print("[{}] found service: {}".format(name, uuid))
# ble.gattc_discover_characteristics(conn_handle, start_handle, end_handle)
def pconnect_cb(ble:bt.BLE, device:BLE_PERIPHERAL):
print("[{}] connected".format(device.name))
print("[{}] discoving services...".format(device.name))
ble.gattc_discover_services(device.conn_handle)
def scan_cb(ble:bt.BLE, devices:BLE_PERIPHERALS):
for device in devices:
if device.connectable and device.name == "A4:C1:38:0C:15:FD":
print("connecting to [{}]".format(device.name))
ble.gap_connect(device.addr_type, device.addr)
def main():
global central
init_button()
ble = bt.BLE()
central = BLE_CENTRAL(ble, scan_cb, pconnect_cb, service_cb)
central.scan(senconds=30)
# while forever_loop:
# sleep(0.2)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
forever_loop = False
print("\nPRESS CTRL+D TO RESET DEVICE")

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://gitlife.ru/oschina-mirror/walkline-esp32-ble.git
git@gitlife.ru:oschina-mirror/walkline-esp32-ble.git
oschina-mirror
walkline-esp32-ble
walkline-esp32-ble
master