Skip to content

Added RuntimeJob.queue_info() method #1210

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions qiskit_ibm_runtime/runtime_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from qiskit_ibm_runtime import qiskit_runtime_service

from .utils.utils import validate_job_tags
from .utils.queueinfo import QueueInfo
from .constants import API_TO_JOB_ERROR_MESSAGE, API_TO_JOB_STATUS, DEFAULT_DECODERS
from .exceptions import (
IBMApiError,
Expand Down Expand Up @@ -136,6 +137,7 @@ def __init__(
self._session_id = session_id
self._tags = tags
self._usage_estimation: Dict[str, Any] = {}
self._queue_info: QueueInfo = None

decoder = result_decoder or DEFAULT_DECODERS.get(program_id, None) or ResultDecoder
if isinstance(decoder, Sequence):
Expand Down Expand Up @@ -686,3 +688,64 @@ def usage_estimation(self) -> Dict[str, Any]:
}

return self._usage_estimation

def queue_position(self, refresh: bool = False) -> Optional[int]:
"""Return the position of the job in the server queue.

Note:
The position returned is within the scope of the provider
and may differ from the global queue position.

Args:
refresh: If ``True``, re-query the server to get the latest value.
Otherwise return the cached value.

Returns:
Position in the queue or ``None`` if position is unknown or not applicable.
"""
if refresh:
api_metadata = self._api_client.job_metadata(self.job_id())
self._queue_info = QueueInfo(
position_in_queue=api_metadata.get("position_in_queue"),
status=self.status(),
estimated_start_time=api_metadata.get("estimated_start_time"),
estimated_completion_time=api_metadata.get("estimated_completion_time"),
)

if self._queue_info:
return self._queue_info.position
return None

def queue_info(self) -> Optional[QueueInfo]:
"""Return queue information for this job.

The queue information may include queue position, estimated start and
end time, and dynamic priorities for the hub, group, and project. See
:class:`QueueInfo` for more information.

Note:
The queue information is calculated after the job enters the queue.
Therefore, some or all of the information may not be immediately
available, and this method may return ``None``.

Returns:
A :class:`QueueInfo` instance that contains queue information for
this job, or ``None`` if queue information is unknown or not
applicable.
"""
# Get latest queue information.
api_metadata = self._api_client.job_metadata(self.job_id())
self._queue_info = QueueInfo(
position_in_queue=api_metadata.get("position_in_queue"),
status=self.status(),
estimated_start_time=api_metadata.get("estimated_start_time"),
estimated_completion_time=api_metadata.get("estimated_completion_time"),
)
# Return queue information only if it has any useful information.
if self._queue_info and any(
value is not None
for attr, value in self._queue_info.__dict__.items()
if not attr.startswith("_") and attr != "job_id"
):
return self._queue_info
return None
166 changes: 166 additions & 0 deletions qiskit_ibm_runtime/utils/queueinfo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
# This code is part of Qiskit.
#
# (C) Copyright IBM 2021.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""Queue information for a job."""

import warnings
from datetime import datetime
from typing import Any, Optional, Union, Dict
import dateutil.parser

from ..utils import utc_to_local, duration_difference


class QueueInfo:
"""Queue information for a job."""

_data = {} # type: Dict

def __init__(
self,
position_in_queue: Optional[int] = None,
status: Optional[str] = None,
estimated_start_time: Optional[Union[str, datetime]] = None,
estimated_completion_time: Optional[Union[str, datetime]] = None,
hub_priority: Optional[float] = None,
group_priority: Optional[float] = None,
project_priority: Optional[float] = None,
job_id: Optional[str] = None,
**kwargs: Any
) -> None:
"""QueueInfo constructor.

Args:
position: Position in the queue.
status: Job status.
estimated_start_time: Estimated start time for the job, in UTC.
estimated_complete_time: Estimated complete time for the job, in UTC.
hub_priority: Dynamic priority for the hub.
group_priority: Dynamic priority for the group.
project_priority: Dynamic priority for the project.
job_id: Job ID.
kwargs: Additional attributes.
"""
self.position = int(position_in_queue) if position_in_queue else None
self._status = status
if isinstance(estimated_start_time, str):
estimated_start_time = dateutil.parser.isoparse(estimated_start_time)
if isinstance(estimated_completion_time, str):
estimated_completion_time = dateutil.parser.isoparse(estimated_completion_time)
self._estimated_start_time_utc = estimated_start_time
self._estimated_complete_time_utc = estimated_completion_time
self.hub_priority = hub_priority
self.group_priority = group_priority
self.project_priority = project_priority
self.job_id = job_id

self._data = kwargs

def __repr__(self) -> str:
"""Return the string representation of ``QueueInfo``.

Note:
The estimated start and end time are displayed in local time
for convenience.

Returns:
A string representation of ``QueueInfo``.

Raises:
TypeError: If the `estimated_start_time` or `estimated_end_time`
value is not valid.
"""
status = self._get_value(self._status)

with warnings.catch_warnings():
warnings.simplefilter("ignore")
est_start_time = (
self.estimated_start_time.isoformat() if self.estimated_start_time else None
)
est_complete_time = (
self.estimated_complete_time.isoformat() if self.estimated_complete_time else None
)

queue_info = [
"job_id='{}'".format(self.job_id),
"_status='{}'".format(status),
"estimated_start_time='{}'".format(est_start_time),
"estimated_complete_time='{}'".format(est_complete_time),
"position={}".format(self.position),
"hub_priority={}".format(self.hub_priority),
"group_priority={}".format(self.group_priority),
"project_priority={}".format(self.project_priority),
]

return "<{}({})>".format(self.__class__.__name__, ", ".join(queue_info))

def __getattr__(self, name: str) -> Any:
try:
return self._data[name]
except KeyError:
raise AttributeError("Attribute {} is not defined.".format(name)) from None

def format(self) -> str:
"""Build a user-friendly report for the job queue information.

Returns:
The job queue information report.
"""
status = self._status

with warnings.catch_warnings():
warnings.simplefilter("ignore")
est_start_time = (
duration_difference(self.estimated_start_time)
if self.estimated_start_time
else self._get_value(self.estimated_start_time)
)
est_complete_time = (
duration_difference(self.estimated_complete_time)
if self.estimated_complete_time
else self._get_value(self.estimated_complete_time)
)

queue_info = [
"Job {} queue information:".format(self._get_value(self.job_id)),
" queue position: {}".format(self._get_value(self.position)),
" status: {}".format(status),
" estimated start time: {}".format(est_start_time),
" estimated completion time: {}".format(est_complete_time),
" hub priority: {}".format(self._get_value(self.hub_priority)),
" group priority: {}".format(self._get_value(self.group_priority)),
" project priority: {}".format(self._get_value(self.project_priority)),
]

return "\n".join(queue_info)

def _get_value(self, value: Optional[Any], default_value: str = "unknown") -> Optional[Any]:
"""Return the input value if it exists or the default.

Returns:
The input value if it is not ``None``, else the input default value.
"""
return value or default_value

@property
def estimated_start_time(self) -> Optional[datetime]:
"""Return estimated start time in local time."""
if self._estimated_start_time_utc is None:
return None
return utc_to_local(self._estimated_start_time_utc)

@property
def estimated_complete_time(self) -> Optional[datetime]:
"""Return estimated complete time in local time."""
if self._estimated_complete_time_utc is None:
return None
return utc_to_local(self._estimated_complete_time_utc)
5 changes: 5 additions & 0 deletions releasenotes/notes/queueinfo-5e1bb815228425bb.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
features:
- |
Added a method ``RuntimeJob.queue_info()`` to get the queue information
from the backend. This feature was transferred from ``qiskit_ibm_provider``.
28 changes: 10 additions & 18 deletions test/integration/test_ibm_job_attributes.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import uuid
import time
from datetime import datetime, timedelta
from unittest import skip
from unittest import skip, SkipTest

from dateutil import tz
from qiskit.compiler import transpile
Expand All @@ -32,7 +32,7 @@
integration_test_setup,
)
from ..ibm_test_case import IBMTestCase
from ..utils import most_busy_backend
from ..utils import most_busy_backend, cancel_job_safe


class TestIBMJobAttributes(IBMTestCase):
Expand Down Expand Up @@ -200,13 +200,13 @@ def test_time_per_step(self):
step, time_data, start_datetime, end_datetime
),
)

rjob = self.service.job(job.job_id())
self.assertTrue(rjob.time_per_step())

@skip("queue_info supported in provider but not here")
def test_queue_info(self):
"""Test retrieving queue information."""
if self.dependencies.channel == "ibm_cloud":
raise SkipTest("Not supported on cloud channel.")
# Find the most busy backend.
backend = most_busy_backend(self.service)
leave_states = list(JOB_FINAL_STATES) + [JobStatus.RUNNING]
Expand All @@ -230,19 +230,11 @@ def test_queue_info(self):
)
msg = "Job {} is queued but has no ".format(job.job_id())
self.assertIsNotNone(queue_info, msg + "queue info.")
for attr, value in queue_info.__dict__.items():
self.assertIsNotNone(value, msg + attr)
self.assertTrue(
all(
0 < priority <= 1.0
for priority in [
queue_info.hub_priority,
queue_info.group_priority,
queue_info.project_priority,
]
),
"Unexpected queue info {} for job {}".format(queue_info, job.job_id()),
)

self.assertTrue(queue_info.format())
self.assertTrue(repr(queue_info))
elif job._status is not None:
self.assertIsNone(job.queue_position())
self.log.warning("Unable to retrieve queue information")

# Cancel job so it doesn't consume more resources.
cancel_job_safe(job, self.log)