Word count example code
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    require(args.length == 2, "Usage: WordCount <inputFilePath> <outputPath>")
    val inputPath = args(0)
    val outputPath = args(1)

    val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
    val sc = new SparkContext(conf)

    // Split each line into words, pair each word with 1, then sum the counts per word.
    val logData = sc.textFile(inputPath)
    val words = logData.flatMap(str => str.split(" "))
    val wcPair = words.map((_, 1))
    val result = wcPair.reduceByKey(_ + _)

    result.saveAsTextFile(outputPath)
    result.foreach(println) // prints on the driver only because the master is local
    sc.stop()
  }
}
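For quick experiments, the map + reduceByKey pair can also be collapsed into a single action. A minimal sketch, assuming the same inputPath as above; note that countByValue() collects the whole result to the driver, so it only suits small outputs:

// Alternative sketch: countByValue() folds the pairing and reduction
// into one driver-side action (fine for small results only).
val counts = sc.textFile(inputPath).flatMap(_.split(" ")).countByValue()
counts.foreach(println)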
Build
PS > sbt package
Build artifact: target/scala-2.11/sparktest1_2.11-1.0.jar
Running on Spark
Run the following command from the directory where Spark is installed.
Be careful when entering file paths: they must start with file://.
~/spark$ ./bin/spark-submit --class WordCount /mnt/c/Users/NX/IdeaProjects/sparkTest1/target/scala-2.11/sparktest1_2.11-1.0.jar file:///mnt/d/test/wordCount.txt file:///mnt/d/test/wordCountResult
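One caveat: the example hardcodes setMaster("local"), and settings made directly on SparkConf take precedence over flags passed to spark-submit. A minimal sketch of a submit-friendly variant that leaves the master to the launcher:

// Leave the master unset in code so spark-submit's --master flag
// decides where the job runs (a hardcoded setMaster would win otherwise).
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)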
Execution result
Input
aa bb cc dd aa bb bb bb
Output
(aa,2) (dd,1) (bb,4) (cc,1)
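saveAsTextFile writes a directory of part files rather than a single file. A quick sketch for loading the saved result back for inspection, e.g. from a spark-shell session where sc is already available (same output path as above):

// wordCountResult is a directory of part-0000N files; textFile reads them all.
val saved = sc.textFile("file:///mnt/d/test/wordCountResult")
saved.collect().foreach(println) // lines like "(aa,2)"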
Test environment setup (build.sbt)
name := "sparkTest1"
version := "1.0"
scalaVersion := "2.11.8"
resolvers += "Artima Maven Repository" at "http://repo.artima.com/releases"
libraryDependencies ++= {
val sparkVer = "2.2.0"
Seq(
"org.apache.spark" %% "spark-core" % sparkVer
)
}
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.4" % "test"
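For reference, the %% operator appends the Scala binary version to the artifact name, which is why both the built jar and the dependency carry the _2.11 suffix. The spark-core line above is equivalent to this explicit form:

// Explicit equivalent of the %% dependency above:
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.2.0"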
Writing the test code
Code
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.FunSuite
import scala.collection.mutable.ListBuffer

class WordCountSpec extends FunSuite {
  test("wordCount") {
    // Bind Spark to localhost so the test runs without any network configuration.
    val conf = new SparkConf()
    conf.setMaster("local[*]").setAppName("WordCountTest")
    conf.set("spark.local.ip", "127.0.0.1")
    conf.set("spark.driver.host", "127.0.0.1")
    val sc = new SparkContext(conf)

    val input = new ListBuffer[String]
    input += "Apache Spark is a fast and general engine"
    input += "Apache Spark is Windows and Unix"
    input += "Apache Spark is Unix"

    // Same pipeline as the main program, but on an in-memory collection.
    val inputRDD = sc.parallelize(input)
    val words = inputRDD.flatMap(_.split(" "))
    val wcPair = words.map((_, 1))
    val resultRDD = wcPair.reduceByKey(_ + _)
    val resultMap = resultRDD.collectAsMap()
    resultMap.foreach(println)

    sc.stop() // release the context so repeated test runs don't collide
  }
}
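As written, the test only prints the counts and asserts nothing, so it can never fail. A minimal sketch of real assertions against the collected map, placed before sc.stop() (counts taken from the test result below):

// Assert the expected counts instead of just printing them.
assert(resultMap("Apache") == 3)
assert(resultMap("Spark") == 3)
assert(resultMap("Unix") == 2)

Run the suite with sbt test.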
Test result
Note the (,2) entry below: split(" ") turns consecutive or leading spaces into empty strings, so the original input likely contained extra spaces that got counted as words.
(is,3)
(fast,1)
(general,1)
(and,2)
(a,1)
(Spark,3)
(engine,1)
(Apache,3)
(Unix,2)
(Windows,1)
(,2)
Reference links
http://pubdata.tistory.com/38
http://hyunje.com/data%20analysis/2014/11/06/spark-rdd-functions/
Combinator explanations (in Korean)
https://twitter.github.io/scala_school/ko/collections.html
http://jdm.kr/blog/85
http://coding-korea.blogspot.kr/2012/12/map-flatmap-foldleft.html
http://www.brunton-spall.co.uk/post/2011/12/02/map-map-and-flatmap-in-scala/