pyspark实现:关于python的算法程序,如何做成分布式处理,并解决环境的冲突等问题的记录

pyspark实现:关于python的算法程序,如何做成分布式处理,并解决环境的冲突等问题的记录

背景:

一词切分,一次自然语言处理。

[En]

One word cut, one natural language processing.

原算法的程序:

[En]

The program of the original algorithm:

文字切割程序.py:每次读取数据,都需要从数据仓库中取出数据文件,然后手动引导给算法学员进行处理,算法学员再运行本地的笔记本数据,得到文字切割数据。

[En]

Word cutting program .py: every time you read the data, you need to pull out the data file from the data warehouse, and then manually guide it to the algorithm students for processing, and then the algorithm students run the local notebook data to get the word cutting data.

自然语言处理。PY:依靠上一次分词程序的结果数据,然后进行情感分析。获得结果后,手动导入数据仓库

[En]

Natural language processing. Py: rely on the result data of the last word-cutting program, and then do emotional analysis. After you get the results, import the data warehouse manually

问题:

1)、程序不自动化,每次都需要手动导入原始数据、并将结果手动导入数仓

2)、效率不高,因为不是分布式,程序执行效率非常低

3)、公司的调度平台是实体机,是不允许安装任何第三方python包的(因为要考虑兼容问题、新增机器没有这个依赖包等问题)

第一期解决方案:利用hadoop-streaming的方式将脚本自动化

首先,介绍Hadoop-Streaming的格式:

[En]

First, introduce the format of hadoop-streaming:

1):官网链接 http://hadoop.apache.org/docs/r2.7.2/hadoop-streaming/HadoopStreaming.html

近似格式:

[En]

Approximate format:

hadoop jar hadoop-streaming-2.7.2.jar \
                -archives 'hdfs://hadoop-nn1.example.com/user/me/samples/cachefile/cachedir.jar' \
                -D mapreduce.job.maps=1 \
                -D mapreduce.job.reduces=1 \
                -D mapreduce.job.name="Experiment" \
                -input "/user/me/samples/cachefile/input.txt" \
                -output "/user/me/samples/cachefile/out" \
                -mapper "xargs cat" \
                -reducer "cat"
-archives:就是一些大的包(可以是文件包、可以是jar),提前上传到HDFS,然后通过这个archives指定,hadoop会帮你自动加载并解压通过yarn的日志也能够观察到,hadoop会自动下载archives指定的文件,并解压:比如:

pyspark实现:关于python的算法程序,如何做成分布式处理,并解决环境的冲突等问题的记录

ps. 我上面放的是spark的日志,但其实和mr一样的日志情况

-input 和 -output 就是正常数据的输入和输出

-mapper和-reduce 就是mr的两阶段处理数据过程

2):解决调度平台不让安装任何第三方依赖包,但脚本还要在调度平台执行的问题

网上有很多种方案,我选择的是虚拟环境:https://docs.python.org/zh-cn/3/library/venv.html

但值得注意的是,你必须在相同的环境下创建一个虚拟环境。

[En]

But it is worth noting that you must create a virtual environment under the same environment.

例如:通用公司的内部执行机不允许连接到外部网络。但要建立一个虚拟环境,你必须从互联网上下载东西。

[En]

For example: the general company’s internal execution machine is not allowed to connect to the external network. But to build a virtual environment, you have to download things from the Internet.

然后一定要在可以上网的同一环境(Linux)下载虚拟环境所需的依赖包,然后压缩虚拟环境并导入到HDFS中(因为我在Mac上做虚拟环境并导入到HDFS中,我发现无论我如何使用它)

[En]

Then be sure to download the dependency package needed by the virtual environment in the same environment (Linux) where you can access the Internet, and then compress the virtual environment and import it into HDFS (because I was doing the virtual environment on mac and importing it to HDFS, I found that no matter how I used it)

因此,技术选择是Hadoop-Streaming。为虚拟环境做好准备,然后开始使用脚本:

[En]

So the technology selection is hadoop-streaming. Prepare for a virtual environment, and then start using scripts:

hadoop jar /usr/XXX/3.1.4.0-315/hadoop-mapreduce/hadoop-streaming-3.1.1.3.1.4.0-315.jar \
     -D mapreduce.job.reduces=1 \
     -D stream.non.zero.exit.is.failure=true \
     -archives "hdfs://bdp/XXX/dms/nlp/venv.tar.gz#venv" \
     -input "/user/XXX/tmp_nlp3/2021-08-15/part-00000" \
     -output "/user/XXX/tmp_nlp3/908" \
     -mapper "cat" \
     -reducer 'venv/venv/bin/python ./sentiment_predict_dist.py' \
     -file ./sentiment_predict_dist.py \
     -file ./albert_config.json \
     -file ./model.ckpt-best.data-00000-of-00001 \
     -file ./vocab_chinese.txt \
     -file ./model.ckpt-best.index \
     -file ./model.ckpt-best.meta \
     -file ./key_sentence_202103.txt \
     -file ./inverse.txt \
     -file ./negative_regex.txt \
     -file ./negative.txt \
     -file ./positive_regex.txt \
     -file ./positive.txt \
     -file ./very.txt \
     -file ./words_add.txt \
     -file ./words_del.txt \
     -file ./best_model.weights

注意:-ARCHIES指定虚拟环的压缩包,后跟一个#,这是一个软连接

[En]

Note:-archives specifies the compressed package of the virtual ring, followed by a #, which is a soft connection

例如:-存档“HDFS://bdp/xxx/dms/nlp/venv.tar.gz#venv”

[En]

For example:-archives “hdfs://bdp/XXX/dms/nlp/venv.tar.gz#venv”

hadoop会将venv.tar.gz这个包下载下来,并解压; 因为添加了#venv 所以解压后的路径实际是:venv/venv/….

例如,如果我想使用python3,解压缩后的路径是:venv/venv/bin/python3

[En]

For example, if I want to use python3, the decompressed path is: venv/venv/bin/python3

虽然上述方法解决了PYTHON的NLP脚本可以在调度平台上运行的问题。但有两个非常严重的问题:

[En]

Although the above methods solve the problem that the nlp script of python can run on the scheduling platform. But there are two very serious problems:

1、因为采用的是hadoop-streaming 也就是mr的方式,所以效率极其差!

细心的学生可以发现我的执行脚本有一个:-映射器“猫”

[En]

Careful students can find that my execution script has a:-mapper “cat”

这是因为Hadoop-Streaming的Python版本在这里接收数据,使用sys.stdin来接收数据,即一个接一个地处理3万条文本数据,处理需要1.2个小时。

[En]

This is because the python version of hadoop-streaming receives data here, using sys.stdin to receive data, that is, 30, 000 pieces of text data are processed one by one, and it takes 1.2 hours to process.

2、因为采用的是hadoop-streaming方式,采用的是-input 去读取文件,这样会导致,如果数仓的表是多分区的、多分桶的,甚至多个小文件的情况;

那么这种Hadoop流媒体在这个时候是非常不适用的。因为您必须一遍又一遍地更改-输入路径才能完成一个表。考虑3万条数据需要1.2个小时。如果是多分区、多存储桶的大表。时间会慢慢地怀疑生命。

[En]

Then this hadoop-streaming is very inapplicable at this time. Because you have to change the-input path over and over again to complete a table. And it takes you 1.2 hours to think about 30,000 pieces of data. If it is a large table with multi-partition and multi-bucket. Time will be slow to doubt life.

第二期解决方案:利用pyspark+虚拟环境

采用PYSPARK的初衷就是为了解决上述问题,同时增加处理的并行性,更好地洗牌处理等优点。

[En]

The original purpose of adopting pyspark is to solve the above problems, and at the same time to increase the parallelism of processing, better shuffle processing and other advantages.

1)、虚拟环境跟上面讲的操作是一样的

2)、代码改成pyspark模式

pyspark需要注意的是,在处理UDAF和UDTF的支持并不友好,你甚至发现其实pyspark压根不支持,所以要想很多弯道超车的方案,比如(先把字段合并,在拆分等,按照UDF的方式来处理UDTF和UDAF)

那么剧本就是关于:

[En]

Then the script is about:

spark-submit \
 --deploy-mode cluster \
 --master yarn \
 --driver-memory 5g \
 --num-executors 20 \
 --executor-memory 4g \
 --executor-cores 10 \
 --name nlp \
 --archives hdfs://bdp/user/dms/nlp/venv.tar.gz#venv \
 --conf spark.pyspark.driver.python=./venv/venv/bin/python3 \
 --conf spark.pyspark.python=./venv/venv/bin/python3 \
 --files 需要的文件

以这种方式使用pyspark,程序得到了优化:

[En]

Using pyspark in this way, the program is optimized:

1、可以直接读写数仓的表了

2、代码更加可维护了,因为是sql,非常方便

3、性能提升了至少10倍,原来需要1.2小时,现在只需要9分钟

Original: https://www.cnblogs.com/niutao/articles/15179633.html
Author: niutao
Title: pyspark实现:关于python的算法程序,如何做成分布式处理,并解决环境的冲突等问题的记录

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/6573/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

发表回复

登录后才能评论
免费咨询
免费咨询
扫码关注
扫码关注
联系站长

站长Johngo!

大数据和算法重度研究者!

持续产出大数据、算法、LeetCode干货,以及业界好资源!

2022012703491714

微信来撩,免费咨询:xiaozhu_tec

分享本页
返回顶部