New sensors: SHT4x, BMP390

This commit is contained in:
Ádám Kovács
2023-11-12 10:30:29 +01:00
parent bae126dfad
commit c540ffffe1
7 changed files with 815 additions and 15 deletions

305
bmp390/bmp390.py Normal file
View File

@@ -0,0 +1,305 @@
# micropython
# mail: goctaprog@gmail.com
# MIT license
import micropython
import array
from sensor_pack import bus_service
from sensor_pack.base_sensor import BaseSensor, Iterator
# ВНИМАНИЕ: не подключайте питание датчика к 5В, иначе датчик выйдет из строя! Только 3.3В!!!
# WARNING: do not connect "+" to 5V or the sensor will be damaged!
@micropython.native
def _check_value(value: int, valid_range, error_msg: str) -> int:
if value not in valid_range:
raise ValueError(error_msg)
return value
@micropython.native
def _calibration_regs_addr():
"""возвращает кортеж из адреса регистра, размера значения в байтах, типа значения (u-unsigned, s-signed)"""
start_addr = 0x31
tpl = ('1b', '2h', '2H')
"""возвращает итератор с адресами внутренних регистров датчика, хранящих калибровочные коэффициенты """
val_type = "22011002200100"
for item in val_type:
v_size, v_type = tpl[int(item)]
yield int(start_addr), int(v_size), v_type
start_addr += int(v_size)
@micropython.native
def get_conversion_cycle_time(temperature_or_pressure: bool, oversample: int) -> int:
"""возвращает время преобразования в [мс] датчиком температуры или давления в зависимости от его настроек"""
delays_ms = 5, 8, 14, 26
if temperature_or_pressure:
return delays_ms[0] # temperature
# pressure
return delays_ms[oversample]
class Bmp390(BaseSensor, Iterator):
"""Class for work with Bosh BMP180 pressure sensor"""
def __init__(self, adapter: bus_service.BusAdapter, address=0xEE >> 1,
oversample_temp=0b11, oversample_press=0b11, iir_filter=0):
"""i2c - объект класса I2C; baseline_pressure - давление на уровне моря в Pa в твоей(!) местности;;
oversample_settings (0..5) - точность измерения 0-грубо но быстро, 5-медленно, но точно;
address - адрес датчика (0xEF (read) and 0xEE (write) from datasheet)
iir_filter=0..7; 0 - off, 7 - max value
i2c is an object of the I2C class; baseline_pressure - sea level pressure in Pa in your(!) area;
oversample_settings (0..5) - measurement reliability 0-coarse but fast, 5-slow but accurate;"""
super().__init__(adapter, address, False)
self.t_lin = None # for pressure calculation
# for temperature only!
self.oss_t = _check_value(oversample_temp, range(0, 6),
f"Invalid temperature oversample value: {oversample_temp}")
self.oss_p = _check_value(oversample_press, range(0, 6),
f"Invalid pressure oversample value: {oversample_press}")
self.adr = address
self.adapter = adapter
self.IIR = _check_value(iir_filter, range(0, 8),
f"Invalid iir_filter value: {iir_filter}")
self.mode = 0
self.enable_pressure = False
self.enable_temperature = False
self.sampling_period = 0x02 # 1.28 sec
# массив, хранящий калибровочные коэффициенты (xx штук)
self.cfa = [] # signed long elements
# считываю калибровочные коэффициенты
self._read_calibration_data()
# предварительный расчет
self._precalculate()
def get_calibration_data(self, index: int) -> int:
"""возвращает калибровочный коэффициент по его индексу (0..13).
returns the calibration coefficient by its index (0..13)"""
_check_value(index, range(0, 14), f"Invalid index value: {index}")
return self.cfa[index]
@micropython.native
def _precalculate(self):
"""предварительно вычисленные значения"""
# для расчета температуры
self.par_t1 = self.get_calibration_data(0) * 2 ** 8 #
self.par_t2 = self.get_calibration_data(1) / 2 ** 30 #
self.par_t3 = self.get_calibration_data(2) / 2 ** 48 #
# для расчета давления
self.par_p1 = (self.get_calibration_data(3) - 2 ** 14) / 2 ** 20
self.par_p2 = (self.get_calibration_data(4) - 2 ** 14) / 2 ** 29
self.par_p3 = self.get_calibration_data(5) / 2 ** 32
self.par_p4 = self.get_calibration_data(6) / 2 ** 37
self.par_p5 = 8 * self.get_calibration_data(7)
self.par_p6 = self.get_calibration_data(8) / 2 ** 6
self.par_p7 = self.get_calibration_data(9) / 2 ** 8
self.par_p8 = self.get_calibration_data(10) / 2 ** 15
self.par_p9 = self.get_calibration_data(11) / 2 ** 48
self.par_p10 = self.get_calibration_data(12) / 2 ** 48
self.par_p11 = self.get_calibration_data(13) / 2 ** 65
# BaseSensor
def _read_register(self, reg_addr, bytes_count=2) -> bytes:
"""считывает из регистра датчика значение.
bytes_count - размер значения в байтах"""
return self.adapter.read_register(self.adr, reg_addr, bytes_count)
# BaseSensor
def _write_register(self, reg_addr, value: int, bytes_count=2) -> int:
"""записывает данные value в датчик, по адресу reg_addr.
bytes_count - кол-во записываемых данных"""
byte_order = self._get_byteorder_as_str()[0]
return self.adapter.write_register(self.adr, reg_addr, value, bytes_count, byte_order)
def _read_calibration_data(self) -> int:
"""Читает калибровочные значение из датчика.
read calibration values from sensor.
return count read values"""
if len(self.cfa):
raise ValueError(f"calibration data array already filled!")
for v_addr, v_size, v_type in _calibration_regs_addr():
# print(v_addr, v_size, v_type)
reg_val = self._read_register(v_addr, v_size)
rv = self.unpack(f"{v_type}", reg_val)[0]
# check
if rv == 0x00 or rv == 0xFFFF:
raise ValueError(f"Invalid register addr: {v_addr} value: {hex(rv)}")
self.cfa.append(rv)
return len(self.cfa)
def get_id(self) -> tuple:
"""Возвращает идентификатор датчика и его revision ID.
Returns the ID and revision ID of the sensor."""
chip_id = self._read_register(0x00, 1)
rev_id = self._read_register(0x01, 1)
return int(chip_id[0]), int(rev_id[0])
def get_error(self) -> int:
"""Возвращает три бита состояния ошибок.
Bit 0 - fatal_err Fatal error
Bit 1 - Command execution failed. Cleared on read.
Bit 2 conf_err sensor configuration error detected (only working in normal mode). Cleared on read.
"""
err = self._read_register(0x02, 1)
return int(err[0]) & 0x07
def get_status(self) -> tuple:
"""Возвращает три бита состояния датчика как кортеж
Data ready for temperature, Data ready for pressure, CMD decoder status
бит 0 - CMD decoder status (0: Command in progress; 1: Command decoder is ready to accept a new command)
бит 1 - Data ready for pressure. (It gets reset, when one pressure DATA register is read out)
бит 2 - Data ready for temperature sensor. (It gets reset, when one temperature DATA register is read out)
"""
val = self._read_register(0x03, 1)[0]
i = (int(val) >> 4) & 0x07
drdy_temp, drdy_press, cmd_rdy = i & 0x04, i & 0x02, i & 0x01
return drdy_temp, drdy_press, cmd_rdy
@micropython.native
def get_pressure_raw(self) -> int:
# трех байтовое значение
l, m, h = self._read_register(0x04, 3)
return (h << 16) | (m << 8) | l
def get_pressure(self) -> float:
"""Return pressure in Pascal [Pa].
Call get_temperature() before call get_pressure() !!!"""
uncompensated = self.get_pressure_raw()
#
t_lin = self.t_lin
if t_lin is None:
raise ValueError(f"Call get_temperature() before call get_pressure() !!!")
t_lin2 = t_lin * t_lin
t_lin3 = t_lin * t_lin * t_lin
#
partial_data1 = self.par_p6 * t_lin
partial_data2 = self.par_p7 * t_lin2
partial_data3 = self.par_p8 * t_lin3
partial_out1 = self.par_p5 + partial_data1 + partial_data2 + partial_data3
#
partial_data1 = self.par_p2 * t_lin
partial_data2 = self.par_p3 * t_lin2
partial_data3 = self.par_p4 * t_lin3
partial_out2 = uncompensated * (self.par_p1 + partial_data1 + partial_data2 + partial_data3)
#
partial_data1 = uncompensated * uncompensated
partial_data2 = self.par_p9 + self.par_p10 * t_lin
partial_data3 = partial_data1 * partial_data2
partial_data4 = partial_data3 + (uncompensated * uncompensated * uncompensated) * self.par_p11
#
return partial_out1 + partial_out2 + partial_data4
@micropython.native
def get_temperature_raw(self) -> int:
# трех байтовое значение
l, m, h = self._read_register(0x07, 3)
return (h << 16) | (m << 8) | l
def get_temperature(self) -> float:
"""Return temperature in Celsius"""
uncompensated = self.get_temperature_raw()
partial_data1 = uncompensated - self.par_t1
partial_data2 = partial_data1 * self.par_t2
# Update the compensated temperature since this is needed for pressure calculation !!!
self.t_lin = partial_data2 + (partial_data1 * partial_data1) * self.par_t3
return self.t_lin
@micropython.native
def get_sensor_time(self):
# трех байтовое значение
l, m, h = self._read_register(0x0C, 3)
return (h << 16) | (m << 8) | l
def get_event(self) -> int:
"""Bit 0 por_detected 1 after device power up or softreset. Clear-on-read
Bit 1 itf_act_pt 1 when a serial interface transaction occurs during a
pressure or temperature conversion. Clear-on-read"""
evt = self._read_register(0x10, 1)
return int(evt[0]) & 0b11
def get_int_status(self) -> int:
"""Bit 0 fwm_int FIFO Watermark Interrupt
Bit 1 full_int FIFO Full Interrupt
Bit 3 drdy data ready interrupt"""
int_stat = self._read_register(0x11, 1)
return int(int_stat[0]) & 0b111
def get_fifo_length(self) -> int:
"""The FIFO byte counter indicates the current fill level of the FIFO buffer."""
fl = self._read_register(0x12, 2)
return self.unpack("H", fl)[0]
def soft_reset(self, reset_or_flush: bool = True):
"""программный сброс датчика.
software reset of the sensor"""
if reset_or_flush:
self._write_register(0x7E, 0xB6, 1) # reset
else:
self._write_register(0x7E, 0xB0, 1) # flush
def start_measurement(self, enable_press, enable_temp, mode: int = 2):
""" # mode: 0 - sleep, 1-forced, 2-normal (continuously)"""
if mode not in range(3):
raise ValueError(f"Invalid mode value: {mode}")
tmp = self._read_register(0x1B, 1)[0]
if enable_press:
tmp |= 0b01
else:
tmp &= ~0b01
if enable_temp:
tmp |= 0b10
else:
tmp &= ~0b10
if True:
tmp &= ~0b0011_0000
if 0 == mode:
pass
if 1 == mode:
tmp |= 0b0001_0000
if 2 == mode:
tmp |= 0b0011_0000
# save
self._write_register(0x1B, tmp, 1)
self.mode = mode
self.enable_pressure = enable_press
self.enable_temperature = enable_temp
def set_oversampling(self, pressure_oversampling: int, temperature_oversampling: int):
tmp = 0
po = _check_value(pressure_oversampling, range(0, 6),
f"Invalid value pressure_oversampling: {pressure_oversampling}")
to = _check_value(temperature_oversampling, range(0, 6),
f"Invalid value temperature_oversampling: {temperature_oversampling}")
tmp |= po
tmp |= to << 3
self._write_register(0x1C, tmp, 1)
self.oss_t = temperature_oversampling
self.oss_p = pressure_oversampling
def set_sampling_period(self, period: int):
p = _check_value(period, range(0, 18),
f"Invalid value output data rates: {period}")
self._write_register(0x1D, p, 1)
self.sampling_period = period
def set_iir_filter(self, value):
p = _check_value(value, range(0, 8),
f"Invalid value iir_filter: {value}")
self._write_register(0x1F, p, 1)
# Iterator
def __next__(self) -> tuple:
res = list()
if self.enable_temperature:
res.append(self.get_temperature())
if self.enable_pressure:
res.append(self.get_pressure())
return tuple(res)

139
main.py
View File

@@ -4,12 +4,31 @@ import socket
import ubinascii
from machine import Pin, I2C
import micropython_mcp9808.mcp9808 as mcp9808
import mcp9808.mcp9808 as mcp9808
import sht4x.sht4x as sht4x
import bmp390.bmp390 as bmp390
from sensor_pack.bus_service import I2cAdapter
from secrets import secrets
GET_PATH_START = 4
POST_PATH_START = 5
def pa_mmhg(value: float) -> float:
"""Convert air pressure from Pa to mm Hg"""
return value*7.50062E-3
def with_fallback(value, fallback):
if value is None:
return fallback
else:
return value
def with_fallback_to_str(value, fallback: str) -> str:
if value is None:
return fallback
else:
return str(value)
led = Pin('LED', Pin.OUT)
def blink_onboard_led(num_blinks):
for i in range(num_blinks):
@@ -30,9 +49,6 @@ if len(devices) != 0:
else:
print("No device found")
mcp9808 = mcp9808.MCP9808(i2c)
ssid = secrets['ssid']
password = secrets['pw']
@@ -65,9 +81,9 @@ blink_onboard_led(2)
addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1]
s = socket.socket()
s.bind(addr)
s.listen(1)
serverSocket = socket.socket()
serverSocket.bind(addr)
serverSocket.listen(1)
print('listening on', addr)
@@ -81,9 +97,36 @@ def result_notfound(cl, response = None):
cl.send(response if response is not None else "Not Found")
cl.close()
while True:
cl, addr = s.accept()
if devices.count(0x18) > 0:
mcp9808 = mcp9808.MCP9808(i2c)
else:
mcp9808 = None
if devices.count(0x44) > 0:
sht4x = sht4x.SHT4X(i2c)
else:
sht4x = None
if devices.count(0x77) > 0:
adaptor = I2cAdapter(i2c)
bmp390 = bmp390.Bmp390(adaptor)
calibration_data = [bmp390.get_calibration_data(index) for index in range(0, 14)]
print(f"Calibration data: {calibration_data}")
#
bmp390.set_oversampling(2, 3)
bmp390.set_sampling_period(5)
bmp390.set_iir_filter(2)
#
#bmp390.start_measurement(True, True, 1)
else:
bmp390 = None
while True:
print('waiting for client')
cl, addr = serverSocket.accept()
try:
print('client connected from', addr)
request = cl.recv(1024)
@@ -92,15 +135,83 @@ while True:
request = request[2:-1] # remove b' and ' from string
print(request)
temp_mcp9808 = str(mcp9808.temperature) if mcp9808 is not None else None
temp_sht4x, humidity_sht4x = sht4x.measurements if sht4x is not None else (None,None)
temp_bmp390, pressure_bmp390, time_bmp390 = None, None, None
if bmp390 is not None:
bmp390.start_measurement(True, True, 1)
s = bmp390.get_status()
while not s[2] or not s[1]:
time.sleep_ms(10)
s = bmp390.get_status()
temp_bmp390, pressure_bmp390, time_bmp390 = bmp390.get_temperature(), bmp390.get_pressure(), bmp390.get_sensor_time()
if request.find('/ ') == GET_PATH_START:
temp = str(mcp9808.temperature)
result_ok(cl, "{ \"temperature\": \"" + temp + "\" }", 'application/json')
result_ok(cl,
"{" +
"\"temp_mcp9808\": \"" + with_fallback_to_str(temp_mcp9808, "NaN") + "\","+
"\"temp_sht4x\": \"" + with_fallback_to_str(temp_sht4x, "NaN")+"\","+
"\"humidity_sht4x\": \"" + with_fallback_to_str(humidity_sht4x, "NaN") + "\"," +
"\"temp_bmp390\": \"" + with_fallback_to_str(temp_bmp390, "NaN") + "\"," +
"\"pressure_bmp390\": \"" + with_fallback_to_str(pressure_bmp390, "NaN") + "\"," +
"\"time_bmp390\": \"" + with_fallback_to_str(time_bmp390, "NaN") + "\"" +
"}",
'application/json')
if request.find('/homeassistant ') == GET_PATH_START:
result_ok(cl,
"{" +
"\"temperature\": \"" + with_fallback_to_str(temp_mcp9808, "NaN") + "\","+
"\"humidity\": \"" + with_fallback_to_str(humidity_sht4x, "NaN") + "\"," +
"\"pressure\": \"" + with_fallback_to_str(pressure_bmp390, "NaN") + "\"," +
"\"temp_mcp9808\": \"" + with_fallback_to_str(temp_mcp9808, "NaN") + "\","+
"\"temp_sht4x\": \"" + with_fallback_to_str(temp_sht4x, "NaN")+"\","+
"\"humidity_sht4x\": \"" + with_fallback_to_str(humidity_sht4x, "NaN") + "\"," +
"\"temp_bmp390\": \"" + with_fallback_to_str(temp_bmp390, "NaN") + "\"," +
"\"pressure_bmp390\": \"" + with_fallback_to_str(pressure_bmp390, "NaN") + "\"," +
"\"time_bmp390\": \"" + with_fallback_to_str(time_bmp390, "NaN") + "\"" +
"}",
'application/json')
elif request.find('/prometheus') == GET_PATH_START:
temp = str(mcp9808.temperature)
content = """# HELP temperature Temperature in Celsius
attrs = "{mac=\"""" + mac_readable + "\",ip=\""+ ip +"\"} "
content = (
"""# HELP temperature Temperature in Celsius
# TYPE temperature gauge
temperature{mac=\"""" + mac_readable + "\",ip=\""+ ip +"\"} " + temp
temperature""" + attrs + with_fallback_to_str(temp_mcp9808, "NaN") +
"""
# HELP humidity Relative humidity in %
# TYPE humidity gauge
humidity""" + attrs + with_fallback_to_str(humidity_sht4x, "NaN") +
"""
# HELP pressure Pressure in Pa
# TYPE pressure gauge
pressure""" + attrs + with_fallback_to_str(pressure_bmp390, "NaN") +
"""
# HELP temp_mcp9808 Temperature in Celsius
# TYPE temp_mcp9808 gauge
temp_mcp9808""" + attrs + with_fallback_to_str(temp_mcp9808, "NaN") +
"""
# HELP humidity_sht4x Relative humidity in %
# TYPE humidity_sht4x gauge
humidity_sht4x""" + attrs + with_fallback_to_str(humidity_sht4x, "NaN") +
"""
# HELP temp_sht4x Temperature in Celsius
# TYPE temp_sht4x gauge
temp_sht4x""" + attrs + with_fallback_to_str(temp_sht4x, "NaN") +
"""
# HELP bmp390_temp Temperature in Celsius
# TYPE bmp390_temp gauge
bmp390_temp""" + attrs + with_fallback_to_str(temp_bmp390, "NaN") +
"""
# HELP bmp390_pressure Pressure in Pa
# TYPE bmp390_pressure gauge
bmp390_pressure""" + attrs + with_fallback_to_str(pressure_bmp390, "NaN") +
"""
# HELP bmp390_time Time in ms
# TYPE bmp390_time gauge
bmp390_time""" + attrs + with_fallback_to_str(time_bmp390, "NaN"))
result_ok(cl, content)
else:
result_notfound(cl)

View File

@@ -19,7 +19,7 @@ MicroPython Driver for the Microchip MCP9808 Temperature Sensor
from collections import namedtuple
from micropython import const
from micropython_mcp9808.i2c_helpers import CBits, RegisterStruct
from mcp9808.i2c_helpers import CBits, RegisterStruct
__version__ = "0.0.0+auto.0"

View File

@@ -0,0 +1,57 @@
# micropython
# MIT license
# Copyright (c) 2022 Roman Shevchik goctaprog@gmail.com
import micropython
import ustruct
from sensor_pack import bus_service
class BaseSensor:
"""Base sensor class"""
def __init__(self, adapter: bus_service.BusAdapter, address: int, big_byte_order: bool):
"""Базовый класс Датчик.
Если big_byte_order равен True -> порядок байтов в регистрах датчика «big»
(Порядок от старшего к младшему), в противном случае порядок байтов в регистрах "little"
(Порядок от младшего к старшему)
address - адрес датчика на шине.
Base sensor class. if big_byte_order is True -> register values byteorder is 'big'
else register values byteorder is 'little'
address - address of the sensor on the bus."""
self.adapter = adapter
self.address = address
self.big_byte_order = big_byte_order
def _get_byteorder_as_str(self) -> tuple:
"""Return byteorder as string"""
if self.is_big_byteorder():
return 'big', '>'
else:
return 'little', '<'
def unpack(self, fmt_char: str, source: bytes) -> tuple:
"""распаковка массива, считанного из датчика.
fmt_char: c, b, B, h, H, i, I, l, L, q, Q. pls see: https://docs.python.org/3/library/struct.html"""
if len(fmt_char) != 1:
raise ValueError(f"Invalid length fmt_char parameter: {len(fmt_char)}")
bo = self._get_byteorder_as_str()[1]
return ustruct.unpack(bo + fmt_char, source)
@micropython.native
def is_big_byteorder(self) -> bool:
return self.big_byte_order
def get_id(self):
raise NotImplementedError
def soft_reset(self):
raise NotImplementedError
class Iterator:
def __iter__(self):
return self
def __next__(self):
raise NotImplementedError

View File

@@ -0,0 +1,66 @@
# micropython
# MIT license
# Copyright (c) 2022 Roman Shevchik goctaprog@gmail.com
"""service class for I/O bus operation"""
from machine import I2C
try:
from typing import Literal
except:
pass
class BusAdapter:
"""Proxy between I/O bus and device I/O class"""
def __init__(self, bus):
self.bus = bus
def read_register(self, device_addr: int, reg_addr: int, bytes_count: int) -> bytes:
"""считывает из регистра датчика значение.
device_addr - адрес датчика на шине.
reg_addr - адрес регистра в адресном пространстве датчика.
bytes_count - размер значения в байтах.
reads value from sensor register.
device_addr - address of the sensor on the bus.
reg_addr - register address in the address space of the sensor"""
raise NotImplementedError
def write_register(self, device_addr: int, reg_addr: int, value: int,
bytes_count: int, byte_order: str):
"""записывает данные value в датчик, по адресу reg_addr.
bytes_count - кол-во записываемых байт из value.
byte_order - порядок расположения байт в записываемом значении.
writes value data to the sensor, at reg_addr.
bytes_count - number of bytes written from value.
byte_order - the order of bytes in the value being written.
"""
raise NotImplementedError
def read(self, device_addr, n_bytes: int) -> bytes:
raise NotImplementedError
def write(self, device_addr, buf: bytes):
raise NotImplementedError
class I2cAdapter(BusAdapter):
def __init__(self, bus: I2C):
super().__init__(bus)
def write_register(self, device_addr: int, reg_addr: int, value: int,
bytes_count: int, byte_order: Literal["little", "big"]):
"""записывает данные value в датчик, по адресу reg_addr.
bytes_count - кол-во записываемых данных"""
buf = value.to_bytes(bytes_count, byte_order)
return self.bus.writeto_mem(device_addr, reg_addr, buf)
def read_register(self, device_addr: int, reg_addr: int, bytes_count: int) -> bytes:
"""считывает из регистра датчика значение.
bytes_count - размер значения в байтах"""
return self.bus.readfrom_mem(device_addr, reg_addr, bytes_count)
def read(self, device_addr, n_bytes: int) -> bytes:
return self.bus.readfrom(device_addr, n_bytes)
def write(self, device_addr, buf: bytes):
return self.bus.writeto(device_addr, buf)

261
sht4x/sht4x.py Normal file
View File

@@ -0,0 +1,261 @@
# SPDX-FileCopyrightText: Copyright (c) 2023 Jose D. Montoya
#
# SPDX-License-Identifier: MIT
"""
`sht4x`
================================================================================
MicroPython Driver fot the Sensirion Temperature and Humidity SHT40, SHT41 and SHT45 Sensor
* Author: Jose D. Montoya
"""
import time
import struct
from micropython import const
try:
from typing import Tuple
except ImportError:
pass
__version__ = "0.0.0+auto.0"
__repo__ = "https://github.com/jposada202020/MicroPython_SHT4X.git"
_RESET = const(0x94)
HIGH_PRECISION = const(0)
MEDIUM_PRECISION = const(1)
LOW_PRECISION = const(2)
temperature_precision_options = (HIGH_PRECISION, MEDIUM_PRECISION, LOW_PRECISION)
temperature_precision_values = {
HIGH_PRECISION: const(0xFD),
MEDIUM_PRECISION: const(0xF6),
LOW_PRECISION: const(0xE0),
}
HEATER200mW = const(0)
HEATER110mW = const(1)
HEATER20mW = const(2)
heater_power_values = (HEATER200mW, HEATER110mW, HEATER20mW)
TEMP_1 = const(0)
TEMP_0_1 = const(1)
heat_time_values = (TEMP_1, TEMP_0_1)
wat_config = {
HEATER200mW: (0x39, 0x32),
HEATER110mW: (0x2F, 0x24),
HEATER20mW: (0x1E, 0x15),
}
class SHT4X:
"""Driver for the SHT4X Sensor connected over I2C.
:param ~machine.I2C i2c: The I2C bus the SHT4X is connected to.
:param int address: The I2C device address. Defaults to :const:`0x44`
:raises RuntimeError: if the sensor is not found
**Quickstart: Importing and using the device**
Here is an example of using the :class:`SHT4X` class.
First you will need to import the libraries to use the sensor
.. code-block:: python
from machine import Pin, I2C
from micropython_sht4x import sht4x
Once this is done you can define your `machine.I2C` object and define your sensor object
.. code-block:: python
i2c = I2C(1, sda=Pin(2), scl=Pin(3))
sht = sht4x.SHT4X(i2c)
Now you have access to the attributes
.. code-block:: python
temp = sht.temperature
hum = sht.relative_humidity
"""
def __init__(self, i2c, address: int = 0x44) -> None:
self._i2c = i2c
self._address = address
self._data = bytearray(6)
self._command = 0xFD
self._temperature_precision = HIGH_PRECISION
self._heater_power = HEATER20mW
self._heat_time = TEMP_0_1
@property
def temperature_precision(self) -> str:
"""
Sensor temperature_precision
+------------------------------------+------------------+
| Mode | Value |
+====================================+==================+
| :py:const:`sht4x.HIGH_PRECISION` | :py:const:`0` |
+------------------------------------+------------------+
| :py:const:`sht4x.MEDIUM_PRECISION` | :py:const:`1` |
+------------------------------------+------------------+
| :py:const:`sht4x.LOW_PRECISION` | :py:const:`2` |
+------------------------------------+------------------+
"""
values = ("HIGH_PRECISION", "MEDIUM_PRECISION", "LOW_PRECISION")
return values[self._temperature_precision]
@temperature_precision.setter
def temperature_precision(self, value: int) -> None:
if value not in temperature_precision_values:
raise ValueError("Value must be a valid temperature_precision setting")
self._temperature_precision = value
self._command = temperature_precision_values[value]
@property
def relative_humidity(self) -> float:
"""
The current relative humidity in % rH
The RH conversion formula (1) allows values to be reported
which are outside the range of 0 %RH … 100 %RH. Relative
humidity values which are smaller than 0 %RH and larger than
100 %RH are non-physical, however these “uncropped” values might
be found beneficial in some cases (e.g. when the distribution of
the sensors at the measurement boundaries are of interest)
"""
return self.measurements[1]
@property
def temperature(self) -> float:
"""The current temperature in Celsius"""
return self.measurements[0]
@property
def measurements(self) -> Tuple[float, float]:
"""both `temperature` and `relative_humidity`, read simultaneously
If you use t the heater function, sensor will be not give a response
back. Waiting time is added to the logic to account for this situation
"""
self._i2c.writeto(self._address, bytes([self._command]), False)
if self._command in (0x39, 0x2F, 0x1E):
time.sleep(1.2)
elif self._command in (0x32, 0x24, 0x15):
time.sleep(0.2)
time.sleep(0.2)
self._i2c.readfrom_into(self._address, self._data)
temperature, temp_crc, humidity, humidity_crc = struct.unpack_from(
">HBHB", self._data
)
if temp_crc != self._crc(
memoryview(self._data[0:2])
) or humidity_crc != self._crc(memoryview(self._data[3:5])):
raise RuntimeError("Invalid CRC calculated")
temperature = -45.0 + 175.0 * temperature / 65535.0
humidity = -6.0 + 125.0 * humidity / 65535.0
humidity = max(min(humidity, 100), 0)
return temperature, humidity
@staticmethod
def _crc(buffer) -> int:
"""verify the crc8 checksum"""
crc = 0xFF
for byte in buffer:
crc ^= byte
for _ in range(8):
if crc & 0x80:
crc = (crc << 1) ^ 0x31
else:
crc = crc << 1
return crc & 0xFF
@property
def heater_power(self) -> str:
"""
Sensor heater power
The sensor has a heater. Three heating powers and two heating
durations are selectable.
The sensor executes the following procedure:
1. The heater is enabled, and the timer starts its count-down.
2. Measure is taken after time is up
3. After the measurement is finished the heater is turned off.
4. Temperature and humidity values are now available for readout.
The maximum on-time of the heater commands is one second in order
to prevent overheating
+-------------------------------+---------------+
| Mode | Value |
+===============================+===============+
| :py:const:`sht4x.HEATER200mW` | :py:const:`0` |
+-------------------------------+---------------+
| :py:const:`sht4x.HEATER110mW` | :py:const:`1` |
+-------------------------------+---------------+
| :py:const:`sht4x.HEATER20mW` | :py:const:`2` |
+-------------------------------+---------------+
"""
values = ("HEATER200mW", "HEATER110mW", "HEATER20mW")
return values[self._heater_power]
@heater_power.setter
def heater_power(self, value: int) -> None:
if value not in heater_power_values:
raise ValueError("Value must be a valid heater power setting")
self._heater_power = value
self._command = wat_config[value][self._heat_time]
@property
def heat_time(self) -> str:
"""
Sensor heat_time
The sensor has a heater. Three heating powers and two heating
durations are selectable.
The sensor executes the following procedure:
1. The heater is enabled, and the timer starts its count-down.
2. Measure is taken after time is up
3. After the measurement is finished the heater is turned off.
4. Temperature and humidity values are now available for readout.
The maximum on-time of the heater commands is one second in order
to prevent overheating
+----------------------------+---------------+
| Mode | Value |
+============================+===============+
| :py:const:`sht4x.TEMP_1` | :py:const:`0` |
+----------------------------+---------------+
| :py:const:`sht4x.TEMP_0_1` | :py:const:`1` |
+----------------------------+---------------+
"""
values = ("TEMP_1", "TEMP_0_1")
return values[self._heat_time]
@heat_time.setter
def heat_time(self, value: int) -> None:
if value not in heat_time_values:
raise ValueError("Value must be a valid heat_time setting")
self._heat_time = value
self._command = wat_config[self._heater_power][value]
def reset(self):
"""
Reset the sensor
"""
self._i2c.writeto(self._address, bytes([_RESET]), False)
time.sleep(0.1)