hive学习笔记-udf-udaf

依赖 jar 包:hive-exec

1
2
3
4
5
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
</dependency>

udfDemo

1
2
3
4
5
6
7
8
9
10
11
package siys16877.HiveDemo.hive;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
public class udfDemo extends UDF {

    /**
     * Concatenates two Text columns with a "------" separator.
     *
     * <p>Parameters and the return value use Hadoop's {@code Text} instead of
     * {@code String} because Hive queries run as map-reduce jobs and arguments
     * travel through Hadoop's Writable serialization.
     *
     * @param a first column value, may be null (SQL NULL)
     * @param b second column value, may be null (SQL NULL)
     * @return the joined text, or null when either input is null
     */
    public Text evaluate(Text a, Text b) {
        // Hive hands SQL NULL to a UDF as a Java null; return null (SQL NULL)
        // instead of crashing the map task with a NullPointerException.
        if (a == null || b == null) {
            return null;
        }
        return new Text(a.toString() + "------" + b.toString());
    }
}

hive 查询会被翻译成 map-reduce 作业,UDF 的参数和返回值要使用 hadoop 提供的 Text 类型;Java 的 String 不在 hadoop 的序列化体系里,无法直接作为列值传递。

use udfDemo

1
2
3
4
5
6
hive> add jar /home/shuai/udfDemo.jar
Added [/home/shuai/udfDemo.jar] to class path
Added resources: [/home/shuai/udfDemo.jar]
# the function name is not restricted — any valid identifier works
create temporary function udfDemo as 'siys16877.HiveDemo.hive.udfDemo';
select udfDemo(tid, tname) from test1;

udf使用场景扩充

网上的例子大多数都是字符串连接,没什么实际意义
将查询结果封装成json格式

封装为json

1
2
3
4
5
6
7
8
9
10
package siys16877.HiveDemo.hive;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
public class udfJson extends UDF {

    /**
     * Wraps two column values into a JSON-style string of the form
     * <code>{ tid:&lt;a&gt;,tname:&lt;b&gt; }</code>.
     *
     * <p>Note: the keys are intentionally left unquoted to match the original
     * demo output; strict JSON parsers would require quoted keys.
     *
     * @param a the tid column value, may be null (SQL NULL)
     * @param b the tname column value, may be null (SQL NULL)
     * @return the formatted text, or null when either input is null
     */
    public Text evaluate(Text a, Text b) {
        // Propagate SQL NULL instead of throwing a NullPointerException.
        if (a == null || b == null) {
            return null;
        }
        // Bug fix: the original emitted the misspelled key "tanme".
        return new Text("{ tid:" + a.toString() + ',' + "tname:"
                + b.toString() + " }");
    }
}

获取表的列名信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package siys16877.HiveDemo.hive;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import siys16877.HiveDemo.utils.JDBCUtils;
public class ColumnName {
public static void main(String[] args) throws SQLException {
String sql = "select * from test1";
Connection conn = JDBCUtils.getConnection();
Statement st = conn.createStatement();
ResultSet rs = st.executeQuery(sql);
ResultSetMetaData rsmt = rs.getMetaData();
String col1 = rsmt.getColumnName(1);
String col2 = rsmt.getColumnName(2);
System.out.println(col1+'\t'+col2);
JDBCUtils.release(conn, st, rs);
}

udaf 学习案例-求平均数

(数据模型+Evaluator)继承(extends) UDAF

Evaluator 实现 UDAFEvaluator接口

  • init
  • iterate
  • terminatePartial
  • merge
  • terminate
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package org.apache.hadoop.hive.contrib.udaf.example;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
/**
* This is a simple UDAF that calculates average.
*
* It should be very easy to follow and can be used as an example for writing
* new UDAFs.
*
* Note that Hive internally uses a different mechanism (called GenericUDAF) to
* implement built-in aggregation functions, which are harder to program but
* more efficient.
*
*/
@Description(name = "example_avg",
    value = "_FUNC_(col) - Example UDAF to compute average")
public final class UDAFExampleAvg extends UDAF {

  /**
   * Aggregation buffer: running sum and row count.
   *
   * <p>Field names are kept as-is because Hive serializes this state object
   * between the map and reduce phases via reflection.
   */
  public static class UDAFAvgState {
    private long mCount;
    private double mSum;
  }

  /**
   * Performs the actual aggregation. Hive discovers this evaluator
   * automatically because it is a nested class implementing
   * {@link UDAFEvaluator}.
   */
  public static class UDAFExampleAvgEvaluator implements UDAFEvaluator {

    UDAFAvgState state;

    public UDAFExampleAvgEvaluator() {
      super();
      state = new UDAFAvgState();
      init();
    }

    /** Clears the buffer so the evaluator can be reused for a new group. */
    public void init() {
      state.mCount = 0;
      state.mSum = 0;
    }

    /**
     * Folds one input row into the buffer; null rows are skipped.
     * The argument list must match how the UDAF is invoked from HiveQL.
     * Always returns true.
     */
    public boolean iterate(Double o) {
      if (o == null) {
        return true;
      }
      state.mSum += o.doubleValue();
      state.mCount++;
      return true;
    }

    /**
     * Hands the partial buffer to Hive. Per the SQL standard, an aggregation
     * over zero rows is NULL, so an empty buffer is reported as null.
     */
    public UDAFAvgState terminatePartial() {
      if (state.mCount == 0) {
        return null;
      }
      return state;
    }

    /**
     * Absorbs another partial buffer. The parameter type must match the
     * return type of {@link #terminatePartial()}. Always returns true.
     */
    public boolean merge(UDAFAvgState o) {
      if (o != null) {
        state.mSum += o.mSum;
        state.mCount += o.mCount;
      }
      return true;
    }

    /**
     * Produces the final average, or null for an empty group
     * (SQL standard behavior).
     */
    public Double terminate() {
      if (state.mCount == 0) {
        return null;
      }
      double average = state.mSum / state.mCount;
      return Double.valueOf(average);
    }
  }

  // Container class only; never instantiated.
  private UDAFExampleAvg() {
  }
}

udaf 学习案例 - MaxMinNUtil(求最大/最小 N 个值的工具类)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
package org.apache.hadoop.hive.contrib.udaf.example;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
/**
* The utility class for UDAFMaxN and UDAFMinN.
*/
/**
 * The utility class for UDAFMaxN and UDAFMinN.
 *
 * <p>Bug fix versus the transcribed original: {@code sortedMerge} used a
 * strict {@code < 0} comparison in both branches, so when the current heads
 * of the two lists compared equal neither pointer advanced and the merge
 * loop spun forever. The second branch now uses {@code <= 0} (as in the
 * upstream Hive contrib source), which breaks ties and keeps elements from
 * the second list on equal keys.
 */
public final class UDAFExampleMaxMinNUtil {

  /**
   * Stores the aggregation state: the current best N values plus N itself.
   *
   * <p>Must have a public no-arg constructor so Hive can
   * serialize/deserialize it via reflection; do not rename the fields.
   */
  public static class State {
    ArrayList<Double> a; // the current max/min N values, kept sorted
    int n; // the requested N
  }

  /**
   * Base UDAFEvaluator for UDAFMaxN and UDAFMinN. Subclasses only override
   * {@link #getAscending()} to pick the direction.
   */
  public abstract static class Evaluator implements UDAFEvaluator {

    private State state;

    public Evaluator() {
      state = new State();
      init();
    }

    /** Resets the state for a new aggregation group. */
    public void init() {
      state.a = new ArrayList<Double>();
      state.n = 0;
    }

    /** Returns true in UDAFMaxN, and false in UDAFMinN. */
    protected abstract boolean getAscending();

    /**
     * Folds one input row into the buffer. Inserts when the buffer is not
     * yet full, or when the new value beats the current worst kept value.
     * Always returns true.
     */
    public boolean iterate(Double o, int n) {
      boolean ascending = getAscending();
      state.n = n;
      if (o != null) {
        boolean doInsert = state.a.size() < n;
        if (!doInsert) {
          // Buffer full: only insert if o beats the worst kept element,
          // which sits at the end of the sorted list.
          Double last = state.a.get(state.a.size() - 1);
          if (ascending) {
            doInsert = o < last;
          } else {
            doInsert = o > last;
          }
        }
        if (doInsert) {
          binaryInsert(state.a, o, ascending);
          if (state.a.size() > n) {
            state.a.remove(state.a.size() - 1);
          }
        }
      }
      return true;
    }

    /**
     * Returns the partial aggregation state, or null for an empty group
     * (SQL standard: max_n of zero items is NULL).
     */
    public State terminatePartial() {
      return state.a.size() == 0 ? null : state;
    }

    /**
     * Merges a partial state into this one by sorted-merging the two kept
     * lists and truncating to N. Always returns true.
     */
    public boolean merge(State o) {
      if (o != null) {
        state.n = o.n;
        state.a = sortedMerge(o.a, state.a, getAscending(), o.n);
      }
      return true;
    }

    /**
     * Returns the final list of max/min N values, or null if the group was
     * empty.
     */
    public ArrayList<Double> terminate() {
      return state.a.size() == 0 ? null : state.a;
    }
  }

  /**
   * Returns a comparator for ascending or descending order.
   * The dummy parameter only helps generic type inference.
   */
  static <T extends Comparable<T>> Comparator<T> getComparator(boolean ascending, T dummy) {
    Comparator<T> comp;
    if (ascending) {
      comp = new Comparator<T>() {
        public int compare(T o1, T o2) {
          return o1.compareTo(o2);
        }
      };
    } else {
      comp = new Comparator<T>() {
        public int compare(T o1, T o2) {
          return o2.compareTo(o1);
        }
      };
    }
    return comp;
  }

  /**
   * Inserts an element into a sorted list, preserving the order.
   *
   * @param ascending if true, the list is sorted ascending, otherwise
   *        descending.
   */
  static <T extends Comparable<T>> void binaryInsert(List<T> list, T value, boolean ascending) {
    int position = Collections.binarySearch(list, value, getComparator(ascending, (T) null));
    if (position < 0) {
      // binarySearch encodes the insertion point as (-(point) - 1).
      position = (-position) - 1;
    }
    list.add(position, value);
  }

  /**
   * Merges two sorted lists and keeps the first n elements.
   *
   * @param ascending if true, the lists are sorted ascending, otherwise
   *        descending.
   */
  static <T extends Comparable<T>> ArrayList<T> sortedMerge(List<T> a1, List<T> a2,
      boolean ascending, int n) {
    Comparator<T> comparator = getComparator(ascending, (T) null);
    int n1 = a1.size();
    int n2 = a2.size();
    int p1 = 0; // current element in a1
    int p2 = 0; // current element in a2
    ArrayList<T> output = new ArrayList<T>(n);
    while (output.size() < n && (p1 < n1 || p2 < n2)) {
      if (p1 < n1) {
        if (p2 == n2 || comparator.compare(a1.get(p1), a2.get(p2)) < 0) {
          output.add(a1.get(p1++));
        }
      }
      if (output.size() == n) {
        break;
      }
      if (p2 < n2) {
        // "<= 0" (not "< 0") is essential: on a tie the a2 element is taken,
        // guaranteeing at least one pointer advances every iteration and the
        // loop terminates.
        if (p1 == n1 || comparator.compare(a2.get(p2), a1.get(p1)) <= 0) {
          output.add(a2.get(p2++));
        }
      }
    }
    return output;
  }

  // No instantiation.
  private UDAFExampleMaxMinNUtil() {
  }
}