注:本文的python实现基于论文《Dynamic scheduling for flexible job shop with new job insertions by deep reinforcement learning》
其他相关阅读可见个人CSDN专栏之《论文阅读与实现》。
作为一个刚入门的小白博主,非常愿意与各位同僚交流,欢迎叨扰!
1.1 算例生成(Instance_Generator)
import random
import numpy as np
Total_Machine=[10,20,30,40,50] # candidate machine counts for generated instances
Initial_Job_num=20 # number of jobs present in the shop at time 0
Job_insert=[50,100,200] # candidate numbers of newly arriving (inserted) jobs
DDT=[0.5,1.0,1.5] # due-date tightness factors (job urgency)
E_ave=[50,100,200] # candidate means of the exponential inter-arrival distribution
def Instance_Generator(M_num,E_ave,New_insert,DDT):
    '''
    Randomly generate a dynamic flexible job-shop instance.

    :param M_num: number of machines
    :param E_ave: mean of the exponential distribution of new-job arrival times
    :param New_insert: number of jobs that arrive after time 0
    :param DDT: due-date tightness factor (smaller -> tighter due dates)
    :return: tuple (Processing_time, A1, D1, M_num, Op_num, J, O_num, J_num):
             Processing_time[i][j][k] -- time of operation j of job i on machine k
             (-1 means machine k cannot process that operation),
             A1 -- arrival time of every job (0 for the initial jobs),
             D1 -- due date of every job,
             M_num -- machine count (echoed back),
             Op_num -- operation count of each job,
             J -- dict mapping job index -> operation count,
             O_num -- total number of operations,
             J_num -- total number of jobs.
    '''
    # NOTE: the module-level constant Initial_Job_num is 20, but the original
    # implementation deliberately fixes 5 initial jobs inside the function.
    Initial_Job_num = 5
    J_num = Initial_Job_num + New_insert
    # Each job gets 1..5 operations.
    Op_num = [random.randint(1, 5) for _ in range(J_num)]
    Processing_time = []
    for i in range(J_num):
        Job_i = []
        for _ in range(Op_num[i]):
            # Each operation is processable on 2..M_num-1 randomly chosen machines,
            # with a processing time in 1..50; -1 marks ineligible machines.
            eligible = set(random.sample(range(M_num), random.randint(1, M_num - 2) + 1))
            O_i = [random.randint(1, 50) if m in eligible else -1 for m in range(M_num)]
            Job_i.append(O_i)
        Processing_time.append(Job_i)
    # Arrival times: initial jobs are already in the shop (t=0); inserted jobs
    # arrive according to an exponential distribution with mean E_ave.
    A1 = [0 for _ in range(Initial_Job_num)]
    A1.extend(int(t) for t in np.random.exponential(E_ave, size=New_insert))
    # Estimated total processing time of each job: sum over its operations of the
    # average processing time across the machines that can run the operation.
    T_ijave = []
    for i in range(J_num):
        per_op = []
        for j in range(Op_num[i]):
            T_ijk = [k for k in Processing_time[i][j] if k != -1]
            per_op.append(sum(T_ijk) / len(T_ijk))
        T_ijave.append(sum(per_op))
    # Due date = arrival time + DDT * estimated total processing time.
    # (For initial jobs A1[i] == 0, so this matches the original two-branch code.)
    D1 = [int(A1[i] + T_ijave[i] * DDT) for i in range(J_num)]
    O_num = sum(Op_num)
    J = dict(enumerate(Op_num))
    return Processing_time, A1, D1, M_num, Op_num, J, O_num, J_num
# Build one instance at import time; other modules do
# `from Instance_Generator import Processing_time, A, D, ...`.
Processing_time,A,D,M_num,Op_num,J,O_num,J_num=Instance_Generator(10,50,10,0.5)
print(Processing_time,A,D,M_num,Op_num,J,O_num,J_num)
1.2 柔性作业车间的机器和工件类(Object_for_FJSP)
class Object:
    '''A schedulable entity (machine or job) of the flexible job shop.

    Records the start/end times of the operations assigned to it, their
    processing times, and the counterpart object of each assignment.
    '''

    def __init__(self, I):
        self.I = I            # index of this machine/job
        self.Start = []       # start times of assigned operations (kept sorted)
        self.End = []         # end times of assigned operations (kept sorted)
        self.T = []           # processing times, in assignment order
        self.assign_for = []  # counterpart object per slot, aligned with sorted End

    def _add(self, S, E, obs, t):
        '''Record an operation occupying [S, E] with processing time t, assigned to obs.'''
        self.Start.append(S)
        self.End.append(E)
        self.Start.sort()
        self.End.sort()
        self.T.append(t)
        # Keep assign_for aligned with the sorted End list.
        self.assign_for.insert(self.End.index(E), obs)

    def idle_time(self):
        '''Return the idle intervals [[s, e], ...] before and between assigned operations.

        Bug fix vs. the original: the gap scan iterated range(len(self.End)),
        so self.Start[i+1] raised IndexError at the last index; the bare
        `except: pass` swallowed it and every gap between consecutive
        operations was silently dropped from the result.
        '''
        Idle = []
        if not self.Start:
            return Idle
        if self.Start[0] != 0:
            Idle.append([0, self.Start[0]])
        Idle.extend([self.End[i], self.Start[i + 1]]
                    for i in range(len(self.End) - 1)
                    if self.Start[i + 1] - self.End[i] > 0)
        return Idle
1.3 车间状态和动作(Job_shop)
import numpy as np
import random
from Instance_Generator import Processing_time,A,D,M_num,Op_num,J,O_num,J_num
from Object_for_FJSP import Object
# NOTE(review): this class was scraped from a blog post; all indentation was
# lost in extraction and several lines are visibly garbled (flagged below).
# Code is kept byte-identical here; only comments were added/translated.
class Situation:
def __init__(self,J_num,M_num,O_num,J,Processing_time,D,Ai):
'''State container for the dynamic FJSP: holds the instance data plus the
running schedule statistics used to build the DQN state features.'''
self.Ai=Ai # arrival time of each job
self.D=D # due date of each job
self.O_num=O_num # total number of operations
self.M_num=M_num # number of machines
self.J_num=J_num # number of jobs
self.J=J # dict: job index -> number of operations
self.Processing_time = Processing_time # processing-time table
self.CTK=[0 for i in range(M_num)] # completion time of the last operation on each machine
self.OP=[0 for i in range(J_num)] # number of already-processed operations of each job
self.UK=[0 for i in range(M_num)] # actual utilisation rate of each machine
self.CRJ=[0 for i in range(J_num)] # completion rate of each job
# Job set:
self.Jobs=[]
for i in range(J_num):
F=Object(i)
self.Jobs.append(F)
# Machine set:
self.Machines = []
for i in range(M_num):
F = Object(i)
self.Machines.append(F)
# Update bookkeeping after one operation of Job has been assigned to Machine.
def _Update(self,Job,Machine):
self.CTK[Machine]=max(self.Machines[Machine].End)
self.OP[Job]+=1
self.UK[Machine]=sum(self.Machines[Machine].T)/self.CTK[Machine]
self.CRJ[Job]=self.OP[Job]/self.J[Job]
# Build the 7 state features (utilisation and tardiness statistics).
def Features(self):
#1 average machine utilisation
U_ave=sum(self.UK)/self.M_num
K=0
for uk in self.UK:
K+=np.square(uk-U_ave)
#2 standard deviation of machine utilisation
U_std=np.sqrt(K/self.M_num)
#3 average operation completion rate
CRO_ave=sum(self.OP)/self.O_num
#4 average job completion rate
CRJ_ave=sum(self.CRJ)/self.J_num
K = 0
for uk in self.CRJ:
K += np.square(uk - CRJ_ave)
#5 standard deviation of job completion rate
CRJ_std=np.sqrt(K/self.J_num)
#6 Estimated tardiness rate Tard_e
T_cur=sum(self.CTK)/self.M_num
N_tard,N_left=0,0
for i in range(self.J_num):
if J[i]>self.OP[i]:
N_left+=self.J[i]-self.OP[i]
T_left=0
for j in range(self.OP[i]+1,J[i]):
# NOTE(review): the next line is garbled in the scraped source (two
# statements fused and an operator lost); per the paper it should average
# the positive processing times of operation j, accumulate them into
# T_left, and count remaining operations whose estimated completion
# time T_cur + T_left exceeds D[i].
M_ij=[k for k in self.Processing_time[i][j] if k>0 or kself.D[i]:
N_tard+=self.J[i]-j+1
try:
Tard_e=N_tard/N_left
except:
Tard_e =9999
#7 Actual tardiness rate Tard_a
N_tard, N_left = 0, 0
for i in range(self.J_num):
if J[i] > self.OP[i]:
N_left += self.J[i] - self.OP[i]
try:
if self.CTK[i] > self.D[i]:
N_tard += self.J[i] - j
except:
pass
try:
Tard_a = N_tard / N_left
except:
Tard_a =9999
return U_ave,U_std,CRO_ave,CRJ_ave,CRJ_std,Tard_e,Tard_a
#Composite dispatching rule 1
#return Job,Machine
def rule1(self):
# T_cur: average machine completion time
T_cur = sum(self.CTK) / self.M_num
# Tard_Job: jobs that cannot be finished on time
# NOTE(review): the scraped source is truncated here — the rest of rule1
# (and the sibling rules 2-6 plus the head of the scheduling routine) was
# lost; the following lines jump into the middle of the idle-slot search
# of the scheduling method, so comparisons below are missing operators.
Tard_Job=[i for i in range(self.J_num) if self.OP[i]PT:
if Idle[i][0]>last_ot:
start_time=Idle[i][0]
pass
if Idle[i][0]PT:
start_time=last_ot
pass
end_time=Start_time+PT
self.Machines[Machine]._add(Start_time,end_time,Job,PT)
self.Jobs[Job]._add(Start_time,end_time,Machine,PT)
self._Update(Job,Machine)
def reward(self,Ta_t,Te_t,Ta_t1,Te_t1,U_t,U_t1):
'''
:param Ta_t: Tard_a(t)
:param Te_t: Tard_e(t)
:param Ta_t1: Tard_a(t+1)
:param Te_t1: Tard_e(t+1)
:param U_t: U_ave(t)
:param U_t1: U_ave(t+1)
:return: reward
'''
# NOTE(review): the two comparisons below lost their operators (and the
# intermediate branches) in extraction; per the paper's reward cascade:
# Ta_t1 < Ta_t -> +1, Ta_t1 > Ta_t -> -1, then the same test on Te,
# then on U_ave as written below.
if Ta_t1Ta_t:
rt=-1
else:
if Te_t1Te_t:
rt=1
else:
if U_t1>U_t:
rt=1
else:
if U_t1>0.95*U_t:
rt=0
else:
rt=-1
return rt
# Global shop-state object shared by the DQN training script.
Sit=Situation(J_num,M_num,O_num,J,Processing_time,D,A)
1.4 DQN
import numpy as np
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
import random
from collections import deque
from tensorflow.keras import layers,models
import tensorflow as tf
from Job_Shop import Situation
from tensorflow.keras.optimizers import Adam
from Instance_Generator import Processing_time,A,D,M_num,Op_num,J,O_num,J_num
import matplotlib.pyplot as plt
# NOTE(review): scraped from a blog post — indentation was lost and the
# Select_action / main methods are visibly garbled below (flagged inline).
# Code is kept byte-identical; only comments were added/translated.
class DQN:
def __init__(self,):
self.Hid_Size = 30
# ------------Hidden layer=5 30 nodes each layer--------------
model = models.Sequential()
model.add(layers.Input(shape=(7,)))
model.add(layers.Dense(self.Hid_Size, name='l1'))
model.add(layers.Dense(self.Hid_Size, name='l2'))
model.add(layers.Dense(self.Hid_Size, name='l3'))
model.add(layers.Dense(self.Hid_Size, name='l4'))
model.add(layers.Dense(self.Hid_Size, name='l5'))
model.add(layers.Dense(6, name='l6'))
model.compile(loss='mse',
optimizer=Adam(learning_rate=0.001))
# # model.summary()
self.model = model
#------------Q-network Parameters-------------
self.act_dim=[1,2,3,4,5,6] # output nodes of the network (one per dispatching rule)
self.obs_n=[0,0,0,0,0,0,0] # input nodes of the network (the 7 state features)
self.gama = 0.95 # γ, reward discount factor
# self.lr = 0.001 # learning rate
self.global_step = 0
self.update_target_steps = 200 # interval (in steps) for refreshing the target network
# NOTE(review): this aliases the online model instead of copying it, so
# replace_target() copies weights onto themselves and there is effectively
# no frozen target network.
self.target_model = self.model
#-------------------Agent-------------------
self.e_greedy=0.6
self.e_greedy_decrement=0.0001
self.L=40 #Number of training episodes L
#---------------Replay Buffer---------------
self.buffer=deque(maxlen=2000)
self.Batch_size=10 # Batch Size of Samples to perform gradient descent
# Copy the online network's weights into the target network, layer by layer.
def replace_target(self):
self.target_model.get_layer(name='l1').set_weights(self.model.get_layer(name='l1').get_weights())
self.target_model.get_layer(name='l2').set_weights(self.model.get_layer(name='l2').get_weights())
self.target_model.get_layer(name='l3').set_weights(self.model.get_layer(name='l3').get_weights())
self.target_model.get_layer(name='l4').set_weights(self.model.get_layer(name='l4').get_weights())
self.target_model.get_layer(name='l5').set_weights(self.model.get_layer(name='l5').get_weights())
self.target_model.get_layer(name='l6').set_weights(self.model.get_layer(name='l6').get_weights())
# Sample a minibatch from the replay buffer and fit the online network.
def replay(self):
if self.global_step % self.update_target_steps == 0:
self.replace_target()
# replay the history and train the model
minibatch = random.sample(self.buffer, self.Batch_size)
for state, action, reward, next_state, done in minibatch:
target = reward
if not done:
k=self.target_model.predict(next_state)
# NOTE(review): np.argmax returns the *index* of the best action, not its
# Q-value — a standard DQN target would use np.max here; `k` above is unused.
target = (reward + self.gama *
np.argmax(self.target_model.predict(next_state)))
target_f = self.model.predict(state)
target_f[0][action] = target
self.model.fit(state, target_f, epochs=1, verbose=0)
self.global_step += 1
def Select_action(self,obs):
# obs=np.expand_dims(obs,0)
# NOTE(review): the body of Select_action and the head of the training loop
# in main() were lost in extraction; the next line fuses an epsilon-greedy
# comparison with a progress print statement from main().
if random.random()>','执行action:',at,' ','将工件',at_trans[0],'安排到机器',at_trans[1])
Sit.scheduling(at_trans)
obs_t=Sit.Features()
if i==O_num-1:
done=True
#obs = obs_t
obs_t = np.expand_dims(obs_t, 0)
# obs = np.expand_dims(obs, 0)
# print(obs,obs_t)
r_t = Sit.reward(obs[0][6],obs[0][5],obs_t[0][6],obs_t[0][5],obs[0][0],obs_t[0][0])
self._append((obs,at,r_t,obs_t,done))
if k>self.Batch_size:
# batch_obs, batch_action, batch_reward, batch_next_obs,done= self.sample()
self.replay()
Total_reward+=r_t
obs=obs_t
# Episode finished: compute total tardiness over all jobs.
total_tadiness=0
Job=Sit.Jobs
E=0
K=[i for i in range(len(Job))]
End=[]
for Ji in range(len(Job)):
End.append(max(Job[Ji].End))
if max(Job[Ji].End)>D[Ji]:
total_tadiness+=abs(max(Job[Ji].End)-D[Ji])
print('<<<<<<<<>>>>>>>>>')
Total_tard.append(total_tadiness)
print('<<<<<<<<>>>>>>>>>')
TR.append(Total_reward)
# plt.plot(K,End,color='y')
# plt.plot(K,D,color='r')
# plt.show()
plt.plot(x,Total_tard)
plt.show()
return Total_reward
# Train the agent on the instance generated at import time.
d=DQN()
d.main(J_num, M_num, O_num, J, Processing_time, D, A)
Original: https://blog.csdn.net/crazy_girl_me/article/details/118694032
Author: 码丽莲梦露
Title: Tensorflow2.0|基于深度强化学习(DQN)实现动态柔性作业车间调度问题(DFJSP)
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/618612/
转载文章受原作者版权保护。转载请注明原作者出处!