From bae126dfad313a0536849329eecb7b0cd39c478e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81d=C3=A1m=20Kov=C3=A1cs?= Date: Fri, 10 Nov 2023 15:09:26 +0100 Subject: [PATCH] Base project --- main.py | 111 ++++++++++ micropython_mcp9808/i2c_helpers.py | 135 ++++++++++++ micropython_mcp9808/mcp9808.py | 321 +++++++++++++++++++++++++++++ 3 files changed, 567 insertions(+) create mode 100644 main.py create mode 100644 micropython_mcp9808/i2c_helpers.py create mode 100644 micropython_mcp9808/mcp9808.py diff --git a/main.py b/main.py new file mode 100644 index 0000000..278167e --- /dev/null +++ b/main.py @@ -0,0 +1,111 @@ +import time +import network +import socket +import ubinascii +from machine import Pin, I2C + +import micropython_mcp9808.mcp9808 as mcp9808 +from secrets import secrets + +GET_PATH_START = 4 +POST_PATH_START = 5 + +led = Pin('LED', Pin.OUT) +def blink_onboard_led(num_blinks): + for i in range(num_blinks): + led.value(1) + time.sleep_ms(200) + led.value(0) + time.sleep_ms(200) +blink_onboard_led(1) + +sdaPIN=Pin(20) +sclPIN=Pin(21) +i2c=I2C(0,sda=sdaPIN, scl=sclPIN, freq=200000) +devices = i2c.scan() +if len(devices) != 0: + print('Number of I2C devices found=',len(devices)) + for device in devices: + print("Device Hexadecimel Address=",hex(device)) +else: + print("No device found") + +mcp9808 = mcp9808.MCP9808(i2c) + + +ssid = secrets['ssid'] +password = secrets['pw'] + +wlan = network.WLAN(network.STA_IF) +wlan.active(True) +wlan.connect(ssid, password) + +max_wait = 10 +while max_wait > 0: + if wlan.status() < 0 or wlan.status() >= 3: + break + max_wait -= 1 + print('waiting for connection...') + time.sleep(1) + +ip = "???" +if wlan.status() != 3: + blink_onboard_led(3) + raise RuntimeError('network connection failed') +else: + print('connected') + status = wlan.ifconfig() + ip = status[0] + print( 'ip = ' + ip ) + +wlan_mac = wlan.config('mac') +mac_readable = ubinascii.hexlify(wlan_mac).decode() + +blink_onboard_led(2) + +addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1] + +s = socket.socket() +s.bind(addr) +s.listen(1) + +print('listening on', addr) + +def result_ok(cl, response = None, content_type = "text/plain"): + cl.send('HTTP/1.0 200 OK\r\nContent-type: ' + content_type + '\r\n\r\n') + cl.send(response if response is not None else "Ok") + cl.close() + +def result_notfound(cl, response = None): + cl.send('HTTP/1.0 404 NotFound\r\nContent-type: text/plain\r\n\r\n') + cl.send(response if response is not None else "Not Found") + cl.close() + +while True: + + cl, addr = s.accept() + try: + print('client connected from', addr) + request = cl.recv(1024) + + request = str(request) + request = request[2:-1] # remove b' and ' from string + print(request) + + if request.find('/ ') == GET_PATH_START: + temp = str(mcp9808.temperature) + result_ok(cl, "{ \"temperature\": \"" + temp + "\" }", 'application/json') + + elif request.find('/prometheus') == GET_PATH_START: + temp = str(mcp9808.temperature) + content = """# HELP temperature Temperature in Celsius +# TYPE temperature gauge +temperature{mac=\"""" + mac_readable + "\",ip=\""+ ip +"\"} " + temp + result_ok(cl, content) + else: + result_notfound(cl) + + + except OSError as e: + cl.close() + print('connection closed') \ No newline at end of file diff --git a/micropython_mcp9808/i2c_helpers.py b/micropython_mcp9808/i2c_helpers.py new file mode 100644 index 0000000..98f0255 --- /dev/null +++ b/micropython_mcp9808/i2c_helpers.py @@ -0,0 +1,135 @@ +# https://github.com/jposada202020/MicroPython_MCP9808/blob/master/micropython_mcp9808/i2c_helpers.py + +# SPDX-FileCopyrightText: Copyright (c) 2023 Jose D. Montoya +# +# SPDX-License-Identifier: MIT +""" +`i2c_helpers` +================================================================================ + +I2C Communications helpers + + +* Author(s): Jose D. Montoya + +Based on + +* adafruit_register.i2c_struct. Author(s): Scott Shawcroft +* adafruit_register.i2c_bits. Author(s): Scott Shawcroft + +MIT License + +Copyright (c) 2016 Adafruit Industries + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + +""" +# pylint: disable=too-many-arguments +import struct + + +class CBits: + """ + Changes bits from a byte register + """ + + def __init__( + self, + num_bits: int, + register_address: int, + start_bit: int, + register_width=1, + lsb_first=True, + ) -> None: + self.bit_mask = ((1 << num_bits) - 1) << start_bit + self.register = register_address + self.star_bit = start_bit + self.lenght = register_width + self.lsb_first = lsb_first + + def __get__( + self, + obj, + objtype=None, + ) -> int: + mem_value = obj._i2c.readfrom_mem(obj._address, self.register, self.lenght) + + reg = 0 + order = range(len(mem_value) - 1, -1, -1) + if not self.lsb_first: + order = reversed(order) + for i in order: + reg = (reg << 8) | mem_value[i] + + reg = (reg & self.bit_mask) >> self.star_bit + + return reg + + def __set__(self, obj, value: int) -> None: + memory_value = obj._i2c.readfrom_mem(obj._address, self.register, self.lenght) + + reg = 0 + order = range(len(memory_value) - 1, -1, -1) + if not self.lsb_first: + order = range(0, len(memory_value)) + for i in order: + reg = (reg << 8) | memory_value[i] + reg &= ~self.bit_mask + + value <<= self.star_bit + reg |= value + reg = reg.to_bytes(self.lenght, "big") + + obj._i2c.writeto_mem(obj._address, self.register, reg) + + +class RegisterStruct: + """ + Register Struct + """ + + def __init__(self, register_address: int, form: str) -> None: + self.format = form + self.register = register_address + self.lenght = struct.calcsize(form) + + def __get__( + self, + obj, + objtype=None, + ): + if self.lenght <= 2: + value = struct.unpack( + self.format, + memoryview( + obj._i2c.readfrom_mem(obj._address, self.register, self.lenght) + ), + )[0] + else: + value = struct.unpack( + self.format, + memoryview( + obj._i2c.readfrom_mem(obj._address, self.register, self.lenght) + ), + ) + return value + + def __set__(self, obj, value): + mem_value = value.to_bytes(self.lenght, "big") + obj._i2c.writeto_mem(obj._address, self.register, mem_value) diff --git a/micropython_mcp9808/mcp9808.py b/micropython_mcp9808/mcp9808.py new file mode 100644 index 0000000..87b8257 --- /dev/null +++ b/micropython_mcp9808/mcp9808.py @@ -0,0 +1,321 @@ +# https://github.com/jposada202020/MicroPython_MCP9808/blob/master/micropython_mcp9808/mcp9808.py + +# SPDX-FileCopyrightText: 2017 Scott Shawcroft for Adafruit Industries +# SPDX-FileCopyrightText: 2021 Jose David Montoya +# SPDX-FileCopyrightText: Copyright (c) 2023 Jose D. Montoya +# +# SPDX-License-Identifier: MIT +""" +`mcp9808` +================================================================================ + +MicroPython Driver for the Microchip MCP9808 Temperature Sensor + + +* Author(s): Jose D. Montoya + + +""" + +from collections import namedtuple +from micropython import const +from micropython_mcp9808.i2c_helpers import CBits, RegisterStruct + + +__version__ = "0.0.0+auto.0" +__repo__ = "https://github.com/jposada202020/MicroPython_MCP9808.git" + + +_CONFIG = const(0x01) +_UPPER_TEMP = const(0x02) +_LOWER_TEMP = const(0x03) +_CRITICAL_TEMP = const(0x04) +_TEMP = const(0x05) +_REG_WHOAMI = const(0x07) +_RESOLUTION = const(0x08) + +HYSTERESIS_0 = const(0b00) +HYSTERESIS_1_5 = const(0b01) +HYSTERESIS_3 = const(0b10) +HYSTERESIS_6 = const(0b11) +hysteresis_values = (HYSTERESIS_0, HYSTERESIS_1_5, HYSTERESIS_3, HYSTERESIS_6) + +CONTINUOUS = const(0b0) +SHUTDOWN = const(0b1) +power_mode_values = (CONTINUOUS, SHUTDOWN) + +RESOLUTION_0_5_C = const(0b00) +RESOLUTION_0_625_C = const(0b01) +RESOLUTION_0_125_C = const(0b10) +RESOLUTION_0_0625_C = const(0b11) +temperature_resolution_values = ( + RESOLUTION_0_5_C, + RESOLUTION_0_625_C, + RESOLUTION_0_125_C, + RESOLUTION_0_0625_C, +) + +AlertStatus = namedtuple("AlertStatus", ["high_alert", "low_alert", "critical_alert"]) + + +class MCP9808: + """Driver for the MCP9808 Sensor connected over I2C. + + :param ~machine.I2C i2c: The I2C bus the MCP9808 is connected to. + :param int address: The I2C device address. Defaults to :const:`0x18` + + :raises RuntimeError: if the sensor is not found + + **Quickstart: Importing and using the device** + + Here is an example of using the :class:`MCP9808` class. + First you will need to import the libraries to use the sensor + + .. code-block:: python + + from machine import Pin, I2C + from micropython_mcp9808 import mcp9808 + + Once this is done you can define your `machine.I2C` object and define your sensor object + + .. code-block:: python + + i2c = I2C(1, sda=Pin(2), scl=Pin(3)) + mcp = mcp9808.MCP9808(i2c) + + Now you have access to the attributes + + .. code-block:: python + + temp = mcp.temperature + + """ + + _device_id = RegisterStruct(_REG_WHOAMI, "B") + _config = RegisterStruct(_CONFIG, "H") + + _hysteresis = CBits(2, _CONFIG, 9, 2, False) + _power_mode = CBits(1, _CONFIG, 8, 2, False) + + _temperature_data = CBits(13, _TEMP, 0, 2, False) + _temperature_resolution = CBits(2, _RESOLUTION, 0) + + critical_alert = CBits(1, _TEMP, 7, register_width=1) + high_alert = CBits(1, _TEMP, 6, register_width=1) + low_alert = CBits(1, _TEMP, 5, register_width=1) + + def __init__(self, i2c, address: int = 0x18) -> None: + self._i2c = i2c + self._address = address + + if self._device_id != 0x04: + raise RuntimeError("Failed to find MCP9808") + + @property + def hysteresis(self) -> str: + """ + Sensor hysteresis + + +------------------------------------+------------------+ + | Mode | Value | + +====================================+==================+ + | :py:const:`mcp9808.HYSTERESIS_0` | :py:const:`0b00` | + +------------------------------------+------------------+ + | :py:const:`mcp9808.HYSTERESIS_1_5` | :py:const:`0b01` | + +------------------------------------+------------------+ + | :py:const:`mcp9808.HYSTERESIS_3` | :py:const:`0b10` | + +------------------------------------+------------------+ + | :py:const:`mcp9808.HYSTERESIS_6` | :py:const:`0b11` | + +------------------------------------+------------------+ + """ + values = ("HYSTERESIS_0", "HYSTERESIS_1_5", "HYSTERESIS_3", "HYSTERESIS_6") + return values[self._hysteresis] + + @hysteresis.setter + def hysteresis(self, value: int) -> None: + if value not in hysteresis_values: + raise ValueError("Value must be a valid hysteresis setting") + self._hysteresis = value + + @property + def power_mode(self) -> str: + """ + Sensor power_mode + In shutdown, all power-consuming activities are disabled, though + all registers can be written to or read. This bit cannot be set + to ‘1’ when either of the Lock bits is set (bit 6 and bit 7). + However, it can be cleared to ‘0’ for continuous conversion while + locked + + +--------------------------------+------------------+ + | Mode | Value | + +================================+==================+ + | :py:const:`mcp9808.CONTINUOUS` | :py:const:`0b00` | + +--------------------------------+------------------+ + | :py:const:`mcp9808.SHUTDOWN` | :py:const:`0b1` | + +--------------------------------+------------------+ + """ + values = ("CONTINUOUS", "SHUTDOWN") + return values[self._power_mode] + + @power_mode.setter + def power_mode(self, value: int) -> None: + if value not in power_mode_values: + raise ValueError("Value must be a valid power_mode setting") + self._power_mode = value + + @property + def temperature(self): + """ + Temperature in Celsius + """ + data = bytearray(2) + self._i2c.readfrom_mem_into(self._address, _TEMP, data) + + return self._convert_temperature(data) + + @staticmethod + def _convert_temperature(temp: bytearray) -> float: + temp[0] = temp[0] & 0x1F + if temp[0] & 0x10 == 0x10: + temp[0] = temp[0] & 0x0F + return (temp[0] * 16 + temp[1] / 16.0) - 256 + return temp[0] * 16 + temp[1] / 16.0 + + @property + def temperature_upper(self) -> float: + """ + Upper temperature in Celsius + """ + return self._get_temperature(_UPPER_TEMP) + + @temperature_upper.setter + def temperature_upper(self, value: int) -> None: + if not isinstance(value, int): + raise ValueError("Temperature must be an int value") + self._limit_temperatures(value, _UPPER_TEMP) + + def _get_temperature(self, register_address): + data = bytearray(2) + self._i2c.readfrom_mem_into(self._address, register_address, data) + + return self._convert_temperature(data) + + def _limit_temperatures(self, temp: int, register_address): + """Internal function to setup limit temperature + :param int temp: temperature limit + """ + + if temp < 0: + negative = True + temp = abs(temp) + else: + negative = False + + high_byte = temp >> 4 + if negative: + high_byte = high_byte | 0x10 + + low_byte = (temp & 0x0F) << 4 + + self._i2c.writeto_mem( + self._address, register_address, bytes([high_byte, low_byte]) + ) + + @property + def temperature_lower(self) -> float: + """ + Lower temperature in Celsius + """ + return self._get_temperature(_LOWER_TEMP) + + @temperature_lower.setter + def temperature_lower(self, value: int) -> None: + if not isinstance(value, int): + raise ValueError("Temperature must be an int value") + self._limit_temperatures(value, _LOWER_TEMP) + + @property + def temperature_critical(self) -> float: + """ + Critical temperature in Celsius + """ + return self._get_temperature(_CRITICAL_TEMP) + + @temperature_critical.setter + def temperature_critical(self, value: int) -> None: + if not isinstance(value, int): + raise ValueError("Temperature must be an int value") + self._limit_temperatures(value, _CRITICAL_TEMP) + + @property + def alert_status(self): + """The current triggered status of the high and low temperature alerts as a AlertStatus + named tuple with attributes for the triggered status of each alert. + + .. code-block :: python + + import time + from machine import Pin, I2C + from micropython_mcp9808 import mcp9808 + + i2c = I2C(1, sda=Pin(2), scl=Pin(3)) # Correct I2C pins for RP2040 + mcp = mcp9808.MCP9808(i2c) + + mcp.temperature_lower = 20 + mcp.temperature_upper = 23 + mcp.temperature_critical = 30 + + print("High limit: {}".format(mcp.temperature_upper)) + print("Low limit: {}".format(mcp.temperature_lower)) + print("Critical limit: {}".format(mcp.temperature_critical)) + + while True: + print("Temperature: {:.2f}C".format(mcp.temperature)) + alert_status = tmp.alert_status + if alert_status.high_alert: + print("Temperature above high set limit!") + if alert_status.low_alert: + print("Temperature below low set limit!") + if alert_status.critical_alert: + print("Temperature above critical set limit!") + time.sleep(1) + + """ + + return AlertStatus( + high_alert=self.high_alert, + low_alert=self.low_alert, + critical_alert=self.critical_alert, + ) + + @property + def temperature_resolution(self) -> str: + """ + Sensor temperature_resolution + + +------------------------------------------+---------------------------+ + | Mode | Value | + +==========================================+===========================+ + | :py:const:`mcp9808.RESOLUTION_0_5_C` | :py:const:`0b00` 0.5°C | + +------------------------------------------+---------------------------+ + | :py:const:`mcp9808.RESOLUTION_0_625_C` | :py:const:`0b01` 0.25°C | + +------------------------------------------+---------------------------+ + | :py:const:`mcp9808.RESOLUTION_0_125_C` | :py:const:`0b10` 0.125°C | + +------------------------------------------+---------------------------+ + | :py:const:`mcp9808.RESOLUTION_0_0625_C` | :py:const:`0b11` 0.0625°C | + +------------------------------------------+---------------------------+ + """ + values = ( + "RESOLUTION_0_5_C", + "RESOLUTION_0_625_C", + "RESOLUTION_0_125_C", + "RESOLUTION_0_0625_C", + ) + return values[self._temperature_resolution] + + @temperature_resolution.setter + def temperature_resolution(self, value: int) -> None: + if value not in temperature_resolution_values: + raise ValueError("Value must be a valid temperature_resolution setting") + self._temperature_resolution = value