Writing Hive 3.1.2 UDFs in Java by Extending the UDF or GenericUDF Class: Encoding, Decoding, Encryption and Decryption on a USDP Big Data Cluster


Background

After our cluster was upgraded from CDH 5.16 to CDP 7.1, the Alibaba Cloud DataPhin platform I use was upgraded as well, and the old UDFs stopped working well. The main purpose of some of these UDFs is to keep the SQL Boys from seeing the real values of confidential columns in Hive tables, preventing leaks of sensitive information. I wrote UDF functions implementing encoding, decoding, encryption and decryption, and verified that they work in Apache Hive on a USDP cluster. USDP actually seems more stable than Aliyun's platform...

How It Works

UDF

Hive's exec package contains a UDF class. You extend it, implement the logic in Java, place the compiled jar where Hive can find it, and after loading and registering it the function can be used like any built-in function.

A Minimal UDF Example

import org.apache.hadoop.hive.ql.exec.UDF;

import java.nio.charset.StandardCharsets;

public class base64code1 extends UDF {
    public String evaluate(String input){
        return java.util.Base64.getEncoder().encodeToString(input.getBytes(StandardCharsets.UTF_8));
    }
}

That is all it takes to implement the simplest possible UDF.

[Screenshot: the IDE shows the UDF class marked as deprecated]

But clearly this approach is deprecated in Hive 3.1.2. According to the Javadoc, to avoid the deprecation warning you should extend one of the classes that themselves extend UDF.

The classes that extend UDF:

[Screenshot: the built-in classes that extend UDF]

Opening one at random:

package org.apache.hadoop.hive.ql.udf;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

@Description(name = "ascii",
    value = "_FUNC_(str) - returns the numeric value of the first character"
    + " of str",
    extended = "Returns 0 if str is empty or NULL if str is NULL\n"
    + "Example:\n"
    + "  > SELECT _FUNC_('222') FROM src LIMIT 1;"
    + "  50\n"
    + "  > SELECT _FUNC_(2) FROM src LIMIT 1;\n" + "  50")
public class UDFAscii extends UDF {
  private final IntWritable result = new IntWritable();

  public IntWritable evaluate(Text s) {
    if (s == null) {
      return null;
    }

    if (s.getLength() > 0) {
      result.set(s.getBytes()[0]);
    } else {
      result.set(0);
    }

    return result;
  }
}

Nothing special here: these built-in subclasses of UDF simply come with an evaluate method already written, so extending one of them just means overriding that method. You can see evaluate overridden in many of the classes that extend UDF, so this method clearly matters.

UDF Source Code

First, look at the deprecated class itself:

package org.apache.hadoop.hive.ql.exec;

import org.apache.hadoop.hive.ql.udf.UDFType;

@Deprecated
@UDFType(deterministic = true)
public class UDF {

  private UDFMethodResolver rslv;

  public UDF() {
    rslv = new DefaultUDFMethodResolver(this.getClass());
  }

  protected UDF(UDFMethodResolver rslv) {
    this.rslv = rslv;
  }

  public void setResolver(UDFMethodResolver rslv) {
    this.rslv = rslv;
  }

  public UDFMethodResolver getResolver() {
    return rslv;
  }

  public String[] getRequiredJars() {
    return null;
  }

  public String[] getRequiredFiles() {
    return null;
  }
}

It references a frequently used type:

package org.apache.hadoop.hive.ql.exec;

import java.lang.reflect.Method;
import java.util.List;

import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

@Deprecated
public interface UDFMethodResolver {

  Method getEvalMethod(List<TypeInfo> argClasses) throws UDFArgumentException;
}

This is clearly an interface, so on to its concrete implementations:

[Screenshot: the implementations of UDFMethodResolver]

There are mainly these three.

Clearly, the normal case should use:


package org.apache.hadoop.hive.ql.exec;

import java.lang.reflect.Method;
import java.util.List;

import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

public class DefaultUDFMethodResolver implements UDFMethodResolver {

  private final Class<? extends UDF> udfClass;

  public DefaultUDFMethodResolver(Class<? extends UDF> udfClass) {
    this.udfClass = udfClass;
  }

  @Override
  public Method getEvalMethod(List<TypeInfo> argClasses) throws UDFArgumentException {
    return FunctionRegistry.getMethodInternal(udfClass, "evaluate", false,
        argClasses);
  }
}

And the method it calls on this utility class:

package org.apache.hadoop.hive.ql.exec;

public final class FunctionRegistry {

  public static <T> Method getMethodInternal(Class<? extends T> udfClass,
      String methodName, boolean exact, List<TypeInfo> argumentClasses)
      throws UDFArgumentException {

    List<Method> mlist = new ArrayList<Method>();

    for (Method m : udfClass.getMethods()) {
      if (m.getName().equals(methodName)) {
        mlist.add(m);
      }
    }

    return getMethodInternal(udfClass, mlist, exact, argumentClasses);
  }

}

So under the hood, the static getMethodInternal method of the FunctionRegistry utility class in org.apache.hadoop.hive.ql.exec uses reflection to collect every method named evaluate on the class that extends org.apache.hadoop.hive.ql.exec.UDF. This means evaluate can in fact be overloaded, although given that UDF stability has always been shaky I prefer not to. It also explains why the method you write after extending UDF must be named evaluate.
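As a hedged illustration of that resolution rule (the class below is invented for this explanation, not part of the original post), a UDF can overload evaluate and Hive picks the overload whose signature matches the call's argument types:

package com.zhiyong.hiveUDF;

import org.apache.hadoop.hive.ql.exec.UDF;

// Hypothetical example: DefaultUDFMethodResolver collects every public method
// named "evaluate" and matches on the argument TypeInfos, so overloads work.
public class OverloadDemo extends UDF {
    // Matches calls like overload_demo('abc', 3)
    public String evaluate(String s, int n) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < n; i++) {
            sb.append(s);
        }
        return sb.toString();
    }

    // Matches calls like overload_demo(7)
    public int evaluate(int x) {
        return x * x;
    }
}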

As for the udfClass.getMethods() call:

package java.lang;

public final class Class<T> implements java.io.Serializable,
                              GenericDeclaration,
                              Type,
                              AnnotatedElement {

    @CallerSensitive
    public Method[] getMethods() throws SecurityException {
        checkMemberAccess(Member.PUBLIC, Reflection.getCallerClass(), true);
        return copyMethods(privateGetPublicMethods());
    }
}

This reflection method has existed since the JDK 1.0 era. Note that it can also throw:

package java.lang;

public class SecurityException extends RuntimeException {

}

So the underlying lookup can throw a runtime exception.

A First Look at GenericUDF

Since extending UDF directly is deprecated, the deprecation note points to GenericUDF instead. It is a more complicated class to work with, but it is the one in common use today.

package com.zhiyong.hiveUDF;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

public class base64code2 extends GenericUDF {
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        return null;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        return null;
    }

    @Override
    public String getDisplayString(String[] children) {
        return null;
    }
}

Extending GenericUDF clearly requires overriding three methods and importing four classes. Judging by the names, initialize performs setup, evaluate holds the actual algorithm, and getDisplayString supplies display text for things like logs and EXPLAIN output.

The ObjectInspector type from the org.apache.hadoop.hive.serde2.objectinspector package is clearly worth a look.


package org.apache.hadoop.hive.serde2.objectinspector;

import org.apache.hadoop.hive.common.classification.InterfaceAudience.Public;
import org.apache.hadoop.hive.common.classification.InterfaceStability.Stable;

@Public
@Stable
public interface ObjectInspector extends Cloneable {
    String getTypeName();

    ObjectInspector.Category getCategory();

    public static enum Category {
        PRIMITIVE,
        LIST,
        MAP,
        STRUCT,
        UNION;

        private Category() {
        }
    }
}

It is clearly an interface, and it also defines a nested Category enum.

In fact, the package contains many more classes:

[Screenshot: the other classes in the objectinspector package]

Similarly named classes:

[Screenshot: similarly named ObjectInspector classes]

For example, ObjectInspectorConverters contains type-conversion methods, while ObjectInspectorFactory and ObjectInspectorUtils are utility classes full of static helpers.
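For instance, a common pattern (a hedged sketch, not code from this post) is to obtain a Converter so that an argument can be treated as a plain Java String no matter which string-like inspector the caller supplies:

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

// Hedged sketch: normalize whatever inspector the caller hands us into a
// plain Java String view of argument 0.
public class ConverterSketch {
    private ObjectInspectorConverters.Converter arg0ToString;

    public void setUp(ObjectInspector[] arguments) {
        arg0ToString = ObjectInspectorConverters.getConverter(
                arguments[0],
                PrimitiveObjectInspectorFactory.javaStringObjectInspector);
    }

    public String arg0AsString(Object rawArg0) {
        return (String) arg0ToString.convert(rawArg0);
    }
}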

GenericUDF Source Code

package org.apache.hadoop.hive.ql.udf.generic;

@InterfaceAudience.Public
@InterfaceStability.Stable
@UDFType(deterministic = true)
public abstract class GenericUDF implements Closeable {
}

It is clearly an abstract class: public, but not directly instantiable.

This abstract class has many subclasses:

[Screenshot: the subclasses of GenericUDF]

As for how I found these two classes in the first place: from the stack traces in Hive's error log...

The difference between the two same-named classes:

[Screenshot: the two same-named classes, from different Hive versions]

They come from different Hive versions. CDH 5.16 is being retired, so the old 2.3.7 version is no longer worth reading.

package org.apache.hadoop.hive.ql.udf.generic;

public class GenericUDFBridge extends GenericUDF implements Serializable {
  private static final long serialVersionUID = 4994861742809511113L;

  private String udfName;

  private boolean isOperator;

  private String udfClassName;

  private transient Method udfMethod;

  private transient ConversionHelper conversionHelper;

  private transient UDF udf;

  private transient Object[] realArguments;

  private transient UdfWhitelistChecker udfChecker;

  public GenericUDFBridge(String udfName, boolean isOperator,
      String udfClassName) {
    this.udfName = udfName;
    this.isOperator = isOperator;
    this.udfClassName = udfClassName;
  }

  public GenericUDFBridge() {
  }

  public void setUdfName(String udfName) {
    this.udfName = udfName;
  }

  @Override
  public String getUdfName() {
    return udfName;
  }

  public String getUdfClassName() {
    return udfClassName;
  }

  public void setUdfClassName(String udfClassName) {
    this.udfClassName = udfClassName;
  }

  public boolean isOperator() {
    return isOperator;
  }

  public void setOperator(boolean isOperator) {
    this.isOperator = isOperator;
  }

  public Class<? extends UDF> getUdfClass() {
    try {
      return getUdfClassInternal();
    } catch (ClassNotFoundException e) {
      throw new RuntimeException(e);
    }
  }

  private Class<? extends UDF> getUdfClassInternal()
      throws ClassNotFoundException {
    @SuppressWarnings("unchecked")
    Class<? extends UDF> clazz = (Class<? extends UDF>) Class.forName(
        udfClassName, true, Utilities.getSessionSpecifiedClassLoader());
    if (udfChecker != null && !udfChecker.isUdfAllowed(clazz)) {
      throw new SecurityException("UDF " + clazz.getCanonicalName() + " is not allowed");
    }
    return clazz;
  }

  @Override
  public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {

    try {
      udf = (UDF)getUdfClassInternal().newInstance();
    } catch (Exception e) {
      throw new UDFArgumentException(
          "Unable to instantiate UDF implementation class " + udfClassName + ": " + e);
    }

    ArrayList<TypeInfo> argumentTypeInfos = new ArrayList<TypeInfo>(
        arguments.length);
    for (ObjectInspector argument : arguments) {
      argumentTypeInfos.add(TypeInfoUtils
          .getTypeInfoFromObjectInspector(argument));
    }
    udfMethod = udf.getResolver().getEvalMethod(argumentTypeInfos);
    udfMethod.setAccessible(true);

    conversionHelper = new ConversionHelper(udfMethod, arguments);

    realArguments = new Object[arguments.length];

    ObjectInspector returnOI = ObjectInspectorFactory
        .getReflectionObjectInspector(udfMethod.getGenericReturnType(),
        ObjectInspectorOptions.JAVA);

    return returnOI;
  }

  @Override
  public Object evaluate(DeferredObject[] arguments) throws HiveException {
    assert (arguments.length == realArguments.length);

    for (int i = 0; i < realArguments.length; i++) {
      realArguments[i] = arguments[i].get();
    }

    Object result = FunctionRegistry.invoke(udfMethod, udf, conversionHelper
        .convertIfNecessary(realArguments));

    if (result != null && result instanceof HiveDecimalWritable) {
      result = HiveDecimalWritable.enforcePrecisionScale
          ((HiveDecimalWritable) result,
              HiveDecimal.SYSTEM_DEFAULT_PRECISION,
              HiveDecimal.SYSTEM_DEFAULT_SCALE);
    }

    return result;
  }

  @Override
  public String getDisplayString(String[] children) {
    if (isOperator) {
      if (children.length == 1) {

        return "(" + udfName + " " + children[0] + ")";
      } else {

        assert children.length == 2;
        return "(" + children[0] + " " + udfName + " " + children[1] + ")";
      }
    } else {
      StringBuilder sb = new StringBuilder();
      sb.append(udfName);
      sb.append("(");
      for (int i = 0; i < children.length; i++) {
        sb.append(children[i]);
        if (i + 1 < children.length) {
          sb.append(", ");
        }
      }
      sb.append(")");
      return sb.toString();
    }
  }

  @Override
  public String[] getRequiredJars() {
    return udf.getRequiredJars();
  }

  @Override
  public String[] getRequiredFiles() {
    return udf.getRequiredFiles();
  }

  public void setUdfChecker(UdfWhitelistChecker udfChecker) {
    this.udfChecker = udfChecker;
  }

  public interface UdfWhitelistChecker {
    boolean isUdfAllowed(Class<?> clazz);
  }
}

As the comments indicate, GenericUDFBridge wraps a legacy UDF implementation so that it behaves as a GenericUDF. In practice, then, extending the old UDF class directly, extending one of its built-in subclasses, or extending GenericUDF yourself makes little difference: underneath, GenericUDFBridge adapts everything to GenericUDF, which is what gets resolved from the AST and handed to MapReduce, Tez, Spark or whichever engine actually runs the job.

The ClassNotFoundException thrown from getUdfClass is exactly what I had seen in my stack traces. Both initialize and getUdfClass call getUdfClassInternal, which executes:

    Class<? extends UDF> clazz = (Class<? extends UDF>) Class.forName(
        udfClassName, true, Utilities.getSessionSpecifiedClassLoader());

Class.forName here takes the UDF's class name, true meaning the class is initialized on load, and the class loader returned by Utilities.getSessionSpecifiedClassLoader(); reflection then yields the UDF class that is ultimately needed.
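To make the three arguments concrete, here is a hedged, self-contained illustration (it reuses a class name from this post; run it with the jar on the classpath):

public class ForNameDemo {
    public static void main(String[] args) throws Exception {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        // name of the class, initialize-on-load flag, class loader to use
        Class<?> clazz = Class.forName("com.zhiyong.hiveUDF.Base64code1", true, cl);
        // GenericUDFBridge then instantiates it, roughly like this:
        Object udf = clazz.getDeclaredConstructor().newInstance();
        System.out.println(udf.getClass().getName());
    }
}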

A Minimal GenericUDF Example

With the skeleton in place, the three methods to override are initialize (setup), evaluate (the actual algorithm) and getDisplayString (the display text).

Being the more modern mechanism, it is naturally more complex and more capable. Its parameters arrive as the ObjectInspector interface shown earlier, so the next thing to examine is its implementations:

[Screenshot: the implementations of ObjectInspector]

For encoding/decoding and encryption/decryption, accepting and returning string is sufficient:

[Screenshot: the string-related ObjectInspector implementations]

In fact:

[Screenshot: the implementations of PrimitiveObjectInspector]

Implementations of the PrimitiveObjectInspector interface map to Java's primitive-style types, while interfaces such as ListObjectInspector, MapObjectInspector and StructObjectInspector map to complex types like arrays, lists, maps and structs. With these, a Hive UDF can take complex-typed parameters instead of being limited to simple types like String and int. The modern approach brings modern benefits.
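As a hedged sketch (not from the original post) of what composing inspectors looks like, here is an initialize-style helper declaring an array&lt;string&gt; return type:

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

// Hedged sketch: an ObjectInspector describing a Hive array<string>.
public class ListReturnSketch {
    public static ObjectInspector listOfStrings() {
        return ObjectInspectorFactory.getStandardListObjectInspector(
                PrimitiveObjectInspectorFactory.javaStringObjectInspector);
    }
}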

Encoding and Decoding

Encoding and decoding merely keep sensitive information from being obvious at a glance; Base64 is enough for that.
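The whole codec is covered by the JDK's java.util.Base64; a quick round trip (the encoded value matches the test output later in this post):

import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class Base64RoundTrip {
    public static void main(String[] args) {
        // Encode: bytes of the UTF-8 string -> Base64 text
        String encoded = Base64.getEncoder()
                .encodeToString("战斗暴龙兽".getBytes(StandardCharsets.UTF_8));
        System.out.println(encoded); // 5oiY5paX5pq06b6Z5YW9

        // Decode: Base64 text -> original string
        String decoded = new String(Base64.getDecoder().decode(encoded),
                StandardCharsets.UTF_8);
        System.out.println(decoded); // 战斗暴龙兽
    }
}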

Encryption and Decryption

To avoid the weakness of encrypting and decrypting with the identical key, the key can be transformed so that the two sides of the process are asymmetric.
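Concretely, the scheme below hands the decryption side a different string from the encryption key: the key's Base64 form, reversed. A minimal sketch (the printed value matches the test output later in this post):

import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class KeyDerivation {
    public static void main(String[] args) {
        String encryptKey = "八神太一";
        // The key handed out for decryption: Base64-encode, then reverse.
        String decryptKey = new StringBuffer(
                Base64.getEncoder().encodeToString(
                        encryptKey.getBytes(StandardCharsets.UTF_8)))
                .reverse().toString();
        System.out.println(decryptKey); // AiL5qSa5eW65rWY5
    }
}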

Environment Setup

Cluster Environment

A USDP 2.0 cluster, which ships with Hive 3.1.2, on JDK 1.8.

Build Environment

IDEA 2021, Maven 3.3.9, JDK 1.8.

GAV

<properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <hive-exec.version>3.1.2</hive-exec.version>
        <lombok-version>1.18.8</lombok-version>
        <encoding>UTF-8</encoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive-exec.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok-version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

Build

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>

Java Implementation

Extending UDF

A UDF that extends the UDF class directly is genuinely easy to write.

Base64 Encoding

package com.zhiyong.hiveUDF;

import org.apache.hadoop.hive.ql.exec.UDF;

import java.nio.charset.StandardCharsets;

public class Base64code1 extends UDF {
    public String evaluate(String input){
        return java.util.Base64.getEncoder().encodeToString(input.getBytes(StandardCharsets.UTF_8));
    }
}

Base64 Decoding

package com.zhiyong.hiveUDF;

import org.apache.hadoop.hive.ql.exec.UDF;

import java.nio.charset.StandardCharsets;

public class Base64decode1 extends UDF {
    public String evaluate(String input){
        return new String(java.util.Base64.getDecoder().decode(input));
    }
}

Encryption

package com.zhiyong.hiveUDF;

import org.apache.hadoop.hive.ql.exec.UDF;

import java.nio.charset.StandardCharsets;

public class Encryption1 extends UDF {
    public String evaluate(String input,String key){
        StringBuffer strb = new StringBuffer();

        return java.util.Base64.getEncoder().encodeToString(strb.append(input).append(key).reverse()
                .toString().getBytes(StandardCharsets.UTF_8));
    }
}

Decryption

package com.zhiyong.hiveUDF;

import org.apache.hadoop.hive.ql.exec.UDF;

import java.nio.charset.StandardCharsets;

public class Decrypt1 extends UDF {
    public String evaluate(String input,String key){
        StringBuffer strb = new StringBuffer();
        key=new String(java.util.Base64.getDecoder().decode(strb.append(key).reverse().toString()));

        strb.delete(0,strb.length());
        input=new String(java.util.Base64.getDecoder().decode(input));
        input=strb.append(input).reverse().toString();

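        // note: replaceAll interprets key as a regular expression; fine for
        // plain-text keys, but regex metacharacters in a key would misbehave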
        input=input.replaceAll(key,"");

        return input;
    }
}

Testing via a main Method

package com.zhiyong.hiveUDF;

import java.nio.charset.StandardCharsets;

public class TestDemo1 {
    public static void main(String[] args) {
        System.out.println("start!");

        Base64code1 base64code1 = new Base64code1();
        String base64code = base64code1.evaluate("战斗暴龙兽");
        System.out.println("输入内容:战斗暴龙兽\nbase64编码:" + base64code);

        Base64decode1 base64decode1 = new Base64decode1();
        String base64decode = base64decode1.evaluate(base64code);
        System.out.println("base64解码:" + base64decode);

        Encryption1 encryption1 = new Encryption1();
        String encryption = encryption1.evaluate("战斗暴龙兽", "八神太一");
        System.out.println("加密密钥:八神太一\n加密后:" + encryption);

        StringBuffer strb = new StringBuffer();
        String key=strb.append(java.util.Base64.getEncoder()
                .encodeToString("八神太一".getBytes(StandardCharsets.UTF_8)))
                .reverse().toString();
        System.out.println("发给SQL Boy的密钥:" + key);

        Decrypt1 decrypt1 = new Decrypt1();
        String decrypt = decrypt1.evaluate(encryption, key);
        System.out.println("解密后:" + decrypt);

        System.out.println("***************************************");

        System.out.println("end!");
    }
}

After running:

start!
Input: 战斗暴龙兽
Base64 encoded: 5oiY5paX5pq06b6Z5YW9
Base64 decoded: 战斗暴龙兽
Encryption key: 八神太一
Encrypted: 5LiA5aSq56We5YWr5YW96b6Z5pq05paX5oiY
Key handed to the SQL Boys: AiL5qSa5eW65rWY5
Decrypted: 战斗暴龙兽
***************************************
end!

Process finished with exit code 0

The main-method test shows the requirements are met.

Extending GenericUDF

To support Parquet, the most common storage format here, I ended up choosing the Parquet string inspector as the output type.

Base64 Encoding

package com.zhiyong.hiveUDF;

import org.apache.hadoop.hive.ql.io.parquet.serde.primitive.ParquetPrimitiveInspectorFactory;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

import java.nio.charset.StandardCharsets;

public class Base64code2 extends GenericUDF {
    String output;
    String input;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) {
        output = "";
        return ParquetPrimitiveInspectorFactory.parquetStringInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        if (1 == arguments.length) {

            input = arguments[0].get().toString();
            output=java.util.Base64.getEncoder().encodeToString(input.getBytes(StandardCharsets.UTF_8));
        }
        return output;
    }

    @Override
    public String getDisplayString(String[] children) {
        return "这是udf编码函数,入参string,输出base64编码后的string";
    }
}

Base64 Decoding

package com.zhiyong.hiveUDF;

import org.apache.hadoop.hive.ql.io.parquet.serde.primitive.ParquetPrimitiveInspectorFactory;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

public class Base64decode2 extends GenericUDF {
    String output;
    String input;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) {
        output="";
        return ParquetPrimitiveInspectorFactory.parquetStringInspector;
    }

    @Override
    public String evaluate(DeferredObject[] arguments) throws HiveException {
        if(1==arguments.length) {

            input = (String) arguments[0].get();
            output=new String(java.util.Base64.getDecoder().decode(input));
        }
        return output;
    }

    @Override
    public String getDisplayString(String[] children) {
        return "这是udf编码函数,入参string,输出base64解码后的string";
    }
}

Encryption

package com.zhiyong.hiveUDF;

import org.apache.hadoop.hive.ql.io.parquet.serde.primitive.ParquetPrimitiveInspectorFactory;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

import java.nio.charset.StandardCharsets;

public class Encryption2 extends GenericUDF {
    String output;
    String input;
    String key;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) {
        output="";
        return ParquetPrimitiveInspectorFactory.parquetStringInspector;
    }

    @Override
    public String evaluate(DeferredObject[] arguments) throws HiveException {
        if(2==arguments.length){
            StringBuffer strb = new StringBuffer();
            input = (String) arguments[0].get();
            key = (String) arguments[1].get();

            output= java.util.Base64.getEncoder().encodeToString(strb.append(input).append(key).reverse()
                    .toString().getBytes(StandardCharsets.UTF_8));
        }
        return output;
    }

    @Override
    public String getDisplayString(String[] children) {
        return "这是udf加密函数,入参string1=明文,入参string2=密钥,输出加密后的string";
    }
}

Decryption

package com.zhiyong.hiveUDF;

import org.apache.hadoop.hive.ql.io.parquet.serde.primitive.ParquetPrimitiveInspectorFactory;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

public class Decrypt2 extends GenericUDF {
    String output;
    String input;
    String key;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) {
        output="";
        return ParquetPrimitiveInspectorFactory.parquetStringInspector;
    }

    @Override
    public String evaluate(DeferredObject[] arguments) throws HiveException {
        if(2==arguments.length){
            StringBuffer strb = new StringBuffer();
            input = arguments[0].get().toString();
            key = arguments[1].get().toString();
            key=new String(java.util.Base64.getDecoder().decode(strb.append(key).reverse().toString()));

            strb.delete(0,strb.length());
            input=new String(java.util.Base64.getDecoder().decode(input));
            input=strb.append(input).reverse().toString();

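            // note: replaceAll interprets key as a regex, same caveat as Decrypt1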
            output=input.replaceAll(key,"");

        }
        return output;
    }

    @Override
    public String getDisplayString(String[] children) {
        return "这是udf解密函数,入参string1=明文,入参string2=密钥,输出解密后的string";
    }
}

Testing via a main Method

package com.zhiyong.hiveUDF;

import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;

import java.nio.charset.StandardCharsets;

public class TestDemo2 {
    public static void main(String[] args) throws HiveException {
        System.out.println("start!");

        Base64code2 base64code2 = new Base64code2();
        String base64code = base64code2.evaluate(new GenericUDF.DeferredJavaObject[]{
                new GenericUDF.DeferredJavaObject("战斗暴龙兽")}).toString();
        System.out.println("输入内容:战斗暴龙兽\nbase64编码:" + base64code);

        Base64decode2 base64decode2 = new Base64decode2();
        String base64decode = base64decode2.evaluate(new GenericUDF.DeferredJavaObject[]{new GenericUDF.DeferredJavaObject(base64code)});
        System.out.println("base64解码:" + base64decode);

        Encryption2 encryption2 = new Encryption2();
        String encryption = encryption2.evaluate(new GenericUDF.DeferredJavaObject[]{
                new GenericUDF.DeferredJavaObject("战斗暴龙兽"),
                new GenericUDF.DeferredJavaObject("八神太一")
        });

        System.out.println("加密密钥:八神太一\n加密后:" + encryption);

        StringBuffer strb = new StringBuffer();
        String key=strb.append(java.util.Base64.getEncoder()
                        .encodeToString("八神太一".getBytes(StandardCharsets.UTF_8)))
                .reverse().toString();
        System.out.println("发给SQL Boy的密钥:" + key);

        Decrypt2 decrypt2 = new Decrypt2();
        String decrypt = decrypt2.evaluate(new GenericUDF.DeferredJavaObject[]{
                new GenericUDF.DeferredJavaObject(encryption),
                new GenericUDF.DeferredJavaObject(key)
        });
        System.out.println("解密后:" + decrypt);

        System.out.println("***************************************");
        System.out.println("end!");
    }
}

After running:

start!
Input: 战斗暴龙兽
Base64 encoded: 5oiY5paX5pq06b6Z5YW9
Base64 decoded: 战斗暴龙兽
Encryption key: 八神太一
Encrypted: 5LiA5aSq56We5YWr5YW96b6Z5pq05paX5oiY
Key handed to the SQL Boys: AiL5qSa5eW65rWY5
Decrypted: 战斗暴龙兽
***************************************
end!

Process finished with exit code 0

Whether extending UDF or GenericUDF, both variants pass the test in IDEA's main method. In practice, though, UDFs for Hive SQL (and likewise Spark SQL or even Flink SQL) have to run on a cluster, so the next step is to build a jar and ship it there.

Running on the Cluster

Checking the Metadata

[root@zhiyong1 ~]
Enter password:
Welcome to the MySQL monitor.  Commands end with ; or \g.

Your MySQL connection id is 2
Server version: 5.7.30 MySQL Community Server (GPL)

Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| db_hive_metastore  |
| db_hue             |
| db_ranger          |
| db_udp             |
| dolphinscheduler   |
| mysql              |
| performance_schema |
| sougoulogs         |
| sys                |
+--------------------+
10 rows in set (0.01 sec)

mysql> use db_hive_metastore;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> show tables;
+-------------------------------+
| Tables_in_db_hive_metastore   |
+-------------------------------+
| AUX_TABLE                     |
| BUCKETING_COLS                |
| CDS                           |
| COLUMNS_V2                    |
| COMPACTION_QUEUE              |
| COMPLETED_COMPACTIONS         |
| COMPLETED_TXN_COMPONENTS      |
| CTLGS                         |
| DATABASE_PARAMS               |
| DBS                           |
| DB_PRIVS                      |
| DELEGATION_TOKENS             |
| FUNCS                         |
| FUNC_RU                       |
| GLOBAL_PRIVS                  |
| HIVE_LOCKS                    |
| IDXS                          |
| INDEX_PARAMS                  |
| I_SCHEMA                      |
| KEY_CONSTRAINTS               |
| MASTER_KEYS                   |
| MATERIALIZATION_REBUILD_LOCKS |
| METASTORE_DB_PROPERTIES       |
| MIN_HISTORY_LEVEL             |
| MV_CREATION_METADATA          |
| MV_TABLES_USED                |
| NEXT_COMPACTION_QUEUE_ID      |
| NEXT_LOCK_ID                  |
| NEXT_TXN_ID                   |
| NEXT_WRITE_ID                 |
| NOTIFICATION_LOG              |
| NOTIFICATION_SEQUENCE         |
| NUCLEUS_TABLES                |
| PARTITIONS                    |
| PARTITION_EVENTS              |
| PARTITION_KEYS                |
| PARTITION_KEY_VALS            |
| PARTITION_PARAMS              |
| PART_COL_PRIVS                |
| PART_COL_STATS                |
| PART_PRIVS                    |
| REPL_TXN_MAP                  |
| ROLES                         |
| ROLE_MAP                      |
| RUNTIME_STATS                 |
| SCHEMA_VERSION                |
| SDS                           |
| SD_PARAMS                     |
| SEQUENCE_TABLE                |
| SERDES                        |
| SERDE_PARAMS                  |
| SKEWED_COL_NAMES              |
| SKEWED_COL_VALUE_LOC_MAP      |
| SKEWED_STRING_LIST            |
| SKEWED_STRING_LIST_VALUES     |
| SKEWED_VALUES                 |
| SORT_COLS                     |
| TABLE_PARAMS                  |
| TAB_COL_STATS                 |
| TBLS                          |
| TBL_COL_PRIVS                 |
| TBL_PRIVS                     |
| TXNS                          |
| TXN_COMPONENTS                |
| TXN_TO_WRITE_ID               |
| TYPES                         |
| TYPE_FIELDS                   |
| VERSION                       |
| WM_MAPPING                    |
| WM_POOL                       |
| WM_POOL_TO_TRIGGER            |
| WM_RESOURCEPLAN               |
| WM_TRIGGER                    |
| WRITE_SET                     |
+-------------------------------+
74 rows in set (0.01 sec)

mysql> select * from FUNCS;
Empty set (0.01 sec)

mysql>

The cluster is a blank slate at this point: not a single UDF function registered.

Building the Jar

First run mvn clean:

[Screenshot: mvn clean in IDEA]

Then mvn package:

[Screenshot: mvn package in IDEA]

Then locate the freshly built jar at the output path:

[Screenshot: hiveDemo-1.0.0.jar in the target directory]

The jar now exists at that path.
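For reference, the same steps from a terminal would look roughly like this (the project directory name is assumed):

cd hiveDemo
mvn clean
mvn package
ls target/hiveDemo-1.0.0.jar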

Starting the USDP Cluster

Last login: Fri Aug  5 12:19:02 2022 from 192.168.88.1
[root@zhiyong1 ~]
[root@zhiyong1 usdp]
BASE_PATH: /opt/usdp-srv/usdp/bin
JMX_PATH: /opt/usdp-srv/usdp/jmx_exporter
REPAIR_PATH: /opt/usdp-srv/usdp/repair
UDP_PATH: /opt/usdp-srv/usdp
REPAIR_BIN_PATH: /opt/usdp-srv/usdp/repair/bin
REPAIR_SBIN_PATH: /opt/usdp-srv/usdp/repair/sbin
PACKAGES_PATH: /opt/usdp-srv/usdp/repair/packages
nmap-6.40-19.el7.x86_64
nmap exists

UDP Server is running with: 10995
Done.

[root@zhiyong1 usdp]

Log in via the web UI:

http://192.168.88.100/login
admin,Zhiyong1

[Screenshot: USDP web UI]

The cluster has just started, so resources are plentiful. Only Zookeeper, HDFS, YARN, Hive and Tez need to be brought up.

After starting them in order:

[Screenshot: USDP service status after startup]

Cluster resources are sufficient.

[Screenshot: host resource usage]

The host machine has plenty of headroom, with no risk of running out of memory.

Uploading the Jar

All three worker nodes have the Hive client installed, so any one of them will do:

[Screenshot: Hive client locations across the worker nodes]

Upload with MobaXterm:

Last login: Wed Apr 13 23:30:26 2022 from zhiyong1
[root@zhiyong2 ~]
/root
[root@zhiyong2 ~]
总用量 28
-rw-------. 1 root root  1639 3月   1 05:40 anaconda-ks.cfg
drwxr-xr-x. 2 root root    60 3月   1 23:11 logs
-rw-r--r--  1 root root 14444 3月  11 22:36 test1.txt
-rw-r--r--  1 root root    58 3月  14 22:19 wordtest1.txt
drwxr-xr-x  2 root root    34 4月   1 20:04 jars
-rw-r--r--  1 root root    79 4月  13 00:40 taildir_position.json
[root@zhiyong2 ~]
[root@zhiyong2 jars]
总用量 133364
-rw-r--r-- 1 root root 136561467 4月   1 20:04 flinkStudy-1.0.0.jar
[root@zhiyong2 jars]
总用量 133380
-rw-r--r-- 1 root root 136561467 4月   1 20:04 flinkStudy-1.0.0.jar
-rw-r--r-- 1 root root     13083 8月   5 12:35 hiveDemo-1.0.0.jar
[root@zhiyong2 jars]

Upload complete.

Loading the Jar

[root@zhiyong2 jars]# hive
SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in [jar:file:/opt/usdp-srv/srv/udp/2.0.0.0/hive/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/usdp-srv/srv/udp/2.0.0.0/yarn/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Hive Session ID = 7c87c2aa-0d18-41aa-93ff-170b7363d068

Logging initialized using configuration in file:/opt/usdp-srv/srv/udp/2.0.0.0/hive/conf/hive-log4j2.properties Async: true
Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.

Hive Session ID = a4f3739c-1693-4524-97b2-40601a0e4154
hive (default)> add jar /root/jars/hiveDemo-1.0.0.jar;
Added [/root/jars/hiveDemo-1.0.0.jar] to class path
Added resources: [/root/jars/hiveDemo-1.0.0.jar]
hive (default)>

The jar is now loaded successfully.

Registering the UDF Functions

After loading the jar, a function still has to be registered before it can be used.

create temporary function function_name as 'package.ClassName';
desc function extended function_name;
drop temporary function if exists function_name;

Without temporary, the function is permanent.
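A permanent function can also bind the jar at registration time, so new sessions resolve it without a manual add jar; a hedged example (this HDFS path is made up, not from my setup):

create function default.Base64code1 as 'com.zhiyong.hiveUDF.Base64code1' using jar 'hdfs:///user/hive/udf/hiveDemo-1.0.0.jar';

The registrations actually used here: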

create function Base64code1 as 'com.zhiyong.hiveUDF.Base64code1';
create function Base64code2 as 'com.zhiyong.hiveUDF.Base64code2';
create function Base64decode1 as 'com.zhiyong.hiveUDF.Base64decode1';
create function Base64decode2 as 'com.zhiyong.hiveUDF.Base64decode2';
create function Encryption1 as 'com.zhiyong.hiveUDF.Encryption1';
create function Encryption2 as 'com.zhiyong.hiveUDF.Encryption2';
create function Decrypt1 as 'com.zhiyong.hiveUDF.Decrypt1';
create function Decrypt2 as 'com.zhiyong.hiveUDF.Decrypt2';

desc function extended Base64code1;
desc function extended Base64code2;

drop function if exists default.Base64code2;
create function default.Base64code2 as 'com.zhiyong.hiveUDF.Base64code2';
add jar /root/jars/hiveDemo-1.0.0.jar;
select num,default.Base64code2(name) from digital_monster;
select num,default.Base64code2('aaa') from digital_monster;

After running:

hive (default)> create function Base64code1 as 'com.zhiyong.hiveUDF.Base64code1';
2022-08-05 12:48:37 INFO impl.TimelineClientImpl: Timeline service address: zhiyong3:8188
OK
Time taken: 1.517 seconds
hive (default)> create function Base64code2 as 'com.zhiyong.hiveUDF.Base64code2';
OK
Time taken: 0.065 seconds
hive (default)> create function Base64decode1 as 'com.zhiyong.hiveUDF.Base64decode1';
OK
Time taken: 0.059 seconds
hive (default)> create function Base64decode2 as 'com.zhiyong.hiveUDF.Base64decode2';
OK
Time taken: 0.077 seconds
hive (default)> create function Encryption1 as 'com.zhiyong.hiveUDF.Encryption1';
OK
Time taken: 0.125 seconds
hive (default)> create function Encryption2 as 'com.zhiyong.hiveUDF.Encryption2';
OK
Time taken: 0.084 seconds
hive (default)> create function Decrypt1 as 'com.zhiyong.hiveUDF.Decrypt1';
OK
Time taken: 0.072 seconds
hive (default)> create function Decrypt2 as 'com.zhiyong.hiveUDF.Decrypt2';
OK
Time taken: 0.063 seconds
hive (default)> desc function extended Base64code1;
OK
tab_name
2022-08-05 12:49:15 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries
2022-08-05 12:49:15 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 52decc77982b58949890770d22720a91adce0c3f]
There is no documentation for function 'Base64code1'
Function class:com.zhiyong.hiveUDF.Base64code1
Function type:PERSISTENT
Time taken: 0.215 seconds, Fetched: 3 row(s)
hive (default)> desc function extended Base64code2;
OK
tab_name
There is no documentation for function 'Base64code2'
Function class:com.zhiyong.hiveUDF.Base64code2
Function type:PERSISTENT
Time taken: 0.041 seconds, Fetched: 3 row(s)
hive (default)>

Registration succeeded.

Invoking the UDF Functions

Switch to the target database first:

hive (default)> show databases;
OK
database_name
db_lzy
default
Time taken: 0.023 seconds, Fetched: 2 row(s)
hive (default)> use db_lzy;
OK
Time taken: 0.025 seconds
hive (db_lzy)> show tables;
OK
tab_name
demo1
Time taken: 0.053 seconds, Fetched: 1 row(s)

For convenience, create a table first:

create table if not exists digital_monster(
    num int comment'序号',
    name string comment'名称'
)
comment '数码宝贝表'
stored as parquet
;

Then insert some data:

insert into digital_monster values(1,'亚古兽'),(2,'暴龙兽'),(3,'机械暴龙兽'),(4,'丧尸暴龙兽'),(5,'战斗暴龙兽'),(6,'大地暴龙兽'),(7,'闪光暴龙兽');


The insert requires switching the execution engine to Tez first, otherwise it fails with return code 1:

hive (db_lzy)> set hive.execution.engine;
hive.execution.engine=mr
hive (db_lzy)> set hive.execution.engine=tez;
hive (db_lzy)> insert into digital_monster values(1,'亚古兽'),(2,'暴龙兽'),(3,'机械暴龙兽'),(4,'丧尸暴龙兽'),(5,'战斗暴龙兽'),(6,'大地暴龙兽'),(7,'闪光暴龙兽');
Query ID = root_20220805125911_cacc50f7-86a5-415d-8518-18ab7d4f1e56
Total jobs = 1
Launching Job 1 out of 1
2022-08-05 12:59:12 INFO client.AHSProxy: Connecting to Application History server at zhiyong3/192.168.88.102:10201
2022-08-05 12:59:12 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm2
Status: Running (Executing on YARN cluster with App id application_1659673721568_0003)

Map 1 .......... container     SUCCEEDED      1          1        0        0       0       0
Reducer 2 ...... container     SUCCEEDED      1          1        0        0       0       0
Status: DAG finished successfully in 23.73 seconds

Query Execution Summary
Compile Query                           0.67s
Prepare Plan                           14.64s
Get Query Coordinator (AM)              0.03s
Submit Plan                             0.62s
Start DAG                               0.12s
Run DAG                                23.73s
  VERTICES      DURATION(ms)   CPU_TIME(ms)    GC_TIME(ms)   INPUT_RECORDS   OUTPUT_RECORDS

Loading data to table db_lzy.digital_monster
OK
col1    col2
Time taken: 42.947 seconds
hive (db_lzy)>

Now the UDFs can be tested:

select * from digital_monster;
select num,name from digital_monster;
select num,Base64code1(name) from digital_monster;
select num,Base64code2(name) from digital_monster;

The results:

hive (db_lzy)> select * from digital_monster;
OK
digital_monster.num     digital_monster.name
1       亚古兽
2       暴龙兽
3       机械暴龙兽
4       丧尸暴龙兽
5       战斗暴龙兽
6       大地暴龙兽
7       闪光暴龙兽
Time taken: 0.457 seconds, Fetched: 7 row(s)
hive (db_lzy)> select num,Base64code1(name) from digital_monster;
FAILED: SemanticException [Error 10011]: Invalid function Base64code1
hive (db_lzy)> reload function;
OK
Time taken: 0.071 seconds
hive (db_lzy)> select num,Base64code1(name) from digital_monster;
FAILED: SemanticException [Error 10011]: Invalid function Base64code1
hive (db_lzy)> show functions;
OK
tab_name
!
!=
$sum0
%
&
*
+
-
/
<
<=
<=>
<>
=
==
>
>=
^
abs
acos
add_months
aes_decrypt
aes_encrypt
and
array
array_contains
ascii
asin
assert_true
assert_true_oom
atan
avg
base64
between
bin
bloom_filter
bround
cardinality_violation
case
cbrt
ceil
ceiling
char_length
character_length
chr
coalesce
collect_list
collect_set
compute_stats
concat
concat_ws
context_ngrams
conv
corr
cos
count
covar_pop
covar_samp
crc32
create_union
cume_dist
current_authorizer
current_database
current_date
current_groups
current_timestamp
current_user
date_add
date_format
date_sub
datediff
day
dayofmonth
dayofweek
decode
default.base64code1
default.base64code2
default.base64decode1
default.base64decode2
default.decrypt1
default.decrypt2
default.encryption1
default.encryption2
degrees
dense_rank
div
e
elt
encode
enforce_constraint
exp
explode
extract_union
factorial
field
find_in_set
first_value
floor
floor_day
floor_hour
floor_minute
floor_month
floor_quarter
floor_second
floor_week
floor_year
format_number
from_unixtime
from_utc_timestamp
get_json_object
get_splits
greatest
grouping
hash
hex
histogram_numeric
hour
if
in
in_bloom_filter
in_file
index
initcap
inline
instr
internal_interval
isfalse
isnotfalse
isnotnull
isnottrue
isnull
istrue
java_method
json_tuple
lag
last_day
last_value
lcase
lead
least
length
levenshtein
like
likeall
likeany
ln
locate
log
log10
log2
logged_in_user
lower
lpad
ltrim
map
map_keys
map_values
mask
mask_first_n
mask_hash
mask_last_n
mask_show_first_n
mask_show_last_n
matchpath
max
md5
min
minute
mod
month
months_between
murmur_hash
named_struct
negative
next_day
ngrams
noop
noopstreaming
noopwithmap
noopwithmapstreaming
not
ntile
nullif
nvl
octet_length
or
parse_url
parse_url_tuple
percent_rank
percentile
percentile_approx
pi
pmod
posexplode
positive
pow
power
printf
quarter
radians
rand
rank
reflect
reflect2
regexp
regexp_extract
regexp_replace
regr_avgx
regr_avgy
regr_count
regr_intercept
regr_r2
regr_slope
regr_sxx
regr_sxy
regr_syy
repeat
replace
replicate_rows
restrict_information_schema
reverse
rlike
round
row_number
rpad
rtrim
second
sentences
sha
sha1
sha2
shiftleft
shiftright
shiftrightunsigned
sign
sin
size
sort_array
sort_array_by
soundex
space
split
sq_count_check
sqrt
stack
std
stddev
stddev_pop
stddev_samp
str_to_map
struct
substr
substring
substring_index
sum
tan
to_date
to_epoch_milli
to_unix_timestamp
to_utc_timestamp
translate
trim
trunc
ucase
udftoboolean
udftobyte
udftodouble
udftofloat
udftointeger
udftolong
udftoshort
udftostring
unbase64
unhex
unix_timestamp
upper
uuid
var_pop
var_samp
variance
version
weekofyear
when
width_bucket
windowingtablefunction
xpath
xpath_boolean
xpath_double
xpath_float
xpath_int
xpath_long
xpath_number
xpath_short
xpath_string
year
|
~
Time taken: 0.016 seconds, Fetched: 297 row(s)
hive (db_lzy)> select num,default.Base64code1(name) from digital_monster;
OK
num     _c1
1       5Lqa5Y+k5YW9
2       5pq06b6Z5YW9
3       5py65qKw5pq06b6Z5YW9
4       5Lin5bC45pq06b6Z5YW9
5       5oiY5paX5pq06b6Z5YW9
6       5aSn5Zyw5pq06b6Z5YW9
7       6Zeq5YWJ5pq06b6Z5YW9
Time taken: 0.278 seconds, Fetched: 7 row(s)
hive (db_lzy)> select num,default.Base64code2(name) from digital_monster;
OK
num     _c1
1       5Lqa5Y+k5YW9
2       5pq06b6Z5YW9
3       5py65qKw5pq06b6Z5YW9
4       5Lin5bC45pq06b6Z5YW9
5       5oiY5paX5pq06b6Z5YW9
6       5aSn5Zyw5pq06b6Z5YW9
7       6Zeq5YWJ5pq06b6Z5YW9
Time taken: 0.277 seconds, Fetched: 7 row(s)

Tez has a sizable gotcha: when invoking a UDF, you must qualify it with the database in which it was registered...

select
    num as 数码宝贝序号,
    name as 数码宝贝名称,
    default.Base64code1(name) as base64编码,
    default.Base64Decode2(default.Base64code1(name)) as base64解码,
    default.Encryption1(name,'八神太一') as 加密后,
    default.Decrypt2(default.Encryption1(name,'八神太一'),'AiL5qSa5eW65rWY5') as 解密后
from db_lzy.digital_monster
;

In Beeline, it is best to keep the whole statement on one line to avoid errors:

select num,name,default.Base64code1(name),default.Base64Decode2(default.Base64code1(name)),default.Encryption1(name,'八神太一'),default.Decrypt2(default.Encryption1(name,'八神太一'),'AiL5qSa5eW65rWY5') from db_lzy.digital_monster;

With aliases:

select num as 数码宝贝序号, name as 数码宝贝名称, default.Base64code1(name) as base64编码, default.Base64Decode2(default.Base64code1(name)) as base64解码, default.Encryption1(name,'八神太一') as 加密后, default.Decrypt2(default.Encryption1(name,'八神太一'),'AiL5qSa5eW65rWY5') as 解密后 from db_lzy.digital_monster ;

After running:

hive (db_lzy)> select num as 数码宝贝序号, name as 数码宝贝名称, default.Base64code1(name) as base64编码, default.Base64Decode2(default.Base64code1(name)) as base64解码, default.Encryption1(name,'八神太一') as 加密后, default.Decrypt2(default.Encryption1(name,'八神太一'),'AiL5qSa5eW65rWY5') as 解密后 from db_lzy.digital_monster ;
OK
数码宝贝序号    数码宝贝名称    base64编码      base64解码      加密后  解密后
1       亚古兽  5Lqa5Y+k5YW9    亚古兽  5LiA5aSq56We5YWr5YW95Y+k5Lqa    亚古兽
2       暴龙兽  5pq06b6Z5YW9    暴龙兽  5LiA5aSq56We5YWr5YW96b6Z5pq0    暴龙兽
3       机械暴龙兽      5py65qKw5pq06b6Z5YW9    机械暴龙兽      5LiA5aSq56We5YWr5YW96b6Z5pq05qKw5py6    机械暴龙兽
4       丧尸暴龙兽      5Lin5bC45pq06b6Z5YW9    丧尸暴龙兽      5LiA5aSq56We5YWr5YW96b6Z5pq05bC45Lin    丧尸暴龙兽
5       战斗暴龙兽      5oiY5paX5pq06b6Z5YW9    战斗暴龙兽      5LiA5aSq56We5YWr5YW96b6Z5pq05paX5oiY    战斗暴龙兽
6       大地暴龙兽      5aSn5Zyw5pq06b6Z5YW9    大地暴龙兽      5LiA5aSq56We5YWr5YW96b6Z5pq05Zyw5aSn    大地暴龙兽
7       闪光暴龙兽      6Zeq5YWJ5pq06b6Z5YW9    闪光暴龙兽      5LiA5aSq56We5YWr5YW96b6Z5pq05YWJ6Zeq    闪光暴龙兽
Time taken: 0.254 seconds, Fetched: 7 row(s)
hive (db_lzy)>

That completes implementing Hive UDFs in both styles.

Checking the Metadata Again

mysql> select * from FUNCS;
ERROR 2006 (HY000): MySQL server has gone away
No connection. Trying to reconnect...

Connection id:    2113
Current database: db_hive_metastore

+---------+-----------------------------------+-------------+-------+---------------+-----------+------------+------------+
| FUNC_ID | CLASS_NAME                        | CREATE_TIME | DB_ID | FUNC_NAME     | FUNC_TYPE | OWNER_NAME | OWNER_TYPE |
+---------+-----------------------------------+-------------+-------+---------------+-----------+------------+------------+
|       1 | com.zhiyong.hiveUDF.Base64code1   |  1659674918 |     1 | base64code1   |         1 | NULL       | USER       |
|       3 | com.zhiyong.hiveUDF.Base64decode1 |  1659674930 |     1 | base64decode1 |         1 | NULL       | USER       |
|       4 | com.zhiyong.hiveUDF.Base64decode2 |  1659674930 |     1 | base64decode2 |         1 | NULL       | USER       |
|       5 | com.zhiyong.hiveUDF.Encryption1   |  1659674931 |     1 | encryption1   |         1 | NULL       | USER       |
|       6 | com.zhiyong.hiveUDF.Encryption2   |  1659674931 |     1 | encryption2   |         1 | NULL       | USER       |
|       7 | com.zhiyong.hiveUDF.Decrypt1      |  1659674931 |     1 | decrypt1      |         1 | NULL       | USER       |
|       8 | com.zhiyong.hiveUDF.Decrypt2      |  1659674934 |     1 | decrypt2      |         1 | NULL       | USER       |
|      24 | com.zhiyong.hiveUDF.Base64code2   |  1659701054 |     1 | base64code2   |         1 | NULL       | USER       |
+---------+-----------------------------------+-------------+-------+---------------+-----------+------------+------------+
8 rows in set (0.02 sec)

mysql>

Hive's metadata tables in MySQL now record these UDF functions. If the jar is placed under the path configured in hive-site.xml, the add jar step is not needed after each Hive restart.
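A hedged sketch of that configuration (hive.aux.jars.path is the usual property for this; the path below is illustrative, not from my setup):

<property>
    <name>hive.aux.jars.path</name>
    <value>file:///opt/hive/auxlib/hiveDemo-1.0.0.jar</value>
</property>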

Original: https://blog.csdn.net/qq_41990268/article/details/126186377
Author: 虎鲸不是鱼
Title: 使用Java继承UDF类或GenericUDF类给Hive3.1.2编写UDF实现编码解码加密解密并运行在USDP大数据集群
