Change signal timing to reduce signal waiting.
There is a network as shown below, and there are signals at four locations, D, E, H, and I.
Each of the four users will move as follows:
--At time 0, leave A for L --At time 2, leave B for K --At time 1, leave C for J --At time 2, leave G and head for F
There are 4 patterns of signal timing, and it is assumed that they can move as shown in the table below.
pattern | T1 | T2 | T3 | T4 |
---|---|---|---|---|
0 | Up and down | Left and right | Left and right | Up and down |
1 | Up and down | Up and down | Left and right | Left and right |
2 | Left and right | Up and down | Up and down | Left and right |
3 | Left and right | Left and right | Up and down | Up and down |
At this time, the timing pattern of the signals D, E, H, and I that flows most smoothly is found.
python3
%matplotlib inline
import networkx as nx, matplotlib.pyplot as plt
from more_itertools import chunked
plt.rcParams['figure.figsize'] = 4, 4
g = nx.DiGraph()
for i, ar in enumerate(['ADBECDEFGHIJHKIL', '', 'DEDHEIHI']):
for fr, to in chunked(ar, 2):
g.add_edge(fr, to, weight=i+1)
if i == 2:
g.add_edge(to, fr, weight=i+1)
pos = {chr(i+65):(int(x),int(y)) for i, (x,y)
in enumerate(chunked('154504144454011141511040', 2))}
nx.draw(g, pos, node_color='white')
nx.draw_networkx_labels(g, pos);
Separate layers for each user. It is 3D user x time x sky.
python3
import numpy as np, pandas as pd
from pulp import *
from ortoolpy import addvars, addbinvars
def make(g, T):
stnd, ednd, sttm = 'ABCG', 'LKJF', [0,2,1,2] #The beginning, end, and time of the generating user
for la in range(4):
for t1 in range(T):
for nd in g.nodes():
yield la, nd, t1, nd, t1+1
for fr, dc in g.edge.items():
for to, atr in dc.items():
t2 = t1 + atr['weight']
if t2 <= T:
yield la, fr, t1, to, t2
for l, c in enumerate(stnd):
yield l, '_', 0, c, sttm[l]
for l, c in enumerate(ednd):
for t in range(8,T):
t2 = t+sttm[l]
if t2 < T:
yield l, c, t2, '_', 0
T = 13
a = pd.DataFrame(make(g, T), columns=['Layer', 'FrNd', 'FrTm', 'ToNd', 'ToTm'])
a['From'] = a.FrNd+a.FrTm.astype(str)
a['To'] = a.ToNd+a.ToTm.astype(str)
a['Weight'] = a.ToTm - a.FrTm
a.loc[a.FrNd == a.ToNd, 'Weight'] = 0.001
a.loc[(a.FrNd == '_') | (a.ToNd == '_'), 'Weight'] = 0
a[:3]
Layer | FrNd | FrTm | ToNd | ToTm | From | To | Weight | |
---|---|---|---|---|---|---|---|---|
0 | 0 | J | 0 | J | 1 | J0 | J1 | 0.001 |
1 | 0 | A | 0 | A | 1 | A0 | A1 | 0.001 |
2 | 0 | B | 0 | B | 1 | B0 | B1 | 0.001 |
python3
m = LpProblem()
vs = addbinvars(4, 4) #DEHI offset
a['Var'] = addvars(len(a), upBound=1) #flow
m += lpDot(a.Weight, a.Var) #Objective function
for i in range(4):
m += lpSum(vs[i]) == 1 #Only one offset
for v in a[a.FrNd=='_'].Var:
m += v == 1 #Occurs for each layer
for l in range(4):
b = a[a.Layer == l]
for nd in set(b.From.unique()) | set(b.To.unique()):
#Equal on and out on each layer
m += lpSum(b[b.From == nd].Var) == lpSum(b[b.To == nd].Var)
for t in range(T):
b = a[a.FrTm == t]
for i, s in enumerate(['DH', 'EI', 'HK', 'IL']):
c = b[b.FrNd==s[0]]
#Signal control
m += lpSum(c[c.ToNd==s[1]].Var) <= vs[i][(t+0)%4]+vs[i][(t+1)%4]
m += lpSum(c[c.ToNd!=s[1]].Var) <= vs[i][(t+2)%4]+vs[i][(t+3)%4]
%time m.solve()
print(LpStatus[m.status], value(m.objective))
print('timing', np.vectorize(value)(vs)@np.arange(4))
>>>
Wall time: 79.5 ms
Optimal 32.001000000000005
timing[ 1. 3. 2. 0.]
that's all