"""Clock / temperature / humidity display with a small HTTP control API.

MicroPython script.  At import time it:

* scans the I2C bus and initialises an HT16K33 4-digit display and an
  SHT4x temperature/humidity sensor,
* connects to WiFi using the credentials in ``secrets`` and tries to set
  the clock via ``set_time()``,
* starts ``display_thread`` on a second thread and then runs
  ``web_thread`` (blocking) on the calling thread.

HTTP endpoints served by ``web_thread``:

* ``GET /homeassistant``  - JSON with the latest humidity and temperature
* ``GET /metrics``        - the same values in Prometheus text format
* ``GET|POST /toggle_*``  - read / write the boolean display settings
"""
import _thread
import gc
import math
import network
import socket
import sys
import time
import ubinascii

from machine import I2C, Pin, ADC

from secrets import secrets
import sht4x.sht4x as sht4x
from web_tools import result_ok, result_notfound
from tools import connect_wifi, set_time, with_fallback, with_fallback_to_str
from ht16k33.ht16k33segmentbig import HT16K33SegmentBig

# --- Constants ---------------------------------------------------------------

# Display pages cycled by display_thread.
MODE_CLOCK = 0
MODE_TEMPERATURE = 1
MODE_HUMIDITY = 2

DISPLAY_MODE_INTERVAL_MS = 3000     # how long each page stays on screen
MOTION_TIMEOUT_PER_MODE_MS = 6000   # display-on time granted per enabled page
TIME_SYNC_RETRY_MS = 60000          # minimum gap between set_time() retries
CLIENT_TIMEOUT_S = 5                # max wait for data from an HTTP client
MAX_BRIGHTNESS = 15                 # brightness levels used here are 0..15

# (URL path prefix, name of the module-level boolean it reads/writes).
TOGGLE_ENDPOINTS = (
    (b'/toggle_temp', 'enable_temp'),
    (b'/toggle_hum', 'enable_hum'),
    (b'/toggle_dim_light', 'dim_light'),
    (b'/toggle_auto_light', 'enable_auto_brightness'),
    (b'/toggle_motion_detection', 'enable_motion_detection'),
)

# --- GPIO devices ------------------------------------------------------------

led = Pin('LED', Pin.OUT)
pir_sensor = Pin(2, Pin.IN, Pin.PULL_UP)
photoRes = ADC(Pin(26))


def blink_onboard_led(num_blinks):
    """Blink the on-board LED ``num_blinks`` times (200 ms on, 200 ms off)."""
    for _ in range(num_blinks):
        led.value(1)
        time.sleep_ms(200)
        led.value(0)
        time.sleep_ms(200)


def light_to_brightness(raw):
    """Map a 16-bit light reading (0..65535) to a brightness level 0..15.

    ``raw * 19 // 65535`` yields 0..19; subtracting 3 makes roughly the
    darkest fifth of the input range map to the minimum brightness.  The
    result is clamped at both ends: without the upper clamp a saturated
    reading (65535) would produce 16, which is outside the 0..15 range.
    """
    level = raw * 19 // 65535 - 3
    return max(0, min(MAX_BRIGHTNESS, level))


def split_tenths(value):
    """Split ``value`` into (tens, ones, tenths) digits for a ``NN.N`` readout.

    The value is rounded to one decimal place and clamped to 0.0..99.9 so
    that every returned digit is in 0..9.  Negative or >= 100 values
    therefore show as 00.0 / 99.9 instead of producing out-of-range digits.
    """
    tenths_total = int(round(value * 10))
    tenths_total = max(0, min(999, tenths_total))
    return tenths_total // 100, (tenths_total // 10) % 10, tenths_total % 10


def _log_exception(e):
    """Print a caught exception together with its traceback."""
    print("caught exception in main loop {} {}".format(type(e).__name__, e))
    sys.print_exception(e)


# --- Hardware / network start-up --------------------------------------------

i2c = I2C(0, sda=Pin(0), scl=Pin(1))

devices = i2c.scan()
if devices:
    print('Number of I2C devices found=', len(devices))
    for device in devices:
        print("Device Hexadecimel Address=", hex(device))
else:
    print("No device found")

display = HT16K33SegmentBig(i2c)
display.clear()
# Start-up pattern; with the usual 7-segment bit layout these glyphs would
# read "boot" - TODO confirm against the display driver.
display.set_glyph(0b01111100, 0)
display.set_glyph(0b01011100, 1)
display.set_glyph(0b01011100, 2)
display.set_glyph(0b01111000, 3)
display.set_brightness(1)
display.draw()
last_brightness = 1

wlan = network.WLAN(network.STA_IF)
ssid = secrets['ssid']
pw = secrets['pw']
connect_wifi(wlan, ssid, pw, blink_onboard_led)
display.set_colon(0x10)
display.draw()

wlan_mac = wlan.config('mac')
mac_readable = ubinascii.hexlify(wlan_mac).decode()

# NOTE: this rebinds the name ``sht4x`` from the driver module to the sensor
# instance; the rest of the file uses it as the sensor object.
sht4x = sht4x.SHT4X(i2c)

update_time = False
try:
    print('Updating time...')
    set_time()
    print('Time updated')
except Exception as e:
    print('Could not set time')
    sys.print_exception(e)
    update_time = True  # display_thread keeps retrying

# --- State shared between the two threads ------------------------------------

# Guards temp_sht4x / humidity_sht4x so the web thread reads a matching pair.
sensor_lock = _thread.allocate_lock()

temp_sht4x = 0
humidity_sht4x = 0
enable_temp = True
enable_hum = True
dim_light = True
enable_auto_brightness = True
enable_motion_detection = False
motion_timeout_ms = 18000  # Default value, recalculated when motion starts
motion_started_ms = 0
motion_state_on = False


# --- HTTP server -------------------------------------------------------------

def _read_bool_body(cl, request):
    """Return True if the POST body of ``request`` contains ``true``.

    ``request`` is the first packet received from ``cl``.  If it holds no
    body (or the end of the headers has not arrived yet) one more packet is
    read, because HomeAssistant sends the request body in a second packet.
    """
    header_end = request.find(b'\r\n\r\n')
    body = request[header_end + 4:] if header_end != -1 else b''
    if not body:
        body = cl.recv(1024)
    return body.find(b'true') != -1


def _handle_request(cl, request):
    """Route one raw HTTP request (bytes) and send the response on ``cl``."""
    with sensor_lock:
        temperature = temp_sht4x
        humidity = humidity_sht4x

    # The trailing space makes this an exact path match.
    if request.startswith(b'GET /homeassistant '):
        result_ok(
            cl,
            '{"humidity": "' + with_fallback_to_str(humidity, "NaN")
            + '","temp": "' + with_fallback_to_str(temperature, "NaN") + '"}',
            'application/json')
        return

    for path, setting in TOGGLE_ENDPOINTS:
        if request.startswith(b'GET ' + path):
            state = str(globals()[setting]).lower()
            result_ok(cl, '{"is_active": "' + state + '"}', 'application/json')
            return
        if request.startswith(b'POST ' + path):
            globals()[setting] = _read_bool_body(cl, request)
            result_ok(cl, 'ok')
            return

    if request.startswith(b'GET /metrics'):
        attrs = '{mac="' + mac_readable + '"} '
        content = (
            "# HELP temperature Temperature in Celsius\n"
            "# TYPE temperature gauge\n"
            "temperature" + attrs + with_fallback_to_str(temperature, "NaN") + "\n"
            "# HELP humidity Relative humidity in %\n"
            "# TYPE humidity gauge\n"
            "humidity" + attrs + with_fallback_to_str(humidity, "NaN") + "\n")
        result_ok(cl, content)
        return

    result_notfound(cl)


def web_thread():
    """Serve the HTTP API on port 80 until interrupted (blocking).

    Handles one client at a time.  Errors while serving a client are
    logged and the loop continues; ``KeyboardInterrupt`` closes the
    listening socket and returns.
    """
    addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1]
    print('Binding to ' + str(addr))

    server_socket = socket.socket()
    # Allow re-binding right after a soft reset.
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind(addr)
    server_socket.listen(1)

    while True:
        cl = None
        try:
            cl, _client_addr = server_socket.accept()
            # Do not let a silent client block the server forever.
            cl.settimeout(CLIENT_TIMEOUT_S)
            request = cl.recv(1024)
            _handle_request(cl, request)
        except KeyboardInterrupt:
            server_socket.close()
            break
        except Exception as e:
            _log_exception(e)
            time.sleep_ms(100)
        finally:
            if cl is not None:
                try:
                    cl.close()
                except OSError:
                    # Already closed (e.g. by the response helper).
                    pass
            gc.collect()


# --- Display -----------------------------------------------------------------

def _render_mode(mode, hour, minute):
    """Write the digits for display page ``mode`` into the display buffer."""
    if mode == MODE_CLOCK:
        display.set_number(hour // 10, 0)
        display.set_number(hour % 10, 1)
        display.set_number(minute // 10, 2)
        display.set_number(minute % 10, 3)
        # Colon bit 0x02 is toggled once per second.
        display.set_colon(0x02 if time.time() % 2 == 0 else 0)
    elif mode == MODE_TEMPERATURE:
        tens, ones, tenths = split_tenths(temp_sht4x)
        display.set_number(tens, 0)
        display.set_number(ones, 1)
        display.set_number(tenths, 2)
        display.set_glyph(0x39, 3)  # presumably a "C" glyph - TODO confirm
        display.set_colon(0x02)
    elif mode == MODE_HUMIDITY:
        tens, ones, tenths = split_tenths(humidity_sht4x)
        display.set_number(tens, 0)
        display.set_number(ones, 1)
        display.set_number(tenths, 2)
        display.set_colon(0x12)


def display_thread():
    """Poll the sensors and refresh the display roughly every 100 ms.

    Each iteration: read the SHT4x, pick the brightness, evaluate the
    motion sensor, render the current page (clock / temperature /
    humidity), advance the page every ``DISPLAY_MODE_INTERVAL_MS`` and,
    while the clock is still unset, retry ``set_time()`` at most once per
    ``TIME_SYNC_RETRY_MS``.  Exits on ``KeyboardInterrupt``.
    """
    global temp_sht4x, humidity_sht4x
    global motion_timeout_ms, motion_started_ms, motion_state_on
    global last_brightness, update_time

    # Intervals are measured with ticks_ms()/ticks_diff() so they are not
    # disturbed when set_time() changes the wall clock.
    last_display_mode_time = time.ticks_ms()
    last_time_sync_attempt = time.ticks_ms()
    display_mode = MODE_CLOCK

    while True:
        try:
            # Measure first, then publish under the lock, so the lock is
            # never held during the I2C transfer and is always released.
            try:
                temperature, humidity = sht4x.measurements
            except KeyboardInterrupt:
                break
            except Exception as e:
                _log_exception(e)
            else:
                with sensor_lock:
                    temp_sht4x = temperature
                    humidity_sht4x = humidity

            now_ms = time.ticks_ms()
            display.clear()

            ltime = time.localtime()
            hour = ltime[3]
            minute = ltime[4]

            # Brightness: automatic from the photoresistor, otherwise one of
            # two fixed levels selected by dim_light.
            if enable_auto_brightness:
                brightness = light_to_brightness(photoRes.read_u16())
            elif dim_light:
                brightness = 1
            else:
                brightness = MAX_BRIGHTNESS
            if brightness != last_brightness:
                display.set_brightness(brightness)
                last_brightness = brightness

            # Motion handling: the display stays on for motion_timeout_ms
            # after the PIR input reads 1, then goes blank again.
            if motion_state_on and time.ticks_diff(now_ms, motion_started_ms) >= motion_timeout_ms:
                motion_state_on = False

            if not motion_state_on and enable_motion_detection and pir_sensor.value() == 1:
                motion_state_on = True
                motion_started_ms = now_ms
                display_mode = MODE_CLOCK
                last_display_mode_time = now_ms
                # One slot for the clock plus one per enabled extra page.
                motion_timeout_ms = MOTION_TIMEOUT_PER_MODE_MS
                if enable_temp:
                    motion_timeout_ms += MOTION_TIMEOUT_PER_MODE_MS
                if enable_hum:
                    motion_timeout_ms += MOTION_TIMEOUT_PER_MODE_MS

            _render_mode(display_mode, hour, minute)

            try:
                # With motion detection on, show nothing until motion is seen.
                if enable_motion_detection and not motion_state_on:
                    display.clear()
                display.draw()
            except KeyboardInterrupt:
                break
            except Exception as e:
                _log_exception(e)

            # Advance to the next enabled page.
            if time.ticks_diff(now_ms, last_display_mode_time) >= DISPLAY_MODE_INTERVAL_MS:
                last_display_mode_time = now_ms
                display_mode += 1
                if display_mode == MODE_TEMPERATURE and not enable_temp:
                    display_mode += 1
                if display_mode == MODE_HUMIDITY and not enable_hum:
                    display_mode += 1
                if display_mode > MODE_HUMIDITY:
                    display_mode = MODE_CLOCK

            gc.collect()

            # Retry the clock sync, rate-limited so a failing sync is not
            # attempted on every 100 ms iteration.
            if update_time and time.ticks_diff(now_ms, last_time_sync_attempt) >= TIME_SYNC_RETRY_MS:
                last_time_sync_attempt = now_ms
                try:
                    set_time()
                    update_time = False
                except Exception as e:
                    print('Could not set time')
                    sys.print_exception(e)

            time.sleep_ms(100)
        except KeyboardInterrupt:
            break
        except Exception as e:
            _log_exception(e)
            time.sleep_ms(100)


# --- Entry point -------------------------------------------------------------

print('Starting Display thread')
second_thread = _thread.start_new_thread(display_thread, ())

print('Starting Main loop')
web_thread()