import queue
import multiprocessing
import itertools
import sys
__all__ = ["QConnect", "CConnect"]
pulse_length_default = 10 * 10 ** -12 # 10 ps photon pulse length
signal_speed = 2.998 * 10 ** 5 #speed of light in km/s
fiber_length_default = 0.0
[docs]class QConnect:
[docs] def __init__(self, *args, transit_devices=[]):
'''
This is the base class for a quantum connection between multiple agents.
:param agents \*args: list of agents to connect
:param List<Devices> transit_devices: list of devices qubits travel through
'''
agents = list(args)
self.agents = {}
self.source_devices = {}
self.target_devices = {}
self.transit_devices = {}
'''
Create queue to keep track of multiple requests. Name of queue is name of
target agent.
'''
self.queues = {}
for agent in agents:
self.agents.update({agent.name: agent})
self.source_devices.update({agent.name: agent.source_devices})
self.target_devices.update({agent.name: agent.target_devices})
self.transit_devices.update({agent.name: transit_devices})
self.queues.update({agent.name: queue.Queue()})
for agentConnect in agents:
if agentConnect != agent:
agent.qconnections[agentConnect.name] = self
[docs] def put(self, source, target, qubits, source_time):
'''
Constructs full list of devices that each qubit must travel through. Sends the qubits
through source devices. Places qubits and a list of transit and target
devices on the queue. Queue is keyed on the target agent's name.
:param String source: name of agent where the qubits being sent originated
:param String target: name of agent receiving qubits
:param Array qubits: array of numbers corresponding to qubits the source is sending
:param Float source_time: time of source agent before sending qubits
:returns: time qubits took to pass through source devices
'''
source_devices = self.source_devices[source]
transit_devices = self.transit_devices[source]
target_devices = self.target_devices[target]
non_source_devices = {
"transit": transit_devices,
"target": target_devices,
}
program = self.agents[source].program
source_delay = 0
# Keep track of qubits remaining
traveling_qubits = qubits
if not source_devices:
source_delay += pulse_length_default
else:
# Keep track of qubits lost by each device
total_lost_qubits = []
for device in source_devices:
# If qubits are still remaining
if traveling_qubits:
res = device.apply(program, traveling_qubits)
if 'lost_qubits' in res.keys():
lost_qubits = res['lost_qubits']
# Remove lost qubits from traveling qubits
traveling_qubits = list(set(traveling_qubits) - set(lost_qubits))
# Add lost_qubits lost from current device to total_lost_qubits
total_lost_qubits += lost_qubits
if 'delay' in res.keys(): source_delay += res['delay']
else: break
# Invert lost qubits and add to traveling qubits
for q in total_lost_qubits:
if q == 0: total_lost_qubits.append(float("-inf"))
else: total_lost_qubits.append(-q)
traveling_qubits += total_lost_qubits
# Scale source delay time according to number of qubits sent
scaled_source_delay = source_delay*len(qubits)
self.queues[target].put((traveling_qubits, non_source_devices, scaled_source_delay, source_time))
return scaled_source_delay
[docs] def get(self, agent):
'''
Pops qubits off of the agent's queue. Sends qubit through transit and target devices,
simulating a quantum network. Return an array of the qubits that have been altered, as well as
the time it took the qubit to travel through the network. Some qubits may be lost during transmission. If lost,
their value will switch to negative, or, in the case of 0, be set to -inf
:param Agent agent: agent receiving the qubits
:returns: list of qubits, time to pass through transit and target devices, and the source agent's time
'''
traveling_qubits, devices, source_delay, source_time = self.queues[agent.name].get()
agent.qubits = list(set(traveling_qubits + agent.qubits))
program = self.agents[agent.name].program
transit_devices = devices["transit"]
target_devices = devices["target"]
# Number of qubits before any are lost
num_travel_qubits = len(traveling_qubits)
travel_delay = 0
if not transit_devices:
travel_delay += fiber_length_default/signal_speed
if not target_devices:
travel_delay += 0
total_lost_qubits = [q for q in traveling_qubits if q < 0 or q == float("-inf")]
remaining_qubits = [q for q in traveling_qubits if q >= 0]
for device in list(itertools.chain(transit_devices, target_devices)):
# If qubits are remaining
if remaining_qubits:
res = device.apply(program, traveling_qubits)
if 'lost_qubits' in res.keys():
lost_qubits = res['lost_qubits']
# Remove lost qubits from traveling qubits
remaining_qubits = list(set(remaining_qubits) - set(lost_qubits))
# Add lost_qubits lost from current device to total_lost_qubits
total_lost_qubits += lost_qubits
if 'delay' in res.keys(): travel_delay += res['delay']
else: break
# Remove traveling_qubits
agent.qubits = list(set(agent.qubits) - set(traveling_qubits))
lost_qubits_flipped = []
for q in total_lost_qubits:
if q == 0: lost_qubits_flipped.append(float("-inf"))
else:
lost_qubits_flipped.append(-q)
# Add inverted lost qubits to remaining qubits
traveling_qubits = remaining_qubits + lost_qubits_flipped
agent.qubits += traveling_qubits
scaled_delay = travel_delay*num_travel_qubits + source_delay
return traveling_qubits, scaled_delay, source_time
[docs]class CConnect:
[docs] def __init__(self, *args, length=0.0):
'''
This is the base class for a classical connection between multiple agents.
:param agents \*args: list of agents to connect
:param Float length: distance between first and second agent
'''
agents = list(args)
self.agents = {}
'''
Create queue to keep track of multiple requests. Name of queue is name of
target agent.
'''
self.queues = {}
for agent in agents:
self.agents.update({agent.name: agent})
self.queues.update({agent.name: queue.Queue()})
for agentConnect in agents:
if agentConnect != agent:
agent.cconnections[agentConnect.name] = self
self.length = length
[docs] def put(self, target, cbits):
'''
Places cbits on queue keyed on the target Agent's name
:param String target: name of recipient of program
:param Array cbits: array of numbers corresponding to cbits agent is sending
:returns: time for cbits to travel
'''
csource_delay = pulse_length_default * 8 * sys.getsizeof(cbits)
self.queues[target].put((cbits, csource_delay))
return csource_delay
[docs] def get(self, agent):
'''
Pops cbits off of the agent's queue and adds travel delay
:param String agent: name of the agent receiving the cbits
:returns: cbits from source and time they took to travel
'''
cbits, source_delay = self.queues[agent].get()
travel_delay = self.length/signal_speed
scaled_delay = travel_delay*len(cbits) + source_delay
return cbits, scaled_delay