본문 바로가기

IT/Spark

RDD Transformation #1


map


map[U](f:(T) => U):RDD[U]

함수f 는 입력타입이 T 이고, 출력타입이 U 이다. map 의 결과값은 RDD[U] 가 된다.

Ex)
기호 _ 는 피호출객체의 원소를 말한다.
val rdd2 = rdd1.map(_ + 1)

map 은 간단하다. 컬렉션 원소 하나하나에 입력 함수를 맥여서 그 결과들의 집합 컬랙션을 리턴한다. 이때 입력한 함수의 리턴 타입과 동일한 타입의 컬렉션을 리턴한다고 보면 된다. 

flatMap


flatMap[U] (f:(T) => TraversableOnce[U]):RDD[U]

함수f 는 입력타입이 T 이고, 출력타입이 TraversableOnce[U] 이다. map 의 결과값은 RDD[U] 가 된다.

Ex)

val rdd1 = List("apple,orange","grape,apple,mango")

val rdd2 = rdd1.flatMap(_.split(","))



flatMap map 과 비슷한데, 입력 함수만 다르다고 보면 된다.
"TraversableOnce 형의 원소를 리턴하는 함수"를 넣으면 된다. 말은 쉽지;; ㅠㅠ
예에서 String 을 split 하면 Array 로 리턴되지만 내부에 Array의 apply 암묵적 형변환이 일어나서 TraversableOnce[String] 으로 변환된다.


MapPartitions

mapPartitions(f:Iterator[T] => Iterator[U], preservesPartitioning: Boolean) : RDD[U]

//Ex) val rdd1 = sc.parallelize(1 to 10, 3)
val rdd2 = rdd1.mapPartitions(numbers => {
numbers.map {
number => number + 1
}
})

결과)

DB연결 !!!
DB연결 !!!
DB연결 !!!
2, 3, 4, 5, 6, 7, 8, 9, 10, 11

map 이랑 같은데 "RDD를 파티셔닝 해서 집어넣은 컬렉션"에 먹이는 함수이다.

조금 특이한 것은 Iterator[T] 입력으로 받는 것


MapPartitionsWithIndex

MapPartitions 과 같지만, 함수 입력시 파티션 번호를 입력으로 받을 수 있기에, 특정 파티션에만 먼가를 먹이고 싶을 때 쓸 수 있다.


MapPartitionsWithIndex( f:(Int, Iterator[T]) => Iterator[U],preservesPartitioning: Boolean = false): RDD[U] 

Ex)

test("MapPartitionsWithIndex"){
val rdd1 = sc.parallelize(1 to 10, 3)
val rdd2 = rdd1.mapPartitionsWithIndex((idx, numbers) => {
numbers.flatMap {
case number if idx == 1 => Option(number + 1)
case _ => None
}
})
println(rdd2.collect.mkString(", "))
}

결과)

5, 6, 7


여기서 넘어오는 함수 f 의 입력 Iterator[U] 는 각 파티션의 집합이다. flatMap 을 먹인 이유는 None 을 없애기 위해서 이다. 걍 map을 먹이면 none도 함께 출력될 것이다. 이것도 넘긴 함수가 세번 호출 될 것이다.


mapValues, flatMapValues

key,value 쌍으로 이루어진 Map 에서 value 들만 취해서  map 혹은 flatMap 연산을 하는 것.

Zip

두개 RDD를 키,쌍으로 만들어준다. 두 RDD의 파티션 Count, 원소 카운트가 같아야 동작한다.

ZipPartitions

파티션 단위로 zip을 먹이고 특정 함수를 적용해 새로운 RDD 리턴, 파티션의 Count 만 동일하면 된다.

Ex)

test("doZipPartitions") {
val rdd1 = sc.parallelize(List("a", "b", "c"), 3)
val rdd2 = sc.parallelize(List(1, 2, 3), 3)
val result = rdd1.zipPartitions(rdd2) {
(it1, it2) =>
for {
v1 <- it1
v2 <- it2
} yield v1 + v2
}
println(result.collect.mkString(", "))
}

결과)

a1, b2, c3

for - yield 설명

참고 : http://knight76.tistory.com/entry/scala-for-%EB%AC%B8-yield


'IT > Spark' 카테고리의 다른 글

RDD Transformation aggregation  (0) 2017.12.18
RDD Transformation #2  (0) 2017.12.15
spark docker 에 설치하기(작성중)  (0) 2017.12.11
RDD 정리  (0) 2017.12.08
Practice using the results "jar" in Spark  (0) 2017.11.28