import multiprocessing
import sys
from squanch import errors
from squanch.qubit import Qubit
__all__ = ["QChannel", "CChannel", "FiberOpticQChannel"]
[docs]class QChannel:
'''
Base class for a quantum channel connecting two agents
'''
[docs] def __init__(self, from_agent, to_agent, length = 0.0, errors = ()):
'''
Instantiate the quantum channel
:param Agent from_agent: sending agent
:param Agent to_agent: receiving agent
:param float length: length of quantum channel in km; default: 0.0km
:param QError[] errors: list of error models to apply to qubits in this channel; default: [] (no errors)
'''
# Register agent connections
self.from_agent = from_agent
self.to_agent = to_agent
# Signal propagation properties
self.length = length # Physical length of the channel in km
self.signal_speed = 2.998 * 10 ** 5 # Speed of light in km/s
# A queue representing the qubits in transit along the channel
self.queue = multiprocessing.Queue()
# Register error models
self.errors = errors
[docs] def put(self, qubit):
'''
Serialize and push qubit into the channel queue
:param Qubit qubit: the qubit to send
'''
# Calculate the time of arrival
time_of_arrival = self.from_agent.time + self.from_agent.pulse_length + (self.length / self.signal_speed)
if qubit is not None:
self.queue.put((qubit.serialize(), time_of_arrival))
else:
self.queue.put((None, time_of_arrival))
[docs] def get(self):
'''
Retrieve a qubit by reference from the channel queue, applying errors upon retrieval
:return: tuple: (the qubit with errors applied (possibly ``None``), receival time)
'''
indices, receive_time = self.queue.get()
if indices is not None:
system_index, qubit_index = indices
qubit = Qubit.from_stream(self.to_agent.qstream, system_index, qubit_index)
else:
qubit = None
# Apply errors
for error in self.errors:
qubit = error.apply(qubit)
# Return modified qubit and return time
return qubit, receive_time
[docs]class CChannel:
'''
Base class for a classical channel connecting two agents
'''
[docs] def __init__(self, from_agent, to_agent, length = 0.0):
'''
Instantiate the quantum channel
:param Agent from_agent: sending agent
:param Agent to_agent: receiving agent
:param float length: length of fiber optic line in km; default: 0.0km
'''
# Register agent connections
self.from_agent = from_agent
self.to_agent = to_agent
# Signal propagation
self.length = length # Physical length of the channel in km
self.signal_speed = 2.998 * 10 ** 5 # Speed of light in km/s
# The channel queue
self.queue = multiprocessing.Queue()
[docs] def put(self, thing):
'''
Serialize and push a serializable object into the channel queue
:param any thing: the qubit to send
'''
# Calculate the time of arrival
pulse_time = sys.getsizeof(thing) * 8 * self.from_agent.pulse_length
time_of_arrival = self.from_agent.time + pulse_time + (self.length / self.signal_speed)
self.queue.put((thing, time_of_arrival))
[docs] def get(self):
'''
Retrieve a classical object form the queue
:return: tuple: (the object, receival time)
'''
thing, receive_time = self.queue.get()
return thing, receive_time
[docs]class FiberOpticQChannel(QChannel):
'''
Represents a fiber optic line with attenuation errors
'''
[docs] def __init__(self, from_agent, to_agent, length = 0.0):
'''
Instantiate the simulated fiber optic quantum channel
:param Agent from_agent: sending agent
:param Agent to_agent: receiving agent
:param float length: length of fiber optic channel in km; default: 0.0km
'''
QChannel.__init__(self, from_agent, to_agent, length = length)
# Register attenuation errors
self.errors = [
errors.AttenuationError(self),
# errors.RandomUnitaryError(self, 2*np.pi / 100),
# errors.SystematicUnitaryError(self, 2*np.pi / 10),
]