p-lsd:hanazeder:sh-8
Inhaltsverzeichnis
Auslesen einer Hanazeder SH-8 Steuerung und per MQTT veröffentlichen
(Solar Heizungssteuerung SH Hersteller: Hanazeder)
die Hanazeder SH-8 Steuerung besitzt eine serielle Schnittstelle mit der man die Daten auslesen kann.
Danke der Hilfe eines Freundes kann ich inzwischen die Steuerung auslesen und die Werte als JSON per MQTT versenden. Das Projekte war meine erste Berührung zu Python3.
Diese Anleitung ist für den Versierten Bastler gedacht. Ich lese die Serielle Schnittstelle mit einem USB-3.3V Seriell Adapter aus. Es darf nur TX, RX und GND verbunden werden.
Es handelt sich um ein TTL Serielle Schnittstelle mit 3,3V
Angeschlossen wird nur:
- TX
- RX
- GND
- | |HanazederSH8_2_MQTT.py
#!/usr/bin/env python3 # # V0.9 # # #Dieses Script liest die Serielle Schnittstelle einer Hanazeder SH8 Steuerung regelmässig aus #und Sendet ein JSON per MQTT an den Broker #V0.1 - devzero hat den JSON Teil und das Grundgerüst zu verfügung gestellt #V0.2 - gezielte Abfrage der Seriellen Schnittstelle hinzugefügt. #V0.3 - JSON um Solar Leistung erweitert #V0.4 - MQTT impelemntierung (offen) #V0.5 - Optimirungen der Initalisirungen #V0.6 - Individueller Name für Fühler und Ausgänge (devzero) #V0.7 - Optimierung Abfrage der Seriellen Schnittstelle (devzero & lsd) #V0.8 - Abfrage der Seriellen Schnittstelle komplett neu implementiert #V0.9 - Status Infos zum System per MQTT import netifaces as ni import serial import time import json import paho.mqtt.client as mqttclient import re DEBUG=False DISPLAY=True #MQTT Parameter #https://www.emqx.io/blog/how-to-use-mqtt-in-python MQTTBROKER = "iobrokerv2.home.logout.de" MQTTPORT = 1883 MQTTUSER = "iobroker" MQTTPASS = "brokerio" MQTTTOPIC = "Hanazeder/SH8" MQTTID = 'HZ-Solar-1234' MQTTSTATUS = MQTTTOPIC+"/status" MQTTSYSTEM = MQTTTOPIC+"/system" MQTTSTATUSSERIAL = MQTTTOPIC+"/statusseriell" MQTTDATA = MQTTTOPIC+"/json" #Serielle Schnittstell SERPORT="/dev/hzsh8solar" SERBAUD=9600 SERTIME=1 SLEEPSEC=300 SERREAD_BYTE=500 #Es können auch Werte weggelssen werden dann wird A[1-8] oder F[1-14] behalte #z.B. ist F14 nicht angelegt AUTOMATIK_AUSGANG_MAPPING = { "A1": "A1-Solar to Boiler", "A2": "A2-Solar to Puffer", "A3": "A3-freigabe BHKW", "A4": "A4-Puffer to Boiler", "A5": "A5-NN", "A6": "A6-Heiz-Pumpe", "A7": "A7-Mischer-Auf", "A8": "A8-Mischer-Zu" } FUEHLER_MAPPING = { "F1": "F1-Kollektor", "F2": "F2-nn", "F3": "F3-VL_Solar", "F4": "F4-RL_Solar", "F5": "F5-P_unten", "F6": "F6-P_mitte", "F7": "F7-P_oben", "F8": "F8-B_unten", "F9": "F9-B_oben", "F10": "F10-NN", "F11": "F11-NN", "F12": "F12-Heizkreis", "F13": "F13-Aussentemp" } ###################### Parameter ENDE #################### def Host_name_IP_2_MQTT(): try: ret = dict() gws = ni.gateways() net['DGW'] = gws['default'][ni.AF_INET][0] net['IP'] = ni.ifaddresses('eth0')[ni.AF_INET][0]['addr'] net['Netmask'] = ni.ifaddresses('eth0')[ni.AF_INET][0]['netmask'] net['MAC'] = ni.ifaddresses('eth0')[ni.AF_LINK][0]['addr'] except: if DEBUG: print("Unable to get Hostname and IP") return ret def parse(data_str): def _get_dict_from_key_value_string_list(key_value_string_list, name_mapping_dict=None, type_cast_function=None): ret = dict() for key_value_string in key_value_string_list: key, value = key_value_string.split(':') if name_mapping_dict and key in name_mapping_dict: key = name_mapping_dict[key] if type_cast_function is not None: value = type_cast_function(value) ret[key] = value return ret solarleistung = None automatik = dict() ausgang = dict() fuehler = dict() for line in data_str.decode().split('\n\r'): if 'Mom.Solar-Leist' in line: match = re.search(r'\d+\.\d+', line) if match: solarleistung = float(match.group(0)) continue pattern = re.compile(r'A\d:\d') if pattern.search(line): if 'Automatik' in line: automatik = _get_dict_from_key_value_string_list( pattern.findall(line), AUTOMATIK_AUSGANG_MAPPING, lambda x: bool(int(x))) elif 'Ausgang' in line: ausgang = _get_dict_from_key_value_string_list( pattern.findall(line), AUTOMATIK_AUSGANG_MAPPING, lambda x: bool(int(x))) continue pattern = re.compile(r'F\d+:\d+') if pattern.search(line): fuehler.update( _get_dict_from_key_value_string_list(pattern.findall(line), FUEHLER_MAPPING, lambda x: int(x))) continue ret = dict() ret['Solarleistung'] = solarleistung ret['Automatik'] = automatik ret['Ausgang'] = ausgang ret['Fuehler'] = fuehler return ret def mqtt_connect(): #MQTT Client initalisiren def MQTT_ONLINE(MQTTCLIENT, userdata, flags, rc): MQTTCLIENT.publish(MQTTSTATUS,"online",0,retain=False) if rc == 0: print("Connected to MQTT Broker! Return Code: "+str(rc)) else: print("Failed to connect, Return Code: "+str(rc)) MQTTCLIENT = mqttclient.Client(MQTTID) MQTTCLIENT.username_pw_set(MQTTUSER, MQTTPASS) MQTTCLIENT.on_connect = MQTT_ONLINE MQTTCLIENT.will_set(MQTTSTATUS,"offline",0,retain=True) MQTTCLIENT.connect(MQTTBROKER, MQTTPORT) return MQTTCLIENT def warten_auf_bytes(erwartetes_byte, serial_port): antwort=False while True: read_serial = serial_port.read(1) if read_serial == erwartetes_byte: antwort=True break if not read_serial: break return antwort if __name__ == '__main__': #MQTT Client Verbinden MQTTCLIENT = mqtt_connect() MQTTCLIENT.loop_start() JSON = json.dumps(Host_name_IP_2_MQTT(), indent=4) MQTTCLIENT.publish(MQTTSYSTEM,JSON,0,retain=True) #Initalisirung der Seriellen Schnittstell ser = serial.Serial( SERPORT, baudrate=SERBAUD, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=SERTIME, xonxoff=False, rtscts=False, write_timeout=None, dsrdtr=False, inter_byte_timeout=None, exclusive=None ) if ser.is_open: if DEBUG: print("Ser. Interface ist ONLINE") MQTTCLIENT.publish(MQTTSTATUSSERIAL,"CONNECT",0,retain=False) #time.sleep(0.2) while True: #ser.flushInput() ser.reset_input_buffer() ser.write(b'\x55') if DEBUG: print('DEBUG_Main: 1-2 Befehlsmodus aktivieren') if warten_auf_bytes(b'\x55', ser): if DEBUG: print('DEBUG_Main: 2-2 Befehlsmodus aktiviert') if DEBUG: print('DEBUG_Main: Sende DWORT 0x0000') ser.write(b'\x00\x00\x00\x00') if DEBUG: print('DEBUG_Main: Sende anzahl (1) der zu Empfangenen Byts 0x01') ser.write(b'\x01') if warten_auf_bytes(b'\x02', ser): if DEBUG: print('DEBUG_Main: Nutzdaten könne empfangen werden') reply = ser.read(size=SERREAD_BYTE) if DEBUG: print ("Daten die Empfangen wurden ", reply) if DEBUG: print ("2 DEBUG-Rufe >> JSON << auf") if DEBUG: print(json.dumps(parse(reply), indent=4)) JSON = json.dumps(parse(reply), indent=4) if (DISPLAY): print ("____________ JSON Ergebniss _________________") print (JSON) print ("___________",(time.ctime()),"_________________\n") MQTTCLIENT.publish(MQTTDATA,JSON,0,retain=True) else: print('doof, timeout') else: print('doof, timeout') if DEBUG: print ("Sleep: ",SLEEPSEC) time.sleep(SLEEPSEC) if DEBUG: print ("Sleep: Ende") else: if DEBUG: print("Ser. Interface ist OFFLINE") MQTTCLIENT.will_set(MQTTSTATUSSERIAL,"DISCONNECT",0,retain=True) ser.close()
Unterlagen die mir vom Hersteller zu Verfügung gestellt wurden
Display flackert und/oder die Steuerung friert ein
p-lsd/hanazeder/sh-8.txt · Zuletzt geändert: 2021/06/22 13:41 von lsd