diff --git a/docs/WIKI.md b/docs/WIKI.md index 736f0cc..f139cbd 100644 --- a/docs/WIKI.md +++ b/docs/WIKI.md @@ -44,7 +44,7 @@ help [command] View Python execution stack information for all threads currently running in the process, and support analyzing native stacks and exporting to files. ```shell -stack [pid] [-f ] [--native] +stack [pid] [-f ] [--native] [-a|--async] ``` ##### Parameter Analysis @@ -53,6 +53,7 @@ stack [pid] [-f ] [--native] | pid | No | Process ID to analyze, defaults to the injected process ID | 3303 | | -f, --filepath | No | File path to export thread stacks to | /home/admin/stack.log | | --native | No | Whether to analyze native stacks of Python threads, defaults to False | --native | +| -a, --async | No | Whether to display async coroutine/task stacks | -a | ##### Output Display Command examples: @@ -64,6 +65,10 @@ stack # View native stacks of Python threads stack --native +# View async coroutine/task stacks +stack -a +stack --async + # Export execution stack information to a file stack -f ./stack.log ``` @@ -80,19 +85,58 @@ Analyzing native thread stacks: View Python execution stack information for all threads currently running in the process, and support exporting to files. ```shell -stack [filepath] +stack [filepath] [-a|--async] ``` ##### Parameter Analysis | Parameter | Required | Meaning | Example | | --- | --- | --- | --- | | filepath | No | File path to export thread stacks to | /home/admin/stack.log | +| -a, --async | No | Whether to display async coroutine/task stacks | -a | ##### Output Display Executing the `stack` command will display stack information for all threads in the console. ![img.png](https://raw.githubusercontent.com/alibaba/PyFlightProfiler/refs/heads/main/docs/images/stack_mac.png) +### Async Coroutine Stack: stack --async +View all async coroutines/tasks running across all event loops in all threads. + +```shell +stack -a +stack --async +``` + +The async stack display shows: +- **Thread**: The thread where the event loop is running +- **Task Name**: The asyncio Task name +- **State**: Task state (PENDING, WAITING, FINISHED, CANCELLED, FAILED) +- **Coroutine**: The coroutine function name +- **Stack**: Full coroutine call chain following the `cr_await` chain + +Example output: +``` +============================================================ +Async Coroutine/Task Stacks +============================================================ + +Thread: AsyncEventLoop (tid: 0x16f8a7000) +-------------------------------------------------- + + Task #1: FetchAPI1 + State: WAITING + Coroutine: fetch_data_loop + Stack (3 frames): + File "/app/demo.py", line 95, in fetch_data_loop + await fetch_data(name, delay) + File "/app/demo.py", line 26, in fetch_data + await asyncio.sleep(delay) + File "/usr/lib/python3.9/asyncio/tasks.py", line 649, in sleep + return await future + +============================================================ +``` + ## Method Execution Observation: watch ### Observing Method Input, Output, and Time Consumption diff --git a/docs/WIKI_zh.md b/docs/WIKI_zh.md index 7796ff6..658979a 100644 --- a/docs/WIKI_zh.md +++ b/docs/WIKI_zh.md @@ -44,7 +44,7 @@ help [command] 查看进程当前运行的所有线程的Python执行栈信息,并支持分析native栈以及导出到文件。 ```shell -stack [pid] [-f ] [--native] +stack [pid] [-f ] [--native] [-a|--async] ``` ##### 参数解析 @@ -53,6 +53,7 @@ stack [pid] [-f ] [--native] | pid | 否 | 分析的进程ID,默认为被注入进程的ID | 3303 | | -f, --filepath | 否 | 线程栈导出到的文件位置 | /home/admin/stack.log | | --native | 否 | 是否分析Python线程的本地栈,默认为False | --native | +| -a, --async | 否 | 是否展示异步协程/任务调用栈 | -a | ##### 输出展示 命令示例: @@ -64,6 +65,10 @@ stack # 查看Python线程的native栈 stack --native +# 查看异步协程/任务调用栈 +stack -a +stack --async + # 导出执行栈信息到文件中 stack -f ./stack.log ``` @@ -80,19 +85,58 @@ stack -f ./stack.log 查看进程当前运行的所有线程的Python执行栈信息,并支持导出到文件。 ```shell -stack [filepath] +stack [filepath] [-a|--async] ``` ##### 参数解析 | 参数 | 必填 | 含义 | 示例 | | --- | --- | --- | --- | | filepath | 否 | 线程栈导出到的文件位置 | /home/admin/stack.log | +| -a, --async | 否 | 是否展示异步协程/任务调用栈 | -a | ##### 输出展示 执行`stack`命令可在控制台展示所有线程的栈信息。 ![img.png](https://raw.githubusercontent.com/alibaba/PyFlightProfiler/refs/heads/main/docs/images/stack_mac.png) +### 异步协程栈: stack --async +查看进程中所有线程、所有事件循环中运行的异步协程/任务。 + +```shell +stack -a +stack --async +``` + +异步协程栈展示内容包括: +- **Thread**: 事件循环所在的线程 +- **Task Name**: asyncio Task 的名称 +- **State**: 任务状态(PENDING, WAITING, FINISHED, CANCELLED, FAILED) +- **Coroutine**: 协程函数名 +- **Stack**: 完整的协程调用链(通过 `cr_await` 链递归追踪) + +输出示例: +``` +============================================================ +Async Coroutine/Task Stacks +============================================================ + +Thread: AsyncEventLoop (tid: 0x16f8a7000) +-------------------------------------------------- + + Task #1: FetchAPI1 + State: WAITING + Coroutine: fetch_data_loop + Stack (3 frames): + File "/app/demo.py", line 95, in fetch_data_loop + await fetch_data(name, delay) + File "/app/demo.py", line 26, in fetch_data + await asyncio.sleep(delay) + File "/usr/lib/python3.9/asyncio/tasks.py", line 649, in sleep + return await future + +============================================================ +``` + ## 方法执行观测watch ### 观察执行方法输入、输出及耗时 diff --git a/flight_profiler/code_inject.py b/flight_profiler/code_inject.py index 3bc7567..06d3aba 100644 --- a/flight_profiler/code_inject.py +++ b/flight_profiler/code_inject.py @@ -46,6 +46,6 @@ def run_app(): loop.run_until_complete(asyncio.wait(tasks)) -profile_thread = threading.Thread(target=run_app) +profile_thread = threading.Thread(target=run_app, name="flight-profiler-injector") profile_thread.start() logger.info("pyFlightProfiler: start code inject successfully") diff --git a/flight_profiler/help_descriptions.py b/flight_profiler/help_descriptions.py index a8b0ece..f203f8a 100644 --- a/flight_profiler/help_descriptions.py +++ b/flight_profiler/help_descriptions.py @@ -211,9 +211,9 @@ def _build_help_msg(self) -> str: if is_linux(): STACK_COMMAND_DESCRIPTION = CommandDescription( - usage=["stack [pid] [-f ] [--native]"], + usage=["stack [pid] [-f ] [--native] [-a|--async]"], summary="Inspect stack frames of current running process.", - examples=["stack", "stack --native", "stack -f ./stack.log"], + examples=["stack", "stack --native", "stack -a", "stack --async", "stack -f ./stack.log"], wiki="https://github.com/alibaba/PyFlightProfiler/blob/main/docs/WIKI.md", options=[ ( @@ -222,15 +222,19 @@ def _build_help_msg(self) -> str: ), ("-f, --filepath", "redirect thread stack to filepath."), ("--native", "display native stack frames."), + ("-a, --async", "display async coroutine/task stacks."), ], ) else: STACK_COMMAND_DESCRIPTION = CommandDescription( - usage=["stack [filepath]"], + usage=["stack [filepath] [-a|--async]"], summary="Inspect stack frames of current running process.", - examples=["stack", "stack ./stack.log"], + examples=["stack", "stack -a", "stack --async", "stack ./stack.log"], wiki="https://github.com/alibaba/PyFlightProfiler/blob/main/docs/WIKI.md", - options=[("", "redirect thread stack to filepath.")], + options=[ + ("", "redirect thread stack to filepath."), + ("-a, --async", "display async coroutine/task stacks."), + ], ) TRACE_COMMAND_DESCRIPTION = CommandDescription( diff --git a/flight_profiler/plugins/stack/cli_plugin_stack.py b/flight_profiler/plugins/stack/cli_plugin_stack.py index cc08a24..48892ce 100644 --- a/flight_profiler/plugins/stack/cli_plugin_stack.py +++ b/flight_profiler/plugins/stack/cli_plugin_stack.py @@ -56,12 +56,48 @@ def __analyze_under_linux(self, params: StackParams): f"{COLOR_GREEN}write {stack_literal} to {params.filepath} successfully!{COLOR_END}" ) + def __show_coroutine_stacks(self, params: StackParams): + """ + Show async coroutine stacks via server communication. + Coroutines must be fetched from the target process's event loop. + """ + body = {"target": "stack", "param": "async"} + try: + client = FlightClient(host="localhost", port=self.port) + except: + show_error_info("Target process exited!") + return + try: + coro_lines: List[str] = [] + for line in client.request_stream(body): + if line: + line = line.decode("utf-8") + if params.filepath is not None: + coro_lines.append(line) + else: + print(line) + if params.filepath is not None: + with open(params.filepath, "w") as f: + for line in coro_lines: + print(line, file=f, flush=True) + print( + f"{COLOR_GREEN}write coroutine stacks to {params.filepath} successfully!{COLOR_END}" + ) + finally: + client.close() + def do_action(self, cmd): try: stack_param: StackParams = global_stack_parser.parse_stack_params(cmd) except: print(self.get_help()) return + + # Handle async coroutine stack display + if stack_param.async_stack: + self.__show_coroutine_stacks(stack_param) + return + if is_linux(): try: self.__analyze_under_linux(stack_param) diff --git a/flight_profiler/plugins/stack/server_plugin_stack.py b/flight_profiler/plugins/stack/server_plugin_stack.py index 16f77c2..0c83ce0 100644 --- a/flight_profiler/plugins/stack/server_plugin_stack.py +++ b/flight_profiler/plugins/stack/server_plugin_stack.py @@ -1,11 +1,25 @@ +import asyncio import os import tempfile import threading import traceback +from typing import List +from flight_profiler.common.system_logger import logger from flight_profiler.ext.stack_C import dump_all_threads_stack from flight_profiler.plugins.server_plugin import Message, ServerPlugin, ServerQueue from flight_profiler.utils.args_util import split_regex +from flight_profiler.utils.render_util import ( + BANNER_COLOR_CYAN, + COLOR_BOLD, + COLOR_END, + COLOR_FAINT, + COLOR_GREEN, + COLOR_ORANGE, + COLOR_RED, + COLOR_WHITE_255, + COLOR_YELLOW, +) from flight_profiler.utils.shell_util import resolve_symbol_address @@ -49,6 +63,11 @@ def add_thread_name(self, contents): return new_contents async def do_action(self, param): + # Check if we need to show async coroutine stacks + if param == "async": + await self._dump_coroutine_stacks() + return + tmp_fd, tmp_file_path = tempfile.mkstemp() try: addr = resolve_symbol_address("_Py_DumpTracebackThreads", os.getpid()) @@ -67,6 +86,448 @@ async def do_action(self, param): except: await self.out_q.output_msg(Message(True, traceback.format_exc())) + async def _dump_coroutine_stacks(self): + """ + Dump async coroutine/task stacks from all running event loops. + """ + try: + output_lines: List[str] = [] + # Title with green + separator = f"{COLOR_GREEN}{COLOR_BOLD}{'=' * 60}{COLOR_END}" + output_lines.append(separator) + output_lines.append(f"{COLOR_GREEN}{COLOR_BOLD}Async Coroutine/Task Stacks{COLOR_END}") + output_lines.append(separator) + output_lines.append("") + + # Get all running event loops across threads + all_tasks = self._get_all_async_tasks() + + # Group tasks by thread and filter out internal threads + tasks_by_thread = {} + for task_info in all_tasks: + thread_id = task_info.get("thread_id", 0) + thread_name = task_info.get("thread_name", "Unknown") + # Skip flight_profiler internal threads early + if self._is_flight_profiler_thread(thread_name): + continue + key = (thread_id, thread_name) + if key not in tasks_by_thread: + tasks_by_thread[key] = [] + tasks_by_thread[key].append(task_info) + + if not tasks_by_thread: + output_lines.append(f"{COLOR_YELLOW}No active coroutines/async tasks found.{COLOR_END}") + output_lines.append("") + output_lines.append(f"{COLOR_FAINT}Note: Coroutines are only visible when an event loop is running.{COLOR_END}") + else: + for (thread_id, thread_name), tasks in tasks_by_thread.items(): + # Thread header in cyan + output_lines.append(f"{BANNER_COLOR_CYAN}{COLOR_BOLD}Thread: {thread_name} (tid: 0x{thread_id:x}){COLOR_END}") + output_lines.append(f"{COLOR_FAINT}{'-' * 50}{COLOR_END}") + + for i, task_info in enumerate(tasks): + task_name = task_info.get("name", "") + task_state = task_info.get("state", "unknown") + coro_repr = task_info.get("coro_repr", "") + + output_lines.append("") + # Task name in yellow + output_lines.append(f" {COLOR_YELLOW}{COLOR_BOLD}Task #{i + 1}: {task_name}{COLOR_END}") + # State with color based on status + state_color = self._get_state_color(task_state) + output_lines.append(f" {COLOR_WHITE_255}State:{COLOR_END} {state_color}{task_state}{COLOR_END}") + # Coroutine name in orange + output_lines.append(f" {COLOR_WHITE_255}Coroutine:{COLOR_END} {COLOR_ORANGE}{coro_repr}{COLOR_END}") + + # Print stack frames + stack_frames = task_info.get("stack", []) + if stack_frames: + output_lines.append(f" {COLOR_WHITE_255}Stack ({len(stack_frames)} frames):{COLOR_END}") + for frame in stack_frames: + filename = frame.get("filename", "") + lineno = frame.get("lineno", 0) + funcname = frame.get("name", "") + line = frame.get("line", "") + # File path in faint, line number in green, function name in bold + output_lines.append( + f' {COLOR_FAINT}File "{filename}",{COLOR_END} ' + f'{COLOR_GREEN}line {lineno}{COLOR_END}, ' + f'{COLOR_BOLD}in {funcname}{COLOR_END}' + ) + if line: + output_lines.append(f" {COLOR_WHITE_255}{line}{COLOR_END}") + else: + output_lines.append(f" {COLOR_FAINT}Stack: {COLOR_END}") + + output_lines.append("") + + output_lines.append(separator) + result = "\n".join(output_lines) + await self.out_q.output_msg(Message(True, result)) + except Exception: + logger.exception("Failed to dump coroutine stacks") + await self.out_q.output_msg(Message(True, traceback.format_exc())) + + def _get_state_color(self, state: str) -> str: + """Get color code based on task state.""" + state_colors = { + "PENDING": COLOR_YELLOW, + "WAITING": COLOR_GREEN, + "FINISHED": COLOR_GREEN, + "CANCELLED": COLOR_FAINT, + "FAILED": COLOR_RED, + "UNKNOWN": COLOR_FAINT, + } + return state_colors.get(state, COLOR_WHITE_255) + + def _get_all_async_tasks(self) -> List[dict]: + """ + Get all async tasks from all running event loops across all threads. + Returns a list of task info dictionaries. + """ + all_tasks_info = [] + seen_task_ids = set() + + # Directly access asyncio's global _all_tasks WeakSet + # This contains ALL tasks from ALL event loops, not just current loop + try: + # _all_tasks is in asyncio.tasks module + import asyncio.tasks as asyncio_tasks + if hasattr(asyncio_tasks, '_all_tasks'): + all_tasks_weak = asyncio_tasks._all_tasks + elif hasattr(asyncio, '_all_tasks'): + all_tasks_weak = asyncio._all_tasks + else: + all_tasks_weak = None + if all_tasks_weak is not None: + # Safe iteration over WeakSet (may need retry due to concurrent modification) + for attempt in range(10): + try: + tasks_snapshot = list(all_tasks_weak) + break + except RuntimeError: + continue + else: + tasks_snapshot = [] + + for task in tasks_snapshot: + if task is None or id(task) in seen_task_ids: + continue + # Skip done tasks + try: + if task.done(): + continue + except Exception: + pass + # Skip flight_profiler internal tasks + if self._is_flight_profiler_task(task): + continue + seen_task_ids.add(id(task)) + + # Find the thread running this task's loop + thread_id, thread_name = self._find_thread_for_task(task) + + task_info = { + "task_id": id(task), + "thread_id": thread_id, + "thread_name": thread_name, + "name": self._get_task_name(task), + "state": self._get_task_state(task), + "coro_repr": self._get_coro_repr(task), + "stack": self._get_task_stack(task), + } + all_tasks_info.append(task_info) + except Exception: + logger.exception("Failed to get all async tasks") + + return all_tasks_info + + def _find_thread_for_task(self, task: asyncio.Task) -> tuple: + """ + Find which thread is running the event loop for this task. + Returns (thread_id, thread_name) tuple. + """ + try: + loop = getattr(task, '_loop', None) + if loop is None: + return (0, "Unknown") + + # Try to get thread_id directly from loop (set when loop is running) + loop_thread_id = getattr(loop, '_thread_id', None) + if loop_thread_id is not None and loop_thread_id != 0: + # Found the thread id, now get the name + thread_name = "Unknown" + for t in threading.enumerate(): + if t.ident == loop_thread_id: + thread_name = t.name + break + return (loop_thread_id, thread_name) + + # Fallback: try to find by scanning current frames + import sys + for thread_id, frame in sys._current_frames().items(): + current_frame = frame + while current_frame is not None: + local_vars = current_frame.f_locals + for var_name in ['self', 'loop', '_loop', 'event_loop']: + if var_name in local_vars: + val = local_vars[var_name] + if val is loop or (hasattr(val, '_loop') and val._loop is loop): + thread_name = "Unknown" + for t in threading.enumerate(): + if t.ident == thread_id: + thread_name = t.name + break + return (thread_id, thread_name) + current_frame = current_frame.f_back + except Exception: + logger.exception("Failed to find thread for task") + return (0, "Unknown") + + def _scan_frame_for_loops( + self, frame, thread_id: int, seen_loops: set, + seen_task_ids: set, all_tasks_info: List[dict] + ): + """ + Recursively scan a frame and its parents for event loops. + """ + try: + current_frame = frame + while current_frame is not None: + local_vars = current_frame.f_locals + for var_name, var_value in local_vars.items(): + if isinstance(var_value, asyncio.AbstractEventLoop): + if id(var_value) not in seen_loops: + seen_loops.add(id(var_value)) + # Get thread name + thread_name = "Unknown" + for t in threading.enumerate(): + if t.ident == thread_id: + thread_name = t.name + break + tasks_info = self._extract_tasks_from_loop( + var_value, thread_id, thread_name, seen_task_ids + ) + all_tasks_info.extend(tasks_info) + current_frame = current_frame.f_back + except Exception: + pass + + def _extract_tasks_from_loop( + self, loop: asyncio.AbstractEventLoop, thread_id: int, thread_name: str, + seen_task_ids: set = None + ) -> List[dict]: + """ + Extract task information from a given event loop. + """ + tasks_info = [] + if seen_task_ids is None: + seen_task_ids = set() + + try: + # Get all tasks from this loop + try: + tasks = asyncio.all_tasks(loop) + except RuntimeError: + # Loop might not be running + return tasks_info + + for task in tasks: + # Skip already seen tasks + if id(task) in seen_task_ids: + continue + # Skip flight_profiler internal tasks + if self._is_flight_profiler_task(task): + continue + seen_task_ids.add(id(task)) + task_info = { + "task_id": id(task), + "thread_id": thread_id, + "thread_name": thread_name, + "name": self._get_task_name(task), + "state": self._get_task_state(task), + "coro_repr": self._get_coro_repr(task), + "stack": self._get_task_stack(task), + } + tasks_info.append(task_info) + except Exception: + pass + + return tasks_info + + def _is_flight_profiler_thread(self, thread_name: str) -> bool: + """ + Check if a thread belongs to flight_profiler itself. + We filter these out to avoid showing internal tool threads. + """ + if not thread_name: + return False + name_lower = thread_name.lower() + return "flight-profiler" in name_lower or "flight_profiler" in name_lower + + def _is_flight_profiler_task(self, task: asyncio.Task) -> bool: + """ + Check if a task belongs to flight_profiler itself. + We filter these out to avoid showing internal tool tasks. + """ + try: + # Check coroutine name + coro = task.get_coro() + if coro is not None: + coro_name = getattr(coro, "__qualname__", "") or getattr(coro, "__name__", "") + if "flight_profiler" in coro_name.lower(): + return True + # Check coroutine's module + coro_module = getattr(coro, "__module__", "") or "" + if "flight_profiler" in coro_module: + return True + # Check cr_code for file path + cr_code = getattr(coro, "cr_code", None) + if cr_code is not None: + filename = getattr(cr_code, "co_filename", "") + if "flight_profiler" in filename: + return True + + # Check stack frames for flight_profiler paths + frames = task.get_stack() + if frames: + for frame in frames: + filename = frame.f_code.co_filename + if "flight_profiler" in filename: + return True + except Exception: + pass + return False + + def _get_task_name(self, task: asyncio.Task) -> str: + """ + Get the name of an asyncio task. + """ + try: + # Python 3.8+ has task.get_name() + if hasattr(task, "get_name"): + return task.get_name() + return repr(task) + except Exception: + return "" + + def _get_task_state(self, task: asyncio.Task) -> str: + """ + Get the state of an asyncio task. + """ + try: + if task.done(): + if task.cancelled(): + return "CANCELLED" + try: + if task.exception() is not None: + return "FAILED" + except (asyncio.CancelledError, asyncio.InvalidStateError): + pass + return "FINISHED" + # Check if task is waiting (has a waiter) + if hasattr(task, "_fut_waiter") and task._fut_waiter is not None: + return "WAITING" + return "PENDING" + except Exception: + return "UNKNOWN" + + def _get_coro_repr(self, task: asyncio.Task) -> str: + """ + Get a string representation of the coroutine. + """ + try: + coro = task.get_coro() + if coro is not None: + # Get coroutine name and qualified name + coro_name = getattr(coro, "__qualname__", None) or getattr( + coro, "__name__", repr(coro) + ) + return coro_name + return "" + except Exception: + return "" + + def _get_task_stack(self, task: asyncio.Task) -> List[dict]: + """ + Get the full coroutine call chain for an asyncio task. + This follows the cr_await chain to show nested coroutine calls. + """ + stack_info = [] + try: + # Start with the task's coroutine + coro = task.get_coro() + if coro is None: + return stack_info + + # Follow the cr_await chain to build full coroutine stack + self._collect_coro_frames(coro, stack_info) + + except Exception as e: + logger.exception("Failed to get task stack") + return stack_info + + def _collect_coro_frames(self, coro, stack_info: List[dict], depth: int = 0): + """ + Recursively collect frames from coroutine and its awaited coroutines. + """ + if coro is None or depth > 100: # Prevent infinite recursion + return + + try: + # Get the current frame of this coroutine + cr_frame = getattr(coro, 'cr_frame', None) + if cr_frame is not None: + frame_info = { + "filename": cr_frame.f_code.co_filename, + "lineno": cr_frame.f_lineno, + "name": cr_frame.f_code.co_name, + "line": self._get_line_from_frame(cr_frame), + } + stack_info.append(frame_info) + + # Follow cr_await to get the awaited coroutine/awaitable + cr_await = getattr(coro, 'cr_await', None) + if cr_await is not None: + # cr_await could be another coroutine, a Future, or other awaitable + if hasattr(cr_await, 'cr_frame'): + # It's a coroutine + self._collect_coro_frames(cr_await, stack_info, depth + 1) + elif hasattr(cr_await, 'gi_frame'): + # It's a generator-based coroutine + gi_frame = cr_await.gi_frame + if gi_frame is not None: + frame_info = { + "filename": gi_frame.f_code.co_filename, + "lineno": gi_frame.f_lineno, + "name": gi_frame.f_code.co_name, + "line": self._get_line_from_frame(gi_frame), + } + stack_info.append(frame_info) + # Continue following gi_yieldfrom if present + gi_yieldfrom = getattr(cr_await, 'gi_yieldfrom', None) + if gi_yieldfrom is not None: + self._collect_coro_frames(gi_yieldfrom, stack_info, depth + 1) + elif hasattr(cr_await, '__self__') and hasattr(cr_await, 'cr_await'): + # Some wrapped awaitable + self._collect_coro_frames(cr_await, stack_info, depth + 1) + except Exception as e: + logger.exception("Failed to collect coroutine frames at depth %d", depth) + + def _get_line_from_frame(self, frame) -> str: + """ + Get the source code line from a frame object. + """ + try: + import linecache + + filename = frame.f_code.co_filename + lineno = frame.f_lineno + line = linecache.getline(filename, lineno).strip() + return line + except Exception: + return "" + def get_instance(cmd: str, out_q: ServerQueue): return StackServerPlugin(cmd, out_q) diff --git a/flight_profiler/plugins/stack/stack_parser.py b/flight_profiler/plugins/stack/stack_parser.py index 5d018da..29c0de1 100644 --- a/flight_profiler/plugins/stack/stack_parser.py +++ b/flight_profiler/plugins/stack/stack_parser.py @@ -10,10 +10,11 @@ class StackParams: - def __init__(self, pid: int, filepath: Optional[str], native: bool): + def __init__(self, pid: int, filepath: Optional[str], native: bool, async_stack: bool = False): self.pid = pid self.native = native self.filepath = filepath + self.async_stack = async_stack # show async coroutine stack class StackParser(argparse.ArgumentParser): @@ -40,6 +41,15 @@ def __init__(self): default=False, help="analyze native stack frame or not.", ) + self.add_argument( + "-a", + "--async", + dest="async_stack", + required=False, + action="store_true", + default=False, + help="show async coroutine/task stacks.", + ) self.add_argument( "-f", "--filepath", @@ -65,9 +75,15 @@ def parse_stack_params(self, arg_string: str) -> StackParams: pid=getattr(args, "pid"), native=getattr(args, "native"), filepath=getattr(args, "filepath"), + async_stack=getattr(args, "async_stack"), ) else: - return StackParams(pid=-1, native=False, filepath=getattr(args, "filepath")) + return StackParams( + pid=-1, + native=False, + filepath=getattr(args, "filepath"), + async_stack=getattr(args, "async_stack"), + ) global_stack_parser = StackParser() diff --git a/flight_profiler/test/plugins/stack/server_plugin_stack_test.py b/flight_profiler/test/plugins/stack/server_plugin_stack_test.py new file mode 100644 index 0000000..122de05 --- /dev/null +++ b/flight_profiler/test/plugins/stack/server_plugin_stack_test.py @@ -0,0 +1,468 @@ +""" +Unit tests for server_plugin_stack.py async coroutine stack functionality. +""" + +import asyncio +import unittest +from unittest.mock import MagicMock + +from flight_profiler.plugins.stack.server_plugin_stack import StackServerPlugin +from flight_profiler.utils.render_util import ( + COLOR_FAINT, + COLOR_GREEN, + COLOR_RED, + COLOR_WHITE_255, + COLOR_YELLOW, +) + + +class MockServerQueue: + """Mock ServerQueue for testing.""" + + def __init__(self): + self.messages = [] + + async def output_msg(self, msg): + self.messages.append(msg) + + +class TestStackServerPluginFiltering(unittest.TestCase): + """Test cases for flight_profiler filtering logic.""" + + def setUp(self): + self.out_q = MockServerQueue() + self.plugin = StackServerPlugin("stack", self.out_q) + + def test_is_flight_profiler_thread_with_hyphen(self): + """Test thread name filtering with 'flight-profiler' pattern.""" + self.assertTrue(self.plugin._is_flight_profiler_thread("flight-profiler-worker-0")) + self.assertTrue(self.plugin._is_flight_profiler_thread("flight-profiler-injector")) + self.assertTrue(self.plugin._is_flight_profiler_thread("Flight-Profiler-Worker")) + + def test_is_flight_profiler_thread_with_underscore(self): + """Test thread name filtering with 'flight_profiler' pattern.""" + self.assertTrue(self.plugin._is_flight_profiler_thread("flight_profiler_worker")) + self.assertTrue(self.plugin._is_flight_profiler_thread("Flight_Profiler_Task")) + + def test_is_flight_profiler_thread_negative(self): + """Test that non-flight_profiler threads are not filtered.""" + self.assertFalse(self.plugin._is_flight_profiler_thread("MainThread")) + self.assertFalse(self.plugin._is_flight_profiler_thread("AsyncEventLoop")) + self.assertFalse(self.plugin._is_flight_profiler_thread("Worker-1")) + self.assertFalse(self.plugin._is_flight_profiler_thread("Thread-2")) + + def test_is_flight_profiler_thread_empty_or_none(self): + """Test handling of empty or None thread names.""" + self.assertFalse(self.plugin._is_flight_profiler_thread("")) + self.assertFalse(self.plugin._is_flight_profiler_thread(None)) + + +class TestStackServerPluginStateColor(unittest.TestCase): + """Test cases for state color mapping.""" + + def setUp(self): + self.out_q = MockServerQueue() + self.plugin = StackServerPlugin("stack", self.out_q) + + def test_state_color_pending(self): + """Test PENDING state returns yellow color.""" + self.assertEqual(self.plugin._get_state_color("PENDING"), COLOR_YELLOW) + + def test_state_color_waiting(self): + """Test WAITING state returns green color.""" + self.assertEqual(self.plugin._get_state_color("WAITING"), COLOR_GREEN) + + def test_state_color_finished(self): + """Test FINISHED state returns green color.""" + self.assertEqual(self.plugin._get_state_color("FINISHED"), COLOR_GREEN) + + def test_state_color_cancelled(self): + """Test CANCELLED state returns faint color.""" + self.assertEqual(self.plugin._get_state_color("CANCELLED"), COLOR_FAINT) + + def test_state_color_failed(self): + """Test FAILED state returns red color.""" + self.assertEqual(self.plugin._get_state_color("FAILED"), COLOR_RED) + + def test_state_color_unknown(self): + """Test UNKNOWN state returns faint color.""" + self.assertEqual(self.plugin._get_state_color("UNKNOWN"), COLOR_FAINT) + + def test_state_color_unrecognized(self): + """Test unrecognized state returns default white color.""" + self.assertEqual(self.plugin._get_state_color("SOME_OTHER_STATE"), COLOR_WHITE_255) + + +class TestStackServerPluginTaskInfo(unittest.TestCase): + """Test cases for task information extraction.""" + + def setUp(self): + self.out_q = MockServerQueue() + self.plugin = StackServerPlugin("stack", self.out_q) + + def test_get_task_name_with_get_name(self): + """Test getting task name using get_name() method.""" + mock_task = MagicMock() + mock_task.get_name.return_value = "TestTask" + self.assertEqual(self.plugin._get_task_name(mock_task), "TestTask") + + def test_get_task_name_without_get_name(self): + """Test getting task name when get_name() is not available.""" + mock_task = MagicMock(spec=[]) # No get_name method + result = self.plugin._get_task_name(mock_task) + self.assertIn("MagicMock", result) + + def test_get_task_name_with_exception(self): + """Test handling exception when getting task name.""" + mock_task = MagicMock() + mock_task.get_name.side_effect = Exception("Test error") + self.assertEqual(self.plugin._get_task_name(mock_task), "") + + def test_get_task_state_done_cancelled(self): + """Test getting state for cancelled task.""" + mock_task = MagicMock() + mock_task.done.return_value = True + mock_task.cancelled.return_value = True + self.assertEqual(self.plugin._get_task_state(mock_task), "CANCELLED") + + def test_get_task_state_done_failed(self): + """Test getting state for failed task.""" + mock_task = MagicMock() + mock_task.done.return_value = True + mock_task.cancelled.return_value = False + mock_task.exception.return_value = Exception("Test") + self.assertEqual(self.plugin._get_task_state(mock_task), "FAILED") + + def test_get_task_state_done_finished(self): + """Test getting state for finished task.""" + mock_task = MagicMock() + mock_task.done.return_value = True + mock_task.cancelled.return_value = False + mock_task.exception.return_value = None + self.assertEqual(self.plugin._get_task_state(mock_task), "FINISHED") + + def test_get_task_state_waiting(self): + """Test getting state for waiting task.""" + mock_task = MagicMock() + mock_task.done.return_value = False + mock_task._fut_waiter = MagicMock() # Has a waiter + self.assertEqual(self.plugin._get_task_state(mock_task), "WAITING") + + def test_get_task_state_pending(self): + """Test getting state for pending task.""" + mock_task = MagicMock() + mock_task.done.return_value = False + mock_task._fut_waiter = None # No waiter + self.assertEqual(self.plugin._get_task_state(mock_task), "PENDING") + + +class TestStackServerPluginCoroRepr(unittest.TestCase): + """Test cases for coroutine representation.""" + + def setUp(self): + self.out_q = MockServerQueue() + self.plugin = StackServerPlugin("stack", self.out_q) + + def test_get_coro_repr_with_qualname(self): + """Test getting coroutine repr with __qualname__.""" + mock_coro = MagicMock() + mock_coro.__qualname__ = "MyClass.my_coroutine" + mock_task = MagicMock() + mock_task.get_coro.return_value = mock_coro + self.assertEqual(self.plugin._get_coro_repr(mock_task), "MyClass.my_coroutine") + + def test_get_coro_repr_with_name(self): + """Test getting coroutine repr with __name__ when __qualname__ is None.""" + mock_coro = MagicMock(spec=["__name__"]) + mock_coro.__name__ = "my_coroutine" + mock_task = MagicMock() + mock_task.get_coro.return_value = mock_coro + result = self.plugin._get_coro_repr(mock_task) + self.assertEqual(result, "my_coroutine") + + def test_get_coro_repr_no_coro(self): + """Test getting coroutine repr when task has no coroutine.""" + mock_task = MagicMock() + mock_task.get_coro.return_value = None + self.assertEqual(self.plugin._get_coro_repr(mock_task), "") + + +class TestStackServerPluginCoroFrames(unittest.TestCase): + """Test cases for coroutine frame collection (cr_await chain).""" + + def setUp(self): + self.out_q = MockServerQueue() + self.plugin = StackServerPlugin("stack", self.out_q) + + def test_collect_coro_frames_single_frame(self): + """Test collecting frames from a single coroutine.""" + # Create mock frame + mock_code = MagicMock() + mock_code.co_filename = "/path/to/test.py" + mock_code.co_name = "test_func" + + mock_frame = MagicMock() + mock_frame.f_code = mock_code + mock_frame.f_lineno = 42 + + # Create mock coroutine + mock_coro = MagicMock() + mock_coro.cr_frame = mock_frame + mock_coro.cr_await = None + + stack_info = [] + self.plugin._collect_coro_frames(mock_coro, stack_info) + + self.assertEqual(len(stack_info), 1) + self.assertEqual(stack_info[0]["filename"], "/path/to/test.py") + self.assertEqual(stack_info[0]["lineno"], 42) + self.assertEqual(stack_info[0]["name"], "test_func") + + def test_collect_coro_frames_chain(self): + """Test collecting frames from a chain of coroutines (cr_await).""" + # Create mock frames + mock_code1 = MagicMock() + mock_code1.co_filename = "/path/to/level1.py" + mock_code1.co_name = "level1_func" + mock_frame1 = MagicMock() + mock_frame1.f_code = mock_code1 + mock_frame1.f_lineno = 10 + + mock_code2 = MagicMock() + mock_code2.co_filename = "/path/to/level2.py" + mock_code2.co_name = "level2_func" + mock_frame2 = MagicMock() + mock_frame2.f_code = mock_code2 + mock_frame2.f_lineno = 20 + + # Create nested coroutines + mock_coro2 = MagicMock() + mock_coro2.cr_frame = mock_frame2 + mock_coro2.cr_await = None + + mock_coro1 = MagicMock() + mock_coro1.cr_frame = mock_frame1 + mock_coro1.cr_await = mock_coro2 + + stack_info = [] + self.plugin._collect_coro_frames(mock_coro1, stack_info) + + self.assertEqual(len(stack_info), 2) + self.assertEqual(stack_info[0]["name"], "level1_func") + self.assertEqual(stack_info[1]["name"], "level2_func") + + def test_collect_coro_frames_max_depth(self): + """Test that recursion is limited by max depth.""" + # Create a chain longer than max depth + stack_info = [] + + # Create deep chain (depth > 100) + def create_chain(depth): + if depth > 105: + return None + mock_code = MagicMock() + mock_code.co_filename = f"/path/to/level{depth}.py" + mock_code.co_name = f"level{depth}" + mock_frame = MagicMock() + mock_frame.f_code = mock_code + mock_frame.f_lineno = depth + + mock_coro = MagicMock() + mock_coro.cr_frame = mock_frame + mock_coro.cr_await = create_chain(depth + 1) + return mock_coro + + root_coro = create_chain(0) + self.plugin._collect_coro_frames(root_coro, stack_info) + + # Should stop at max depth (100) + self.assertLessEqual(len(stack_info), 101) + + def test_collect_coro_frames_none_coro(self): + """Test handling of None coroutine.""" + stack_info = [] + self.plugin._collect_coro_frames(None, stack_info) + self.assertEqual(len(stack_info), 0) + + +class TestStackServerPluginTaskFiltering(unittest.TestCase): + """Test cases for flight_profiler task filtering.""" + + def setUp(self): + self.out_q = MockServerQueue() + self.plugin = StackServerPlugin("stack", self.out_q) + + def test_is_flight_profiler_task_by_coro_name(self): + """Test filtering task by coroutine name.""" + mock_coro = MagicMock() + mock_coro.__qualname__ = "flight_profiler.server.run" + mock_coro.__module__ = "some_module" + + mock_task = MagicMock() + mock_task.get_coro.return_value = mock_coro + mock_task.get_stack.return_value = [] + + self.assertTrue(self.plugin._is_flight_profiler_task(mock_task)) + + def test_is_flight_profiler_task_by_module(self): + """Test filtering task by module name.""" + mock_coro = MagicMock() + mock_coro.__qualname__ = "some_func" + mock_coro.__module__ = "flight_profiler.plugins.stack" + + mock_task = MagicMock() + mock_task.get_coro.return_value = mock_coro + mock_task.get_stack.return_value = [] + + self.assertTrue(self.plugin._is_flight_profiler_task(mock_task)) + + def test_is_flight_profiler_task_by_filename(self): + """Test filtering task by file path.""" + mock_code = MagicMock() + mock_code.co_filename = "/path/to/flight_profiler/server.py" + + mock_coro = MagicMock() + mock_coro.__qualname__ = "some_func" + mock_coro.__module__ = "some_module" + mock_coro.cr_code = mock_code + + mock_task = MagicMock() + mock_task.get_coro.return_value = mock_coro + mock_task.get_stack.return_value = [] + + self.assertTrue(self.plugin._is_flight_profiler_task(mock_task)) + + def test_is_flight_profiler_task_by_stack_frame(self): + """Test filtering task by stack frame file path.""" + mock_coro = MagicMock() + mock_coro.__qualname__ = "user_func" + mock_coro.__module__ = "user_module" + mock_coro.cr_code = None + + mock_code = MagicMock() + mock_code.co_filename = "/path/to/flight_profiler/plugin.py" + mock_frame = MagicMock() + mock_frame.f_code = mock_code + + mock_task = MagicMock() + mock_task.get_coro.return_value = mock_coro + mock_task.get_stack.return_value = [mock_frame] + + self.assertTrue(self.plugin._is_flight_profiler_task(mock_task)) + + def test_is_flight_profiler_task_negative(self): + """Test that user tasks are not filtered.""" + mock_coro = MagicMock() + mock_coro.__qualname__ = "user_coroutine" + mock_coro.__module__ = "my_app.handlers" + mock_coro.cr_code = None + + mock_code = MagicMock() + mock_code.co_filename = "/path/to/my_app/handlers.py" + mock_frame = MagicMock() + mock_frame.f_code = mock_code + + mock_task = MagicMock() + mock_task.get_coro.return_value = mock_coro + mock_task.get_stack.return_value = [mock_frame] + + self.assertFalse(self.plugin._is_flight_profiler_task(mock_task)) + + +class TestStackServerPluginIntegration(unittest.TestCase): + """Integration tests using real asyncio tasks.""" + + def _run_async_test(self, coro): + """ + Run async test in an isolated event loop without affecting global state. + """ + loop = asyncio.new_event_loop() + try: + return loop.run_until_complete(coro) + finally: + # Cancel all pending tasks + pending = asyncio.all_tasks(loop) + for task in pending: + task.cancel() + # Run loop to process cancellations + if pending: + loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) + loop.close() + + def test_get_task_info_with_real_task(self): + """Test extracting info from a real asyncio task.""" + out_q = MockServerQueue() + plugin = StackServerPlugin("stack", out_q) + test_case = self + + async def sample_coroutine(): + await asyncio.sleep(100) + + async def run_test(): + task = asyncio.create_task(sample_coroutine(), name="TestSampleTask") + await asyncio.sleep(0.01) # Let task start + + # Test task name + name = plugin._get_task_name(task) + test_case.assertEqual(name, "TestSampleTask") + + # Test task state + state = plugin._get_task_state(task) + test_case.assertEqual(state, "WAITING") + + # Test coroutine repr + coro_repr = plugin._get_coro_repr(task) + test_case.assertEqual( + coro_repr, + "TestStackServerPluginIntegration.test_get_task_info_with_real_task..sample_coroutine" + ) + + # Cleanup + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + self._run_async_test(run_test()) + + def test_collect_frames_with_real_coroutine_chain(self): + """Test frame collection with real nested coroutines.""" + out_q = MockServerQueue() + plugin = StackServerPlugin("stack", out_q) + test_case = self + + async def inner_coro(): + await asyncio.sleep(100) + + async def outer_coro(): + await inner_coro() + + async def run_test(): + task = asyncio.create_task(outer_coro(), name="ChainTest") + await asyncio.sleep(0.01) # Let task start + + # Get task stack + stack = plugin._get_task_stack(task) + + # Should have frames from the coroutine chain + test_case.assertGreater(len(stack), 0) + + # Verify frame structure + for frame in stack: + test_case.assertIn("filename", frame) + test_case.assertIn("lineno", frame) + test_case.assertIn("name", frame) + + # Cleanup + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + self._run_async_test(run_test()) + + +if __name__ == "__main__": + unittest.main() diff --git a/flight_profiler/test/plugins/watch/watch_plugin_test.py b/flight_profiler/test/plugins/watch/watch_plugin_test.py index a1c75b2..38178b7 100644 --- a/flight_profiler/test/plugins/watch/watch_plugin_test.py +++ b/flight_profiler/test/plugins/watch/watch_plugin_test.py @@ -35,7 +35,7 @@ def test_watch_module_method(self): raise finally: integration.stop() - + def test_watch_keyword_only_module_method(self): current_directory = os.path.dirname(os.path.abspath(__file__)) file = os.path.join(current_directory, "watch_server_script.py") @@ -58,7 +58,7 @@ def test_watch_keyword_only_module_method(self): break else: break - + self.assertTrue(find) except: raise @@ -180,5 +180,3 @@ def test_watch_module_exp_method(self): raise finally: integration.stop() - - diff --git a/flight_profiler/test/plugins/watch/watch_server_script.py b/flight_profiler/test/plugins/watch/watch_server_script.py index 6e8f2bc..ef5086b 100644 --- a/flight_profiler/test/plugins/watch/watch_server_script.py +++ b/flight_profiler/test/plugins/watch/watch_server_script.py @@ -20,8 +20,8 @@ def nested_func_inner(): def kwonly_func(x, y, *, c=1): print("kwonly_func called") - - + + def test_func(name): print("hello func") return name + " " + "watch_plugin"