Files
PiClock/main.py
2024-02-05 10:22:18 +01:00

500 lines
17 KiB
Python

import _thread
import gc
import math
import network
import socket
import sys
import time
import ubinascii
from machine import I2C, Pin, ADC
from secrets import secrets
import sht4x.sht4x as sht4x
from web_tools import result_ok, result_notfound
from tools import connect_wifi, disconnect_wifi, set_time, with_fallback_to_str
from ht16k33.ht16k33segmentbig import HT16K33SegmentBig
# GPIO devices
led = Pin('LED', Pin.OUT)
pir_sensor = Pin(2, Pin.IN, Pin.PULL_UP)
photoRes = ADC(Pin(26))
def blink_onboard_led(num_blinks):
for i in range(num_blinks):
led.value(1)
time.sleep_ms(200)
led.value(0)
time.sleep_ms(200)
i2c = I2C(0, sda=Pin(0), scl=Pin(1))
devices = i2c.scan()
if len(devices) != 0:
print('Number of I2C devices found=',len(devices))
for device in devices:
print("Device Hexadecimel Address=",hex(device))
else:
print("No device found")
display = HT16K33SegmentBig(i2c)
display.clear()
display.set_glyph(0b01111100, 0)
display.set_glyph(0b01011100, 1)
display.set_glyph(0b01011100, 2)
display.set_glyph(0b01111000, 3)
display.set_brightness(1)
display.draw()
last_brightness = 1
wlan = network.WLAN(network.STA_IF)
ssid = secrets['ssid']
pw = secrets['pw']
connect_wifi(wlan, ssid, pw, blink_onboard_led)
display.set_colon(0x10)
display.draw()
wlan_mac = wlan.config('mac')
mac_readable = ubinascii.hexlify(wlan_mac).decode()
if devices.count(0x44) > 0:
sensor_sht4x = sht4x.SHT4X(i2c)
else:
sensor_sht4x = None
update_time = False
try:
print('Updating time...')
set_time()
print('Time updated')
except:
print('Could not set time')
update_time = True
temp_sht4x = 0
humidity_sht4x = 0
enable_temp = True
enable_hum = True
dim_light = True
enable_auto_brightness = True
enable_motion_detection = False
motion_timeout_ms = 18000 # Default value, it will be recalculated
motion_started_ms = 0
motion_state_on = False
light_raw = 0
light = 0
running = True
def web_thread():
global temp_sht4x
global humidity_sht4x
global enable_temp
global enable_hum
global dim_light
global enable_auto_brightness
global enable_motion_detection
global wlan
global running
GET_PATH_START = 4
POST_PATH_START = 5
addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1]
print('Binding to ' + str(addr))
serverSocket = socket.socket()
serverSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serverSocket.bind(addr)
serverSocket.listen()
last_wlan_status = wlan.status()
last_wlan_isconnected = wlan.isconnected()
while running:
try:
reconnect_wifi = False
wlan_status = wlan.status()
wlan_is_connected = wlan.isconnected()
if wlan_status != last_wlan_status or wlan_is_connected != last_wlan_isconnected:
print("WLAN status changed " + str(wlan_status) + " " + str(wlan_is_connected))
if wlan_status != last_wlan_status:
print('WLAN status changed to ' + str(last_wlan_status))
if wlan_status < 0 or wlan_status > 3:
reconnect_wifi = True
if last_wlan_status == network.STAT_IDLE:
reconnect_wifi = True
if not wlan_is_connected:
reconnect_wifi = True
last_wlan_isconnected = wlan_is_connected
last_wlan_status = wlan_status
if reconnect_wifi:
print('Disconnecting from WiFi')
disconnect_wifi(wlan)
print('Reconnecting to WiFi')
while not connect_wifi(wlan, ssid, pw, blink_onboard_led):
print('Could not connect to WiFi, retrying in 5 seconds')
time.sleep(10)
print('Reconnected to WiFi')
except Exception as e:
print("caught exception in web loop in wlan checking {} {}".format(type(e).__name__, e))
sys.print_exception(e)
time.sleep_ms(100)
try:
cl, addr = serverSocket.accept()
#print('client connected from', addr)
request = cl.recv(1024)
request = str(request)
request = request[2:-1] # remove b' and ' from string
#print(request)
sensor_lock.acquire()
temperature = temp_sht4x
humidity = humidity_sht4x
sensor_lock.release()
if request.find('/homeassistant ') == GET_PATH_START:
result_ok(cl,
"{" +
"\"humidity\": \"" + with_fallback_to_str(humidity, "NaN") + "\"," +
"\"temp\": \"" + with_fallback_to_str(temperature, "NaN")+"\","+
"\"motion\": \"" + with_fallback_to_str(motion_state_on, "NaN")+"\","+
"\"light_raw\": \"" + with_fallback_to_str(light_raw, "NaN")+"\","+
"\"light\": \"" + with_fallback_to_str(light, "NaN")+"\""+
"}",
'application/json')
elif request.find('/toggle_temp') == GET_PATH_START:
result_ok(cl, '{"is_active": "' + str(enable_temp).lower() + '"}', 'application/json')
elif request.find('/toggle_hum') == GET_PATH_START:
result_ok(cl, '{"is_active": "' + str(enable_hum).lower() + '"}', 'application/json')
elif request.find('/toggle_dim_light') == GET_PATH_START:
result_ok(cl, '{"is_active": "' + str(dim_light).lower() + '"}', 'application/json')
elif request.find('/toggle_auto_light') == GET_PATH_START:
result_ok(cl, '{"is_active": "' + str(enable_auto_brightness).lower() + '"}', 'application/json')
elif request.find('/toggle_motion_detection') == GET_PATH_START:
result_ok(cl, '{"is_active": "' + str(enable_motion_detection).lower() + '"}', 'application/json')
elif request.find('/toggle_temp') == POST_PATH_START:
content_start_pos = request.find('\\r\\n\\r\\n')
content = request[content_start_pos+8:]
# HomeAssistant sends request body in a second packet
if len(content) == 0:
content = str(cl.recv(1024))
is_active = content.find('true') != -1
enable_temp = is_active
result_ok(cl, 'ok')
elif request.find('/toggle_hum') == POST_PATH_START:
content_start_pos = request.find('\\r\\n\\r\\n')
content = request[content_start_pos+8:]
# HomeAssistant sends request body in a second packet
if len(content) == 0:
content = str(cl.recv(1024))
is_active = content.find('true') != -1
enable_hum = is_active
result_ok(cl, 'ok')
elif request.find('/toggle_dim_light') == POST_PATH_START:
content_start_pos = request.find('\\r\\n\\r\\n')
content = request[content_start_pos+8:]
# HomeAssistant sends request body in a second packet
if len(content) == 0:
content = str(cl.recv(1024))
is_active = content.find('true') != -1
dim_light = is_active
result_ok(cl, 'ok')
elif request.find('/toggle_auto_light') == POST_PATH_START:
content_start_pos = request.find('\\r\\n\\r\\n')
content = request[content_start_pos+8:]
# HomeAssistant sends request body in a second packet
if len(content) == 0:
content = str(cl.recv(1024))
is_active = content.find('true') != -1
enable_auto_brightness = is_active
result_ok(cl, 'ok')
elif request.find('/toggle_motion_detection') == POST_PATH_START:
content_start_pos = request.find('\\r\\n\\r\\n')
content = request[content_start_pos+8:]
# HomeAssistant sends request body in a second packet
if len(content) == 0:
content = str(cl.recv(1024))
is_active = content.find('true') != -1
enable_motion_detection = is_active
result_ok(cl, 'ok')
elif request.find('/metrics') == GET_PATH_START:
attrs = "{mac=\"""" + mac_readable + "\"} "
content = (
"""# HELP temperature Temperature in Celsius
# TYPE temperature gauge
temperature""" + attrs + with_fallback_to_str(temperature, "NaN") +
"""
# HELP humidity Relative humidity in %
# TYPE humidity gauge
humidity""" + attrs + with_fallback_to_str(humidity, "NaN") +
"""
# HELP light Normalized light level
# TYPE light gauge
light""" + attrs + with_fallback_to_str(light, "NaN") +
"""
# HELP light_raw Raw light resistance value 0-65535
# TYPE light_raw gauge
light_raw""" + attrs + with_fallback_to_str(light_raw, "NaN"))
result_ok(cl, content)
else:
result_notfound(cl)
except KeyboardInterrupt:
print('Closing web socket due to KeyboardInterrupt')
running = False
break
except Exception as e:
print("caught exception in web loop {} {}".format(type(e).__name__, e))
sys.print_exception(e)
time.sleep_ms(100)
try:
gc_lock.acquire()
gc.collect()
finally:
gc_lock.release()
print('Web thread exiting...')
serverSocket.close()
disconnect_wifi(wlan)
print('Web thread exited')
def display_thread():
global temp_sht4x
global humidity_sht4x
global light_raw
global light
global enable_temp
global enable_hum
global dim_light
global enable_auto_brightness
global enable_motion_detection
global motion_timeout_ms
global motion_started_ms
global motion_state_on
global last_brightness
global update_time
global display
global sensor_lock
global gc_lock
global running
current_time = time.time_ns()
current_time_ms = int(current_time // 1_000_000)
last_display_mode_time = current_time_ms
display_mode = 0
sensor_update_delta = 3000
last_sensor_update_time = current_time_ms - sensor_update_delta - 1000
sensor_lock_acquired = False
while running:
try:
try:
if current_time_ms >= last_sensor_update_time + sensor_update_delta:
sensor_lock_acquired = True
sensor_lock.acquire()
temp_sht4x, humidity_sht4x = sensor_sht4x.measurements if sensor_sht4x is not None else (0.0, 0.0)
last_sensor_update_time = current_time_ms
except Exception as e:
print("caught exception in main loop {} {}".format(type(e).__name__, e))
sys.print_exception(e)
if sensor_lock_acquired:
sensor_lock.release()
sensor_lock_acquired = False
current_time = time.time_ns()
current_time_ms = int(current_time // 1_000_000)
display.clear()
ltime = time.localtime()
hour = ltime[3]
minute = ltime[4]
# Light on a 0 - 65535 scale
light_raw = photoRes.read_u16()
# We want to use the minimum brightness in darker environments
# So we scale the light value to a 0 - 15 scale but uses a 18 (0-17) scale with zeroes at the beggining
# Because of this the lower (18 [extended range] - 16 [real range] + 1 [0 is 0 on the real range]) * 100 / 18 = 16.6%
# the brightness will be 0
# We also chop of the top so instead of 18 we use a 20 scale but only subtract 3 from
light = light_raw * 19 // 65535 - 3
if light < 0:
light = 0
if enable_auto_brightness:
brightness = light
if brightness != last_brightness:
last_brightness = brightness
display.set_brightness(brightness)
else:
if dim_light and last_brightness != 1:
display.set_brightness(1)
last_brightness = 1
elif not dim_light and last_brightness != 15:
display.set_brightness(15)
last_brightness = 15
movement = True if pir_sensor.value() == 1 else False
if motion_state_on and current_time_ms >= motion_started_ms + motion_timeout_ms:
motion_state_on = False
if motion_state_on == False and enable_motion_detection and movement:
motion_state_on = True
motion_started_ms = current_time_ms
display_mode = 0
last_display_mode_time = current_time_ms
motion_timeout_ms = 6000
if enable_temp:
motion_timeout_ms = motion_timeout_ms + 6000
if enable_hum:
motion_timeout_ms = motion_timeout_ms + 6000
if display_mode == 0:
hour_0 = int(math.floor(hour/10))
hour_1 = hour - hour_0 * 10
display.set_number(hour_0, 0)
display.set_number(hour_1, 1)
minute_0 = int(math.floor(minute/10))
minute_1 = minute - minute_0*10
display.set_number(minute_0, 2)
display.set_number(minute_1, 3)
signs = 0
signs = signs | (0x02 if time.time() % 2 == 0 else 0) # (0x02 if current_time_ms % 1000 < 500 else 0)
display.set_colon(signs)
elif display_mode == 1:
temp_i0 = int(temp_sht4x // 10)
temp_i1 = int(temp_sht4x - (temp_i0 * 10))
temp_d0 = int(temp_sht4x * 10 - temp_i0 * 100 - temp_i1 * 10)
display.set_number(temp_i0, 0)
display.set_number(temp_i1, 1)
display.set_number(temp_d0, 2)
display.set_glyph(0x39, 3)
display.set_colon(0x02)
elif display_mode == 2:
hum_i0 = int(humidity_sht4x // 10)
hum_i1 = int(humidity_sht4x - (hum_i0 * 10))
hum_d0 = int(humidity_sht4x * 10 - hum_i0 * 100 - hum_i1 * 10)
display.set_number(hum_i0, 0)
display.set_number(hum_i1, 1)
display.set_number(hum_d0, 2)
display.set_colon(0x12)
try:
if enable_motion_detection and not motion_state_on:
display.clear()
display.draw()
except Exception as e:
print("caught exception in main loop {} {}".format(type(e).__name__, e))
sys.print_exception(e)
if current_time_ms >= last_display_mode_time + 3000:
last_display_mode_time = current_time_ms
display_mode = display_mode + 1
if display_mode == 1 and not enable_temp:
display_mode = display_mode + 1
if display_mode == 2 and not enable_hum:
display_mode = display_mode + 1
if display_mode > 2:
display_mode = 0
#print(wlan.isconnected(), wlan.status(), wlan.ifconfig())
try:
gc_lock.acquire()
gc.collect()
finally:
gc_lock.release()
try:
if update_time:
try:
set_time()
motion_state_on = False
update_time = False
current_time = time.time_ns()
current_time_ms = int(current_time // 1_000_000)
last_display_mode_time = current_time_ms
except:
print('Could not set time')
update_time = True
except Exception as e:
print("caught exception in main loop {} {}".format(type(e).__name__, e))
sys.print_exception(e)
time.sleep_ms(100)
except KeyboardInterrupt:
running = False
break
except Exception as e:
print("caught exception in main loop {} {}".format(type(e).__name__, e))
sys.print_exception(e)
time.sleep_ms(100)
print('Display thread exited...')
sensor_lock = _thread.allocate_lock()
gc_lock = _thread.allocate_lock()
print('Starting Display thread')
second_thread = _thread.start_new_thread(display_thread, ())
print('Starting Main loop')
web_thread()