[Bugfix][Nixl] Fix Preemption Bug #18631


Merged

Changes from 11 commits
40 changes: 18 additions & 22 deletions vllm/v1/core/sched/scheduler.py
@@ -310,14 +310,12 @@ def schedule(self) -> SchedulerOutput:
break

request = self.waiting[0]
num_prealloc_computed_tokens = 0
# P/D: skip request if still waiting for remote kvs.

# KVTransfer: skip request if still waiting for remote kvs.
if request.status == RequestStatus.WAITING_FOR_REMOTE_KVS:
is_ready = self._update_waiting_for_remote_kv(request)
if is_ready:
request.status = RequestStatus.WAITING
num_prealloc_computed_tokens = (
request.num_computed_tokens)
else:
self.waiting.popleft()
skipped_waiting_requests.appendleft(request)
@@ -349,32 +347,33 @@ def schedule(self) -> SchedulerOutput:
load_kv_async = False

# Get already-cached tokens.
if num_prealloc_computed_tokens == 0:
new_computed_blocks, num_native_computed_tokens = \
if request.num_computed_tokens == 0:
# Get locally-cached tokens.
new_computed_blocks, num_new_local_computed_tokens = \
self.kv_cache_manager.get_computed_blocks(
request)

# Get externally-cached tokens if using a KVConnector.
if self.connector is not None:
num_external_computed_tokens, load_kv_async = (
self.connector.get_num_new_matched_tokens(
request, num_native_computed_tokens))
request, num_new_local_computed_tokens))

# Total computed tokens (local + external).
num_computed_tokens = (num_native_computed_tokens +
num_computed_tokens = (num_new_local_computed_tokens +
num_external_computed_tokens)
# KVTransfer: WAITING reqs have num_computed_tokens > 0
# after async KV recvs are completed.
else:
# P/D: skip checking prefix cache if loaded from remote kvs.
assert request.kv_transfer_params is not None
new_computed_blocks = KVCacheBlocks.create_empty()
num_native_computed_tokens = 0

# Total computed tokens (allocated in prior step).
num_computed_tokens = num_prealloc_computed_tokens
num_new_local_computed_tokens = 0
num_computed_tokens = request.num_computed_tokens

encoder_inputs_to_schedule = None
new_encoder_budget = encoder_budget

# P/D: loading remote KV, do not allocate for new work.
# KVTransfer: loading remote KV, do not allocate for new work.
if load_kv_async:
assert num_external_computed_tokens > 0
num_new_tokens = 0
Expand Down Expand Up @@ -405,7 +404,7 @@ def schedule(self) -> SchedulerOutput:
new_blocks = self.kv_cache_manager.allocate_slots(
request,
num_new_tokens + num_external_computed_tokens,
num_native_computed_tokens,
num_new_local_computed_tokens,
new_computed_blocks,
num_lookahead_tokens=self.num_lookahead_tokens,
delay_cache_blocks=load_kv_async,
@@ -414,21 +413,18 @@ def schedule(self) -> SchedulerOutput:
# The request cannot be scheduled.
break

# KVConnector: update internal state after allocation.
# This information is used to determine if a load is
# needed for this request.
if num_external_computed_tokens:
assert self.connector is not None
# KVTransfer: update connector state. Used to create metadata
# to instruct the Worker to do a KV load if needed.
if self.connector is not None:
Member:
I know this came up in a prior PR review, but are you sure we want to call this when num_external_computed_tokens == 0?

IMO it's clearer to not do so since the connector has declared that it is not providing any tokens in this case, so should not have any interest in the allocation. In the MultiConnector where we assume that we'll load from at most one connector, we only invoke update_state_after_alloc on the first connector that returned nonzero from get_num_new_matched_tokens.

Collaborator (Author):
I think that the connector should decide what to do in the case that num_external_tokens is 0.

For instance, in the case of the NIXLConnector, we currently leak memory on the P worker if the D worker has a full local prefix-cache hit, since the D worker never gets an opportunity to call send_notif() to alert the P worker that it can free its blocks.

I will remove the change from this PR and make another one to make this fix more explicit.
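The design question above can be made concrete with a toy sketch. The classes and names below are hypothetical stand-ins, not vLLM's real `KVConnectorBase_v1` API: a connector reports a full local prefix-cache hit (zero external tokens), and we compare the reviewer's variant (skip the post-allocation callback when zero tokens matched) with the author's variant (always invoke it, letting the connector decide what to do):

```python
class ToyConnector:
    """Hypothetical KV connector (not vLLM's real class). It wants to
    notify the prefill (P) worker so that worker can free its KV blocks,
    even when the decode (D) worker had a full local prefix-cache hit."""

    def __init__(self):
        self.notified = []

    def get_num_new_matched_tokens(self, request_id, num_local_tokens):
        # Full local hit: the connector has nothing to load remotely.
        return 0

    def update_state_after_alloc(self, request_id, num_external_tokens):
        # Connector-side policy: send the notification even when
        # num_external_tokens == 0; otherwise the P worker keeps
        # holding (i.e. leaks) the blocks it reserved for this request.
        self.notified.append((request_id, num_external_tokens))


def schedule_once(connector, request_id, gate_on_nonzero):
    num_external = connector.get_num_new_matched_tokens(request_id, 8)
    if gate_on_nonzero and num_external == 0:
        return  # reviewer's variant: skip the callback for zero tokens
    connector.update_state_after_alloc(request_id, num_external)


gated, ungated = ToyConnector(), ToyConnector()
schedule_once(gated, "req-1", gate_on_nonzero=True)
schedule_once(ungated, "req-1", gate_on_nonzero=False)
assert gated.notified == []                # connector never gets to notify
assert ungated.notified == [("req-1", 0)]  # connector can still free P-side blocks
```

Under the gated policy the connector is never consulted after allocation, which is exactly the situation the author describes for the NIXLConnector's missing send_notif() opportunity.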

self.connector.update_state_after_alloc(
request,
new_computed_blocks + new_blocks,
num_external_computed_tokens,
)

self.waiting.popleft()
# KVTransfer: wait until remote KVs have arrived.
if load_kv_async:
# If loading async, allocate memory and put request
# into the WAITING_FOR_REMOTE_KV state.
skipped_waiting_requests.appendleft(request)
request.status = RequestStatus.WAITING_FOR_REMOTE_KVS
continue
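The core of the fix in the diff above is replacing the per-pass local `num_prealloc_computed_tokens` with the persistent `request.num_computed_tokens`. A minimal sketch of why this matters, using toy types rather than vLLM's real `Request`, and assuming (as the flattened diff suggests) that the old local was only set in the pass where the async remote-KV load completed:

```python
from dataclasses import dataclass


@dataclass
class ToyRequest:
    num_computed_tokens: int = 0  # persists on the request across passes


def choose_branch_old(request, num_prealloc_computed_tokens):
    # Pre-fix logic: branches on a local variable that is only nonzero in
    # the exact scheduling pass where the async remote-KV load completed.
    if num_prealloc_computed_tokens == 0:
        return "check_prefix_cache"
    return "skip_prefix_cache"


def choose_branch_new(request):
    # Post-fix logic: branches on state carried by the request itself, so
    # routing is stable no matter how many passes later the request is
    # retried.
    if request.num_computed_tokens == 0:
        return "check_prefix_cache"
    return "skip_prefix_cache"


req = ToyRequest(num_computed_tokens=512)  # remote KVs already received

# Same pass the load finished: old code captured 512 and routes correctly.
assert choose_branch_old(req, 512) == "skip_prefix_cache"
# A later pass (e.g. the request could not be scheduled last time): the
# local starts back at 0 and the old code wrongly re-checks the prefix
# cache for tokens that were already received.
assert choose_branch_old(req, 0) == "check_prefix_cache"
# The fixed code routes correctly on every pass.
assert choose_branch_new(req) == "skip_prefix_cache"
```

Branching on request-held state also removes the need to thread the extra local through the scheduling loop, which is why the diff nets out at fewer lines.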