E-commerce Offline Data Warehouse (Part 1)

I. Data Collection and Transmission

Topics covered: Spring Boot, Tomcat, client-side event tracking, JSON logs, Flume (agent, source, channel, sink, custom interceptors, multiplexing channel selector), Kafka (topics, partitions, replication).

1. Create the topics topic_start and topic_event to hold startup logs and event logs.

kafka-topics.sh --zookeeper hadoop10:2181 --topic topic_start \
--create --partitions 1 --replication-factor 1

kafka-topics.sh --zookeeper hadoop10:2181 --topic topic_event \
--create --partitions 1 --replication-factor 1
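
The created topics can then be listed for verification (same ZooKeeper address as above; this uses the pre-2.2 --zookeeper style of kafka-topics.sh, matching the commands above):

kafka-topics.sh --zookeeper hadoop10:2181 --list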

2. Write a custom Flume interceptor that filters the data and tags each event with the Kafka topic it should be published to.

package com.yh.interceptor;

import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MallInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // An Event has two fields: a byte[] body holding the collected data,
        // and a Map of headers holding key-value pairs (empty by default).
        // Event: { headers:{} body: 61 61 61    aaa }
        byte[] body = event.getBody(); // raw bytes of the event body
        String str = new String(body, Charset.forName("UTF-8")); // decode the bytes to a string
        if (StringUtils.isBlank(str)) { // isBlank is true for null, empty, or whitespace-only strings
            return null; // drop the bad record by returning null instead of the event
        }
        boolean flag = false;
        Map<String, String> map = event.getHeaders(); // headers of the event
        if (str.contains("\"en\":\"start\"")) {
            map.put("topic", "topic_start");
            flag = ETLUtils.validStart(str); // ETLUtils (a helper class not shown here) validates the start-log JSON
        } else {
            map.put("topic", "topic_event");
            flag = ETLUtils.validEvent(str); // validates the event-log JSON
        }
        if(!flag){
            return null;
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        List<Event> list2 = new ArrayList<>(); // holds the events that pass filtering
        for (Event event : list) {
            Event event1 = intercept(event);
            if (event1 != null) {
                list2.add(event1);
            }
        }
        return list2; // return the filtered events
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new MallInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}
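
For context, here is a minimal sketch of a Flume agent configuration that wires this interceptor to a multiplexing channel selector and two Kafka channels, one per topic. The host name matches the commands above, but the agent/component names (a1, r1, c1, c2) and the log file path are illustrative, not from the original post:

a1.sources = r1
a1.channels = c1 c2

a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/app.log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.yh.interceptor.MallInterceptor$Builder

# Multiplexing selector: route by the "topic" header set in the interceptor
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2

# Kafka channels write straight into the two topics (no sink needed)
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop10:9092
a1.channels.c1.kafka.topic = topic_start
a1.channels.c1.parseAsFlumeEvent = false

a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = hadoop10:9092
a1.channels.c2.kafka.topic = topic_event
a1.channels.c2.parseAsFlumeEvent = false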

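The Hive UDTF below explodes the "et" JSON array carried by each event log into one (event_name, event_json) row per element. It is registered as flat_analizer in step 3 and used via lateral view in step 4.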
package com.yh;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import org.json.JSONObject;
import java.util.ArrayList;
import java.util.List;

public class MallUDTF extends GenericUDTF {

    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        // 1. Column names of the two exploded output columns
        List<String> fieldNames = new ArrayList<>();
        fieldNames.add("event_name");
        fieldNames.add("event_json");

        // 2. Data types (object inspectors) of the exploded output columns
        List<ObjectInspector> fieldOIs = new ArrayList<>();
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }
    private final List<String> list = new ArrayList<>(); // reused row buffer for forward()

    @Override
    public void process(Object[] args) throws HiveException {
        // Input is the "et" JSON array: "et": [ {}, {} ]
        // Invoked in SQL as flat_analizer(base_analizer(line, 'et'))
        String s = args[0].toString();
        JSONArray jsonArray = new JSONArray(s);
        for (int i = 0; i < jsonArray.length(); i++) {
            list.clear();
            JSONObject jsonObject = jsonArray.getJSONObject(i);
            String en = jsonObject.getString("en"); // event name
            String json = jsonObject.toString();    // full event JSON
            list.add(en);
            list.add(json);
            forward(list); // emit one (event_name, event_json) row per array element
            System.out.println(list); // debug output
        }
    }

    @Override
    public void close() throws HiveException {
    }

    // Local smoke test for the UDTF.
    public static void main(String[] args) throws HiveException {
        MallUDTF malludtf = new MallUDTF();
        malludtf.setCollector(row -> { }); // no-op collector so forward() does not NPE in a local run
        String s = "[{\"ett\":\"1650336734810\",\"en\":\"loading\",\"kv\":{\"extend2\":\"\",\"loading_time\":\"0\",\"action\":\"3\",\"extend1\":\"\",\"type\":\"3\",\"type1\":\"\",\"loading_way\":\"1\"}},\n" +
                " {\"ett\":\"1650375109091\",\"en\":\"active_background\",\"kv\":{\"active_source\":\"3\"}},\n" +
                " {\"ett\":\"1650310705805\",\"en\":\"comment\",\"kv\":{\"p_comment_id\":4,\"addtime\":\"1650328750056\",\"praise_count\":796,\"other_id\":3,\"comment_id\":6,\"reply_count\":52,\"userid\":1,\"content\":\"姓谭帅末操料液故\"}},\n" +
                " {\"ett\":\"1650340480983\",\"en\":\"favorites\",\"kv\":{\"course_id\":6,\"id\":0,\"add_time\":\"1650340150151\",\"userid\":5}}]";
        malludtf.process(new Object[]{s});
    }
}

3. Jars for permanent functions must be uploaded to HDFS (jars for temporary functions can stay local); then create the functions bound to the Java classes.
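
For example, the jar can be uploaded with the commands below (the HDFS path is taken from the create function statements that follow; the local jar path is illustrative):

hdfs dfs -mkdir -p /user/hive/jars
hdfs dfs -put ./mall-udtf-1.0-SNAPSHOT.jar /user/hive/jars/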

-- Bind the permanent functions to the compiled Java classes.
-- If this fails, run "drop function xx", delete the jar, and redo the steps.
create function base_analizer as 'com.yh.MallUDF' using jar
'hdfs://hadoop10:9000/user/hive/jars/mall-udtf-1.0-SNAPSHOT.jar';

create function flat_analizer as 'com.yh.MallUDTF' using jar
'hdfs://hadoop10:9000/user/hive/jars/mall-udtf-1.0-SNAPSHOT.jar';
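
Note that base_analizer must point at a plain UDF (one value out per call), while flat_analizer points at the MallUDTF shown above. The MallUDF class itself is not included in the original post; the following is a minimal hypothetical sketch consistent with how it is called in step 4, assuming each raw log line has the shape server_time|{json} with common fields nested under a "cm" object:

package com.yh;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.json.JSONObject;

// Hypothetical sketch of the class registered as base_analizer above.
// Assumes the line format "server_time|{json}"; 'st' is the timestamp prefix,
// 'et' is the raw events array, and all other keys live under the "cm" object.
public class MallUDF extends UDF {
    public String evaluate(String line, String key) {
        String[] parts = line.split("\\|");
        if (parts.length != 2) {
            return ""; // malformed line
        }
        if ("st".equals(key)) {
            return parts[0].trim(); // server timestamp prefix
        }
        JSONObject json = new JSONObject(parts[1].trim());
        if ("et".equals(key)) {
            // return the raw events array for flat_analizer to explode
            return json.has("et") ? json.getJSONArray("et").toString() : "";
        }
        JSONObject cm = json.getJSONObject("cm"); // common-field map
        return cm.optString(key, "");
    }
}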

4. Use the custom functions to populate dwd_base_event_log.

insert into table dwd_base_event_log partition(dt='2022-04-01')
select
    base_analizer(line,'mid') as mid_id,
    base_analizer(line,'uid') as user_id,
    base_analizer(line,'vc') as version_code,
    base_analizer(line,'vn') as version_name,
    base_analizer(line,'l') as lang,
    base_analizer(line,'sr') as source,
    base_analizer(line,'os') as os,
    base_analizer(line,'ar') as area,
    base_analizer(line,'md') as model,
    base_analizer(line,'ba') as brand,
    base_analizer(line,'sv') as sdk_version,
    base_analizer(line,'g') as gmail,
    base_analizer(line,'hw') as height_width,
    base_analizer(line,'t') as app_time,
    base_analizer(line,'nw') as network,
    base_analizer(line,'ln') as lng,
    base_analizer(line,'la') as lat,
    event_name,
    event_json,
    base_analizer(line,'st') as server_time
from ods_event_log
lateral view flat_analizer(base_analizer(line,'et')) tt as event_name, event_json
where dt='2022-04-01'
and base_analizer(line,'et') <> '';

5. Edit hive-site.xml to switch Hive's execution engine from MapReduce to Spark.

<property>
    <name>spark.yarn.jars</name>
    <value>hdfs://hadoop10:9000/spark-jars/*</value>
</property>

<property>
    <name>hive.execution.engine</name>
    <value>spark</value>
</property>
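
For spark.yarn.jars to resolve, the Spark jars must already be in HDFS; for example (the local Spark install path is illustrative):

hdfs dfs -mkdir /spark-jars
hdfs dfs -put /opt/module/spark/jars/* /spark-jars/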

6. From dwd_base_event_log, build the DWD-layer product click (display), praise (like), favorites, and similar tables.

-- DWD-layer product click (display) table
CREATE EXTERNAL TABLE dwd_display_log(
mid_id string,
user_id string,
version_code string,
version_name string,
lang string,
source string,
os string,
area string,
model string,
brand string,
sdk_version string,
gmail string,
height_width string,
app_time string,
network string,
lng string,
lat string,
action string,
goodsid string,
place string,
extend1 string,
category string,
server_time string
)PARTITIONED BY (dt string);

insert overwrite table dwd_display_log partition(dt='2022-04-01')
select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.action'),
    get_json_object(event_json,'$.kv.goodsid'),
    get_json_object(event_json,'$.kv.place'),
    get_json_object(event_json,'$.kv.extend1'),
    get_json_object(event_json,'$.kv.category'),
    server_time
from dwd_base_event_log where dt='2022-04-01'
and event_name='display';

-- DWD-layer praise (like) table
CREATE EXTERNAL TABLE dwd_praise_log(
mid_id string,
user_id string,
version_code string,
version_name string,
lang string,
source string,
os string,
area string,
model string,
brand string,
sdk_version string,
gmail string,
height_width string,
app_time string,
network string,
lng string,
lat string,
id string,
userid string,
target_id string,
type string,
add_time string,
server_time string
)PARTITIONED BY (dt string);

insert overwrite table dwd_praise_log PARTITION (dt='2022-04-01')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.id') id,
get_json_object(event_json,'$.kv.userid') userid,
get_json_object(event_json,'$.kv.target_id') target_id,
get_json_object(event_json,'$.kv.type') type,
get_json_object(event_json,'$.kv.add_time') add_time,
server_time from dwd_base_event_log
where dt='2022-04-01' and event_name='praise';

-- DWD-layer favorites table
CREATE EXTERNAL TABLE dwd_favorites_log(
mid_id string,
user_id string,
version_code string,
version_name string,
lang string,
source string,
os string,
area string,
model string,
brand string,
sdk_version string,
gmail string,
height_width string,
app_time string,
network string,
lng string,
lat string,
id int,
course_id int,
userid int,
add_time string,
server_time string
)PARTITIONED BY (dt string);

insert overwrite table dwd_favorites_log PARTITION (dt='2022-04-01')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.id') id,
get_json_object(event_json,'$.kv.course_id') course_id,
get_json_object(event_json,'$.kv.userid') userid,
get_json_object(event_json,'$.kv.add_time') add_time,
server_time from dwd_base_event_log
where dt='2022-04-01' and event_name='favorites';

-- spot-check the loaded data
select * from dwd_praise_log where dt='2022-04-01';

Original: https://blog.csdn.net/weixin_45887858/article/details/124535166
Author: 宋荣子_
Title: 电商离线数仓(一)
