diff --git a/main.py b/main.py index e704919..ee88ca3 100644 --- a/main.py +++ b/main.py @@ -2,20 +2,18 @@ import rp2 import network import ubinascii import machine +import uos from machine import Pin from time import sleep import urequests as requests import time +import micropython_ota from secrets import secrets -import socket +from settings import settings # Set country to avoid possible errors rp2.country('HU') -print('Starting up...') - -wlan = network.WLAN(network.STA_IF) - # Define blinking function for onboard LED to indicate error codes def blink_onboard_led(num_blinks): led = machine.Pin('LED', machine.Pin.OUT) @@ -31,7 +29,7 @@ def connect_wifi(): # wlan.config(pm = 0xa11140) # See the MAC address in the wireless chip OTP - mac = ubinascii.hexlify(network.WLAN().config('mac'),':').decode() + mac = ubinascii.hexlify(wlan.config('mac'),':').decode() print('mac = ' + mac) # Other things to query @@ -40,18 +38,18 @@ def connect_wifi(): # print(wlan.config('txpower')) # Load login data from different file for safety reasons - ssid = secrets['ssid'] pw = secrets['pw'] wlan.connect(ssid, pw) # Wait for connection with 10 second timeout - timeout = 10 + timeout = 20 while timeout > 0: if wlan.status() < 0 or wlan.status() >= 3: break timeout -= 1 print('Waiting for connection...') + blink_onboard_led(2) time.sleep(1) # Handle connection error # Error meanings @@ -67,8 +65,10 @@ def connect_wifi(): blink_onboard_led(wlan_status) if wlan_status != 3: + blink_onboard_led(5) raise RuntimeError('Wi-Fi connection failed') else: + blink_onboard_led(1) print('Connected') status = wlan.ifconfig() print('ip = ' + status[0]) @@ -79,13 +79,21 @@ def disconnect_wifi(): wlan.deinit() print('Disconnected') -def send_notification(notify_url): +def send_notification(title, tags): + try: + send_notification_to_server('https://ntfy.sh/' + ntfy_topic, title, tags) + except: + print('Error sending notification') + blink_onboard_led(3) + send_notification_to_server('https://ntfy.adix.link/' + ntfy_topic, title, tags) + +def send_notification_to_server(notify_url, title, tags): print('Sending notification to ' + notify_url + '...') # Send notification request = requests.post(notify_url, data="Csengo", headers={ - 'Title': 'Dany Csengo', + 'Title': title, 'Priority': '5', - 'X-Tags': 'bell' + 'X-Tags': tags }) print(request.content) request.close() @@ -95,7 +103,27 @@ def set_cpu_to_high_freq(): def set_cpu_to_low_freq(): machine.freq(20000000) + pass + +set_cpu_to_high_freq() + +ota_branch = settings['ota_branch'] +ota_soft_reset_device=False + +ntfy_topic = settings['ntfy_topic'] + +print('Starting up...') + +wlan = network.WLAN(network.STA_IF) +ssid = secrets['ssid'] + +print('WiFi network to use on action: ' + ssid) + +current_version = 'unknown' +if 'version' in uos.listdir(): + with open('version', 'r') as current_version_file: + current_version = current_version_file.readline().strip() door_bell = machine.Pin(14, machine.Pin.IN, machine.Pin.PULL_DOWN) debug_mode = machine.Pin(15, machine.Pin.IN, machine.Pin.PULL_DOWN) @@ -106,17 +134,23 @@ doorbell_current_state = False debug_last_state = False debug_current_state = False -""" led = Pin('LED', Pin.OUT) -print('Blinking LED Example') - -while True: - led.value(not led.value()) - sleep(0.5) """ - led = Pin('LED', Pin.OUT) -disconnect_wifi() - blink_onboard_led(2) +sleep(1) +try: + connect_wifi() + print("Checking version updates...") + #micropython_ota.reset_version() + micropython_ota.ota_update('https://iot-sw.adix.link', 'doorbell', ota_branch) + send_notification(title = 'Started (sw: ' + current_version + ')', tags = 'signal_strength') + sleep(2) + disconnect_wifi() + blink_onboard_led(1) +except Exception as e: + print('Error: Could not send became-online message :(') + print(e) + blink_onboard_led(4) + print('Changing frequency and beginning loop...') set_cpu_to_low_freq() while True: @@ -127,19 +161,30 @@ while True: set_cpu_to_high_freq() print('Frequency is set to default') blink_onboard_led(5) + debug_last_state = debug_current_state if doorbell_last_state == False and doorbell_current_state == True: set_cpu_to_high_freq() led.value(1) + print('Connecting to WiFi ' + ssid + '...') connect_wifi() + print('Sending notification...') - try: - send_notification('https://ntfy.sh/adix-dany-doorbell-test') - except: - print('Error sending notification') - blink_onboard_led(3) - send_notification('https://ntfy.adix.link/adix-dany-doorbell-test') + send_notification(title = 'Dany Csengo', tags = 'bell') + + time.sleep(1) + version_changed, remote_version = micropython_ota.check_version('https://iot-sw.adix.link', 'doorbell', ota_branch) + if version_changed: + send_notification(title = 'Updating to ' + remote_version, tags = 'new') + if ota_soft_reset_device: + print(f'Found new version {remote_version}, soft-resetting device...') + machine.soft_reset() + else: + print(f'Found new version {remote_version}, hard-resetting device...') + machine.reset() + else: + print('No new version available') disconnect_wifi() diff --git a/micropython_ota.py b/micropython_ota.py new file mode 100644 index 0000000..68dcd70 --- /dev/null +++ b/micropython_ota.py @@ -0,0 +1,115 @@ +# Based on: +# https://raw.githubusercontent.com/olivergregorius/micropython_ota/main/micropython_ota.py + +import machine +import ubinascii +import uos +import urequests +import os + +def reset_version(): + try: + os.remove('version') + except: + pass + +def check_version(host, project, branch, auth=None, timeout=5) -> (bool, str): + current_version = '' + try: + if 'version' in uos.listdir(): + with open('version', 'r') as current_version_file: + current_version = current_version_file.readline().strip() + + if auth: + response = urequests.get(f'{host}/{project}/version_{branch}', headers={'Authorization': f'Basic {auth}'}, timeout=timeout) + else: + response = urequests.get(f'{host}/{project}/version_{branch}', timeout=timeout) + response_status_code = response.status_code + response_text = response.text + response.close() + if response_status_code != 200: + print(f'Remote version file {host}/{project}/version_{branch} not found') + return False, current_version + remote_version = response_text.strip() + return current_version != remote_version, remote_version + except Exception as ex: + print(f'Something went wrong: {ex}') + return False, current_version + + +def generate_auth(user=None, passwd=None) -> str | None: + if not user and not passwd: + return None + if (user and not passwd) or (passwd and not user): + raise ValueError('Either only user or pass given. None or both are required.') + auth_bytes = ubinascii.b2a_base64(f'{user}:{passwd}'.encode()) + return auth_bytes.decode().strip() + + +def ota_update(host, project, branch='stable', use_version_prefix=False, user=None, passwd=None, hard_reset_device=True, soft_reset_device=False, timeout=5) -> None: + all_files_found = True + auth = generate_auth(user, passwd) + prefix_or_path_separator = '_' if use_version_prefix else '/' + try: + version_changed, remote_version = check_version(host, project, branch, auth=auth, timeout=timeout) + if version_changed: + try: + uos.mkdir('tmp') + except: + pass + + if auth: + response = urequests.get(f'{host}/{project}/{remote_version}{prefix_or_path_separator}.files', headers={'Authorization': f'Basic {auth}'}, timeout=timeout) + else: + response = urequests.get(f'{host}/{project}/{remote_version}{prefix_or_path_separator}.files', timeout=timeout) + files = str.split(response.text, '\n') + + while("" in files): + files.remove("") + + for filename in files: + + if auth: + response = urequests.get(f'{host}/{project}/{remote_version}{prefix_or_path_separator}{filename}', headers={'Authorization': f'Basic {auth}'}, timeout=timeout) + else: + response = urequests.get(f'{host}/{project}/{remote_version}{prefix_or_path_separator}{filename}', timeout=timeout) + response_status_code = response.status_code + response_text = response.text + response.close() + if response_status_code != 200: + print(f'Remote source file {host}/{project}/{remote_version}{prefix_or_path_separator}{filename} not found') + all_files_found = False + continue + with open(f'tmp/{filename}', 'w') as source_file: + source_file.write(response_text) + if all_files_found: + for filename in files: + with open(f'tmp/{filename}', 'r') as source_file, open(filename, 'w') as target_file: + target_file.write(source_file.read()) + uos.remove(f'tmp/{filename}') + try: + uos.rmdir('tmp') + except: + pass + with open('version', 'w') as current_version_file: + current_version_file.write(remote_version) + if soft_reset_device: + print('Soft-resetting device...') + machine.soft_reset() + if hard_reset_device: + print('Hard-resetting device...') + machine.reset() + except Exception as ex: + print(f'Something went wrong: {ex}') + + +def check_for_ota_update(host, project, branch='stable', user=None, passwd=None, timeout=5, soft_reset_device=False): + auth = generate_auth(user, passwd) + version_changed, remote_version = check_version(host, project, branch, auth=auth, timeout=timeout) + if version_changed: + if soft_reset_device: + print(f'Found new version {remote_version}, soft-resetting device...') + #machine.soft_reset() + else: + print(f'Found new version {remote_version}, hard-resetting device...') + #machine.reset()