These notes describe building a simple machine learning platform with flyte. Docker, flyte, and Python 3.7 are assumed to be installed. Experimental OS: Ubuntu 18.04 amd64
The work is organized into the following steps:
1. Setting up the flyte platform
For an introduction to the flyte platform, refer to the linked blog post.
1.1 Initialization
- Initialize a flyte project directory:
flytectl init iris_project
- Enter the directory and start the sandbox. The `--source` flag makes the specified directory the root of the flyte platform to be created. This step takes about 5 minutes:
cd /path/to/flyte/project && flytectl sandbox start --source iris_project
1.2 Development
Flyte uses Docker containers to package workflows and tasks and ship them to a remote Flyte cluster. The project directory above already contains a Dockerfile. During development, first build the Docker container, then push the built image to a registry.
1.2.1 Server running locally
- flyte-sandbox runs locally inside a Docker container, so there is no need to push the Docker image. The build and push steps can be combined by simply building the image inside the flyte-sandbox container, with the following command:
flytectl sandbox exec -- docker build . --tag "iris:v1"
- Next, package the workflow with the pyflyte CLI that ships with flytekit and upload it to the Flyte backend. The image is the same one built in the previous step:
pyflyte --pkgs flyte.workflows package --image "iris:v1" --force
- Upload (register) the package above to the Flyte backend. The version v1 here does not have to match the version used in the command above, but keeping them in sync is generally recommended since it makes tracking easier:
flytectl register files --project flytesnacks --domain development --archive flyte-package.tgz --version v1
1.2.2 Server running remotely
When the flyte server is remote rather than local, remote access is needed; configure the remote server:
flytectl config init --host='x.x.x.x:30081' --insecure
After the program is developed, package the directory contents into a docker image:
docker build . --tag <registry/repo:version>
docker push <registry/repo:version>
Then export (package) the specified directory:
pyflyte --pkgs flyte.workflows package --image <registry/repo:version>
Finally, register the generated package with the server, using the same `flytectl register files` command shown above.
The following steps, which pull the image above and run docker directly, have not been verified.
// Pull the image directly; a container still needs to be created
docker pull xiangyanghe/iris_flyte_project:v1.0
Or run it directly from the packaged image (to be verified):
docker run --rm --privileged -p 30081:30081 -p 30084:30084 -p 30088:30088 docker.io/xiangyanghe/iris_flyte_project:v4.0
This enables image migration and one-command deployment across machines.
2. Implementing the iris classification model
The steps above initialized and deployed the flyte platform; this section deploys the iris model onto it.
Both a pytorch version and a tensorflow version are implemented.
- pytorch version
from sklearn.datasets import load_iris
import pandas as pd
import numpy as np
from dataclasses import dataclass
from dataclasses_json import dataclass_json
import os
from flytekit import task, workflow, Resources
import typing
from flytekit.types.file import PythonPickledFile
import torch
import torch.nn as nn
import torch.utils.data as Data
@dataclass_json
@dataclass
class Hyperparameters(object):
    learning_rate: float = 0.1
    batch_size: int = 10
    epochs: int = 100
    log_interval: int = 20
def loadData():
    # Load iris, shuffle, and split 80/20 into train/test
    train_data = load_iris()
    data = train_data['data']
    labels = train_data['target'].reshape(-1, 1)
    total_data = np.hstack((data, labels))
    np.random.shuffle(total_data)
    train_length = int(len(total_data) * 0.8)
    train = total_data[0:train_length, :-1]
    test = total_data[train_length:, :-1]
    train_label = total_data[0:train_length, -1].reshape(-1, 1)
    test_label = total_data[train_length:, -1].reshape(-1, 1)
    print(data.shape, labels.shape, train.shape, test.shape, train_label.shape, test_label.shape)
    return data, labels, train, test, train_label, test_label
class Model(nn.Module):
    def __init__(self):
        super(Model, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(4, 30),
            nn.ReLU(),
            nn.Linear(30, 3)
        )
        self.mse = nn.CrossEntropyLoss()  # despite the name, this is cross-entropy loss
        self.optim = torch.optim.Adam(params=self.parameters(), lr=0.1)  # note: fixed lr; hp.learning_rate is not wired in here
        self.loss = None

    def forward(self, inputs):
        outputs = self.fc(inputs)
        return outputs

    def train(self, x, label):  # note: shadows nn.Module.train()
        out = self.forward(x)
        self.loss = self.mse(out, label)
        self.optim.zero_grad()
        self.loss.backward()
        self.optim.step()

    def getLoss(self):
        return self.loss

    def test(self, test_):
        return self.fc(test_)
# os.getenv returns None when SANDBOX is unset, so test truthiness rather than != ""
if os.getenv("SANDBOX"):
    print(f"SANDBOX ENV: '{os.getenv('SANDBOX')}'")
    mem = "100Mi"
    gpu = "0"
    storage = "500Mi"
else:
    print(f"SANDBOX ENV: '{os.getenv('SANDBOX')}'")
    mem = "3Gi"
    gpu = "0"
    storage = "1Gi"
TrainingOutputs = typing.NamedTuple(
    "TrainingOutputs",
    train_losses=typing.List[float],
    test_losses=typing.List[float],
    train_accuracies=typing.List[float],
    test_accuracies=typing.List[float],
    all_acc=float,
    all_loss=float,
    model_state=PythonPickledFile,
)
def predictAcc(model, data, label):
    out = model.test(torch.from_numpy(data).float())
    prediction = torch.max(out, 1)[1]
    pred_y = prediction.data.numpy()
    target = torch.from_numpy(label.reshape(-1)).long()
    target_y = target.data.numpy()
    accuracy = float((pred_y == target_y).astype(int).sum()) / float(target_y.size)
    # compute the loss on this data instead of reusing the last training-batch loss
    loss = model.mse(out, target)
    return accuracy, loss.item()
@task(
    retries=2,
    cache=False,
    cache_version="1.0",
    requests=Resources(gpu=gpu, mem=mem, storage=storage),
    limits=Resources(gpu=gpu, mem=mem, storage=storage),
)
def pytorchIrisTask(hp: Hyperparameters) -> TrainingOutputs:
    data, labels, train, test, train_label, test_label = loadData()
    train_dataset = Data.TensorDataset(torch.from_numpy(train).float(), torch.from_numpy(train_label).long())
    train_loader = Data.DataLoader(dataset=train_dataset, batch_size=hp.batch_size, shuffle=True)
    model = Model()
    train_accuracies = []
    train_losses = []
    test_accuracies = []
    test_losses = []
    for epoch in range(hp.epochs):
        for step, (x, y) in enumerate(train_loader):
            y = torch.reshape(y, [-1])  # flatten labels; [-1] also handles a final partial batch
            model.train(x, y)
        if epoch % 20 == 0:
            accuracy, loss = predictAcc(model, test, test_label)
            print(f"Epoch: {epoch} | Step: {step} | Loss: {model.getLoss().item()} | acc: {accuracy}")
        train_acc, train_loss = predictAcc(model, train, train_label)
        train_losses.append(train_loss)
        train_accuracies.append(train_acc)
        test_acc, test_loss = predictAcc(model, test, test_label)
        test_losses.append(test_loss)
        test_accuracies.append(test_acc)
    all_acc, all_loss = predictAcc(model, data, labels)
    model_file = "iris_model.pt"
    torch.save(model.state_dict(), model_file)
    return TrainingOutputs(
        train_losses=train_losses,
        test_losses=test_losses,
        train_accuracies=train_accuracies,
        test_accuracies=test_accuracies,
        all_acc=all_acc,
        all_loss=all_loss,
        model_state=PythonPickledFile(model_file),
    )
@workflow
def pytorchTrainingWorkflow(
    hp: Hyperparameters = Hyperparameters(epochs=100, batch_size=10)
) -> TrainingOutputs:
    return pytorchIrisTask(hp=hp)
if __name__ == '__main__':
    train_losses, test_losses, train_accuracies, test_accuracies, all_acc, all_loss, model = pytorchTrainingWorkflow(hp=Hyperparameters(epochs=100, batch_size=10, learning_rate=0.01))
    print(train_losses, test_losses, train_accuracies, test_accuracies, all_acc, all_loss, model)
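One subtlety in the resource-selection block of the listing above: `os.getenv` returns `None` when a variable is unset, so comparing its result against the empty string does not distinguish "unset" from "set". A quick standard-library check:

```python
import os

# Make sure the variable is unset for the demonstration
os.environ.pop("SANDBOX", None)

print(os.getenv("SANDBOX") != "")   # True: None != "", even though SANDBOX is unset
print(bool(os.getenv("SANDBOX")))   # False: truthiness distinguishes unset/empty from set
```

This is why the sandbox check tests truthiness rather than comparing against `""`.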
- tensorflow version
from sklearn.datasets import load_iris
import pandas as pd
import numpy as np
from dataclasses import dataclass
from dataclasses_json import dataclass_json
import os
from flytekit import task, workflow, Resources
import typing
from flytekit.types.file import PythonPickledFile
@dataclass_json
@dataclass
class Hyperparameters(object):
    learning_rate: float = 0.1
    batch_size: int = 10
    epochs: int = 100
    log_interval: int = 20
def loadData():
    train_data = load_iris()
    data = train_data['data']
    labels = train_data['target'].reshape(-1, 1)
    total_data = np.hstack((data, labels))
    np.random.shuffle(total_data)
    train_length = int(len(total_data) * 0.8)
    train = total_data[0:train_length, :-1]
    test = total_data[train_length:, :-1]
    train_label = total_data[0:train_length, -1].reshape(-1, 1)
    test_label = total_data[train_length:, -1].reshape(-1, 1)
    print(data.shape, labels.shape, train.shape, test.shape, train_label.shape, test_label.shape)
    return data, labels, train, test, train_label, test_label
# os.getenv returns None when SANDBOX is unset, so test truthiness rather than != ""
if os.getenv("SANDBOX"):
    print(f"SANDBOX ENV: '{os.getenv('SANDBOX')}'")
    mem = "100Mi"
    gpu = "0"
    storage = "500Mi"
else:
    print(f"SANDBOX ENV: '{os.getenv('SANDBOX')}'")
    mem = "3Gi"
    gpu = "0"
    storage = "1Gi"
TrainingOutputs = typing.NamedTuple(
    "TrainingOutputs",
    train_losses=typing.List[float],
    train_accuracies=typing.List[float],
    all_acc=float,
    test_acc=float,
)
@task(
    retries=2,
    cache=False,
    cache_version="1.0",
    requests=Resources(gpu=gpu, mem=mem, storage=storage),
    limits=Resources(gpu=gpu, mem=mem, storage=storage),
)
def tensorflowIrisTask(hp: Hyperparameters) -> TrainingOutputs:
    import tensorflow as tf
    data, labels, train, test, train_label, test_label = loadData()
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(10, activation='relu', input_shape=(4,)),
        tf.keras.layers.Dense(10, activation='relu'),
        tf.keras.layers.Dense(3, activation='softmax')
    ])
    train_dataset = tf.data.Dataset.from_tensor_slices((train, train_label))
    train_dataset = train_dataset.batch(hp.batch_size)
    model.compile(optimizer=tf.keras.optimizers.RMSprop(learning_rate=hp.learning_rate),  # 'lr' is a deprecated alias
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(),
                  metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
    history = model.fit(train_dataset, epochs=hp.epochs)
    test_loss, test_acc = model.evaluate(test, test_label)
    all_loss, all_acc = model.evaluate(data, labels)
    print(history.history.keys())
    training_accuracies = history.history['sparse_categorical_accuracy']
    training_losses = history.history['loss']
    return TrainingOutputs(train_losses=training_losses,
                           train_accuracies=training_accuracies,
                           all_acc=all_acc,
                           test_acc=test_acc,
                           )
@workflow
def tensorflowTrainingWorkflow(
    hp: Hyperparameters = Hyperparameters(epochs=100, batch_size=10)
) -> TrainingOutputs:
    return tensorflowIrisTask(hp=hp)
if __name__ == '__main__':
    train_losses, train_accuracies, all_acc, test_acc = tensorflowTrainingWorkflow(hp=Hyperparameters(epochs=100, batch_size=10, learning_rate=0.01))
    print(train_losses, train_accuracies, all_acc, test_acc)
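In both listings, `Hyperparameters` is a `@dataclass_json` dataclass so that Flyte can serialize it as a task input. The round-trip can be sketched with the standard library alone (`dataclasses_json` adds `to_json`/`from_json` on top of this); the class below mirrors the one in the listings:

```python
from dataclasses import dataclass, asdict

@dataclass
class Hyperparameters:
    learning_rate: float = 0.1
    batch_size: int = 10
    epochs: int = 100
    log_interval: int = 20

# Override a subset of fields, as the __main__ blocks above do
hp = Hyperparameters(epochs=100, batch_size=10, learning_rate=0.01)
print(asdict(hp))  # {'learning_rate': 0.01, 'batch_size': 10, 'epochs': 100, 'log_interval': 20}
```

Fields not passed explicitly keep their defaults, which is why the workflows can expose a fully-defaulted `hp` parameter.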
3. Deploying the iris classification model to the flyte platform
The code imports sklearn, pytorch, tensorflow, and other packages, which must also be available inside docker, so the Dockerfile needs to be modified as follows:
FROM python:3.7-slim-buster
WORKDIR /root
ENV VENV /root/venv
ENV LANG C.UTF-8
ENV LC_ALL C.UTF-8
ENV PYTHONPATH /root
RUN python3 -m venv ${VENV}
ENV PATH="${VENV}/bin:$PATH"
RUN apt-get update && apt-get install -y build-essential
RUN pip3 install awscli
RUN python -m pip --no-cache-dir install --upgrade --pre torch torchvision torchaudio -f https://download.pytorch.org/whl/nightly/cpu/torch_nightly.html
RUN pip3 install scikit-learn
COPY ./requirements.txt /root
RUN pip install -r /root/requirements.txt
COPY . /root
ARG tag
ENV FLYTE_INTERNAL_IMAGE $tag
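The Dockerfile copies a requirements.txt whose contents are not reproduced here; judging from the imports in the two listings, a minimal version might look like the following (package names only; this list is an assumption, and torch, awscli, and scikit-learn are already installed by the Dockerfile itself):

```
flytekit
numpy
pandas
dataclasses-json
tensorflow
```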
After this, repeat the "Development" phase, paying attention to the tag version.
4. Experimental results (using the pytorch version as an example)
Open http://localhost:30081/console in a browser and enter the development domain of the flytesnacks project.
Execute the corresponding workflow and inspect the results in the console.
5. Packaging the container as an image and uploading it (v4.0 is the currently working version)
- Register an account on dockerhub.
- Run the following command to find the container_id:
docker ps
- Log in to dockerhub
docker login
- Commit the current container id and tag it
docker commit <container_id> xiangyanghe/iris_flyte_project:v4.0
- Alternatively, use build
docker build . -t xiangyanghe/iris_flyte_project:v4.0
- Push to dockerhub
docker push xiangyanghe/iris_flyte_project:v4.0
The upload location is: https://hub.docker.com/repository/docker/xiangyanghe/iris_flyte_project
Other notes
If too many packaged docker images fill up the storage device, all stopped containers, unused images, and volumes can be removed with:
docker system prune -a --volumes
Original: https://blog.csdn.net/hxy17682323970/article/details/123822076
Author: busy yang
Title: Deploying an iris classification task on the flyte platform (pytorch+tensorflow)