279 lines
8.3 KiB
Python
279 lines
8.3 KiB
Python
import rp2
|
|
import network
|
|
import ubinascii
|
|
import machine
|
|
import uos
|
|
import sys
|
|
from machine import Pin
|
|
from time import sleep
|
|
import urequests as requests
|
|
import time
|
|
import micropython_ota
|
|
from secrets import secrets
|
|
from settings import settings
|
|
|
|
# Set country to avoid possible errors
|
|
# 'HU' is the country code handed to the rp2 module for the wireless chip
# (presumably the two-letter ISO 3166-1 code for Hungary; adjust it when the
# device is deployed elsewhere — TODO confirm against the rp2 documentation).
rp2.country('HU')
|
|
|
|
# Define blinking function for onboard LED to indicate error codes
|
|
def blink_onboard_led(num_blinks):
    """Flash the onboard LED ``num_blinks`` times.

    Each flash is 200 ms on followed by 200 ms off. A count of zero or
    less produces no flashes because the range is empty.
    """
    onboard = machine.Pin('LED', machine.Pin.OUT)
    for _ in range(num_blinks):
        # One blink = drive the pin high, wait, drive it low, wait.
        for level in (1, 0):
            onboard.value(level)
            time.sleep_ms(200)
|
|
|
|
def connect_wifi():
    """Activate the WLAN interface and try to join the configured network.

    Uses the module-level ``wlan`` and ``ssid`` and the password stored
    under ``secrets['pw']``. Polls the link status up to ten times, one
    second apart, blinking the LED twice per poll.

    Status values reported by ``wlan.status()``:
         0  Link Down
         1  Link Join
         2  Link NoIp
         3  Link Up
        -1  Link Fail
        -2  Link NoNet
        -3  Link BadAuth

    Returns:
        True if the final status is 3 (Link Up), otherwise False.
    """
    print('Connecting to WiFi ' + ssid + '...')
    wlan.active(True)
    # To disable power-saving mode, uncomment:
    # wlan.config(pm = 0xa11140)

    # Report the MAC address as colon-separated hex.
    mac_bytes = wlan.config('mac')
    print('mac = ' + ubinascii.hexlify(mac_bytes, ':').decode())

    # The password lives in a separate secrets file for safety reasons.
    wlan.connect(ssid, secrets['pw'])

    # Poll until the link either fails (< 0) or comes up (>= 3),
    # giving up after ten attempts.
    for _ in range(10):
        if wlan.status() < 0 or wlan.status() >= 3:
            break
        print('Waiting for connection... Status is ' + str(wlan.status()))
        blink_onboard_led(2)
        time.sleep(1)

    wlan_status = wlan.status()
    # Blink the status code; zero and negative codes produce no blinks.
    blink_onboard_led(wlan_status)

    if wlan_status == 3:
        blink_onboard_led(1)
        print('Connected')
        print('ip = ' + wlan.ifconfig()[0])
        return True

    blink_onboard_led(5)
    print('Wi-Fi connection failed')
    return False
|
|
|
|
def disconnect_wifi():
    """Shut the module-level ``wlan`` interface down and log it.

    The order is: drop the connection, deactivate the interface, then
    deinitialise it.
    """
    # Leave the network first ...
    wlan.disconnect()
    # ... then switch the interface off and release it.
    wlan.active(False)
    wlan.deinit()

    print('Disconnected')
|
|
|
|
def send_notification(title, tags, send_ha_notification=False):
    """Send a push notification, falling back to a second ntfy server.

    The message is first posted to ntfy.sh. If that raises, the error is
    signalled with three LED blinks and the same message is posted to the
    fallback server. A failure of the fallback is logged instead of being
    raised, so the optional Home Assistant notification below still runs
    and callers are not interrupted by a notification outage.

    Args:
        title: Notification title.
        tags: Value for the ntfy tags header.
        send_ha_notification: When True, also trigger the Home Assistant
            notification if both ``homeassistant_url`` and
            ``homeassistant_token`` are configured.
    """
    try:
        send_notification_to_server('https://ntfy.sh/' + ntfy_topic, title, tags)
    except Exception as primary_error:
        print('Error sending notification')
        sys.print_exception(primary_error)
        blink_onboard_led(3)
        try:
            send_notification_to_server('https://ntfy.adix.link/' + ntfy_topic, title, tags)
        except Exception as fallback_error:
            # Both servers failed: report it, but keep the device running.
            print('Error sending notification to fallback server')
            sys.print_exception(fallback_error)

    if not send_ha_notification:
        return

    try:
        if homeassistant_url is not None and homeassistant_token is not None:
            send_homeassistant_notification(homeassistant_url)
        else:
            print('HomeAssistant notification is on, but config is missing', homeassistant_url, homeassistant_token)
    except Exception as e:
        print('Error sending HA notification')
        sys.print_exception(e)
|
|
|
|
def send_notification_to_server(notify_url, title, tags):
    """POST the doorbell message to an ntfy-style endpoint.

    Args:
        notify_url: Full URL of the topic to post to.
        title: Sent as the ``Title`` header.
        tags: Sent as the ``X-Tags`` header.

    Raises:
        Whatever ``requests.post`` or reading the response raises; callers
        decide how to handle a failed delivery.
    """
    print('Sending notification to ' + notify_url + '...')
    response = requests.post(notify_url, data="Csengo", headers={
        'Title': title,
        'Priority': '5',
        'X-Tags': tags
    })
    try:
        print(response.content)
    finally:
        # Always release the response, even if reading the body fails.
        response.close()
|
|
|
|
def send_homeassistant_notification(ha_url):
    """Press the ``input_button.doorbell`` entity through the Home Assistant API.

    Posts to ``<ha_url>/api/services/input_button/press`` using the
    module-level ``homeassistant_token`` as bearer token (an empty string
    is sent when the token is None).

    Args:
        ha_url: Base URL of the Home Assistant instance.

    Raises:
        Whatever ``requests.post`` or reading the response raises.
    """
    print('Sending notification to HA on ' + ha_url + '...')
    response = requests.post(ha_url + '/api/services/input_button/press', data='{"entity_id": "input_button.doorbell"}', headers={
        'Authorization': 'Bearer ' + (homeassistant_token if homeassistant_token is not None else ""),
        'Content-Type': 'application/json'
    })
    try:
        print(response.content)
    finally:
        # Always release the response, even if reading the body fails.
        response.close()
|
|
|
|
|
|
def send_health_update():
    """Report liveness by POSTing to ``<healthr_url>/health/<healthr_service_name>``.

    Best effort: any error is logged together with the exception and
    signalled with three LED blinks, but never raised to the caller.
    """
    try:
        health_url = healthr_url + '/health/' + healthr_service_name
        print("Sending health update to '" + health_url + "...")
        response = requests.post(health_url)
        response.close()
    except Exception as e:
        # Catch Exception rather than everything, so KeyboardInterrupt and
        # SystemExit still propagate, and show what actually went wrong.
        print('Error sending health update')
        sys.print_exception(e)
        blink_onboard_led(3)
|
|
|
|
def check_version_update():
    """Ask the OTA server for a newer 'doorbell' build and reset if one exists.

    When a new version is reported, a notification is sent first and the
    device is then soft- or hard-reset depending on the module-level
    ``ota_soft_reset_device`` flag.
    """
    update_available, remote_version = micropython_ota.check_version('https://iot-sw.adix.link', 'doorbell', ota_branch)

    if not update_available:
        print('No new version available')
        return

    send_notification(title = 'Updating to ' + remote_version, tags = 'new')

    if ota_soft_reset_device:
        print(f'Found new version {remote_version}, soft-resetting device...')
        machine.soft_reset()
        return

    print(f'Found new version {remote_version}, hard-resetting device...')
    machine.reset()
|
|
|
|
def set_cpu_to_high_freq():
    """Switch the CPU clock to 125 MHz, then log the change."""
    target_hz = 125000000
    machine.freq(target_hz)
    print('Frequency is set to 125 MHz')
|
|
|
|
def set_cpu_to_low_freq():
    """Log the change, then switch the CPU clock to 20 MHz.

    The message is printed before the clock is changed, mirroring the
    original statement order.
    """
    target_hz = 20000000
    print('Frequency is set to 20 MHz')
    machine.freq(target_hz)
|
|
|
|
|
|
# Run at the high clock speed while starting up.
set_cpu_to_high_freq()

# --- Settings -------------------------------------------------------------
ota_branch = settings['ota_branch']
ota_soft_reset_device = False

ntfy_topic = settings['ntfy_topic']

healthr_url = settings['healthr_url']
healthr_service_name = settings['healthr_service_name']

# Home Assistant integration is optional: both values stay None unless the
# settings provide them.
homeassistant_url = None
homeassistant_token = None
try:
    homeassistant_url = settings['homeassistant_url']
    homeassistant_token = settings['homeassistant_token']
except KeyError:
    # Only a missing key means "not configured"; any other error should
    # surface instead of being silently swallowed.
    print('HomeAssistant config is missing', homeassistant_url, homeassistant_token)

print('Starting up...')

# --- Network ----------------------------------------------------------------
wlan = network.WLAN(network.STA_IF)
ssid = secrets['ssid']

print('WiFi network to use on action: ' + ssid)

# --- Installed software version ------------------------------------------
# The first line of the 'version' file, when that file exists in the
# current directory; otherwise 'unknown'.
current_version = 'unknown'
if 'version' in uos.listdir():
    with open('version', 'r') as current_version_file:
        current_version = current_version_file.readline().strip()

# --- Inputs ---------------------------------------------------------------
# Both inputs use the internal pull-down, so they read 0 until driven high.
door_bell = machine.Pin(28, machine.Pin.IN, machine.Pin.PULL_DOWN)
debug_mode = machine.Pin(15, machine.Pin.IN, machine.Pin.PULL_DOWN)

# Previous/current samples used by the main loop for rising-edge detection.
doorbell_last_state = False
doorbell_current_state = False

debug_last_state = False
debug_current_state = False

# --- Onboard LED ----------------------------------------------------------
led = Pin('LED', Pin.OUT)
blink_onboard_led(2)
sleep(1)
|
|
# Boot-time network round: apply any pending OTA update, announce that the
# device is online, then switch Wi-Fi off again. Any failure is reported
# with four LED blinks and the script carries on.
try:
    if not connect_wifi():
        print('Error: Could not connect to WiFi :(')
        blink_onboard_led(4)
    else:
        print("Checking version updates...")
        #micropython_ota.reset_version()
        micropython_ota.ota_update('https://iot-sw.adix.link', 'doorbell', ota_branch)
        send_notification(title = 'Started (sw: ' + current_version + ')', tags = 'signal_strength')
        sleep(2)
        disconnect_wifi()
        blink_onboard_led(1)
except Exception as err:
    print('Error: Could not send became-online message :(')
    print(err)
    blink_onboard_led(4)
|
|
|
|
print('Changing frequency and beginning loop...')
set_cpu_to_low_freq()

# Seconds between two periodic health/update rounds, and the shorter delay
# before retrying after a failed round. The retry is scheduled relative to
# the interval, so it really happens after HEALTH_RETRY_DELAY_S seconds
# (a fixed offset larger than the interval would retry on every iteration).
HEALTH_UPDATE_INTERVAL_S = 3000
HEALTH_RETRY_DELAY_S = 300


def _disconnect_wifi_quietly():
    """Best-effort Wi-Fi shutdown for error paths, so the radio is not left on."""
    try:
        disconnect_wifi()
    except Exception as cleanup_error:
        print('Error: Could not disconnect WiFi')
        print(cleanup_error)


# Backdate the timestamp so the first health round runs on the first iteration.
last_update_time = time.time() - 3600

while True:
    doorbell_current_state = door_bell.value()
    debug_current_state = debug_mode.value()

    # Rising edge on the debug input: go back to the high clock speed.
    if not debug_last_state and debug_current_state:
        set_cpu_to_high_freq()
        print('Frequency is set to default')
        blink_onboard_led(5)

    debug_last_state = debug_current_state

    # Periodic round: health ping plus OTA version check.
    if last_update_time + HEALTH_UPDATE_INTERVAL_S < time.time():
        try:
            set_cpu_to_high_freq()
            if connect_wifi():
                send_health_update()
                sleep(1)
                check_version_update()
                sleep(1)
                disconnect_wifi()
                blink_onboard_led(1)
                last_update_time = time.time()
            else:
                blink_onboard_led(3)
                # Could not connect: pretend the last round happened long
                # enough ago that the next attempt is due after the retry delay.
                last_update_time = time.time() - (HEALTH_UPDATE_INTERVAL_S - HEALTH_RETRY_DELAY_S)
        except Exception as e:
            print('Error: Could not send health update')
            print(e)
            blink_onboard_led(4)
            # Do not leave Wi-Fi up after a failure, and back off instead of
            # retrying on every loop iteration.
            _disconnect_wifi_quietly()
            last_update_time = time.time() - (HEALTH_UPDATE_INTERVAL_S - HEALTH_RETRY_DELAY_S)
        set_cpu_to_low_freq()

    # Rising edge on the doorbell input: send the notifications.
    if not doorbell_last_state and doorbell_current_state:
        set_cpu_to_high_freq()
        led.value(1)
        try:
            if connect_wifi():
                print('Sending notification...')

                send_notification(title = 'Dany Csengo', tags = 'bell', send_ha_notification=True)

                time.sleep(1)
                check_version_update()

                disconnect_wifi()
        except Exception as e:
            # A network error must not terminate the loop or leave the LED
            # and the radio switched on.
            print('Error: Could not send doorbell notification')
            print(e)
            blink_onboard_led(4)
            _disconnect_wifi_quietly()

        led.value(0)
        time.sleep_ms(200)
        blink_onboard_led(1)
        # Sleep for 5 sec to avoid multiple triggers AND too frequent frequency change
        print('Waiting 5 seconds before changing frequency...')
        time.sleep(5)
        set_cpu_to_low_freq()

    doorbell_last_state = doorbell_current_state
    time.sleep_ms(200)
|