Flume + Kafka + Storm + HBase + HDFS + POI Integration

Requirements:

For a website, we need to log users' behavior and analyze the data that is useful to us.

For example, on this site, www.hongten.com (a fictional e-commerce site), users can perform many familiar actions: register, log in, view, click, double-click, buy, add to shopping cart, add records, edit records, delete records, comment, log out, and so on. These actions are recorded in the log, and we need to analyze that log information.

In this article we analyze two behaviors: buying and adding to the shopping cart. We then generate a report from them, so we can see when users tend to buy and when they tend to add items to the cart, and take action at the right time to encourage purchases, for example by recommending products to users who have added them to the cart (the potential buyers).

After all, making the website profitable is what we are really after, right?

1. Abstracting User Actions

// user actions
    public static final String[] USER_ACTION = { "Register", "Login", "View", "Click", "Double_Click", "Buy", "Shopping_Car", "Add", "Edit", "Delete", "Comment", "Logout" };

2. Log Format Definition

115.19.62.102    海南    2018-12-20    1545286960749    1735787074662918890    www.hongten.com    Edit
27.177.45.84    新疆    2018-12-20    1545286962255    6667636903937987930    www.hongten.com    Delete
176.54.120.96    宁夏    2018-12-20    1545286962256    6988408478348165495    www.hongten.com    Comment
175.117.33.187    辽宁    2018-12-20    1545286962257    8411202446705338969    www.hongten.com    Shopping_Car
17.67.62.213    天津    2018-12-20    1545286962258    7787584752786413943    www.hongten.com    Add
137.81.41.9    海南    2018-12-20    1545286962259    6218367085234099455    www.hongten.com    Shopping_Car
125.187.107.57    山东    2018-12-20    1545286962260    3358658811146151155    www.hongten.com    Double_Click
104.167.205.87    内蒙    2018-12-20    1545286962261    2303468282544965471    www.hongten.com    Shopping_Car
64.106.149.83    河南    2018-12-20    1545286962262    8422202443986582525    www.hongten.com    Delete
138.22.156.183    浙江    2018-12-20    1545286962263    7649154147863130337    www.hongten.com    Shopping_Car
41.216.103.31    河北    2018-12-20    1545286962264    6785302169446728008    www.hongten.com    Shopping_Car
132.144.93.20    广东    2018-12-20    1545286962265    6444575166009004406    www.hongten.com    Add

Log format:

// log format
String log = ip + "\t" + address + "\t" + d + "\t" + timestamp + "\t" + userid + "\t" + Common.WEB_SITE + "\t" + action;
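
As a quick sanity check on the format, each log line can be split back into its seven tab-separated fields. Below is a minimal, illustrative sketch; the LogLineParser class and its local variable names are not part of the project code:

// Minimal sketch (not part of the project): parse one log line back into its seven tab-separated fields.
public class LogLineParser {

    public static void main(String[] args) {
        String log = "115.19.62.102\t海南\t2018-12-20\t1545286960749\t1735787074662918890\twww.hongten.com\tEdit";
        String[] fields = log.split("\t");
        String ip = fields[0];                      // client IP
        String address = fields[1];                 // province / region
        String date = fields[2];                    // yyyy-MM-dd
        long timestamp = Long.parseLong(fields[3]); // epoch millis
        long userId = Long.parseLong(fields[4]);    // user id
        String site = fields[5];                    // www.hongten.com
        String action = fields[6];                  // one of USER_ACTION
        System.out.println(ip + " | " + address + " | " + date + " | " + timestamp + " | " + userId + " | " + site + " | " + action);
    }
}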

3. System Architecture
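
The end-to-end data flow implemented in the following sections is: the Java generator (FlumeClient) sends events to the Flume avro source on node2:41414; the Flume Kafka sink writes them to the Kafka topic all_my_log; the Storm topology LogFilterTopology keeps only Buy and Shopping_Car events and writes them to the Kafka topic filtered_log; the Storm topology LogProcessTopology counts the two actions and writes the counts to the HBase table t_user_actions; finally, a POI program reads HBase and produces the Excel report.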

4. Report Layout

Because I am using randomly generated data, all the results we see grow roughly linearly.

Here I only implement the one-hour report; reports covering a day, a quarter, a whole year, three years, or five years can be implemented the same way, according to actual needs.

5. Component Distribution

I built a total of 4 nodes: node1, node2, node3, node4 (note: all 4 nodes must have a JDK installed).

Zookeeper is installed on node1, node2, node3

The Hadoop cluster runs on node1, node2, node3, node4

The HBase cluster runs on node1, node2, node3, node4

Flume is installed on node2

Kafka is installed on node1, node2, node3

Storm is installed on node1, node2, node3

6. Implementation

6.1. Configure Flume

--on node2
cd flumedir

vi flume2kafka

--node2 configuration is as follows
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = node2
a1.sources.r1.port = 41414

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = all_my_log
a1.sinks.k1.brokerList = node1:9092,node2:9092,node3:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

:wq

6.2. Start Zookeeper

--stop the firewall on node1, node2, node3, node4
service iptables stop

--start Zookeeper on node1, node2, node3
zkServer.sh start

6.3. Start Kafka

--start Kafka
--on each of node1, node2, node3
cd /root/kafka/kafka_2.10-0.8.2.2
./start-kafka.sh

6.4. Start the Flume Service

--on node2, start the agent
cd /root/flumedir
flume-ng agent -n a1 -c conf -f flume2kafka -Dflume.root.logger=DEBUG,console

6.5. Generate Log Messages and Write Them to Flume

Run the Java code below to generate log messages and send them to the Flume server.

package com.b510.big.data.flume.client;

import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

/**
 * @author Hongten
 *
 *         Purpose: simulate user log messages and send them to Flume
 */
public class FlumeClient {

    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new GenerateDataAndSend2Flume());

        exec.shutdown();
    }

}

class GenerateDataAndSend2Flume implements Runnable {

    FlumeRPCClient flumeRPCClient;
    static Random random = new Random();

    GenerateDataAndSend2Flume() {
        // initialize the RPC client
        flumeRPCClient = new FlumeRPCClient();
        flumeRPCClient.init(Common.FLUME_HOST_NAME, Common.FLUME_PORT);
    }

    @Override
    public void run() {
        while (true) {
            Date date = new Date();
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat(Common.DATE_FORMAT_YYYYDDMM);
            String d = simpleDateFormat.format(date);
            Long timestamp = new Date().getTime();
            // generate a random IP address
            String ip = random.nextInt(Common.MAX_IP_NUMBER) + "." + random.nextInt(Common.MAX_IP_NUMBER) + "." + random.nextInt(Common.MAX_IP_NUMBER) + "." + random.nextInt(Common.MAX_IP_NUMBER);
            // address for the IP (test data only; the address is not actually resolved from the real IP)
            String address = Common.ADDRESS[random.nextInt(Common.ADDRESS.length)];

            Long userid = Math.abs(random.nextLong());
            String action = Common.USER_ACTION[random.nextInt(Common.USER_ACTION.length)];
            // build the log message
            // example : 199.80.45.117 云南 2018-12-20 1545285957720 3086250439781555145 www.hongten.com Buy
            String data = ip + "\t" + address + "\t" + d + "\t" + timestamp + "\t" + userid + "\t" + Common.WEB_SITE + "\t" + action;
            //System.out.println(data);

            // send the data to Flume
            flumeRPCClient.sendData2Flume(data);

            try {
                TimeUnit.MICROSECONDS.sleep(random.nextInt(1000));
            } catch (InterruptedException e) {
                flumeRPCClient.cleanUp();
                System.out.println("interrupted exception : " + e);
            }
        }
    }
}

class FlumeRPCClient {

    private RpcClient client;
    private String hostname;
    private int port;

    public void init(String hostname, int port) {
        this.hostname = hostname;
        this.port = port;
        this.client = getRpcClient(hostname, port);
    }

    public void sendData2Flume(String data) {
        Event event = EventBuilder.withBody(data, Charset.forName(Common.CHAR_FORMAT));

        try {
            client.append(event);
        } catch (EventDeliveryException e) {
            cleanUp();
            client = null;
            client = getRpcClient(hostname, port);
        }
    }

    public RpcClient getRpcClient(String hostname, int port) {
        return RpcClientFactory.getDefaultInstance(hostname, port);
    }

    public void cleanUp() {
        // Close the RPC connection
        client.close();
    }
}

// all constant definitions
class Common {
    public static final String CHAR_FORMAT = "UTF-8";

    public static final String DATE_FORMAT_YYYYDDMM = "yyyy-MM-dd";

    // this is a test web site
    public static final String WEB_SITE = "www.hongten.com";

    // user actions
    public static final String[] USER_ACTION = { "Register", "Login", "View", "Click", "Double_Click", "Buy", "Shopping_Car", "Add", "Edit", "Delete", "Comment", "Logout" };

    public static final int MAX_IP_NUMBER = 224;
    // addresses mapped to the IPs
    public static String[] ADDRESS = { "北京", "天津", "上海", "广东", "重庆", "河北", "山东", "河南", "云南", "山西", "甘肃", "安徽", "福建", "黑龙江", "海南", "四川", "贵州", "宁夏", "新疆", "湖北", "湖南", "山西", "辽宁", "吉林", "江苏", "浙江", "青海", "江西", "西藏", "内蒙", "广西", "香港", "澳门", "台湾", };

    // Flume conf
    public static final String FLUME_HOST_NAME = "node2";
    public static final int FLUME_PORT = 41414;

}
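
A note on running FlumeClient: the Flume client SDK (flume-ng-sdk, which provides org.apache.flume.api.RpcClientFactory) needs to be on the classpath, and Common.FLUME_HOST_NAME / Common.FLUME_PORT must match the avro source configured in section 6.1 (node2, port 41414).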

6.6. Monitor Kafka (all_my_log)

--on node3, start a Kafka console consumer
cd /home/kafka-2.10/bin
./kafka-console-consumer.sh --zookeeper node1,node2,node3 --from-beginning --topic all_my_log

Sample output:

168.208.193.207    安徽    2018-12-20    1545287646527    5462770148222682599    www.hongten.com    Login
103.143.79.127    新疆    2018-12-20    1545287646529    3389475301916412717    www.hongten.com    Login
111.208.80.39    山东    2018-12-20    1545287646531    535601622597096753    www.hongten.com    Shopping_Car
105.30.86.46    四川    2018-12-20    1545287646532    7825340079790811845    www.hongten.com    Login
205.55.33.74    新疆    2018-12-20    1545287646533    4228838365367235561    www.hongten.com    Logout
34.44.60.134    安徽    2018-12-20    1545287646536    702584874247456732    www.hongten.com    Double_Click
154.169.15.145    广东    2018-12-20    1545287646537    1683351753576425036    www.hongten.com    View
126.28.192.28    湖南    2018-12-20    1545287646538    8319814684518483148    www.hongten.com    Edit
5.140.156.73    台湾    2018-12-20    1545287646539    7432409906375230025    www.hongten.com    Logout
72.175.210.95    西藏    2018-12-20    1545287646540    5233707593244910849    www.hongten.com    View
121.25.190.25    广西    2018-12-20    1545287646541    268200251881841673    www.hongten.com    Buy

6.7. Create a Kafka Topic

--on node1, create a topic: filtered_log
--with 3 partitions
--replication-factor=3
./kafka-topics.sh --zookeeper node1,node2,node3 --create --topic filtered_log --partitions 3 --replication-factor 3
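
Note: the all_my_log topic written to by the Flume sink and consumed in 6.6 is assumed either to be auto-created by Kafka (topic auto-creation is enabled by default in 0.8.x) or to have been created beforehand with the same command, only with --topic all_my_log.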

6.8. Filter the Data with Storm

  • Storm consumes data from Kafka
  • Storm filters the data (Buy = already purchased, Shopping_Car = potential purchase)
  • Storm writes the filtered data back to Kafka

package com.b510.big.data.storm.process;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;
import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import storm.kafka.bolt.selector.DefaultTopicSelector;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class LogFilterTopology {

    public static void main(String[] args) {

        ZkHosts zkHosts = new ZkHosts(Common.ZOOKEEPER_QUORUM);
        // the Spout reads data from the 'all_my_log' topic
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, Common.ALL_MY_LOG_TOPIC, Common.ZOOKEEPER_ROOT, Common.ZOOKEEPER_ID);
        List<String> zkServers = new ArrayList<>();
        for (String host : zkHosts.brokerZkStr.split(",")) {
            zkServers.add(host.split(":")[0]);
        }

        spoutConfig.zkServers = zkServers;
        spoutConfig.zkPort = Common.ZOOKEEPER_PORT;
        spoutConfig.forceFromStart = true;
        spoutConfig.socketTimeoutMs = 60 * 60 * 1000;
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        // create the KafkaSpout
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();
        // Storm consumes data from Kafka
        builder.setSpout(Common.KAFKA_SPOUT, kafkaSpout, 3);
        // Storm filters the data (Buy = already purchased, Shopping_Car = potential purchase)
        builder.setBolt(Common.FILTER_BOLT, new FilterBolt(), 8).shuffleGrouping(Common.KAFKA_SPOUT);

        // create the KafkaBolt
        @SuppressWarnings({ "unchecked", "rawtypes" })
        KafkaBolt kafkaBolt = new KafkaBolt().withTopicSelector(new DefaultTopicSelector(Common.FILTERED_LOG_TOPIC)).withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());

        // Storm writes the filtered data back to Kafka
        builder.setBolt(Common.KAFKA_BOLT, kafkaBolt, 2).shuffleGrouping(Common.FILTER_BOLT);

        Properties props = new Properties();
        props.put("metadata.broker.list", Common.STORM_METADATA_BROKER_LIST);
        props.put("request.required.acks", Common.STORM_REQUEST_REQUIRED_ACKS);
        props.put("serializer.class", Common.STORM_SERILIZER_CLASS);

        Config conf = new Config();
        conf.put("kafka.broker.properties", props);

        conf.put(Config.STORM_ZOOKEEPER_SERVERS, zkServers);

        if (args == null || args.length == 0) {
            // run in local mode
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("storm-kafka-topology", conf, builder.createTopology());
        } else {
            // run on the cluster
            conf.setNumWorkers(3);
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException e) {
                System.out.println("error : " + e);
            }
        }
    }
}

class FilterBolt extends BaseBasicBolt {

    private static final long serialVersionUID = 1L;

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String logStr = input.getString(0);
        // only keep the keywords we are interested in
        // here we keep log messages that contain 'Buy' or 'Shopping_Car'
        if (logStr.contains(Common.KEY_WORD_BUY) || logStr.contains(Common.KEY_WORD_SHOPPING_CAR)) {
            collector.emit(new Values(logStr));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE));
    }
}

class Common {
    public static final String ALL_MY_LOG_TOPIC = "all_my_log";
    public static final String FILTERED_LOG_TOPIC = "filtered_log";

    public static final String DATE_FORMAT_YYYYDDMMHHMMSS = "yyyyMMddHHmmss";
    public static final String DATE_FORMAT_HHMMSS = "HHmmss";
    public static final String DATE_FORMAT_HHMMSS_DEFAULT_VALUE = "000001";

    public static final String HBASE_ZOOKEEPER_LIST = "node1:2888,node2:2888,node3:2888";
    public static final int ZOOKEEPER_PORT = 2181;
    public static final String ZOOKEEPER_QUORUM = "node1:" + ZOOKEEPER_PORT + ",node2:" + ZOOKEEPER_PORT + ",node3:" + ZOOKEEPER_PORT + "";
    public static final String ZOOKEEPER_ROOT = "/MyKafka";
    public static final String ZOOKEEPER_ID = "MyTrack";

    public static final String KAFKA_SPOUT = "kafkaSpout";
    public static final String FILTER_BOLT = "filterBolt";
    public static final String PROCESS_BOLT = "processBolt";
    public static final String HBASE_BOLT = "hbaseBolt";
    public static final String KAFKA_BOLT = "kafkaBolt";

    // Storm Conf
    public static final String STORM_METADATA_BROKER_LIST = "node1:9092,node2:9092,node3:9092";
    public static final String STORM_REQUEST_REQUIRED_ACKS = "1";
    public static final String STORM_SERILIZER_CLASS = "kafka.serializer.StringEncoder";

    // key word
    public static final String KEY_WORD_BUY = "Buy";
    public static final String KEY_WORD_SHOPPING_CAR = "Shopping_Car";

    //hbase
    public static final String TABLE_USER_ACTION = "t_user_actions";
    public static final String COLUMN_FAMILY = "cf";
    // write to HBase once every this many seconds
    public static final int WRITE_RECORD_TO_TABLE_PER_SECOND = 1;
    public static final int TABLE_MAX_VERSION = (60/WRITE_RECORD_TO_TABLE_PER_SECOND) * 60 * 24;
}
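
With WRITE_RECORD_TO_TABLE_PER_SECOND = 1, TABLE_MAX_VERSION works out to (60 / 1) * 60 * 24 = 86,400, i.e. enough HBase cell versions to keep one value per second for a full day; section 6.10 creates the t_user_actions table with this max-version setting.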

6.9. Monitor Kafka (filtered_log)

--on node3, start a Kafka console consumer
cd /home/kafka-2.10/bin
./kafka-console-consumer.sh --zookeeper node1,node2,node3 --from-beginning --topic filtered_log

Sample output:

87.26.135.185    黑龙江    2018-12-20    1545290594658    7290881731606227972    www.hongten.com    Shopping_Car
60.96.96.38    青海    2018-12-20    1545290594687    6935901257286057015    www.hongten.com    Shopping_Car
43.159.110.193    江苏    2018-12-20    1545290594727    7096698224110515553    www.hongten.com    Shopping_Car
21.103.139.11    山西    2018-12-20    1545290594693    7805867078876194442    www.hongten.com    Shopping_Car
139.51.213.184    广东    2018-12-20    1545290594729    8048796865619113514    www.hongten.com    Buy
58.213.148.89    河北    2018-12-20    1545290594708    5176551342435592748    www.hongten.com    Buy
36.205.221.116    湖南    2018-12-20    1545290594715    4484717918039766421    www.hongten.com    Shopping_Car
135.194.103.53    北京    2018-12-20    1545290594769    4833011508087432349    www.hongten.com    Shopping_Car
180.21.100.66    贵州    2018-12-20    1545290594752    5270357330431599426    www.hongten.com    Buy
167.71.65.70    山西    2018-12-20    1545290594790    275898530145861990    www.hongten.com    Buy
125.51.21.199    宁夏    2018-12-20    1545290594814    3613499600574777198    www.hongten.com    Buy

6.10. Storm Consumes the Kafka Data Again, Processes It, and Saves It to HBase

  • Storm consumes data from Kafka again
  • Storm aggregates the data (Buy = number of purchases, Shopping_Car = number of potential purchases)
  • Storm writes the results to HBase

package com.b510.big.data.storm.process;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;

import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class LogProcessTopology {

    public static void main(String[] args) {

        ZkHosts zkHosts = new ZkHosts(Common.ZOOKEEPER_QUORUM);
        // the Spout reads data from the 'filtered_log' topic
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, Common.FILTERED_LOG_TOPIC, Common.ZOOKEEPER_ROOT, Common.ZOOKEEPER_ID);
        List<String> zkServers = new ArrayList<>();
        for (String host : zkHosts.brokerZkStr.split(",")) {
            zkServers.add(host.split(":")[0]);
        }

        spoutConfig.zkServers = zkServers;
        spoutConfig.zkPort = Common.ZOOKEEPER_PORT;
        spoutConfig.forceFromStart = true;
        spoutConfig.socketTimeoutMs = 60 * 60 * 1000;
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        // create the KafkaSpout
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();
        // Storm consumes data from Kafka again
        builder.setSpout(Common.KAFKA_SPOUT, kafkaSpout, 3);
        // Storm aggregates the data (Buy = number of purchases, Shopping_Car = number of potential purchases)
        builder.setBolt(Common.PROCESS_BOLT, new ProcessBolt(), 3).shuffleGrouping(Common.KAFKA_SPOUT);
        // Storm writes the results to HBase
        builder.setBolt(Common.HBASE_BOLT, new HbaseBolt(), 3).shuffleGrouping(Common.PROCESS_BOLT);

        Properties props = new Properties();
        props.put("metadata.broker.list", Common.STORM_METADATA_BROKER_LIST);
        props.put("request.required.acks", Common.STORM_REQUEST_REQUIRED_ACKS);
        props.put("serializer.class", Common.STORM_SERILIZER_CLASS);

        Config conf = new Config();
        conf.put("kafka.broker.properties", props);

        conf.put(Config.STORM_ZOOKEEPER_SERVERS, zkServers);

        if (args == null || args.length == 0) {
            // run in local mode
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("storm-kafka-topology", conf, builder.createTopology());
        } else {
            // run on the cluster
            conf.setNumWorkers(3);
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException e) {
                System.out.println("error : " + e);
            }
        }

    }
}

class ProcessBolt extends BaseBasicBolt {

    private static final long serialVersionUID = 1L;

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String logStr = input.getString(0);
        if (logStr != null) {
            String infos[] = logStr.split("\\t");
            //180.21.100.66    贵州    2018-12-20    1545290594752    5270357330431599426    www.hongten.com    Buy
            collector.emit(new Values(infos[2], infos[6]));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("date", "user_action"));
    }
}

class HbaseBolt implements IBasicBolt {
    private static final long serialVersionUID = 1L;

    HBaseDAO hBaseDAO = null;

    SimpleDateFormat simpleDateFormat = null;
    SimpleDateFormat simpleDateFormatHHMMSS = null;

    int userBuyCount = 0;
    int userShoopingCarCount = 0;

    // avoid writing to HBase too frequently
    int writeToHbaseMaxNum = Common.WRITE_RECORD_TO_TABLE_PER_SECOND * 1000;
    long begin = System.currentTimeMillis();
    long end = 0;

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map map, TopologyContext context) {
        hBaseDAO = new HBaseDAOImpl();
        simpleDateFormat = new SimpleDateFormat(Common.DATE_FORMAT_YYYYDDMMHHMMSS);
        simpleDateFormatHHMMSS = new SimpleDateFormat(Common.DATE_FORMAT_HHMMSS);
        hBaseDAO.createTable(Common.TABLE_USER_ACTION, new String[]{Common.COLUMN_FAMILY}, Common.TABLE_MAX_VERSION);
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // at the first second after midnight (HHmmss == 000001)
        // the counters need to be reset to zero
        // note: this check is not very precise, because at that moment the previous day's data may not be fully processed yet
        if (simpleDateFormatHHMMSS.format(new Date()).equals(Common.DATE_FORMAT_HHMMSS_DEFAULT_VALUE)) {
            userBuyCount = 0;
            userShoopingCarCount = 0;
        }

        if (input != null) {
            // based on ProcessBolt.declareOutputFields()
            String date = input.getString(0);
            String userAction = input.getString(1);

            if (userAction.equals(Common.KEY_WORD_BUY)) {
                // the same user can repeat the 'Buy' action within one day
                userBuyCount++;
            }

            if (userAction.equals(Common.KEY_WORD_SHOPPING_CAR)) {
                userShoopingCarCount++;
            }

            end = System.currentTimeMillis();
            if ((end - begin) > writeToHbaseMaxNum) {
                System.out.println("hbase_key: " + Common.KEY_WORD_BUY + "_" + date + " , userBuyCount: " + userBuyCount + ", userShoopingCarCount :" + userShoopingCarCount);

                // write the counts to HBase
                String quailifer = simpleDateFormat.format(new Date());
                hBaseDAO.insert(Common.TABLE_USER_ACTION ,
                        Common.KEY_WORD_BUY + "_" + date,
                        Common.COLUMN_FAMILY,
                        new String[] { quailifer },
                        new String[] { "{user_buy_count:" + userBuyCount + "}" }
                        );
                hBaseDAO.insert(Common.TABLE_USER_ACTION ,
                        Common.KEY_WORD_SHOPPING_CAR + "_" + date,
                        Common.COLUMN_FAMILY,
                        new String[] { quailifer },
                        new String[] { "{user_shopping_car_count:" + userShoopingCarCount + "}" }
                        );
                begin = System.currentTimeMillis();
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

    @Override
    public Map getComponentConfiguration() {
        return null;
    }

    @Override
    public void cleanup() {

    }
}

interface HBaseDAO {
    public void createTable(String tableName, String[] columnFamilys, int maxVersion);
    public void insert(String tableName, String rowKey, String family, String quailifer[], String value[]);
}

class HBaseDAOImpl implements HBaseDAO {

    HConnection hConnection = null;
    static Configuration conf = null;

    public HBaseDAOImpl() {
        conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", Common.HBASE_ZOOKEEPER_LIST);
        try {
            hConnection = HConnectionManager.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void createTable(String tableName, String[] columnFamilys, int maxVersion) {
        try {
            HBaseAdmin admin = new HBaseAdmin(conf);
            if (admin.tableExists(tableName)) {
                System.err.println("table existing in hbase.");
            } else {
                HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
                for (String columnFamily : columnFamilys) {
                    HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(columnFamily);
                    hColumnDescriptor.setMaxVersions(maxVersion);
                    tableDesc.addFamily(hColumnDescriptor);
                }

                admin.createTable(tableDesc);
                System.err.println("table is created.");
            }
            admin.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void insert(String tableName, String rowKey, String family, String quailifer[], String value[]) {
        HTableInterface table = null;
        try {
            table = hConnection.getTable(tableName);
            Put put = new Put(rowKey.getBytes());
            for (int i = 0; i < quailifer.length; i++) {
                String col = quailifer[i];
                String val = value[i];
                put.add(family.getBytes(), col.getBytes(), val.getBytes());
            }
            table.put(put);
            System.err.println("save record successfuly.");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}

Storm processing logic:

1. Write the counters to HBase once per second

2. Reset the counters shortly after midnight the next day
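
Each write from HbaseBolt therefore lands in t_user_actions with a layout like the following (the counts here are made-up example values; the row key is the action keyword plus the log date, and the column qualifier is the write time formatted as yyyyMMddHHmmss):

row key: Buy_2018-12-20             column: cf:20181220145903    value: {user_buy_count:1234}
row key: Shopping_Car_2018-12-20    column: cf:20181220145903    value: {user_shopping_car_count:2345}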

If we keep the programs above running, the system continuously writes data to HBase, giving us the data we need to generate the report.

Below is the report implementation.

6.11. Read the HBase Data and Generate an Excel Report with POI

  • Read the HBase data
  • Generate an Excel report with POI

package com.b510.big.data.poi;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.poi.xssf.usermodel.XSSFCell;
import org.apache.poi.xssf.usermodel.XSSFSheet;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;

public class ReportUtil {

    public static void main(String[] args) throws Exception {
        String year = "2018";
        String month = "12";
        String day = "21";
        String hour = "14";
        generateReport(year, month, day, hour);
    }

    private static void generateReport(String year, String month, String day, String hour) {
        HBaseDAO hBaseDAO = new HBaseDAOImpl();
        // format: yyyyMMddHH
        String begin = year + month + day + hour;
        String[] split = generateQuailifers(begin);

        List<Integer> userBuyCountList = getData(hBaseDAO, year, month, day, split, Common.KEY_WORD_BUY);
        List<Integer> userShoppingCarCountList = getData(hBaseDAO, year, month, day, split, Common.KEY_WORD_SHOPPING_CAR);
        //System.err.println(userBuyCountList.size());
        //System.err.println(userShoppingCarCountList.size());

        writeExcel(year, month, day, hour, userBuyCountList, userShoppingCarCountList);
    }

    private static void writeExcel(String year, String month, String day, String hour, List<Integer> userBuyCountList, List<Integer> userShoppingCarCountList) {
        try {
            File file = new File(Common.REPORT_TEMPLATE);
            InputStream in = new FileInputStream(file);
            XSSFWorkbook wb = new XSSFWorkbook(in);
            XSSFSheet sheet = wb.getSheetAt(0);
            if (sheet != null) {
                XSSFCell cell = null;
                cell = sheet.getRow(0).getCell(0);
                cell.setCellValue("One Hour Report-" + year + "-" + month + "-" + day + " From " + hour + ":00 To " + hour + ":59");

                putData(userBuyCountList, sheet, 3);
                putData(userShoppingCarCountList, sheet, 7);

                FileOutputStream out = new FileOutputStream(Common.REPORT_ONE_HOUR);
                wb.write(out);
                out.close();
                System.err.println("done.");
            }
        } catch (Exception e) {
            System.err.println("Exception" + e);
        }
    }

    private static void putData(List<Integer> userBuyCountList, XSSFSheet sheet, int rowNum) {
        XSSFCell cell;
        if (userBuyCountList != null && userBuyCountList.size() > 0) {
            for (int i = 0; i < userBuyCountList.size(); i++) {
                cell = sheet.getRow(rowNum).getCell(i + 1);
                cell.setCellValue(userBuyCountList.get(i));
            }
        }
    }

    private static List<Integer> getData(HBaseDAO hBaseDAO, String year, String month, String day, String[] split, String preKey) {
        List<Integer> list = new ArrayList<Integer>();
        Result rs = hBaseDAO.getOneRowAndMultiColumn(Common.TABLE_USER_ACTION, preKey + "_" + year + "-" + month + "-" + day, split);
        for (Cell cell : rs.rawCells()) {
            String value = new String(CellUtil.cloneValue(cell)).split(":")[1].trim();
            value = value.substring(0, value.length() - 1);
            list.add(Integer.valueOf(value));
        }
        return list;
    }

    private static String[] generateQuailifers(String begin) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 60;) {
            if (i == 0 || i == 5) {
                sb.append(begin).append("0").append(i).append("00").append(",");
            } else {
                sb.append(begin).append(i).append("00").append(",");
            }
            i = i + 5;
        }
        sb.append(begin).append("5959");
        String sbStr = sb.toString();
        String[] split = sbStr.split(",");
        return split;
    }
}

interface HBaseDAO {
    Result getOneRowAndMultiColumn(String tableName, String rowKey, String[] cols);
}

class HBaseDAOImpl implements HBaseDAO {

    HConnection hConnection = null;
    static Configuration conf = null;

    public HBaseDAOImpl() {
        conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", Common.HBASE_ZOOKEEPER_LIST);
        try {
            hConnection = HConnectionManager.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public Result getOneRowAndMultiColumn(String tableName, String rowKey, String[] cols) {
        HTableInterface table = null;
        Result rsResult = null;
        try {
            table = hConnection.getTable(tableName);
            Get get = new Get(rowKey.getBytes());
            for (int i = 0; i < cols.length; i++) {
                get.addColumn(Common.COLUMN_FAMILY.getBytes(), cols[i].getBytes());
            }
            rsResult = table.get(get);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return rsResult;
    }
}

class Common {
    // report
    public static final String REPORT_TEMPLATE = "./resources/report.xlsx";
    public static final String REPORT_ONE_HOUR = "./resources/one_report.xlsx";

    public static final String DATE_FORMAT_YYYYDDMMHHMMSS = "yyyyMMddHHmmss";

    public static final String HBASE_ZOOKEEPER_LIST = "node1:2888,node2:2888,node3:2888";

    // key word
    public static final String KEY_WORD_BUY = "Buy";
    public static final String KEY_WORD_SHOPPING_CAR = "Shopping_Car";

    // hbase
    public static final String TABLE_USER_ACTION = "t_user_actions";
    public static final String COLUMN_FAMILY = "cf";
}
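
A note on the report code: it assumes an Excel template at ./resources/report.xlsx with the title in cell A1, the Buy series in row 4, and the Shopping_Car series in row 8 (putData writes to row indexes 3 and 7, starting at column B). generateQuailifers builds 13 column qualifiers per hour (yyyyMMddHH followed by 0000, 0500, 1000, ..., 5500, and finally 5959), so each series holds at most 13 data points per hourly report, and only HBase cells whose write timestamp matches one of those qualifiers exactly are picked up.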

7. Source Code Download

Source Code: Flume_Kafka_Storm_Hbase_Hdfs_Poi_src.zip

The corresponding JAR dependencies are not included because they are too large; add them yourself based on the import statements.

8. Summary

After learning big data for a while, being able to build the application I wanted through my own study and exploration still gives a great sense of achievement. Of course, stepping on a few mines along the way is also a good experience. :)

========================================================

More reading, and English is important.

I’m Hongten


E | hongtenzone@foxmail.com  B | http://www.cnblogs.com/hongten

========================================================

Original: https://www.cnblogs.com/hongten/p/hongten_flume_kafka_storm_hbase_hdfs_poi.html
