diff --git a/source/pip/qsharp/__init__.py b/source/pip/qsharp/__init__.py index 80432d977e..889b24ee83 100644 --- a/source/pip/qsharp/__init__.py +++ b/source/pip/qsharp/__init__.py @@ -2,30 +2,33 @@ # Licensed under the MIT License. from . import telemetry_events +from ._native import ( + CircuitGenerationMethod, + Pauli, + QSharpError, + Result, + TargetProfile, + estimate_custom, +) +from ._noise import BitFlipNoise, DepolarizingNoise, PauliNoise, PhaseFlipNoise from ._qsharp import ( - init, - eval, - run, - compile, circuit, + compile, + dump_circuit, + dump_machine, estimate, + eval, + init, logical_counts, - set_quantum_seed, + run, set_classical_seed, - dump_machine, - dump_circuit, - StateDump, - ShotResult, - PauliNoise, - DepolarizingNoise, - BitFlipNoise, - PhaseFlipNoise, - CircuitGenerationMethod, + set_quantum_seed, ) +from ._session import Session +from ._types import ShotResult, StateDump telemetry_events.on_import() -from ._native import Result, Pauli, QSharpError, TargetProfile, estimate_custom # IPython notebook specific features try: @@ -61,4 +64,5 @@ "BitFlipNoise", "PhaseFlipNoise", "CircuitGenerationMethod", + "Session", ] diff --git a/source/pip/qsharp/_device/_atom/__init__.py b/source/pip/qsharp/_device/_atom/__init__.py index f58a7ab77f..ec5bdee01b 100644 --- a/source/pip/qsharp/_device/_atom/__init__.py +++ b/source/pip/qsharp/_device/_atom/__init__.py @@ -4,7 +4,7 @@ from .._device import Device, Zone, ZoneType from ..._simulation import NoiseConfig, run_qir_clifford, run_qir_cpu, run_qir_gpu from ..._native import try_create_gpu_adapter -from ..._qsharp import QirInputData +from ..._types import QirInputData from ... import telemetry_events from typing import List, Literal, Optional diff --git a/source/pip/qsharp/_device/_device.py b/source/pip/qsharp/_device/_device.py index 991dc46b24..e0030fa18a 100644 --- a/source/pip/qsharp/_device/_device.py +++ b/source/pip/qsharp/_device/_device.py @@ -2,7 +2,7 @@ # Licensed under the MIT License. from enum import Enum -from .._qsharp import QirInputData +from .._types import QirInputData class ZoneType(Enum): diff --git a/source/pip/qsharp/_ipython.py b/source/pip/qsharp/_ipython.py index 041148e4da..db1661b8ec 100644 --- a/source/pip/qsharp/_ipython.py +++ b/source/pip/qsharp/_ipython.py @@ -12,7 +12,7 @@ from IPython.display import display, clear_output from IPython.core.magic import register_cell_magic from ._native import QSharpError -from ._qsharp import get_interpreter, qsharp_value_to_python_value +from ._qsharp import _get_default_session from . import telemetry_events @@ -35,8 +35,9 @@ def callback(output): start_time = monotonic() try: - results = qsharp_value_to_python_value( - get_interpreter().interpret(cell, callback) + session = _get_default_session() + results = session._qsharp_value_to_python_value( + session._interpreter.interpret(cell, callback) ) durationMs = (monotonic() - start_time) * 1000 diff --git a/source/pip/qsharp/_noise.py b/source/pip/qsharp/_noise.py new file mode 100644 index 0000000000..a6681a6b64 --- /dev/null +++ b/source/pip/qsharp/_noise.py @@ -0,0 +1,94 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Noise Models for Quantum Simulation""" + +from typing import Tuple + + +class PauliNoise(Tuple[float, float, float]): + """ + The Pauli noise to use in simulation represented + as probabilities of Pauli-X, Pauli-Y, and Pauli-Z errors + """ + + def __new__(cls, x: float, y: float, z: float): + """ + Creates a new :class:`PauliNoise` instance with the given error probabilities. + + :param x: Probability of a Pauli-X (bit flip) error. Must be non-negative. + :type x: float + :param y: Probability of a Pauli-Y error. Must be non-negative. + :type y: float + :param z: Probability of a Pauli-Z (phase flip) error. Must be non-negative. + :type z: float + :return: A new :class:`PauliNoise` tuple ``(x, y, z)``. + :rtype: PauliNoise + :raises ValueError: If any probability is negative or if ``x + y + z > 1``. + """ + if x < 0 or y < 0 or z < 0: + raise ValueError("Pauli noise probabilities must be non-negative.") + if x + y + z > 1: + raise ValueError("The sum of Pauli noise probabilities must be at most 1.") + return super().__new__(cls, (x, y, z)) + + +class DepolarizingNoise(PauliNoise): + """ + The depolarizing noise to use in simulation. + """ + + def __new__(cls, p: float): + """ + Creates a new :class:`DepolarizingNoise` instance. + + The depolarizing channel applies Pauli-X, Pauli-Y, or Pauli-Z errors each with + probability ``p / 3``. + + :param p: Total depolarizing error probability. Must satisfy ``0 ≤ p ≤ 1``. + :type p: float + :return: A new :class:`DepolarizingNoise` with equal X, Y, and Z error probabilities. + :rtype: DepolarizingNoise + :raises ValueError: If ``p`` is negative or ``p > 1``. + """ + return super().__new__(cls, p / 3, p / 3, p / 3) + + +class BitFlipNoise(PauliNoise): + """ + The bit flip noise to use in simulation. + """ + + def __new__(cls, p: float): + """ + Creates a new :class:`BitFlipNoise` instance. + + The bit flip channel applies a Pauli-X error with probability ``p``. + + :param p: Probability of a bit flip (Pauli-X) error. Must satisfy ``0 ≤ p ≤ 1``. + :type p: float + :return: A new :class:`BitFlipNoise` with X error probability ``p``. + :rtype: BitFlipNoise + :raises ValueError: If ``p`` is negative or ``p > 1``. + """ + return super().__new__(cls, p, 0, 0) + + +class PhaseFlipNoise(PauliNoise): + """ + The phase flip noise to use in simulation. + """ + + def __new__(cls, p: float): + """ + Creates a new :class:`PhaseFlipNoise` instance. + + The phase flip channel applies a Pauli-Z error with probability ``p``. + + :param p: Probability of a phase flip (Pauli-Z) error. Must satisfy ``0 ≤ p ≤ 1``. + :type p: float + :return: A new :class:`PhaseFlipNoise` with Z error probability ``p``. + :rtype: PhaseFlipNoise + :raises ValueError: If ``p`` is negative or ``p > 1``. + """ + return super().__new__(cls, 0, 0, p) diff --git a/source/pip/qsharp/_qsharp.py b/source/pip/qsharp/_qsharp.py index 9837989b18..10a60431d2 100644 --- a/source/pip/qsharp/_qsharp.py +++ b/source/pip/qsharp/_qsharp.py @@ -1,291 +1,89 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. -from . import telemetry_events, code -from ._native import ( # type: ignore - Interpreter, - TargetProfile, - StateDumpData, - QSharpError, - Output, - Circuit, - GlobalCallable, - Closure, - Pauli, - Result, - UdtValue, - TypeIR, - TypeKind, - PrimitiveKind, - CircuitConfig, - CircuitGenerationMethod, - NoiseConfig, -) +""" +Q# Interpreter Module-Level Interface + +This module provides the public API for interacting with the Q# interpreter. +It includes module-level functions for code evaluation, execution, compilation, +circuit generation, and resource estimation. These functions operate on a global +default session instance. +""" + +import json +import sys +import types +import warnings +from time import monotonic from typing import ( Any, Callable, Dict, + List, Optional, Tuple, - TypedDict, Union, - List, - Set, - Iterable, cast, ) + +from . import code, telemetry_events +from ._native import ( # type: ignore + Circuit, + CircuitGenerationMethod, + Closure, + GlobalCallable, + NoiseConfig, + TargetProfile, +) +from ._noise import ( + BitFlipNoise, + DepolarizingNoise, + PauliNoise, + PhaseFlipNoise, +) +from ._session import Session, ipython_helper +from ._types import Config, QirInputData, ShotResult, StateDump from .estimator._estimator import ( - EstimatorResult, EstimatorParams, + EstimatorResult, LogicalCounts, ) -import json -import os -import sys -import types -import warnings -from pathlib import Path -from time import monotonic -from dataclasses import make_dataclass - - -def lower_python_obj(obj: object, visited: Optional[Set[object]] = None) -> Any: - if visited is None: - visited = set() - - if id(obj) in visited: - raise QSharpError("Cannot send circular objects from Python to Q#.") - - visited = visited.copy().add(id(obj)) - - # Base case: Primitive types - if isinstance(obj, (bool, int, float, complex, str, Pauli, Result)): - return obj - - # Recursive case: Tuple - if isinstance(obj, tuple): - return tuple(lower_python_obj(elt, visited) for elt in obj) - - # Recursive case: Dict - if isinstance(obj, dict): - return {name: lower_python_obj(val, visited) for name, val in obj.items()} - - # Base case: Callable or Closure - if hasattr(obj, "__global_callable"): - return obj.__getattribute__("__global_callable") - if isinstance(obj, (GlobalCallable, Closure)): - return obj - - # Recursive case: Class with slots - if hasattr(obj, "__slots__"): - fields = {} - for name in getattr(obj, "__slots__"): - if name == "__dict__": - for name, val in obj.__dict__.items(): - fields[name] = lower_python_obj(val, visited) - else: - val = getattr(obj, name) - fields[name] = lower_python_obj(val, visited) - return fields - - # Recursive case: Class - if hasattr(obj, "__dict__"): - fields = { - name: lower_python_obj(val, visited) for name, val in obj.__dict__.items() - } - return fields - - # Recursive case: Array - # By using `Iterable` instead of `list`, we can handle other kind of iterables - # like numpy arrays and generators. - if isinstance(obj, Iterable): - return [lower_python_obj(elt, visited) for elt in obj] - - raise TypeError(f"unsupported type: {type(obj)}") - - -def python_args_to_interpreter_args(args): - """ - Helper function to turn the `*args` argument of this module - to the format expected by the Q# interpreter. - """ - if len(args) == 0: - return None - elif len(args) == 1: - return lower_python_obj(args[0]) - else: - return lower_python_obj(args) - - -_interpreter: Union["Interpreter", None] = None -_config: Union["Config", None] = None - -# Check if we are running in a Jupyter notebook to use the IPython display function -_in_jupyter = False -try: - from IPython.display import display - - if get_ipython().__class__.__name__ == "ZMQInteractiveShell": # type: ignore - _in_jupyter = True # Jupyter notebook or qtconsole -except: - pass - - -# Reporting execution time during IPython cells requires that IPython -# gets pinged to ensure it understands the cell is active. This is done by -# simply importing the display function, which it turns out is enough to begin timing -# while avoiding any UI changes that would be visible to the user. -def ipython_helper(): - try: - if __IPYTHON__: # type: ignore - from IPython.display import display - except NameError: - pass - - -class Config: - """ - Configuration hints for the language service. - """ - - _config: Dict[str, Any] - - def __init__( - self, - target_profile: TargetProfile, - language_features: Optional[List[str]], - manifest: Optional[str], - project_root: Optional[str], - ): - if target_profile == TargetProfile.Adaptive_RI: - self._config = {"targetProfile": "adaptive_ri"} - elif target_profile == TargetProfile.Adaptive_RIF: - self._config = {"targetProfile": "adaptive_rif"} - elif target_profile == TargetProfile.Adaptive_RIFLA: - self._config = {"targetProfile": "adaptive_rifla"} - elif target_profile == TargetProfile.Base: - self._config = {"targetProfile": "base"} - elif target_profile == TargetProfile.Unrestricted: - self._config = {"targetProfile": "unrestricted"} - - if language_features is not None: - self._config["languageFeatures"] = language_features - if manifest is not None: - self._config["manifest"] = manifest - if project_root: - # For now, we only support local project roots, so use a file schema in the URI. - # In the future, we may support other schemes, such as github, if/when - # we have VS Code Web + Jupyter support. - self._config["projectRoot"] = Path(os.getcwd(), project_root).as_uri() - - def __repr__(self) -> str: - return "Q# initialized with configuration: " + str(self._config) - - # See https://ipython.readthedocs.io/en/stable/config/integrating.html#rich-display - # See https://ipython.org/ipython-doc/3/notebook/nbformat.html#display-data - # This returns a custom MIME-type representation of the Q# configuration. - # This data will be available in the cell output, but will not be displayed - # to the user, as frontends would not know how to render the custom MIME type. - # Editor services that interact with the notebook frontend - # (i.e. the language service) can read and interpret the data. - def _repr_mimebundle_( - self, include: Union[Any, None] = None, exclude: Union[Any, None] = None - ) -> Dict[str, Dict[str, Any]]: - return {"application/x.qsharp-config": self._config} - - def get_target_profile(self) -> str: - """ - Returns the target profile as a string, or "unspecified" if not set. - """ - return self._config.get("targetProfile", "unspecified") - - -class PauliNoise(Tuple[float, float, float]): - """ - The Pauli noise to use in simulation represented - as probabilities of Pauli-X, Pauli-Y, and Pauli-Z errors - """ - - def __new__(cls, x: float, y: float, z: float): - """ - Creates a new :class:`PauliNoise` instance with the given error probabilities. - - :param x: Probability of a Pauli-X (bit flip) error. Must be non-negative. - :type x: float - :param y: Probability of a Pauli-Y error. Must be non-negative. - :type y: float - :param z: Probability of a Pauli-Z (phase flip) error. Must be non-negative. - :type z: float - :return: A new :class:`PauliNoise` tuple ``(x, y, z)``. - :rtype: PauliNoise - :raises ValueError: If any probability is negative or if ``x + y + z > 1``. - """ - if x < 0 or y < 0 or z < 0: - raise ValueError("Pauli noise probabilities must be non-negative.") - if x + y + z > 1: - raise ValueError("The sum of Pauli noise probabilities must be at most 1.") - return super().__new__(cls, (x, y, z)) - - -class DepolarizingNoise(PauliNoise): - """ - The depolarizing noise to use in simulation. - """ - - def __new__(cls, p: float): - """ - Creates a new :class:`DepolarizingNoise` instance. - - The depolarizing channel applies Pauli-X, Pauli-Y, or Pauli-Z errors each with - probability ``p / 3``. - - :param p: Total depolarizing error probability. Must satisfy ``0 ≤ p ≤ 1``. - :type p: float - :return: A new :class:`DepolarizingNoise` with equal X, Y, and Z error probabilities. - :rtype: DepolarizingNoise - :raises ValueError: If ``p`` is negative or ``p > 1``. - """ - return super().__new__(cls, p / 3, p / 3, p / 3) +# Global default session instance used by methods in this module. +_default_session: Optional[Session] = None -class BitFlipNoise(PauliNoise): - """ - The bit flip noise to use in simulation. - """ - - def __new__(cls, p: float): - """ - Creates a new :class:`BitFlipNoise` instance. - The bit flip channel applies a Pauli-X error with probability ``p``. - - :param p: Probability of a bit flip (Pauli-X) error. Must satisfy ``0 ≤ p ≤ 1``. - :type p: float - :return: A new :class:`BitFlipNoise` with X error probability ``p``. - :rtype: BitFlipNoise - :raises ValueError: If ``p`` is negative or ``p > 1``. - """ - return super().__new__(cls, p, 0, 0) +def _get_default_session() -> Session: + """Returns the default session, lazily initializing if needed.""" + global _default_session + if _default_session is None: + init() + assert _default_session is not None, "Failed to initialize the Q# interpreter." + return _default_session -class PhaseFlipNoise(PauliNoise): +def _clear_code_module(code_module: types.ModuleType, module_prefix: str): """ - The phase flip noise to use in simulation. + Removes dynamically added Q# callables, structs, and namespace modules from + a code module and sys.modules. """ + keys_to_remove = [] + for key, val in code_module.__dict__.items(): + if ( + hasattr(val, "__global_callable") + or hasattr(val, "__qsharp_class") + or isinstance(val, types.ModuleType) + ): + keys_to_remove.append(key) + for key in keys_to_remove: + code_module.__delattr__(key) - def __new__(cls, p: float): - """ - Creates a new :class:`PhaseFlipNoise` instance. - - The phase flip channel applies a Pauli-Z error with probability ``p``. - - :param p: Probability of a phase flip (Pauli-Z) error. Must satisfy ``0 ≤ p ≤ 1``. - :type p: float - :return: A new :class:`PhaseFlipNoise` with Z error probability ``p``. - :rtype: PhaseFlipNoise - :raises ValueError: If ``p`` is negative or ``p > 1``. - """ - return super().__new__(cls, 0, 0, p) + keys_to_remove = [] + for key in sys.modules: + if key.startswith(module_prefix + "."): + keys_to_remove.append(key) + for key in keys_to_remove: + sys.modules.__delitem__(key) def init( @@ -324,199 +122,25 @@ def init( :return: The Q# interpreter configuration. :rtype: Config """ - from ._fs import read_file, list_directory, exists, join, resolve - from ._http import fetch_github - - global _interpreter - global _config - - if isinstance(target_name, str): - target = target_name.split(".")[0].lower() - if target == "ionq" or target == "rigetti": - target_profile = TargetProfile.Base - elif target == "quantinuum": - target_profile = TargetProfile.Adaptive_RI - else: - raise QSharpError( - f'target_name "{target_name}" not recognized. Please set target_profile directly.' - ) - - manifest_contents = None - if project_root is not None: - # Normalize the project path (i.e. fix file separators and remove unnecessary '.' and '..') - project_root = resolve(".", project_root) - qsharp_json = join(project_root, "qsharp.json") - if not exists(qsharp_json): - raise QSharpError( - f"{qsharp_json} not found. qsharp.json should exist at the project root and be a valid JSON file." - ) - - try: - (_, manifest_contents) = read_file(qsharp_json) - except Exception as e: - raise QSharpError( - f"Error reading {qsharp_json}. qsharp.json should exist at the project root and be a valid JSON file." - ) from e - - # Loop through the environment module and remove any dynamically added attributes that represent - # Q# callables or structs. This is necessary to avoid conflicts with the new interpreter instance. - keys_to_remove = [] - for key, val in code.__dict__.items(): - if ( - hasattr(val, "__global_callable") - or hasattr(val, "__qsharp_class") - or isinstance(val, types.ModuleType) - ): - keys_to_remove.append(key) - for key in keys_to_remove: - code.__delattr__(key) - - # Also remove any namespace modules dynamically added to the system. - keys_to_remove = [] - for key in sys.modules: - if key.startswith("qsharp.code."): - keys_to_remove.append(key) - for key in keys_to_remove: - sys.modules.__delitem__(key) - - _interpreter = Interpreter( - target_profile, - language_features, - project_root, - read_file, - list_directory, - resolve, - fetch_github, - _make_callable, - _make_class, - trace_circuit, - ) - - _config = Config(target_profile, language_features, manifest_contents, project_root) - # Return the configuration information to provide a hint to the - # language service through the cell output. - return _config - - -def get_interpreter() -> Interpreter: - """ - Returns the Q# interpreter. - - :return: The Q# interpreter. - :rtype: Interpreter - """ - global _interpreter - if _interpreter is None: - init() - assert _interpreter is not None, "Failed to initialize the Q# interpreter." - return _interpreter - - -def get_config() -> Config: - """ - Returns the Q# interpreter configuration. - - :return: The Q# interpreter configuration. - :rtype: Config - """ - global _config - if _config is None: - init() - assert _config is not None, "Failed to initialize the Q# interpreter." - return _config + global _default_session + # Dispose the old session so its callables fail gracefully. + if _default_session is not None: + _default_session._disposed = True -class StateDump: - """ - A state dump returned from the Q# interpreter. - """ - - """ - The number of allocated qubits at the time of the dump. - """ - qubit_count: int - - __inner: dict - __data: StateDumpData - - def __init__(self, data: StateDumpData): - self.__data = data - self.__inner = data.get_dict() - self.qubit_count = data.qubit_count - - def __getitem__(self, index: int) -> complex: - return self.__inner.__getitem__(index) - - def __iter__(self): - return self.__inner.__iter__() - - def __len__(self) -> int: - return len(self.__inner) - - def __repr__(self) -> str: - return self.__data.__repr__() - - def __str__(self) -> str: - return self.__data.__str__() - - def _repr_markdown_(self) -> str: - return self.__data._repr_markdown_() - - def check_eq( - self, state: Union[Dict[int, complex], List[complex]], tolerance: float = 1e-10 - ) -> bool: - """ - Checks if the state dump is equal to the given state. This is not mathematical equality, - as the check ignores global phase. - - :param state: The state to check against, provided either as a dictionary of state indices to complex amplitudes, - or as a list of real amplitudes. - :param tolerance: The tolerance for the check. Defaults to 1e-10. - :return: ``True`` if the state dump is equal to the given state within the given tolerance, ignoring global phase. - :rtype: bool - """ - phase = None - # Convert a dense list of real amplitudes to a dictionary of state indices to complex amplitudes - if isinstance(state, list): - state = {i: val for i, val in enumerate(state)} - # Filter out zero states from the state dump and the given state based on tolerance - state = {k: v for k, v in state.items() if abs(v) > tolerance} - inner_state = {k: v for k, v in self.__inner.items() if abs(v) > tolerance} - if len(state) != len(inner_state): - return False - for key in state: - if key not in inner_state: - return False - if phase is None: - # Calculate the phase based on the first state pair encountered. - # Every pair of states after this must have the same phase for the states to be equivalent. - phase = inner_state[key] / state[key] - elif abs(phase - inner_state[key] / state[key]) > tolerance: - # This pair of states does not have the same phase, - # within tolerance, so the equivalence check fails. - return False - return True - - def as_dense_state(self) -> List[complex]: - """ - Returns the state dump as a dense list of complex amplitudes. This will include zero amplitudes. - - :return: A dense list of complex amplitudes, one per computational basis state. - :rtype: List[complex] - """ - return [self.__inner.get(i, complex(0)) for i in range(2**self.qubit_count)] - - -class ShotResult(TypedDict): - """ - A single result of a shot. - """ + # Clean up the global code namespace before creating a new session. + _clear_code_module(code, "qsharp.code") - events: List[Output | StateDump | str] - result: Any - messages: List[str] - matrices: List[Output] - dumps: List[StateDump] + _default_session = Session( + target_profile=target_profile, + target_name=target_name, + project_root=project_root, + language_features=language_features, + trace_circuit=trace_circuit, + _code_module=code, + _code_prefix="qsharp.code", + ) + return _default_session._config def eval( @@ -535,230 +159,7 @@ def eval( :rtype: Any :raises QSharpError: If there is an error evaluating the source code. """ - ipython_helper() - - results: ShotResult = { - "events": [], - "result": None, - "messages": [], - "matrices": [], - "dumps": [], - } - - def on_save_events(output: Output) -> None: - # Append the output to the last shot's output list - if output.is_matrix(): - results["events"].append(output) - results["matrices"].append(output) - elif output.is_state_dump(): - dump_data = cast(StateDumpData, output.state_dump()) - state_dump = StateDump(dump_data) - results["events"].append(state_dump) - results["dumps"].append(state_dump) - elif output.is_message(): - stringified = str(output) - results["events"].append(stringified) - results["messages"].append(stringified) - - def callback(output: Output) -> None: - if _in_jupyter: - try: - display(output) - return - except: - # If IPython is not available, fall back to printing the output - pass - print(output, flush=True) - - telemetry_events.on_eval() - start_time = monotonic() - - output = get_interpreter().interpret( - source, on_save_events if save_events else callback - ) - results["result"] = qsharp_value_to_python_value(output) - - durationMs = (monotonic() - start_time) * 1000 - telemetry_events.on_eval_end(durationMs) - - if save_events: - return results - else: - return results["result"] - - -# Helper function that knows how to create a function that invokes a callable. This will be -# used by the underlying native code to create functions for callables on the fly that know -# how to get the currently initialized global interpreter instance. -def _make_callable(callable: GlobalCallable, namespace: List[str], callable_name: str): - module = code - # Create a name that will be used to collect the hierarchy of namespace identifiers if they exist and use that - # to register created modules with the system. - accumulated_namespace = "qsharp.code" - accumulated_namespace += "." - for name in namespace: - accumulated_namespace += name - # Use the existing entry, which should already be a module. - if hasattr(module, name): - module = module.__getattribute__(name) - if sys.modules.get(accumulated_namespace) is None: - # This is an existing entry that is not yet registered in sys.modules, so add it. - # This can happen if a callable with the same name as this namespace is already - # defined. - sys.modules[accumulated_namespace] = module - else: - # This namespace entry doesn't exist as a module yet, so create it, add it to the environment, and - # add it to sys.modules so it supports import properly. - new_module = types.ModuleType(accumulated_namespace) - module.__setattr__(name, new_module) - sys.modules[accumulated_namespace] = new_module - module = new_module - accumulated_namespace += "." - - def _callable(*args): - ipython_helper() - - def callback(output: Output) -> None: - if _in_jupyter: - try: - display(output) - return - except: - # If IPython is not available, fall back to printing the output - pass - print(output, flush=True) - - args = python_args_to_interpreter_args(args) - - output = get_interpreter().invoke(callable, args, callback) - return qsharp_value_to_python_value(output) - - # Each callable is annotated so that we know it is auto-generated and can be removed on a re-init of the interpreter. - _callable.__global_callable = callable - - # Add the callable to the module. - if module.__dict__.get(callable_name) is None: - module.__setattr__(callable_name, _callable) - else: - # Preserve any existing attributes on the attribute with the matching name, - # since this could be a collision with an existing namespace/module. - for key, val in module.__dict__.get(callable_name).__dict__.items(): - if key != "__global_callable": - _callable.__dict__[key] = val - module.__setattr__(callable_name, _callable) - - -def qsharp_value_to_python_value(obj): - # Base case: Primitive types - if isinstance(obj, (bool, int, float, complex, str, Pauli, Result)): - return obj - - # Recursive case: Tuple - if isinstance(obj, tuple): - # Special case Value::UNIT maps to None. - if not obj: - return None - return tuple(qsharp_value_to_python_value(elt) for elt in obj) - - # Recursive case: Array - if isinstance(obj, list): - return [qsharp_value_to_python_value(elt) for elt in obj] - - # Recursive case: Callable or Closure - if isinstance(obj, (GlobalCallable, Closure)): - return obj - - # Recursive case: Udt - if isinstance(obj, UdtValue): - class_name = obj.name - fields = [] - args = [] - for name, value_ir in obj.fields: - val = qsharp_value_to_python_value(value_ir) - ty = type(val) - args.append(val) - fields.append((name, ty)) - return make_dataclass(class_name, fields)(*args) - - -def make_class_rec(qsharp_type: TypeIR) -> type: - class_name = qsharp_type.unwrap_udt().name - fields = {} - for field in qsharp_type.unwrap_udt().fields: - ty = None - kind = field[1].kind() - - if kind == TypeKind.Primitive: - prim_kind = field[1].unwrap_primitive() - if prim_kind == PrimitiveKind.Bool: - ty = bool - elif prim_kind == PrimitiveKind.Int: - ty = int - elif prim_kind == PrimitiveKind.Double: - ty = float - elif prim_kind == PrimitiveKind.Complex: - ty = complex - elif prim_kind == PrimitiveKind.String: - ty = str - elif prim_kind == PrimitiveKind.Pauli: - ty = Pauli - elif prim_kind == PrimitiveKind.Result: - ty = Result - else: - raise QSharpError(f"unknown primitive {prim_kind}") - elif kind == TypeKind.Tuple: - # Special case Value::UNIT maps to None. - if not field[1].unwrap_tuple(): - ty = type(None) - else: - ty = tuple - elif kind == TypeKind.Array: - ty = list - elif kind == TypeKind.Udt: - ty = make_class_rec(field[1]) - else: - raise QSharpError(f"unknown type {kind}") - fields[field[0]] = ty - - return make_dataclass( - class_name, - fields, - ) - - -def _make_class(qsharp_type: TypeIR, namespace: List[str], class_name: str): - """ - Helper function to create a python class given a description of it. This will be - used by the underlying native code to create classes on the fly corresponding to - the currently initialized interpreter instance. - """ - - module = code - # Create a name that will be used to collect the hierarchy of namespace identifiers if they exist and use that - # to register created modules with the system. - accumulated_namespace = "qsharp.code" - accumulated_namespace += "." - for name in namespace: - accumulated_namespace += name - # Use the existing entry, which should already be a module. - if hasattr(module, name): - module = module.__getattribute__(name) - else: - # This namespace entry doesn't exist as a module yet, so create it, add it to the environment, and - # add it to sys.modules so it supports import properly. - new_module = types.ModuleType(accumulated_namespace) - module.__setattr__(name, new_module) - sys.modules[accumulated_namespace] = new_module - module = new_module - accumulated_namespace += "." - - QSharpClass = make_class_rec(qsharp_type) - - # Each class is annotated so that we know it is auto-generated and can be removed on a re-init of the interpreter. - QSharpClass.__qsharp_class = True - - # Add the class to the module. - module.__setattr__(class_name, QSharpClass) + return _get_default_session().eval(source, save_events=save_events) def run( @@ -799,119 +200,16 @@ def run( :raises QSharpError: If there is an error interpreting the input. :raises ValueError: If the number of shots is less than 1. """ - ipython_helper() - - if shots < 1: - raise ValueError("The number of shots must be greater than 0.") - - telemetry_events.on_run( + return _get_default_session().run( + entry_expr, shots, - noise=(noise is not None and noise != (0.0, 0.0, 0.0)), - qubit_loss=(qubit_loss is not None and qubit_loss > 0.0), + *args, + on_result=on_result, + save_events=save_events, + noise=noise, + qubit_loss=qubit_loss, + seed=seed, ) - start_time = monotonic() - - results: List[ShotResult] = [] - - def print_output(output: Output) -> None: - if _in_jupyter: - try: - display(output) - return - except: - # If IPython is not available, fall back to printing the output - pass - print(output, flush=True) - - def on_save_events(output: Output) -> None: - # Append the output to the last shot's output list - results[-1]["events"].append(output) - if output.is_matrix(): - results[-1]["matrices"].append(output) - elif output.is_state_dump(): - dump_data = cast(StateDumpData, output.state_dump()) - results[-1]["dumps"].append(StateDump(dump_data)) - elif output.is_message(): - results[-1]["messages"].append(str(output)) - - callable = None - run_entry_expr = None - if isinstance(entry_expr, Callable) and hasattr(entry_expr, "__global_callable"): - args = python_args_to_interpreter_args(args) - callable = entry_expr.__global_callable - elif isinstance(entry_expr, (GlobalCallable, Closure)): - args = python_args_to_interpreter_args(args) - callable = entry_expr - else: - assert isinstance(entry_expr, str) - run_entry_expr = entry_expr - - noise_config = None - if isinstance(noise, NoiseConfig): - noise_config = noise - noise = None - - shot_seed = seed - for shot in range(shots): - # We also don't want every shot to return the same results, so we update the seed for - # the next shot with the shot number. This keeps the behavior deterministic if a seed - # was provided. - if seed is not None: - shot_seed = shot + seed - - results.append( - {"result": None, "events": [], "messages": [], "matrices": [], "dumps": []} - ) - run_results = get_interpreter().run( - run_entry_expr, - on_save_events if save_events else print_output, - noise_config, - noise, - qubit_loss, - callable, - args, - shot_seed, - ) - run_results = qsharp_value_to_python_value(run_results) - results[-1]["result"] = run_results - if on_result: - on_result(results[-1]) - # For every shot after the first, treat the entry expression as None to trigger - # a rerun of the last executed expression without paying the cost for any additional - # compilation. - run_entry_expr = None - - durationMs = (monotonic() - start_time) * 1000 - telemetry_events.on_run_end(durationMs, shots) - - if save_events: - return results - else: - return [shot["result"] for shot in results] - - -# Class that wraps generated QIR, which can be used by -# azure-quantum as input data. -# -# This class must implement the QirRepresentable protocol -# that is defined by the azure-quantum package. -# See: https://github.com/microsoft/qdk-python/blob/fcd63c04aa871e49206703bbaa792329ffed13c4/azure-quantum/azure/quantum/target/target.py#L21 -class QirInputData: - # The name of this variable is defined - # by the protocol and must remain unchanged. - _name: str - - def __init__(self, name: str, ll_str: str): - self._name = name - self._ll_str = ll_str - - # The name of this method is defined - # by the protocol and must remain unchanged. - def _repr_qir_(self, **kwargs) -> bytes: - return self._ll_str.encode("utf-8") - - def __str__(self) -> str: - return self._ll_str def compile( @@ -936,24 +234,7 @@ def compile( with open('myfile.ll', 'w') as file: file.write(str(program)) """ - ipython_helper() - start = monotonic() - interpreter = get_interpreter() - target_profile = get_config().get_target_profile() - telemetry_events.on_compile(target_profile) - if isinstance(entry_expr, Callable) and hasattr(entry_expr, "__global_callable"): - args = python_args_to_interpreter_args(args) - ll_str = interpreter.qir(callable=entry_expr.__global_callable, args=args) - elif isinstance(entry_expr, (GlobalCallable, Closure)): - args = python_args_to_interpreter_args(args) - ll_str = interpreter.qir(callable=entry_expr, args=args) - else: - assert isinstance(entry_expr, str) - ll_str = interpreter.qir(entry_expr=entry_expr) - res = QirInputData("main", ll_str) - durationMs = (monotonic() - start) * 1000 - telemetry_events.on_compile_end(durationMs, target_profile) - return res + return _get_default_session().compile(entry_expr, *args) def circuit( @@ -1009,34 +290,17 @@ def circuit( :rtype: :class:`~qsharp._native.Circuit` :raises QSharpError: If there is an error synthesizing the circuit. """ - ipython_helper() - start = monotonic() - telemetry_events.on_circuit() - config = CircuitConfig( - max_operations=max_operations, + return _get_default_session().circuit( + entry_expr, + *args, + operation=operation, generation_method=generation_method, + max_operations=max_operations, source_locations=source_locations, group_by_scope=group_by_scope, prune_classical_qubits=prune_classical_qubits, ) - if isinstance(entry_expr, Callable) and hasattr(entry_expr, "__global_callable"): - args = python_args_to_interpreter_args(args) - res = get_interpreter().circuit( - config=config, callable=entry_expr.__global_callable, args=args - ) - elif isinstance(entry_expr, (GlobalCallable, Closure)): - args = python_args_to_interpreter_args(args) - res = get_interpreter().circuit(config=config, callable=entry_expr, args=args) - else: - assert entry_expr is None or isinstance(entry_expr, str) - res = get_interpreter().circuit(config, entry_expr, operation=operation) - - durationMs = (monotonic() - start) * 1000 - telemetry_events.on_circuit_end(durationMs) - - return res - def estimate( entry_expr: Union[str, Callable, GlobalCallable, Closure], @@ -1083,17 +347,20 @@ def _coerce_estimator_params( param_str = json.dumps(params) telemetry_events.on_estimate() start = monotonic() + session = _get_default_session() if isinstance(entry_expr, Callable) and hasattr(entry_expr, "__global_callable"): - args = python_args_to_interpreter_args(args) - res_str = get_interpreter().estimate( + args = session._python_args_to_interpreter_args(args) + res_str = session._interpreter.estimate( param_str, callable=entry_expr.__global_callable, args=args ) elif isinstance(entry_expr, (GlobalCallable, Closure)): - args = python_args_to_interpreter_args(args) - res_str = get_interpreter().estimate(param_str, callable=entry_expr, args=args) + args = session._python_args_to_interpreter_args(args) + res_str = session._interpreter.estimate( + param_str, callable=entry_expr, args=args + ) else: assert isinstance(entry_expr, str) - res_str = get_interpreter().estimate(param_str, entry_expr=entry_expr) + res_str = session._interpreter.estimate(param_str, entry_expr=entry_expr) res = json.loads(res_str) try: @@ -1120,21 +387,7 @@ def logical_counts( :return: Program resources in terms of logical gate counts. :rtype: LogicalCounts """ - - ipython_helper() - - if isinstance(entry_expr, Callable) and hasattr(entry_expr, "__global_callable"): - args = python_args_to_interpreter_args(args) - res_dict = get_interpreter().logical_counts( - callable=entry_expr.__global_callable, args=args - ) - elif isinstance(entry_expr, (GlobalCallable, Closure)): - args = python_args_to_interpreter_args(args) - res_dict = get_interpreter().logical_counts(callable=entry_expr, args=args) - else: - assert isinstance(entry_expr, str) - res_dict = get_interpreter().logical_counts(entry_expr=entry_expr) - return LogicalCounts(res_dict) + return _get_default_session().logical_counts(entry_expr, *args) def set_quantum_seed(seed: Optional[int]) -> None: @@ -1145,7 +398,7 @@ def set_quantum_seed(seed: Optional[int]) -> None: :param seed: The seed to use for the quantum random number generator. If None, the seed will be generated from entropy. """ - get_interpreter().set_quantum_seed(seed) + _get_default_session().set_quantum_seed(seed) def set_classical_seed(seed: Optional[int]) -> None: @@ -1157,7 +410,7 @@ def set_classical_seed(seed: Optional[int]) -> None: :param seed: The seed to use for the classical random number generator. If None, the seed will be generated from entropy. """ - get_interpreter().set_classical_seed(seed) + _get_default_session().set_classical_seed(seed) def dump_machine() -> StateDump: @@ -1167,8 +420,7 @@ def dump_machine() -> StateDump: :return: The state of the simulator. :rtype: StateDump """ - ipython_helper() - return StateDump(get_interpreter().dump_machine()) + return _get_default_session().dump_machine() def dump_circuit() -> Circuit: @@ -1184,5 +436,4 @@ def dump_circuit() -> Circuit: :rtype: Circuit :raises QSharpError: If the interpreter was not initialized with ``trace_circuit=True``. """ - ipython_helper() - return get_interpreter().dump_circuit() + return _get_default_session()._interpreter.dump_circuit() diff --git a/source/pip/qsharp/_session.py b/source/pip/qsharp/_session.py new file mode 100644 index 0000000000..8848a3bfc3 --- /dev/null +++ b/source/pip/qsharp/_session.py @@ -0,0 +1,906 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +""" +Q# Session Management + +This module provides the Session class for managing Q# interpreter contexts. +Each Session instance has its own interpreter and code namespace, allowing multiple +independent Q# environments to coexist. +""" + +import sys +import types +from dataclasses import make_dataclass +from time import monotonic +from typing import ( + Any, + Callable, + Iterable, + List, + Optional, + Set, + Tuple, + Union, + cast, +) + +from . import telemetry_events +from ._native import ( # type: ignore + Circuit, + CircuitConfig, + CircuitGenerationMethod, + Closure, + GlobalCallable, + Interpreter, + NoiseConfig, + Output, + Pauli, + PrimitiveKind, + QSharpError, + Result, + StateDumpData, + TargetProfile, + TypeIR, + TypeKind, + UdtValue, +) +from ._noise import ( + BitFlipNoise, + DepolarizingNoise, + PauliNoise, + PhaseFlipNoise, +) +from ._types import Config, QirInputData, ShotResult, StateDump +from .estimator._estimator import LogicalCounts + +# Check if we are running in a Jupyter notebook to use the IPython display function +_in_jupyter = False +try: + from IPython.display import display + + if get_ipython().__class__.__name__ == "ZMQInteractiveShell": # type: ignore + _in_jupyter = True # Jupyter notebook or qtconsole +except: + pass + + +# Reporting execution time during IPython cells requires that IPython +# gets pinged to ensure it understands the cell is active. This is done by +# simply importing the display function, which it turns out is enough to begin timing +# while avoiding any UI changes that would be visible to the user. +def ipython_helper(): + try: + if __IPYTHON__: # type: ignore + from IPython.display import display + except NameError: + pass + + +def make_class_rec(qsharp_type: TypeIR) -> type: + class_name = qsharp_type.unwrap_udt().name + fields = {} + for field in qsharp_type.unwrap_udt().fields: + ty = None + kind = field[1].kind() + + if kind == TypeKind.Primitive: + prim_kind = field[1].unwrap_primitive() + if prim_kind == PrimitiveKind.Bool: + ty = bool + elif prim_kind == PrimitiveKind.Int: + ty = int + elif prim_kind == PrimitiveKind.Double: + ty = float + elif prim_kind == PrimitiveKind.Complex: + ty = complex + elif prim_kind == PrimitiveKind.String: + ty = str + elif prim_kind == PrimitiveKind.Pauli: + ty = Pauli + elif prim_kind == PrimitiveKind.Result: + ty = Result + else: + raise QSharpError(f"unknown primitive {prim_kind}") + elif kind == TypeKind.Tuple: + # Special case Value::UNIT maps to None. + if not field[1].unwrap_tuple(): + ty = type(None) + else: + ty = tuple + elif kind == TypeKind.Array: + ty = list + elif kind == TypeKind.Udt: + ty = make_class_rec(field[1]) + else: + raise QSharpError(f"unknown type {kind}") + fields[field[0]] = ty + + return make_dataclass( + class_name, + fields, + ) + + +class Session: + """ + An isolated Q# interpreter environment. + + A Session provides a self-contained Q# execution environment where code is + evaluated, compiled, and executed in isolation from other Session instances. + Each Session maintains its own code namespace. + + A session has attribute `code` which is a Python module containing all Q# operations + and types defined in this session. This allows you to call Q# operations from + Python. + + Example: + + .. code-block:: python + + s = qsharp.Session() + s.eval("operation Main() : Result { use q = Qubit(); X(q); MResetZ(q) }") + assert s.run("Main()", 2) == [qsharp.Result.One, qsharp.Result.One] + assert s.code.Main() == qsharp.Result.One + """ + + _interpreter: Interpreter + _config: Config + code: types.ModuleType + _code_prefix: str + _disposed: bool + + def __init__( + self, + *, + target_profile: TargetProfile = TargetProfile.Unrestricted, + target_name: Optional[str] = None, + project_root: Optional[str] = None, + language_features: Optional[List[str]] = None, + trace_circuit: Optional[bool] = None, + _code_module: Optional[types.ModuleType] = None, + _code_prefix: Optional[str] = None, + ): + """ + Initializes a new isolated Q# session. + + :keyword target_profile: Setting the target profile allows the Q# + interpreter to generate programs that are compatible + with a specific target. See :class:`TargetProfile`. + + :keyword target_name: An optional name of the target machine to use for + inferring the compatible target_profile setting. + + :keyword project_root: An optional path to a root directory with a Q# project to + include. It must contain a qsharp.json project manifest. + + :keyword language_features: An optional list of language feature flags to + enable. These correspond to experimental or preview Q# language features. + Valid values are: + + - ``"v2-preview-syntax"``: Enables Q# v2 preview syntax. This removes + support for the scoped qubit allocation block form + (``use q = Qubit() { ... }``), requiring the statement form instead + (``use q = Qubit();``). It also removes the requirement to use the ``set`` + keyword for mutable variable assignments. + + :keyword trace_circuit: Enables tracing of circuit during execution. + Passing `True` is required for the `dump_machine()` function to return a + circuit trace. + The `circuit()` method is *not* affected by this parameter and will always + generate a circuit diagram. + """ + self._disposed = False + + if _code_module is not None: + self.code = _code_module + self._code_prefix = _code_prefix or "qsharp.code" + else: + self._code_prefix = f"qsharp._session_{id(self)}" + self.code = types.ModuleType(self._code_prefix) + + from ._fs import exists, join, list_directory, read_file, resolve + from ._http import fetch_github + + if isinstance(target_name, str): + target = target_name.split(".")[0].lower() + if target == "ionq" or target == "rigetti": + target_profile = TargetProfile.Base + elif target == "quantinuum": + target_profile = TargetProfile.Adaptive_RI + else: + raise QSharpError( + f'target_name "{target_name}" not recognized. Please set target_profile directly.' + ) + + manifest_contents = None + if project_root is not None: + project_root = resolve(".", project_root) + qsharp_json = join(project_root, "qsharp.json") + if not exists(qsharp_json): + raise QSharpError( + f"{qsharp_json} not found. qsharp.json should exist at the project root and be a valid JSON file." + ) + + try: + _, manifest_contents = read_file(qsharp_json) + except Exception as e: + raise QSharpError( + f"Error reading {qsharp_json}. qsharp.json should exist at the project root and be a valid JSON file." + ) from e + + self._interpreter = Interpreter( + target_profile, + language_features, + project_root, + read_file, + list_directory, + resolve, + fetch_github, + self._make_callable, + self._make_class, + trace_circuit, + ) + + self._config = Config( + target_profile, language_features, manifest_contents, project_root + ) + + def _qsharp_value_to_python_value(self, obj): + """Converts Q# value to Python value.""" + # Base case: Primitive types + if isinstance(obj, (bool, int, float, complex, str, Pauli, Result)): + return obj + + # Recursive case: Tuple + if isinstance(obj, tuple): + # Special case Value::UNIT maps to None. + if not obj: + return None + return tuple(self._qsharp_value_to_python_value(elt) for elt in obj) + + # Recursive case: Array + if isinstance(obj, list): + return [self._qsharp_value_to_python_value(elt) for elt in obj] + + # Recursive case: Callable or Closure + if isinstance(obj, (GlobalCallable, Closure)): + return obj + + # Recursive case: Udt + if isinstance(obj, UdtValue): + class_name = obj.name + fields = [] + args = [] + for name, value_ir in obj.fields: + val = self._qsharp_value_to_python_value(value_ir) + ty = type(val) + args.append(val) + fields.append((name, ty)) + return make_dataclass(class_name, fields)(*args) + + def _lower_python_obj( + self, obj: object, visited: Optional[Set[object]] = None + ) -> Any: + """Converts Python value to Q# value.""" + + # Base case: Primitive types + if isinstance(obj, (bool, int, float, complex, str, Pauli, Result)): + return obj + + obj_id = id(obj) + if visited is None: + visited = set() + if obj_id in visited: + raise QSharpError("Cannot send circular objects from Python to Q#.") + visited.add(obj_id) + + try: + # Recursive case: Tuple + if isinstance(obj, tuple): + return tuple(self._lower_python_obj(elt, visited) for elt in obj) + + # Recursive case: Dict + if isinstance(obj, dict): + return { + name: self._lower_python_obj(val, visited) + for name, val in obj.items() + } + + # Base case: Callable or Closure + if hasattr(obj, "__global_callable"): + self._check_same_session_callable(obj) + return obj.__getattribute__("__global_callable") + if isinstance(obj, (GlobalCallable, Closure)): + return obj + + # Recursive case: Class with slots + if hasattr(obj, "__slots__"): + self._check_same_session_struct(obj) + fields = {} + for name in getattr(obj, "__slots__"): + if name == "__dict__": + for name, val in obj.__dict__.items(): + fields[name] = self._lower_python_obj(val, visited) + else: + val = getattr(obj, name) + fields[name] = self._lower_python_obj(val, visited) + return fields + + # Recursive case: Class + if hasattr(obj, "__dict__"): + self._check_same_session_struct(obj) + fields = { + name: self._lower_python_obj(val, visited) + for name, val in obj.__dict__.items() + } + return fields + + # Recursive case: Array + # By using `Iterable` instead of `list`, we can handle other kind of + # iterables like numpy arrays and generators. + if isinstance(obj, Iterable): + return [self._lower_python_obj(elt, visited) for elt in obj] + + raise TypeError(f"unsupported type: {type(obj)}") + finally: + visited.remove(obj_id) + + def _python_args_to_interpreter_args(self, args: tuple[Any, ...]): + """Turns `args` to the format expected by the Q# interpreter.""" + if len(args) == 0: + return None + elif len(args) == 1: + return self._lower_python_obj(args[0]) + else: + return self._lower_python_obj(args) + + def _display(self, output: Output) -> None: + """Displays output in Jupyter (if available), otherwise prints.""" + if _in_jupyter: + try: + display(output) + return + except Exception: + # If IPython is not available, fall back to printing the output. + pass + print(output, flush=True) + + def _make_callable( + self, callable: GlobalCallable, namespace: List[str], callable_name: str + ): + """Registers a Q# callable in this session's code module.""" + module = self.code + accumulated_namespace = self._code_prefix + "." + for name in namespace: + accumulated_namespace += name + if hasattr(module, name): + module = module.__getattribute__(name) + if sys.modules.get(accumulated_namespace) is None: + sys.modules[accumulated_namespace] = module + else: + new_module = types.ModuleType(accumulated_namespace) + module.__setattr__(name, new_module) + sys.modules[accumulated_namespace] = new_module + module = new_module + accumulated_namespace += "." + + def _callable_fn(*args): + if self._disposed: + raise QSharpError( + "This callable belongs to a disposed Q# session. " + "Re-evaluate the callable in a current session." + ) + ipython_helper() + + args = self._python_args_to_interpreter_args(args) + output = self._interpreter.invoke(callable, args, self._display) + return self._qsharp_value_to_python_value(output) + + setattr(_callable_fn, "_qdk_session", self) + setattr(_callable_fn, "__global_callable", callable) + + if module.__dict__.get(callable_name) is None: + module.__setattr__(callable_name, _callable_fn) + else: + for key, val in module.__dict__.get(callable_name).__dict__.items(): + if key != "__global_callable": + _callable_fn.__dict__[key] = val + module.__setattr__(callable_name, _callable_fn) + + def _make_class(self, qsharp_type: TypeIR, namespace: List[str], class_name: str): + """Registers a Q# type as a Python dataclass in this session's code module.""" + module = self.code + accumulated_namespace = self._code_prefix + "." + for name in namespace: + accumulated_namespace += name + if hasattr(module, name): + module = module.__getattribute__(name) + else: + new_module = types.ModuleType(accumulated_namespace) + module.__setattr__(name, new_module) + sys.modules[accumulated_namespace] = new_module + module = new_module + accumulated_namespace += "." + + QSharpClass = make_class_rec(qsharp_type) + QSharpClass.__qsharp_class = True + setattr(QSharpClass, "_qdk_session", self) + module.__setattr__(class_name, QSharpClass) + + def _check_same_session_callable(self, callable_fn: Any) -> None: + """Raise if a callable belongs to a different session.""" + # Callable must originate from Q#, so it always has a session. + assert hasattr(callable_fn, "_qdk_session") + callable_session = getattr(callable_fn, "_qdk_session") + if callable_session is not self: + raise QSharpError("This callable belongs to a different Session. ") + + def _check_same_session_struct(self, struct: Any) -> None: + """Raise if a struct belongs to a different session.""" + # Struct values originating from Q# are not themselves tagged with _qdk_session, + # but their classes are (in _make_class). + struct_type = type(struct) + if not hasattr(struct_type, "_qdk_session"): + # Ignore objects not originating from Q#. + return + if getattr(struct_type, "_qdk_session") is not self: + raise QSharpError("This struct belongs to a different Session. ") + + def eval( + self, + source: str, + *, + save_events: bool = False, + ) -> Any: + """ + Evaluates Q# source code. + + Output is printed to console. + + :param source: The Q# source code to evaluate. + :keyword save_events: If true, all output will be saved and returned. If false, + they will be printed. + :return: The value returned by the last statement in the source code, or the + saved output if ``save_events`` is true. + :rtype: Any + :raises QSharpError: If there is an error evaluating the source code. + """ + ipython_helper() + + results: ShotResult = { + "events": [], + "result": None, + "messages": [], + "matrices": [], + "dumps": [], + } + + def on_save_events(output: Output) -> None: + # Append the output to the last shot's output list + if output.is_matrix(): + results["events"].append(output) + results["matrices"].append(output) + elif output.is_state_dump(): + dump_data = cast(StateDumpData, output.state_dump()) + state_dump = StateDump(dump_data) + results["events"].append(state_dump) + results["dumps"].append(state_dump) + elif output.is_message(): + stringified = str(output) + results["events"].append(stringified) + results["messages"].append(stringified) + + telemetry_events.on_eval() + start_time = monotonic() + + output = self._interpreter.interpret( + source, on_save_events if save_events else self._display + ) + results["result"] = self._qsharp_value_to_python_value(output) + + durationMs = (monotonic() - start_time) * 1000 + telemetry_events.on_eval_end(durationMs) + + if save_events: + return results + else: + return results["result"] + + def run( + self, + entry_expr: Union[str, Callable, GlobalCallable, Closure], + shots: int, + *args, + on_result: Optional[Callable[[ShotResult], None]] = None, + save_events: bool = False, + noise: Optional[ + Union[ + Tuple[float, float, float], + PauliNoise, + BitFlipNoise, + PhaseFlipNoise, + DepolarizingNoise, + NoiseConfig, + ] + ] = None, + qubit_loss: Optional[float] = None, + seed: Optional[int] = None, + ) -> List[Any]: + """ + Runs the given Q# expression for the given number of shots. + Each shot uses an independent instance of the simulator. + + :param entry_expr: The entry expression. Alternatively, a callable can be + provided, which must be a Q# callable. + :param shots: The number of shots to run. + :param *args: The arguments to pass to the callable, if one is provided. + :param on_result: A callback function that will be called with each result. + :param save_events: If true, the output of each shot will be saved. If false, + they will be printed. + :param noise: The noise to use in simulation. + :param qubit_loss: The probability of qubit loss in simulation. + :param seed: The seed to use for the random number generator in simulation, if + any. + + :return: A list of results or runtime errors. If ``save_events`` is true, a list + of ``ShotResult`` is returned. + :rtype: List[Any] + :raises QSharpError: If there is an error interpreting the input. + :raises ValueError: If the number of shots is less than 1. + """ + ipython_helper() + + if shots < 1: + raise ValueError("The number of shots must be greater than 0.") + + telemetry_events.on_run( + shots, + noise=(noise is not None and noise != (0.0, 0.0, 0.0)), + qubit_loss=(qubit_loss is not None and qubit_loss > 0.0), + ) + start_time = monotonic() + + results: List[ShotResult] = [] + + def on_save_events(output: Output) -> None: + # Append the output to the last shot's output list + results[-1]["events"].append(output) + if output.is_matrix(): + results[-1]["matrices"].append(output) + elif output.is_state_dump(): + dump_data = cast(StateDumpData, output.state_dump()) + results[-1]["dumps"].append(StateDump(dump_data)) + elif output.is_message(): + results[-1]["messages"].append(str(output)) + + callable = None + run_entry_expr = None + if isinstance(entry_expr, Callable) and hasattr( + entry_expr, "__global_callable" + ): + self._check_same_session_callable(entry_expr) + args = self._python_args_to_interpreter_args(args) + callable = getattr(entry_expr, "__global_callable") + elif isinstance(entry_expr, (GlobalCallable, Closure)): + args = self._python_args_to_interpreter_args(args) + callable = entry_expr + else: + assert isinstance(entry_expr, str) + run_entry_expr = entry_expr + + noise_config = None + if isinstance(noise, NoiseConfig): + noise_config = noise + noise = None + + shot_seed = seed + for shot in range(shots): + # We also don't want every shot to return the same results, so we update the + # seed for the next shot with the shot number. This keeps the behavior + # deterministic if a seed was provided. + if seed is not None: + shot_seed = shot + seed + + results.append( + { + "result": None, + "events": [], + "messages": [], + "matrices": [], + "dumps": [], + } + ) + run_results = self._interpreter.run( + run_entry_expr, + on_save_events if save_events else self._display, + noise_config, + noise, + qubit_loss, + callable, + args, + shot_seed, + ) + run_results = self._qsharp_value_to_python_value(run_results) + results[-1]["result"] = run_results + if on_result: + on_result(results[-1]) + # For every shot after the first, treat the entry expression as None to + # trigger a rerun of the last executed expression without paying the cost + # for any additional compilation. + run_entry_expr = None + + durationMs = (monotonic() - start_time) * 1000 + telemetry_events.on_run_end(durationMs, shots) + + if save_events: + return results + else: + return [shot["result"] for shot in results] + + def compile( + self, entry_expr: Union[str, Callable, GlobalCallable, Closure], *args + ) -> QirInputData: + """ + Compiles the Q# source code into a program that can be submitted to a target. + Either an entry expression or a callable with arguments must be provided. + + :param entry_expr: The Q# expression that will be used as the entrypoint + for the program. Alternatively, a callable can be provided, which must + be a Q# callable. + :param *args: The arguments to pass to the callable, if one is provided. + + :return: The compiled program. Use ``str()`` to get the QIR string. + :rtype: QirInputData + + Example: + + .. code-block:: python + program = qsharp.compile("...") + with open('myfile.ll', 'w') as file: + file.write(str(program)) + """ + ipython_helper() + start = monotonic() + target_profile = self._config.get_target_profile() + telemetry_events.on_compile(target_profile) + if isinstance(entry_expr, Callable) and hasattr( + entry_expr, "__global_callable" + ): + self._check_same_session_callable(entry_expr) + args = self._python_args_to_interpreter_args(args) + ll_str = self._interpreter.qir( + callable=getattr(entry_expr, "__global_callable"), args=args + ) + elif isinstance(entry_expr, (GlobalCallable, Closure)): + args = self._python_args_to_interpreter_args(args) + ll_str = self._interpreter.qir(callable=entry_expr, args=args) + else: + assert isinstance(entry_expr, str) + ll_str = self._interpreter.qir(entry_expr=entry_expr) + res = QirInputData("main", ll_str) + durationMs = (monotonic() - start) * 1000 + telemetry_events.on_compile_end(durationMs, target_profile) + return res + + def circuit( + self, + entry_expr: Optional[Union[str, Callable, GlobalCallable, Closure]] = None, + *args, + operation: Optional[str] = None, + generation_method: Optional[CircuitGenerationMethod] = None, + max_operations: Optional[int] = None, + source_locations: bool = False, + group_by_scope: bool = True, + prune_classical_qubits: bool = False, + ) -> Circuit: + """ + Synthesizes a circuit for a Q# program. Either an entry + expression or an operation must be provided. + + :param entry_expr: An entry expression. Alternatively, a callable can be + provided, which must be a Q# callable. + :type entry_expr: str or Callable + + :param *args: The arguments to pass to the callable, if one is provided. + + :keyword operation: The operation to synthesize. This can be a name of + an operation or a lambda expression. The operation must take only + qubits or arrays of qubits as parameters. + :kwtype operation: str + + :keyword generation_method: The method to use for circuit generation. + :attr:`~qsharp.CircuitGenerationMethod.ClassicalEval` evaluates classical + control flow at circuit generation time. + :attr:`~qsharp.CircuitGenerationMethod.Simulate` runs a full simulation to + trace the circuit. + :attr:`~qsharp.CircuitGenerationMethod.Static` uses partial evaluation and + requires a non-``Unrestricted`` target profile. Defaults to ``None`` which + auto-selects the generation method. + :kwtype generation_method: :class:`~qsharp.CircuitGenerationMethod` + + :keyword max_operations: The maximum number of operations to include in the + circuit. Defaults to ``None`` which means no limit. + :kwtype max_operations: int + + :keyword source_locations: If ``True``, annotates each gate with its source + location. + :kwtype source_locations: bool + + :keyword group_by_scope: If ``True``, groups operations by their containing + scope, such as function declarations or loop blocks. + :kwtype group_by_scope: bool + + :keyword prune_classical_qubits: If ``True``, removes qubits that are never used + in a quantum gate (e.g. qubits only used as classical controls). + :kwtype prune_classical_qubits: bool + + :return: The synthesized circuit. + :rtype: :class:`~qsharp._native.Circuit` + :raises QSharpError: If there is an error synthesizing the circuit. + """ + ipython_helper() + start = monotonic() + telemetry_events.on_circuit() + config = CircuitConfig( + max_operations=max_operations, + generation_method=generation_method, + source_locations=source_locations, + group_by_scope=group_by_scope, + prune_classical_qubits=prune_classical_qubits, + ) + + if isinstance(entry_expr, Callable) and hasattr( + entry_expr, "__global_callable" + ): + self._check_same_session_callable(entry_expr) + args = self._python_args_to_interpreter_args(args) + res = self._interpreter.circuit( + config=config, + callable=getattr(entry_expr, "__global_callable"), + args=args, + ) + elif isinstance(entry_expr, (GlobalCallable, Closure)): + args = self._python_args_to_interpreter_args(args) + res = self._interpreter.circuit( + config=config, callable=entry_expr, args=args + ) + else: + assert entry_expr is None or isinstance(entry_expr, str) + res = self._interpreter.circuit(config, entry_expr, operation=operation) + + durationMs = (monotonic() - start) * 1000 + telemetry_events.on_circuit_end(durationMs) + + return res + + def logical_counts( + self, + entry_expr: Union[str, Callable, GlobalCallable, Closure], + *args, + ) -> LogicalCounts: + """ + Extracts logical resource counts from Q# source code. + Either an entry expression or a callable with arguments must be provided. + + :param entry_expr: The entry expression. Alternatively, a callable can be + provided, which must be a Q# callable. + + :return: Program resources in terms of logical gate counts. + :rtype: LogicalCounts + """ + ipython_helper() + + if isinstance(entry_expr, Callable) and hasattr( + entry_expr, "__global_callable" + ): + self._check_same_session_callable(entry_expr) + args = self._python_args_to_interpreter_args(args) + res_dict = self._interpreter.logical_counts( + callable=getattr(entry_expr, "__global_callable"), args=args + ) + elif isinstance(entry_expr, (GlobalCallable, Closure)): + args = self._python_args_to_interpreter_args(args) + res_dict = self._interpreter.logical_counts(callable=entry_expr, args=args) + else: + assert isinstance(entry_expr, str) + res_dict = self._interpreter.logical_counts(entry_expr=entry_expr) + return LogicalCounts(res_dict) + + def set_quantum_seed(self, seed: Optional[int]) -> None: + """ + Sets the seed for the random number generator used for quantum measurements. + This applies to all Q# code executed, compiled, or estimated. + + :param seed: The seed to use for the quantum random number generator. + If None, the seed will be generated from entropy. + """ + self._interpreter.set_quantum_seed(seed) + + def set_classical_seed(self, seed: Optional[int]) -> None: + """ + Sets the seed for the random number generator used for standard + library classical random number operations. + This applies to all Q# code executed, compiled, or estimated. + + :param seed: The seed to use for the classical random number generator. + If None, the seed will be generated from entropy. + """ + self._interpreter.set_classical_seed(seed) + + def dump_machine(self) -> StateDump: + """ + Returns the sparse state vector of the simulator as a StateDump object. + + :return: The state of the simulator. + :rtype: StateDump + """ + ipython_helper() + return StateDump(self._interpreter.dump_machine()) + + def import_openqasm( + self, + source: str, + **kwargs: Any, + ) -> Any: + """ + Imports OpenQASM source code into this session's interpreter. + + :param source: An OpenQASM program or fragment. + :type source: str + :param **kwargs: Additional keyword arguments. Common options: + + - ``name`` (str): The name of the program. This is used as the entry point + for the program. + - ``search_path`` (str): The optional search path for resolving file + references. + - ``output_semantics`` (OutputSemantics): The output semantics for the + compilation. + - ``program_type`` (ProgramType): The type of program compilation to + perform: + - ``ProgramType.Operation`` (default): the source becomes a Q# operation + in the global namespace with parameters for any declared classical + inputs and parameters for each of the declared qubits, while any + explicit or implicit output declarations become the return type of the + operation. + - ``ProgramType.File``: will treat the input source as a stand-alone + program and create an operation in the ``qasm_import`` namespace that + only takes classical parameters, allocates the required qubits + internally and releases them at the end of the operation. + - ``ProgramType.Fragments``: executes the provided source in the current + interactive interpreter, defining any declared variables or operations + in the current scope and returning the value of the last statement in + the source. + :return: The value returned by the last statement in the source code. + :rtype: Any + :raises QasmError: If there is an error generating, parsing, or analyzing the + OpenQASM source. + :raises QSharpError: If there is an error compiling the program. + """ + from ._fs import list_directory, read_file, resolve + from ._http import fetch_github + from .openqasm._ipython import display_or_print + + ipython_helper() + + telemetry_events.on_import_qasm() + start_time = monotonic() + + kwargs = {k: v for k, v in kwargs.items() if k is not None and v is not None} + if "search_path" not in kwargs: + kwargs["search_path"] = "." + + res = self._interpreter.import_qasm( + source, + display_or_print, + read_file, + list_directory, + resolve, + fetch_github, + **kwargs, + ) + + durationMs = (monotonic() - start_time) * 1000 + telemetry_events.on_import_qasm_end(durationMs) + + return res diff --git a/source/pip/qsharp/_simulation.py b/source/pip/qsharp/_simulation.py index b20a2ef54b..848eed9af6 100644 --- a/source/pip/qsharp/_simulation.py +++ b/source/pip/qsharp/_simulation.py @@ -8,6 +8,7 @@ from ._native import ( QirInstructionId, QirInstruction, + Result, run_clifford, run_clifford_adaptive, run_parallel_shots, @@ -25,7 +26,7 @@ Type, Linkage, ) -from ._qsharp import QirInputData, Result +from ._types import QirInputData from typing import TYPE_CHECKING from ._adaptive_pass import ( AdaptiveProfilePass, diff --git a/source/pip/qsharp/_types.py b/source/pip/qsharp/_types.py new file mode 100644 index 0000000000..789841ff51 --- /dev/null +++ b/source/pip/qsharp/_types.py @@ -0,0 +1,195 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import os +from pathlib import Path +from typing import ( + Any, + Dict, + List, + Optional, + TypedDict, + Union, +) + +from ._native import ( # type: ignore + Output, + StateDumpData, + TargetProfile, +) + + +class Config: + """ + Configuration hints for the language service. + """ + + _config: Dict[str, Any] + + def __init__( + self, + target_profile: TargetProfile, + language_features: Optional[List[str]], + manifest: Optional[str], + project_root: Optional[str], + ): + if target_profile == TargetProfile.Adaptive_RI: + self._config = {"targetProfile": "adaptive_ri"} + elif target_profile == TargetProfile.Adaptive_RIF: + self._config = {"targetProfile": "adaptive_rif"} + elif target_profile == TargetProfile.Adaptive_RIFLA: + self._config = {"targetProfile": "adaptive_rifla"} + elif target_profile == TargetProfile.Base: + self._config = {"targetProfile": "base"} + elif target_profile == TargetProfile.Unrestricted: + self._config = {"targetProfile": "unrestricted"} + + if language_features is not None: + self._config["languageFeatures"] = language_features + if manifest is not None: + self._config["manifest"] = manifest + if project_root: + # For now, we only support local project roots, so use a file schema in the URI. + # In the future, we may support other schemes, such as github, if/when + # we have VS Code Web + Jupyter support. + self._config["projectRoot"] = Path(os.getcwd(), project_root).as_uri() + + def __repr__(self) -> str: + return "Q# initialized with configuration: " + str(self._config) + + # See https://ipython.readthedocs.io/en/stable/config/integrating.html#rich-display + # See https://ipython.org/ipython-doc/3/notebook/nbformat.html#display-data + # This returns a custom MIME-type representation of the Q# configuration. + # This data will be available in the cell output, but will not be displayed + # to the user, as frontends would not know how to render the custom MIME type. + # Editor services that interact with the notebook frontend + # (i.e. the language service) can read and interpret the data. + def _repr_mimebundle_( + self, include: Union[Any, None] = None, exclude: Union[Any, None] = None + ) -> Dict[str, Dict[str, Any]]: + return {"application/x.qsharp-config": self._config} + + def get_target_profile(self) -> str: + """ + Returns the target profile as a string, or "unspecified" if not set. + """ + return self._config.get("targetProfile", "unspecified") + + +class StateDump: + """ + A state dump returned from the Q# interpreter. + """ + + """ + The number of allocated qubits at the time of the dump. + """ + qubit_count: int + + __inner: dict + __data: StateDumpData + + def __init__(self, data: StateDumpData): + self.__data = data + self.__inner = data.get_dict() + self.qubit_count = data.qubit_count + + def __getitem__(self, index: int) -> complex: + return self.__inner.__getitem__(index) + + def __iter__(self): + return self.__inner.__iter__() + + def __len__(self) -> int: + return len(self.__inner) + + def __repr__(self) -> str: + return self.__data.__repr__() + + def __str__(self) -> str: + return self.__data.__str__() + + def _repr_markdown_(self) -> str: + return self.__data._repr_markdown_() + + def check_eq( + self, state: Union[Dict[int, complex], List[complex]], tolerance: float = 1e-10 + ) -> bool: + """ + Checks if the state dump is equal to the given state. This is not mathematical + equality, as the check ignores global phase. + + :param state: The state to check against, provided either as a dictionary of + state indices to complex amplitudes, or as a list of real amplitudes. + :param tolerance: The tolerance for the check. Defaults to 1e-10. + :return: ``True`` if the state dump is equal to the given state within the given + tolerance, ignoring global phase. + :rtype: bool + """ + phase = None + # Convert a dense list of real amplitudes to a dictionary of state indices to complex amplitudes + if isinstance(state, list): + state = {i: val for i, val in enumerate(state)} + # Filter out zero states from the state dump and the given state based on tolerance + state = {k: v for k, v in state.items() if abs(v) > tolerance} + inner_state = {k: v for k, v in self.__inner.items() if abs(v) > tolerance} + if len(state) != len(inner_state): + return False + for key in state: + if key not in inner_state: + return False + if phase is None: + # Calculate the phase based on the first state pair encountered. + # Every pair of states after this must have the same phase for the states to be equivalent. + phase = inner_state[key] / state[key] + elif abs(phase - inner_state[key] / state[key]) > tolerance: + # This pair of states does not have the same phase, + # within tolerance, so the equivalence check fails. + return False + return True + + def as_dense_state(self) -> List[complex]: + """ + Returns the state dump as a dense list of complex amplitudes. This will include + zero amplitudes. + + :return: A dense list of complex amplitudes, one per computational basis state. + :rtype: List[complex] + """ + return [self.__inner.get(i, complex(0)) for i in range(2**self.qubit_count)] + + +class ShotResult(TypedDict): + """ + A single result of a shot. + """ + + events: List[Output | StateDump | str] + result: Any + messages: List[str] + matrices: List[Output] + dumps: List[StateDump] + + +# Class that wraps generated QIR, which can be used by +# azure-quantum as input data. +# +# This class must implement the QirRepresentable protocol +# that is defined by the azure-quantum package. +# See: https://github.com/microsoft/qdk-python/blob/fcd63c04aa871e49206703bbaa792329ffed13c4/azure-quantum/azure/quantum/target/target.py#L21 +class QirInputData: + # The name of this variable is defined + # by the protocol and must remain unchanged. + _name: str + + def __init__(self, name: str, ll_str: str): + self._name = name + self._ll_str = ll_str + + # The name of this method is defined + # by the protocol and must remain unchanged. + def _repr_qir_(self, **kwargs) -> bytes: + return self._ll_str.encode("utf-8") + + def __str__(self) -> str: + return self._ll_str diff --git a/source/pip/qsharp/openqasm/_circuit.py b/source/pip/qsharp/openqasm/_circuit.py index eaed78cba8..4b6d1e65c4 100644 --- a/source/pip/qsharp/openqasm/_circuit.py +++ b/source/pip/qsharp/openqasm/_circuit.py @@ -2,18 +2,16 @@ # Licensed under the MIT License. from time import monotonic -from typing import Any, Callable, Dict, Optional, Union -from .._fs import read_file, list_directory, resolve +from typing import Any, Callable, Optional, Union + +from .. import telemetry_events +from .._fs import list_directory, read_file, resolve from .._http import fetch_github -from .._native import circuit_qasm_program # type: ignore +from .._native import Circuit, CircuitConfig, circuit_qasm_program # type: ignore from .._qsharp import ( - get_interpreter, + _get_default_session, ipython_helper, - Circuit, - CircuitConfig, - python_args_to_interpreter_args, ) -from .. import telemetry_events def circuit( @@ -87,8 +85,9 @@ def circuit( ) if isinstance(source, Callable) and hasattr(source, "__global_callable"): - args = python_args_to_interpreter_args(args) - res = get_interpreter().circuit( + session = _get_default_session() + args = session._python_args_to_interpreter_args(args) + res = session._interpreter.circuit( config, callable=source.__global_callable, args=args ) else: diff --git a/source/pip/qsharp/openqasm/_compile.py b/source/pip/qsharp/openqasm/_compile.py index 8f34963eb1..acca096c27 100644 --- a/source/pip/qsharp/openqasm/_compile.py +++ b/source/pip/qsharp/openqasm/_compile.py @@ -2,20 +2,16 @@ # Licensed under the MIT License. from time import monotonic -from typing import Any, Callable, Dict, Optional, Union +from typing import Any, Callable, Union from .._fs import read_file, list_directory, resolve from .._http import fetch_github from .._native import ( # type: ignore compile_qasm_program_to_qir, -) -from .._qsharp import ( - QirInputData, - get_interpreter, - ipython_helper, TargetProfile, - python_args_to_interpreter_args, ) +from .._qsharp import _get_default_session, ipython_helper +from .._types import QirInputData from .. import telemetry_events @@ -67,9 +63,10 @@ def compile( telemetry_events.on_compile_qasm(target_profile) if isinstance(source, Callable) and hasattr(source, "__global_callable"): - args = python_args_to_interpreter_args(args) - ll_str = get_interpreter().qir( - entry_expr=None, callable=source.__global_callable, args=args + session = _get_default_session() + qsharp_args = session._python_args_to_interpreter_args(args) + ll_str = session._interpreter.qir( + entry_expr=None, callable=source.__global_callable, args=qsharp_args ) elif isinstance(source, str): # remove any entries from kwargs with a None key or None value diff --git a/source/pip/qsharp/openqasm/_estimate.py b/source/pip/qsharp/openqasm/_estimate.py index 7534562600..8877056033 100644 --- a/source/pip/qsharp/openqasm/_estimate.py +++ b/source/pip/qsharp/openqasm/_estimate.py @@ -12,11 +12,7 @@ ) from ..estimator import EstimatorParams, EstimatorResult -from .._qsharp import ( - get_interpreter, - ipython_helper, - python_args_to_interpreter_args, -) +from .._qsharp import _get_default_session, ipython_helper from .. import telemetry_events @@ -78,9 +74,13 @@ def _coerce_estimator_params( telemetry_events.on_estimate_qasm() start = monotonic() if isinstance(source, Callable) and hasattr(source, "__global_callable"): - args = python_args_to_interpreter_args(args) - res_str = get_interpreter().estimate( - param_str, entry_expr=None, callable=source.__global_callable, args=args + session = _get_default_session() + qsharp_args = session._python_args_to_interpreter_args(args) + res_str = session._interpreter.estimate( + param_str, + entry_expr=None, + callable=source.__global_callable, + args=qsharp_args, ) elif isinstance(source, str): # remove any entries from kwargs with a None key or None value diff --git a/source/pip/qsharp/openqasm/_import.py b/source/pip/qsharp/openqasm/_import.py index e616ee0d39..899dcc9cfc 100644 --- a/source/pip/qsharp/openqasm/_import.py +++ b/source/pip/qsharp/openqasm/_import.py @@ -1,17 +1,9 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. -from time import monotonic from typing import Any -from ._ipython import display_or_print -from .._fs import read_file, list_directory, resolve -from .._http import fetch_github -from .._qsharp import ( - get_interpreter, - ipython_helper, -) -from .. import telemetry_events +from qsharp._qsharp import _get_default_session def import_openqasm( @@ -44,29 +36,4 @@ def import_openqasm( :raises QasmError: If there is an error generating, parsing, or analyzing the OpenQASM source. :raises QSharpError: If there is an error compiling the program. """ - - ipython_helper() - - telemetry_events.on_import_qasm() - start_time = monotonic() - - # remove any entries from kwargs with a None key or None value - kwargs = {k: v for k, v in kwargs.items() if k is not None and v is not None} - - if "search_path" not in kwargs: - kwargs["search_path"] = "." - - res = get_interpreter().import_qasm( - source, - display_or_print, - read_file, - list_directory, - resolve, - fetch_github, - **kwargs, - ) - - durationMs = (monotonic() - start_time) * 1000 - telemetry_events.on_import_qasm_end(durationMs) - - return res + return _get_default_session().import_openqasm(source, **kwargs) diff --git a/source/pip/qsharp/openqasm/_run.py b/source/pip/qsharp/openqasm/_run.py index 1b82cb41ff..b0205c4b0c 100644 --- a/source/pip/qsharp/openqasm/_run.py +++ b/source/pip/qsharp/openqasm/_run.py @@ -5,20 +5,21 @@ from typing import Any, Callable, cast, Dict, List, Optional, Tuple, Union from .._fs import read_file, list_directory, resolve from .._http import fetch_github -from .._native import QasmError, Output, run_qasm_program # type: ignore -from .._qsharp import ( +from .._native import ( # type: ignore + NoiseConfig, + Output, + QasmError, + StateDumpData, + run_qasm_program, +) +from .._noise import ( BitFlipNoise, DepolarizingNoise, PauliNoise, PhaseFlipNoise, - ShotResult, - StateDump, - StateDumpData, - get_interpreter, - ipython_helper, - python_args_to_interpreter_args, - NoiseConfig, ) +from .._qsharp import _get_default_session, ipython_helper +from .._types import ShotResult, StateDump from .. import telemetry_events from ._ipython import display_or_print @@ -107,7 +108,7 @@ def on_save_events(output: Output) -> None: callable = None source_str: Optional[str] = None if isinstance(source, Callable) and hasattr(source, "__global_callable"): - args = python_args_to_interpreter_args(args) + args = _get_default_session()._python_args_to_interpreter_args(args) callable = source.__global_callable elif isinstance(source, str): source_str = source @@ -118,6 +119,7 @@ def on_save_events(output: Output) -> None: noise = None if callable: + interp = _get_default_session()._interpreter for _ in range(shots): results.append( { @@ -128,7 +130,7 @@ def on_save_events(output: Output) -> None: "messages": [], } ) - run_results = get_interpreter().run( + run_results = interp.run( source_str, on_save_events if save_events else display_or_print, noise_config, @@ -136,6 +138,7 @@ def on_save_events(output: Output) -> None: qubit_loss=qubit_loss, callable=callable, args=args, + seed=kwargs.get("seed", None), ) results[-1]["result"] = run_results diff --git a/source/pip/tests/test_interpreter.py b/source/pip/tests/test_interpreter.py index d9c4e365ab..296b619606 100644 --- a/source/pip/tests/test_interpreter.py +++ b/source/pip/tests/test_interpreter.py @@ -10,7 +10,7 @@ TargetProfile, CircuitConfig, ) -from qsharp._qsharp import qsharp_value_to_python_value +from qsharp._qsharp import _get_default_session import pytest from expecttest import assert_expected_inline @@ -19,7 +19,7 @@ def check_interpret(source: str, expect: str): e = Interpreter(TargetProfile.Unrestricted) - value = qsharp_value_to_python_value(e.interpret(source)) + value = _get_default_session()._qsharp_value_to_python_value(e.interpret(source)) assert str(value) == expect @@ -34,13 +34,13 @@ def _make_callable(callable, namespace, callable_name): e = Interpreter(TargetProfile.Unrestricted, make_callable=_make_callable) e.interpret(source) e.interpret(callable) - value = qsharp_value_to_python_value(e.invoke(f)) + value = _get_default_session()._qsharp_value_to_python_value(e.invoke(f)) assert str(value) == expect def check_run(entry_expr: str, expect: str): e = Interpreter(TargetProfile.Unrestricted) - value = qsharp_value_to_python_value(e.run(entry_expr)) + value = _get_default_session()._qsharp_value_to_python_value(e.run(entry_expr)) assert str(value) == expect diff --git a/source/pip/tests/test_session.py b/source/pip/tests/test_session.py new file mode 100644 index 0000000000..a3004f68c7 --- /dev/null +++ b/source/pip/tests/test_session.py @@ -0,0 +1,192 @@ +import qsharp +import pytest + +from qsharp import QSharpError + + +def test_eval() -> None: + s = qsharp.Session() + result = s.eval("1 + 2") + assert result == 3 + + +def test_run() -> None: + s = qsharp.Session() + s.eval("operation Main() : Result { use q = Qubit(); X(q); MResetZ(q) }") + assert s.run("Main()", 2) == [qsharp.Result.One, qsharp.Result.One] + assert s.code.Main() == qsharp.Result.One + + +def test_compile() -> None: + s = qsharp.Session(target_profile=qsharp.TargetProfile.Base) + s.eval("operation Program() : Result { use q = Qubit(); MResetZ(q) }") + program = s.compile("Program()") + assert isinstance(program._repr_qir_(), bytes) + + +def test_circuit() -> None: + s = qsharp.Session() + s.eval("operation Program() : Result { use q = Qubit(); H(q); MResetZ(q) }") + circuit = s.circuit("Program()") + assert "H" in str(circuit) + + +def test_logical_counts() -> None: + s = qsharp.Session(target_profile=qsharp.TargetProfile.Base) + s.eval("operation Program() : Result { use q = Qubit(); MResetZ(q) }") + counts = s.logical_counts("Program()") + assert counts["numQubits"] == 1 + + +def test_seed() -> None: + s1 = qsharp.Session() + s2 = qsharp.Session() + + # Classical seed. + s1.set_classical_seed(100) + s2.set_classical_seed(100) + value1 = s1.eval("Microsoft.Quantum.Random.DrawRandomInt(0, 100)") + value2 = s2.eval("Microsoft.Quantum.Random.DrawRandomInt(0, 100)") + assert value1 == value2 + + # Quantum seed. + code = """{ + use qs = Qubit[16]; + for q in qs { H(q); }; + Microsoft.Quantum.Measurement.MResetEachZ(qs) + }""" + s1.set_quantum_seed(100) + s2.set_quantum_seed(100) + value1 = s1.eval(code) + value2 = s2.eval(code) + assert value1 == value2 + + +def test_dump_machine() -> None: + s = qsharp.Session(target_profile=qsharp.TargetProfile.Unrestricted) + s.eval("use q = Qubit(); X(q);") + state_dump = s.dump_machine() + assert state_dump.qubit_count == 1 + assert state_dump.as_dense_state() == [0, 1] + + +def test_import_openqasm() -> None: + """import_openqasm loads and runs an OpenQASM program in this session.""" + s = qsharp.Session() + s.import_openqasm( + """ + OPENQASM 3.0; + include "stdgates.inc"; + qubit q; + output bit c; + x q; + c = measure q; + reset q; + """, + name="Program", + ) + + results = s.run("{ use q = Qubit(); Program(q) }", 1) + assert results == [qsharp.Result.One] + + +def test_context_callable_has_session_ref() -> None: + """Callables created via eval carry a _qdk_get_interpreter attribute.""" + s = qsharp.Session() + s.eval("function Add(a : Int, b : Int) : Int { a + b }") + add_fn = s.code.Add + assert hasattr(add_fn, "_qdk_session") + assert add_fn._qdk_session is s + + +def test_stale_callable_after_reinit() -> None: + """Callables from a prior init() become invalid after re-initialization.""" + qsharp.init() + qsharp.eval("function Stale() : Int { 99 }") + old_fn = qsharp.code.Stale + # Reinitialize — old callable should now be stale + qsharp.init() + with pytest.raises(QSharpError, match="disposed"): + old_fn() + + +def test_config_property() -> None: + """Session exposes a .config property with the target profile.""" + s = qsharp.Session(target_profile=qsharp.TargetProfile.Base) + assert s._config.get_target_profile() == "base" + + +def test_session_isolation() -> None: + s1 = qsharp.Session() + s2 = qsharp.Session() + s1.eval("function Foo() : Int { 42 }") + assert s1.eval("Foo()") == 42 + # s2 should not have Foo defined. + with pytest.raises(QSharpError): + s2.eval("Foo()") + + +def test_cross_session_callable_passing_raises() -> None: + session_a = qsharp.Session() + session_b = qsharp.Session() + session_a.eval("operation Foo() : Result { use q = Qubit(); M(q) }") + foo = session_a.code.Foo + + with pytest.raises(QSharpError, match="different Session"): + session_b.run(foo, 1) + + with pytest.raises(QSharpError, match="different Session"): + session_b.compile(foo) + + with pytest.raises(QSharpError, match="different Session"): + session_b.circuit(foo) + + with pytest.raises(Exception, match="different Session"): + session_b.logical_counts(foo) + + +def test_cross_session_struct_passing_raises() -> None: + session_a = qsharp.Session() + session_b = qsharp.Session() + # Define struct and function in both contexts with same definitions. + code = """ + struct Point { a : Int, b : Int } + function ProcessPoint(p : Point) : Int { p.a + p.b } + """ + session_a.eval(code) + session_b.eval(code) + + # Create a Point struct instance in session_a + point_from_session_a = session_a.code.Point(3, 4) + assert session_a.code.ProcessPoint(point_from_session_a) == 7 + + with pytest.raises(QSharpError, match="different Session"): + session_b.code.ProcessPoint(point_from_session_a) + + +def test_cross_session_callable_as_argument_raises() -> None: + session_a = qsharp.Session() + session_b = qsharp.Session() + + # Define a higher-order function in both contexts + code = """ + function InvokeWithFive(f : Int -> Int) : Int { f(5) } + function AddOne(x : Int) : Int { x + 1 } + """ + session_a.eval(code) + session_b.eval(code) + assert session_a.code.InvokeWithFive(session_a.code.AddOne) == 6 + + with pytest.raises(QSharpError, match="different Session"): + session_b.code.InvokeWithFive(session_a.code.AddOne) + + +def test_circular_reference_raises(): + qsharp.eval("function First(x : Int[]) : Int { x[0] }") + assert qsharp.code.First([1, 2]) == 1 + + circular_list = [] + circular_list.append(circular_list) + + with pytest.raises(QSharpError, match="Cannot send circular objects"): + qsharp.code.First(circular_list) diff --git a/source/qdk_package/qdk/__init__.py b/source/qdk_package/qdk/__init__.py index dfe5b0d012..c78385deee 100644 --- a/source/qdk_package/qdk/__init__.py +++ b/source/qdk_package/qdk/__init__.py @@ -15,7 +15,6 @@ """ - from qsharp.telemetry_events import on_qdk_import on_qdk_import() @@ -35,6 +34,7 @@ DepolarizingNoise, BitFlipNoise, PhaseFlipNoise, + Session, ) # utilities lifted from qsharp @@ -52,4 +52,5 @@ "DepolarizingNoise", "BitFlipNoise", "PhaseFlipNoise", + "Session", ] diff --git a/source/qdk_package/tests/mocks.py b/source/qdk_package/tests/mocks.py index 430edaa761..c4a31096ae 100644 --- a/source/qdk_package/tests/mocks.py +++ b/source/qdk_package/tests/mocks.py @@ -49,6 +49,7 @@ class _T: # placeholder types stub.DepolarizingNoise = _T stub.BitFlipNoise = _T stub.PhaseFlipNoise = _T + stub.Session = _T stub.__all__ = [ "run", "estimate", @@ -68,6 +69,7 @@ class _T: # placeholder types "estimator", "openqasm", "utils", + "Session", ] # Minimal submodules to back lifted shims est = types.ModuleType("qsharp.estimator")