본문 바로가기

IT/Spark

Practice using the results "jar" in Spark

단어세기 예제 코드

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
def main(args: Array[String]): Unit = {
require(args.length==2,"Usage : WordCount <inputFilePath> <outputPath>")
val inputPath = args(0)
val outoutPath = args(1)
val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
val sc = new SparkContext(conf)
val logData = sc.textFile(inputPath)
val words = logData.flatMap( str => str.split(" "))
val wcPair = words.map((_,1))
val result = wcPair.reduceByKey(_+_)
result.saveAsTextFile(outoutPath)
result.foreach(
println)
sc.stop()
}
}


빌드

콘솔에서
PS > sbt package
sbt 실행을 통해 jar 뽑아낸다.

빌드 결과물:

\PROJECT_NAME\target\scala-2.11\XXX.jar 

Spark 에서 실행

spark 가 설치되어 있는 곳에서 아래의 명령어 입력하여 실행한다.

파일 경로 입력시 file:// 로 시작하니 주위해야 한다.

~/spark$ ./bin/spark-submit  --class WordCount /mnt/c/Users/NX/IdeaProjects/sparkTest1/target/scala-2.11/sparktest1_2.11-1.0.jar file:///mnt/d/test/wordCount.txt file:///mnt/d/test/wordCountResult

실행 결과

D:\test\wordCountResult\part-00000

입력

aa bb cc dd
aa
bb
bb
bb

결과

D:\test\wordCountResult\part-00000 를 text viewer 로 열어보면 아래의 결과를 볼 수 있다.
(aa,2)
(dd,1)
(bb,4)
(cc,1)


테스트 환경 셋팅

scalatest 사용

http://www.scalatest.org/user_guide/using_scalatest_with_sbt 참고

build.sbt
name := "sparkTest1"

version := "1.0"

scalaVersion := "2.11.8"
resolvers += "Artima Maven Repository" at "http://repo.artima.com/releases"
libraryDependencies ++= {
val sparkVer = "2.2.0"
Seq(
"org.apache.spark" %% "spark-core" % sparkVer
)
}

libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.4" % "test"

테스트 코드 작성


코드


import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.FunSuite

import scala.collection.mutable.ListBuffer
class WordCountSpec extends FunSuite{

test("wordCount"){
val conf = new SparkConf()
conf.setMaster("local[*]").setAppName("WordCountTest")
conf.set("spark.local.ip","127.0.0.1")
conf.set("spark.driver.host","127.0.0.1")

val sc = new SparkContext(conf)
val input = new ListBuffer[String]
input += "Apache Spark is a fast and general engine"
input += "Apache Spark is Windows and Unix"
input += "Apache Spark is Unix"

//input.toList

val inputRDD = sc.parallelize(input)

val words = inputRDD.flatMap(_.split(" "))
val wcPair = words.map((_,1))
val resultRDD = wcPair.reduceByKey(_+_)
val resultMap = resultRDD.collectAsMap()

resultMap.foreach( println)

}
}

테스트 결과

(is,3)
(fast,1)
(general,1)
(and,2)
(a,1)
(Spark,3)
(engine,1)
(Apache,3)
(Unix,2)
(Windows,1)
(,2)

참고링크

http://pubdata.tistory.com/38

http://hyunje.com/data%20analysis/2014/11/06/spark-rdd-functions/



콤비네이터 설명(한글)

https://twitter.github.io/scala_school/ko/collections.html

http://jdm.kr/blog/85

http://coding-korea.blogspot.kr/2012/12/map-flatmap-foldleft.html

http://www.brunton-spall.co.uk/post/2011/12/02/map-map-and-flatmap-in-scala/

'IT > Spark' 카테고리의 다른 글

spark docker 에 설치하기(작성중)  (0) 2017.12.11
RDD 정리  (0) 2017.12.08
Reading before learning Spark  (0) 2017.11.22
spark develop environment (scala + intellij + sbt)  (0) 2017.11.22
spark shell test  (0) 2017.11.20