map
map[U](f:(T) => U):RDD[U]
val rdd2 = rdd1.map(_ + 1)
flatMap
flatMap[U] (f:(T) => TraversableOnce[U]):RDD[U]
val rdd1 = List("apple,orange","grape,apple,mango")
val rdd2 = rdd1.flatMap(_.split(","))
MapPartitions
mapPartitions(f:Iterator[T] => Iterator[U], preservesPartitioning: Boolean) : RDD[U]
//Ex) val rdd1 = sc.parallelize(1 to 10, 3)
val rdd2 = rdd1.mapPartitions(numbers => {
numbers.map {
number => number + 1
}
})
결과)
DB연결 !!!
DB연결 !!!
DB연결 !!!
2, 3, 4, 5, 6, 7, 8, 9, 10, 11
map 이랑 같은데 "RDD를 파티셔닝 해서 집어넣은 컬렉션"에 먹이는 함수이다.
조금 특이한 것은 Iterator[T] 입력으로 받는 것
MapPartitionsWithIndex
MapPartitionsWithIndex( f:(Int, Iterator[T]) => Iterator[U],preservesPartitioning: Boolean = false): RDD[U]
Ex)
test("MapPartitionsWithIndex"){
val rdd1 = sc.parallelize(1 to 10, 3)
val rdd2 = rdd1.mapPartitionsWithIndex((idx, numbers) => {
numbers.flatMap {
case number if idx == 1 => Option(number + 1)
case _ => None
}
})
println(rdd2.collect.mkString(", "))
}
결과)
5, 6, 7
여기서 넘어오는 함수 f 의 입력 Iterator[U] 는 각 파티션의 집합이다. flatMap 을 먹인 이유는 None 을 없애기 위해서 이다. 걍 map을 먹이면 none도 함께 출력될 것이다. 이것도 넘긴 함수가 세번 호출 될 것이다.
mapValues, flatMapValues
Zip
ZipPartitions
test("doZipPartitions") {
val rdd1 = sc.parallelize(List("a", "b", "c"), 3)
val rdd2 = sc.parallelize(List(1, 2, 3), 3)
val result = rdd1.zipPartitions(rdd2) {
(it1, it2) =>
for {
v1 <- it1
v2 <- it2
} yield v1 + v2
}
println(result.collect.mkString(", "))
}
결과)
a1, b2, c3
for - yield 설명
'IT > Spark' 카테고리의 다른 글
RDD Transformation aggregation (0) | 2017.12.18 |
---|---|
RDD Transformation #2 (0) | 2017.12.15 |
spark docker 에 설치하기(작성중) (0) | 2017.12.11 |
RDD 정리 (0) | 2017.12.08 |
Practice using the results "jar" in Spark (0) | 2017.11.28 |