julian пре 1 година
комит
f15a10cad0
2 измењених фајлова са 434 додато и 0 уклоњено
  1. +22
    -0
      README.md
  2. +412
    -0
      src/program.py

+ 22
- 0
README.md Прегледај датотеку

@@ -0,0 +1,22 @@
# Pool Tank Monitoring

## Dependencies

- Python 3
- Python packages
- gpiozero
- adafruit_hcsr04
- waveshare_epd
- Font: squarea.ttf

## Configuration

- config.ini
- password.txt

## To Do

- glätten/filter
- alarmhöhe in cm
- display aktualisierungszeit
- letzte stufe X wenn mail gesendet

+ 412
- 0
src/program.py Прегледај датотеку

@@ -0,0 +1,412 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-

from gpiozero import Button
from signal import pause
import time
from enum import Enum
import sys
import board
import adafruit_hcsr04
import statistics
import configparser
import datetime
import os
from pathlib import Path
import shutil
from colorama import init as colorama_init
from colorama import Fore
from colorama import Style
from subprocess import check_call
from signal import pause
import smtplib, ssl

picdir = os.path.realpath("/home/firestone/pi-zero-tank/pic")
libdir = os.path.realpath("/home/firestone/pi-zero-tank/lib")
if os.path.exists(libdir):
sys.path.append(libdir)

print(f"Pfad zu Grafik: {picdir}")
print(f"Pfad zu e-Paper-Bibliothek: {libdir}")

import logging
from waveshare_epd import epd2in13_V4
from PIL import Image,ImageDraw,ImageFont
import traceback

logging.basicConfig(level=logging.DEBUG)

NUM_SAMPLES = 100
FILE = Path("config.ini")
ALARM_LEVEL_CM = 5.0

smtp_port = 587
smtp_server = "mail.gmx.net"
sender_email = "fuellstand@gmx.net"
receiver_email = "fire-stone@t-online.de"

class States(Enum):
MAIN_MENU = 1
MEASURE_CL_FULL = 2
MEASURE_CL_EMPTY = 3
MEASURE_PH_FULL = 4
MEASURE_PH_EMPTY = 5
SHUTDOWN = 6

class Buttons:
def __init__(self, b_cl, b_ph):
self.time_button_both = None
self.time_button_cl = None
self.time_button_ph = None
self.button_cl = b_cl
self.button_ph = b_ph
self.state = States.MAIN_MENU
self.activate_events()

def check_pressed(self):
if self.button_cl.is_pressed and self.button_ph.is_pressed:
if self.time_button_both is None:
self.time_button_cl = None
self.time_button_ph = None
self.time_button_both = time.time()
elif self.button_cl.is_pressed:
if self.time_button_cl is None:
self.time_button_cl = time.time()
elif self.button_ph.is_pressed:
if self.time_button_ph is None:
self.time_button_ph = time.time()
else:
self.time_button_cl = None
self.time_button_ph = None
self.time_button_both = None

def check_released(self):
now = time.time()
if self.time_button_cl:
if now - self.time_button_cl >= 5:
self.state = States.MEASURE_CL_EMPTY
self.time_button_cl = None
elif now - self.time_button_cl >= 2:
self.state = States.MEASURE_CL_FULL
self.time_button_cl = None
elif not self.button_cl.is_pressed:
self.time_button_cl = None
elif self.time_button_ph:
if now - self.time_button_ph >= 5:
self.state = States.MEASURE_PH_EMPTY
self.time_button_ph = None
elif now - self.time_button_ph >= 2:
self.state = States.MEASURE_PH_FULL
self.time_button_ph = None
elif not self.button_ph.is_pressed:
self.time_button_ph = None
elif self.time_button_both:
if now - self.time_button_both >= 5:
self.state = States.SHUTDOWN
self.time_button_both = None
elif not self.button_cl.is_pressed or not self.button_ph.is_pressed:
self.time_button_both = None

def get_state(self):
return self.state

def set_state(self, state):
self.state = state

def activate_events(self):
self.button_cl.when_pressed = self.check_pressed
self.button_cl.when_released = self.check_released
self.button_ph.when_pressed = self.check_pressed
self.button_ph.when_released = self.check_released

def deactivate_events(self):
self.button_cl.when_pressed = self.none_event
self.button_cl.when_released = self.none_event
self.button_ph.when_pressed = self.none_event
self.button_ph.when_released = self.none_event

def none_event(self):
pass

def shutdown(epd):
print("System wird heruntergefahren:")
print("Erstelle Backup...")
target_directory = FILE.parent / 'backup'
if not target_directory.is_dir():
print(f'Creating directory {target_directory}')
os.mkdir(target_directory)
modified_time = os.path.getmtime(FILE)
timestamp = datetime.datetime.fromtimestamp(modified_time).strftime("%Y-%m-%d_%H-%M-%S")
target_file = target_directory / f'{FILE.stem}_{timestamp}{FILE.suffix}'
print(target_file)
shutil.copyfile(FILE, target_file)
print("Backup erstellt.")
print("Display ausschalten...")
epd.init()
epd.Clear()
time.sleep(2)
epd.sleep()
epd2in13_V4.epdconfig.module_exit(cleanup=True)
print("Linux shutdown...")
#check_call(['sudo', 'poweroff'])
#TODO
sys.exit(1)
pause()

def measure(sensor):
data = []
fails = 0
for _ in range(NUM_SAMPLES):
try:
data.append(sensor.distance)
except RuntimeError:
fails += 1
time.sleep(0.01)
#print(len(data), fails)
if fails < (NUM_SAMPLES / 2):
mid_count = NUM_SAMPLES // 2
start_index = (NUM_SAMPLES - mid_count) // 2
end_index = start_index + mid_count
try:
val = statistics.fmean(sorted(data)[start_index:end_index])
except:
val = 0
else:
val = 0
#val = random.uniform(5.0, 40.0)
return val

def store_value(config, val, meas, lvl, date):
now = str(datetime.datetime.now())
print("Speichere:")
print("Wert: {0} in {1}.{2}.".format(val, meas, lvl))
print("Datum: {0} in {1}.{2}.".format(now, meas, date))
config.set(meas, lvl, "{:.3f}".format(val))
config.set(meas, date, now)
with open(FILE, 'w') as configfile:
config.write(configfile)
print("Wert gespeichert!")

def get_percent(leer, voll, val):
if val < voll:
val = voll
if val > leer:
val = leer
return abs(leer - val) / abs(leer - voll) * 100

def send_email(meas, val):
with open("password.txt", "r") as f:
smtp_password = f.readline()
message = """\
FROM: {1}
TO: {2}
Subject: {0}-Tank fast leer!

Hallo Andreas,

Die Fuellhoehe des {0}-Tanks ist zu niedrig. Bitte pruefen!

Die aktuelle Hoehe betraegt {3:.0f} cm.

Gruesse vom Tank-Computer""".format(meas, sender_email, receiver_email, val)
context = ssl.create_default_context()
with smtplib.SMTP(smtp_server, smtp_port) as server:
#TODO
#server.set_debuglevel(1)
#server.ehlo()
server.starttls(context=context)
#server.login(sender_email, smtp_password.strip())
#server.sendmail(sender_email, receiver_email, message)

def display_site_measure(epd, titel, titel_pos, val):
dp = Image.new('1', (epd.height, epd.width), 255)
draw = ImageDraw.Draw(dp)
draw.text(titel_pos, titel, font = font_label, fill = 0)
draw.line((0, 26, 250, 26), fill = 0, width = 0)
draw.text((20, 44), f"Gemessene Höhe = {val:.2f} cm", font = font_label, fill = 0)
draw.text((77, 79), "Wert korrekt?", font = font_label, fill = 0)
# Button-Beschriftung:
draw.line((0, 105, 250, 105), fill = 0, width = 0)
draw.text((25, 110), "Anwenden", font = font_label, fill = 0)
draw.text((156, 110), "Abbrechen", font = font_label, fill = 0)
epd.display(epd.getbuffer(dp))

def display_site_main(epd, cl, ph):
dp = Image.new('1', (epd.height, epd.width), 255)
newimage = Image.open(os.path.join(picdir, 'tankgrafik.bmp'))
dp.paste(newimage, (0,0))
draw = ImageDraw.Draw(dp)
draw.text((19, 31), f"Chlor: {cl:.0f} %", font = font_label, fill = 0)
draw.text((149, 31), f"pH-: {ph:.0f} %", font = font_label, fill = 0)
if cl < 80:
draw.rectangle((10, 54, 114, 61), outline = 1, fill = 1)
if cl < 60:
draw.rectangle((10, 66, 114, 73), outline = 1, fill = 1)
if cl < 40:
draw.rectangle((10, 78, 114, 85), outline = 1, fill = 1)
if cl < 20:
draw.rectangle((10, 90, 114, 97), outline = 1, fill = 1)
if cl < ALARM_LEVEL_CM: #todo and email
draw.rectangle((10, 54, 114, 109), outline = 1, fill = 1)
draw.text((37, 53), "X", font = font_huge, fill = 0)
if ph < 80:
draw.rectangle((135, 54, 239, 61), outline = 1, fill = 1)
if ph < 60:
draw.rectangle((135, 66, 239, 73), outline = 1, fill = 1)
if ph < 40:
draw.rectangle((135, 78, 239, 85), outline = 1, fill = 1)
if ph < 20:
draw.rectangle((135, 90, 239, 97), outline = 1, fill = 1)
if ph < ALARM_LEVEL_CM: #todo and email
draw.rectangle((135, 54, 239, 109), outline = 1, fill = 1)
draw.text((164, 53), "X", font = font_huge, fill = 0)
epd.display(epd.getbuffer(dp))

if __name__ == '__main__':
colorama_init()

print("Initialisiere Display")
epd = epd2in13_V4.EPD()
logging.info("init and Clear")
epd.init()
epd.Clear()
time.sleep(2)
font_label = ImageFont.truetype(os.path.join(picdir, 'squarea.ttf'), 15)
font_huge = ImageFont.truetype(os.path.join(picdir, 'squarea.ttf'), 74)

display_site_main(epd, 0, 0)

print("Lese config.ini...")
config = configparser.ConfigParser()
config.read(FILE)
cl_leer = config.getfloat('Chlor', 'leer')
cl_voll = config.getfloat('Chlor', 'voll')
ph_leer = config.getfloat('pH-', 'leer')
ph_voll = config.getfloat('pH-', 'voll')
print("Werte aus config.ini:")
print("Cl leer {:.2f}".format(cl_leer))
print("Cl voll {:.2f}".format(cl_voll))
print("pH- leer {:.2f}".format(ph_leer))
print("pH- voll {:.2f}".format(ph_voll))

print("Setze E-Mail-Merker zurück...")
cl_mail = 0
ph_mail = 0
config.set('Chlor', 'mail', "0")
config.set('Chlor', 'datum_mail', "")
config.set('pH-', 'mail', "0")
config.set('pH-', 'datum_mail', "")
with open(FILE, 'w') as configfile:
config.write(configfile)

print("Hardware initialisieren...")
button_cl = Button(pin=27)
button_ph = Button(pin=22)
sensor_cl = adafruit_hcsr04.HCSR04(trigger_pin=board.D5, echo_pin=board.D6)
sensor_ph = adafruit_hcsr04.HCSR04(trigger_pin=board.D13, echo_pin=board.D12, timeout=0)
buttons = Buttons(button_cl, button_ph)

print("Starte Hauptprogramm...")
while True:
try:
cl = measure(sensor_cl)
ph = measure(sensor_ph)
clp = get_percent(cl_leer, cl_voll, cl)
php = get_percent(ph_leer, ph_voll, ph)
display_site_main(epd, clp, php)
print("Chlor: {:10.2f} cm, pH-Minus: {:10.2f} cm".format(cl, ph))
print("Chlor: {:10.2f} %, pH-Minus: {:10.2f} %".format(clp, php))
if abs(cl_leer - ALARM_LEVEL_CM) < cl:
print(f"{Fore.RED}Chlor-Tank fast leer!{Style.RESET_ALL}")
if cl_mail == 0:
print("Sende E-Mail...")
send_email("Chlor", abs(cl_leer - cl))
cl_mail = 1
config.set('Chlor', 'mail', str(cl_mail))
config.set('Chlor', 'datum_mail', str(datetime.datetime.now()))
with open(FILE, 'w') as configfile:
config.write(configfile)
if abs(ph_leer - ALARM_LEVEL_CM) < ph:
print(f"{Fore.RED}pH-Minus-Tank fast leer!{Style.RESET_ALL}")
if ph_mail == 0:
print("Sende E-Mail...")
send_email("pH-Minus", abs(ph_leer - ph))
ph_mail = 1
config.set('pH-', 'mail', str(ph_mail))
config.set('pH-', 'datum_mail', str(datetime.datetime.now()))
with open(FILE, 'w') as configfile:
config.write(configfile)
state = buttons.get_state()
if state == States.MEASURE_CL_FULL:
buttons.deactivate_events()
print("Messung voller Chlor-Tank:")
val = measure(sensor_cl)
display_site_measure(epd, "Chlor Voll-Messung", (51, 6), val)
print("Höhe = {:.3f} cm".format(val))
print("Drücke <Cl> zum Speichern oder <pH> für Abbruch!")
while True:
if button_cl.is_pressed:
store_value(config, val, "Chlor", "voll", "datum_voll")
cl_voll = val
break;
elif button_ph.is_pressed:
print("Abbruch!")
break
buttons.set_state(States.MAIN_MENU)
buttons.activate_events()
elif state == States.MEASURE_CL_EMPTY:
buttons.deactivate_events()
print("Messung leerer Chlor-Tank:")
val = measure(sensor_cl)
display_site_measure(epd, "Chlor Leer-Messung", (48, 6), val)
print("Höhe = {:.3f} cm".format(val))
print("Drücke <Cl> zum Speichern oder <pH> für Abbruch!")
while True:
if button_cl.is_pressed:
store_value(config, val, "Chlor", "leer", "datum_leer")
cl_leer = val
break;
elif button_ph.is_pressed:
print("Abbruch!")
break
buttons.set_state(States.MAIN_MENU)
buttons.activate_events()
elif state == States.MEASURE_PH_FULL:
buttons.deactivate_events()
print("Messung voller pH-Minus-Tank:")
val = measure(sensor_ph)
display_site_measure(epd, "pH- Voll-Messung", (57, 6), val)
print("Höhe = {:.3f} cm".format(val))
print("Drücke <Cl> zum Speichern oder <pH> für Abbruch!")
while True:
if button_cl.is_pressed:
store_value(config, val, "pH-", "voll", "datum_voll")
ph_voll = val
break;
elif button_ph.is_pressed:
print("Abbruch!")
break
buttons.set_state(States.MAIN_MENU)
buttons.activate_events()
elif state == States.MEASURE_PH_EMPTY:
buttons.deactivate_events()
print("Messung leerer pH-Minus-Tank:")
val = measure(sensor_ph)
display_site_measure(epd, "pH- Leer-Messung", (54, 6), val)
print("Höhe = {:.3f} cm".format(val))
print("Drücke <Cl> zum Speichern oder <pH> für Abbruch!")
while True:
if button_cl.is_pressed:
store_value(config, val, "pH-", "leer", "datum_leer")
ph_leer = val
break;
elif button_ph.is_pressed:
print("Abbruch!")
break
buttons.set_state(States.MAIN_MENU)
buttons.activate_events()
elif state == States.SHUTDOWN:
shutdown(epd)
time.sleep(0.5)
except KeyboardInterrupt:
shutdown(epd)

Loading…
Откажи
Сачувај