在Hadoop上用Python实现WordCount

一、简单说明

本例中我们用Python写一个简单的运行在Hadoop上的MapReduce程序,即WordCount(读取文本文件并统计单词的词频)。这里我们将要输入的单词文本input.txt和Python脚本放到/home/data/python/WordCount目录下。

cd /home/data/python/WordCount

vi input.txt

输入:

There is no denying that

hello python

hello mapreduce

mapreduce is good

二、编写Map代码

这里我们创建一个mapper.py脚本,从标准输入(stdin)读取数据,默认以空格分隔单词,然后按行输出单词机器词频到标准输出(stdout),整个Map处理过程不会统计每个单词出现的总次数,而是直接输出”word 1″,以便作为Reduce的输入进行统计,确保该文件是可执行的(chmod +x /home/data/python//WordCount/mapper.py)。

cd /home/data/python//WordCount

vi mapper.py

!/usr/bin/env python

– coding:UTF-8 –

import sys

for line in sys.stdin: #sys.stdin为读取数据,遍历读入数据的每一行

line = line.strip() #删除开头和结尾的空格

words = line.split() #以默认空格分隔行单词到words列表

for word in words:

输出所有单词,格式为”单词,1″以便作为Reduce的输入

print(‘%s\t%s’ %(word,1))

截图如下:

三、编写Reduce代码

这里我们创建一个reducer.py脚本,从标准输入(stdin)读取mapper.py的结果,然后统计每个单词出现的总次数并输出到标准输出(stdout),

确保该文件是可执行的(chmod +x /home/data/python//WordCount/reducer.py)

cd /home/data/python//WordCount

vi reducer.py

!/usr/bin/env python

– coding:UTF-8 –

import sys

current_word = None #当前单词

current_count = 0 #当前单词频数

word = None

for line in sys.stdin:

line = line.strip() #删除开头和结尾的空格

解析mapper.py输出作为程序的输入,以tab作为分隔符

word,count = line.split(‘\t’,1)

try:

count = int(count) #转换count从字符型为整型

except ValueError:

continue

要求mapper.py的输出做排序操作,以便对连接的word做判断,hadoop会自动排序

if current_word == word: #如果当前的单词等于读入的单词

current_count += count #单词频数加1

else:

if current_word: #如果当前的单词不为空则打印其单词和频数

print(‘%s\t%s’ %(current_word,current_count))

current_count = count #否则将读入的单词赋值给当前单词,且更新频数

current_word = word

if current_word == word #输出最后一个word统计

print(‘%s\%s’ %(current_word,current_count))

截图如下:

四、本地测试代码

我们可以在Hadoop平台运行之前在本地测试,校验mapper.py与reducer.py运行的结果是否正确。注意:测试reducer.py时需要对mapper.py的输出做排序(sort)操作,不过,Hadoop环境会自动实现排序。

在本地运行mapper.py:

cd /home/data/python/WordCount/

记得执行: chmod +x /home/data/python//WordCount/mapper.py

cat input.txt | ./mapper.py

在本地运行reducer.py

记得执行:chmod +x /home/data/python//WordCount/reducer.py

cat input.txt | ./mapper.py | sort -k1,1 | ./reducer.py

这里注意:利用管道符”|”将输出数据作为mapper.py这个脚本的输入数据,并将mapper.py的数据输入到reducer.py中,其中参数sort -k 1,1是将reducer的输出内容按照第一列的第一个字母的ASCII码值进行升序排序。

五、在Hadoop平台上运行代码

在hadoop运行代码,前提是已经搭建好hadoop集群

1、创建目录并上传文件

首先在HDFS上创建文本文件存储目录,这里我创建为:/WordCound

hdfs dfs -mkdir /WordCound

将本地文件input.txt上传到hdfs的/WordCount上。

hadoop fs -put /home/data/python/WordCount/input.txt /WordCount

hadoop fs -ls /WordCount #查看在hdfs中/data/WordCount目录下的内容

2、执行MapReduce程序

为了简化我们执行Hadoop MapReduce的命令,我们可以将Hadoop的hadoop-streaming-3.0.0.jar加入到系统环境变量/etc/profile中,在/etc/profile文件中添加如下配置:

首先在配置里导入hadoop-streaming-3.0.0.jar

vi /etc/profile

HADOOP_STREAM=$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-3.0.0.jar

export HADOOP_STREAM

source /etc/profile #刷新配置

执行以下命令:

hadoop jar $HADOOP_STREAM -file /home/data/python/WordCount/mapper.py -mapper ./mapper.py -file /home/data/python/WordCount/reducer.py -reducer ./reducer.py -input /WordCount -output /output/word1

得到:

然后,输入以下命令查看结果:

hadoop fs -ls /output/word1

hadoop fs -cat /output/word1/part-00000 #查看分析结果

可以发现,结果与之前测试的时候是一致的,那么恭喜你,大功告成!

Original: https://www.cnblogs.com/pbinlog/p/9159214.html
Author: pbinlog
Title: 在Hadoop上用Python实现WordCount

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/571354/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • NopCommerce中的单例

    项目中经常会遇到单例的情况。大部分的单例代码都差不多像这样定义: internal class SingletonOne { private static SingletonOne…

    Java 2023年6月5日
    073
  • Spring DefaultResourceLoader

    Spring DefaultResourceLoader Spring DefaultResourceLoader继承ResourceLoader接口,用来加载资源, 通过Reso…

    Java 2023年6月7日
    064
  • java日常开发必备:list的四种遍历

    在平时的开发过程中使用List的场景很多,你知道List的遍历有多少种方式?今天一起来梳理下List的几种遍历方式。这里以java.util.ArrayList为例来演示。 这里有…

    Java 2023年6月9日
    081
  • 2022-8-17 mysql 第三天

    子查询 按照结果集的行列数不同,子查询可以分为以下几类: 标量子查询:结果集只有一行一列(单行子查询) 列子查询:结果集有一列多行 行子查询:结果集有一行多列 表子查询:结果集多行…

    Java 2023年6月13日
    060
  • SpringBoot扩展接口- ApplicationListener 事件监听器

    ApplicationListener监听器用来监听ApplicationEvent事件。 ApplicationListener 可以在Spring项目运行过程中,监听对应的事件…

    Java 2023年5月30日
    045
  • Map 笔记记录

    Map 是一个存放二元 Key – Value 对的数据集合接口。在其中每个元素都对应于一个唯一的 key,使用 key 可以获得对应的 value。 其有如下两个常用…

    Java 2023年6月7日
    072
  • java创建对象的5种方式

    1、使用new关键字 2、利用反射手段,调用java.lang.Class或者java.lang.reflect.Constructor类的newInstance()实例方法 3、…

    Java 2023年6月7日
    040
  • Springboot+Websocket+JWT实现的即时通讯模块

    场景 目前做了一个接口:邀请用户成为某课程的管理员,于是我感觉有能在用户被邀请之后能有个立马通知他本人的机(类似微博、朋友圈被点赞后就有立马能收到通知一样),于是就琢磨琢磨搞了一套…

    Java 2023年6月7日
    083
  • java关于for循环的效率优化

    循环作为三大结构之一,我们在编写代码的时候使用频率非常的高;循环结构的重要性也是不言而喻的,他让我们操作数组、集合和其他一些有规律的事物变得更加的方便,但是如果运用不得当,就会给性…

    Java 2023年5月29日
    059
  • python协程–asyncio模块(基础并发测试)

    在高并发的场景下,python提供了一个多线程的模块threading,但似乎这个模块并不近人如意,原因在于cpython本身的全局解析锁(GIL)问题,在一段时间片内实际上的执行…

    Java 2023年6月14日
    062
  • Fizz企业级微服务API网关进阶系列教程-服务编排处理列表数据(中)-数据提取与数据关联

    ​ 概述 服务编排是Fizz网关提供的一个强大的功能,能够基于现有的业务微服务通过在线配置的方式快速的生成一个聚合接口,减少中间层胶水代码以及降低编码投入。在服务编排中支持使用函数…

    Java 2023年6月9日
    065
  • 构造器

    package com.gao.test.Test2; public class Person { public Person(){ //构造器:没有任何参数的构造器 //叫做空参…

    Java 2023年6月5日
    066
  • [学习笔记] Java读取用户输入

    在程序的实际运行过程中,我们很可能会要求用户输入数据以继续运行程序; java.util包提供的Scanner类就可用于读取用户输入; 创建Scanner对象 使用next()方法…

    Java 2023年6月5日
    044
  • Jetpack架构组件学习(4)——APP Startup库的使用

    最近在研究APP的启动优化,也是发现了Jetpack中的 App Startup库,可以进行SDK的初始化操作,于是便是学习了,特此记录 原文:Jetpack架构组件学习(4)——…

    Java 2023年6月13日
    092
  • 终于上传了一篇文章

    拖了好久的第一批文章今天终于写完上传了,以后会有陆陆续续的文章上传,最近几篇都是以提高开发效率和系统移植解决方案的文章,有兴趣的朋友请关注后续文章,有全新开发的项目的朋友可以联系我…

    Java 2023年6月8日
    073
  • static关键字的一些使用

    百度百科定义static关键字 通常情况下,类成员必须通过它的类的对象访问,但是可以创建这样一个成员,它能够被它自己使用,而不必引用特定的实例。在成员的声明前面加上关键字stati…

    Java 2023年6月7日
    065
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球