Skip to content

Commit 5cbf39d

Browse files
author
Michael Ng
authored
Merge branch 'master' into rashid/dfm-blocking-timeout
2 parents 1e4ace4 + 3731be4 commit 5cbf39d

File tree

6 files changed

+712
-1
lines changed

6 files changed

+712
-1
lines changed
Lines changed: 285 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
# Copyright 2019 Optimizely
2+
# Licensed under the Apache License, Version 2.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
#
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
#
8+
# Unless required by applicable law or agreed to in writing, software
9+
# distributed under the License is distributed on an "AS IS" BASIS,
10+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
# See the License for the specific language governing permissions and
12+
# limitations under the License.
13+
14+
import abc
15+
import threading
16+
import time
17+
18+
from datetime import timedelta
19+
from six.moves import queue
20+
21+
from optimizely import logger as _logging
22+
from optimizely import notification_center as _notification_center
23+
from optimizely.event_dispatcher import EventDispatcher as default_event_dispatcher
24+
from optimizely.helpers import enums
25+
from optimizely.helpers import validator
26+
from .event_factory import EventFactory
27+
from .user_event import UserEvent
28+
29+
# Python 2/3 compatible ABC base: equivalent to `class ABC(metaclass=ABCMeta)`
# on Python 3, but also valid Python 2 syntax.
ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()})


class BaseEventProcessor(ABC):
    """ Class encapsulating event processing. Override with your own implementation. """

    @abc.abstractmethod
    def process(self, user_event):
        """ Method to provide intermediary processing stage within event production.

        Args:
          user_event: UserEvent instance that needs to be processed and dispatched.
        """
        # Fix: the abstract method was declared without `self`, which is
        # inconsistent with the concrete `process(self, user_event)` overrides.
        pass
42+
43+
44+
class BatchEventProcessor(BaseEventProcessor):
    """ BatchEventProcessor is an implementation of the BaseEventProcessor that batches events.

    The BatchEventProcessor maintains a single consumer thread that pulls events off of
    the blocking queue and buffers them for either a configured batch size or for a
    maximum duration before the resulting LogEvent is sent to the EventDispatcher.
    """

    _DEFAULT_QUEUE_CAPACITY = 1000
    _DEFAULT_BATCH_SIZE = 10
    _DEFAULT_FLUSH_INTERVAL = timedelta(seconds=30)
    _DEFAULT_TIMEOUT_INTERVAL = timedelta(seconds=5)
    # Sentinels placed on the event queue; compared by identity in _run.
    _SHUTDOWN_SIGNAL = object()
    _FLUSH_SIGNAL = object()
    # NOTE(review): this lock is class-level and therefore shared by ALL
    # instances of BatchEventProcessor — confirm a single processor per
    # process is the intended usage; otherwise make it an instance attribute.
    LOCK = threading.Lock()

    def __init__(self,
                 event_dispatcher,
                 logger,
                 start_on_init=False,
                 event_queue=None,
                 batch_size=None,
                 flush_interval=None,
                 timeout_interval=None,
                 notification_center=None):
        """ EventProcessor init method to configure event batching.

        Args:
          event_dispatcher: Provides a dispatch_event method which if given a URL and params sends a request to it.
          logger: Provides a log method to log messages. By default nothing would be logged.
          start_on_init: Optional boolean param which starts the consumer thread if set to True.
                         Default value is False.
          event_queue: Optional component which accumulates the events until dispatched.
          batch_size: Optional param which defines the upper limit on the number of events in event_queue
                      after which the event_queue will be flushed.
          flush_interval: Optional floating point number representing time interval in seconds after which
                          event_queue will be flushed.
          timeout_interval: Optional floating point number representing time interval in seconds before
                            joining the consumer thread.
          notification_center: Optional instance of notification_center.NotificationCenter.
        """
        self.event_dispatcher = event_dispatcher or default_event_dispatcher
        self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger())
        self.event_queue = event_queue or queue.Queue(maxsize=self._DEFAULT_QUEUE_CAPACITY)
        # Each property falls back to its class-level default when validation fails.
        self.batch_size = batch_size if self._validate_intantiation_props(batch_size, 'batch_size') \
            else self._DEFAULT_BATCH_SIZE
        self.flush_interval = timedelta(seconds=flush_interval) \
            if self._validate_intantiation_props(flush_interval, 'flush_interval') \
            else self._DEFAULT_FLUSH_INTERVAL
        self.timeout_interval = timedelta(seconds=timeout_interval) \
            if self._validate_intantiation_props(timeout_interval, 'timeout_interval') \
            else self._DEFAULT_TIMEOUT_INTERVAL
        self.notification_center = notification_center
        self._current_batch = list()

        if not validator.is_notification_center_valid(self.notification_center):
            self.logger.error(enums.Errors.INVALID_INPUT.format('notification_center'))
            # Fall back to a fresh NotificationCenter so LOG_EVENT notifications still fire.
            self.notification_center = _notification_center.NotificationCenter()

        if start_on_init is True:
            self.start()

    @property
    def is_running(self):
        """ Property to check if consumer thread is alive or not. """
        # Fix: isAlive() is deprecated and removed in Python 3.9;
        # is_alive() exists on both Python 2.6+ and Python 3.
        return self.executor.is_alive()

    def _validate_intantiation_props(self, prop, prop_name):
        """ Method to determine if instantiation properties like batch_size, flush_interval
        and timeout_interval are valid.

        Args:
          prop: Property value that needs to be validated.
          prop_name: Property name.

        Returns:
          False if property value is None or less than 1 or not a finite number.
          False if property name is batch_size and value is a floating point number.
          True otherwise.
        """
        # Fix: check None and finiteness BEFORE comparing with 1 so that a
        # non-numeric value (e.g. a string) cannot raise TypeError on Python 3.
        if prop is None or not validator.is_finite_number(prop) or prop < 1 or \
                (prop_name == 'batch_size' and not isinstance(prop, int)):
            self.logger.info('Using default value for {}.'.format(prop_name))
            return False

        return True

    def _get_time(self, _time=None):
        """ Method to return rounded off time as integer in seconds. If _time is None, uses current time.

        Args:
          _time: time in seconds that needs to be rounded off.

        Returns:
          Integer time in seconds.
        """
        if _time is None:
            return int(round(time.time()))

        return int(round(_time))

    def start(self):
        """ Starts the batch processing thread to batch events. """
        if hasattr(self, 'executor') and self.is_running:
            self.logger.warning('BatchEventProcessor already started.')
            return

        self.flushing_interval_deadline = self._get_time() + self._get_time(self.flush_interval.total_seconds())
        self.executor = threading.Thread(target=self._run)
        # Fix: setDaemon() is deprecated; assign the daemon attribute directly.
        self.executor.daemon = True
        self.executor.start()

    def _run(self):
        """ Triggered as part of the thread which batches events or flushes event_queue and sleeps
        periodically if queue is empty.
        """
        try:
            while True:
                if self._get_time() > self.flushing_interval_deadline:
                    self._flush_queue()
                    # Fix: restart the flush interval after a deadline-triggered
                    # flush; otherwise the deadline stays in the past and
                    # _flush_queue is invoked on every loop iteration.
                    self.flushing_interval_deadline = self._get_time() + \
                        self._get_time(self.flush_interval.total_seconds())

                try:
                    item = self.event_queue.get(True, 0.05)
                except queue.Empty:
                    time.sleep(0.05)
                    continue

                # Fix: sentinels are singleton objects — compare by identity,
                # not equality, so a queued item's custom __eq__ cannot match.
                if item is self._SHUTDOWN_SIGNAL:
                    self.logger.debug('Received shutdown signal.')
                    break

                if item is self._FLUSH_SIGNAL:
                    self.logger.debug('Received flush signal.')
                    self._flush_queue()
                    continue

                if isinstance(item, UserEvent):
                    self._add_to_batch(item)

        except Exception as exception:
            self.logger.error('Uncaught exception processing buffer. Error: ' + str(exception))

        finally:
            self.logger.info('Exiting processing loop. Attempting to flush pending events.')
            self._flush_queue()

    def flush(self):
        """ Adds flush signal to event_queue. """

        self.event_queue.put(self._FLUSH_SIGNAL)

    def _flush_queue(self):
        """ Flushes event_queue by dispatching events. """

        if len(self._current_batch) == 0:
            return

        # Swap the batch out under the lock; dispatch outside it so a slow
        # dispatcher does not block producers.
        with self.LOCK:
            to_process_batch = list(self._current_batch)
            self._current_batch = list()

        log_event = EventFactory.create_log_event(to_process_batch, self.logger)

        if self.notification_center is not None:
            self.notification_center.send_notifications(
                enums.NotificationTypes.LOG_EVENT,
                log_event
            )

        try:
            self.event_dispatcher.dispatch_event(log_event)
        except Exception as e:
            self.logger.error('Error dispatching event: ' + str(log_event) + ' ' + str(e))

    def process(self, user_event):
        """ Method to process the user_event by putting it in event_queue.

        Args:
          user_event: UserEvent Instance.
        """
        if not isinstance(user_event, UserEvent):
            self.logger.error('Provided event is in an invalid format.')
            return

        self.logger.debug('Received user_event: ' + str(user_event))

        try:
            self.event_queue.put_nowait(user_event)
        except queue.Full:
            # Fix: a dropped event deserves more visibility than debug level.
            self.logger.warning(
                'Payload not accepted by the queue. Current size: {}'.format(str(self.event_queue.qsize()))
            )

    def _add_to_batch(self, user_event):
        """ Method to append received user event to current batch.

        Args:
          user_event: UserEvent Instance.
        """
        if self._should_split(user_event):
            # _flush_queue also resets _current_batch to an empty list.
            self._flush_queue()

        # Reset the deadline if starting a new batch.
        if len(self._current_batch) == 0:
            self.flushing_interval_deadline = self._get_time() + \
                self._get_time(self.flush_interval.total_seconds())

        with self.LOCK:
            self._current_batch.append(user_event)
        # Flush OUTSIDE the `with` block: LOCK is a non-reentrant
        # threading.Lock and _flush_queue acquires it again.
        if len(self._current_batch) >= self.batch_size:
            self._flush_queue()

    def _should_split(self, user_event):
        """ Method to check if current event batch should split into two.

        Args:
          user_event: UserEvent Instance.

        Returns:
          - True, if revision number and project_id of last event in current batch do not match received
            event's revision number and project id respectively.
          - False, otherwise.
        """
        if len(self._current_batch) == 0:
            return False

        current_context = self._current_batch[-1].event_context
        new_context = user_event.event_context

        if current_context.revision != new_context.revision:
            return True

        if current_context.project_id != new_context.project_id:
            return True

        return False

    def stop(self):
        """ Stops and disposes batch event processor. """
        self.event_queue.put(self._SHUTDOWN_SIGNAL)
        self.logger.warning('Stopping Scheduler.')

        self.executor.join(self.timeout_interval.total_seconds())

        if self.is_running:
            # Fix: the timeout is expressed in seconds, not milliseconds.
            self.logger.error('Timeout exceeded while attempting to close for ' +
                              str(self.timeout_interval.total_seconds()) + ' seconds.')

optimizely/event/log_event.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,6 @@ def __init__(self, url, params, http_verb=None, headers=None):
2020
self.params = params
2121
self.http_verb = http_verb or 'POST'
2222
self.headers = headers
23+
24+
def __str__(self):
25+
return str(self.__class__) + ": " + str(self.__dict__)

optimizely/helpers/enums.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,12 @@ class NotificationTypes(object):
123123
124124
TRACK notification listener has the following parameters:
125125
str event_key, str user_id, dict attributes (can be None), event_tags (can be None), Event event
126+
127+
LOG_EVENT notification listener has the following parameter(s):
128+
LogEvent log_event
126129
"""
127130
ACTIVATE = 'ACTIVATE:experiment, user_id, attributes, variation, event'
128131
DECISION = 'DECISION:type, user_id, attributes, decision_info'
129132
OPTIMIZELY_CONFIG_UPDATE = 'OPTIMIZELY_CONFIG_UPDATE'
130133
TRACK = 'TRACK:event_key, user_id, attributes, event_tags, event'
134+
LOG_EVENT = 'LOG_EVENT:log_event'

0 commit comments

Comments
 (0)