@@ -76,24 +76,17 @@ def __init__(self, key, options={}):
7676 self .setLevel (logging .DEBUG )
7777 self .lock = threading .RLock ()
7878
79- # Start the flusher
80- self .flusher_stopped = threading .Event ()
81- self .flusher = threading .Thread (
82- target = self .flush_timer_worker , daemon = True )
83- self .flusher .start ()
84-
85- def flush_timer_worker (self ):
86- while not self .flusher_stopped .wait (self .flush_interval_secs ):
87- try :
88- self .flush ()
89- except Exception as e :
90- self .internalLogger .exception (
91- f'Error in flush_timer_worker: { e } ' )
79+ self .flusher = None
80+
81+ def start_flusher (self ):
82+ if not self .flusher :
83+ self .flusher = threading .Timer (
84+ self .flush_interval_secs , self .flush )
85+ self .flusher .start ()
9286
9387 def close_flusher (self ):
9488 if self .flusher :
95- self .flusher_stopped .set ()
96- self .flusher .join ()
89+ self .flusher .cancel ()
9790 self .flusher = None
9891
9992 def buffer_log (self , message ):
@@ -119,7 +112,10 @@ def buffer_log_sync(self, message):
119112 self .buf_retention_limit )
120113
121114 if self .buf_size >= self .flush_limit :
115+ self .close_flusher ()
122116 self .flush ()
117+ else :
118+ self .start_flusher ()
123119 except Exception as e :
124120 self .internalLogger .exception (f'Error in buffer_log_sync: { e } ' )
125121 finally :
@@ -143,13 +139,15 @@ def try_lock_and_do_flush_request(self, should_block=False):
143139 local_buf = []
144140 if self .lock .acquire (blocking = should_block ):
145141 if not self .buf :
142+ self .close_flusher ()
146143 self .lock .release ()
147144 return
148145
149- if self .buf :
150- local_buf = self .buf .copy ()
151- self .buf .clear ()
152- self .buf_size = 0
146+ local_buf = self .buf .copy ()
147+ self .buf .clear ()
148+ self .buf_size = 0
149+ if local_buf :
150+ self .close_flusher ()
153151 self .lock .release ()
154152
155153 if local_buf :
@@ -304,10 +302,10 @@ def emit(self, record):
304302 'line' : msg ,
305303 'level' : record ['levelname' ] or self .loglevel ,
306304 'app' : self .app or record ['module' ],
307- 'env' : self .env
305+ 'env' : self .env ,
306+ 'meta' : {}
308307 }
309308
310- message ['meta' ] = {}
311309 for key in self .custom_fields :
312310 if key in record :
313311 if isinstance (record [key ], tuple ):
@@ -328,6 +326,9 @@ def emit(self, record):
328326 self .buffer_log (message )
329327
330328 def close (self ):
329+ # Close the flusher
330+ self .close_flusher ()
331+
331332 # First gracefully shut down any threads that are still attempting
332333 # to add log messages to the buffer. This ensures that we don't lose
333334 # any log messages that are in the process of being added to the
@@ -336,10 +337,6 @@ def close(self):
336337 self .worker_thread_pool .shutdown (wait = True )
337338 self .worker_thread_pool = None
338339
339- # Now that we've shut down the worker threads, we can safely close
340- # the flusher thread.
341- self .close_flusher ()
342-
343340 # Manually force a flush of any remaining log messages in the buffer.
344341 # We block here to ensure that the flush completes prior to the
345342 # application exiting and because the probability of this
0 commit comments