MainLoop — Ruby-библиотека для управления субпроцессами и потоками с функциями:
- автоматический сбор дочерних процессов (reaping)
- корректное завершение (SIGTERM/SIGINT) процессов и потоков
- автоматический перезапуск по количеству повторов
- принудительное завершение по таймауту
- обработка завершения процессов и потоков
MainLoop is a Ruby library for managing subprocesses and threads with features:
- automatic child process reaping
- graceful shutdown (SIGTERM/SIGINT) for processes and threads
- automatic restart by retry count
- timeout-based force termination
- process/thread completion handling
- Потоко-безопасный канал обмена событиями (IO.pipe) / Thread-safe event bus (IO.pipe)
- Управление процессами через Kernel.fork / Process management via Kernel.fork
- Управление потоками через Thread.new / Thread management via Thread.new
- Обработка сигналов TERM/INT/CLD / SIGTERM/INT/CLD signal handling
- Автоматический сбор завершенных процессов / Automatic child process reaping
- Retry-логика для перезапуска / Retry logic for restarts
- Принудительное завершение по таймауту / Timeout-based force termination
gem install main_loop
При установке MainLoop через bundler добавьте следующую строку в Gemfile:
If you'd rather install MainLoop using bundler, add a line for it in your Gemfile:
gem 'main_loop'
Затем выполните / Then run:
bundle install # для установки гема / gem installation
MainLoop - это корневой модуль, который подключает все компоненты:
MainLoop is the root module that requires all components:
require 'main_loop'Субмодули / Submodules:
MainLoop::Bus— канал обмена событиями (IO.pipe)MainLoop::Dispatcher— координация обработчиков и управление жизненным цикломMainLoop::Loop— главный цикл обработки событий и сигналовMainLoop::Handler— абстрактный базовый классMainLoop::ProcessHandler— управление субпроцессамиMainLoop::ThreadHandler— управление потоками
graph TB
subgraph "MainLoop"
Loop["Loop<br><i>Главный цикл</i>"]
Dispatcher["Dispatcher<br><i>Координирует обработчиков</i>"]
Bus["Bus<br><i>Канал обмена событиями (IO.pipe)</i>"]
Handler["Handler<br><i>Базовый класс</i>"]
ProcessHandler["ProcessHandler<br><i>Управляет процессами</i>"]
ThreadHandler["ThreadHandler<br><i>Управляет потоками</i>"]
Loop -->|координирует| Dispatcher
Dispatcher -->|использует| Bus
Loop -->|получает события из| Bus
ProcessHandler -->|наследует| Handler
ThreadHandler -->|наследует| Handler
ProcessHandler -->|регистрируется в| Dispatcher
ThreadHandler -->|регистрируется в| Dispatcher
end
sequenceDiagram
participant Cycle as Loop
participant Bus
participant Dispatcher
participant Handler
note over Cycle,Handler: Инициализация / Initialization
Cycle->>Bus: install_signal_handlers
Handler->>Dispatcher: add_handler
Cycle->>Cycle: start_loop_forever
loop Forever
Cycle->>Bus: gets(wait)
alt sig:TERM/INT
Bus-->>Cycle: "sig:TERM"
Cycle->>Dispatcher: term
Dispatcher->>Dispatcher: @terminating_at = Time.now
note right of Dispatcher: Graceful termination
Dispatcher->>Handler: term
opt @on_term || @runnable.on_term
Handler->>Handler: @on_term.call(pid/thread)
end
else sig:CLD
Cycle->>Cycle: wait for reap_children
else reap:pid:status
Cycle->>Dispatcher: reap_by_id
Dispatcher->>Handler: reap(status)
alt retry_count > 0
Handler->>Handler: handle_retry -> run
else
Handler->>Bus: publish(:term)
end
end
Cycle->>Dispatcher: tick
alt need_force_kill?
note right of Dispatcher: Превышен timeout / Timeout exceeded
Dispatcher->>Dispatcher: @killed = true
Dispatcher->>Handler: kill
end
end
Библиотека поддерживает два основных подхода для описания работы процессов и потоков:
Передайте блок кода непосредственно в конструктор обработчика (ProcessHandler или ThreadHandler). В этом блоке размещается основная логика. При завершении (по сигналу, ошибке или таймауту) блок прерывается; вы можете определить дополнительные действия, используя переданный объект обработчика.
MainLoop::ProcessHandler.new(dispatcher, 'my_process', retry_count: 3) do
# основная логика
loop { sleep 1 }
end
MainLoop::ThreadHandler.new(dispatcher, 'my_thread', retry_count: 0) do |handler|
handler.on_term do
# действия при завершении (очистка, закрытие ресурсов)
@stop = true
end
# основная логика с возможностью проверки флага
@stop = false
loop { sleep 1; break if @stop }
endПередайте экземпляр класса, который реализует два обязательных метода:
run— содержит основную логику; дляProcessHandlerметод не принимает аргументов, дляThreadHandlerполучает объект потока.on_term— вызывается при необходимости завершить процесс/поток; дляProcessHandlerпринимает PID, дляThreadHandler— объект потока. В этом методе следует инициировать корректное завершение (например, послать сигнал процессу или установить флаг остановки для потока).
class Worker
def run
trap('USR1') { @stop = true; raise Interrupt }
@stop = false
loop { sleep 1; break if @stop }
rescue Interrupt
# завершаемся
exit 0
end
def on_term(pid)
Process.kill('USR1', pid)
end
end
MainLoop::ProcessHandler.new(dispatcher, 'worker', runnable: Worker.new)Оба подхода могут комбинироваться с параметрами (retry_count, logger и т.д.) и одинаково хорошо интегрируются с циклом MainLoop::Loop.
Выбор зависит от удобства: для простых сценариев подойдёт блок, для сложной логики управления завершением — объект с явными методами.
require 'main_loop'
require 'logger'
logger = Logger.new(STDOUT)
logger.level = Logger::DEBUG
# Шина и диспетчер. Параметр timeout (в секундах) опционален.
bus = MainLoop::Bus.new
dispatcher = MainLoop::Dispatcher.new(bus, timeout: 10, logger: logger)
mainloop = MainLoop::Loop.new(bus, dispatcher, logger: logger)# Процесс test1: будет перезапущен 3 раза, после завершения выходит с кодом 0
MainLoop::ProcessHandler.new dispatcher, 'test1', retry_count: 3, logger: logger do
sleep 2
exit! 0
end
# Процесс test2: обрабатывает SIGTERM и выходит с кодом 1 после 2 перезапусков
MainLoop::ProcessHandler.new dispatcher, 'test2', retry_count: 2, logger: logger do
trap 'TERM' do
exit(0)
end
sleep 2
exit! 1
endMainLoop::ProcessHandler.new dispatcher, 'worker', retry_count: 3, logger: logger do
loop do
# основная логика / main logic
sleep 1
end
exit 0
endclass Worker
def run
# Используем пользовательский сигнал, чтобы прервать системные вызовы (например, sleep)
trap('USR1') do
puts "Получен USR1. pid = #{Process.pid}"
@stop = true
raise Interrupt # прерывает текущий блок (sleep и т.д.)
end
@stop = false
loop do
puts "работаю..."
sleep 100
break if @stop
rescue Interrupt
puts "прерывание, выходим"
break
end
exit 0
end
def on_term(pid)
puts "Завершаю работу. pid = #{pid}"
Process.kill('USR1', pid) # посылаем пользовательский сигнал
begin
Timeout.timeout(5) { Process.wait(pid) }
rescue Timeout::Error
puts "не завершился за 5 секунд"
end
puts "Завершил работу"
end
end
worker = Worker.new
MainLoop::ProcessHandler.new(dispatcher, 'worker', runnable: worker, retry_count: :unlimited, logger: logger)# Поток thread2: не перезапускается (retry_count: 0), выполняет внешнюю команду
MainLoop::ThreadHandler.new dispatcher, 'thread2', retry_count: 0, logger: logger do
system('sleep 15;echo ok')
endMainLoop::ThreadHandler.new dispatcher, 'worker', retry_count: 0, logger: logger do |handler|
# Устанавливаем обработчик завершения потока
handler.on_term do
puts "Завершаем поток, выполняем cleanup..."
@stop = true
end
@stop = false
loop do
puts "Работаю..."
sleep 1
break if @stop
end
endclass Worker
def run(thread)
@stop = false
loop do
sleep 60
break if @stop
end
end
def on_term(thread)
@stop = true
thread&.wakeup
end
end
worker = Worker.new
MainLoop::ThreadHandler.new dispatcher, 'worker', runnable: worker, logger: logger# Бесконечный цикл / Infinite loop
mainloop.run
# С таймаутом (30 секунд) / With timeout (30 seconds)
mainloop.run(30)Когда отправляется сигнал TERM или INT:
trapперехватывает сигнал и отправляетbus.puts("sig:TERM")Loopполучает событие изBusи вызываетDispatcher#termDispatcherустанавливает@terminating_at = Time.now- Все обработчики получают
term:ProcessHandlerпосылаетProcess.kill('TERM', pid)ThreadHandlerвызывает@on_termблок
- Если через
timeout(по умолчанию 5 сек) процессы не завершились:Dispatcher#tickпроверяетneed_force_kill?- Если
true— посылаетkillвсем обработчикам
- Когда все обработчики завершаются (
finished?):Dispatcher#try_exit!вызываетexit(@exit_code)
When sending TERM or INT signal:
trapcatches the signal and sendsbus.puts("sig:TERM")Loopgets the event fromBusand callsDispatcher#termDispatchersets@terminating_at = Time.now- All handlers receive
term:ProcessHandlersendsProcess.kill('TERM', pid)ThreadHandlercalls@on_termblock
- If processes don't terminate within
timeout(default 5 seconds):Dispatcher#tickchecksneed_force_kill?- If
true— sendskillto all handlers
- When all handlers finish (
finished?):Dispatcher#try_exit!callsexit(@exit_code)
retry_count: 3 # повторить 3 раза / retry 3 times
retry_count: 0 # не повторять / don't retry
retry_count: :unlimited # бесконечные повторы / infinite retriesОбработчики могут отправлять события в шину:
Handlers can publish events to the bus:
# Из любого места обработчика / From any handler place:
publish("reap:#{id}:exited")
publish(:term)- Потоко-безопасность:
BusиDispatcherиспользуютMonitorMixin - Таймауты:
Timeouterиспользуется для timeout вBus#getsиLoop#start_loop_forever - Логирование: все классы принимают параметр
logger:, по умолчаниюLogger.new(nil) - Ошибки: используйте
rescue StandardError(не пустойrescue) - Коды выхода:
exit!(code)в процессах,exit(code)в основном потоке
- Thread safety:
BusandDispatcheruseMonitorMixin - Timeouts:
Timeouteris used for timeout inBus#getsиLoop#start_loop_forever - Logging: all classes accept
logger:parameter, default isLogger.new(nil) - Error handling: use
rescue StandardError(not barerescue) - Exit codes:
exit!(code)in processes,exit(code)in main thread
# Запуск всех тестов / Run all tests
bundle exec rspec
# Запуск одного файла / Run single file
bundle exec rspec spec/bus_spec.rb
# Запуск одного теста / Run single test
bundle exec rspec spec/bus_spec.rb:12
# Покрытие кода / Code coverage (96%+)
bundle exec rspec --format progressДля запуска примеров из локальной копии репозитория (без установки гема) используйте опцию -I интерпретатора Ruby, указав путь к директории lib относительно текущей папки:
ruby -I ./lib examples/имя_файла.rbТекущая версия / Current version: 0.1.4
Юрий Самойленко / Yuri Samoylenko kinnalru@gmail.com
Библиотека доступна с открытым исходным кодом в соответствии с условиями лицензии MIT.
The gem is available as open source under the terms of the MIT License.