Custom UDF, UDTF, UDAF functions
(1) Custom UDF: extend UDF and override the evaluate method.
(2) Custom UDTF: extend GenericUDTF and override 3 methods: initialize (define the output column names and types), process (emit each result row via forward(result)), and close.
Workflow: package the jar → upload it to a path on the cluster → register the function in the Hive client.
Why write a custom UDF/UDTF?
Because you control the code of a custom function, you can embed your own logging; if an error or data anomaly occurs, the log output makes debugging much easier.
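As a concrete illustration of that point (plain Java, not tied to any Hive API; SafeScoreParser and parseOrDefault are hypothetical names), a parse routine inside a custom function can log the offending value and fall back instead of failing, so bad rows show up directly in the task logs:

```java
import java.util.logging.Logger;

// Sketch of debug logging inside a custom function: instead of letting a
// malformed value kill the task, log it and return a fallback so the bad
// row can be located later in the task log.
class SafeScoreParser {
    private static final Logger LOG = Logger.getLogger(SafeScoreParser.class.getName());

    // Parse a score string; on malformed or null data, log it and return a default.
    static int parseOrDefault(String raw, int fallback) {
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException | NullPointerException e) {
            LOG.warning("bad score value: " + raw); // visible in the task log
            return fallback;
        }
    }
}
```

The same pattern applies inside an evaluate or process method: wrap the fragile conversion, log the raw input, and decide whether to skip, default, or rethrow.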
Dependency and jar-packaging plugin configuration (pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flumeInterceptor</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
UDF
One row in, one row out.
Method 1: extend the UDF class (deprecated; no longer recommended in Hive 3.x)
package com.ambitfly.udf;

import org.apache.hadoop.hive.ql.exec.UDF;

public class MyLength extends UDF {
    public String evaluate(String s) {
        // guard against null input to avoid a NullPointerException
        if (s == null) {
            return null;
        }
        return s.length() + "";
    }
}
Add the jar package:
add jar /opt/module/hive/lib/hivefunction-1.0-SNAPSHOT.jar;
Create a temporary function:
CREATE TEMPORARY FUNCTION mylength AS 'com.ambitfly.udf.MyLength';
Drop the temporary function:
DROP TEMPORARY FUNCTION mylength;
Method 2: extend GenericUDF (supports multiple values, variable-length parameters, and complex types such as structs)
Example 1: MyToUpper
Input: String; output: String.
package com.ambitfly.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.lazy.LazyString;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;

public class MyToUpper extends GenericUDF {
    private ObjectInspector strObj;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        // check the argument count and remember the input inspector
        if (arguments.length != 1) {
            throw new UDFArgumentException("upper function requires exactly one parameter");
        }
        strObj = arguments[0];
        // declare the output type: a Java String
        return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        LazyString lString = (LazyString) arguments[0].get();
        String str = ((StringObjectInspector) strObj).getPrimitiveJavaObject(lString);
        if (str == null) {
            return null;
        }
        return str.toUpperCase();
    }

    @Override
    public String getDisplayString(String[] children) {
        return "myToUpper(" + children[0] + ")";
    }
}
Add the jar package:
add jar /opt/module/hive/lib/hivefunction-1.0-SNAPSHOT.jar;
Create a temporary function:
CREATE TEMPORARY FUNCTION myToUpper AS 'com.ambitfly.udf.MyToUpper';
Drop the temporary function:
DROP TEMPORARY FUNCTION myToUpper;
Example 2: MyScoreToStruct
Input math, english, history; output {"math":"89","english":"78","history":"99"}
Create table statement & data import:
create table score(
    name string,
    math string,
    english string,
    history string
)
row format delimited fields terminated by ',';

-- LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)]
-- data import
load data local inpath '/opt/data/hivefunctiondata/score.txt' into table score;
data:
aa,89,78,99
bb,90,67,64
cc,91,98,90
dd,45,30,68
code:
package com.ambitfly.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.lazy.LazyString;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.Text;

import java.util.ArrayList;

public class MyScoreToStruct extends GenericUDF {
    // inspectors for the three input columns
    private ObjectInspector mathObj;
    private ObjectInspector englishObj;
    private ObjectInspector historyObj;

    /**
     * Two jobs:
     * 1. Check the number (and types) of the input parameters
     * 2. Define the output type: a struct of three strings
     */
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        if (arguments.length != 3) {
            throw new UDFArgumentException("Must pass exactly 3 parameters!");
        }
        mathObj = arguments[0];
        englishObj = arguments[1];
        historyObj = arguments[2];

        // output struct definition
        ArrayList<String> structFieldNames = new ArrayList<>();
        ArrayList<ObjectInspector> structFieldObjectInspectors = new ArrayList<>();
        structFieldNames.add("math");
        structFieldNames.add("english");
        structFieldNames.add("history");
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(structFieldNames, structFieldObjectInspectors);
    }

    @Override
    public Object[] evaluate(DeferredObject[] arguments) throws HiveException {
        LazyString lazyMath = (LazyString) arguments[0].get();
        LazyString lazyEnglish = (LazyString) arguments[1].get();
        LazyString lazyHistory = (LazyString) arguments[2].get();
        String strMath = ((StringObjectInspector) mathObj).getPrimitiveJavaObject(lazyMath);
        String strEnglish = ((StringObjectInspector) englishObj).getPrimitiveJavaObject(lazyEnglish);
        String strHistory = ((StringObjectInspector) historyObj).getPrimitiveJavaObject(lazyHistory);

        // one Text value per struct field, in declaration order
        Object[] e = new Object[3];
        e[0] = new Text(strMath);
        e[1] = new Text(strEnglish);
        e[2] = new Text(strHistory);
        return e;
    }

    @Override
    public String getDisplayString(String[] children) {
        return "";
    }
}
Add the jar package:
add jar /opt/module/hive/lib/hivefunction-1.0-SNAPSHOT.jar;
Create a temporary function:
CREATE TEMPORARY FUNCTION myScoreToStruct AS 'com.ambitfly.udf.MyScoreToStruct';
Drop the temporary function:
DROP TEMPORARY FUNCTION myScoreToStruct;
Example 3: MyScoreStructToSumScore
Input {"math":"89","english":"78","history":"99"}; output 266
code:
package com.ambitfly.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.List;

public class MyScoreStructToSumScore extends GenericUDF {
    private StructObjectInspector scoreObj;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        if (arguments.length != 1) {
            throw new UDFArgumentException("Must pass exactly 1 parameter!");
        }
        scoreObj = (StructObjectInspector) arguments[0];
        // declare the output type: a Java int
        return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        // unpack the struct's fields as a list of values
        List<Object> structFieldsDataAsList;
        try {
            structFieldsDataAsList = scoreObj.getStructFieldsDataAsList(arguments[0].get());
        } catch (Exception e) {
            throw new HiveException(e.getMessage());
        }
        // parse each field as an int and accumulate the sum
        int sumScore = 0;
        for (Object o : structFieldsDataAsList) {
            try {
                sumScore += Integer.parseInt(o.toString());
            } catch (Exception e) {
                throw new HiveException(e.getMessage());
            }
        }
        return sumScore;
    }

    @Override
    public String getDisplayString(String[] children) {
        return null;
    }
}
Add the jar package:
add jar /opt/module/hive/lib/hivefunction-1.0-SNAPSHOT.jar;
Create a temporary function:
CREATE TEMPORARY FUNCTION myScoreStructToSumScore AS 'com.ambitfly.udf.MyScoreStructToSumScore';
Drop the temporary function:
DROP TEMPORARY FUNCTION myScoreStructToSumScore;
UDTF
One row in, multiple rows out.
Example: MyExplodeScoreStruct
code:
package com.ambitfly.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;

public class MyExplodeScoreStruct extends GenericUDTF {
    // reused output row buffer: one column, cleared before each forward()
    private final ArrayList<String> output = new ArrayList<>();

    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        if (argOIs.getAllStructFieldRefs().size() != 1) {
            throw new UDFArgumentException("Must pass exactly 1 parameter!");
        }
        // one output column named "score", of type string
        ArrayList<String> structFieldNames = new ArrayList<>();
        ArrayList<ObjectInspector> structFieldObjectInspectors = new ArrayList<>();
        structFieldNames.add("score");
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(structFieldNames, structFieldObjectInspectors);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        // split e.g. "89-78-99" and forward one output row per score
        String str = args[0].toString();
        String[] split = str.split("-");
        for (String s : split) {
            output.clear();
            output.add(s);
            forward(output);
        }
    }

    @Override
    public void close() throws HiveException {
    }
}
Add the jar package:
add jar /opt/module/hive/lib/hivefunction-1.0-SNAPSHOT.jar;
Create a temporary function:
CREATE TEMPORARY FUNCTION myExplodeScoreStruct AS 'com.ambitfly.udf.MyExplodeScoreStruct';
Drop the temporary function:
DROP TEMPORARY FUNCTION myExplodeScoreStruct;
Query:
select myExplodeScoreStruct(concat(math,'-',english,'-',history)) from score;

select name, s
from score
LATERAL VIEW myExplodeScoreStruct(concat(math,'-',english,'-',history)) score AS s;
UDAF
That is, a User-Defined Aggregation Function: it operates on multiple rows of records and returns a single result value, and is mostly used for statistical analysis within a group.
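A real Hive UDAF extends GenericUDAFEvaluator from hive-exec and implements an init → iterate → merge → terminate lifecycle over an aggregation buffer. Independent of Hive's API, that lifecycle can be sketched in plain Java (SumScoreBuffer is a hypothetical illustrative name, not a Hive class); iterate folds in one row, merge combines map-side partial results, terminate emits the final value:

```java
// Plain-Java sketch of the UDAF aggregation-buffer lifecycle.
// A real implementation would extend
// org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator instead.
class SumScoreBuffer {
    private int sum; // partial aggregate held by one mapper/reducer

    // init: reset the aggregation buffer before processing a group
    void init() { sum = 0; }

    // iterate: fold one input row's value into this buffer
    void iterate(int score) { sum += score; }

    // merge: absorb a partial result computed by another buffer
    void merge(SumScoreBuffer other) { sum += other.sum; }

    // terminate: produce the final aggregate value for the group
    int terminate() { return sum; }
}
```

For example, summing one student's three scores (89, 78, 99) in one buffer and merging a partial sum of 90 from another yields 356; Hive drives exactly this sequence across map and reduce tasks.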