Distributed Training (Part 2): Distribution Strategies

Table of Contents

1. Why distributed training is needed

2. Distribution strategies supported by TensorFlow

2.1 MirroredStrategy

2.2 CentralStorageStrategy

2.3 MultiWorkerMirroredStrategy

2.4 TPUStrategy

2.5 ParameterServerStrategy

2.6 Synchronous vs. asynchronous training

2.6.1 Synchronous and asynchronous training illustrated

2.6.2 Pros and cons of synchronous and asynchronous training

3. Hands-on practice

3.1 keras distributed

3.1.1 keras baseline

3.1.2 MirroredStrategy distributed

3.2 estimator distributed

3.2.1 estimator baseline

3.2.2 estimator distributed

3.2.3 TensorBoard comparison

3.3 Distributing a custom training loop

3.3.1 custom train baseline

3.3.2 custom train distributed

1. Why distributed training is needed

Training programs run slowly.

The reasons training runs slowly:

(1) The data is too large

To achieve better results, more training data is usually needed: the more data, the more robust the model.

Training proceeds in epochs; one full pass over the training data is one epoch. In theory, the more epochs, the better the model is trained, but when the dataset is very large, a single epoch takes a long time.

(2) The model is too complex

Good results usually require complex models, and complex models usually mean more computation, which makes training slower.

So there is a tension between ever-growing data volumes and model complexity on one side and limited compute resources on the other. Because of this tension, single-GPU training is rarely used in industry; distributed setups are the norm.

2. Distribution strategies supported by TensorFlow

2.1 MirroredStrategy

(1) Synchronous distributed training

Synchronous: something must be kept in sync. What is synchronized? The model parameters.

Distributed training: all GPUs are used, and each GPU is a node in the distributed setup.

(2) Designed for a single machine with multiple GPUs

(3) Each GPU holds a full copy of the model parameters, and these copies are kept synchronized

(4) Data parallelism is what provides the speedup (see the sketch after this list)

Each batch is split into N shards, one per GPU

Gradients from all GPUs are aggregated, and the resulting update is applied to the parameters on every GPU
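To make the mechanics concrete, here is a toy sketch of data parallelism in plain NumPy (purely illustrative; this is not how TensorFlow implements it internally): the global batch is split across replicas, each replica computes a gradient on its shard, the gradients are averaged, and the same update is applied to every mirrored copy of the weight.

import numpy as np

# Toy model y = w * x trained with mean squared error; two "replicas" hold mirrored copies of w.
num_replicas = 2
w_replicas = [0.0 for _ in range(num_replicas)]

x = np.random.randn(256)                 # one global batch
y = 3.0 * x                              # targets for the toy problem
x_shards = np.split(x, num_replicas)     # the batch is split into one shard per replica
y_shards = np.split(y, num_replicas)

# each replica computes the gradient of its local loss
local_grads = []
for w, xs, ys in zip(w_replicas, x_shards, y_shards):
    pred = w * xs
    local_grads.append(np.mean(2 * (pred - ys) * xs))   # d/dw of mean((w*x - y)^2)

# "all-reduce": average the gradients, then apply the same update on every replica
avg_grad = np.mean(local_grads)
w_replicas = [w - 0.1 * avg_grad for w in w_replicas]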

2.2 CentralStorageStrategy

(1) A variant of MirroredStrategy

(2) Parameters are not mirrored on every GPU; they are stored on a single device

either the CPU, or the GPU when there is only one

(3) Computation runs in parallel on all GPUs

except for the parameter-update step, which runs on the device that stores the parameters (the CPU, or the single GPU when only one is available). A minimal usage sketch follows.
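A minimal usage sketch, assuming a TensorFlow 2.x release where the class lives under tf.distribute.experimental:

import tensorflow as tf

# variables are placed on the CPU (or the single GPU); compute is replicated across the GPUs
strategy = tf.distribute.experimental.CentralStorageStrategy()
with strategy.scope():
    model = tf.keras.Sequential([tf.keras.layers.Dense(10)])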

2.3 MultiWorkerMirroredStrategy

(1) Similar to MirroredStrategy

(2) Designed for multiple machines, each with one or more GPUs (a TF_CONFIG sketch follows)
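Each worker runs the same training program and learns its role from the TF_CONFIG environment variable. A sketch with hypothetical host names (on older TF 2.x releases the class is tf.distribute.experimental.MultiWorkerMirroredStrategy):

import json
import os

import tensorflow as tf

# hypothetical two-worker cluster; every machine sets the same "cluster" and its own "index"
os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {"worker": ["host1:12345", "host2:12345"]},
    "task": {"type": "worker", "index": 0},
})

strategy = tf.distribute.MultiWorkerMirroredStrategy()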

2.4 TPUStrategy

(1) Similar to MirroredStrategy

(2) Targets training on TPUs (a connection sketch follows)
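A minimal connection sketch, assuming a Colab or Cloud TPU environment where an empty TPU name resolves automatically (on older releases the class is tf.distribute.experimental.TPUStrategy):

import tensorflow as tf

resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu="")
tf.config.experimental_connect_to_cluster(resolver)
tf.tpu.experimental.initialize_tpu_system(resolver)
strategy = tf.distribute.TPUStrategy(resolver)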

2.5 ParameterServerStrategy

(1) Asynchronous distributed training

Asynchronous here refers to the parameters: replicas update them without waiting for each other.

(By contrast, MirroredStrategy is synchronous in exactly the same sense: it is the parameters that are synchronized.)

(2) Better suited to large-scale distributed systems

(3) Machines are divided into two roles: parameter servers and workers

Parameter servers aggregate gradients and update the parameters

Workers do the computation and train the network

(4) How ParameterServerStrategy works (illustrated by the figure and the cluster-spec sketch below)

[Figure: ParameterServerStrategy architecture with workers and parameter servers]
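A sketch of the cluster specification such a setup uses, with hypothetical host names; ps tasks hold the variables, worker tasks do the computation, and in recent TF 2.x releases the strategy is typically built from a cluster resolver:

import json
import os

import tensorflow as tf

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["worker0:2222", "worker1:2222"],   # do the computation
        "ps": ["ps0:2222"],                           # hold and update the parameters
        "chief": ["chief0:2222"],                     # coordinates training
    },
    "task": {"type": "chief", "index": 0},
})

resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
strategy = tf.distribute.experimental.ParameterServerStrategy(resolver)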

2.6 Synchronous vs. asynchronous training

2.6.1 Synchronous and asynchronous training illustrated

[Figure: synchronous training]

[Figure: asynchronous training]

2.6.2 Pros and cons of synchronous and asynchronous training

  • Single machine, multiple GPUs: the GPUs in one machine are usually the same model, so synchronous training works well and avoids excessive communication overhead.
  • Multiple machines, multiple GPUs: asynchronous training avoids the straggler effect. Different machines may carry different GPU models and therefore run at different speeds, so synchronous training is held back by the slowest worker.
  • Asynchronous training applies each worker's update as soon as it is computed, so some workers read parameters that are not the latest. Asynchrony is therefore not strictly correct; precisely because it is not strictly correct, the model has to tolerate this noise, which tends to improve generalization.

3. Hands-on practice

3.1 keras distributed

3.1.1 keras baseline

A baseline against which to compare the distribution strategies (classifying the fashion_mnist dataset).

Import the required modules

import os
import sys
import time

import pandas as pd
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras

Load the data

fashion_mnist = keras.datasets.fashion_mnist
(x_train_all, y_train_all), (x_test, y_test) = fashion_mnist.load_data()

x_valid, x_train = x_train_all[:5000], x_train_all[5000:]
y_valid, y_train = y_train_all[:5000], y_train_all[5000:]

Normalize the data

from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
# x_train: [None, 28, 28] --> [None, 784]
x_train_scaler = scaler.fit_transform(x_train.reshape(-1, 784)).reshape(-1, 28, 28, 1) # the trailing 1 is the channel dimension
x_valid_scaler = scaler.transform(x_valid.reshape(-1, 784)).reshape(-1, 28, 28, 1)
x_test_scaler = scaler.transform(x_test.reshape(-1, 784)).reshape(-1, 28, 28, 1)

Build the dataset

def make_dataset(images, labels, epochs, batch_size, shuffle=True):
    dataset = tf.data.Dataset.from_tensor_slices((images, labels))
    if shuffle:
        dataset = dataset.shuffle(10000)

    # prefetch(50): prepare up to 50 batches ahead of time so that data loading overlaps with training; that overlap is what makes it an acceleration
    dataset = dataset.repeat(epochs).batch(batch_size).prefetch(50)
    return dataset

batch_size = 256
epochs = 100
train_dataset = make_dataset(x_train_scaler, y_train, epochs, batch_size)

Build a model of about 20 layers

model = keras.models.Sequential()
model.add(keras.layers.Conv2D(filters=128, kernel_size=3,
                              padding='same',
                              activation='selu',
                              input_shape=(28, 28, 1)))
model.add(keras.layers.SeparableConv2D(filters=128, kernel_size=3,
                                       padding='same',
                                       activation='selu'))
model.add(keras.layers.MaxPool2D(pool_size=2))

# each pooling layer shrinks the feature map and greatly reduces the amount of intermediate data; doubling the filters compensates for that loss of information
model.add(keras.layers.SeparableConv2D(filters=256, kernel_size=3,
                                       padding='same',
                                       activation='selu'))
model.add(keras.layers.SeparableConv2D(filters=256, kernel_size=3,
                                       padding='same',
                                       activation='selu'))
model.add(keras.layers.MaxPool2D(pool_size=2))

model.add(keras.layers.SeparableConv2D(filters=512, kernel_size=3,
                                       padding='same',
                                       activation='selu'))
model.add(keras.layers.SeparableConv2D(filters=512, kernel_size=3,
                                       padding='same',
                                       activation='selu'))
model.add(keras.layers.MaxPool2D(pool_size=2))

# flatten
model.add(keras.layers.Flatten())
model.add(keras.layers.Dense(512, activation='selu')) # fully connected layer
model.add(keras.layers.Dense(10, activation="softmax")) # fully connected layer

model.compile(loss=keras.losses.SparseCategoricalCrossentropy(),
              optimizer=keras.optimizers.SGD(),
              metrics=["accuracy"])

model.summary()

Train the model

history = model.fit(train_dataset,
                    steps_per_epoch = x_train_scaler.shape[0] // batch_size,
                    epochs=10)

Check the training time (a timing-callback sketch follows the figure).

[Figure: baseline training output showing the time taken per epoch]
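If you prefer to measure the time programmatically instead of reading it off the fit logs, a simple sketch is a custom Keras callback (the EpochTimer class below is a hypothetical helper, not part of the original code):

import time

from tensorflow import keras

class EpochTimer(keras.callbacks.Callback):
    """Prints how long each epoch takes."""
    def on_epoch_begin(self, epoch, logs=None):
        self._start = time.time()

    def on_epoch_end(self, epoch, logs=None):
        print("epoch %d took %.1f s" % (epoch + 1, time.time() - self._start))

# usage: model.fit(train_dataset, steps_per_epoch=..., epochs=10, callbacks=[EpochTimer()])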

3.1.2 MirroredStrategy distributed

Make the following changes on top of the 3.1.1 code.

Enable memory growth for each GPU

tf.debugging.set_log_device_placement(True)
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

logical_gpus = tf.config.experimental.list_logical_devices('GPU')

print("the num of gpu", len(gpus))
print("the num of logical gpu", len(logical_gpus))

or: split a physical GPU into logical GPUs

tf.debugging.set_log_device_placement(True)
gpus = tf.config.experimental.list_physical_devices('GPU')

# split the GPU into two logical devices
tf.config.experimental.set_virtual_device_configuration(
    gpus[0],
    [tf.config.LogicalDeviceConfiguration(memory_limit=3072),
     tf.config.LogicalDeviceConfiguration(memory_limit=3072)]
)

logical_gpus = tf.config.experimental.list_logical_devices('GPU')

print("the num of gpu", len(gpus))
print("the num of logical gpu", len(logical_gpus))

Note: enabling memory growth and splitting a GPU into logical devices cannot be applied to the same GPU at the same time.

Increase the batch_size

batch_size_per_replica = 256
batch_size = batch_size_per_replica * len(logical_gpus)

Train with MirroredStrategy (synchronous distributed training)

strategy = tf.distribute.MirroredStrategy()

with strategy.scope():
    model = keras.models.Sequential()
    model.add(keras.layers.Conv2D(filters=128, kernel_size=3,
                                padding='same',
                                activation='selu',
                                input_shape=(28, 28, 1)))
    model.add(keras.layers.SeparableConv2D(filters=128, kernel_size=3,
                                        padding='same',
                                        activation='selu'))
    model.add(keras.layers.MaxPool2D(pool_size=2))

    # each pooling layer shrinks the feature map and greatly reduces the amount of intermediate data; doubling the filters compensates for that loss of information
    model.add(keras.layers.SeparableConv2D(filters=256, kernel_size=3,
                                        padding='same',
                                        activation='selu'))
    model.add(keras.layers.SeparableConv2D(filters=256, kernel_size=3,
                                        padding='same',
                                        activation='selu'))
    model.add(keras.layers.MaxPool2D(pool_size=2))

    model.add(keras.layers.SeparableConv2D(filters=512, kernel_size=3,
                                        padding='same',
                                        activation='selu'))
    model.add(keras.layers.SeparableConv2D(filters=512, kernel_size=3,
                                        padding='same',
                                        activation='selu'))
    model.add(keras.layers.MaxPool2D(pool_size=2))

    # flatten
    model.add(keras.layers.Flatten())
    model.add(keras.layers.Dense(512, activation='selu')) # fully connected layer
    model.add(keras.layers.Dense(10, activation="softmax")) # fully connected layer

    model.compile(loss=keras.losses.SparseCategoricalCrossentropy(),
                optimizer=keras.optimizers.SGD(),
                metrics=["accuracy"])

Train, and record the training time and accuracy.

[Figure: MirroredStrategy training output showing the time per epoch and accuracy]

Training one epoch now takes about 4 s, roughly a 3x speedup, but the accuracy drops. Why?

Because the batch_size is larger, the model performs fewer parameter updates in total.

Keep batch_size unchanged and retrain:

[Figure: training output with batch_size unchanged]

With the same global batch_size split across GPUs, each GPU processes less data per step, the hardware is underutilized, and the speedup is modest.

In real distributed settings it is usually recommended to set a larger batch_size, because the lost accuracy can be recovered with other tuning (adjusting the learning rate, changing the parameter initialization, adding batch normalization), so you keep the same accuracy while gaining a large speedup. A common learning-rate adjustment is sketched below.
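One common adjustment when the global batch size grows with the number of replicas is to scale the learning rate by the same factor (linear scaling is a heuristic, not a guarantee, and the 0.01 base rate below is just an assumed starting point):

base_learning_rate = 0.01                      # assumed rate tuned for batch_size_per_replica
num_replicas = strategy.num_replicas_in_sync   # e.g. the number of logical GPUs
scaled_learning_rate = base_learning_rate * num_replicas

# then, inside strategy.scope(), compile with the scaled rate:
# optimizer=keras.optimizers.SGD(learning_rate=scaled_learning_rate)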

3.2 estimator distributed

Again, the changes are made on top of the 3.1.1 code.

3.2.1 estimator baseline

Convert the model into an estimator and set baseline_model_dir

baseline_model_dir = "**/estimator_baseline"
estimator = keras.estimator.model_to_estimator(keras_model = model,
                                               model_dir = baseline_model_dir)

Train the model

estimator.train(input_fn = lambda: make_dataset(
    x_train_scaler, y_train, epochs, batch_size),
    max_steps = 5000)

3.2.2 estimator distributed

Convert the model into an estimator, set distributed_model_dir, and pass in the distribution strategy

distributed_model_dir = "**/estimator_distributed"
strategy = tf.distribute.MirroredStrategy()
config = tf.estimator.RunConfig(train_distribute = strategy)
estimator = keras.estimator.model_to_estimator(keras_model = model,
                                               model_dir = distributed_model_dir,
                                               config=config)

Train the model

estimator.train(input_fn = lambda: make_dataset(
    x_train_scaler, y_train, epochs, batch_size),
    max_steps = 5000)

3.2.3 TensorBoard comparison

Show the baseline and distributed estimator training runs side by side in TensorBoard:

python3 /*/Python/3.8/lib/python/site-packages/tensorboard/main.py --logdir=baseline:estimator_baseline,distributed:estimator_distributed

Here --logdir takes key:value pairs: the key is the name displayed in TensorBoard, and the value is the log directory.

[Figure: TensorBoard curves comparing the baseline and distributed estimator runs]

Surprisingly, the baseline is faster than the distributed estimator. Why?

  1. For distributed training to pay off, the dataset pipeline also needs to be distributed, because the speedup comes from data parallelism. With the estimator here, the input data is not parallelized, so data I/O becomes the bottleneck in the distributed run.
  2. estimator_baseline and estimator_distributed use the same batch_size of 256; for the distributed run that wastes compute. It is usually more reasonable to set a fairly large batch_size: with larger batches the data is loaded in bigger chunks, and the I/O bottleneck goes away.

3.3 Distributing a custom training loop

3.3.1 custom train baseline

Again, the changes are made on top of the 3.1.1 code.

Because the training loop is written by hand, the dataset construction changes: the dataset is built for a single epoch, and the outer loop iterates over epochs.

def make_dataset(images, labels, epochs, batch_size, shuffle=True):
    dataset = tf.data.Dataset.from_tensor_slices((images, labels))
    if shuffle:
        dataset = dataset.shuffle(10000)

    # prefetch(50): prepare up to 50 batches ahead of time so that data loading overlaps with training; that overlap is what makes it an acceleration
    dataset = dataset.repeat(epochs).batch(batch_size).prefetch(50)
    return dataset

batch_size = 256
train_dataset = make_dataset(x_train_scaler, y_train, 1, batch_size)
valid_dataset = make_dataset(x_valid_scaler, y_valid, 1, batch_size)

Remove model.compile.

The custom training flow has four steps:

  1. define the loss functions
  2. define the train_step function
  3. define the test_step function
  4. write the for-loop training loop

The customized training loop:

# 1. define the loss functions
# reduction controls how per-example losses are aggregated;
# SUM_OVER_BATCH_SIZE sums the losses and divides by the batch size (the mean over the batch)
loss_func = keras.losses.SparseCategoricalCrossentropy(
    reduction = keras.losses.Reduction.SUM_OVER_BATCH_SIZE)
test_loss = keras.metrics.Mean(name = "test_loss")
train_accuracy = keras.metrics.SparseCategoricalAccuracy(name = "train_accuracy")
test_accuracy = keras.metrics.SparseCategoricalAccuracy(name = "test_accuracy")

optimizer = keras.optimizers.SGD(learning_rate = 0.01)

# 2. define the train_step function
# wrapping train_step in @tf.function compiles it into a graph, which greatly speeds up training
@tf.function
def train_step(inputs):
    images, labels = inputs
    with tf.GradientTape() as tape:
        predictions = model(images, training = True)
        loss = loss_func(labels, predictions)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    train_accuracy.update_state(labels, predictions)
    return loss

# 3. define the test_step function
@tf.function
def test_step(inputs):
    images, labels = inputs
    predictions = model(images)
    t_loss = loss_func(labels, predictions)

    test_loss.update_state(t_loss)
    test_accuracy.update_state(labels, predictions)

# 4. the for-loop training loop
epochs = 10
for epoch in range(epochs):
    total_loss = 0.0
    num_batches = 0 # how many batches have been processed in this epoch
    for x in train_dataset:
        start_time = time.time()
        total_loss += train_step(x)
        run_time = time.time() - start_time

        num_batches += 1
        print('\rtotal_loss: %3.3f, num_batches: %d, average_loss: %3.3f, '
              'time: %3.3f' % (total_loss, num_batches,
                               total_loss / num_batches, run_time),
              end='')  # end='' lets the '\r' overwrite the progress line in place

    train_loss = total_loss / num_batches

    for x in valid_dataset:
        test_step(x)

    print('\rEpoch: %d, loss: %3.3f, Acc: %3.3f, Val_loss: %3.3f, '
    'Val_Acc: %3.3f' %(epoch + 1, train_loss, train_accuracy.result(),
                       test_loss.result(), test_accuracy.result()))

    train_accuracy.reset_states()
    test_loss.reset_states()
    test_accuracy.reset_states()

This produces the baseline training output.

3.3.2 custom train distributed

Make the changes on top of the 3.3.1 code.

Split the GPU into logical devices

tf.debugging.set_log_device_placement(True)
gpus = tf.config.experimental.list_physical_devices('GPU')

# split the GPU into two logical devices
# (memory growth is not enabled here because, as noted in 3.1.2, it cannot be combined with splitting the same GPU)
tf.config.experimental.set_virtual_device_configuration(
    gpus[0],
    [tf.config.LogicalDeviceConfiguration(memory_limit=3072),
     tf.config.LogicalDeviceConfiguration(memory_limit=3072)]
)

logical_gpus = tf.config.experimental.list_logical_devices('GPU')

print("the num of gpu", len(gpus))
print("the num of logical gpu", len(logical_gpus))

(1) Turn train_dataset and valid_dataset into distributed datasets

def make_dataset(images, labels, epochs, batch_size, shuffle=True):
    dataset = tf.data.Dataset.from_tensor_slices((images, labels))
    if shuffle:
        dataset = dataset.shuffle(10000)

    # prefetch(50): prepare up to 50 batches ahead of time so that data loading overlaps with training; that overlap is what makes it an acceleration
    dataset = dataset.repeat(epochs).batch(batch_size).prefetch(50)
    return dataset

strategy = tf.distribute.MirroredStrategy()
with strategy.scope():

    batch_size_per_replica = 256
    batch_size = batch_size_per_replica * len(logical_gpus)
    train_dataset = make_dataset(x_train_scaler, y_train, 1, batch_size)
    valid_dataset = make_dataset(x_valid_scaler, y_valid, 1, batch_size)

    train_dataset_distribute = strategy.experimental_distribute_dataset(
        train_dataset)
    valid_dataset_distribute = strategy.experimental_distribute_dataset(
        valid_dataset)

MirroredStrategy uses data parallelism: the dataset produces one global batch, which is split into len(logical_gpus) shards; each GPU computes gradients on its shard, the gradients are aggregated, and the parameters on every GPU are updated.

Distributing the dataset means each GPU gets its own input pipeline dedicated to feeding it, which speeds up data delivery.

(2) Define the model inside the strategy's scope

with strategy.scope():
    model = keras.models.Sequential()
    model.add(keras.layers.Conv2D(filters=128, kernel_size=3,
                                padding='same',
                                activation='selu',
                                input_shape=(28, 28, 1)))
    model.add(keras.layers.SeparableConv2D(filters=128, kernel_size=3,
                                        padding='same',
                                        activation='selu'))
    model.add(keras.layers.MaxPool2D(pool_size=2))

    # each pooling layer shrinks the feature map and greatly reduces the amount of intermediate data; doubling the filters compensates for that loss of information
    model.add(keras.layers.SeparableConv2D(filters=256, kernel_size=3,
                                        padding='same',
                                        activation='selu'))
    model.add(keras.layers.SeparableConv2D(filters=256, kernel_size=3,
                                        padding='same',
                                        activation='selu'))
    model.add(keras.layers.MaxPool2D(pool_size=2))

    model.add(keras.layers.SeparableConv2D(filters=512, kernel_size=3,
                                        padding='same',
                                        activation='selu'))
    model.add(keras.layers.SeparableConv2D(filters=512, kernel_size=3,
                                        padding='same',
                                        activation='selu'))
    model.add(keras.layers.MaxPool2D(pool_size=2))

    # flatten
    model.add(keras.layers.Flatten())
    model.add(keras.layers.Dense(512, activation='selu')) # fully connected layer
    model.add(keras.layers.Dense(10, activation="softmax")) # fully connected layer

(3) Change the reduction of the loss function and aggregate the per-replica losses in train_step

Note: the loss in test_step does not need to go through compute_loss, because test_loss and test_accuracy are accumulating metrics, so no extra aggregation is needed.

# customized training loop
with strategy.scope():
    # 1. define the loss functions
    # each GPU receives batch_size / number-of-GPUs examples per step
    # reduction controls how per-example losses are aggregated; under a distribution strategy,
    # SUM_OVER_BATCH_SIZE would raise an error (it averages over the per-replica batch),
    # so we use NONE and average over the global batch ourselves in compute_loss
    loss_func = keras.losses.SparseCategoricalCrossentropy(
        reduction = keras.losses.Reduction.NONE)
    def compute_loss(labels, predictions):
        per_replica_loss = loss_func(labels, predictions)
        return tf.nn.compute_average_loss(per_replica_loss,
                                          global_batch_size = batch_size)

    test_loss = keras.metrics.Mean(name = "test_loss")
    train_accuracy = keras.metrics.SparseCategoricalAccuracy(name = "train_accuracy")
    test_accuracy = keras.metrics.SparseCategoricalAccuracy(name = "test_accuracy")

    optimizer = keras.optimizers.SGD(learning_rate = 0.01)

    # 2. define function train_step
    def train_step(inputs):
        images, labels = inputs
        with tf.GradientTape() as tape:
            predictions = model(images, training = True)
            loss = compute_loss(labels, predictions)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        train_accuracy.update_state(labels, predictions)
        return loss

    @tf.function
    def distributed_train_step(inputs):
        per_replica_average_loss = strategy.run(train_step, args=(inputs, ))
        return strategy.reduce(tf.distribute.ReduceOp.SUM,
                               per_replica_average_loss,
                               axis = None)

    # 3. define function test_step
    def test_step(inputs):
        images, labels = inputs
        predictions = model(images)
        # note: t_loss does not go through compute_loss, because test_loss and
        # test_accuracy are accumulating metrics, so no extra aggregation is needed
        t_loss = loss_func(labels, predictions)

        test_loss.update_state(t_loss)
        test_accuracy.update_state(labels, predictions)

    @tf.function
    def distributed_test_step(inputs):
        strategy.run(test_step, args=(inputs, ))

    # 4. the for-loop training loop
    epochs = 10
    for epoch in range(epochs):
        total_loss = 0.0
        num_batches = 0 # how many batches have been processed in this epoch
        for x in train_dataset_distribute:
            start_time = time.time()
            total_loss += distributed_train_step(x)
            run_time = time.time() - start_time

            num_batches += 1
            print('\rtotal_loss: %3.3f, num_batches: %d, average_loss: %3.3f, '
                  'time: %3.3f' % (total_loss, num_batches,
                                   total_loss / num_batches, run_time),
                  end='')  # end='' lets the '\r' overwrite the progress line in place

        train_loss = total_loss / num_batches

        for x in valid_dataset_distribute:
            distributed_test_step(x)

        print('\rEpoch: %d, loss: %3.3f, Acc: %3.3f, Val_loss: %3.3f, '
        'Val_Acc: %3.3f' %(epoch + 1, train_loss, train_accuracy.result(),
                        test_loss.result(), test_accuracy.result()))

        train_accuracy.reset_states()
        test_loss.reset_states()
        test_accuracy.reset_states()

Check the training time: is it shorter than before the training loop was distributed?

Original: https://blog.csdn.net/zhao_crystal/article/details/123772862
Author: zhao_crystal
Title: 分布式训练(二)——分布式策略
