Skip to content
Snippets Groups Projects
Verified Commit 6ba88f84 authored by Nik | Klampfradler's avatar Nik | Klampfradler
Browse files

[companion] Implement switching to device and connecting

parent 9987e5f5
No related branches found
No related tags found
No related merge requests found
Pipeline #84218 failed
import logging
from kivy.app import App
from kivy.logger import Logger
from kivy.uix.boxlayout import BoxLayout
from device import DeviceWidget
from device_list import DeviceListWidget
class InterfaceManager(BoxLayout):
def show_device_list(self):
self.clear_widgets()
self.add_widget(DeviceListWidget())
def show_device(self, address):
self.clear_widgets()
self.add_widget(DeviceWidget(address))
class MynitCompanionApp(App):
def build(self):
self.widget = DeviceListWidget()
Logger.setLevel(logging.DEBUG)
self.widget = InterfaceManager()
self.widget.show_device_list()
return self.widget
import asyncio
from kivy.logger import Logger
from kivy.uix.widget import Widget
from bleak import BleakClient
UART_SERVICE_UUID = "6e400001-b5a3-f393-e0a9-e50e24dcca9e"
UART_SERVICE_TX_UUID = "6e400002-b5a3-f393-e0a9-e50e24dcca9e"
UART_SERVICE_RX_UUID = "6e400003-b5a3-f393-e0a9-e50e24dcca9e"
class DeviceWidget(Widget):
def __init__(self, address, *args, **kwargs):
super().__init__(*args, **kwargs)
self._task_queue = set()
self._client = BleakClient(address)
Logger.info(f"Connecting to device: {address}")
task = asyncio.create_task(self._client.connect())
self._task_queue.add(task)
task.add_done_callback(self._connected)
def _connected(self, task):
Logger.info("Connected")
self._task_queue.discard(task)
Logger.info("Starting notifications for UART")
task = asyncio.create_task(self._client.start_notify(UART_SERVICE_RX_UUID, self._handle_rx))
self._task_queue.add(task)
task.add_done_callback(self._reading)
def _reading(self, task):
Logger.debug("Reading UART data now")
self._task_queue.discard(task)
self.send_line('print("Hallo Welt")')
def _handle_rx(self, count, data):
Logger.debug(f"Received: {data}")
def send_line(self, data):
if not data.endswith("\r\n"):
data += "\r\n"
data = data.encode()
Logger.debug(f"Sending: {data}")
task = asyncio.create_task(self._client.write_gatt_char(UART_SERVICE_TX_UUID, data))
self._task_queue.add(task)
task.add_done_callback(self._task_queue.discard)
import asyncio
from kivy.app import App
from kivy.logger import Logger
from kivy.uix.button import Button
from kivy.uix.widget import Widget
......@@ -33,11 +34,12 @@ class DeviceListWidget(Widget):
def do_connect(self, address):
Logger.info(f"Connecting to device: {address}")
App.get_running_app().root.show_device(address)
def _new_device(self, device, advertisement):
Logger.info(f"New device discovered: {device}")
# FIXME Remove this filter
if device.name != "FroSCon-Wetter":
if device.name != "FrOSCon-Wetter":
return
widget = self
btn = Button(text=device.name, on_press=lambda self: widget.do_connect(device.address))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment