Tensorflow2.0|基于深度强化学习(DQN)实现动态柔性作业车间调度问题(DFJSP)

注:本文的python实现基于论文《Dynamic scheduling for flexible job shop with new job insertions by deep reinforcement learning》

其他相关阅读可见个人CSDN专栏之《论文阅读与实现》。

作为一个刚入门的小白博主,非常愿意与各位同僚交流,欢迎叨扰!

1.1 算例生成(Instance_Generator)

import random
import numpy as np

Total_Machine=[10,20,30,40,50]  # candidate machine counts for experiments
Initial_Job_num=20              # number of jobs present at t=0
Job_insert=[50,100,200]         # candidate numbers of newly arriving jobs
DDT=[0.5,1.0,1.5]               # due-date tightness factors (smaller = tighter)
E_ave=[50,100,200]              # candidate means of the exponential inter-arrival distribution

def Instance_Generator(M_num,E_ave,New_insert,DDT):
    '''
    Randomly build a dynamic flexible job-shop (DFJSP) instance.

    :param M_num: Machine Number
    :param E_ave: mean of the exponential distribution of new-job inter-arrival times
    :param New_insert: number of newly inserted jobs
    :param DDT: due-date tightness factor
    :return: Processing_time, A: job arrival times,
                              D: due dates,
                              M_num: Machine Number,
                              Op_num: Operation Number per job,
                              J: dict job index -> operation count,
                              O_num: total operation count,
                              J_num: Job NUMBER
    '''
    Initial_Job_num = 5
    total_jobs = Initial_Job_num + New_insert
    # Operations per job, drawn uniformly from 1..5.
    Op_num = [random.randint(1, 5) for _ in range(total_jobs)]
    # Processing_time[i][j][m]: time of operation j of job i on machine m; -1 = ineligible.
    Processing_time = []
    for i in range(total_jobs):
        ops = []
        for _ in range(Op_num[i]):
            # Pick a random subset of 2..M_num-1 eligible machines.
            k = random.randint(1, M_num - 2)
            pool = list(range(M_num))
            random.shuffle(pool)
            eligible = pool[:k + 1]
            row = list(np.ones(M_num) * (-1))
            # Iterating in ascending machine order keeps the RNG call
            # sequence identical to a membership-test loop over 0..M_num-1.
            for m in sorted(eligible):
                row[m] = random.randint(1, 50)
            ops.append(row)
        Processing_time.append(ops)
    # Arrival times: initial jobs at t=0, new jobs via exponential draws.
    A1 = [0 for _ in range(Initial_Job_num)]
    draws = np.random.exponential(E_ave, size=New_insert)
    A1.extend(int(t) for t in draws)
    # Expected total processing time of each job:
    # mean over eligible machines, summed over the job's operations.
    T_ijave = []
    for i in range(total_jobs):
        per_op = []
        for j in range(Op_num[i]):
            feasible = [t for t in Processing_time[i][j] if t != -1]
            per_op.append(sum(feasible) / len(feasible))
        T_ijave.append(sum(per_op))
    # Due date = arrival time + DDT * expected processing time (arrival is 0 for initial jobs).
    D1 = [int(T_ijave[i] * DDT) for i in range(Initial_Job_num)]
    D1.extend(int(A1[i] + T_ijave[i] * DDT) for i in range(Initial_Job_num, total_jobs))
    O_num = sum(Op_num)
    J = dict(enumerate(Op_num))
    J_num = total_jobs

    return Processing_time, A1, D1, M_num, Op_num, J, O_num, J_num

# Generate a sample instance: 10 machines, E_ave=50, 10 new jobs, DDT=0.5.
Processing_time,A,D,M_num,Op_num,J,O_num,J_num=Instance_Generator(10,50,10,0.5)
print(Processing_time,A,D,M_num,Op_num,J,O_num,J_num)

1.2 柔性作业车间的机器和工件类(Object_for_FJSP)

class Object:
    """A schedulable entity (a machine or a job) in the flexible job shop.

    Keeps the timeline of operation intervals assigned to it.
    """

    def __init__(self, I):
        self.I = I              # index of this machine/job
        self.Start = []         # start times of assigned intervals (kept sorted)
        self.End = []           # end times of assigned intervals (kept sorted)
        self.T = []             # processing durations, in assignment order
        self.assign_for = []    # counterpart index (job on a machine / machine for a job),
                                # kept aligned with the sorted End list

    def _add(self, S, E, obs, t):
        # obs: the object (job or machine index) this interval is assigned to
        self.Start.append(S)
        self.End.append(E)
        self.Start.sort()
        self.End.sort()
        self.T.append(t)
        self.assign_for.insert(self.End.index(E), obs)

    def idle_time(self):
        """Return the idle intervals [start, end] on this object's timeline.

        Bug fix: the original iterated i over range(len(End)) and accessed
        Start[i+1], which always raised IndexError on the last i; the bare
        `except` swallowed it, so interior gaps were never reported and only
        the leading idle interval (if any) was returned. Iterate only up to
        the second-to-last interval instead.
        """
        idle = []
        if not self.Start:          # nothing scheduled yet -> no idle intervals
            return idle
        if self.Start[0] != 0:      # gap before the first interval
            idle.append([0, self.Start[0]])
        # Gaps between consecutive intervals.
        idle.extend([self.End[i], self.Start[i + 1]]
                    for i in range(len(self.End) - 1)
                    if self.Start[i + 1] - self.End[i] > 0)
        return idle

1.3 车间状态和动作(Job_shop)

import numpy as np
import random
from Instance_Generator import Processing_time,A,D,M_num,Op_num,J,O_num,J_num
from Object_for_FJSP import Object

class Situation:
    """State of the dynamic flexible job shop: jobs, machines, and the
    state features / reward used by the DQN agent.

    NOTE(review): several method bodies below are corrupted — text between
    '<' and '>' characters was stripped when this code was extracted from
    HTML, so comparisons like 'a < b' and whole statements are missing.
    The corrupted lines are kept verbatim and flagged individually; restore
    them from the original repository / paper before running.
    """
    def __init__(self,J_num,M_num,O_num,J,Processing_time,D,Ai):
        self.Ai=Ai                  # job arrival times
        self.D=D                    # due dates
        self.O_num=O_num            # total number of operations
        self.M_num=M_num            # number of machines
        self.J_num=J_num            # number of jobs
        self.J=J                    # dict: job index -> operation count
        self.Processing_time = Processing_time   # processing times; -1 marks an ineligible machine
        self.CTK=[0 for i in range(M_num)]      # completion time of the last operation on each machine
        self.OP=[0 for i in range(J_num)]       # number of already-scheduled operations per job
        self.UK=[0 for i in range(M_num)]       # actual utilization rate of each machine
        self.CRJ=[0 for i in range(J_num)]      # completion rate per job
        # job set:
        self.Jobs=[]
        for i in range(J_num):
            F=Object(i)
            self.Jobs.append(F)
        # machine set:
        self.Machines = []
        for i in range(M_num):
            F = Object(i)
            self.Machines.append(F)

    # Update bookkeeping after scheduling one operation of `Job` on `Machine`.
    def _Update(self,Job,Machine):
        self.CTK[Machine]=max(self.Machines[Machine].End)
        self.OP[Job]+=1
        # utilization = busy time / current makespan of the machine
        self.UK[Machine]=sum(self.Machines[Machine].T)/self.CTK[Machine]
        self.CRJ[Job]=self.OP[Job]/self.J[Job]

    # Compute the 7 state features fed to the DQN.
    def Features(self):

        #1 average machine utilization
        U_ave=sum(self.UK)/self.M_num
        K=0
        for uk in self.UK:
            K+=np.square(uk-U_ave)
        #2 standard deviation of machine utilization
        U_std=np.sqrt(K/self.M_num)
        #3 average operation completion rate
        CRO_ave=sum(self.OP)/self.O_num
        #4 average job completion rate
        CRJ_ave=sum(self.CRJ)/self.J_num
        K = 0
        for uk in self.CRJ:
            K += np.square(uk - CRJ_ave)
        #5 standard deviation of job completion rate
        CRJ_std=np.sqrt(K/self.J_num)
        #6 Estimated tardiness rate Tard_e
        T_cur=sum(self.CTK)/self.M_num
        N_tard,N_left=0,0
        for i in range(self.J_num):
            # NOTE(review): uses the module-level J here but self.J below —
            # works only because they alias the same dict; confirm.
            if J[i]>self.OP[i]:
                N_left+=self.J[i]-self.OP[i]
                T_left=0
                for j in range(self.OP[i]+1,J[i]):
                    # NOTE(review): corrupted line — likely originally two
                    # statements: a comprehension keeping k>0, accumulation of
                    # T_left, then a check like 'if T_cur+T_left > self.D[i]:'.
                    # Confirm against the paper before running.
                    M_ij=[k for k in self.Processing_time[i][j] if k>0 or kself.D[i]:
                        N_tard+=self.J[i]-j+1
        try:
            Tard_e=N_tard/N_left
        except:
            # NOTE(review): bare except hides real errors; presumably meant to
            # catch ZeroDivisionError when no operations remain.
            Tard_e =9999
        #7 Actual tardiness rate Tard_a
        N_tard, N_left = 0, 0
        for i in range(self.J_num):
            if J[i] > self.OP[i]:
                N_left += self.J[i] - self.OP[i]
                try:
                    # NOTE(review): `j` is the leftover loop variable from the
                    # feature-6 loop above — looks unintended; confirm.
                    if self.CTK[i] > self.D[i]:
                        N_tard += self.J[i] - j
                except:
                    pass
        try:
            Tard_a = N_tard / N_left
        except:
            Tard_a =9999
        return U_ave,U_std,CRO_ave,CRJ_ave,CRJ_std,Tard_e,Tard_a

    #Composite dispatching rule 1
    #return Job,Machine
    def rule1(self):
        #T_cur: mean of the machines' last completion times
        T_cur = sum(self.CTK) / self.M_num
        #Tard_Job: jobs that cannot finish by their due date
        # NOTE(review): the lines below are corrupted — the bodies of rule1
        # (and apparently rules 2-6 plus the head of a `scheduling` method)
        # were stripped with the '<...>' text. What remains fuses rule1's
        # first statement with the idle-slot search and tail of `scheduling`.
        Tard_Job=[i for i in range(self.J_num) if self.OP[i]PT:
                if Idle[i][0]>last_ot:
                    start_time=Idle[i][0]
                    pass
                if Idle[i][0]PT:
                    start_time=last_ot
                    pass
        end_time=Start_time+PT
        self.Machines[Machine]._add(Start_time,end_time,Job,PT)
        self.Jobs[Job]._add(Start_time,end_time,Machine,PT)
        self._Update(Job,Machine)

    def reward(self,Ta_t,Te_t,Ta_t1,Te_t1,U_t,U_t1):
        '''
               :param Ta_t: Tard_a(t)
               :param Te_t: Tard_e(t)
               :param Ta_t1: Tard_a(t+1)
               :param Te_t1: Tard_e(t+1)
               :param U_t: U_ave(t)
               :param U_t1: U_ave(t+1)
               :return: reward
        '''
        # NOTE(review): the cascade below is corrupted ('<' comparisons and
        # some branches stripped). The indentation suggests the usual form
        # from the paper: +1 if actual tardiness decreased, -1 if it
        # increased; else the same test on estimated tardiness; else fall
        # through to the utilization comparison. Restore before running.
        if Ta_t1Ta_t:
                rt=-1
            else:
                if Te_t1Te_t:
                        rt=1
                    else:
                        if U_t1>U_t:
                            rt=1
                        else:
                            if U_t1>0.95*U_t:
                                rt=0
                            else:
                                rt=-1
        return rt

Sit=Situation(J_num,M_num,O_num,J,Processing_time,D,A)

1.4 DQN

import numpy as np
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
import random
from collections import deque
from tensorflow.keras import layers,models
import tensorflow as tf
from Job_Shop import Situation
from tensorflow.keras.optimizers import Adam
from Instance_Generator import Processing_time,A,D,M_num,Op_num,J,O_num,J_num
import matplotlib.pyplot as plt

class DQN:
    """DQN agent: a 5-hidden-layer Q-network over 7 state features with 6
    actions (presumably one per composite dispatching rule — confirm).

    NOTE(review): `Select_action` and the training loop (`main`) below are
    corrupted — text between '<' and '>' was stripped during HTML
    extraction, fusing several statements; restore from the original
    repository before running.
    """
    def __init__(self,):
        self.Hid_Size = 30

        # ------------Hidden layer=5   30 nodes each layer--------------
        model = models.Sequential()
        model.add(layers.Input(shape=(7,)))
        model.add(layers.Dense(self.Hid_Size, name='l1'))
        model.add(layers.Dense(self.Hid_Size, name='l2'))
        model.add(layers.Dense(self.Hid_Size, name='l3'))
        model.add(layers.Dense(self.Hid_Size, name='l4'))
        model.add(layers.Dense(self.Hid_Size, name='l5'))
        model.add(layers.Dense(6, name='l6'))
        model.compile(loss='mse',
                      optimizer=Adam(learning_rate=0.001))
        # # model.summary()
        self.model = model

        #------------Q-network Parameters-------------
        self.act_dim=[1,2,3,4,5,6]                        # output nodes of the network
        self.obs_n=[0,0,0,0,0,0,0]                            # input nodes (7 state features)
        self.gama = 0.95  # γ, discount factor
        # self.lr = 0.001  # learning rate
        self.global_step = 0
        self.update_target_steps = 200  # target-network update period (steps)
        # NOTE(review): this aliases the SAME model object, so the target
        # network never lags behind and replace_target below is a no-op.
        # A true target network needs a separate copy (e.g. clone_model).
        self.target_model = self.model

        #-------------------Agent-------------------
        self.e_greedy=0.6               # initial exploration rate epsilon
        self.e_greedy_decrement=0.0001  # epsilon decay per step
        self.L=40          #Number of training episodes L

        #---------------Replay Buffer---------------
        self.buffer=deque(maxlen=2000)
        self.Batch_size=10       # Batch Size of Samples to perform gradient descent

    # Copy the online network's weights into the target network, layer by layer.
    def replace_target(self):
        self.target_model.get_layer(name='l1').set_weights(self.model.get_layer(name='l1').get_weights())
        self.target_model.get_layer(name='l2').set_weights(self.model.get_layer(name='l2').get_weights())
        self.target_model.get_layer(name='l3').set_weights(self.model.get_layer(name='l3').get_weights())
        self.target_model.get_layer(name='l4').set_weights(self.model.get_layer(name='l4').get_weights())
        self.target_model.get_layer(name='l5').set_weights(self.model.get_layer(name='l5').get_weights())
        self.target_model.get_layer(name='l6').set_weights(self.model.get_layer(name='l6').get_weights())

    # Sample a minibatch from the replay buffer and do one round of Q-updates.
    def replay(self):
        if self.global_step % self.update_target_steps == 0:
            self.replace_target()
        # replay the history and train the model
        minibatch = random.sample(self.buffer, self.Batch_size)
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                # NOTE(review): `k` is computed but never used.
                k=self.target_model.predict(next_state)
                # NOTE(review): np.argmax returns the best action's *index*,
                # not its Q-value — a standard DQN target uses np.max/amax
                # over the target network's output. Confirm and fix.
                target = (reward + self.gama *
                          np.argmax(self.target_model.predict(next_state)))
            target_f = self.model.predict(state)
            target_f[0][action] = target
            self.model.fit(state, target_f, epochs=1, verbose=0)
        self.global_step += 1

    def Select_action(self,obs):
        # obs=np.expand_dims(obs,0)
        # NOTE(review): corrupted — the epsilon-greedy branch, the episode
        # loop header of main(), the action decoding, and several print
        # statements were stripped, fusing everything into the lines below.
        if random.random()>','执行action:',at,' ','将工件',at_trans[0],'安排到机器',at_trans[1])
                Sit.scheduling(at_trans)
                obs_t=Sit.Features()
                if i==O_num-1:
                    done=True
                #obs = obs_t
                obs_t = np.expand_dims(obs_t, 0)
                # obs = np.expand_dims(obs, 0)
                # print(obs,obs_t)
                r_t = Sit.reward(obs[0][6],obs[0][5],obs_t[0][6],obs_t[0][5],obs[0][0],obs_t[0][0])
                self._append((obs,at,r_t,obs_t,done))
                if k>self.Batch_size:
                    # batch_obs, batch_action, batch_reward, batch_next_obs,done= self.sample()
                    self.replay()
                Total_reward+=r_t
                obs=obs_t
            # End of one episode: collect tardiness statistics.
            total_tadiness=0
            Job=Sit.Jobs
            E=0
            K=[i for i in range(len(Job))]
            End=[]
            for Ji in range(len(Job)):
                End.append(max(Job[Ji].End))
                if max(Job[Ji].End)>D[Ji]:
                    total_tadiness+=abs(max(Job[Ji].End)-D[Ji])
            print('<<<<<<<<>>>>>>>>>')
            Total_tard.append(total_tadiness)
            print('<<<<<<<<>>>>>>>>>')
            TR.append(Total_reward)
            # plt.plot(K,End,color='y')
            # plt.plot(K,D,color='r')
            # plt.show()
        plt.plot(x,Total_tard)
        plt.show()
        return Total_reward

# Train the agent on the generated instance.
d=DQN()
d.main(J_num, M_num, O_num, J, Processing_time, D, A)

Original: https://blog.csdn.net/crazy_girl_me/article/details/118694032
Author: 码丽莲梦露
Title: Tensorflow2.0|基于深度强化学习(DQN)实现动态柔性作业车间调度问题(DFJSP)

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/618612/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球