ReduceByKey
key, value 쌍으로 이루어진 컬렉션에서, 같은 key를 가진 값을 병합하고, 각 key 에 매칭되는 value들을 연산한다.
중요한 것은 value 들을 계속 누적해 나가면서 계산한다는 것이고, 그래서 이 연산은 결합법칙, 교환법칙이 성립하는 수식을 넣어야 문제가 없다.
reduceByKey(func: (V, V) => V): RDD[(K, V)]
Ex)
test("doReduceByKey"){
val rdd = sc.parallelize(List("a", "b", "b","b")).map((_, 1))
val result = rdd.reduceByKey( (x,y) => { println(s"x=$x y=$y") ;x+y } )
println(result.collect.mkString(","))
}
조금 자세히 설명하면,
rdd 의 튜플들은 아래와 같다고 볼 수 있다.
- (a,1)
- (b,1)
- (b,1)
- (b,1)
연산 순서는
- (a,1) 은 하나니까 패스~
- 이후 (b,1) reduce (b,1) = 1+1 = 2
- (b,2) reduce (b,1) = 2 + 1= 3
이런식으로 동작한다.
특히 데이터들이 partition 되어 있다면, 어느것이 먼저 실행될지 모르기에, 처음 설명한 것처럼, 결합법칙과 교환법칙이 반드시 성립하는 수식을 넣어야 하는 것이다. 그렇지 않다면 실행마다 연산이 달라질 것이다.
결과)
x=1 y=1
x=2 y=1 // 위 식의 1+1의 결과가 다시 x로 넘어오는것에 주목(a,1),(b,3)
1+1 한 결과에 한번더 1을 더하는 것에 주목하자
_ + _ 는 (x,y) => x+y 와 동일한 표현식이다.
foldByKey
reduceByKey 와 비슷한데 첫번째 연산을 할때 인자로 넘어오는 값을 최초 한번 초기값으로 연산할 수 있다.
foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
Ex)
test("FoldByKey1") {
val rdd = sc.parallelize(List("a", "b", "b")).map((_, 1))
val result = rdd.foldByKey(0)((x,y) => { println(s"x=$x y=$y") ;x+y })
println(result.collect.mkString(","))
}
test("FoldByKey2") {
val rdd = sc.parallelize(List("a", "b", "b")).map((_, 2))
val result = rdd.foldByKey(1)((x,y) => { println(s"x=$x y=$y") ;x+y })
println(result.collect.mkString(","))
}
Result) FoldByKey1
// 먼저 한번 먹임
x=0 y=1
x=0 y=1
x=0 y=1
// 그리고 다음 원소 reduceByKey 함
x=1 y=1
// 결과
(a,1),(b,2)
Result) FoldByKey2
// 먼저 한번 먹임
x=1 y=2
x=1 y=2
x=1 y=2
// 그리고 다음 원소 reduceByKey 함
x=3 y=3
// 결과
(a,3),(b,6)
ㅇ
CombineByKey
reduceByKey 및 foldByKey 와 비슷하지만 병합 과정에서 타입이 바뀔 수 있다......
combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)]
- CreateCombiner
- 임의의 객체가 처음 생성될 때 호출되는 함수이고, 예제 에서는 Record 객체가 생성될 것이다.
- 첫번째 원소 ("Math", 100L) 가 처음 적용될 때, 즉 "Math" 라는 Key 가 처음 생성될 때, 이 함수가 호출되는 것이다.
- 이후 또 "Math" 라는 Key를 가진 원소가 들어온다면, MergeValue가 호출된다.
- mergeValue
- 이미 동일 Key 로 생성된 객체가 존재한다면, 호출되는 함수다. 말그대로 기존 객체에 머지된다고 보면 된다.
- mergeCombiners
- CreateCombiner 와 mergeValue 는 파티션 단위로 실행된다.
- 원소들이 여러 파티션에 나눠져 저장되어 있을때, 마지막으로 reduce 가 호출되는 것이다. 모든 파티션에 있는 원소들이 전부 머지되어야 함으로.
- 모든 원소가 파티션 하나에 저장되어 있다면, mergeCombiners 는 호출되지 않는다.
임의의 클래스 Record
case class Record(var amount: Long, var number: Long = 1) {
def add(amount: Long): Record = {
add(Record(amount))
}
def add(other: Record): Record = {
this.number += other.number
this.amount += other.amount
this
}
override def toString: String = s"avg:${amount / number}"
}
Ex)
test("CombineByKey") {
val data = Seq(("Math", 100L), ("Eng", 80L),("Eng", 10L), ("Math", 50L), ("Eng", 70L), ("Eng", 90L),("His",70L))
val rdd = sc.parallelize(data)
val createCombiner = (v: Long) => { println(s"$v Create Combiner");Record(v) }
val mergeValue = (c: Record, v: Long) => { println(s"mergeValue ${c.amount} $v");c.add(v) }
val mergeCombiners = (c1: Record, c2: Record) => { println( s"mergeCombiners ${c1.amount}, ${c2.amount}, ${c1.add(c2).amount}" ) ;c1.add(c2)}
val result = rdd.combineByKey(createCombiner, mergeValue, mergeCombiners)
println(result.collect.mkString(",\t"))
}
Result)
100 Create Combiner // 1
80 Create Combiner // 2
mergeValue 80 10 // 3
mergeValue 100 50 // 4
mergeValue 90 70 // 5
mergeValue 160 90 // 6
70 Create Combiner // 7
(Math,avg:75), (His,avg:70), (Eng,avg:62)
해석)
- ("Math",100L) -> "Math"라는 키가 처음 나왔으므로, CreateCombiner 가 호출된다.
- ("Eng", 80L) -> "Eng" 라는 키가 처음 나왔으므로, CreateCombiner 가 호출된다.
- ("Eng", 10L) -> "Eng" 라는 키로 이미 객체가 생성되었기에 MergeValue가 호출된다. 기존 80 점 객체와 merge 되는 것이다. ("Eng",90L)
- ("Math", 50L) -> "Math"라는 키로 이미 객체가 생성되었기에 MergeValue가 호출된다. 기존 100점 객체와 merge 된다. ("Math", 150L)
- ("Eng", 70L) -> "Eng" 라는 키로 이미 객체가 생성되었기에 MergeValue가 호출된다. 기존 90 점 객체와 merge 되는 것 ("Eng",160L)
- ("Eng", 90L) -> "Eng" 라는 키로 이미 객체가 생성되었기에 MergeValue가 호출된다. 기존 160 점 객체와 merge 되는 것. ("Eng",250L)
- ("His",70L) -> "Math"라는 키가 처음 나왔으므로, CreateCombiner 가 호출된다.
- Partition 이 여러 개로 나눠져 있다면, 중간 출력물이 다르지만, 결과는 동일하다.mergeCombiners 가 나중에 다 합칠 것이다.
AggregateByKey
key, value 쌍으로 이루어진 컬렉션에 사용할때 쓴다. CombineByKey 와 동일한 동작을 하지만, 초기값을 생성하는 부분만 다르다.
Ex)
test("AggregateByKey") {
val data = Seq(("Math", 100L), ("Eng", 80L), ("Math", 50L), ("Eng", 70L), ("Eng", 90L))
val rdd = sc.parallelize(data)
val zero = Record(0, 0)
val mergeValue = (c: Record, v: Long) => c.add(v)
val mergeCombiners = (c1: Record, c2: Record) => c1.add(c2)
val result = rdd.aggregateByKey(zero)(mergeValue, mergeCombiners)
println(result.collect.mkString(",\t"))
}
'IT > Spark' 카테고리의 다른 글
RDD filter And Sort (0) | 2017.12.18 |
---|---|
RDD Operate Partition (0) | 2017.12.18 |
RDD Transformation #2 (0) | 2017.12.15 |
RDD Transformation #1 (0) | 2017.12.14 |
spark docker 에 설치하기(작성중) (0) | 2017.12.11 |