Skip to content

Commit 2eb456b

Browse files
authored
Add RMQ version of queue monitor (#58)
Adds dependency on pandas
1 parent 2e38586 commit 2eb456b

File tree

3 files changed

+178
-0
lines changed

3 files changed

+178
-0
lines changed

requirements.conda.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ junit-xml>=1.9
55
marshmallow-sqlalchemy
66
msgpack-python
77
networkx
8+
pandas
89
pika
910
procrunner>=1.0.2
1011
pytest

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
"dlstbx.plot_reflections=dlstbx.cli.plot_reflections:run",
3737
"dlstbx.queue_drain=dlstbx.cli.queue_drain:run",
3838
"dlstbx.queue_monitor=dlstbx.cli.queue_monitor:run",
39+
"dlstbx.queue_monitor_rmq=dlstbx.cli.queue_monitor_rmq:run",
3940
"dlstbx.run_dozor=dlstbx.cli.run_dozor:run",
4041
"dlstbx.run_system_tests=dlstbx.cli.run_system_tests:run",
4142
"dlstbx.service=dlstbx.cli.service:run",
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
#
2+
# dlstbx.queue_monitor
3+
# Monitor queue utilization
4+
#
5+
6+
import argparse
7+
import logging
8+
import time
9+
10+
import json
11+
import numpy as np
12+
import pandas as pd
13+
import urllib.parse
14+
import urllib.request
15+
16+
logger = logging.getLogger("dlstbx.queue_monitor")
17+
18+
19+
RABBITMQ_HOST = "rabbitmq1.diamond.ac.uk"
20+
21+
22+
def get_queue_stats() -> pd.DataFrame:
23+
password_mgr = urllib.request.HTTPPasswordMgrWithDefaultRealm()
24+
password_mgr.add_password(
25+
realm=None,
26+
uri=f"http://{RABBITMQ_HOST}:15672/api/",
27+
user="guest",
28+
passwd="guest",
29+
)
30+
handler = urllib.request.HTTPBasicAuthHandler(password_mgr)
31+
opener = urllib.request.build_opener(handler)
32+
urllib.request.install_opener(opener)
33+
request = urllib.request.Request(f"http://{RABBITMQ_HOST}:15672/api/queues")
34+
with urllib.request.urlopen(request) as response:
35+
json_str = response.read()
36+
stats = pd.json_normalize(json.loads(json_str))
37+
fields = [
38+
"name",
39+
"consumers",
40+
"messages",
41+
"messages_ready",
42+
"messages_unacknowledged",
43+
"message_stats.publish",
44+
"message_stats.publish_details.rate",
45+
"message_stats.deliver_get",
46+
"message_stats.deliver_get_details.rate",
47+
]
48+
return stats[fields]
49+
50+
51+
def print_stats(stats: pd.DataFrame) -> None:
52+
"""Main display function"""
53+
54+
# https://activemq.apache.org/how-do-i-find-the-size-of-a-queue
55+
# Enqueue Count - the total number of messages sent to the queue since the last restart
56+
# Dequeue Count - the total number of messages removed from the queue (ack’d by consumer) since last restart
57+
# Inflight Count - the number of messages sent to a consumer session and have not received an ack
58+
# Dispatch Count - the total number of messages sent to consumer sessions (Dequeue + Inflight)
59+
# Expired Count - the number of messages that were not delivered because they were expired
60+
# QueueSize is the total number of messages in the queue/store that have not been ack’d by a consumer.
61+
62+
# https://www.rabbitmq.com/rabbitmqctl.8.html#list_queues
63+
# messages_ready - Number of messages ready to be delivered to clients.
64+
# messages_unacknowledged - Number of messages delivered to clients but not yet acknowledged.messages
65+
# messages - Sum of ready and unacknowledged messages (queue depth).
66+
67+
# Convert rates to equivalent number of messages in last 5 seconds
68+
stats["change_publish_rate"] = (
69+
stats["message_stats.publish_details.rate"] * 5
70+
).apply(np.ceil)
71+
stats["change_deliver_rate"] = (
72+
stats["message_stats.deliver_get_details.rate"] * 5
73+
).apply(np.ceil)
74+
75+
stats = stats.fillna(0.0).astype(int, errors="ignore")
76+
stats = stats[stats["messages"] > 0].set_index("name")
77+
stats = stats.sort_index().sort_values(by="messages")
78+
status = stats.to_dict(orient="index")
79+
longest = stats.reset_index().astype(str).applymap(len).max()
80+
81+
c_gray = "\x1b[30m"
82+
c_green = "\x1b[32m"
83+
c_yellow = "\x1b[33m"
84+
c_blue = "\x1b[34m"
85+
c_magenta = "\x1b[35m"
86+
c_reset = "\x1b[0m"
87+
c_bold = "\x1b[1m"
88+
89+
line = (
90+
"{colour[namespace]}{qname:{longest[name]}}{colour[reset]} "
91+
"{colour[input]}{0[change_publish_rate]:{longest[change_publish_rate]}} "
92+
">{colour[hold]}[ {filter_zero[messages_ready]:{longest[messages_ready]}} | {colour[listeners]}{filter_zero[consumers]:<{longest[consumers]}}{colour[hold]} | {colour[flight]}{filter_zero[messages_unacknowledged]:<{longest[messages_unacknowledged]}}{colour[hold]} ]"
93+
"{colour[output]}> {filter_zero[change_deliver_rate]:<{longest[change_deliver_rate]}}{colour[reset]}"
94+
)
95+
# line += " -- {0[relevance]}{colour[reset]}"
96+
97+
print("\033[H\033[J", end="")
98+
queue_sep = "{header}ActiveMQ status: {highlight}{queues}{header} queues containing {highlight}{messages}{header} messages{reset}".format(
99+
messages=stats["messages_ready"].sum(),
100+
queues=len(stats),
101+
highlight=c_bold + c_yellow,
102+
reset=c_reset,
103+
header=c_reset + c_yellow,
104+
)
105+
print(queue_sep)
106+
107+
for qname in status.keys():
108+
colour = {
109+
"input": c_green if status[qname]["change_publish_rate"] else c_gray,
110+
"hold": c_blue if status[qname]["messages_ready"] else c_gray,
111+
"flight": c_blue
112+
if status[qname]["messages_ready"]
113+
or status[qname]["messages_unacknowledged"]
114+
else c_gray,
115+
"output": c_green if status[qname]["change_deliver_rate"] else c_gray,
116+
"reset": c_reset,
117+
"listeners": c_yellow if status[qname]["consumers"] else c_gray,
118+
"namespace": c_magenta
119+
# if status[qname]["shortdest.prefix"] == "zocdev"
120+
# else "",
121+
}
122+
filter_zero = {
123+
key: status[qname][key] if status[qname][key] > 0 else ""
124+
for key in (
125+
"change_deliver_rate",
126+
"messages_unacknowledged",
127+
"messages_ready",
128+
"consumers",
129+
)
130+
}
131+
print(
132+
line.format(
133+
status[qname],
134+
qname=qname,
135+
longest=longest,
136+
colour=colour,
137+
filter_zero=filter_zero,
138+
)
139+
)
140+
141+
print(
142+
"\n{header}What do the numbers mean:{reset}".format(
143+
reset=c_reset,
144+
header=c_reset + c_yellow,
145+
)
146+
)
147+
print(
148+
f"topic/queue name {c_green}m.in/5s >{c_gray}[ {c_blue}m.held{c_gray} | {c_yellow}clients{c_gray} | {c_blue}m.dispatchd{c_gray} ]{c_green}> m.out/5s{c_reset}"
149+
)
150+
151+
152+
def run():
153+
parser = argparse.ArgumentParser(usage="dlstbx.queue_monitor")
154+
parser.add_argument("-?", action="help", help=argparse.SUPPRESS)
155+
parser.add_argument(
156+
"--test",
157+
action="store_true",
158+
dest="test",
159+
help="Connect to personal development ActiveMQ server",
160+
)
161+
parser.add_argument(
162+
"--interval",
163+
dest="gather_interval",
164+
default=5,
165+
help="Interval (in seconds) at which to gather statistics",
166+
)
167+
168+
args = parser.parse_args()
169+
170+
try:
171+
while True:
172+
stats = get_queue_stats()
173+
print_stats(stats)
174+
time.sleep(args.gather_interval)
175+
except KeyboardInterrupt:
176+
print("\x1b[0m")

0 commit comments

Comments
 (0)