package org.apache.hadoop.hive.contrib.udaf.example;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
public final class UDAFExampleMaxMinNUtil {
/**
 * Mutable aggregation state used by {@link Evaluator}. The evaluator returns this
 * object from {@code terminatePartial()} and accepts one in {@code merge(State)}.
 */
public static class State {
// Values retained so far. Evaluator only adds to this list through binaryInsert
// and replaces it with the result of sortedMerge, so it stays in sorted order.
ArrayList<Double> a;
// The "n" (maximum number of values to retain) most recently passed to
// Evaluator.iterate, or copied from the other state in Evaluator.merge.
int n;
}
/**
 * Base evaluator that retains at most {@code n} input values, kept sorted in the
 * direction chosen by the subclass through {@link #getAscending()}.
 *
 * <p>With an ascending order the list is sorted smallest-first and is trimmed from
 * its tail, so the {@code n} smallest values survive; with a descending order the
 * {@code n} largest survive.
 *
 * <p>NOTE(review): the constructor calls the overridable {@link #init()}; a subclass
 * overriding it will run before its own fields are initialized.
 */
public abstract static class Evaluator implements UDAFEvaluator {
  private State state;

  /** Creates the evaluator and resets it to an empty state via {@link #init()}. */
  public Evaluator() {
    state = new State();
    init();
  }

  /** Resets the state to an empty value list and {@code n = 0}. */
  public void init() {
    state.a = new ArrayList<Double>();
    state.n = 0;
  }

  /**
   * Chooses the sort direction of the retained values.
   *
   * @return {@code true} to keep values smallest-first, {@code false} to keep them
   *         largest-first
   */
  protected abstract boolean getAscending();

  /**
   * Offers one input value to the aggregation.
   *
   * @param o the value; {@code null} is ignored
   * @param n maximum number of values to retain; recorded in the state on every call.
   *          A non-positive {@code n} retains nothing.
   * @return always {@code true}
   */
  public boolean iterate(Double o, int n) {
    boolean ascending = getAscending();
    state.n = n;
    // With n <= 0 there is never room for a value, and the tail lookup below would
    // index an empty list at -1; treat it as "nothing to keep" instead of throwing.
    if (o == null || n <= 0) {
      return true;
    }
    boolean doInsert = state.a.size() < n;
    if (!doInsert) {
      // The list is full: admit the value only if it beats the current worst
      // retained value, which sits at the tail of the sorted list.
      Double last = state.a.get(state.a.size() - 1);
      doInsert = ascending ? o < last : o > last;
    }
    if (doInsert) {
      binaryInsert(state.a, o, ascending);
      // The insert may have pushed the list one past n; drop the worst value.
      if (state.a.size() > n) {
        state.a.remove(state.a.size() - 1);
      }
    }
    return true;
  }

  /**
   * Returns the partial aggregation.
   *
   * @return {@code null} when no value has been retained, otherwise this evaluator's
   *         own state object (not a copy)
   */
  public State terminatePartial() {
    return state.a.size() == 0 ? null : state;
  }

  /**
   * Folds another partial aggregation into this one. This evaluator adopts
   * {@code o.n} and replaces its value list with the merge of both lists, truncated
   * to {@code o.n} entries.
   *
   * @param o the other partial state; {@code null} is ignored
   * @return always {@code true}
   */
  public boolean merge(State o) {
    if (o != null) {
      state.n = o.n;
      state.a = sortedMerge(o.a, state.a, getAscending(), o.n);
    }
    return true;
  }

  /**
   * Returns the final result.
   *
   * @return {@code null} when no value has been retained, otherwise the evaluator's
   *         own sorted value list (not a copy)
   */
  public ArrayList<Double> terminate() {
    return state.a.size() == 0 ? null : state.a;
  }
}
/**
 * Builds a comparator over the natural ordering of {@code T}, or over its reverse.
 *
 * @param ascending {@code true} for natural order ({@code o1.compareTo(o2)}),
 *                  {@code false} for reversed order ({@code o2.compareTo(o1)})
 * @param dummy     never read; it only lets a caller fix {@code T} by passing a
 *                  typed {@code null}
 * @return a comparator for the requested direction
 */
static <T extends Comparable<T>> Comparator<T> getComparator(boolean ascending, T dummy) {
  // Captured by the anonymous class below, hence the explicit final copy.
  final boolean naturalOrder = ascending;
  return new Comparator<T>() {
    public int compare(T left, T right) {
      if (naturalOrder) {
        return left.compareTo(right);
      }
      return right.compareTo(left);
    }
  };
}
/**
 * Inserts {@code value} into {@code list} at the index reported by a binary search
 * under the requested ordering.
 *
 * @param list      the list to insert into; the binary search assumes it is already
 *                  sorted in the direction given by {@code ascending}
 * @param value     the element to add
 * @param ascending {@code true} for natural order, {@code false} for reversed order
 */
static <T extends Comparable<T>> void binaryInsert(List<T> list, T value, boolean ascending) {
  Comparator<T> order = getComparator(ascending, (T)null);
  int found = Collections.binarySearch(list, value, order);
  // A non-negative result is the index of an equal element; a negative result
  // encodes the insertion point as -(insertionPoint) - 1.
  int insertAt = found >= 0 ? found : -found - 1;
  list.add(insertAt, value);
}
/**
 * Merges two lists that are each already sorted in the requested direction and
 * returns the first {@code n} elements of the combined ordering.
 *
 * <p>Elements that compare equal across the two lists are all kept (up to
 * {@code n}); on a tie the element from {@code a1} is emitted first. Neither input
 * list is modified.
 *
 * @param a1        first sorted list
 * @param a2        second sorted list
 * @param ascending {@code true} if both lists are in natural order, {@code false}
 *                  if both are in reversed order
 * @param n         maximum number of elements to return; {@code 0} yields an empty list
 * @return a new list holding at most {@code n} merged elements
 * @throws IllegalArgumentException if {@code n} is negative
 */
static <T extends Comparable<T>> ArrayList<T> sortedMerge(List<T> a1, List<T> a2,
    boolean ascending, int n) {
  int n1 = a1.size();
  int n2 = a2.size();
  // Reserve no more than can actually be produced, so a very large n does not
  // force a very large allocation. A negative n still reaches the ArrayList
  // constructor and is rejected there.
  ArrayList<T> output = new ArrayList<T>((int) Math.min((long) n, (long) n1 + n2));
  int p1 = 0;
  int p2 = 0;
  while (output.size() < n && (p1 < n1 || p2 < n2)) {
    boolean takeFirst;
    if (p1 == n1) {
      takeFirst = false;
    } else if (p2 == n2) {
      takeFirst = true;
    } else {
      T head1 = a1.get(p1);
      T head2 = a2.get(p2);
      int order = ascending ? head1.compareTo(head2) : head2.compareTo(head1);
      // "<=" rather than "<": when the heads compare equal one of them must still
      // be consumed, otherwise neither cursor moves and the loop never ends.
      takeFirst = order <= 0;
    }
    output.add(takeFirst ? a1.get(p1++) : a2.get(p2++));
  }
  return output;
}
// Private and empty: this class only groups static helpers and nested types and
// is not meant to be instantiated.
private UDAFExampleMaxMinNUtil() {
}
}