map
map[U](f:(T) => U):RDD[U]
함수f 는 입력타입이 T 이고, 출력타입이 U 이다. map 의 결과값은 RDD[U] 가 된다.
Ex)
기호 _ 는 피호출객체의 원소를 말한다.
val rdd2 = rdd1.map(_ + 1)
map 은 간단하다. 컬렉션 원소 하나하나에 입력 함수를 맥여서 그 결과들의 집합 컬랙션을 리턴한다. 이때 입력한 함수의 리턴 타입과 동일한 타입의 컬렉션을 리턴한다고 보면 된다.
flatMap
flatMap[U] (f:(T) => TraversableOnce[U]):RDD[U]
함수f 는 입력타입이 T 이고, 출력타입이 TraversableOnce[U] 이다. map 의 결과값은 RDD[U] 가 된다.
Ex)
val rdd1 = List("apple,orange","grape,apple,mango")
val rdd2 = rdd1.flatMap(_.split(","))
flatMap map 과 비슷한데, 입력 함수만 다르다고 보면 된다.
"TraversableOnce 형의 원소를 리턴하는 함수"를 넣으면 된다. 말은 쉽지;; ㅠㅠ
예에서 String 을 split 하면 Array 로 리턴되지만 내부에 Array의 apply 암묵적 형변환이 일어나서 TraversableOnce[String] 으로 변환된다.
MapPartitions
mapPartitions(f:Iterator[T] => Iterator[U], preservesPartitioning: Boolean) : RDD[U]
//Ex) val rdd1 = sc.parallelize(1 to 10, 3)
val rdd2 = rdd1.mapPartitions(numbers => {
numbers.map {
number => number + 1
}
})
결과)
DB연결 !!!
DB연결 !!!
DB연결 !!!
2, 3, 4, 5, 6, 7, 8, 9, 10, 11
map 이랑 같은데 "RDD를 파티셔닝 해서 집어넣은 컬렉션"에 먹이는 함수이다.
조금 특이한 것은 Iterator[T] 입력으로 받는 것
MapPartitionsWithIndex
MapPartitions 과 같지만, 함수 입력시 파티션 번호를 입력으로 받을 수 있기에, 특정 파티션에만 먼가를 먹이고 싶을 때 쓸 수 있다.
MapPartitionsWithIndex( f:(Int, Iterator[T]) => Iterator[U],preservesPartitioning: Boolean = false): RDD[U]
Ex)
test("MapPartitionsWithIndex"){
val rdd1 = sc.parallelize(1 to 10, 3)
val rdd2 = rdd1.mapPartitionsWithIndex((idx, numbers) => {
numbers.flatMap {
case number if idx == 1 => Option(number + 1)
case _ => None
}
})
println(rdd2.collect.mkString(", "))
}
결과)
5, 6, 7
여기서 넘어오는 함수 f 의 입력 Iterator[U] 는 각 파티션의 집합이다. flatMap 을 먹인 이유는 None 을 없애기 위해서 이다. 걍 map을 먹이면 none도 함께 출력될 것이다. 이것도 넘긴 함수가 세번 호출 될 것이다.
mapValues, flatMapValues
key,value 쌍으로 이루어진 Map 에서 value 들만 취해서 map 혹은 flatMap 연산을 하는 것.
Zip
두개 RDD를 키,쌍으로 만들어준다. 두 RDD의 파티션 Count, 원소 카운트가 같아야 동작한다.
ZipPartitions
파티션 단위로 zip을 먹이고 특정 함수를 적용해 새로운 RDD 리턴, 파티션의 Count 만 동일하면 된다.
Ex)
test("doZipPartitions") {
val rdd1 = sc.parallelize(List("a", "b", "c"), 3)
val rdd2 = sc.parallelize(List(1, 2, 3), 3)
val result = rdd1.zipPartitions(rdd2) {
(it1, it2) =>
for {
v1 <- it1
v2 <- it2
} yield v1 + v2
}
println(result.collect.mkString(", "))
}
결과)
a1, b2, c3
for - yield 설명
참고 : http://knight76.tistory.com/entry/scala-for-%EB%AC%B8-yield
'IT > Spark' 카테고리의 다른 글
RDD Transformation aggregation (0) | 2017.12.18 |
---|---|
RDD Transformation #2 (0) | 2017.12.15 |
spark docker 에 설치하기(작성중) (0) | 2017.12.11 |
RDD 정리 (0) | 2017.12.08 |
Practice using the results "jar" in Spark (0) | 2017.11.28 |