I develop Blueqat, a quantum computing library, and I am reading the source code of Qiskit, another quantum computing library, to learn from it.
Continuing from last time, this time we follow how a backend is obtained, how it is called, and how the result comes back. Rather than reading the source of the simulator itself, we read the interface around the backend.
Qiskit is an open source quantum computing library developed by IBM.
Qiskit is split into the packages shown below, but it is less trouble to install them all at once with pip install qiskit than to install them separately.
package | role |
---|---|
Qiskit Terra | The main package. It includes classes for building circuits, functions for transpiling a circuit for real hardware, functions for calling the API to submit a circuit to real hardware, and so on. |
Qiskit Aer | Includes a quantum circuit simulator, usually called from Qiskit Terra |
Qiskit Ignis | A library for those who want to fight noise when running quantum circuits on real hardware. I have never used it. |
Qiskit Aqua | A library that makes it easy to use quantum algorithms |
This time I'm reading a part of Qiskit Terra.
https://github.com/Qiskit/qiskit-terra
Specifically, I start from the code in README.md:
from qiskit import *
qc = QuantumCircuit(2, 2)
qc.h(0)
qc.cx(0, 1)
qc.measure([0,1], [0,1])
backend_sim = BasicAer.get_backend('qasm_simulator')
result = execute(qc, backend_sim).result()
print(result.get_counts(qc))
Of these lines, we follow what happens from backend_sim = BasicAer.get_backend('qasm_simulator') onward.
However, I will not go into the details of the simulator internals or of transpilation.
I am reading the master branch on GitHub. The commit ID at the time of writing is e7be587, but the repository is updated quite often and may have changed by the time I finish this article. Please keep that in mind.
BasicAer.get_backend
backend_sim = BasicAer.get_backend('qasm_simulator')
Let's read this line.
BasicAer is defined in Terra's qiskit/providers/basicaer/__init__.py as follows:
BasicAer = BasicAerProvider()
BasicAerProvider
Let's read BasicAerProvider in qiskit/providers/basicaer/basicaerprovider.py.
SIMULATORS = [
    QasmSimulatorPy,
    StatevectorSimulatorPy,
    UnitarySimulatorPy
]


class BasicAerProvider(BaseProvider):
    """Provider for Basic Aer backends."""

    def __init__(self, *args, **kwargs):
        super().__init__(args, kwargs)

        # Populate the list of Basic Aer backends.
        self._backends = self._verify_backends()

    # (omitted)

    def _verify_backends(self):
        """
        Return the Basic Aer backends in `BACKENDS` that are
        effectively available (as some of them might depend on the presence
        of an optional dependency or on the existence of a binary).

        Returns:
            dict[str:BaseBackend]: a dict of Basic Aer backend instances for
                the backends that could be instantiated, keyed by backend name.
        """
        ret = OrderedDict()
        for backend_cls in SIMULATORS:
            try:
                backend_instance = self._get_backend_instance(backend_cls)
                backend_name = backend_instance.name()
                ret[backend_name] = backend_instance
            except QiskitError as err:
                # Ignore backends that could not be initialized.
                logger.info('Basic Aer backend %s is not available: %s',
                            backend_cls, str(err))
        return ret
For now, let's read __init__ of the parent class BaseProvider.
class BaseProvider(ABC):
    """Base class for a Backend Provider."""

    def __init__(self, *args, **kwargs):
        pass
It does nothing.
BasicAerProvider._verify_backends
Next,
self._backends = self._verify_backends()
Let's read _verify_backends. The code is pasted above. As an aside, I think it is poor naming for a method called verify to do anything other than verification, but let's set that aside. We can see the structure: BasicAer is a provider, and a provider holds backends.
Each of the classes listed in SIMULATORS, [QasmSimulatorPy, StatevectorSimulatorPy, UnitarySimulatorPy], is passed to _get_backend_instance, and the resulting instances are packed into an (ordered) dictionary and returned.
Let's read _get_backend_instance. The docstring is omitted from the quote.
def _get_backend_instance(self, backend_cls):
    # Verify that the backend can be instantiated.
    try:
        backend_instance = backend_cls(provider=self)
    except Exception as err:
        raise QiskitError('Backend %s could not be instantiated: %s' %
                          (backend_cls, err))

    return backend_instance
It just creates an instance of the backend class, but it passes the provider itself to the instance. The provider and the backend hold references to each other.
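The mutual reference described above can be sketched without Qiskit. This is a minimal illustration, not Qiskit's code: ToyProvider and ToyBackend are hypothetical names, and the real classes carry much more configuration.

```python
class ToyBackend:
    """Hypothetical backend that remembers which provider created it."""

    def __init__(self, provider=None):
        self._provider = provider

    def name(self):
        return "toy_simulator"

    def provider(self):
        return self._provider


class ToyProvider:
    """Hypothetical provider that instantiates and owns its backends."""

    def __init__(self, backend_classes):
        self._backends = {}
        for backend_cls in backend_classes:
            # The provider passes itself in, as _get_backend_instance does.
            backend = backend_cls(provider=self)
            self._backends[backend.name()] = backend

    def get_backend(self, name):
        return self._backends[name]


provider = ToyProvider([ToyBackend])
backend = provider.get_backend("toy_simulator")
print(backend.provider() is provider)  # True
```

The provider can look up a backend by name, and the backend can find its way back to the provider.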
QasmSimulatorPy.__init__
These backend classes are defined in qiskit/providers/basicaer/qasm_simulator.py, qiskit/providers/basicaer/statevector_simulator.py, and [qiskit/providers/basicaer/unitary_simulator.py](https://github.com/Qiskit/qiskit-terra/blob/master/qiskit/providers/basicaer/unitary_simulator.py). Let's look only at QasmSimulatorPy.__init__.
class QasmSimulatorPy(BaseBackend):
"""Python implementation of a qasm simulator."""
MAX_QUBITS_MEMORY = int(log2(local_hardware_info()['memory'] * (1024 ** 3) / 16))
DEFAULT_CONFIGURATION = {
'backend_name': 'qasm_simulator',
'backend_version': '2.0.0',
'n_qubits': min(24, MAX_QUBITS_MEMORY),
'url': 'https://github.com/Qiskit/qiskit-terra',
'simulator': True,
'local': True,
'conditional': True,
'open_pulse': False,
'memory': True,
'max_shots': 65536,
'coupling_map': None,
'description': 'A python simulator for qasm experiments',
'basis_gates': ['u1', 'u2', 'u3', 'cx', 'id', 'unitary'],
'gates': [
{
'name': 'u1',
'parameters': ['lambda'],
'qasm_def': 'gate u1(lambda) q { U(0,0,lambda) q; }'
},
{
'name': 'u2',
'parameters': ['phi', 'lambda'],
'qasm_def': 'gate u2(phi,lambda) q { U(pi/2,phi,lambda) q; }'
},
{
'name': 'u3',
'parameters': ['theta', 'phi', 'lambda'],
'qasm_def': 'gate u3(theta,phi,lambda) q { U(theta,phi,lambda) q; }'
},
{
'name': 'cx',
'parameters': ['c', 't'],
'qasm_def': 'gate cx c,t { CX c,t; }'
},
{
'name': 'id',
'parameters': ['a'],
'qasm_def': 'gate id a { U(0,0,0) a; }'
},
{
'name': 'unitary',
'parameters': ['matrix'],
'qasm_def': 'unitary(matrix) q1, q2,...'
}
]
}
DEFAULT_OPTIONS = {
"initial_statevector": None,
"chop_threshold": 1e-15
}
# Class level variable to return the final state at the end of simulation
# This should be set to True for the statevector simulator
SHOW_FINAL_STATE = False
def __init__(self, configuration=None, provider=None):
super().__init__(configuration=(
configuration or QasmBackendConfiguration.from_dict(self.DEFAULT_CONFIGURATION)),
provider=provider)
# Define attributes in __init__.
self._local_random = np.random.RandomState()
self._classical_memory = 0
self._classical_register = 0
self._statevector = 0
self._number_of_cmembits = 0
self._number_of_qubits = 0
self._shots = 0
self._memory = False
self._initial_statevector = self.DEFAULT_OPTIONS["initial_statevector"]
self._chop_threshold = self.DEFAULT_OPTIONS["chop_threshold"]
self._qobj_config = None
# TEMP
self._sample_measure = False
It holds a lot of settings. All __init__ does is initialize variables; it does not do anything substantial. The simulator can handle at most 24 qubits, which I did not know. (My impression is that this limit is rather conservative; an ordinary computer should be able to handle somewhat more.)
The backend itself seems to hold state about a run in progress, such as the number of shots and the memory. (Blueqat deliberately avoids such an implementation.) The thinking behind this may become clearer as we read on.
So far we have learned the following about BasicAer:
- BasicAer is an instance of BasicAerProvider
- An instance of BasicAerProvider holds the backends
Next, let's take a look at BasicAer.get_backend.
BasicAer.get_backend
def get_backend(self, name=None, **kwargs):
    backends = self._backends.values()

    # Special handling of the `name` parameter, to support alias resolution
    # and deprecated names.
    if name:
        try:
            resolved_name = resolve_backend_name(
                name, backends,
                self._deprecated_backend_names(),
                {}
            )
            name = resolved_name
        except LookupError:
            raise QiskitBackendNotFoundError(
                "The '{}' backend is not installed in your system.".format(name))

    return super().get_backend(name=name, **kwargs)
Even though self._backends is a dictionary, I find it curious that the lookup is not done directly on the dictionary.
Let's read resolve_backend_name.
resolve_backend_name
resolve_backend_name is defined in qiskit/providers/providerutils.py.
def resolve_backend_name(name, backends, deprecated, aliased):
"""Resolve backend name from a deprecated name or an alias.
A group will be resolved in order of member priorities, depending on
availability.
Args:
name (str): name of backend to resolve
backends (list[BaseBackend]): list of available backends.
deprecated (dict[str: str]): dict of deprecated names.
aliased (dict[str: list[str]]): dict of aliased names.
Returns:
str: resolved name (name of an available backend)
Raises:
LookupError: if name cannot be resolved through regular available
names, nor deprecated, nor alias names.
"""
available = [backend.name() for backend in backends]
resolved_name = deprecated.get(name, aliased.get(name, name))
if isinstance(resolved_name, list):
resolved_name = next((b for b in resolved_name if b in available), "")
if resolved_name not in available:
raise LookupError("backend '{}' not found.".format(name))
if name in deprecated:
logger.warning("Backend '%s' is deprecated. Use '%s'.", name,
resolved_name)
return resolved_name
First, available is built as a list of backend names.
Those who have used Qiskit for some time will recognize deprecated by looking at BasicAerProvider._deprecated_backend_names().
@staticmethod
def _deprecated_backend_names():
"""Returns deprecated backend names."""
return {
'qasm_simulator_py': 'qasm_simulator',
'statevector_simulator_py': 'statevector_simulator',
'unitary_simulator_py': 'unitary_simulator',
'local_qasm_simulator_py': 'qasm_simulator',
'local_statevector_simulator_py': 'statevector_simulator',
'local_unitary_simulator_py': 'unitary_simulator',
'local_unitary_simulator': 'unitary_simulator',
}
It is a dictionary of the form {'old backend name': 'current backend name'}.
If the requested backend name is in deprecated, it is converted to the current backend name.
As for aliased, I searched qiskit-terra and qiskit-aer but could not find any place where it is non-empty. If the requested backend name is in the aliased dictionary, the list of aliases is retrieved.
Since the list can contain several aliases, the first one that is available is chosen; if none of them is available, the result is an empty string.
Once the name has been converted, the function checks whether a backend with that name exists, returns the name if so, and raises an exception otherwise.
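To see the three cases side by side, here is a standalone sketch of the same resolution logic that works on plain name lists instead of backend objects. The function name resolve_name and the alias group "any_simulator" are made up for illustration; only the deprecated entry comes from the code quoted above.

```python
def resolve_name(name, available, deprecated, aliased):
    """Simplified stand-in for resolve_backend_name, using plain strings."""
    # A deprecated name wins, then an alias group, then the name itself.
    resolved = deprecated.get(name, aliased.get(name, name))
    if isinstance(resolved, list):
        # Alias group: pick the first member that is actually available.
        resolved = next((b for b in resolved if b in available), "")
    if resolved not in available:
        raise LookupError("backend '{}' not found.".format(name))
    return resolved


available = ["qasm_simulator", "statevector_simulator"]
deprecated = {"local_qasm_simulator_py": "qasm_simulator"}
# Hypothetical alias group; the first member is deliberately unavailable.
aliased = {"any_simulator": ["gpu_simulator", "statevector_simulator"]}

print(resolve_name("qasm_simulator", available, deprecated, aliased))
# qasm_simulator
print(resolve_name("local_qasm_simulator_py", available, deprecated, aliased))
# qasm_simulator
print(resolve_name("any_simulator", available, deprecated, aliased))
# statevector_simulator
```

An unknown name such as "no_such_backend" falls through all three lookups and raises LookupError.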
get_backend ()
After converting the name and obtaining an available one, the code ends with
return super().get_backend(name=name, **kwargs)
So let's read BaseProvider.get_backend(). The docstring is omitted.
def get_backend(self, name=None, **kwargs):
    backends = self.backends(name, **kwargs)
    if len(backends) > 1:
        raise QiskitBackendNotFoundError('More than one backend matches the criteria')
    if not backends:
        raise QiskitBackendNotFoundError('No backend matches the criteria')

    return backends[0]
It returns the sole element of the list returned by BasicAerProvider.backends(name), and raises an error if there are zero or several matches.
Let's read BasicAerProvider.backends.
def backends(self, name=None, filters=None, **kwargs):
# pylint: disable=arguments-differ
backends = self._backends.values()
# Special handling of the `name` parameter, to support alias resolution
# and deprecated names.
if name:
try:
resolved_name = resolve_backend_name(
name, backends,
self._deprecated_backend_names(),
{}
)
backends = [backend for backend in backends if
backend.name() == resolved_name]
except LookupError:
return []
return filter_backends(backends, filters=filters, **kwargs)
This also calls resolve_backend_name. Unless deprecated contains a cycle or something equally strange, the result is the same no matter how many times it is called. This time it extracts the backend itself rather than the name.
filter_backends is long for how little it does, so if you are interested, please refer to [qiskit/providers/providerutils.py](https://github.com/Qiskit/qiskit-terra/blob/master/qiskit/providers/providerutils.py).
Only the docstring and comments are quoted here.
def filter_backends(backends, filters=None, **kwargs):
"""(summary omitted)
Args:
backends (list[BaseBackend]): list of backends.
filters (callable): filtering conditions as a callable.
**kwargs: dict of criteria.
Returns:
list[BaseBackend]: a list of backend instances matching the
conditions.
"""
# Inspect the backends to decide which filters belong to
# backend.configuration and which ones to backend.status, as it does
# not involve querying the API.
# 1. Apply backend.configuration filtering.
# 2. Apply backend.status filtering (it involves one API call for
# each backend).
# 3. Apply acceptor filter.
kwargs gives conditions on backend.configuration and backend.status.
If both kinds are given together, filter_backends sorts them out itself.
In addition, if you pass a function as the filters argument, the result of filtering with Python's built-in filter function is returned.
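As a rough picture of that behavior, here is a simplified sketch that handles only configuration criteria plus the optional callable. It is not the real filter_backends: the status filtering is left out, and filter_toy_backends and the toy backends built from SimpleNamespace are assumptions made for this example.

```python
from types import SimpleNamespace


def filter_toy_backends(backends, filters=None, **kwargs):
    """Simplified stand-in: configuration criteria first, then the callable."""
    # Keep backends whose configuration matches every keyword criterion.
    matched = [
        backend for backend in backends
        if all(getattr(backend.configuration, key, None) == value
               for key, value in kwargs.items())
    ]
    # Apply the optional callable with the built-in filter(). When filters is
    # None, filter() keeps truthy items, so every backend object passes.
    return list(filter(filters, matched))


backends = [
    SimpleNamespace(name="qasm_simulator",
                    configuration=SimpleNamespace(simulator=True, n_qubits=24)),
    SimpleNamespace(name="toy_device",
                    configuration=SimpleNamespace(simulator=False, n_qubits=5)),
]

simulators = filter_toy_backends(backends, simulator=True)
print([b.name for b in simulators])  # ['qasm_simulator']

large = filter_toy_backends(
    backends, filters=lambda b: b.configuration.n_qubits > 10)
print([b.name for b in large])  # ['qasm_simulator']
```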
As an aside, you can pass None as the first argument to Python's built-in filter function. I did not know that. The documentation says:
Return an iterator yielding those items of iterable for which function(item) is true. If function is None, return the items that are true.
list(filter(None, [1, 2, 0, 3, "", "a"]))
# => [1, 2, 3, 'a']
Anyway, after all that, we obtained the one backend that matches the name. With this, we have finished reading the line
backend_sim = BasicAer.get_backend('qasm_simulator')
Next, in the line
result = execute(qc, backend_sim).result()
let's read execute. I quote it from qiskit/execute.py with the docstring cut.
def execute(experiments, backend,
basis_gates=None, coupling_map=None, # circuit transpile options
backend_properties=None, initial_layout=None,
seed_transpiler=None, optimization_level=None, pass_manager=None,
qobj_id=None, qobj_header=None, shots=1024, # common run options
memory=False, max_credits=10, seed_simulator=None,
default_qubit_los=None, default_meas_los=None, # schedule run options
schedule_los=None, meas_level=2, meas_return='avg',
memory_slots=None, memory_slot_size=100, rep_time=None, parameter_binds=None,
**run_config):
# transpiling the circuits using given transpile options
experiments = transpile(experiments,
basis_gates=basis_gates,
coupling_map=coupling_map,
backend_properties=backend_properties,
initial_layout=initial_layout,
seed_transpiler=seed_transpiler,
optimization_level=optimization_level,
backend=backend,
pass_manager=pass_manager,
)
# assembling the circuits into a qobj to be run on the backend
qobj = assemble(experiments,
qobj_id=qobj_id,
qobj_header=qobj_header,
shots=shots,
memory=memory,
max_credits=max_credits,
seed_simulator=seed_simulator,
default_qubit_los=default_qubit_los,
default_meas_los=default_meas_los,
schedule_los=schedule_los,
meas_level=meas_level,
meas_return=meas_return,
memory_slots=memory_slots,
memory_slot_size=memory_slot_size,
rep_time=rep_time,
parameter_binds=parameter_binds,
backend=backend,
**run_config
)
# executing the circuits on the backend and returning the job
return backend.run(qobj, **run_config)
transpile
This time I will not go into the details of transpile; I only look at the surface.
It is in qiskit/compiler/transpile.py. The docstring is so long that I omit it, but it is interesting and I recommend reading it.
def transpile(circuits,
backend=None,
basis_gates=None, coupling_map=None, backend_properties=None,
initial_layout=None, seed_transpiler=None,
optimization_level=None,
pass_manager=None, callback=None, output_name=None):
# transpiling schedules is not supported yet.
if isinstance(circuits, Schedule) or \
(isinstance(circuits, list) and all(isinstance(c, Schedule) for c in circuits)):
return circuits
if optimization_level is None:
config = user_config.get_config()
optimization_level = config.get('transpile_optimization_level', None)
# Get TranspileConfig(s) to configure the circuit transpilation job(s)
circuits = circuits if isinstance(circuits, list) else [circuits]
transpile_configs = _parse_transpile_args(circuits, backend, basis_gates, coupling_map,
backend_properties, initial_layout,
seed_transpiler, optimization_level,
pass_manager, callback, output_name)
# Check circuit width against number of qubits in coupling_map(s)
coupling_maps_list = list(config.coupling_map for config in transpile_configs)
for circuit, parsed_coupling_map in zip(circuits, coupling_maps_list):
# If coupling_map is not None
if isinstance(parsed_coupling_map, CouplingMap):
n_qubits = len(circuit.qubits)
max_qubits = parsed_coupling_map.size()
if n_qubits > max_qubits:
raise TranspilerError('Number of qubits ({}) '.format(n_qubits) +
'in {} '.format(circuit.name) +
'is greater than maximum ({}) '.format(max_qubits) +
'in the coupling_map')
# Transpile circuits in parallel
circuits = parallel_map(_transpile_circuit, list(zip(circuits, transpile_configs)))
if len(circuits) == 1:
return circuits[0]
return circuits
On real hardware, some pairs of qubits are not connected and cannot have a CNOT applied between them, so transpile appears to rewrite the circuit while assigning qubits accordingly. Because this computation is itself heavy, it runs in parallel. Even if you omit arguments, the settings are fetched from the backend as needed, which is why the backend carries so many settings.
It converts the circuit and returns the converted circuit.
Let's look at assemble in qiskit/compiler/assemble.py in the same way. (Docstring omitted.)
def assemble(experiments,
backend=None,
qobj_id=None, qobj_header=None,
shots=1024, memory=False, max_credits=None, seed_simulator=None,
qubit_lo_freq=None, meas_lo_freq=None,
qubit_lo_range=None, meas_lo_range=None,
schedule_los=None, meas_level=2, meas_return='avg', meas_map=None,
memory_slot_size=100, rep_time=None, parameter_binds=None,
**run_config):
experiments = experiments if isinstance(experiments, list) else [experiments]
qobj_id, qobj_header, run_config_common_dict = _parse_common_args(backend, qobj_id, qobj_header,
shots, memory, max_credits,
seed_simulator, **run_config)
# assemble either circuits or schedules
if all(isinstance(exp, QuantumCircuit) for exp in experiments):
run_config = _parse_circuit_args(parameter_binds, **run_config_common_dict)
# If circuits are parameterized, bind parameters and remove from run_config
bound_experiments, run_config = _expand_parameters(circuits=experiments,
run_config=run_config)
return assemble_circuits(circuits=bound_experiments, qobj_id=qobj_id,
qobj_header=qobj_header, run_config=run_config)
elif all(isinstance(exp, ScheduleComponent) for exp in experiments):
run_config = _parse_pulse_args(backend, qubit_lo_freq, meas_lo_freq,
qubit_lo_range, meas_lo_range,
schedule_los, meas_level, meas_return,
meas_map, memory_slot_size, rep_time,
**run_config_common_dict)
return assemble_schedules(schedules=experiments, qobj_id=qobj_id,
qobj_header=qobj_header, run_config=run_config)
else:
raise QiskitError("bad input to assemble() function; "
"must be either circuits or schedules")
It can assemble both circuits and pulse schedules.
The return type is Qobj; for circuits, a QasmQobj is returned.
We will be in trouble later if we do not know what kind of data structure QasmQobj is, so I ran the following:
from qiskit import *
qc = QuantumCircuit(2, 2)
qc.h(0)
qc.cx(0, 1)
qc.measure([0,1], [0,1])
backend_sim = BasicAer.get_backend('qasm_simulator')
tran = transpile(qc, backend=backend_sim)
asm = assemble(tran, backend=backend_sim)
Inspecting asm gave the following.
QasmQobj(
config=QasmQobjConfig(
memory=False,
memory_slots=2,
n_qubits=2,
parameter_binds=[],
shots=1024),
experiments=[
QasmQobjExperiment(
config=QasmQobjExperimentConfig(memory_slots=2, n_qubits=2),
header=QobjExperimentHeader(
clbit_labels=[['c', 0], ['c', 1]],
creg_sizes=[['c', 2]],
memory_slots=2,
n_qubits=2,
name='circuit0',
qreg_sizes=[['q', 2]],
qubit_labels=[['q', 0], ['q', 1]]),
instructions=[
QasmQobjInstruction(
name='u2', params=[0, 3.14159265358979], qubits=[0]),
QasmQobjInstruction(name='cx', qubits=[0, 1]),
QasmQobjInstruction(memory=[0], name='measure', qubits=[0]),
QasmQobjInstruction(memory=[1], name='measure', qubits=[1])
])
],
header=QobjHeader(backend_name='qasm_simulator', backend_version='2.0.0'),
qobj_id='06682d2e-bfd8-4dba-ba8e-4e46492c1609',
schema_version='1.1.0',
type='QASM')
I think it reads without any trouble. Note that the H gate has been converted to u2(0, π), which is an equivalent expression.
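The equivalence can be checked numerically. The sketch below assumes the usual OpenQASM definition u2(φ, λ) = U(π/2, φ, λ), whose matrix is (1/√2)·[[1, −e^{iλ}], [e^{iφ}, e^{i(φ+λ)}]]; the helper name u2_matrix is made up for this example.

```python
import cmath
import math


def u2_matrix(phi, lam):
    """2x2 matrix of u2(phi, lam), assuming the standard OpenQASM convention."""
    s = 1 / math.sqrt(2)
    return [
        [s, -s * cmath.exp(1j * lam)],
        [s * cmath.exp(1j * phi), s * cmath.exp(1j * (phi + lam))],
    ]


s = 1 / math.sqrt(2)
hadamard = [[s, s], [s, -s]]

u2 = u2_matrix(0, math.pi)
same = all(
    abs(u2[row][col] - hadamard[row][col]) < 1e-12
    for row in range(2)
    for col in range(2)
)
print(same)  # True
```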
backend.run
Now that the circuit has been converted to a QasmQobj, next let's read
return backend.run(qobj, **run_config)
in qiskit/providers/basicaer/qasm_simulator.py.
def run(self, qobj, backend_options=None):
"""Run qobj asynchronously.
Args:
qobj (Qobj): payload of the experiment
backend_options (dict): backend options
Returns:
BasicAerJob: derived from BaseJob
Additional Information:
backend_options: Is a dict of options for the backend. It may contain
* "initial_statevector": vector_like
The "initial_statevector" option specifies a custom initial
initial statevector for the simulator to be used instead of the all
zero state. This size of this vector must be correct for the number
of qubits in all experiments in the qobj.
Example::
backend_options = {
"initial_statevector": np.array([1, 0, 0, 1j]) / np.sqrt(2),
}
"""
self._set_options(qobj_config=qobj.config,
backend_options=backend_options)
job_id = str(uuid.uuid4())
job = BasicAerJob(self, job_id, self._run_job, qobj)
job.submit()
return job
It is nice that an initial state vector can be passed as an option. This looks entirely backend-dependent.
The chop_threshold option seems to treat amplitudes at or below this value as 0 when simulating the state vector.
QasmSimulatorPy._set_options
Let's continue with _set_options.
def _set_options(self, qobj_config=None, backend_options=None):
"""Set the backend options for all experiments in a qobj"""
# Reset default options
self._initial_statevector = self.DEFAULT_OPTIONS["initial_statevector"]
self._chop_threshold = self.DEFAULT_OPTIONS["chop_threshold"]
if backend_options is None:
backend_options = {}
# Check for custom initial statevector in backend_options first,
# then config second
if 'initial_statevector' in backend_options:
self._initial_statevector = np.array(backend_options['initial_statevector'],
dtype=complex)
elif hasattr(qobj_config, 'initial_statevector'):
self._initial_statevector = np.array(qobj_config.initial_statevector,
dtype=complex)
if self._initial_statevector is not None:
# Check the initial statevector is normalized
norm = np.linalg.norm(self._initial_statevector)
if round(norm, 12) != 1:
raise BasicAerError('initial statevector is not normalized: ' +
'norm {} != 1'.format(norm))
# Check for custom chop threshold
# Replace with custom options
if 'chop_threshold' in backend_options:
self._chop_threshold = backend_options['chop_threshold']
elif hasattr(qobj_config, 'chop_threshold'):
self._chop_threshold = qobj_config.chop_threshold
Because the backend itself holds the options, a reset like
# Reset default options
self._initial_statevector = self.DEFAULT_OPTIONS["initial_statevector"]
self._chop_threshold = self.DEFAULT_OPTIONS["chop_threshold"]
is needed here. In any case, it just interprets the two options and sets them on the backend.
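The normalization check inside _set_options is easy to try in isolation. The following is a pure-Python sketch of the same test (norm rounded to 12 decimal places must equal 1); check_normalized is a hypothetical helper, and it raises ValueError rather than BasicAerError so that it runs without Qiskit.

```python
import math


def check_normalized(statevector):
    """Raise ValueError unless the vector has norm 1 (to 12 decimal places)."""
    norm = math.sqrt(sum(abs(amplitude) ** 2 for amplitude in statevector))
    if round(norm, 12) != 1:
        raise ValueError(
            "initial statevector is not normalized: norm {} != 1".format(norm))
    return norm


# The example from the run() docstring: (|00> + i|11>) / sqrt(2).
good = [x / math.sqrt(2) for x in (1, 0, 0, 1j)]
check_normalized(good)  # passes silently

try:
    check_normalized([1, 1])  # norm is sqrt(2), so this is rejected
    rejected = False
except ValueError:
    rejected = True
print(rejected)  # True
```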
BasicAerJob.__init__
job_id = str(uuid.uuid4())
job = BasicAerJob(self, job_id, self._run_job, qobj)
It generates a random UUID as job_id.
A BasicAerJob is created as the job. self._run_job is a method, and it is passed as is. We will look at it when it is called later.
qiskit/providers/basicaer/basicaerjob.py
class BasicAerJob(BaseJob):
    """BasicAerJob class.

    Attributes:
        _executor (futures.Executor): executor to handle asynchronous jobs
    """

    if sys.platform in ['darwin', 'win32']:
        _executor = futures.ThreadPoolExecutor()
    else:
        _executor = futures.ProcessPoolExecutor()

    def __init__(self, backend, job_id, fn, qobj):
        super().__init__(backend, job_id)
        self._fn = fn
        self._qobj = qobj
        self._future = None
This only stores the data; nothing has run at this point. I am a little concerned that it uses a thread pool on Windows and macOS and a process pool elsewhere. In CPython, because of the global interpreter lock, only one thread executes Python code at a time even on a machine with multiple CPUs. Let's pay attention to how this is used.
BasicAerJob.submit
job.submit()
return job
It submits the job and then returns it. Let's read BasicAerJob.submit.
def submit(self):
    """Submit the job to the backend for execution.

    Raises:
        QobjValidationError: if the JSON serialization of the Qobj passed
        during construction does not validate against the Qobj schema.
        JobError: if trying to re-submit the job.
    """
    if self._future is not None:
        raise JobError("We have already submitted the job!")

    validate_qobj_against_schema(self._qobj)
    self._future = self._executor.submit(self._fn, self._job_id, self._qobj)
validate_qobj_against_schema(self._qobj)
only verifies, using the external library jsonschema, that the Qobj matches the JSON Schema defined in another file. It is not very interesting, so I omit it.
_executor was a ThreadPoolExecutor or ProcessPoolExecutor from the Python standard library concurrent.futures. Its submit method is called here.
Let's read the official documentation.
submit(fn, *args, **kwargs)
Schedules the callable, fn, to be executed as fn(*args, **kwargs) and returns a Future object representing the execution of the callable.
In other words, it creates a Future object that schedules self._fn(self._job_id, self._qobj) to run.
This Future object is for asynchronous execution: while _executor runs the work in the background, the calling code proceeds even though the calculation is not finished.
(If you order a hamburger at a burger shop, you could wait in front of the counter until it is ready, but if you are given a number tag you can take a seat or do something else and spend the time meaningfully. With a normal function call you have to wait for the result to come back, but with asynchronous execution you can do something else in the meantime. The Future object is like the number tag that lets you pick up the hamburger in the future.)
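Here is a small standalone example of the same submit-then-wait pattern using only the standard library. The function slow_job and its arguments are made up for illustration; in BasicAerJob the submitted callable is the backend's _run_job.

```python
import time
from concurrent import futures


def slow_job(job_id, payload):
    """Stand-in for a simulation that takes a while to finish."""
    time.sleep(0.1)
    return {"job_id": job_id, "result": payload * 2}


with futures.ThreadPoolExecutor(max_workers=1) as executor:
    # submit() returns a Future right away; the work runs in the background.
    future = executor.submit(slow_job, "job-1", 21)
    got_future = isinstance(future, futures.Future)

    # ... other work could happen here while the job is running ...

    # result() blocks until the job has finished, then returns its value.
    result = future.result()

print(got_future)  # True
print(result)      # {'job_id': 'job-1', 'result': 42}
```

Calling result() is the moment you walk back to the counter with your number tag: it returns immediately if the job is done and waits otherwise.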
QasmSimulatorPy._run_job
Recall what self._fn(self._job_id, self._qobj), run by _executor, actually was: since the job was created with job = BasicAerJob(self, job_id, self._run_job, qobj), it is self._run_job(job_id, qobj).
So let's read QasmSimulatorPy._run_job.
def _run_job(self, job_id, qobj):
"""Run experiments in qobj
Args:
job_id (str): unique id for the job.
qobj (Qobj): job description
Returns:
Result: Result object
"""
self._validate(qobj)
result_list = []
self._shots = qobj.config.shots
self._memory = getattr(qobj.config, 'memory', False)
self._qobj_config = qobj.config
start = time.time()
for experiment in qobj.experiments:
result_list.append(self.run_experiment(experiment))
end = time.time()
result = {'backend_name': self.name(),
'backend_version': self._configuration.backend_version,
'qobj_id': qobj.qobj_id,
'job_id': job_id,
'results': result_list,
'status': 'COMPLETED',
'success': True,
'time_taken': (end - start),
'header': qobj.header.to_dict()}
return Result.from_dict(result)
- Call self._validate (read later)
- Set the necessary information on the backend itself
- Start the timer
- Take each experiment from qobj.experiments in turn
- Append the result of run_experiment (read later) for each experiment to result_list
- Stop the timer
- Pack the results into a dictionary, convert it to the Result type (read later), and return it
QasmSimulatorPy._validate
def _validate(self, qobj):
"""Semantic validations of the qobj which cannot be done via schemas."""
n_qubits = qobj.config.n_qubits
max_qubits = self.configuration().n_qubits
if n_qubits > max_qubits:
raise BasicAerError('Number of qubits {} '.format(n_qubits) +
'is greater than maximum ({}) '.format(max_qubits) +
'for "{}".'.format(self.name()))
for experiment in qobj.experiments:
name = experiment.header.name
if experiment.config.memory_slots == 0:
logger.warning('No classical registers in circuit "%s", '
'counts will be empty.', name)
elif 'measure' not in [op.name for op in experiment.instructions]:
logger.warning('No measurements in circuit "%s", '
'classical register will remain all zeros.', name)
It checks that the number of qubits does not exceed the maximum, that there is a classical register, and that a measurement is performed. (A missing classical register or a missing measurement is a warning, not an error.)
QasmSimulatorPy.run_experiment
Before that, let's recall what experiments contained.
experiments=[
QasmQobjExperiment(
config=QasmQobjExperimentConfig(memory_slots=2, n_qubits=2),
header=QobjExperimentHeader(
clbit_labels=[['c', 0], ['c', 1]],
creg_sizes=[['c', 2]],
memory_slots=2,
n_qubits=2,
name='circuit0',
qreg_sizes=[['q', 2]],
qubit_labels=[['q', 0], ['q', 1]]),
instructions=[
QasmQobjInstruction(
name='u2', params=[0, 3.14159265358979], qubits=[0]),
QasmQobjInstruction(name='cx', qubits=[0, 1]),
QasmQobjInstruction(memory=[0], name='measure', qubits=[0]),
QasmQobjInstruction(memory=[1], name='measure', qubits=[1])
])
],
A QasmQobjExperiment corresponds to one quantum circuit. With Qiskit you can pack several quantum circuits together and execute them at once, but this time there is only one.
Because of the loop for experiment in qobj.experiments:, run_experiment is called for each circuit. Each circuit has instructions, which correspond to gates and measurements.
run_experiment is really long, so I omit it. Roughly speaking, it applies the gates and performs the measurements, repeated for the number of shots. It ends with
return {'name': experiment.header.name,
'seed_simulator': seed_simulator,
'shots': self._shots,
'data': data,
'status': 'DONE',
'success': True,
'time_taken': (end - start),
'header': experiment.header.to_dict()}
That is the shape of the result it returns.
The data field is built like this:
# Add data
data = {'counts': dict(Counter(memory))}
# Optionally add memory list
if self._memory:
    data['memory'] = memory
# Optionally add final statevector
if self.SHOW_FINAL_STATE:
    data['statevector'] = self._get_statevector()
    # Remove empty counts and memory for statevector simulator
    if not data['counts']:
        data.pop('counts')
    if 'memory' in data and not data['memory']:
        data.pop('memory')
As shown, it holds the measurement data.
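To make the counts dictionary concrete, here is a small sketch with a made-up memory list. It assumes each shot is recorded as a hexadecimal string, which matches the 0x0 and 0x3 keys that appear in the Result further down; the five shots themselves are invented.

```python
from collections import Counter

# Hypothetical per-shot outcomes for a 2-bit classical register.
memory = ["0x0", "0x3", "0x3", "0x0", "0x3"]
keep_memory = False  # plays the role of self._memory

# Tally identical outcomes, exactly as dict(Counter(memory)) does above.
data = {"counts": dict(Counter(memory))}
if keep_memory:
    data["memory"] = memory
print(data)  # {'counts': {'0x0': 2, '0x3': 3}}

# The hex keys can be turned into bit strings for a 2-bit register.
bitstrings = {format(int(key, 16), "02b"): value
              for key, value in data["counts"].items()}
print(bitstrings)  # {'00': 2, '11': 3}
```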
qiskit.result.Result
The dictionary is converted to the Result type by return Result.from_dict(result).
qiskit/result/result.py
@bind_schema(ResultSchema)
class Result(BaseModel):
"""Model for Results.
Please note that this class only describes the required fields. For the
full description of the model, please check ``ResultSchema``.
Attributes:
backend_name (str): backend name.
backend_version (str): backend version, in the form X.Y.Z.
qobj_id (str): user-generated Qobj id.
job_id (str): unique execution id from the backend.
success (bool): True if complete input qobj executed correctly. (Implies
each experiment success)
results (list[ExperimentResult]): corresponding results for array of
experiments of the input qobj
"""
def __init__(self, backend_name, backend_version, qobj_id, job_id, success,
results, **kwargs):
self.backend_name = backend_name
self.backend_version = backend_version
self.qobj_id = qobj_id
self.job_id = job_id
self.success = success
self.results = results
Normally I would want to read from_dict in detail, but one characteristic of qiskit-terra is that it puts a lot of effort into JSON-schema-style processing. from_dict is defined in the parent class BaseModel, and its body is
@classmethod
def from_dict(cls, dict_):
    """Deserialize a dict of simple types into an instance of this class.
    Note that this method requires that the model is bound with
    ``@bind_schema``.
    """
    try:
        data = cls.schema.load(dict_)
    except ValidationError as ex:
        raise ModelValidationError(
            ex.messages, ex.field_name, ex.data, ex.valid_data, **ex.kwargs) from None
    return data
It's very simple. Separately from the Result class, a ResultSchema class is defined as
class ResultSchema(BaseSchema):
    """Schema for Result."""
    # Required fields.
    backend_name = String(required=True)
    backend_version = String(required=True,
                             validate=Regexp('[0-9]+.[0-9]+.[0-9]+$'))
    qobj_id = String(required=True)
    job_id = String(required=True)
    success = Boolean(required=True)
    results = Nested(ExperimentResultSchema, required=True, many=True)
    # Optional fields.
    date = DateTime()
    status = String()
    header = Nested(ObjSchema)
and so on, which is quite thorough. I haven't read it closely, but I think that ExperimentResultSchema and the like (defined in /qiskit/result/models.py) are recursively turned into objects while each field is checked against these data types.
In any case, it is just repacking data, so let me skip it this time.
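To make the "validate, then build an object" idea concrete, here is a minimal standard-library sketch. It is not how Qiskit implements it (Qiskit delegates to cls.schema.load); MiniResult, MiniValidationError, and the field table are hypothetical names introduced only for this example.

```python
import re

# Hypothetical, simplified stand-in for a schema: field name -> (type, required).
MINI_RESULT_FIELDS = {
    'backend_name': (str, True),
    'backend_version': (str, True),
    'success': (bool, True),
    'status': (str, False),
}
# Same pattern string as the Regexp validator shown in ResultSchema.
VERSION_PATTERN = re.compile('[0-9]+.[0-9]+.[0-9]+$')


class MiniValidationError(Exception):
    """Raised when the input dict does not match the field table."""


class MiniResult:
    """Hypothetical model class; attributes are filled from the dict."""

    def __init__(self, **fields):
        self.__dict__.update(fields)

    @classmethod
    def from_dict(cls, dict_):
        """Validate a plain dict, then build an instance from it."""
        for name, (type_, required) in MINI_RESULT_FIELDS.items():
            if name not in dict_:
                if required:
                    raise MiniValidationError('missing field: ' + name)
                continue
            if not isinstance(dict_[name], type_):
                raise MiniValidationError('wrong type for field: ' + name)
        if not VERSION_PATTERN.match(dict_['backend_version']):
            raise MiniValidationError('bad backend_version')
        return cls(**dict_)


result = MiniResult.from_dict(
    {'backend_name': 'qasm_simulator', 'backend_version': '2.0.0', 'success': True})
print(result.backend_name, result.success)
```

A real schema library also handles nested schemas and lists (such as results above), which this sketch leaves out.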
The Result returned when I ran the code locally was as follows.
Result(
    backend_name='qasm_simulator',
    backend_version='2.0.0',
    header=Obj(backend_name='qasm_simulator', backend_version='2.0.0'),
    job_id='65b162ae-fe6b-480a-85ba-8c890d8bbf3b',
    qobj_id='c75e9b2c-da7c-4dd6-96da-d132126e6dc0',
    results=[
        ExperimentResult(
            data=ExperimentResultData(counts=Obj(0x0=493, 0x3=531)),
            header=Obj(
                clbit_labels=[['c', 0], ['c', 1]],
                creg_sizes=[['c', 2]],
                memory_slots=2,
                n_qubits=2,
                name='circuit0',
                qreg_sizes=[['q', 2]],
                qubit_labels=[['q', 0], ['q', 1]]),
            meas_level=2,
            name='circuit0',
            seed_simulator=1480888590,
            shots=1024,
            status='DONE',
            success=True,
            time_taken=0.12355875968933105)
    ],
    status='COMPLETED',
    success=True,
    time_taken=0.12369537353515625)
BasicAerJob.result()
I skipped some steps along the way, but in
result = execute(qc, backend_sim).result()
execute(qc, backend_sim) is now finished, so next we look at .result(). execute returned a BasicAerJob.
@requires_submit
def result(self, timeout=None):
    # pylint: disable=arguments-differ
    """Get job result. The behavior is the same as the underlying
    concurrent Future objects,
    https://docs.python.org/3/library/concurrent.futures.html#future-objects
    Args:
        timeout (float): number of seconds to wait for results.
    Returns:
        qiskit.Result: Result object
    Raises:
        concurrent.futures.TimeoutError: if timeout occurred.
        concurrent.futures.CancelledError: if job cancelled before completed.
    """
    return self._future.result(timeout=timeout)
The @requires_submit at the top checks whether submit has been called.
self._future is set to None when the BasicAerJob is created, and submit stores a Future object in self._future. So you can tell whether the job has been submitted by checking whether self._future is None.
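A decorator of this kind can be sketched roughly as follows. This is only an illustration of the check described above, not Qiskit's actual code: ToyJob and NotSubmittedError are hypothetical names, and the real decorator raises Qiskit's own job error.

```python
import functools
from concurrent.futures import ThreadPoolExecutor


class NotSubmittedError(Exception):
    """Hypothetical error type used only in this sketch."""


def requires_submit(func):
    """Let the wrapped method run only after submit() has stored a Future."""
    @functools.wraps(func)
    def _wrapper(self, *args, **kwargs):
        if self._future is None:
            raise NotSubmittedError('Job not submitted yet')
        return func(self, *args, **kwargs)
    return _wrapper


class ToyJob:
    """Hypothetical stand-in for a job class; not Qiskit's BasicAerJob."""
    _executor = ThreadPoolExecutor(max_workers=1)

    def __init__(self, fn):
        self._fn = fn
        self._future = None  # no Future until submit() is called

    def submit(self):
        self._future = self._executor.submit(self._fn)

    @requires_submit
    def result(self, timeout=None):
        return self._future.result(timeout=timeout)


job = ToyJob(lambda: 'done')
try:
    job.result()
except NotSubmittedError as ex:
    print('before submit:', ex)
job.submit()
print('after submit:', job.result())
```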
The method body simply calls the result method of the Future object.
See the official documentation (https://docs.python.org/ja/3/library/concurrent.futures.html#concurrent.futures.Future).
result(timeout=None)
Returns the value returned by the call. If the call has not yet completed, this method waits up to timeout seconds. If the call does not complete within timeout seconds, concurrent.futures.TimeoutError is raised. timeout can be an int or a float. If timeout is not specified or is None, there is no limit on the wait time. CancelledError is raised if the future is cancelled before it completes. If the call raised an exception, this method raises the same exception.
It is a method that returns the result if it is finished, waits if it is not finished, and then returns the result.
The result is an object of type Result
as we saw above.
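The waiting behavior can be tried out with the standard library alone. The following is a small sketch; slow_task and the sleep and timeout values are arbitrary choices for the demonstration.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError


def slow_task():
    """Pretend to be a simulation that takes a while."""
    time.sleep(0.5)
    return 'finished'


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_task)
    try:
        future.result(timeout=0.05)  # too short: the task is still running
        timed_out = False
    except FutureTimeoutError:
        timed_out = True
    value = future.result()          # no timeout: block until the task is done

print(timed_out, value)
```

A timeout does not cancel the task; calling result again later still returns the value once the task finishes.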
Result.get_counts
Finally, let's look at the last line.
print(result.get_counts(qc))
def get_counts(self, experiment=None):
    """Get the histogram data of an experiment.
    Args:
        experiment (str or QuantumCircuit or Schedule or int or None): the index of the
            experiment, as specified by ``get_data()``.
    Returns:
        dict[str:int]: a dictionary with the counts for each qubit, with
            the keys containing a string in binary format and separated
            according to the registers in circuit (e.g. ``0100 1110``).
            The string is little-endian (cr[0] on the right hand side).
    Raises:
        QiskitError: if there are no counts for the experiment.
    """
    exp = self._get_experiment(experiment)
    try:
        header = exp.header.to_dict()
    except (AttributeError, QiskitError):  # header is not available
        header = None
    if 'counts' in self.data(experiment).keys():
        return postprocess.format_counts(self.data(experiment)['counts'],
                                         header)
    elif 'statevector' in self.data(experiment).keys():
        vec = postprocess.format_statevector(self.data(experiment)['statevector'])
        return state_to_counts(vec)
    else:
        raise QiskitError('No counts for experiment "{0}"'.format(experiment))
In _get_experiment, the experiment corresponding to the circuit name is taken out. If there is only one experiment, as in this case, you can omit the experiment argument.
If the header can be extracted from the experiment, the length of the classical register is known, so the measurement results can be zero-padded properly.
format_counts does the work of converting the keys to strings and packing them into a dictionary.
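The zero-padding step can be sketched as below. This is a simplified illustration and not Qiskit's format_counts: format_hex_counts is a hypothetical helper, and it does not insert the spaces that separate multiple classical registers.

```python
def format_hex_counts(counts, memory_slots):
    """Hypothetical helper: hex keys -> zero-padded binary-string keys."""
    formatted = {}
    for hex_key, count in counts.items():
        # int(..., 16) accepts the '0x' prefix; zfill pads to the register length.
        bits = format(int(hex_key, 16), 'b').zfill(memory_slots)
        formatted[bits] = count
    return formatted


# Counts and memory_slots taken from the Result shown earlier in this article.
raw_counts = {'0x0': 493, '0x3': 531}
print(format_hex_counts(raw_counts, memory_slots=2))  # {'00': 493, '11': 531}
```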
This time, I read through the flow of Qiskit's processing, leaving out the parts directly related to quantum computation. Because of repeated changes and a design that anticipates future extension, some parts were quite hard to follow, but the library is built to be able to do quite a lot. My impression is that the parts Blueqat does not deal with, such as transpilation and conversion to JSON, are quite heavy, and that reading them was a great learning experience.
This completes my goal of reading through the whole flow, but I will continue to read the Qiskit source code.