Skip to content

Commit d386455

Browse files
kamil-certat authored and sebix committed
ENH: Add possibility to delay generating MISP Feed
Generating the MISP feed on every incoming message slows down processing. The new config option lets us decide to save events in batches. Cached events are stored in a list in Redis. In addition, code related to Python 3.6 was removed, as we do not support this version anymore.
1 parent 2485871 commit d386455

4 files changed

Lines changed: 135 additions & 47 deletions

File tree

intelmq/bots/outputs/misp/output_feed.py

Lines changed: 64 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,14 @@
55
# -*- coding: utf-8 -*-
66
import datetime
77
import json
8+
import re
89
from pathlib import Path
910
from uuid import uuid4
10-
import re
1111

1212
from intelmq import VAR_STATE_PATH
1313
from intelmq.lib.bot import OutputBot
1414
from intelmq.lib.exceptions import MissingDependencyError
15+
from intelmq.lib.mixins import CacheMixin
1516
from intelmq.lib.utils import parse_relative
1617

1718
try:
@@ -20,19 +21,14 @@
2021
except ImportError:
2122
# catching SyntaxError because of https://github.com/MISP/PyMISP/issues/501
2223
MISPEvent = None
23-
import_fail_reason = 'import'
24-
except SyntaxError:
25-
# catching SyntaxError because of https://github.com/MISP/PyMISP/issues/501
26-
MISPEvent = None
27-
import_fail_reason = 'syntax'
28-
24+
import_fail_reason = "import"
2925

30-
# NOTE: This module is compatible with Python 3.6+
3126

32-
33-
class MISPFeedOutputBot(OutputBot):
27+
class MISPFeedOutputBot(OutputBot, CacheMixin):
3428
"""Generate an output in the MISP Feed format"""
29+
3530
interval_event: str = "1 hour"
31+
delay_save_event_count: int = None
3632
misp_org_name = None
3733
misp_org_uuid = None
3834
output_dir: str = f"{VAR_STATE_PATH}mispfeed-output" # TODO: should be path
@@ -46,13 +42,8 @@ def check_output_dir(dirname):
4642
return True
4743

4844
def init(self):
49-
if MISPEvent is None and import_fail_reason == 'syntax':
50-
raise MissingDependencyError("pymisp",
51-
version='>=2.4.117.3',
52-
additional_text="Python versions below 3.6 are "
53-
"only supported by pymisp <= 2.4.119.1.")
54-
elif MISPEvent is None:
55-
raise MissingDependencyError('pymisp', version='>=2.4.117.3')
45+
if MISPEvent is None:
46+
raise MissingDependencyError("pymisp", version=">=2.4.117.3")
5647

5748
self.current_event = None
5849

@@ -72,59 +63,90 @@ def init(self):
7263
try:
7364
with (self.output_dir / '.current').open() as f:
7465
self.current_file = Path(f.read())
75-
self.current_event = MISPEvent()
76-
self.current_event.load_file(self.current_file)
77-
78-
last_min_time, last_max_time = re.findall('IntelMQ event (.*) - (.*)', self.current_event.info)[0]
79-
last_min_time = datetime.datetime.strptime(last_min_time, '%Y-%m-%dT%H:%M:%S.%f')
80-
last_max_time = datetime.datetime.strptime(last_max_time, '%Y-%m-%dT%H:%M:%S.%f')
81-
if last_max_time < datetime.datetime.now():
82-
self.min_time_current = datetime.datetime.now()
83-
self.max_time_current = self.min_time_current + self.timedelta
84-
self.current_event = None
85-
else:
86-
self.min_time_current = last_min_time
87-
self.max_time_current = last_max_time
66+
67+
if self.current_file.exists():
68+
self.current_event = MISPEvent()
69+
self.current_event.load_file(self.current_file)
70+
71+
last_min_time, last_max_time = re.findall(
72+
"IntelMQ event (.*) - (.*)", self.current_event.info
73+
)[0]
74+
last_min_time = datetime.datetime.strptime(
75+
last_min_time, "%Y-%m-%dT%H:%M:%S.%f"
76+
)
77+
last_max_time = datetime.datetime.strptime(
78+
last_max_time, "%Y-%m-%dT%H:%M:%S.%f"
79+
)
80+
if last_max_time < datetime.datetime.now():
81+
self.min_time_current = datetime.datetime.now()
82+
self.max_time_current = self.min_time_current + self.timedelta
83+
self.current_event = None
84+
else:
85+
self.min_time_current = last_min_time
86+
self.max_time_current = last_max_time
8887
except:
89-
self.logger.exception("Loading current event %s failed. Skipping it.", self.current_event)
88+
self.logger.exception(
89+
"Loading current event %s failed. Skipping it.", self.current_event
90+
)
9091
self.current_event = None
9192
else:
9293
self.min_time_current = datetime.datetime.now()
9394
self.max_time_current = self.min_time_current + self.timedelta
9495

9596
def process(self):
96-
9797
if not self.current_event or datetime.datetime.now() > self.max_time_current:
9898
self.min_time_current = datetime.datetime.now()
9999
self.max_time_current = self.min_time_current + self.timedelta
100100
self.current_event = MISPEvent()
101-
self.current_event.info = ('IntelMQ event {begin} - {end}'
102-
''.format(begin=self.min_time_current.isoformat(),
103-
end=self.max_time_current.isoformat()))
101+
self.current_event.info = "IntelMQ event {begin} - {end}" "".format(
102+
begin=self.min_time_current.isoformat(),
103+
end=self.max_time_current.isoformat(),
104+
)
104105
self.current_event.set_date(datetime.date.today())
105106
self.current_event.Orgc = self.misp_org
106107
self.current_event.uuid = str(uuid4())
107-
self.current_file = self.output_dir / f'{self.current_event.uuid}.json'
108-
with (self.output_dir / '.current').open('w') as f:
108+
self.current_file = self.output_dir / f"{self.current_event.uuid}.json"
109+
with (self.output_dir / ".current").open("w") as f:
109110
f.write(str(self.current_file))
110111

112+
# On startup or when timeout occurs, clean the queue to ensure we do not
113+
# keep events forever because there was not enough generated
114+
self._generate_feed()
115+
111116
event = self.receive_message().to_dict(jsondict_as_string=True)
112117

113-
obj = self.current_event.add_object(name='intelmq_event')
114-
for object_relation, value in event.items():
118+
cache_size = None
119+
if self.delay_save_event_count:
120+
cache_size = self.cache_put(event)
121+
122+
if cache_size is None:
123+
self._generate_feed(event)
124+
elif cache_size >= self.delay_save_event_count:
125+
self._generate_feed()
126+
127+
self.acknowledge_message()
128+
129+
def _add_message_to_feed(self, message: dict):
130+
obj = self.current_event.add_object(name="intelmq_event")
131+
for object_relation, value in message.items():
115132
try:
116133
obj.add_attribute(object_relation, value=value)
117134
except NewAttributeError:
118135
# This entry isn't listed in the harmonization file, ignoring.
119136
pass
120137

121-
feed_output = self.current_event.to_feed(with_meta=False)
138+
def _generate_feed(self, message: dict = None):
139+
if message:
140+
self._add_message_to_feed(message)
141+
142+
while message := self.cache_pop():
143+
self._add_message_to_feed(message)
122144

123-
with self.current_file.open('w') as f:
145+
feed_output = self.current_event.to_feed(with_meta=False)
146+
with self.current_file.open("w") as f:
124147
json.dump(feed_output, f)
125148

126149
feed_meta_generator(self.output_dir)
127-
self.acknowledge_message()
128150

129151
@staticmethod
130152
def check(parameters):

intelmq/lib/bot.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,10 @@ def catch_shutdown():
279279
def harmonization(self):
280280
return self._harmonization
281281

282+
@property
283+
def bot_id(self):
284+
return self.__bot_id_full
285+
282286
def __handle_sigterm_signal(self, signum: int, stack: Optional[object]):
283287
"""
284288
Calls when a SIGTERM is received. Stops the bot.

intelmq/lib/mixins/cache.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
""" CacheMixin for IntelMQ
1+
"""CacheMixin for IntelMQ
22
33
SPDX-FileCopyrightText: 2021 Sebastian Waldbauer
44
SPDX-License-Identifier: AGPL-3.0-or-later
55
66
CacheMixin is used for caching/storing data in redis.
77
"""
88

9+
import json
910
from typing import Any, Optional
1011
import redis
1112
import intelmq.lib.utils as utils
@@ -31,7 +32,9 @@ def __init__(self, **kwargs):
3132
"socket_timeout": 5,
3233
}
3334

34-
self.__redis = redis.Redis(db=self.redis_cache_db, password=self.redis_cache_password, **kwargs)
35+
self.__redis = redis.Redis(
36+
db=self.redis_cache_db, password=self.redis_cache_password, **kwargs
37+
)
3538
super().__init__()
3639

3740
def cache_exists(self, key: str):
@@ -51,6 +54,17 @@ def cache_set(self, key: str, value: Any, ttl: Optional[int] = None):
5154
if self.redis_cache_ttl:
5255
self.__redis.expire(key, self.redis_cache_ttl)
5356

57+
def cache_put(self, value: dict) -> int:
58+
# Returns the length of the list after pushing
59+
size = self.__redis.lpush(self.bot_id, json.dumps(value))
60+
return size
61+
62+
def cache_pop(self) -> dict:
63+
data = self.__redis.rpop(self.bot_id)
64+
if data is None:
65+
return None
66+
return json.loads(data)
67+
5468
def cache_flush(self):
5569
"""
5670
Flushes the currently opened database by calling FLUSHDB.

intelmq/tests/bots/outputs/misp/test_output_feed.py

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33
# SPDX-License-Identifier: AGPL-3.0-or-later
44

55
# -*- coding: utf-8 -*-
6+
import json
67
import unittest
7-
import sys
8+
from pathlib import Path
89
from tempfile import TemporaryDirectory
910

1011
import intelmq.lib.test as test
@@ -37,9 +38,9 @@
3738

3839
@test.skip_exotic()
3940
class TestMISPFeedOutputBot(test.BotTestCase, unittest.TestCase):
40-
4141
@classmethod
4242
def set_bot(cls):
43+
cls.use_cache = True
4344
cls.bot_reference = MISPFeedOutputBot
4445
cls.default_input_message = EXAMPLE_EVENT
4546
cls.directory = TemporaryDirectory()
@@ -51,10 +52,57 @@ def set_bot(cls):
5152
def test_event(self):
5253
self.run_bot()
5354

55+
current_event = open(f"{self.directory.name}/.current").read()
56+
with open(current_event) as f:
57+
objects = json.load(f).get("Event", {}).get("Object", [])
58+
assert len(objects) == 1
59+
60+
def test_accumulating_events(self):
61+
self.input_message = [EXAMPLE_EVENT, EXAMPLE_EVENT]
62+
self.run_bot(iterations=2, parameters={"delay_save_event_count": 3})
63+
64+
current_event = open(f"{self.directory.name}/.current").read()
65+
66+
# First, the feed is empty - not enough events came
67+
with open(current_event) as f:
68+
objects = json.load(f).get("Event", {}).get("Object", [])
69+
assert len(objects) == 0
70+
71+
self.input_message = [EXAMPLE_EVENT]
72+
self.run_bot(parameters={"delay_save_event_count": 3})
73+
74+
# When enough events were collected, save them
75+
with open(current_event) as f:
76+
objects = json.load(f)["Event"]["Object"]
77+
assert len(objects) == 3
78+
79+
self.input_message = [EXAMPLE_EVENT, EXAMPLE_EVENT, EXAMPLE_EVENT]
80+
self.run_bot(iterations=3, parameters={"delay_save_event_count": 3})
81+
82+
# We continue saving to the same file until interval timeout
83+
with open(current_event) as f:
84+
objects = json.load(f)["Event"]["Object"]
85+
assert len(objects) == 6
86+
87+
# Simulating leftovers in the queue when it's time to generate new event
88+
Path(f"{self.directory.name}/.current").unlink()
89+
self.bot.cache_put(EXAMPLE_EVENT)
90+
self.run_bot(parameters={"delay_save_event_count": 3})
91+
92+
new_event = open(f"{self.directory.name}/.current").read()
93+
with open(new_event) as f:
94+
objects = json.load(f)["Event"]["Object"]
95+
assert len(objects) == 1
96+
97+
98+
def tearDown(self):
99+
self.cache.delete(self.bot_id)
100+
super().tearDown()
101+
54102
@classmethod
55103
def tearDownClass(cls):
56104
cls.directory.cleanup()
57105

58106

59-
if __name__ == '__main__': # pragma: no cover
107+
if __name__ == "__main__": # pragma: no cover
60108
unittest.main()

0 commit comments

Comments
 (0)