1
0
mirror of https://github.com/MarkParker5/STARK.git synced 2024-11-24 08:12:13 +02:00
STARK/SmartHome/Merlin/Merlin.py
2022-05-18 14:06:33 +02:00

66 lines
1.8 KiB
Python

from typing import List
from threading import Thread
# import RPi.GPIO as GPIO # TEST
# import spidev # TEST
from .MerlinMessage import MerlinMessage
from .lib_nrf24 import NRF24
# GPIO.setmode(GPIO.BCM) # TEST
class Merlin():
    """Queue MerlinMessage objects and exchange them over an NRF24 radio.

    Messages are queued with send() and delivered by receiveAndTransmit(),
    which loops forever and is meant to run on a worker thread.

    NOTE: the statements tagged ``# TEST`` currently short-circuit every
    hardware access, so no radio object is created and nothing is actually
    transmitted or received. The code following each of those statements is
    kept for when the hardware path is re-enabled.
    """

    # Assigned only by the hardware path in __init__ (currently disabled).
    radio: NRF24
    # Class-level list, therefore shared by every instance: a message queued
    # through one Merlin object is visible to the loop running on another.
    # NOTE(review): callers presumably rely on this sharing - confirm before
    # turning it into a per-instance attribute.
    send_queue: List[MerlinMessage] = []
    # Address opened on reading pipe 1 by the hardware path in __init__.
    my_urdi = [0xf0, 0xf0, 0xf0, 0xf0, 0xe1]

    def __init__(self):
        """Configure the radio and start listening (disabled while testing)."""
        return  # TEST
        # Unreachable while the return above is present. Re-enabling it also
        # requires the GPIO and spidev imports that are commented out at the
        # top of the file.
        radio = NRF24(GPIO, spidev.SpiDev())
        radio.begin(0, 17)
        radio.setPALevel(NRF24.PA_HIGH)
        radio.setDataRate(NRF24.BR_2MBPS)
        radio.setChannel(0x60)
        radio.setPayloadSize(2)
        radio.setAutoAck(True)
        #radio.setAddressWidth(4)
        radio.enableAckPayload()
        radio.openReadingPipe(1, self.my_urdi)
        radio.startListening()
        radio.stopListening()
        radio.startListening()
        self.radio = radio

    def send(self, message: MerlinMessage):
        """Append ``message`` to the shared send queue.

        Nothing is transmitted here; receiveAndTransmit() picks it up.
        """
        self.send_queue.append(message)

    def _send(self, message: MerlinMessage):
        """Transmit one message to the address held in ``message.urdi``.

        While the ``# TEST`` return is in place this only prints the target
        address, the payload, and the payload rendered as single bytes.
        """
        print(message.urdi, message.data, [b.to_bytes(1, 'big') for b in message.data])
        return  # TEST
        # Unreachable while the return above is present.
        self.radio.stopListening()
        self.radio.openWritingPipe(message.urdi)
        self.radio.write(message.data)
        self.radio.startListening()

    def receiveAndTransmit(self):
        """Loop forever: flush the send queue, then poll the radio for data.

        This method never returns. While the ``# TEST`` continue is in place
        only the queue-flushing half runs.
        """
        # NOTE(review): the loop never sleeps, so it spins continuously even
        # when the queue is empty - consider a short idle delay.
        while True:
            # Send queued messages oldest-first so they leave in the order
            # in which send() was called.
            while self.send_queue:
                message = self.send_queue.pop(0)
                # Falsy entries are dropped rather than sent.
                if message:
                    self._send(message)
            continue  # TEST
            # receiving messages
            if not self.radio.available():
                continue
            rawData = []
            self.radio.read(rawData, self.radio.getPayloadSize())
            # The payload size is set to 2 in __init__, hence two values.
            func, arg = rawData
            print(f'{func=} {arg=}')
# Start the transmit/receive loop on a background thread as soon as this
# module is imported. The thread runs on its own Merlin instance.
# NOTE(review): no daemon flag is passed and receiveAndTransmit loops
# forever, so this thread may keep the interpreter alive at shutdown -
# confirm that is intended.
receiveAndTransmitThread = Thread(
    target=Merlin().receiveAndTransmit,
)
receiveAndTransmitThread.start()