본문 바로가기

IT/Spark

RDD Data Load, Save

지원포맷

text파일, JSON, 하둡 sequence파일, csv
HDFS, s3, switf, mysql, hbase, 카산드라, hive

text파일 저장

test("saveAndLoadTextFile") {
val rdd = sc.parallelize(1 to 1000, 3)
val codec = classOf[org.apache.hadoop.io.compress.GzipCodec]
// save
rdd.saveAsTextFile("<path_to_save>/sub1")
// save(gzip)
rdd.saveAsTextFile("<path_to_save>/sub2", codec)
// load
val rdd2 = sc.textFile("<path_to_save>/sub1")
println(rdd2.take(10).toList)
}

sequence파일 저장

test("saveAndLoadSequenceFile") {
val rdd = sc.parallelize(List("a", "b", "c", "b", "c")).map((_, 1))

// save
// 아래 경로는 실제 저장 경로로 변경하여 테스트
rdd.saveAsSequenceFile("data/sample/saveAsSeqFile/scala")

// load!
// 아래 경로는 실제 저장 경로로 변경하여 테스트
val rdd2 = sc.sequenceFile[String, Int]("data/sample/saveAsSeqFile/scala")
println(rdd2.collect.mkString(", "))
}

클러스터 환경에서의 공유 변수

test("Broadcast") {
val broadcastUsers = sc.broadcast(Set("u1", "u2"))
val rdd = sc.parallelize(List("u1", "u3", "u3", "u4", "u5", "u6"), 3)
val result = rdd.filter(broadcastUsers.value.contains(_))

println(result.collect.mkString(","))
}


'IT > Spark' 카테고리의 다른 글

kafka windows build  (0) 2018.01.10
Spark Cluster #1  (0) 2017.12.20
RDD Action  (0) 2017.12.18
RDD filter And Sort  (0) 2017.12.18
RDD Operate Partition  (0) 2017.12.18