"""MicroPython sensor HTTP server.

On start-up the script scans the I2C bus, joins the WiFi network named in
``secrets`` and then serves the readings of whichever supported sensors
answered the scan (addresses 0x18, 0x44 and 0x77) over plain HTTP on port 80:

* ``GET /``               - JSON object with the per-sensor readings
* ``GET /homeassistant``  - same JSON plus generic temperature/humidity/pressure keys
* ``GET /prometheus``     - Prometheus text exposition of the same values

Readings of sensors that are not present are reported as ``NaN``.
"""
import time
import network
import socket
import ubinascii
from machine import Pin, I2C
from mcp9808.mcp9808 import RESOLUTION_0_625_C
import mcp9808.mcp9808 as mcp9808
import sht4x.sht4x as sht4x
import bmp390.bmp390 as bmp390
from sensor_pack.bus_service import I2cAdapter
from secrets import secrets

# Offset of the path inside the request line: len("GET ") and len("POST ").
GET_PATH_START = 4
POST_PATH_START = 5

# wlan.status() value the script treats as "connected".
WLAN_STATUS_CONNECTED = 3


def pa_mmhg(value: float) -> float:
    """Convert air pressure from Pa to mm Hg"""
    return value * 7.50062E-3


def with_fallback(value, fallback):
    """Return ``value``, or ``fallback`` when ``value`` is None."""
    return fallback if value is None else value


def with_fallback_to_str(value, fallback: str) -> str:
    """Return ``str(value)``, or ``fallback`` when ``value`` is None."""
    return fallback if value is None else str(value)


led = Pin('LED', Pin.OUT)


def blink_onboard_led(num_blinks):
    """Flash the on-board LED ``num_blinks`` times (200 ms on, 200 ms off)."""
    for _ in range(num_blinks):
        led.value(1)
        time.sleep_ms(200)
        led.value(0)
        time.sleep_ms(200)


blink_onboard_led(1)

# --- I2C bus discovery -------------------------------------------------------
sdaPIN = Pin(20)
sclPIN = Pin(21)
i2c = I2C(0, sda=sdaPIN, scl=sclPIN, freq=200000)
devices = i2c.scan()
if devices:
    print('Number of I2C devices found=', len(devices))
    for device in devices:
        print("Device Hexadecimel Address=", hex(device))
else:
    print("No device found")

# --- WiFi connection ---------------------------------------------------------
ssid = secrets['ssid']
password = secrets['pw']
print('Connecting to WiFi network: ' + ssid)
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(ssid, password)

# Poll for up to ~10 s; stop early on an error status (< 0) or once connected.
max_wait = 10
while max_wait > 0:
    wlan_status = wlan.status()
    if wlan_status < 0 or wlan_status >= WLAN_STATUS_CONNECTED:
        break
    max_wait -= 1
    print('waiting for connection...')
    time.sleep(1)

if wlan.status() != WLAN_STATUS_CONNECTED:
    blink_onboard_led(3)
    raise RuntimeError('network connection failed')

print('connected')
ip = wlan.ifconfig()[0]
print('ip = ' + ip)
wlan_mac = wlan.config('mac')
mac_readable = ubinascii.hexlify(wlan_mac).decode()
print('Mac address is ' + mac_readable)
blink_onboard_led(2)

# --- HTTP server socket ------------------------------------------------------
addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1]
serverSocket = socket.socket()
serverSocket.bind(addr)
serverSocket.listen(1)
print('listening on', addr)


def result_ok(cl, response=None, content_type="text/plain"):
    """Send a 200 response with ``response`` (default "Ok") and close ``cl``."""
    # sendall instead of send: send may write only part of the buffer.
    cl.sendall('HTTP/1.0 200 OK\r\nContent-type: ' + content_type + '\r\n\r\n')
    cl.sendall(response if response is not None else "Ok")
    cl.close()


def result_notfound(cl, response=None):
    """Send a 404 response with ``response`` (default "Not Found") and close ``cl``."""
    cl.sendall('HTTP/1.0 404 NotFound\r\nContent-type: text/plain\r\n\r\n')
    cl.sendall(response if response is not None else "Not Found")
    cl.close()


def _json_body(fields):
    """Render (key, value) pairs as a flat JSON object of strings; None -> "NaN"."""
    return "{" + ",".join(
        '"' + key + '": "' + with_fallback_to_str(value, "NaN") + '"'
        for key, value in fields
    ) + "}"


def _prometheus_body(attrs, metrics):
    """Render (name, help, value) triples as Prometheus gauges; None -> NaN."""
    return "\n".join(
        "# HELP {0} {1}\n# TYPE {0} gauge\n{0}{2}{3}".format(
            name, help_text, attrs, with_fallback_to_str(value, "NaN")
        )
        for name, help_text, value in metrics
    )


# --- Sensor set-up -----------------------------------------------------------
# NOTE: each driver-module alias is deliberately rebound to the sensor instance
# (or None when the device did not answer the scan); the modules themselves are
# not needed afterwards.
if 0x18 in devices:
    mcp9808 = mcp9808.MCP9808(i2c)
else:
    mcp9808 = None

if 0x44 in devices:
    sht4x = sht4x.SHT4X(i2c)
else:
    sht4x = None

if 0x77 in devices:
    adaptor = I2cAdapter(i2c)
    bmp390 = bmp390.Bmp390(adaptor)
    calibration_data = [bmp390.get_calibration_data(index) for index in range(0, 14)]
    print(f"Calibration data: {calibration_data}")
    # bmp390.set_oversampling(2, 3)
    bmp390.set_sampling_period(5)
    bmp390.set_iir_filter(2)
    # bmp390.start_measurement(True, True, 2)
else:
    bmp390 = None

# --- Request loop ------------------------------------------------------------
try:
    while True:
        print('waiting for client')
        cl, client_addr = serverSocket.accept()
        try:
            print('client connected from', client_addr)
            request = str(cl.recv(1024))
            request = request[2:-1]  # remove b' and ' from string
            print(request)

            temp_mcp9808 = str(mcp9808.temperature) if mcp9808 is not None else None
            temp_sht4x, humidity_sht4x = sht4x.measurements if sht4x is not None else (None, None)
            temp_bmp390, pressure_bmp390, time_bmp390 = None, None, None
            if bmp390 is not None:
                # bmp390.start_measurement(True, True, 1)
                # NOTE(review): presumably s[1]/s[2] are the data-ready flags;
                # this loop has no timeout - confirm against the driver.
                s = bmp390.get_status()
                while not s[2] or not s[1]:
                    time.sleep_ms(10)
                    s = bmp390.get_status()
                temp_bmp390 = bmp390.get_temperature()
                pressure_bmp390 = bmp390.get_pressure()
                time_bmp390 = bmp390.get_sensor_time()

            sensor_fields = (
                ("temp_mcp9808", temp_mcp9808),
                ("temp_sht4x", temp_sht4x),
                ("humidity_sht4x", humidity_sht4x),
                ("temp_bmp390", temp_bmp390),
                ("pressure_bmp390", pressure_bmp390),
                ("time_bmp390", time_bmp390),
            )

            # One if/elif chain: exactly one response is written per request.
            # (Previously "/" was a separate `if`, so the 404 branch also ran
            # against the already closed socket.)
            if request.find('/ ') == GET_PATH_START:
                result_ok(cl, _json_body(sensor_fields), 'application/json')
            elif request.find('/homeassistant ') == GET_PATH_START:
                generic_fields = (
                    ("temperature", temp_mcp9808),
                    ("humidity", humidity_sht4x),
                    ("pressure", pressure_bmp390),
                )
                result_ok(cl, _json_body(generic_fields + sensor_fields), 'application/json')
            elif request.find('/prometheus') == GET_PATH_START:
                attrs = "{mac=\"" + mac_readable + "\",ip=\"" + ip + "\"} "
                metrics = (
                    ("temperature", "Temperature in Celsius", temp_mcp9808),
                    ("humidity", "Relative humidity in %", humidity_sht4x),
                    ("pressure", "Pressure in Pa", pressure_bmp390),
                    ("temp_mcp9808", "Temperature in Celsius", temp_mcp9808),
                    ("humidity_sht4x", "Relative humidity in %", humidity_sht4x),
                    ("temp_sht4x", "Temperature in Celsius", temp_sht4x),
                    ("bmp390_temp", "Temperature in Celsius", temp_bmp390),
                    ("bmp390_pressure", "Pressure in Pa", pressure_bmp390),
                    ("bmp390_time", "Time in ms", time_bmp390),
                )
                result_ok(cl, _prometheus_body(attrs, metrics))
            else:
                result_notfound(cl)
        except OSError:
            # Client went away mid-request; drop it and keep serving.
            cl.close()
            print('connection closed')
finally:
    # Always release the listening socket, but let the error that ended the
    # loop propagate instead of silently swallowing it.
    serverSocket.close()