import ubluetooth as bt from utime import sleep from micropython import const from ble_advertising import decode_name, advertising_payload _IRQ_CENTRAL_CONNECT = const(1 << 0) _IRQ_CENTRAL_DISCONNECT = const(1 << 1) _IRQ_GATTS_WRITE = const(1 << 2) _IRQ_GATTS_READ_REQUEST = const(1 << 3) _IRQ_SCAN_RESULT = const(1 << 4) _IRQ_SCAN_COMPLETE = const(1 << 5) _IRQ_PERIPHERAL_CONNECT = const(1 << 6) _IRQ_PERIPHERAL_DISCONNECT = const(1 << 7) _IRQ_GATTC_SERVICE_RESULT = const(1 << 8) _IRQ_GATTC_CHARACTERISTIC_RESULT = const(1 << 9) _IRQ_GATTC_DESCRIPTOR_RESULT = const(1 << 10) _IRQ_GATTC_READ_RESULT = const(1 << 11) _IRQ_GATTC_WRITE_STATUS = const(1 << 12) _IRQ_GATTC_NOTIFY = const(1 << 13) _IRQ_GATTC_INDICATE = const(1 << 14) _IRQ_ALL = const(0xffff) _ADV_TYPE_FLAGS = const(0x01) _ADV_TYPE_NAME = const(0x09) _ADV_TYPE_UUID16_COMPLETE = const(0x3) _ADV_TYPE_UUID32_COMPLETE = const(0x5) _ADV_TYPE_UUID128_COMPLETE = const(0x7) _ADV_TYPE_UUID16_MORE = const(0x2) _ADV_TYPE_UUID32_MORE = const(0x4) _ADV_TYPE_UUID128_MORE = const(0x6) _ADV_TYPE_APPEARANCE = const(0x19) _ADDRESS_TYPE = ["Public", "Random"] # List of services: https://www.bluetooth.com/specifications/gatt/services/ UUID_HID = bt.UUID(0x1812) UUID_BATTERY = bt.UUID(0x180F) UUID_DEVICE_INFO = bt.UUID(0x180A) # REPORT_UUID = bt.UUID(0x2A4D) # REPORT_MAP_UUID = bt.UUID(0x2A4B) # Indicate GATT characteristics # List of characteristics: https://www.bluetooth.com/specifications/gatt/characteristics/ # PROTOCOL_MODE = (bt.UUID(0x2A4E), bt.FLAG_READ | bt.FLAG_NOTIFY,) # BOOT_INPUT_DEVICE = (bt.UUID(0x2A22), bt.FLAG_READ | bt.FLAG_NOTIFY,) HID_READ = (bt.UUID(0x2A4D), bt.FLAG_READ | bt.FLAG_NOTIFY, ((bt.UUID(0x2908), 0),)) # 0x2B37 HID_WRITE = (bt.UUID(0x2A4D), bt.FLAG_WRITE,) HID_SERVICE = (UUID_HID, (HID_READ, HID_WRITE,),) BATTERY_READ = (bt.UUID(0x2A19), bt.FLAG_READ | bt.FLAG_NOTIFY,) BATTERY_SERVICE = (UUID_BATTERY, (BATTERY_READ,),) DEVICE_INFO_READ = (bt.UUID(0x2A29), bt.FLAG_READ | bt.FLAG_NOTIFY,) DEVICE_INFO_SERVICE = (UUID_DEVICE_INFO, (DEVICE_INFO_READ,),) SERVICES = (HID_SERVICE, BATTERY_SERVICE, DEVICE_INFO_SERVICE,) mac_lists = [] name_lists = [] hid_read_handle = None hid_write_handle = None hid_report_handle = None battery_read_handle = None device_info_read_handle = None def ble_irq(event, data): global ble, device_lists, hid_read_handle, hid_write_handle, hid_report_handle, battery_read_handle, device_info_read_handle if event == _IRQ_SCAN_RESULT: addr_type, addr, connectable, rssi, adv_data = data mac = get_bt_mac(addr) name = decode_name(adv_data) or "?" # print("type: {}, mac: {}, connectable: {}, rssi: {}, adv_data: {}".format(_ADDRESS_TYPE[addr_type], get_bt_mac(addr), connectable, rssi, adv_data)) if connectable: if not mac in mac_lists: mac_lists.append(mac) name_lists.append(name) # if connectable == True: # ble.gap_scan(None) # ble.gap_connect(addr_type, addr) elif event == _IRQ_SCAN_COMPLETE: print("scan result:\n{}\n{}".format(mac_lists, name_lists)) elif event == _IRQ_PERIPHERAL_CONNECT: conn_handle, addr_type, addr = data print("peripheral connected, conn_handle: {}, addr_type: {}, addr: {}".format(conn_handle, addr_type, get_bt_mac(addr))) elif event == _IRQ_PERIPHERAL_DISCONNECT: conn_handle, addr_type, addr = data print("peripheral disconnected, conn_handle: {}, addr_type: {}, addr: {}".format(conn_handle, addr_type, get_bt_mac(addr))) elif event == _IRQ_CENTRAL_CONNECT: conn_handle, addr_type, addr = data print("central connected, conn_handle: {}, addr_type: {}, addr: {}".format(conn_handle, addr_type, get_bt_mac(addr))) # ble.gatts_notify(conn_handle, read_handle) # ble.gatts_read(read_handle) # ble.gap_connect(addr_type, addr) # ble.gatts_write(write_handle, b'\x00\x00\x00\x00') # print(ble.gatts_read(read_handle)) # print(ble.gatts_read(write_handle)) elif event == _IRQ_CENTRAL_DISCONNECT: conn_handle, addr_type, addr = data print("central disconnected, conn_handle: {}, addr_type: {}, addr: {}".format(conn_handle, addr_type, get_bt_mac(addr))) start_hid_service() else: print("event: {}, data: {}".format(event, data)) def get_bt_mac(value): assert isinstance(value, bytes) and len(value) == 6, ValueError("mac address value error") return ":".join(['%02X' % byte for byte in value]) def start_bt_scan(seconds=30): global ble, device_lists print("start scaning for {} second(s)".format(seconds)) mac_lists.clear() name_lists.clear() ble.gap_scan(seconds * 1000, 30000, 30000) def bt_adv_encode(adv_type, value): return bytes((len(value) + 1, adv_type,)) + value def bt_adv_encode_name(name): return bt_adv_encode(const(0x09), name.encode()) def register_hid_services(): global hid_read_handle, hid_write_handle, hid_report_handle, battery_read_handle, device_info_read_handle ((read_handle, report_handle, write_handle), (battery_read_handle,), (device_info_read_handle,)) = ble.gatts_register_services(SERVICES) ble.gatts_write(read_handle, bytes(100)) ble.gatts_write(report_handle, bytes(100)) ble.gatts_write(write_handle, bytes(100)) ble.gatts_write(battery_read_handle, bytes(100)) ble.gatts_write(device_info_read_handle, bytes(100)) # print(ble.gatts_read(device_info_read_handle)) start_hid_service() def start_hid_service(): # Indicate GATT service name in reverse byte order in second argument of adv_encode(0x03, b'') # List of services: https://www.bluetooth.com/specifications/gatt/services/ payload = advertising_payload(name="walkline_kb", services=[UUID_HID, UUID_DEVICE_INFO], appearance=961) ble.gap_advertise(60 * 1000, payload) # ble.gap_advertise(60 * 1000, bt_adv_encode(0x01, b'\x06') + bt_adv_encode(0x03, b'\x12\x18') + bt_adv_encode(0x19, b'\xc1\x03') + bt_adv_encode_name('walkline_hd')) class BLE_HID(object): def __init__(self, ble, name="walkline_kb"): self.__ble = ble self.__name = name self.__ble.active(True) self.__ble.irq(self.__irq) def __irq(self, event, data): pass def main(): ble = bt.BLE() ble.active(True) ble.irq(ble_irq) # start_bt_scan() register_hid_services() # ble.gap_advertise(1000, adv_data='walkline_hardware') while True: sleep(0.2) if __name__ == "__main__": main() # import ubinascii # ubinascii.unhexlify(b'aabbccddeeff') # b='\x02\x01\x1a\x02\n\x0c\n\xffL\x00\x10\x05Q\x1c\x1e\x84\x88' # 2 1 26 # 2 10 12 # 10 255 76 0 16 5 81 28 30 132 136 # import bluetooth # bt = bluetooth.BLE() # bt.active(True) # UUID_HR = const(0x2A37) # _IRQ_ALL = const(0xffff) # HR_SERVICE = bluetooth.UUID(0x180D) # HR_CHAR = (bluetooth.UUID(UUID_HR), bluetooth.FLAG_READ|bluetooth.FLAG_NOTIFY,) # CID = None # ((HR,),) = bt.gatts_register_services(((HR_SERVICE, (HR_CHAR,),),)) # def bt_irq(event, data): # # >> bt irq 1 (64, 1, b'Y\xeb\n@uZ') # global CID # CID,EID,DATA = data # print('connected > evt:{} id:{} eid:{} data:{}'.format(event,CID,EID,DATA)) # bt.irq(bt_irq, _IRQ_ALL) # def adv_encode(adv_type, value): # return bytes((len(value) + 1, adv_type,)) + value # def adv_encode_name(name): # return adv_encode(0x09, name.encode()) # def adv(): # bt.gap_advertise(100, adv_encode(0x01, b'\x06') + adv_encode(0x03, b'\x0d\x18') + adv_encode(0x19, b'\xc1\x03') + adv_encode_name('esp32hr')) # def send(val=0): # # b'\x00\x42' # ba = bytearray(2) # ba[1]=val # bt.gatts_notify(64, HR, ba) # adv()