Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions fastdeploy/engine/args_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ class EngineArgs:
The amount of CPU memory to offload to.
"""

cache_queue_port: int = 8003
cache_queue_port: str = "8003"
"""
Port for cache queue.
"""
Expand Down Expand Up @@ -741,7 +741,7 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser:

cache_group.add_argument(
"--cache-queue-port",
type=int,
type=lambda s: [int(item.strip()) for item in s.split(",")] if s else None,
default=EngineArgs.cache_queue_port,
help="port for cache queue",
)
Expand Down
12 changes: 7 additions & 5 deletions fastdeploy/engine/common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ def __init__(self, cfg, start_queue=True):
cfg (Config): Config object containing all the configuration parameters.
"""
self.cfg = cfg
if isinstance(self.cfg.cache_config.cache_queue_port, str):
self.cfg.cache_config.cache_queue_port = self.cfg.cache_config.cache_queue_port.split(",")
if isinstance(self.cfg.cache_config.cache_queue_port, list):
self.cfg.cache_config.cache_queue_port = int(
self.cfg.cache_config.cache_queue_port[self.cfg.parallel_config.local_data_parallel_id]
)

if self.cfg.parallel_config.enable_expert_parallel:
self.llm_logger = get_logger(
Expand Down Expand Up @@ -251,11 +257,7 @@ def start_worker_queue_service(self, start_queue):
local_data_parallel_size=self.cfg.parallel_config.data_parallel_size,
)

if (
self.cfg.cache_config.enable_prefix_caching
or self.cfg.scheduler_config.splitwise_role != "mixed"
and self.cfg.parallel_config.local_data_parallel_id == 0
):
if self.cfg.cache_config.enable_prefix_caching or self.cfg.scheduler_config.splitwise_role != "mixed":
self.cache_task_queue = EngineCacheQueue(
address=(
self.cfg.master_ip,
Expand Down
37 changes: 35 additions & 2 deletions fastdeploy/engine/expert_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ def __init__(self, cfg, local_data_parallel_id, start_queue=True):
llm_logger.info(f"local_data_parallel_id: {local_data_parallel_id}")
self.cfg.disaggregate_info = None

if self.cfg.cache_config.num_gpu_blocks_override is None:
self.do_profile = True
else:
self.do_profile = False

if cfg.scheduler_config.splitwise_role != "mixed":
if len(self.cfg.cache_config.pd_comm_port) == 1:
self.cfg.cache_config.pd_comm_port[0] = (
Expand Down Expand Up @@ -97,9 +102,29 @@ def start(
ipc_signal_suffix = self.cfg.parallel_config.engine_worker_queue_port[0]

llm_logger.info(f"start expert service {local_data_parallel_id}")
if self.cfg.scheduler_config.splitwise_role != "mixed":

if self.cfg.splitwise_role != "mixed" or self.cfg.cache_config.enable_prefix_caching:
if self.do_profile:
get_profile_block_num = np.zeros([1], dtype=np.int32)
while True:
try:
self.get_profile_block_num_signal = IPCSignal(
name="get_profile_block_num",
array=get_profile_block_num,
dtype=np.int32,
suffix=int(self.cfg.engine_worker_queue_port[0]),
create=False,
)
break
except:
time.sleep(1)
self.reset_kvcache_blocks()
ipc_signal_suffix_cache = self.cfg.parallel_config.engine_worker_queue_port[local_data_parallel_id]
self.engine.start_cache_service(self.cfg.local_device_ids, ipc_signal_suffix_cache)
self.cache_manager_processes = self.engine.start_cache_service(
self.cfg.local_device_ids, ipc_signal_suffix_cache
)
if self.cfg.splitwise_role != "mixed":
self.engine.split_mode_get_tasks()

if self.cfg.scheduler_config.name == "splitwise":
self.cfg.init_cache_info()
Expand Down Expand Up @@ -135,6 +160,14 @@ def start(
)
return True

def reset_kvcache_blocks(self):
self.do_profile = 0
while self.get_profile_block_num_signal.value[0] == 0:
time.sleep(1)
num_gpu_blocks = self.get_profile_block_num_signal.value[0]
self.cfg.cache_config.reset(num_gpu_blocks)
self.engine.resource_manager.reset_cache_config(self.cfg.cache_config)

def _exit_sub_services(self):
"""
exit sub services
Expand Down
4 changes: 2 additions & 2 deletions fastdeploy/worker/gpu_model_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1185,7 +1185,7 @@ def initialize_kv_cache(self, profile: bool = False) -> None:

if not create_cache_tensor:
logger.info(f"Waiting for cache managers to create kv cache.. {cache_ready_signal.value}")
while cache_ready_signal.value[self.local_rank] != 1:
while cache_ready_signal.value[local_rank] != 1:
time.sleep(1)
logger.info(f"OK! Stop waiting. {cache_ready_signal.value}")

Expand Down Expand Up @@ -1236,7 +1236,7 @@ def initialize_kv_cache(self, profile: bool = False) -> None:
self.share_inputs["caches"] = cache_kvs_list

if not profile and create_cache_tensor:
cache_ready_signal.value[self.local_rank] = 1
cache_ready_signal.value[local_rank] = 1
logger.info(f"✅ kv cache is ready! {cache_ready_signal.value}")

paddle.device.cuda.empty_cache()
Expand Down
Loading