|
| 1 | +# |
| 2 | +# zocalo.queue_drain |
| 3 | +# Drain one queue into another in a controlled manner |
| 4 | +# |
| 5 | + |
| 6 | + |
| 7 | +import argparse |
| 8 | +import queue |
| 9 | +import sys |
| 10 | +import time |
| 11 | +from datetime import datetime |
| 12 | + |
| 13 | +import workflows.recipe.wrapper |
| 14 | +import workflows.transport |
| 15 | + |
| 16 | +import zocalo.configuration |
| 17 | + |
| 18 | + |
| 19 | +def show_cluster_info(step): |
| 20 | + try: |
| 21 | + print("Beamline " + step["parameters"]["cluster_project"].upper()) |
| 22 | + except Exception: |
| 23 | + pass |
| 24 | + try: |
| 25 | + print("Working directory " + step["parameters"]["workingdir"]) |
| 26 | + except Exception: |
| 27 | + pass |
| 28 | + |
| 29 | + |
| 30 | +show_additional_info = {"cluster.submission": show_cluster_info} |
| 31 | + |
| 32 | + |
| 33 | +def run(args=None): |
| 34 | + |
| 35 | + # Load configuration |
| 36 | + zc = zocalo.configuration.from_file() |
| 37 | + zc.activate() |
| 38 | + |
| 39 | + parser = argparse.ArgumentParser( |
| 40 | + usage="zocalo.queue_drain [options] source destination" |
| 41 | + ) |
| 42 | + |
| 43 | + default_transport = workflows.transport.default_transport |
| 44 | + if ( |
| 45 | + zc.storage |
| 46 | + and zc.storage.get("zocalo.default_transport") |
| 47 | + in workflows.transport.get_known_transports() |
| 48 | + ): |
| 49 | + default_transport = zc.storage["zocalo.default_transport"] |
| 50 | + |
| 51 | + parser.add_argument("-?", action="help", help=argparse.SUPPRESS) |
| 52 | + parser.add_argument("SOURCE", type=str, help="Source queue name") |
| 53 | + parser.add_argument("DEST", type=str, help="Destination queue name") |
| 54 | + parser.add_argument( |
| 55 | + "--wait", |
| 56 | + action="store", |
| 57 | + dest="wait", |
| 58 | + type=float, |
| 59 | + default=5, |
| 60 | + help="Wait this many seconds between deliveries", |
| 61 | + ) |
| 62 | + parser.add_argument( |
| 63 | + "--stop", |
| 64 | + action="store", |
| 65 | + dest="stop", |
| 66 | + type=float, |
| 67 | + default=60, |
| 68 | + help="Stop if no message seen for this many seconds (0 = forever)", |
| 69 | + ) |
| 70 | + parser.add_argument( |
| 71 | + "-t", |
| 72 | + "--transport", |
| 73 | + dest="transport", |
| 74 | + metavar="TRN", |
| 75 | + default=default_transport, |
| 76 | + help="Transport mechanism. Known mechanisms: " |
| 77 | + + ", ".join(workflows.transport.get_known_transports()) |
| 78 | + + f" (default: {default_transport})", |
| 79 | + ) |
| 80 | + zc.add_command_line_options(parser) |
| 81 | + workflows.transport.add_command_line_options(parser) |
| 82 | + args = parser.parse_args(args) |
| 83 | + |
| 84 | + transport = workflows.transport.lookup(args.transport)() |
| 85 | + transport.connect() |
| 86 | + |
| 87 | + messages = queue.Queue() |
| 88 | + |
| 89 | + def receive_message(header, message): |
| 90 | + messages.put((header, message)) |
| 91 | + |
| 92 | + print("Reading messages from " + args.SOURCE) |
| 93 | + transport.subscribe(args.SOURCE, receive_message, acknowledgement=True) |
| 94 | + |
| 95 | + message_count = 0 |
| 96 | + header_filter = frozenset( |
| 97 | + { |
| 98 | + "content-length", |
| 99 | + "destination", |
| 100 | + "expires", |
| 101 | + "message-id", |
| 102 | + "original-destination", |
| 103 | + "originalExpiration", |
| 104 | + "subscription", |
| 105 | + "timestamp", |
| 106 | + "redelivered", |
| 107 | + } |
| 108 | + ) |
| 109 | + drain_start = time.time() |
| 110 | + idle_time = 0 |
| 111 | + try: |
| 112 | + while True: |
| 113 | + try: |
| 114 | + header, message = messages.get(True, 0.1) |
| 115 | + except queue.Empty: |
| 116 | + idle_time = idle_time + 0.1 |
| 117 | + if args.stop and idle_time > args.stop: |
| 118 | + break |
| 119 | + continue |
| 120 | + idle_time = 0 |
| 121 | + print() |
| 122 | + try: |
| 123 | + print( |
| 124 | + "Message date: {:%Y-%m-%d %H:%M:%S}".format( |
| 125 | + datetime.fromtimestamp(int(header["timestamp"]) / 1000) |
| 126 | + ) |
| 127 | + ) |
| 128 | + except Exception: |
| 129 | + pass |
| 130 | + try: |
| 131 | + print("Recipe ID: {}".format(message["environment"]["ID"])) |
| 132 | + r = workflows.recipe.wrapper.RecipeWrapper(message=message) |
| 133 | + show_additional_info.get( |
| 134 | + args.DEST, show_additional_info.get(r.recipe_step["queue"]) |
| 135 | + )(r.recipe_step) |
| 136 | + except Exception: |
| 137 | + pass |
| 138 | + |
| 139 | + new_headers = { |
| 140 | + key: header[key] for key in header if key not in header_filter |
| 141 | + } |
| 142 | + txn = transport.transaction_begin() |
| 143 | + transport.send(args.DEST, message, headers=new_headers, transaction=txn) |
| 144 | + transport.ack(header, transaction=txn) |
| 145 | + transport.transaction_commit(txn) |
| 146 | + message_count = message_count + 1 |
| 147 | + print( |
| 148 | + "%4d message(s) drained in %.1f seconds" |
| 149 | + % (message_count, time.time() - drain_start) |
| 150 | + ) |
| 151 | + time.sleep(args.wait) |
| 152 | + except KeyboardInterrupt: |
| 153 | + sys.exit( |
| 154 | + "\nCancelling, %d message(s) drained, %d message(s) unprocessed in memory" |
| 155 | + % (message_count, messages.qsize()) |
| 156 | + ) |
| 157 | + print( |
| 158 | + "%d message(s) drained, no message seen for %.1f seconds" |
| 159 | + % (message_count, idle_time) |
| 160 | + ) |
0 commit comments