"""Clock / climate display firmware (MicroPython).

Drives an HT16K33 big 7-segment display that rotates between the current
time, the temperature and the relative humidity read from an SHT4x sensor,
and serves a small HTTP API on port 80 (Home Assistant JSON, toggle
endpoints and Prometheus-style metrics).

Two threads are used: ``display_thread`` runs on the second thread and owns
the sensor and the display; ``web_thread`` runs on the main thread and owns
the Wi-Fi connection and the listening socket.
"""
import _thread
import gc
import math
import network
import socket
import sys
import time
import ubinascii
from machine import I2C, Pin, ADC
from secrets import secrets
import sht4x.sht4x as sht4x
from web_tools import result_ok, result_notfound
from tools import connect_wifi, disconnect_wifi, set_time, with_fallback_to_str
from ht16k33.ht16k33segmentbig import HT16K33SegmentBig

# GPIO devices
led = Pin('LED', Pin.OUT)
pir_sensor = Pin(2, Pin.IN, Pin.PULL_UP)
photoRes = ADC(Pin(26))


def blink_onboard_led(num_blinks):
    """Blink the on-board LED ``num_blinks`` times (200 ms on, 200 ms off)."""
    for _ in range(num_blinks):
        led.value(1)
        time.sleep_ms(200)
        led.value(0)
        time.sleep_ms(200)


i2c = I2C(0, sda=Pin(0), scl=Pin(1))
devices = i2c.scan()
if devices:
    print('Number of I2C devices found=', len(devices))
    for device in devices:
        print("Device Hexadecimel Address=", hex(device))
else:
    print("No device found")

# Start-up pattern shown while Wi-Fi and time are being set up.
# NOTE(review): with the usual 7-segment bit order these glyphs read "boot" - confirm.
display = HT16K33SegmentBig(i2c)
display.clear()
display.set_glyph(0b01111100, 0)
display.set_glyph(0b01011100, 1)
display.set_glyph(0b01011100, 2)
display.set_glyph(0b01111000, 3)
display.set_brightness(1)
display.draw()
last_brightness = 1

wlan = network.WLAN(network.STA_IF)
ssid = secrets['ssid']
pw = secrets['pw']
connect_wifi(wlan, ssid, pw, blink_onboard_led)
display.set_colon(0x10)
display.draw()

wlan_mac = wlan.config('mac')
mac_readable = ubinascii.hexlify(wlan_mac).decode()

# The SHT4x is only instantiated when a device answered at I2C address 0x44.
if 0x44 in devices:
    sensor_sht4x = sht4x.SHT4X(i2c)
else:
    sensor_sht4x = None

# When the initial time sync fails, the display thread retries it later.
update_time = False
try:
    print('Updating time...')
    set_time()
    print('Time updated')
except Exception as e:
    # ``except Exception`` (not a bare except) so Ctrl-C still interrupts start-up.
    print('Could not set time')
    sys.print_exception(e)
    update_time = True

# State shared between the two threads.
temp_sht4x = 0
humidity_sht4x = 0
enable_temp = True
enable_hum = True
dim_light = True
enable_auto_brightness = True
enable_motion_detection = False
motion_timeout_ms = 18000  # Default value, it will be recalculated
motion_started_ms = 0
motion_state_on = False
running = True


def _collect_garbage():
    """Run ``gc.collect()`` while holding ``gc_lock``."""
    gc_lock.acquire()
    try:
        gc.collect()
    finally:
        gc_lock.release()


def _close_quietly(sock):
    """Close ``sock`` if it is not None, ignoring any error from ``close()``."""
    if sock is None:
        return
    try:
        sock.close()
    except Exception:
        pass


def _toggle_state_json(is_active):
    """Return the JSON body reporting a toggle state as "true"/"false"."""
    return '{"is_active": "' + str(is_active).lower() + '"}'


def _read_toggle_body(cl, request):
    """Return True when the body of a toggle POST contains ``true``.

    ``request`` is the ``str()`` of the received bytes, so the CRLF CRLF that
    ends the headers appears as the 8 literal characters ``\\r\\n\\r\\n``.
    """
    content_start_pos = request.find('\\r\\n\\r\\n')
    content = request[content_start_pos + 8:]
    # HomeAssistant sends request body in a second packet
    if len(content) == 0:
        content = str(cl.recv(1024))
    return content.find('true') != -1


def _split_tenths(value):
    """Split ``value`` into (tens, ones, tenths) digits for the display.

    The value is clamped to 0..99.9 first so every returned digit is in
    0..9. Without the clamp a negative temperature or a humidity of 100
    produces a negative or two-digit "digit".
    """
    value = min(max(value, 0.0), 99.9)
    tens = int(value // 10)
    ones = int(value - tens * 10)
    tenths = int(value * 10 - tens * 100 - ones * 10)
    # Guard against float rounding pushing the last digit out of range.
    tenths = min(max(tenths, 0), 9)
    return tens, ones, tenths


def web_thread():
    """Serve the HTTP API on port 80 and keep the Wi-Fi connection alive.

    Runs until ``running`` becomes False or a KeyboardInterrupt arrives, then
    closes the listening socket and disconnects from Wi-Fi.

    Routes are matched by the position of the path inside the request line:
    index 4 follows ``GET `` and index 5 follows ``POST ``.
    """
    global enable_temp
    global enable_hum
    global dim_light
    global enable_auto_brightness
    global enable_motion_detection
    global running

    GET_PATH_START = 4
    POST_PATH_START = 5

    addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1]
    print('Binding to ' + str(addr))
    serverSocket = socket.socket()
    serverSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    serverSocket.bind(addr)
    serverSocket.listen()

    last_wlan_status = wlan.status()

    while running:
        # --- Wi-Fi supervision ---
        try:
            status = wlan.status()
            if status != last_wlan_status:
                last_wlan_status = status
                print('WLAN status changed to ' + str(last_wlan_status))
            reconnect_wifi = (
                status < 0
                or status > 3
                or last_wlan_status == network.STAT_IDLE
                or not wlan.isconnected()
            )
            if reconnect_wifi:
                print('Disconnecting from WiFi')
                disconnect_wifi(wlan)
                print('Reconnecting to WiFi')
                while not connect_wifi(wlan, ssid, pw, blink_onboard_led):
                    print('Could not connect to WiFi, retrying in 5 seconds')
                    time.sleep(5)
                print('Reconnected to WiFi')
        except Exception as e:
            print("caught exception in web loop in wlan checking {} {}".format(type(e).__name__, e))
            sys.print_exception(e)
            time.sleep_ms(100)

        # --- Handle one HTTP request ---
        cl = None
        try:
            cl, addr = serverSocket.accept()
            request = cl.recv(1024)
            request = str(request)
            request = request[2:-1]  # remove b' and ' from string

            # Snapshot the sensor values under the lock.
            sensor_lock.acquire()
            try:
                temperature = temp_sht4x
                humidity = humidity_sht4x
            finally:
                sensor_lock.release()

            if request.find('/homeassistant ') == GET_PATH_START:
                result_ok(
                    cl,
                    '{"humidity": "' + with_fallback_to_str(humidity, "NaN")
                    + '","temp": "' + with_fallback_to_str(temperature, "NaN") + '"}',
                    'application/json')
            elif request.find('/toggle_temp') == GET_PATH_START:
                result_ok(cl, _toggle_state_json(enable_temp), 'application/json')
            elif request.find('/toggle_hum') == GET_PATH_START:
                result_ok(cl, _toggle_state_json(enable_hum), 'application/json')
            elif request.find('/toggle_dim_light') == GET_PATH_START:
                result_ok(cl, _toggle_state_json(dim_light), 'application/json')
            elif request.find('/toggle_auto_light') == GET_PATH_START:
                result_ok(cl, _toggle_state_json(enable_auto_brightness), 'application/json')
            elif request.find('/toggle_motion_detection') == GET_PATH_START:
                result_ok(cl, _toggle_state_json(enable_motion_detection), 'application/json')
            elif request.find('/toggle_temp') == POST_PATH_START:
                enable_temp = _read_toggle_body(cl, request)
                result_ok(cl, 'ok')
            elif request.find('/toggle_hum') == POST_PATH_START:
                enable_hum = _read_toggle_body(cl, request)
                result_ok(cl, 'ok')
            elif request.find('/toggle_dim_light') == POST_PATH_START:
                dim_light = _read_toggle_body(cl, request)
                result_ok(cl, 'ok')
            elif request.find('/toggle_auto_light') == POST_PATH_START:
                enable_auto_brightness = _read_toggle_body(cl, request)
                result_ok(cl, 'ok')
            elif request.find('/toggle_motion_detection') == POST_PATH_START:
                enable_motion_detection = _read_toggle_body(cl, request)
                result_ok(cl, 'ok')
            elif request.find('/metrics') == GET_PATH_START:
                attrs = '{mac="' + mac_readable + '"} '
                content = (
                    "# HELP temperature Temperature in Celsius\n"
                    "# TYPE temperature gauge\n"
                    "temperature" + attrs + with_fallback_to_str(temperature, "NaN") + "\n"
                    "# HELP humidity Relative humidity in %\n"
                    "# TYPE humidity gauge\n"
                    "humidity" + attrs + with_fallback_to_str(humidity, "NaN"))
                result_ok(cl, content)
            else:
                result_notfound(cl)
        except KeyboardInterrupt:
            print('Closing web socket due to KeyboardInterrupt')
            running = False
            break
        except Exception as e:
            print("caught exception in web loop {} {}".format(type(e).__name__, e))
            sys.print_exception(e)
            # Do not leak the client socket when handling failed part-way.
            # NOTE(review): assumes result_ok/result_notfound close the client
            # on the success path - confirm in web_tools.
            _close_quietly(cl)
            time.sleep_ms(100)

        _collect_garbage()

    print('Web thread exiting...')
    serverSocket.close()
    disconnect_wifi(wlan)
    print('Web thread exited')


def display_thread():
    """Refresh the sensor readings and redraw the display roughly every 100 ms.

    Each iteration:
      * re-reads the SHT4x at most every 3 s and publishes the values under
        ``sensor_lock``;
      * sets the display brightness (automatic from the photoresistor, or the
        fixed dim/bright level);
      * tracks PIR motion so the display can be blanked when motion detection
        is enabled and no motion is active;
      * draws the current mode (0 = time, 1 = temperature, 2 = humidity) and
        advances the mode every 3 s, skipping disabled modes;
      * retries the time sync while ``update_time`` is set.

    Runs until ``running`` becomes False or a KeyboardInterrupt arrives.
    """
    global temp_sht4x
    global humidity_sht4x
    global motion_timeout_ms
    global motion_started_ms
    global motion_state_on
    global last_brightness
    global update_time
    global running

    current_time = time.time_ns()
    current_time_ms = int(current_time // 1_000_000)

    last_display_mode_time = current_time_ms
    display_mode = 0

    sensor_update_delta = 3000
    # Back-date the last update so the first iteration reads the sensor.
    last_sensor_update_time = current_time_ms - sensor_update_delta - 1000

    while running:
        try:
            # --- Sensor refresh ---
            if current_time_ms >= last_sensor_update_time + sensor_update_delta:
                try:
                    # Read outside the lock so the web thread is not blocked
                    # for the duration of the I2C transaction.
                    if sensor_sht4x is not None:
                        measured = sensor_sht4x.measurements
                    else:
                        measured = (0.0, 0.0)
                    sensor_lock.acquire()
                    try:
                        temp_sht4x, humidity_sht4x = measured
                    finally:
                        sensor_lock.release()
                    last_sensor_update_time = current_time_ms
                except Exception as e:
                    print("caught exception in main loop {} {}".format(type(e).__name__, e))
                    sys.print_exception(e)

            current_time = time.time_ns()
            current_time_ms = int(current_time // 1_000_000)

            display.clear()
            ltime = time.localtime()
            hour = ltime[3]
            minute = ltime[4]

            # --- Brightness ---
            # The photoresistor reading is on a 0 - 65535 scale. It is mapped
            # to 0 - 19 and shifted down by 3, so roughly the darkest fifth of
            # the range stays at the minimum brightness 0. The result is
            # clamped to 0 - 15: a saturated reading of 65535 would otherwise
            # yield 16.
            light = photoRes.read_u16()
            light = min(15, max(0, light * 19 // 65535 - 3))

            if enable_auto_brightness:
                if light != last_brightness:
                    last_brightness = light
                    display.set_brightness(light)
            elif dim_light and last_brightness != 1:
                display.set_brightness(1)
                last_brightness = 1
            elif not dim_light and last_brightness != 15:
                display.set_brightness(15)
                last_brightness = 15

            # --- Motion tracking ---
            if motion_state_on and current_time_ms >= motion_started_ms + motion_timeout_ms:
                motion_state_on = False

            if not motion_state_on and enable_motion_detection and pir_sensor.value() == 1:
                motion_state_on = True
                motion_started_ms = current_time_ms
                # Restart the rotation from the clock on new motion.
                display_mode = 0
                last_display_mode_time = current_time_ms
                # 6 s base plus 6 s for every enabled extra mode.
                motion_timeout_ms = 6000
                if enable_temp:
                    motion_timeout_ms = motion_timeout_ms + 6000
                if enable_hum:
                    motion_timeout_ms = motion_timeout_ms + 6000

            # --- Render the current mode ---
            if display_mode == 0:
                display.set_number(hour // 10, 0)
                display.set_number(hour % 10, 1)
                display.set_number(minute // 10, 2)
                display.set_number(minute % 10, 3)
                # Colon bits are set on even seconds and cleared on odd seconds.
                display.set_colon(0x02 if time.time() % 2 == 0 else 0)
            elif display_mode == 1:
                temp_i0, temp_i1, temp_d0 = _split_tenths(temp_sht4x)
                display.set_number(temp_i0, 0)
                display.set_number(temp_i1, 1)
                display.set_number(temp_d0, 2)
                # NOTE(review): 0x39 presumably renders as "C" - confirm.
                display.set_glyph(0x39, 3)
                display.set_colon(0x02)
            elif display_mode == 2:
                hum_i0, hum_i1, hum_d0 = _split_tenths(humidity_sht4x)
                display.set_number(hum_i0, 0)
                display.set_number(hum_i1, 1)
                display.set_number(hum_d0, 2)
                display.set_colon(0x12)

            try:
                # Blank the display when motion detection is on and idle.
                if enable_motion_detection and not motion_state_on:
                    display.clear()
                display.draw()
            except Exception as e:
                print("caught exception in main loop {} {}".format(type(e).__name__, e))
                sys.print_exception(e)

            # --- Advance the mode every 3 s, skipping disabled modes ---
            if current_time_ms >= last_display_mode_time + 3000:
                last_display_mode_time = current_time_ms
                display_mode = display_mode + 1
                if display_mode == 1 and not enable_temp:
                    display_mode = display_mode + 1
                if display_mode == 2 and not enable_hum:
                    display_mode = display_mode + 1
                if display_mode > 2:
                    display_mode = 0

            _collect_garbage()

            # --- Retry the time sync if it has not succeeded yet ---
            if update_time:
                try:
                    set_time()
                    motion_state_on = False
                    update_time = False
                    # The clock may have jumped; re-base the timers on it.
                    current_time = time.time_ns()
                    current_time_ms = int(current_time // 1_000_000)
                    last_display_mode_time = current_time_ms
                except Exception as e:
                    # ``except Exception`` so KeyboardInterrupt reaches the
                    # outer handler and stops the thread.
                    print('Could not set time')
                    sys.print_exception(e)
                    update_time = True

            time.sleep_ms(100)
        except KeyboardInterrupt:
            running = False
            break
        except Exception as e:
            print("caught exception in main loop {} {}".format(type(e).__name__, e))
            sys.print_exception(e)
            time.sleep_ms(100)
    print('Display thread exited...')


sensor_lock = _thread.allocate_lock()
gc_lock = _thread.allocate_lock()

print('Starting Display thread')
second_thread = _thread.start_new_thread(display_thread, ())

print('Starting Main loop')
web_thread()