본문 바로가기

IT/Spark

RDD Action

RDD action

액션 = 결과값이 정수나, 리스트, 맵 등 RDD가 아닌 다른 타입

결과값이 RDD 인건 트랜스포메이션이라 한다.

둘 다 Lazy evaluation 방식이다. -> 연산을 계속 누적했다가 실제로 계산이 필요한 시점이 되어서야 실행


어떤 RDD의 map, flatMap, reduceByKey 등 트랜스포메이션 메서드를 100번 호출한다하더라도, count() 같은 액션 메서드를 호출하는 시점이 돼서야 비로서 그동안 쌓여있던 100개의 트랜스포메이션 연산을 순차적으로 실행하게 된다.

-> 액션 메서드를 여러번 호출하면 트랜스포메이션 연산도 여러번 호출될 수 있다.!


출력 연산


first

첫번째 원소를 하나 리턴한다.

take

입력 인자의 숫자만큼 원소를 리턴해준다.

takeSample

샘플 추츨. RDD를 리턴하지 않고, 배열이나 리스트 같은 형태로 리턴해 준다.

collect,count

collect 는 모든 원소리턴, count 는 원소 갯수 리턴

countByValue

값 갯수 를 전부 세서 리턴, 컬렉션으로 리턴한다.

reduce

두개의 원소를 받아 하나로 병합하여 리턴

reduce(f: (T, T) => T): T
test("Reduce") {
val rdd = sc.parallelize(1 to 10, 3)
val result = rdd.reduce(_ + _)
println(result)
}

fold

reduce랑 같은데, 최초 원소와 입력한 상수를 먼저 병합한다.!!!

reduce,fold 모두 결합법칙 교환법칙이 성립하는 수식을 넣어야 한다. 어떤 파티션이 먼저 수행될 지 모르기 때문


aggregate

reduce 와 fold 와 다르게 타입의 제약이 없다.

aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

Ex)

test("Aggregate") {
val rdd = sc.parallelize(List(100, 80, 75, 90, 95), 3)
val zeroValue = Record(0, 0)
val seqOp = (r: Record, v: Int) => r.add(v)
val combOp = (r1: Record, r2: Record) => r1 add r2
val result1 = rdd.aggregate(zeroValue)(seqOp, combOp)
println(result1.amount / result1.number)

// 좀더 간결한 코드
val result2 = rdd.aggregate(Record(0, 0))(_ add _, _ add _)
println(result1.amount / result1.number)
println(result2)
}


사실 앞에서 설명한 CombineByKey 와 AggrigateByKey 같은 것들과 크게 다르지 않다.


foreach, foreachPartition

foreach 씨리즈는 결과를 리턴해 주지 않는다는 차이점
foreachPartition(f: Iterator[T] => Unit): Uni

test("ForeachPartition") {
val rdd = sc.parallelize(1 to 10, 3)

rdd.foreach { v =>
println(s"Value Side Effect: ${v}")
}

rdd.foreachPartition(values => {
println("!!!!!!!!!!!Partition Side Effect!!")
for (v <- values) println(s"Value Side Effect: ${v}")
})

}



cache, persist, unpersist, partitions

구글링 해도 금방 찾을 수 있다. 쉽기도 하다..
import org.apache.spark.storage.StorageLevel
test("Cache") {
val rdd = sc.parallelize(1 to 100, 10)
val rdd1 = rdd.cache
val rdd2 = rdd.persist(StorageLevel.MEMORY_ONLY)
}
test("GetPartitions") {
val rdd = sc.parallelize(1 to 1000, 10)
println(rdd.partitions.size)
println(rdd.getNumPartitions)
}


'IT > Spark' 카테고리의 다른 글

Spark Cluster #1  (0) 2017.12.20
RDD Data Load, Save  (2) 2017.12.18
RDD filter And Sort  (0) 2017.12.18
RDD Operate Partition  (0) 2017.12.18
RDD Transformation aggregation  (0) 2017.12.18