RDD action
액션 = 결과값이 정수나, 리스트, 맵 등 RDD가 아닌 다른 타입
결과값이 RDD 인건 트랜스포메이션이라 한다.
둘 다 Lazy evaluation 방식이다. -> 연산을 계속 누적했다가 실제로 계산이 필요한 시점이 되어서야 실행
어떤 RDD의 map, flatMap, reduceByKey 등 트랜스포메이션 메서드를 100번 호출한다하더라도, count() 같은 액션 메서드를 호출하는 시점이 돼서야 비로서 그동안 쌓여있던 100개의 트랜스포메이션 연산을 순차적으로 실행하게 된다.
-> 액션 메서드를 여러번 호출하면 트랜스포메이션 연산도 여러번 호출될 수 있다.!
출력 연산
first
첫번째 원소를 하나 리턴한다.
take
입력 인자의 숫자만큼 원소를 리턴해준다.
takeSample
샘플 추츨. RDD를 리턴하지 않고, 배열이나 리스트 같은 형태로 리턴해 준다.
collect,count
collect 는 모든 원소리턴, count 는 원소 갯수 리턴
countByValue
값 갯수 를 전부 세서 리턴, 컬렉션으로 리턴한다.
reduce
두개의 원소를 받아 하나로 병합하여 리턴
reduce(f: (T, T) => T): T
test("Reduce") {
val rdd = sc.parallelize(1 to 10, 3)
val result = rdd.reduce(_ + _)
println(result)
}
fold
reduce랑 같은데, 최초 원소와 입력한 상수를 먼저 병합한다.!!!
reduce,fold 모두 결합법칙 교환법칙이 성립하는 수식을 넣어야 한다. 어떤 파티션이 먼저 수행될 지 모르기 때문
aggregate
reduce 와 fold 와 다르게 타입의 제약이 없다.
aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
Ex)
test("Aggregate") {
val rdd = sc.parallelize(List(100, 80, 75, 90, 95), 3)
val zeroValue = Record(0, 0)
val seqOp = (r: Record, v: Int) => r.add(v)
val combOp = (r1: Record, r2: Record) => r1 add r2
val result1 = rdd.aggregate(zeroValue)(seqOp, combOp)
println(result1.amount / result1.number)
// 좀더 간결한 코드
val result2 = rdd.aggregate(Record(0, 0))(_ add _, _ add _)
println(result1.amount / result1.number)
println(result2)
}
사실 앞에서 설명한 CombineByKey 와 AggrigateByKey 같은 것들과 크게 다르지 않다.
foreach, foreachPartition
foreach 씨리즈는 결과를 리턴해 주지 않는다는 차이점
foreachPartition(f: Iterator[T] => Unit): Uni
test("ForeachPartition") {
val rdd = sc.parallelize(1 to 10, 3)
rdd.foreach { v =>
println(s"Value Side Effect: ${v}")
}
rdd.foreachPartition(values => {
println("!!!!!!!!!!!Partition Side Effect!!")
for (v <- values) println(s"Value Side Effect: ${v}")
})
}
cache, persist, unpersist, partitions
구글링 해도 금방 찾을 수 있다. 쉽기도 하다..
import org.apache.spark.storage.StorageLevel
test("Cache") {
val rdd = sc.parallelize(1 to 100, 10)
val rdd1 = rdd.cache
val rdd2 = rdd.persist(StorageLevel.MEMORY_ONLY)
}
test("GetPartitions") {
val rdd = sc.parallelize(1 to 1000, 10)
println(rdd.partitions.size)
println(rdd.getNumPartitions)
}
'IT > Spark' 카테고리의 다른 글
Spark Cluster #1 (0) | 2017.12.20 |
---|---|
RDD Data Load, Save (2) | 2017.12.18 |
RDD filter And Sort (0) | 2017.12.18 |
RDD Operate Partition (0) | 2017.12.18 |
RDD Transformation aggregation (0) | 2017.12.18 |