GroupBy
RDD 원소들을 일정 기준에 따라 분리, K 타입을 키로 가지는 T 집합체를 리턴해 준다.
groupBy[K](f: T => K): RDD[(K, Iterable[T])]
Ex)
test("GroupBy"){
val rdd = sc.parallelize(1 to 10)
val result = rdd.groupBy {
case i: Int if (i % 2 == 0) => "even"
case _ => "odd"
}
result.collect.foreach {
v => println(s"${v._1}, [${v._2.mkString(",")}]")
}
}
result)
even, [2,4,6,8,10]
odd, [1,3,5,7,9]
패턴 매칭 예는 아래 사이트 에서
https://twitter.github.io/scala_school/ko/basics2.html#match
GroupByKey
key, value 쌍으로 이뤄진 컬렉션에서 key 기준으로 , value 들을 맵핑한다.groupByKey(): RDD[(K, Iterable[V])]
Ex)test("GroupByKey"){
val rdd = sc.parallelize(List("a", "b", "c", "b", "c")).map((_, 1))
//rdd = sc.parallelize(List("a", "b", "c", "b", "c")).map((_, 1)) 에서 map((_,1)) 은
//(a,1),(b,1),(c,1),(b,1),(c,1) 이다
val result = rdd.groupByKey
result.collect.foreach {
v => println(s"${v._1}, [${v._2.mkString(",")}]")
}
}
Result)a, [1]
b, [1,1]
c, [1,1]
패턴 매칭 예는 아래 사이트 에서
groupByKey(): RDD[(K, Iterable[V])]
test("GroupByKey"){
val rdd = sc.parallelize(List("a", "b", "c", "b", "c")).map((_, 1))
//rdd = sc.parallelize(List("a", "b", "c", "b", "c")).map((_, 1)) 에서 map((_,1)) 은
//(a,1),(b,1),(c,1),(b,1),(c,1) 이다
val result = rdd.groupByKey
result.collect.foreach {
v => println(s"${v._1}, [${v._2.mkString(",")}]")
}
}
a, [1]
b, [1,1]
c, [1,1]
https://twitter.github.io/scala_school/ko/basics2.html#match
Cogroup
key, value 쌍으로 이뤄진 컬렉션 두 개가 있다고 가정.
A 컬렉션, B컬렉션 이라 하자, 그리고 결과는 C
A의 key 들, B의 Key 들의 Distinct 한 것을 새로운 Rdd의 key로 놓고 그것이 C의 Key가 된다.
C의 value 는 Pair 형의 (A.value, B.value) 가 된다. 이때 A.value 는 Iterable(List 형태)로 나오게 될 것이다.
cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
Ex)
val rdd1 = sc.parallelize(List(("k1", "v1"), ("k2", "v2"), ("k1", "v3")))
val rdd2 = sc.parallelize(List(("k1", "v4")))
val result = rdd1.cogroup(rdd2)
result.collect.foreach {
case (k, (v_1, v_2)) => {
println(s"($k, [${v_1.mkString(",")}], [${v_2.mkString(",")}])")
}
}
Result)
(k1, [v1,v3], [v4])
(k2, [v2], [])
Distinct
쉬운거니 생략..
Cartesian
두개 RDD 의 모든 조합을 튜플로 리턴
Ex)
test("Cartesian"){
val rdd1 = sc.parallelize(List(1, 2, 3))
val rdd2 = sc.parallelize(List("a", "b", "c"))
val result = rdd1.cartesian(rdd2)
println(result.collect.mkString(", "))
}
Result)
(1,a), (1,b), (1,c), (2,a), (2,b), (2,c), (3,a), (3,b), (3,c)
Subtract, Union, Intersection
차집합, 합집합, 교집합... 생략한다.
Join
key, value 쌍으로 이뤄진 두 컬랙션에서 서로 같은 키를 가진 요소를 모아 RDD 생성, 일반적인 RDB의 Join 연산과 유사하다.
join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
Ex)
test("doJoin") {
val rdd1 = sc.parallelize(List("a", "b", "c", "d", "e")).map((_, 1))
val rdd2 = sc.parallelize(List("b", "c")).map((_, 2))
val result = rdd1.join(rdd2)
println(result.collect.mkString("\n"))
}
Result)
(b,(1,2))
(c,(1,2))
LeftOuterJoin, RightOuterJoin
말 그대로.. 왼쪽조인, 오른쪽 조인이다. 값이 없는 경우도 생기니
왼쪽 조인일 경우 오른쪽 값이 옵션으로 리턴된다.
leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))
Ex)
test("doLeftOuterJoin"){
val rdd1 = sc.parallelize(List("a", "b", "c")).map((_, 1))
val rdd2 = sc.parallelize(List("b", "c")).map((_, 2))
val result1 = rdd1.leftOuterJoin(rdd2)
val result2 = rdd1.rightOuterJoin(rdd2)
println("Left: " + result1.collect.mkString("\t"))
println("Right: " + result2.collect.mkString("\t"))
}
Result)
Left: (a,(1,None)) (b,(1,Some(2))) (c,(1,Some(2)))
Right: (b,(Some(1),2)) (c,(Some(1),2))
SubtractByKey
쉬워서 생략하겠다.
'IT > Spark' 카테고리의 다른 글
RDD Operate Partition (0) | 2017.12.18 |
---|---|
RDD Transformation aggregation (0) | 2017.12.18 |
RDD Transformation #1 (0) | 2017.12.14 |
spark docker 에 설치하기(작성중) (0) | 2017.12.11 |
RDD 정리 (0) | 2017.12.08 |