Deploying an iris classification task on the Flyte platform (PyTorch + TensorFlow)

These notes document building a simple machine learning platform with Flyte. Docker, Flyte, and Python 3.7 are assumed to be installed. Experiment OS: Ubuntu 18.04 amd64.

The process breaks down into the following steps:

1. Setting up the Flyte platform

For an introduction to the Flyte platform, see this blog post.

1.1 Initialization phase

  1. Initialize a Flyte project directory:
flytectl init iris_project
  2. Enter the directory and start the sandbox; the directory passed via --source becomes the root directory of the Flyte platform being created. This step takes roughly 5 minutes:
cd /path/to/flyte/project && flytectl sandbox start --source iris_project
1.2 Development phase

Flyte uses Docker containers to package workflows and tasks and ship them to a remote Flyte cluster. The project directory above already contains a Dockerfile. During development, first build the Docker image, then push the built image to a registry.

1.2.1 Server running locally
  1. flyte-sandbox runs locally inside a Docker container, so there is no need to push a Docker image. The build and push steps can be combined by simply building the image inside the flyte-sandbox container:
flytectl sandbox exec -- docker build . --tag "iris:v1"
  2. Next, use the pyflyte CLI bundled with flytekit to package the workflow for upload to the Flyte backend. The image is the same one built in the previous step:
pyflyte --pkgs flyte.workflows package --image "iris:v1" --force
  3. Register the package with the Flyte backend (a verification sketch follows this list). The version v1 here does not have to match the tag used above, but matching them is generally recommended, as it makes tracking easier:
flytectl register files --project flytesnacks --domain development --archive flyte-package.tgz --version v1
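
To confirm the build and the registration, the image inside the sandbox and the registered workflows can be listed. A sketch: the docker invocation mirrors the build step above, and flytectl subcommand spellings may vary by version:

flytectl sandbox exec -- docker images
flytectl get workflow --project flytesnacks --domain development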
1.2.2 Server running remotely

When the Flyte backend is not local but runs on a remote machine, remote access is needed. Configure the client to point at the remote server:

flytectl config init --host='x.x.x.x:30081' --insecure
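
To verify that the client can now reach the remote backend, the registered projects can be listed. A sketch using flytectl's get subcommand; output formatting varies by version:

flytectl get project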

After the program is developed, package the files in the directory into a Docker image and push it:

docker build . --tag <registry/repo:version>
docker push <registry/repo:version>

Then package the workflows under the specified directory:

pyflyte --pkgs flyte.workflows package --image <registry/repo:version>

Finally, register the generated package with the server, as shown below.
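
The registration command mirrors the local case. A sketch, assuming the same flytesnacks project and development domain used in the sandbox example above:

flytectl register files --project flytesnacks --domain development --archive flyte-package.tgz --version v1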

The following steps pull the image above and run Docker directly; they have not been verified.

# Pull the image directly; a container still needs to be created from it
docker pull xiangyanghe/iris_flyte_project:v1.0

Alternatively, run directly from the packaged image (still to be verified):

docker run --rm --privileged -p 30081:30081 -p 30084:30084 -p 30088:30088 docker.io/xiangyanghe/iris_flyte_project:v4.0

This enables image migration and one-step deployment across machines; a registry-free variant is sketched below.
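
If the target machine cannot pull from a registry, the image can also be moved as a tar archive. A sketch using the standard docker save/load commands; the archive name is an arbitrary choice:

docker save -o iris_flyte_project.tar docker.io/xiangyanghe/iris_flyte_project:v4.0
# copy iris_flyte_project.tar to the target machine, then:
docker load -i iris_flyte_project.tar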

2. Implementing the iris classification model

The steps above initialized and deployed the Flyte platform; this section deploys the iris model onto it.

Both a PyTorch version and a TensorFlow version are implemented.

  • PyTorch version
from sklearn.datasets import load_iris
import pandas as pd
import numpy as np
from dataclasses import dataclass
from dataclasses_json import dataclass_json
import os
from flytekit import task, workflow, Resources
import typing
from flytekit.types.file import PythonPickledFile
import torch
import torch.nn as nn
import torch.utils.data as Data
@dataclass_json
@dataclass
class Hyperparameters(object):
    learning_rate: float = 0.1
    batch_size: int = 10
    epochs: int = 100
    log_interval: int = 20
def loadData():
    train_data = load_iris()
    data = train_data['data']
    labels = train_data['target'].reshape(-1,1)
    total_data = np.hstack((data,labels))
    np.random.shuffle(total_data)
    train_length = int(len(total_data) * 0.8)
    train = total_data[0:train_length, :-1]
    test = total_data[train_length:, :-1]
    train_label = total_data[0:train_length, -1].reshape(-1, 1)
    test_label = total_data[train_length:, -1].reshape(-1, 1)
    print(data.shape, labels.shape, train.shape, test.shape, train_label.shape, test_label.shape)
    return data, labels, train, test, train_label, test_label

class Model(nn.Module):
    def __init__(self, lr=0.1):
        super(Model, self).__init__()
        # 4 input features -> 30 hidden units -> 3 classes
        self.fc = nn.Sequential(
            nn.Linear(4, 30),
            nn.ReLU(),
            nn.Linear(30, 3)
        )
        self.criterion = nn.CrossEntropyLoss()
        self.optim = torch.optim.Adam(params=self.parameters(), lr=lr)
        self.loss = None

    def forward(self, inputs):
        return self.fc(inputs)

    # Named train_step to avoid shadowing nn.Module.train().
    def train_step(self, x, label):
        out = self.forward(x)
        self.loss = self.criterion(out, label)
        self.optim.zero_grad()
        self.loss.backward()
        self.optim.step()

    def getLoss(self):
        return self.loss

    def test(self, test_):
        return self.forward(test_)
# Use small resource requests inside the sandbox, larger ones elsewhere.
if os.getenv("SANDBOX", "") != "":
    print(f"SANDBOX ENV: '{os.getenv('SANDBOX')}'")
    mem = "100Mi"
    gpu = "0"
    storage = "500Mi"
else:
    print(f"SANDBOX ENV: '{os.getenv('SANDBOX')}'")
    mem = "3Gi"
    gpu = "0"
    storage = "1Gi"
TrainingOutputs = typing.NamedTuple(
    "TrainingOutputs",
    train_losses=typing.List[float],
    test_losses=typing.List[float],
    train_accuracies=typing.List[float],
    test_accuracies=typing.List[float],
    all_acc=float,
    all_loss=float,
    model_state=PythonPickledFile,
)
def predictAcc(model, data, label):
    # Evaluate classification accuracy and cross-entropy loss on the given data.
    inputs = torch.from_numpy(data).float()
    targets = torch.from_numpy(label.reshape(-1)).long()
    with torch.no_grad():
        out = model.test(inputs)
        loss = model.criterion(out, targets).item()
    pred_y = torch.max(out, 1)[1].numpy()
    target_y = targets.numpy()
    accuracy = float((pred_y == target_y).astype(int).sum()) / float(target_y.size)
    return accuracy, loss
@task(
    retries=2,
    cache=False,
    cache_version="1.0",
    requests=Resources(gpu=gpu, mem=mem, storage=storage),
    limits=Resources(gpu=gpu, mem=mem, storage=storage),
)
def pytorchIrisTask(hp: Hyperparameters) -> TrainingOutputs:

    data, labels, train, test, train_label, test_label = loadData()
    train_dataset = Data.TensorDataset(torch.from_numpy(train).float(), torch.from_numpy(train_label).long())
    train_loader = Data.DataLoader(dataset=train_dataset, batch_size=hp.batch_size, shuffle=True)

    model = Model(lr=hp.learning_rate)  # use the configured learning rate

    train_accuracies = []
    train_losses = []
    test_accuracies = []
    test_losses = []

    for epoch in range(hp.epochs):
        for step, (x, y) in enumerate(train_loader):
            y = y.reshape(-1)  # flatten labels; robust to a smaller final batch
            model.train_step(x, y)

            if epoch % hp.log_interval == 0:
                accuracy, loss = predictAcc(model, test, test_label)

                print(f"Epoch: {epoch} | Step: {step} | Loss: {model.getLoss().item()} | acc: {accuracy}")

        train_acc, train_loss = predictAcc(model, train, train_label)
        train_losses.append(train_loss)
        train_accuracies.append(train_acc)

        test_acc, test_loss = predictAcc(model, test, test_label)
        test_losses.append(test_loss)
        test_accuracies.append(test_acc)

    all_acc, all_loss = predictAcc(model, data, labels)

    model_file = "iris_model.pt"
    torch.save(model.state_dict(), model_file)
    return TrainingOutputs(
        train_losses=train_losses,
        test_losses=test_losses,
        train_accuracies=train_accuracies,
        test_accuracies=test_accuracies,
        all_acc=all_acc,
        all_loss=all_loss,
        model_state=PythonPickledFile(model_file),
    )

@workflow
def pytorchTrainingWorkflow(
    hp: Hyperparameters = Hyperparameters(epochs=100, batch_size=10)
) -> TrainingOutputs:
    return pytorchIrisTask(hp=hp)

if __name__ == '__main__':
    train_losses, test_losses, train_accuracies, test_accuracies, all_acc, all_loss, model = pytorchTrainingWorkflow(hp=Hyperparameters(epochs=100, batch_size=10, learning_rate=0.01))

    print(train_losses, test_losses, train_accuracies, test_accuracies, all_acc, all_loss, model)
  • TensorFlow version
from sklearn.datasets import load_iris
import pandas as pd
import numpy as np
from dataclasses import dataclass
from dataclasses_json import dataclass_json
import os
from flytekit import task, workflow, Resources
import typing
from flytekit.types.file import PythonPickledFile
@dataclass_json
@dataclass
class Hyperparameters(object):
    learning_rate: float = 0.1
    batch_size: int = 10
    epochs: int = 100
    log_interval: int = 20
def loadData():
    train_data = load_iris()
    data = train_data['data']
    labels = train_data['target'].reshape(-1,1)
    total_data = np.hstack((data,labels))
    np.random.shuffle(total_data)
    train_length = int(len(total_data) * 0.8)
    train = total_data[0:train_length, :-1]
    test = total_data[train_length:, :-1]
    train_label = total_data[0:train_length, -1].reshape(-1, 1)
    test_label = total_data[train_length:, -1].reshape(-1, 1)
    print(data.shape, labels.shape, train.shape, test.shape, train_label.shape, test_label.shape)
    return data, labels, train, test, train_label, test_label
# Use small resource requests inside the sandbox, larger ones elsewhere.
if os.getenv("SANDBOX", "") != "":
    print(f"SANDBOX ENV: '{os.getenv('SANDBOX')}'")
    mem = "100Mi"
    gpu = "0"
    storage = "500Mi"
else:
    print(f"SANDBOX ENV: '{os.getenv('SANDBOX')}'")
    mem = "3Gi"
    gpu = "0"
    storage = "1Gi"
TrainingOutputs = typing.NamedTuple(
    "TrainingOutputs",
    train_losses=typing.List[float],
    train_accuracies=typing.List[float],
    all_acc=float,
    test_acc=float,
)
@task(
    retries=2,
    cache=False,
    cache_version="1.0",
    requests=Resources(gpu=gpu, mem=mem, storage=storage),
    limits=Resources(gpu=gpu, mem=mem, storage=storage),
)
def tensorflowIrisTask(hp: Hyperparameters) -> TrainingOutputs:
    import tensorflow as tf

    data, labels, train, test, train_label, test_label = loadData()
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(10, activation='relu',input_shape=(4,)),
        tf.keras.layers.Dense(10, activation='relu'),
        tf.keras.layers.Dense(3, activation='softmax')
    ])

    train_dataset = tf.data.Dataset.from_tensor_slices((train, train_label))
    train_dataset = train_dataset.batch(hp.batch_size)

    model.compile(optimizer=tf.keras.optimizers.RMSprop(learning_rate=hp.learning_rate),
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(),
                  metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

    history = model.fit(train_dataset, epochs=hp.epochs)

    test_loss, test_acc = model.evaluate(test, test_label)
    all_loss, all_acc = model.evaluate(data, labels)
    print(history.history.keys())

    training_accuracies = history.history['sparse_categorical_accuracy']
    training_losses = history.history['loss']

    return TrainingOutputs(train_losses=training_losses,
        train_accuracies=training_accuracies,
        all_acc=all_acc,
        test_acc=test_acc,
    )
@workflow
def tensorflowTrainingWorkflow(
    hp: Hyperparameters = Hyperparameters(epochs=100, batch_size=10)
) -> TrainingOutputs:
    return tensorflowIrisTask(hp=hp)
if __name__ == '__main__':
    train_losses, train_accuracies, all_acc, test_acc = tensorflowTrainingWorkflow(hp=Hyperparameters(epochs=100, batch_size=10, learning_rate=0.01))
    print(train_losses, train_accuracies, all_acc, test_acc)

3. Deploying the iris classification model on the Flyte platform

The code imports packages such as sklearn, PyTorch, and TensorFlow, which must also be available inside the Docker image, so the Dockerfile needs to be modified as follows:

FROM python:3.7-slim-buster

WORKDIR /root

ENV VENV /root/venv
ENV LANG C.UTF-8
ENV LC_ALL C.UTF-8
ENV PYTHONPATH /root

RUN python3 -m venv ${VENV}

ENV PATH="${VENV}/bin:$PATH"

RUN apt-get update && apt-get install -y build-essential

RUN pip3 install awscli

RUN python -m pip --no-cache-dir install --upgrade --pre torch torchvision torchaudio -f https://download.pytorch.org/whl/nightly/cpu/torch_nightly.html

RUN pip3 install scikit-learn

COPY ./requirements.txt /root
RUN pip install -r /root/requirements.txt

COPY . /root

ARG tag
ENV FLYTE_INTERNAL_IMAGE $tag
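
The Dockerfile installs PyTorch and scikit-learn explicitly, so requirements.txt must cover the remaining imports used by the scripts. A minimal sketch of that file, with package names inferred from the imports above (versions unpinned; pin as appropriate):

flytekit
tensorflow
pandas
numpy
dataclasses-json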

After this, re-run the "Development phase" steps, paying attention to the tag version.

4. Experimental results (using the PyTorch results as an example)

Open http://localhost:30081/console in a browser and navigate to the development domain of the flytesnacks project.

Launch the corresponding workflow and inspect its outputs (the losses and accuracies returned by the task) on the execution page.

5. Packaging the container as an image and uploading it (v4.0 is the current working version)

  • Register an account on Docker Hub.
  • Run the following command to find the container_id:
docker ps
  • Log in to Docker Hub:
docker login
  • Commit the current container ID and give it a tag:
docker commit <container_id> xiangyanghe/iris_flyte_project:v4.0
  • Alternatively, use build:
docker build . -t xiangyanghe/iris_flyte_project:v4.0
  • Push to Docker Hub:
docker push xiangyanghe/iris_flyte_project:v4.0

The uploaded image is available at: https://hub.docker.com/repository/docker/xiangyanghe/iris_flyte_project

Other notes

If too many built Docker images fill up the storage device, all unused containers, images, and volumes can be removed with:

docker system prune -a --volumes
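
If removing volumes as well is too aggressive, unused images alone can be pruned with the standard Docker CLI:

docker image prune -a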

Original: https://blog.csdn.net/hxy17682323970/article/details/123822076
Author: busy yang
Title: flyte平台部署iris分类任务(pytorch+tensorflow)
