본문 바로가기

IT/Spark

RDD Transformation aggregation

ReduceByKey

key, value 쌍으로 이루어진 컬렉션에서,  같은 key를 가진 값을 병합하고,  각 key 에 매칭되는 value들을 연산한다.

중요한 것은 value 들을 계속 누적해 나가면서 계산한다는 것이고, 그래서 이 연산은 결합법칙, 교환법칙이 성립하는 수식을 넣어야 문제가 없다.

reduceByKey(func: (V, V) => V): RDD[(K, V)]


Ex)

test("doReduceByKey"){
val rdd = sc.parallelize(List("a", "b", "b","b")).map((_, 1))
val result = rdd.reduceByKey( (x,y) => { println(s"x=$x y=$y") ;x+y } )
println(result.collect.mkString(","))
}

조금 자세히 설명하면,


rdd 의 튜플들은 아래와 같다고 볼 수 있다.

  1. (a,1)
  2. (b,1)
  3. (b,1)
  4. (b,1) 

연산 순서는

  1. (a,1) 은 하나니까 패스~
  2. 이후 (b,1) reduce (b,1) = 1+1 = 2
  3. (b,2) reduce (b,1) = 2 + 1= 3
이런식으로 동작한다.


특히 데이터들이 partition 되어 있다면, 어느것이 먼저 실행될지 모르기에, 처음 설명한 것처럼, 결합법칙과 교환법칙이 반드시 성립하는 수식을 넣어야 하는 것이다. 그렇지 않다면 실행마다 연산이 달라질 것이다.



결과)


x=1 y=
x=2 y=1 // 위 식의 1+1의 결과가 다시 x로 넘어오는것에 주목

(a,1),(b,3)


1+1 한 결과에 한번더 1을 더하는 것에 주목하자


_ + _  는 (x,y) => x+y 와 동일한 표현식이다.



foldByKey

reduceByKey 와 비슷한데  첫번째 연산을 할때 인자로 넘어오는 값을 최초 한번 초기값으로 연산할 수 있다.
foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]


Ex)

test("FoldByKey1") {
val rdd = sc.parallelize(List("a", "b", "b")).map((_, 1))
val result = rdd.foldByKey(0)((x,y) => { println(s"x=$x y=$y") ;x+y })
println(result.collect.mkString(","))
}

test("FoldByKey2") {
val rdd = sc.parallelize(List("a", "b", "b")).map((_, 2))
val result = rdd.foldByKey(1)((x,y) => { println(s"x=$x y=$y") ;x+y })
println(result.collect.mkString(","))
}

Result) FoldByKey1


// 먼저 한번 먹임
x=0 y=1
x=0 y=1
x=0 y=1

// 그리고 다음 원소 reduceByKey 함
x=1 y=1

// 결과
(a,1),(b,2)

Result) FoldByKey2

// 먼저 한번 먹임
x=1 y=2
x=1 y=2
x=1 y=2
// 그리고 다음 원소 reduceByKey 함
x=3 y=3
// 결과
(a,3),(b,6)


CombineByKey

reduceByKey 및 foldByKey 와 비슷하지만 병합 과정에서 타입이 바뀔 수 있다......

combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)]
  • CreateCombiner
    • 임의의 객체가 처음 생성될 때 호출되는 함수이고, 예제 에서는 Record 객체가 생성될 것이다.
    • 첫번째 원소 ("Math", 100L) 가 처음 적용될 때, 즉 "Math" 라는 Key 가 처음 생성될 때, 이 함수가 호출되는 것이다.
    • 이후 또 "Math" 라는 Key를 가진 원소가 들어온다면, MergeValue가 호출된다.
  • mergeValue
    • 이미 동일 Key 로 생성된 객체가 존재한다면, 호출되는 함수다. 말그대로 기존 객체에 머지된다고 보면 된다.
  • mergeCombiners
    • CreateCombiner 와 mergeValue 는 파티션 단위로 실행된다.
    • 원소들이 여러 파티션에 나눠져 저장되어 있을때, 마지막으로 reduce 가 호출되는 것이다. 모든 파티션에 있는 원소들이 전부 머지되어야 함으로.
    • 모든 원소가 파티션 하나에 저장되어 있다면, mergeCombiners 는 호출되지 않는다.


임의의 클래스 Record
case class Record(var amount: Long, var number: Long = 1) {

def add(amount: Long): Record = {
add(Record(amount))
}
def add(other: Record): Record = {
this.number += other.number
this.amount += other.amount
this
}
override def toString: String = s"avg:${amount / number}"
}

Ex)

test("CombineByKey") {
val data = Seq(("Math", 100L), ("Eng", 80L),("Eng", 10L), ("Math", 50L), ("Eng", 70L), ("Eng", 90L),("His",70L))
val rdd = sc.parallelize(data)
val createCombiner = (v: Long) => { println(s"$v Create Combiner");Record(v) }
val mergeValue = (c: Record, v: Long) => { println(s"mergeValue ${c.amount} $v");c.add(v) }
val mergeCombiners = (c1: Record, c2: Record) => { println( s"mergeCombiners ${c1.amount}, ${c2.amount}, ${c1.add(c2).amount}" ) ;c1.add(c2)}
val result = rdd.combineByKey(createCombiner, mergeValue, mergeCombiners)
println(result.collect.mkString(",\t"))
}

Result)
100 Create Combiner // 1
80 Create Combiner // 2
mergeValue 80 10 // 3
mergeValue 100 50 // 4
mergeValue 90 70 // 5
mergeValue 160 90 // 6
70 Create Combiner // 7

(Math,avg:75), (His,avg:70), (Eng,avg:62)

해석)

  1. ("Math",100L) -> "Math"라는 키가 처음 나왔으므로, CreateCombiner 가 호출된다.
  2. ("Eng", 80L) -> "Eng" 라는 키가 처음 나왔으므로, CreateCombiner 가 호출된다.
  3. ("Eng", 10L) -> "Eng" 라는 키로 이미 객체가 생성되었기에 MergeValue가 호출된다. 기존 80 점 객체와 merge 되는 것이다. ("Eng",90L)
  4. ("Math", 50L) -> "Math"라는 키로 이미 객체가 생성되었기에 MergeValue가 호출된다.  기존 100점 객체와 merge 된다. ("Math", 150L)
  5. ("Eng", 70L) -> "Eng" 라는 키로 이미 객체가 생성되었기에 MergeValue가 호출된다. 기존 90 점 객체와 merge 되는 것 ("Eng",160L)
  6. ("Eng", 90L) -> "Eng" 라는 키로 이미 객체가 생성되었기에 MergeValue가 호출된다. 기존 160 점 객체와 merge 되는 것. ("Eng",250L)
  7. ("His",70L) -> "Math"라는 키가 처음 나왔으므로, CreateCombiner 가 호출된다.
  8. Partition 이 여러 개로 나눠져 있다면, 중간 출력물이 다르지만, 결과는 동일하다.mergeCombiners 가 나중에 다 합칠 것이다.

AggregateByKey

key, value 쌍으로 이루어진 컬렉션에 사용할때 쓴다. CombineByKey 와 동일한 동작을 하지만, 초기값을 생성하는 부분만 다르다.

Ex)

test("AggregateByKey") {
val data = Seq(("Math", 100L), ("Eng", 80L), ("Math", 50L), ("Eng", 70L), ("Eng", 90L))
val rdd = sc.parallelize(data)
val zero = Record(0, 0)
val mergeValue = (c: Record, v: Long) => c.add(v)
val mergeCombiners = (c1: Record, c2: Record) => c1.add(c2)
val result = rdd.aggregateByKey(zero)(mergeValue, mergeCombiners)
println(result.collect.mkString(",\t"))
}


'IT > Spark' 카테고리의 다른 글

RDD filter And Sort  (0) 2017.12.18
RDD Operate Partition  (0) 2017.12.18
RDD Transformation #2  (0) 2017.12.15
RDD Transformation #1  (0) 2017.12.14
spark docker 에 설치하기(작성중)  (0) 2017.12.11