diff --git a/bmp390/bmp390.py b/bmp390/bmp390.py new file mode 100644 index 0000000..3785ce7 --- /dev/null +++ b/bmp390/bmp390.py @@ -0,0 +1,305 @@ +# micropython +# mail: goctaprog@gmail.com +# MIT license +import micropython +import array + +from sensor_pack import bus_service +from sensor_pack.base_sensor import BaseSensor, Iterator + +# ВНИМАНИЕ: не подключайте питание датчика к 5В, иначе датчик выйдет из строя! Только 3.3В!!! +# WARNING: do not connect "+" to 5V or the sensor will be damaged! + + +@micropython.native +def _check_value(value: int, valid_range, error_msg: str) -> int: + if value not in valid_range: + raise ValueError(error_msg) + return value + + +@micropython.native +def _calibration_regs_addr(): + """возвращает кортеж из адреса регистра, размера значения в байтах, типа значения (u-unsigned, s-signed)""" + start_addr = 0x31 + tpl = ('1b', '2h', '2H') + """возвращает итератор с адресами внутренних регистров датчика, хранящих калибровочные коэффициенты """ + val_type = "22011002200100" + for item in val_type: + v_size, v_type = tpl[int(item)] + yield int(start_addr), int(v_size), v_type + start_addr += int(v_size) + + +@micropython.native +def get_conversion_cycle_time(temperature_or_pressure: bool, oversample: int) -> int: + """возвращает время преобразования в [мс] датчиком температуры или давления в зависимости от его настроек""" + delays_ms = 5, 8, 14, 26 + if temperature_or_pressure: + return delays_ms[0] # temperature + # pressure + return delays_ms[oversample] + + +class Bmp390(BaseSensor, Iterator): + """Class for work with Bosh BMP180 pressure sensor""" + + def __init__(self, adapter: bus_service.BusAdapter, address=0xEE >> 1, + oversample_temp=0b11, oversample_press=0b11, iir_filter=0): + """i2c - объект класса I2C; baseline_pressure - давление на уровне моря в Pa в твоей(!) местности;; + oversample_settings (0..5) - точность измерения 0-грубо но быстро, 5-медленно, но точно; + address - адрес датчика (0xEF (read) and 0xEE (write) from datasheet) + iir_filter=0..7; 0 - off, 7 - max value + + i2c is an object of the I2C class; baseline_pressure - sea level pressure in Pa in your(!) area; + oversample_settings (0..5) - measurement reliability 0-coarse but fast, 5-slow but accurate;""" + super().__init__(adapter, address, False) + self.t_lin = None # for pressure calculation + # for temperature only! + self.oss_t = _check_value(oversample_temp, range(0, 6), + f"Invalid temperature oversample value: {oversample_temp}") + self.oss_p = _check_value(oversample_press, range(0, 6), + f"Invalid pressure oversample value: {oversample_press}") + self.adr = address + self.adapter = adapter + self.IIR = _check_value(iir_filter, range(0, 8), + f"Invalid iir_filter value: {iir_filter}") + self.mode = 0 + self.enable_pressure = False + self.enable_temperature = False + self.sampling_period = 0x02 # 1.28 sec + # массив, хранящий калибровочные коэффициенты (xx штук) + self.cfa = [] # signed long elements + # считываю калибровочные коэффициенты + self._read_calibration_data() + # предварительный расчет + self._precalculate() + + def get_calibration_data(self, index: int) -> int: + """возвращает калибровочный коэффициент по его индексу (0..13). + returns the calibration coefficient by its index (0..13)""" + _check_value(index, range(0, 14), f"Invalid index value: {index}") + return self.cfa[index] + + @micropython.native + def _precalculate(self): + """предварительно вычисленные значения""" + # для расчета температуры + self.par_t1 = self.get_calibration_data(0) * 2 ** 8 # + self.par_t2 = self.get_calibration_data(1) / 2 ** 30 # + self.par_t3 = self.get_calibration_data(2) / 2 ** 48 # + # для расчета давления + self.par_p1 = (self.get_calibration_data(3) - 2 ** 14) / 2 ** 20 + self.par_p2 = (self.get_calibration_data(4) - 2 ** 14) / 2 ** 29 + self.par_p3 = self.get_calibration_data(5) / 2 ** 32 + self.par_p4 = self.get_calibration_data(6) / 2 ** 37 + self.par_p5 = 8 * self.get_calibration_data(7) + self.par_p6 = self.get_calibration_data(8) / 2 ** 6 + self.par_p7 = self.get_calibration_data(9) / 2 ** 8 + self.par_p8 = self.get_calibration_data(10) / 2 ** 15 + self.par_p9 = self.get_calibration_data(11) / 2 ** 48 + self.par_p10 = self.get_calibration_data(12) / 2 ** 48 + self.par_p11 = self.get_calibration_data(13) / 2 ** 65 + + # BaseSensor + def _read_register(self, reg_addr, bytes_count=2) -> bytes: + """считывает из регистра датчика значение. + bytes_count - размер значения в байтах""" + return self.adapter.read_register(self.adr, reg_addr, bytes_count) + + # BaseSensor + def _write_register(self, reg_addr, value: int, bytes_count=2) -> int: + """записывает данные value в датчик, по адресу reg_addr. + bytes_count - кол-во записываемых данных""" + byte_order = self._get_byteorder_as_str()[0] + return self.adapter.write_register(self.adr, reg_addr, value, bytes_count, byte_order) + + def _read_calibration_data(self) -> int: + """Читает калибровочные значение из датчика. + read calibration values from sensor. + return count read values""" + if len(self.cfa): + raise ValueError(f"calibration data array already filled!") + for v_addr, v_size, v_type in _calibration_regs_addr(): + # print(v_addr, v_size, v_type) + reg_val = self._read_register(v_addr, v_size) + rv = self.unpack(f"{v_type}", reg_val)[0] + # check + if rv == 0x00 or rv == 0xFFFF: + raise ValueError(f"Invalid register addr: {v_addr} value: {hex(rv)}") + self.cfa.append(rv) + return len(self.cfa) + + def get_id(self) -> tuple: + """Возвращает идентификатор датчика и его revision ID. + Returns the ID and revision ID of the sensor.""" + chip_id = self._read_register(0x00, 1) + rev_id = self._read_register(0x01, 1) + return int(chip_id[0]), int(rev_id[0]) + + def get_error(self) -> int: + """Возвращает три бита состояния ошибок. + Bit 0 - fatal_err Fatal error + Bit 1 - Command execution failed. Cleared on read. + Bit 2 conf_err sensor configuration error detected (only working in normal mode). Cleared on read. + """ + err = self._read_register(0x02, 1) + return int(err[0]) & 0x07 + + def get_status(self) -> tuple: + """Возвращает три бита состояния датчика как кортеж + Data ready for temperature, Data ready for pressure, CMD decoder status + бит 0 - CMD decoder status (0: Command in progress; 1: Command decoder is ready to accept a new command) + бит 1 - Data ready for pressure. (It gets reset, when one pressure DATA register is read out) + бит 2 - Data ready for temperature sensor. (It gets reset, when one temperature DATA register is read out) + """ + val = self._read_register(0x03, 1)[0] + i = (int(val) >> 4) & 0x07 + drdy_temp, drdy_press, cmd_rdy = i & 0x04, i & 0x02, i & 0x01 + return drdy_temp, drdy_press, cmd_rdy + + @micropython.native + def get_pressure_raw(self) -> int: + # трех байтовое значение + l, m, h = self._read_register(0x04, 3) + return (h << 16) | (m << 8) | l + + def get_pressure(self) -> float: + """Return pressure in Pascal [Pa]. + Call get_temperature() before call get_pressure() !!!""" + uncompensated = self.get_pressure_raw() + # + t_lin = self.t_lin + + if t_lin is None: + raise ValueError(f"Call get_temperature() before call get_pressure() !!!") + + t_lin2 = t_lin * t_lin + t_lin3 = t_lin * t_lin * t_lin + # + partial_data1 = self.par_p6 * t_lin + partial_data2 = self.par_p7 * t_lin2 + partial_data3 = self.par_p8 * t_lin3 + partial_out1 = self.par_p5 + partial_data1 + partial_data2 + partial_data3 + # + partial_data1 = self.par_p2 * t_lin + partial_data2 = self.par_p3 * t_lin2 + partial_data3 = self.par_p4 * t_lin3 + partial_out2 = uncompensated * (self.par_p1 + partial_data1 + partial_data2 + partial_data3) + # + partial_data1 = uncompensated * uncompensated + partial_data2 = self.par_p9 + self.par_p10 * t_lin + partial_data3 = partial_data1 * partial_data2 + partial_data4 = partial_data3 + (uncompensated * uncompensated * uncompensated) * self.par_p11 + # + return partial_out1 + partial_out2 + partial_data4 + + @micropython.native + def get_temperature_raw(self) -> int: + # трех байтовое значение + l, m, h = self._read_register(0x07, 3) + return (h << 16) | (m << 8) | l + + def get_temperature(self) -> float: + """Return temperature in Celsius""" + uncompensated = self.get_temperature_raw() + partial_data1 = uncompensated - self.par_t1 + partial_data2 = partial_data1 * self.par_t2 + # Update the compensated temperature since this is needed for pressure calculation !!! + self.t_lin = partial_data2 + (partial_data1 * partial_data1) * self.par_t3 + return self.t_lin + + @micropython.native + def get_sensor_time(self): + # трех байтовое значение + l, m, h = self._read_register(0x0C, 3) + return (h << 16) | (m << 8) | l + + def get_event(self) -> int: + """Bit 0 por_detected ‘1’ after device power up or softreset. Clear-on-read + Bit 1 itf_act_pt ‘1’ when a serial interface transaction occurs during a + pressure or temperature conversion. Clear-on-read""" + evt = self._read_register(0x10, 1) + return int(evt[0]) & 0b11 + + def get_int_status(self) -> int: + """Bit 0 fwm_int FIFO Watermark Interrupt + Bit 1 full_int FIFO Full Interrupt + Bit 3 drdy data ready interrupt""" + int_stat = self._read_register(0x11, 1) + return int(int_stat[0]) & 0b111 + + def get_fifo_length(self) -> int: + """The FIFO byte counter indicates the current fill level of the FIFO buffer.""" + fl = self._read_register(0x12, 2) + return self.unpack("H", fl)[0] + + def soft_reset(self, reset_or_flush: bool = True): + """программный сброс датчика. + software reset of the sensor""" + if reset_or_flush: + self._write_register(0x7E, 0xB6, 1) # reset + else: + self._write_register(0x7E, 0xB0, 1) # flush + + def start_measurement(self, enable_press, enable_temp, mode: int = 2): + """ # mode: 0 - sleep, 1-forced, 2-normal (continuously)""" + if mode not in range(3): + raise ValueError(f"Invalid mode value: {mode}") + tmp = self._read_register(0x1B, 1)[0] + if enable_press: + tmp |= 0b01 + else: + tmp &= ~0b01 + + if enable_temp: + tmp |= 0b10 + else: + tmp &= ~0b10 + + if True: + tmp &= ~0b0011_0000 + if 0 == mode: + pass + if 1 == mode: + tmp |= 0b0001_0000 + if 2 == mode: + tmp |= 0b0011_0000 + # save + + self._write_register(0x1B, tmp, 1) + self.mode = mode + self.enable_pressure = enable_press + self.enable_temperature = enable_temp + + def set_oversampling(self, pressure_oversampling: int, temperature_oversampling: int): + tmp = 0 + po = _check_value(pressure_oversampling, range(0, 6), + f"Invalid value pressure_oversampling: {pressure_oversampling}") + to = _check_value(temperature_oversampling, range(0, 6), + f"Invalid value temperature_oversampling: {temperature_oversampling}") + tmp |= po + tmp |= to << 3 + self._write_register(0x1C, tmp, 1) + self.oss_t = temperature_oversampling + self.oss_p = pressure_oversampling + + def set_sampling_period(self, period: int): + p = _check_value(period, range(0, 18), + f"Invalid value output data rates: {period}") + self._write_register(0x1D, p, 1) + self.sampling_period = period + + def set_iir_filter(self, value): + p = _check_value(value, range(0, 8), + f"Invalid value iir_filter: {value}") + self._write_register(0x1F, p, 1) + + # Iterator + def __next__(self) -> tuple: + res = list() + if self.enable_temperature: + res.append(self.get_temperature()) + if self.enable_pressure: + res.append(self.get_pressure()) + return tuple(res) diff --git a/main.py b/main.py index 278167e..a9954d9 100644 --- a/main.py +++ b/main.py @@ -4,12 +4,31 @@ import socket import ubinascii from machine import Pin, I2C -import micropython_mcp9808.mcp9808 as mcp9808 +import mcp9808.mcp9808 as mcp9808 +import sht4x.sht4x as sht4x +import bmp390.bmp390 as bmp390 +from sensor_pack.bus_service import I2cAdapter from secrets import secrets GET_PATH_START = 4 POST_PATH_START = 5 +def pa_mmhg(value: float) -> float: + """Convert air pressure from Pa to mm Hg""" + return value*7.50062E-3 + +def with_fallback(value, fallback): + if value is None: + return fallback + else: + return value + +def with_fallback_to_str(value, fallback: str) -> str: + if value is None: + return fallback + else: + return str(value) + led = Pin('LED', Pin.OUT) def blink_onboard_led(num_blinks): for i in range(num_blinks): @@ -30,9 +49,6 @@ if len(devices) != 0: else: print("No device found") -mcp9808 = mcp9808.MCP9808(i2c) - - ssid = secrets['ssid'] password = secrets['pw'] @@ -65,9 +81,9 @@ blink_onboard_led(2) addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1] -s = socket.socket() -s.bind(addr) -s.listen(1) +serverSocket = socket.socket() +serverSocket.bind(addr) +serverSocket.listen(1) print('listening on', addr) @@ -81,9 +97,36 @@ def result_notfound(cl, response = None): cl.send(response if response is not None else "Not Found") cl.close() -while True: - cl, addr = s.accept() +if devices.count(0x18) > 0: + mcp9808 = mcp9808.MCP9808(i2c) +else: + mcp9808 = None + + +if devices.count(0x44) > 0: + sht4x = sht4x.SHT4X(i2c) +else: + sht4x = None + +if devices.count(0x77) > 0: + adaptor = I2cAdapter(i2c) + bmp390 = bmp390.Bmp390(adaptor) + + calibration_data = [bmp390.get_calibration_data(index) for index in range(0, 14)] + print(f"Calibration data: {calibration_data}") + # + bmp390.set_oversampling(2, 3) + bmp390.set_sampling_period(5) + bmp390.set_iir_filter(2) + # + #bmp390.start_measurement(True, True, 1) +else: + bmp390 = None + +while True: + print('waiting for client') + cl, addr = serverSocket.accept() try: print('client connected from', addr) request = cl.recv(1024) @@ -92,15 +135,83 @@ while True: request = request[2:-1] # remove b' and ' from string print(request) + temp_mcp9808 = str(mcp9808.temperature) if mcp9808 is not None else None + temp_sht4x, humidity_sht4x = sht4x.measurements if sht4x is not None else (None,None) + + temp_bmp390, pressure_bmp390, time_bmp390 = None, None, None + if bmp390 is not None: + bmp390.start_measurement(True, True, 1) + s = bmp390.get_status() + while not s[2] or not s[1]: + time.sleep_ms(10) + s = bmp390.get_status() + temp_bmp390, pressure_bmp390, time_bmp390 = bmp390.get_temperature(), bmp390.get_pressure(), bmp390.get_sensor_time() + if request.find('/ ') == GET_PATH_START: - temp = str(mcp9808.temperature) - result_ok(cl, "{ \"temperature\": \"" + temp + "\" }", 'application/json') + result_ok(cl, + "{" + + "\"temp_mcp9808\": \"" + with_fallback_to_str(temp_mcp9808, "NaN") + "\","+ + "\"temp_sht4x\": \"" + with_fallback_to_str(temp_sht4x, "NaN")+"\","+ + "\"humidity_sht4x\": \"" + with_fallback_to_str(humidity_sht4x, "NaN") + "\"," + + "\"temp_bmp390\": \"" + with_fallback_to_str(temp_bmp390, "NaN") + "\"," + + "\"pressure_bmp390\": \"" + with_fallback_to_str(pressure_bmp390, "NaN") + "\"," + + "\"time_bmp390\": \"" + with_fallback_to_str(time_bmp390, "NaN") + "\"" + + "}", + 'application/json') + + if request.find('/homeassistant ') == GET_PATH_START: + result_ok(cl, + "{" + + "\"temperature\": \"" + with_fallback_to_str(temp_mcp9808, "NaN") + "\","+ + "\"humidity\": \"" + with_fallback_to_str(humidity_sht4x, "NaN") + "\"," + + "\"pressure\": \"" + with_fallback_to_str(pressure_bmp390, "NaN") + "\"," + + "\"temp_mcp9808\": \"" + with_fallback_to_str(temp_mcp9808, "NaN") + "\","+ + "\"temp_sht4x\": \"" + with_fallback_to_str(temp_sht4x, "NaN")+"\","+ + "\"humidity_sht4x\": \"" + with_fallback_to_str(humidity_sht4x, "NaN") + "\"," + + "\"temp_bmp390\": \"" + with_fallback_to_str(temp_bmp390, "NaN") + "\"," + + "\"pressure_bmp390\": \"" + with_fallback_to_str(pressure_bmp390, "NaN") + "\"," + + "\"time_bmp390\": \"" + with_fallback_to_str(time_bmp390, "NaN") + "\"" + + "}", + 'application/json') elif request.find('/prometheus') == GET_PATH_START: - temp = str(mcp9808.temperature) - content = """# HELP temperature Temperature in Celsius + attrs = "{mac=\"""" + mac_readable + "\",ip=\""+ ip +"\"} " + content = ( +"""# HELP temperature Temperature in Celsius # TYPE temperature gauge -temperature{mac=\"""" + mac_readable + "\",ip=\""+ ip +"\"} " + temp +temperature""" + attrs + with_fallback_to_str(temp_mcp9808, "NaN") + +""" +# HELP humidity Relative humidity in % +# TYPE humidity gauge +humidity""" + attrs + with_fallback_to_str(humidity_sht4x, "NaN") + +""" +# HELP pressure Pressure in Pa +# TYPE pressure gauge +pressure""" + attrs + with_fallback_to_str(pressure_bmp390, "NaN") + +""" +# HELP temp_mcp9808 Temperature in Celsius +# TYPE temp_mcp9808 gauge +temp_mcp9808""" + attrs + with_fallback_to_str(temp_mcp9808, "NaN") + +""" +# HELP humidity_sht4x Relative humidity in % +# TYPE humidity_sht4x gauge +humidity_sht4x""" + attrs + with_fallback_to_str(humidity_sht4x, "NaN") + +""" +# HELP temp_sht4x Temperature in Celsius +# TYPE temp_sht4x gauge +temp_sht4x""" + attrs + with_fallback_to_str(temp_sht4x, "NaN") + +""" +# HELP bmp390_temp Temperature in Celsius +# TYPE bmp390_temp gauge +bmp390_temp""" + attrs + with_fallback_to_str(temp_bmp390, "NaN") + +""" +# HELP bmp390_pressure Pressure in Pa +# TYPE bmp390_pressure gauge +bmp390_pressure""" + attrs + with_fallback_to_str(pressure_bmp390, "NaN") + +""" +# HELP bmp390_time Time in ms +# TYPE bmp390_time gauge +bmp390_time""" + attrs + with_fallback_to_str(time_bmp390, "NaN")) result_ok(cl, content) else: result_notfound(cl) diff --git a/micropython_mcp9808/i2c_helpers.py b/mcp9808/i2c_helpers.py similarity index 100% rename from micropython_mcp9808/i2c_helpers.py rename to mcp9808/i2c_helpers.py diff --git a/micropython_mcp9808/mcp9808.py b/mcp9808/mcp9808.py similarity index 99% rename from micropython_mcp9808/mcp9808.py rename to mcp9808/mcp9808.py index 87b8257..92263a1 100644 --- a/micropython_mcp9808/mcp9808.py +++ b/mcp9808/mcp9808.py @@ -19,7 +19,7 @@ MicroPython Driver for the Microchip MCP9808 Temperature Sensor from collections import namedtuple from micropython import const -from micropython_mcp9808.i2c_helpers import CBits, RegisterStruct +from mcp9808.i2c_helpers import CBits, RegisterStruct __version__ = "0.0.0+auto.0" diff --git a/sensor_pack/base_sensor.py b/sensor_pack/base_sensor.py new file mode 100644 index 0000000..af22f3b --- /dev/null +++ b/sensor_pack/base_sensor.py @@ -0,0 +1,57 @@ +# micropython +# MIT license +# Copyright (c) 2022 Roman Shevchik goctaprog@gmail.com +import micropython +import ustruct +from sensor_pack import bus_service + + +class BaseSensor: + """Base sensor class""" + + def __init__(self, adapter: bus_service.BusAdapter, address: int, big_byte_order: bool): + """Базовый класс Датчик. + Если big_byte_order равен True -> порядок байтов в регистрах датчика «big» + (Порядок от старшего к младшему), в противном случае порядок байтов в регистрах "little" + (Порядок от младшего к старшему) + address - адрес датчика на шине. + + Base sensor class. if big_byte_order is True -> register values byteorder is 'big' + else register values byteorder is 'little' + address - address of the sensor on the bus.""" + self.adapter = adapter + self.address = address + self.big_byte_order = big_byte_order + + def _get_byteorder_as_str(self) -> tuple: + """Return byteorder as string""" + if self.is_big_byteorder(): + return 'big', '>' + else: + return 'little', '<' + + def unpack(self, fmt_char: str, source: bytes) -> tuple: + """распаковка массива, считанного из датчика. + fmt_char: c, b, B, h, H, i, I, l, L, q, Q. pls see: https://docs.python.org/3/library/struct.html""" + if len(fmt_char) != 1: + raise ValueError(f"Invalid length fmt_char parameter: {len(fmt_char)}") + bo = self._get_byteorder_as_str()[1] + return ustruct.unpack(bo + fmt_char, source) + + @micropython.native + def is_big_byteorder(self) -> bool: + return self.big_byte_order + + def get_id(self): + raise NotImplementedError + + def soft_reset(self): + raise NotImplementedError + + +class Iterator: + def __iter__(self): + return self + + def __next__(self): + raise NotImplementedError diff --git a/sensor_pack/bus_service.py b/sensor_pack/bus_service.py new file mode 100644 index 0000000..e9c87f5 --- /dev/null +++ b/sensor_pack/bus_service.py @@ -0,0 +1,66 @@ +# micropython +# MIT license +# Copyright (c) 2022 Roman Shevchik goctaprog@gmail.com +"""service class for I/O bus operation""" + +from machine import I2C +try: + from typing import Literal +except: + pass + + +class BusAdapter: + """Proxy between I/O bus and device I/O class""" + def __init__(self, bus): + self.bus = bus + + def read_register(self, device_addr: int, reg_addr: int, bytes_count: int) -> bytes: + """считывает из регистра датчика значение. + device_addr - адрес датчика на шине. + reg_addr - адрес регистра в адресном пространстве датчика. + bytes_count - размер значения в байтах. + reads value from sensor register. + device_addr - address of the sensor on the bus. + reg_addr - register address in the address space of the sensor""" + raise NotImplementedError + + def write_register(self, device_addr: int, reg_addr: int, value: int, + bytes_count: int, byte_order: str): + """записывает данные value в датчик, по адресу reg_addr. + bytes_count - кол-во записываемых байт из value. + byte_order - порядок расположения байт в записываемом значении. + writes value data to the sensor, at reg_addr. + bytes_count - number of bytes written from value. + byte_order - the order of bytes in the value being written. + """ + raise NotImplementedError + + def read(self, device_addr, n_bytes: int) -> bytes: + raise NotImplementedError + + def write(self, device_addr, buf: bytes): + raise NotImplementedError + + +class I2cAdapter(BusAdapter): + def __init__(self, bus: I2C): + super().__init__(bus) + + def write_register(self, device_addr: int, reg_addr: int, value: int, + bytes_count: int, byte_order: Literal["little", "big"]): + """записывает данные value в датчик, по адресу reg_addr. + bytes_count - кол-во записываемых данных""" + buf = value.to_bytes(bytes_count, byte_order) + return self.bus.writeto_mem(device_addr, reg_addr, buf) + + def read_register(self, device_addr: int, reg_addr: int, bytes_count: int) -> bytes: + """считывает из регистра датчика значение. + bytes_count - размер значения в байтах""" + return self.bus.readfrom_mem(device_addr, reg_addr, bytes_count) + + def read(self, device_addr, n_bytes: int) -> bytes: + return self.bus.readfrom(device_addr, n_bytes) + + def write(self, device_addr, buf: bytes): + return self.bus.writeto(device_addr, buf) diff --git a/sht4x/sht4x.py b/sht4x/sht4x.py new file mode 100644 index 0000000..b275bdf --- /dev/null +++ b/sht4x/sht4x.py @@ -0,0 +1,261 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023 Jose D. Montoya +# +# SPDX-License-Identifier: MIT +""" +`sht4x` +================================================================================ + +MicroPython Driver fot the Sensirion Temperature and Humidity SHT40, SHT41 and SHT45 Sensor + + +* Author: Jose D. Montoya + + +""" + +import time +import struct +from micropython import const + +try: + from typing import Tuple +except ImportError: + pass + + +__version__ = "0.0.0+auto.0" +__repo__ = "https://github.com/jposada202020/MicroPython_SHT4X.git" + +_RESET = const(0x94) + +HIGH_PRECISION = const(0) +MEDIUM_PRECISION = const(1) +LOW_PRECISION = const(2) +temperature_precision_options = (HIGH_PRECISION, MEDIUM_PRECISION, LOW_PRECISION) +temperature_precision_values = { + HIGH_PRECISION: const(0xFD), + MEDIUM_PRECISION: const(0xF6), + LOW_PRECISION: const(0xE0), +} + +HEATER200mW = const(0) +HEATER110mW = const(1) +HEATER20mW = const(2) +heater_power_values = (HEATER200mW, HEATER110mW, HEATER20mW) + +TEMP_1 = const(0) +TEMP_0_1 = const(1) +heat_time_values = (TEMP_1, TEMP_0_1) + +wat_config = { + HEATER200mW: (0x39, 0x32), + HEATER110mW: (0x2F, 0x24), + HEATER20mW: (0x1E, 0x15), +} + + +class SHT4X: + """Driver for the SHT4X Sensor connected over I2C. + + :param ~machine.I2C i2c: The I2C bus the SHT4X is connected to. + :param int address: The I2C device address. Defaults to :const:`0x44` + + :raises RuntimeError: if the sensor is not found + + **Quickstart: Importing and using the device** + + Here is an example of using the :class:`SHT4X` class. + First you will need to import the libraries to use the sensor + + .. code-block:: python + + from machine import Pin, I2C + from micropython_sht4x import sht4x + + Once this is done you can define your `machine.I2C` object and define your sensor object + + .. code-block:: python + + i2c = I2C(1, sda=Pin(2), scl=Pin(3)) + sht = sht4x.SHT4X(i2c) + + Now you have access to the attributes + + .. code-block:: python + + temp = sht.temperature + hum = sht.relative_humidity + + """ + + def __init__(self, i2c, address: int = 0x44) -> None: + self._i2c = i2c + self._address = address + self._data = bytearray(6) + + self._command = 0xFD + self._temperature_precision = HIGH_PRECISION + self._heater_power = HEATER20mW + self._heat_time = TEMP_0_1 + + @property + def temperature_precision(self) -> str: + """ + Sensor temperature_precision + + +------------------------------------+------------------+ + | Mode | Value | + +====================================+==================+ + | :py:const:`sht4x.HIGH_PRECISION` | :py:const:`0` | + +------------------------------------+------------------+ + | :py:const:`sht4x.MEDIUM_PRECISION` | :py:const:`1` | + +------------------------------------+------------------+ + | :py:const:`sht4x.LOW_PRECISION` | :py:const:`2` | + +------------------------------------+------------------+ + + """ + values = ("HIGH_PRECISION", "MEDIUM_PRECISION", "LOW_PRECISION") + return values[self._temperature_precision] + + @temperature_precision.setter + def temperature_precision(self, value: int) -> None: + if value not in temperature_precision_values: + raise ValueError("Value must be a valid temperature_precision setting") + self._temperature_precision = value + self._command = temperature_precision_values[value] + + @property + def relative_humidity(self) -> float: + """ + The current relative humidity in % rH + The RH conversion formula (1) allows values to be reported + which are outside the range of 0 %RH … 100 %RH. Relative + humidity values which are smaller than 0 %RH and larger than + 100 %RH are non-physical, however these “uncropped” values might + be found beneficial in some cases (e.g. when the distribution of + the sensors at the measurement boundaries are of interest) + """ + return self.measurements[1] + + @property + def temperature(self) -> float: + """The current temperature in Celsius""" + return self.measurements[0] + + @property + def measurements(self) -> Tuple[float, float]: + """both `temperature` and `relative_humidity`, read simultaneously + If you use t the heater function, sensor will be not give a response + back. Waiting time is added to the logic to account for this situation + """ + + self._i2c.writeto(self._address, bytes([self._command]), False) + if self._command in (0x39, 0x2F, 0x1E): + time.sleep(1.2) + elif self._command in (0x32, 0x24, 0x15): + time.sleep(0.2) + time.sleep(0.2) + self._i2c.readfrom_into(self._address, self._data) + + temperature, temp_crc, humidity, humidity_crc = struct.unpack_from( + ">HBHB", self._data + ) + + if temp_crc != self._crc( + memoryview(self._data[0:2]) + ) or humidity_crc != self._crc(memoryview(self._data[3:5])): + raise RuntimeError("Invalid CRC calculated") + + temperature = -45.0 + 175.0 * temperature / 65535.0 + + humidity = -6.0 + 125.0 * humidity / 65535.0 + humidity = max(min(humidity, 100), 0) + + return temperature, humidity + + @staticmethod + def _crc(buffer) -> int: + """verify the crc8 checksum""" + crc = 0xFF + for byte in buffer: + crc ^= byte + for _ in range(8): + if crc & 0x80: + crc = (crc << 1) ^ 0x31 + else: + crc = crc << 1 + return crc & 0xFF + + @property + def heater_power(self) -> str: + """ + Sensor heater power + The sensor has a heater. Three heating powers and two heating + durations are selectable. + The sensor executes the following procedure: + 1. The heater is enabled, and the timer starts its count-down. + 2. Measure is taken after time is up + 3. After the measurement is finished the heater is turned off. + 4. Temperature and humidity values are now available for readout. + The maximum on-time of the heater commands is one second in order + to prevent overheating + + +-------------------------------+---------------+ + | Mode | Value | + +===============================+===============+ + | :py:const:`sht4x.HEATER200mW` | :py:const:`0` | + +-------------------------------+---------------+ + | :py:const:`sht4x.HEATER110mW` | :py:const:`1` | + +-------------------------------+---------------+ + | :py:const:`sht4x.HEATER20mW` | :py:const:`2` | + +-------------------------------+---------------+ + + """ + values = ("HEATER200mW", "HEATER110mW", "HEATER20mW") + return values[self._heater_power] + + @heater_power.setter + def heater_power(self, value: int) -> None: + if value not in heater_power_values: + raise ValueError("Value must be a valid heater power setting") + self._heater_power = value + self._command = wat_config[value][self._heat_time] + + @property + def heat_time(self) -> str: + """ + Sensor heat_time + The sensor has a heater. Three heating powers and two heating + durations are selectable. + The sensor executes the following procedure: + 1. The heater is enabled, and the timer starts its count-down. + 2. Measure is taken after time is up + 3. After the measurement is finished the heater is turned off. + 4. Temperature and humidity values are now available for readout. + The maximum on-time of the heater commands is one second in order + to prevent overheating + + +----------------------------+---------------+ + | Mode | Value | + +============================+===============+ + | :py:const:`sht4x.TEMP_1` | :py:const:`0` | + +----------------------------+---------------+ + | :py:const:`sht4x.TEMP_0_1` | :py:const:`1` | + +----------------------------+---------------+ + """ + values = ("TEMP_1", "TEMP_0_1") + return values[self._heat_time] + + @heat_time.setter + def heat_time(self, value: int) -> None: + if value not in heat_time_values: + raise ValueError("Value must be a valid heat_time setting") + self._heat_time = value + self._command = wat_config[self._heater_power][value] + + def reset(self): + """ + Reset the sensor + """ + self._i2c.writeto(self._address, bytes([_RESET]), False) + time.sleep(0.1)