# pyspark implementation: notes on turning a Python algorithm program into a distributed job and resolving environment conflicts

### Background:

There are two programs: one does word segmentation, the other does natural language processing.

How the original algorithm programs were run:

Word segmentation program (.py): for every run, the data file had to be exported from the data warehouse and handed over manually to the algorithm team, who then ran a local notebook on it to produce the segmented data.

Natural language processing program (.py): it depends on the output of the word segmentation program and performs sentiment analysis on it. The results were then imported into the data warehouse manually.

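### Problems:

1) Nothing is automated: every run requires importing the raw data by hand and loading the results into the data warehouse by hand

2) It is inefficient: because the program is not distributed, it runs very slowly

3) The company's scheduling platform runs on physical machines, and installing any third-party Python package on them is not allowed (because of compatibility concerns, and because newly added machines would not have the dependency)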

First, a look at the format of a hadoop-streaming command:

Approximate format:

hadoop jar hadoop-streaming-2.7.2.jar \
-D mapreduce.job.maps=1 \
-D mapreduce.job.reduces=1 \
-D mapreduce.job.name="Experiment" \
-input "/user/me/samples/cachefile/input.txt" \
-output "/user/me/samples/cachefile/out" \
-mapper "xargs cat" \
-reducer "cat"

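-archives: large packages (file archives or jars) that are uploaded to HDFS in advance and then referenced through -archives; Hadoop loads and unpacks them automatically. The YARN logs also show Hadoop downloading the file specified by -archives and unpacking it. For example:


P.S. The log I referred to above is from Spark, but MapReduce logs look the same.

-input and -output are the regular data input and output paths.

-mapper and -reducer are the two stages in which MapReduce processes the data.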

2): Running the script on the scheduling platform even though the platform forbids installing third-party dependencies

Note that the virtual environment must be built in the same kind of environment as the one it will run in.

For example, a company's internal execution machines are typically not allowed to reach the external network, but building a virtual environment requires downloading packages from the Internet.

So be sure to build the virtual environment and download its dependencies on a machine with the same environment (Linux) that does have Internet access, then compress the virtual environment and upload it to HDFS. (I originally built the virtual environment on a Mac and uploaded it to HDFS, and it would not work no matter what I tried.)

So the technology chosen was hadoop-streaming. With the virtual environment prepared, the script looks like this:

hadoop jar /usr/XXX/3.1.4.0-315/hadoop-mapreduce/hadoop-streaming-3.1.1.3.1.4.0-315.jar \
-D mapreduce.job.reduces=1 \
-D stream.non.zero.exit.is.failure=true \
-archives "hdfs://bdp/XXX/dms/nlp/venv.tar.gz#venv" \
-input "/user/XXX/tmp_nlp3/2021-08-15/part-00000" \
-output "/user/XXX/tmp_nlp3/908" \
-mapper "cat" \
-reducer 'venv/venv/bin/python ./sentiment_predict_dist.py' \
-file ./sentiment_predict_dist.py \
-file ./albert_config.json \
-file ./model.ckpt-best.data-00000-of-00001 \
-file ./vocab_chinese.txt \
-file ./model.ckpt-best.index \
-file ./model.ckpt-best.meta \
-file ./key_sentence_202103.txt \
-file ./inverse.txt \
-file ./negative_regex.txt \
-file ./negative.txt \
-file ./positive_regex.txt \
-file ./positive.txt \
-file ./very.txt \
-file ./words_del.txt \
-file ./best_model.weights


Note: -archives specifies the compressed virtual environment package. The name after the # is a symbolic link to the unpacked directory.

For example: -archives "hdfs://bdp/XXX/dms/nlp/venv.tar.gz#venv"

So if I want to use python3, the path after unpacking is: venv/venv/bin/python3
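The doubled `venv/venv` in that path is easier to see with a small sketch. It assumes the archive was packed with a single top-level directory named `venv` (an assumption that matches the path above, not something the original script shows); `pack_venv` and `interpreter_path` are hypothetical helper names. It only illustrates the archive layout and says nothing about whether a given virtual environment is relocatable.

```python
import tarfile


def pack_venv(venv_dir, archive_path):
    """Pack venv_dir into a .tar.gz whose single top-level entry is 'venv'."""
    with tarfile.open(archive_path, "w:gz") as tar:
        # arcname controls the directory name stored inside the archive
        tar.add(venv_dir, arcname="venv")


def interpreter_path(alias, top_level="venv", exe="python3"):
    """Path of the interpreter relative to the task's working directory.

    alias     -- the name after '#' in -archives (the symbolic link)
    top_level -- the top-level directory stored inside the archive
    """
    return f"{alias}/{top_level}/bin/{exe}"


# With '#venv' as the alias and 'venv' as the archive's top-level directory:
print(interpreter_path("venv"))
```

If the archive were packed without a top-level directory, the first path component alone would already be the environment root, so the `-reducer` path would have to change accordingly.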

The approach above does let the Python NLP script run on the scheduling platform, but it has two very serious problems:

Attentive readers will have noticed that my script uses -mapper "cat".

This is because a Python hadoop-streaming script receives its data through sys.stdin, which means the 30,000 text records are processed one by one, and the whole run takes 1.2 hours.
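To make the one-by-one processing concrete, here is a minimal sketch of what a streaming reducer that reads from sys.stdin can look like. It is not the author's `sentiment_predict_dist.py`; `predict_sentiment` is a hypothetical stand-in for the real model call, and the tab-separated output format is an assumption.

```python
import io


def predict_sentiment(text):
    """Hypothetical stand-in for the real model inference."""
    return "positive" if "good" in text else "negative"


def run_reducer(stdin, stdout):
    """Read one record per line and write 'text<TAB>label' per record."""
    for line in stdin:
        text = line.strip()
        if not text:
            continue  # skip blank lines
        # Each record triggers its own prediction call, one at a time
        stdout.write(f"{text}\t{predict_sentiment(text)}\n")


# In the real script this would be: run_reducer(sys.stdin, sys.stdout)
demo_out = io.StringIO()
run_reducer(io.StringIO("good day\nbad day\n"), demo_out)
print(demo_out.getvalue(), end="")
```

With `-D mapreduce.job.reduces=1`, a single process runs this loop over every record, which is consistent with the long runtime described above.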

At that point hadoop-streaming is a poor fit: to process a whole table you have to change the -input path over and over, and 30,000 records already take 1.2 hours. For a large table with many partitions and buckets, it becomes painfully slow.
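The repeated -input changes can be pictured with a short sketch that builds one hadoop-streaming command per partition. The flags are the ones used in the script above; the jar name, the list of dates, and the output paths are hypothetical placeholders, and the `-file` list is shortened to one entry.

```python
import shlex


def build_streaming_cmd(jar, input_path, output_path):
    """Return the argv list for one hadoop-streaming run over one input path."""
    return [
        "hadoop", "jar", jar,
        "-D", "mapreduce.job.reduces=1",
        "-archives", "hdfs://bdp/XXX/dms/nlp/venv.tar.gz#venv",
        "-input", input_path,
        "-output", output_path,
        "-mapper", "cat",
        "-reducer", "venv/venv/bin/python ./sentiment_predict_dist.py",
        "-file", "./sentiment_predict_dist.py",
    ]


# Hypothetical partition dates: each one needs its own job submission
dates = ["2021-08-13", "2021-08-14", "2021-08-15"]
jar = "hadoop-streaming.jar"  # placeholder, not the real jar path
commands = [
    build_streaming_cmd(
        jar,
        f"/user/XXX/tmp_nlp3/{d}/part-00000",
        f"/user/XXX/tmp_nlp3/out/{d}",  # hypothetical output layout
    )
    for d in dates
]
for cmd in commands:
    print(shlex.join(cmd))
```

Every extra partition adds another full job submission, which is the overhead the pyspark version avoids by reading the table directly.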

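### Phase two solution: pyspark + virtual environment

pyspark was adopted to solve the problems above, and also to gain higher processing parallelism, better shuffle handling, and other advantages.

1) The virtual environment is prepared exactly as described above

2) The code is rewritten for pyspark

One thing to watch with pyspark: its support for UDAFs and UDTFs is unfriendly, and you may even find that pyspark does not support them at all. You have to come up with workarounds, for example merging the fields first and splitting them afterwards, so that UDTF and UDAF logic is handled the UDF way.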
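One way to read the "merge first, split afterwards" workaround is sketched below: an ordinary UDF packs its many outputs into a single delimited string, and the built-in `split` and `explode` functions then turn that string back into rows. This is an illustration rather than the author's code; `cut_words_packed`, `explode_words`, the separator, and the column names are all hypothetical, and `text.split()` stands in for the real word segmentation call.

```python
SEP = "\u0001"  # assumed separator that does not occur in the text


def cut_words_packed(text):
    """UDF body: return all words of one record packed into ONE string."""
    if text is None:
        return None
    words = text.split()  # stand-in for the real word segmentation
    return SEP.join(words)


def explode_words(df, text_col="text"):
    """Emulate a UDTF: one input row becomes one output row per word."""
    # Imported lazily so the packing logic can be tried without pyspark
    from pyspark.sql import functions as F
    from pyspark.sql.types import StringType

    cut_udf = F.udf(cut_words_packed, StringType())
    packed = df.withColumn("packed", cut_udf(F.col(text_col)))
    # split() rebuilds the array, explode() emits one row per element
    return packed.withColumn("word", F.explode(F.split(F.col("packed"), SEP)))


print(cut_words_packed("a b c").split(SEP))
```

The same packing idea works in the other direction for aggregate-style logic: collect the values of a group into one field first, then pass that single field to an ordinary UDF.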

spark-submit \
--deploy-mode cluster \
--master yarn \
--driver-memory 5g \
--num-executors 20 \
--executor-memory 4g \
--executor-cores 10 \
--name nlp \
--archives hdfs://bdp/user/dms/nlp/venv.tar.gz#venv \
--conf spark.pyspark.driver.python=./venv/venv/bin/python3 \
--conf spark.pyspark.python=./venv/venv/bin/python3 \
--files <required files>


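Used this way, pyspark improved the program in several respects:

1. It can now read and write data warehouse tables directly

2. The code is more maintainable, because it is SQL, which is very convenient

3. Performance improved about 8x: the run used to take 1.2 hours (72 minutes) and now takes only 9 minutes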
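
The first two points can be sketched roughly as follows. This is a hypothetical outline and not the author's job: the table names, the `id`, `text` and `dt` columns, and `predict_sentiment` are assumptions made for illustration.

```python
def predict_sentiment(text):
    """Hypothetical stand-in for the real model inference."""
    if text is None:
        return None
    return "positive" if "good" in text else "negative"


def build_select(table, partition_date):
    """SQL that reads one partition straight from the warehouse table."""
    return f"SELECT id, text FROM {table} WHERE dt = '{partition_date}'"


def run_job(source_table, target_table, partition_date):
    """Read a warehouse table, score each row, and write the result back."""
    # Imported lazily so the helpers above can be tried without pyspark
    from pyspark.sql import SparkSession, functions as F
    from pyspark.sql.types import StringType

    spark = (
        SparkSession.builder.appName("nlp")
        .enableHiveSupport()  # needed to see the warehouse tables
        .getOrCreate()
    )
    sentiment = F.udf(predict_sentiment, StringType())
    df = spark.sql(build_select(source_table, partition_date))
    result = df.withColumn("sentiment", sentiment(F.col("text")))
    result.write.mode("overwrite").saveAsTable(target_table)


print(build_select("dms.nlp_source", "2021-08-15"))
```

Because the rows are spread across executors, the UDF runs in parallel instead of in the single reducer used by the streaming version.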

