본문 바로가기

IT/Spark

RDD Transformation #2

GroupBy

RDD 원소들을 일정 기준에 따라 분리, K 타입을 키로 가지는 T 집합체를 리턴해 준다.

groupBy[K](f: T => K): RDD[(K, Iterable[T])]

Ex)
test("GroupBy"){
val rdd = sc.parallelize(1 to 10)
val result = rdd.groupBy {
case i: Int if (i % 2 == 0) => "even"
case _ => "odd"
}
result.collect.foreach {
v => println(s"${v._1}, [${v._2.mkString(",")}]")
}
}

result)

even, [2,4,6,8,10]
odd, [1,3,5,7,9]

패턴 매칭 예는 아래 사이트 에서

https://twitter.github.io/scala_school/ko/basics2.html#match


GroupByKey

key, value 쌍으로 이뤄진 컬렉션에서 key 기준으로 , value 들을 맵핑한다.
groupByKey(): RDD[(K, Iterable[V])]
Ex)
test("GroupByKey"){
val rdd = sc.parallelize(List("a", "b", "c", "b", "c")).map((_, 1))
//rdd = sc.parallelize(List("a", "b", "c", "b", "c")).map((_, 1)) 에서 map((_,1)) 은
//(a,1),(b,1),(c,1),(b,1),(c,1) 이다
val result = rdd.groupByKey
result.collect.foreach {
v => println(s"${v._1}, [${v._2.mkString(",")}]")
}
}
Result)
a, [1]
b, [1,1]
c, [1,1]
패턴 매칭 예는 아래 사이트 에서

https://twitter.github.io/scala_school/ko/basics2.html#match


Cogroup

key, value 쌍으로 이뤄진 컬렉션 두 개가 있다고 가정.
A 컬렉션, B컬렉션 이라 하자, 그리고 결과는 C

A의 key 들, B의 Key 들의 Distinct 한 것을 새로운 Rdd의 key로 놓고 그것이 C의 Key가 된다.
C의 value 는 Pair 형의 (A.value, B.value) 가 된다. 이때 A.value 는 Iterable(List 형태)로 나오게 될 것이다.

cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
Ex)
val rdd1 = sc.parallelize(List(("k1", "v1"), ("k2", "v2"), ("k1", "v3")))
val rdd2 = sc.parallelize(List(("k1", "v4")))
val result = rdd1.cogroup(rdd2)
result.collect.foreach {
case (k, (v_1, v_2)) => {
println(s"($k, [${v_1.mkString(",")}], [${v_2.mkString(",")}])")
}
}

Result)

(k1, [v1,v3], [v4])
(k2, [v2], [])


Distinct

쉬운거니 생략..

Cartesian

두개 RDD 의 모든 조합을 튜플로 리턴

Ex)
test("Cartesian"){
val rdd1 = sc.parallelize(List(1, 2, 3))
val rdd2 = sc.parallelize(List("a", "b", "c"))
val result = rdd1.cartesian(rdd2)
println(result.collect.mkString(", "))
}
Result)
(1,a), (1,b), (1,c), (2,a), (2,b), (2,c), (3,a), (3,b), (3,c)


Subtract, Union, Intersection

차집합, 합집합, 교집합... 생략한다.

Join

key, value 쌍으로 이뤄진 두 컬랙션에서 서로 같은 키를 가진 요소를 모아 RDD 생성, 일반적인 RDB의 Join 연산과 유사하다.


join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

Ex)

test("doJoin") {
val rdd1 = sc.parallelize(List("a", "b", "c", "d", "e")).map((_, 1))
val rdd2 = sc.parallelize(List("b", "c")).map((_, 2))
val result = rdd1.join(rdd2)
println(result.collect.mkString("\n"))
}

Result)

(b,(1,2))
(c,(1,2))


LeftOuterJoin,  RightOuterJoin


말 그대로.. 왼쪽조인, 오른쪽 조인이다. 값이 없는 경우도 생기니 

왼쪽 조인일 경우 오른쪽 값이 옵션으로 리턴된다.

leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))

Ex)

test("doLeftOuterJoin"){
val rdd1 = sc.parallelize(List("a", "b", "c")).map((_, 1))
val rdd2 = sc.parallelize(List("b", "c")).map((_, 2))
val result1 = rdd1.leftOuterJoin(rdd2)
val result2 = rdd1.rightOuterJoin(rdd2)
println("Left: " + result1.collect.mkString("\t"))
println("Right: " + result2.collect.mkString("\t"))
}

Result)

Left: (a,(1,None))  (b,(1,Some(2)))  (c,(1,Some(2)))
Right: (b,(Some(1),2)) (c,(Some(1),2))

SubtractByKey

쉬워서 생략하겠다.


'IT > Spark' 카테고리의 다른 글

RDD Operate Partition  (0) 2017.12.18
RDD Transformation aggregation  (0) 2017.12.18
RDD Transformation #1  (0) 2017.12.14
spark docker 에 설치하기(작성중)  (0) 2017.12.11
RDD 정리  (0) 2017.12.08