33import logging
44import ssl
55import paho .mqtt .client as mqtt
6- from typing import Dict , Optional
76
87# Get client ID with optional postfix for multiple instances
98client_id_postfix = os .environ .get ('IDENT_POSTFIX' , '' )
109client = mqtt .Client (
1110 client_id = f"smahub{ client_id_postfix } " , transport = 'tcp' , protocol = mqtt .MQTTv311 , clean_session = False )
12- pubunits = False
1311
1412
1513def env_vars (config ):
@@ -50,6 +48,19 @@ def execute(config, get_items, register_callback, do_stop):
5048
5149 logging .info ("Starting MQTT sink" )
5250
51+ # Define publish and callback functions here to capture pubunits in closure
52+ def publish (topic , value ):
53+ # only publish units if they are there and we really want to
54+ publish_value = value
55+ if isinstance (value , tuple ):
56+ if not pubunits :
57+ publish_value = value [0 ]
58+ client .publish (topic , str (publish_value ))
59+
60+ def my_callback (key , value ):
61+ topic = str (key ).replace ("." , "/" )
62+ publish (topic , value )
63+
5364 # No need for global - we're only using the module-level client, not rebinding it
5465 if config ['server' ]['username' ]:
5566 client .username_pw_set (
@@ -103,7 +114,7 @@ def execute(config, get_items, register_callback, do_stop):
103114 logging .fatal (
104115 f"MQTT broker configuration error: { str (exc )} , rethrowing exception" )
105116 raise
106- except ConnectionError as exc :
117+ except ConnectionError :
107118 logging .fatal (
108119 f"MQTT broker not reachable at address: { config .get ('server' , 'address' )} : { str (config .get ('server' , 'port' ))} " )
109120 raise
@@ -112,7 +123,7 @@ def execute(config, get_items, register_callback, do_stop):
112123 f"MQTT broker unknown error: { str (exc )} , rethrowing exception" )
113124 raise
114125
115- # We only publish data on change
126+ # We only publish data on change (use closure-captured my_callback)
116127 register_callback (my_callback )
117128
118129 i = 0
@@ -132,21 +143,6 @@ def execute(config, get_items, register_callback, do_stop):
132143 logging .info ("Stopping MQTT sink" )
133144
134145
135- def publish (topic , value ):
136- # read module-level client and pubunits (no rebinding), no global statement required
137- # only publish units if they are there and we really want to
138- publish_value = value
139- if isinstance (value , tuple ):
140- if not pubunits :
141- publish_value = value [0 ]
142- client .publish (topic , str (publish_value ))
143-
144-
145- def my_callback (key , value ):
146- topic = str (key ).replace ("." , "/" )
147- publish (topic , value )
148-
149-
150146def on_connect (client , userdata , flags , rc ):
151147 if rc == 0 :
152148 logging .info ("Connected to MQTT Broker" )
0 commit comments