This commit is contained in:
Ádám Kovács
2023-11-24 23:31:40 +01:00
parent e1a6e50927
commit 46669dc268
2 changed files with 186 additions and 26 deletions

97
main.py
View File

@@ -2,20 +2,18 @@ import rp2
import network import network
import ubinascii import ubinascii
import machine import machine
import uos
from machine import Pin from machine import Pin
from time import sleep from time import sleep
import urequests as requests import urequests as requests
import time import time
import micropython_ota
from secrets import secrets from secrets import secrets
import socket from settings import settings
# Set country to avoid possible errors # Set country to avoid possible errors
rp2.country('HU') rp2.country('HU')
print('Starting up...')
wlan = network.WLAN(network.STA_IF)
# Define blinking function for onboard LED to indicate error codes # Define blinking function for onboard LED to indicate error codes
def blink_onboard_led(num_blinks): def blink_onboard_led(num_blinks):
led = machine.Pin('LED', machine.Pin.OUT) led = machine.Pin('LED', machine.Pin.OUT)
@@ -31,7 +29,7 @@ def connect_wifi():
# wlan.config(pm = 0xa11140) # wlan.config(pm = 0xa11140)
# See the MAC address in the wireless chip OTP # See the MAC address in the wireless chip OTP
mac = ubinascii.hexlify(network.WLAN().config('mac'),':').decode() mac = ubinascii.hexlify(wlan.config('mac'),':').decode()
print('mac = ' + mac) print('mac = ' + mac)
# Other things to query # Other things to query
@@ -40,18 +38,18 @@ def connect_wifi():
# print(wlan.config('txpower')) # print(wlan.config('txpower'))
# Load login data from different file for safety reasons # Load login data from different file for safety reasons
ssid = secrets['ssid']
pw = secrets['pw'] pw = secrets['pw']
wlan.connect(ssid, pw) wlan.connect(ssid, pw)
# Wait for connection with 10 second timeout # Wait for connection with 10 second timeout
timeout = 10 timeout = 20
while timeout > 0: while timeout > 0:
if wlan.status() < 0 or wlan.status() >= 3: if wlan.status() < 0 or wlan.status() >= 3:
break break
timeout -= 1 timeout -= 1
print('Waiting for connection...') print('Waiting for connection...')
blink_onboard_led(2)
time.sleep(1) time.sleep(1)
# Handle connection error # Handle connection error
# Error meanings # Error meanings
@@ -67,8 +65,10 @@ def connect_wifi():
blink_onboard_led(wlan_status) blink_onboard_led(wlan_status)
if wlan_status != 3: if wlan_status != 3:
blink_onboard_led(5)
raise RuntimeError('Wi-Fi connection failed') raise RuntimeError('Wi-Fi connection failed')
else: else:
blink_onboard_led(1)
print('Connected') print('Connected')
status = wlan.ifconfig() status = wlan.ifconfig()
print('ip = ' + status[0]) print('ip = ' + status[0])
@@ -79,13 +79,21 @@ def disconnect_wifi():
wlan.deinit() wlan.deinit()
print('Disconnected') print('Disconnected')
def send_notification(notify_url): def send_notification(title, tags):
try:
send_notification_to_server('https://ntfy.sh/' + ntfy_topic, title, tags)
except:
print('Error sending notification')
blink_onboard_led(3)
send_notification_to_server('https://ntfy.adix.link/' + ntfy_topic, title, tags)
def send_notification_to_server(notify_url, title, tags):
print('Sending notification to ' + notify_url + '...') print('Sending notification to ' + notify_url + '...')
# Send notification # Send notification
request = requests.post(notify_url, data="Csengo", headers={ request = requests.post(notify_url, data="Csengo", headers={
'Title': 'Dany Csengo', 'Title': title,
'Priority': '5', 'Priority': '5',
'X-Tags': 'bell' 'X-Tags': tags
}) })
print(request.content) print(request.content)
request.close() request.close()
@@ -95,8 +103,28 @@ def set_cpu_to_high_freq():
def set_cpu_to_low_freq(): def set_cpu_to_low_freq():
machine.freq(20000000) machine.freq(20000000)
pass
set_cpu_to_high_freq()
ota_branch = settings['ota_branch']
ota_soft_reset_device=False
ntfy_topic = settings['ntfy_topic']
print('Starting up...')
wlan = network.WLAN(network.STA_IF)
ssid = secrets['ssid']
print('WiFi network to use on action: ' + ssid)
current_version = 'unknown'
if 'version' in uos.listdir():
with open('version', 'r') as current_version_file:
current_version = current_version_file.readline().strip()
door_bell = machine.Pin(14, machine.Pin.IN, machine.Pin.PULL_DOWN) door_bell = machine.Pin(14, machine.Pin.IN, machine.Pin.PULL_DOWN)
debug_mode = machine.Pin(15, machine.Pin.IN, machine.Pin.PULL_DOWN) debug_mode = machine.Pin(15, machine.Pin.IN, machine.Pin.PULL_DOWN)
@@ -106,17 +134,23 @@ doorbell_current_state = False
debug_last_state = False debug_last_state = False
debug_current_state = False debug_current_state = False
""" led = Pin('LED', Pin.OUT)
print('Blinking LED Example')
while True:
led.value(not led.value())
sleep(0.5) """
led = Pin('LED', Pin.OUT) led = Pin('LED', Pin.OUT)
disconnect_wifi()
blink_onboard_led(2) blink_onboard_led(2)
sleep(1)
try:
connect_wifi()
print("Checking version updates...")
#micropython_ota.reset_version()
micropython_ota.ota_update('https://iot-sw.adix.link', 'doorbell', ota_branch)
send_notification(title = 'Started (sw: ' + current_version + ')', tags = 'signal_strength')
sleep(2)
disconnect_wifi()
blink_onboard_led(1)
except Exception as e:
print('Error: Could not send became-online message :(')
print(e)
blink_onboard_led(4)
print('Changing frequency and beginning loop...') print('Changing frequency and beginning loop...')
set_cpu_to_low_freq() set_cpu_to_low_freq()
while True: while True:
@@ -127,19 +161,30 @@ while True:
set_cpu_to_high_freq() set_cpu_to_high_freq()
print('Frequency is set to default') print('Frequency is set to default')
blink_onboard_led(5) blink_onboard_led(5)
debug_last_state = debug_current_state debug_last_state = debug_current_state
if doorbell_last_state == False and doorbell_current_state == True: if doorbell_last_state == False and doorbell_current_state == True:
set_cpu_to_high_freq() set_cpu_to_high_freq()
led.value(1) led.value(1)
print('Connecting to WiFi ' + ssid + '...')
connect_wifi() connect_wifi()
print('Sending notification...')
try: send_notification(title = 'Dany Csengo', tags = 'bell')
send_notification('https://ntfy.sh/adix-dany-doorbell-test')
except: time.sleep(1)
print('Error sending notification') version_changed, remote_version = micropython_ota.check_version('https://iot-sw.adix.link', 'doorbell', ota_branch)
blink_onboard_led(3) if version_changed:
send_notification('https://ntfy.adix.link/adix-dany-doorbell-test') send_notification(title = 'Updating to ' + remote_version, tags = 'new')
if ota_soft_reset_device:
print(f'Found new version {remote_version}, soft-resetting device...')
machine.soft_reset()
else:
print(f'Found new version {remote_version}, hard-resetting device...')
machine.reset()
else:
print('No new version available')
disconnect_wifi() disconnect_wifi()

115
micropython_ota.py Normal file
View File

@@ -0,0 +1,115 @@
# Based on:
# https://raw.githubusercontent.com/olivergregorius/micropython_ota/main/micropython_ota.py
import machine
import ubinascii
import uos
import urequests
import os
def reset_version():
try:
os.remove('version')
except:
pass
def check_version(host, project, branch, auth=None, timeout=5) -> (bool, str):
current_version = ''
try:
if 'version' in uos.listdir():
with open('version', 'r') as current_version_file:
current_version = current_version_file.readline().strip()
if auth:
response = urequests.get(f'{host}/{project}/version_{branch}', headers={'Authorization': f'Basic {auth}'}, timeout=timeout)
else:
response = urequests.get(f'{host}/{project}/version_{branch}', timeout=timeout)
response_status_code = response.status_code
response_text = response.text
response.close()
if response_status_code != 200:
print(f'Remote version file {host}/{project}/version_{branch} not found')
return False, current_version
remote_version = response_text.strip()
return current_version != remote_version, remote_version
except Exception as ex:
print(f'Something went wrong: {ex}')
return False, current_version
def generate_auth(user=None, passwd=None) -> str | None:
if not user and not passwd:
return None
if (user and not passwd) or (passwd and not user):
raise ValueError('Either only user or pass given. None or both are required.')
auth_bytes = ubinascii.b2a_base64(f'{user}:{passwd}'.encode())
return auth_bytes.decode().strip()
def ota_update(host, project, branch='stable', use_version_prefix=False, user=None, passwd=None, hard_reset_device=True, soft_reset_device=False, timeout=5) -> None:
all_files_found = True
auth = generate_auth(user, passwd)
prefix_or_path_separator = '_' if use_version_prefix else '/'
try:
version_changed, remote_version = check_version(host, project, branch, auth=auth, timeout=timeout)
if version_changed:
try:
uos.mkdir('tmp')
except:
pass
if auth:
response = urequests.get(f'{host}/{project}/{remote_version}{prefix_or_path_separator}.files', headers={'Authorization': f'Basic {auth}'}, timeout=timeout)
else:
response = urequests.get(f'{host}/{project}/{remote_version}{prefix_or_path_separator}.files', timeout=timeout)
files = str.split(response.text, '\n')
while("" in files):
files.remove("")
for filename in files:
if auth:
response = urequests.get(f'{host}/{project}/{remote_version}{prefix_or_path_separator}{filename}', headers={'Authorization': f'Basic {auth}'}, timeout=timeout)
else:
response = urequests.get(f'{host}/{project}/{remote_version}{prefix_or_path_separator}{filename}', timeout=timeout)
response_status_code = response.status_code
response_text = response.text
response.close()
if response_status_code != 200:
print(f'Remote source file {host}/{project}/{remote_version}{prefix_or_path_separator}{filename} not found')
all_files_found = False
continue
with open(f'tmp/{filename}', 'w') as source_file:
source_file.write(response_text)
if all_files_found:
for filename in files:
with open(f'tmp/{filename}', 'r') as source_file, open(filename, 'w') as target_file:
target_file.write(source_file.read())
uos.remove(f'tmp/{filename}')
try:
uos.rmdir('tmp')
except:
pass
with open('version', 'w') as current_version_file:
current_version_file.write(remote_version)
if soft_reset_device:
print('Soft-resetting device...')
machine.soft_reset()
if hard_reset_device:
print('Hard-resetting device...')
machine.reset()
except Exception as ex:
print(f'Something went wrong: {ex}')
def check_for_ota_update(host, project, branch='stable', user=None, passwd=None, timeout=5, soft_reset_device=False):
auth = generate_auth(user, passwd)
version_changed, remote_version = check_version(host, project, branch, auth=auth, timeout=timeout)
if version_changed:
if soft_reset_device:
print(f'Found new version {remote_version}, soft-resetting device...')
#machine.soft_reset()
else:
print(f'Found new version {remote_version}, hard-resetting device...')
#machine.reset()