- 印刷する
- PDF
Spark Scala Job の送信
- 印刷する
- PDF
VPC環境で利用できます。
このガイドでは、Spark Scala Jobを作成してCloud Hadoopクラスタに送信する方法について説明します。
Scalaコードの作成とコンパイル
SparkアプリケーションをScalaで作成してjarにパッケージングする方法は、以下の2つです。
1. ターミナルでScalaを使用
ターミナルでHelloScala
を出力するScalaコードを作成してコンパイルし、.jarにパッケージングする例を挙げて説明します。
Scalaバイナリファイルのダウンロード
Scalaバイナリをダウンロードして圧縮を展開してください。
macOSでHomebrewを使用する場合、以下のようにインストールします。
brew install scala
環境変数の設定
以下のコマンドを実行して実行ファイル(例:.bashrc
)にSCALA_HOME
環境変数を設定し、PATH
に$SCALA_HOME
を追加してください。
export SCALA_HOME=/usr/local/share/scala
export PATH=$PATH:$SCALA_HOME/
Sparkアプリケーションの作成
Sparkアプリケーションを作成する方法は以下のとおりです。
- Scala REPLを実行し、
scala
を実行します。
❯ scala
# Welcome to Scala version ...
# Type in expressions to have them evaluated.
# Type :help for more information.
# scala>
- 以下のように
HelloWorld.scala
classを作成して保存します。
object HelloWorld {
def main(args: Array[String]): Unit = {
println("Hello, world!")
}
}
scala> :save HelloWorld.scala
scala> :q
- 以下のコマンドを用いて
scalac
にコンパイルします。
❯ scalac HelloWorld.scala
ls
コマンドを使用して正常にコンパイルされたか、.class
ファイルを確認します。
❯ ls HelloWorld*.class
HelloWorld$.class HelloWorld.class
JARファイルの作成
JARファイルを作成する方法は以下のとおりです。
jarコマンドを実行するには、Java SE、JREがインストールされている必要があります。
HelloWorld*.class
ファイルがあるディレクトリに移動し、以下のコマンドを使用してクラスファイルを.jarにパッケージングします。
❯ jar cvfe HelloWorld.jar HelloWorld HelloWorld*.class
added manifest
adding: HelloWorld$.class(in = 670) (out= 432)(deflated 35%)
adding: HelloWorld.class(in = 645) (out= 524)(deflated 18%)
- パッケージングされたJARファイルで
HelloWorld
classがアプリケーションのentry pointとして設定されたことをMANIFEST.MFで確認します。
❯ unzip -q -c HelloWorld.jar META-INF/MANIFEST.MF
Manifest-Version: 1.0
Created-By: 1.8.0_181 (Oracle Corporation)
Main-Class: HelloWorld # entry point
2. IntelliJ SBTプラグインを使用
このガイドでは、Sparkアプリケーションの開発・デバッグを行うための環境をIntelliJに設定し、Hello Scala
というWordCount Jobをビルドする方法を例を挙げて説明します。
- ビルドマネージャー:SBT
- 例の作成環境:Windows OS, IntelliJ Ultimate 2022.1.4
プロジェクトの作成
プロジェクトを作成する方法は次のとおりです。
IntelliJを実行してください。
左側のPluginsメニューからScalaを検索してインストールしてください。
プラグインを反映するには再起動が必要です。 [Restart IDE] ボタンをクリックしてIntelliJを再起動してください。
ホーム画面の左メニューでProjectsをクリックし、New Projectをクリックしてください。
次のようにScalaとsbtを選択し、[Create] ボタンをクリックしてください。
- プロジェクト名:
WordCount
で指定 - Scalaとsbtのバージョンを選択
- プロジェクト名:
プロジェクトが正常に作成されたか確認します。
- プロジェクトが作成されると、基本的に以下のような構造を持つディレクトリとファイル情報が確認できます。
- .idea:IntelliJの構成ファイル
- project:コンパイルに用いられるファイル
- src:ソースコード。アプリケーションコードの大半は
src/main
に位置する必要がある。src/test
はテストスクリプト用のスペース。 - target:プロジェクトをコンパイルすると、この場所に保存
- build.sbt:SBTの構成ファイル
- プロジェクトが作成されると、基本的に以下のような構造を持つディレクトリとファイル情報が確認できます。
SBTライブラリをインポートする
IntelliJがSparkコードを認識するには、Spark-coreライブラリと文書をインポートする必要があります。
- Spark-coreライブラリは特定のバージョンのScalaと互換性があります。ライブラリをインポートする際は、Spark-coreとScalaのバージョンをそれぞれ確認してください。
- mvn repositoryで、Spark-coreライブラリとArtifact IDとの互換性のあるScalaのバージョンも確認します。
- Target > build.sbtをクリックし、スクリプト画面に以下のような内容を追加します。
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
- ライブラリが正常にインポートされたのか、Buildコンソールで確認します。
SBTでライブラリをインポート際には、以下のような構文(syntax)を使用してください。
Group Id %% Artifact Id % Revision
Sparkアプリケーションの作成
ここではシェイクスピアのソネットのテキストファイル(shakespeare.txt
)をデータセットとして使用し、ソネットに含まれたワードをカウントするWordCountアプリケーションを作成する方法を例に挙げて説明します。
shakespeare.txtをダウンロードして
src/main/resources
に保存します。- Cloud Hadoopクラスタでこのアプリケーションを実行する際には、S3バケットまたはHDFSにデータセットをアップロードして使用します。
- Cloud Hadoopクラスタでこのアプリケーションを実行する際には、S3バケットまたはHDFSにデータセットをアップロードして使用します。
src > main
を選択してディレクトリを拡張し、scala
ディレクトリをマウス右クリックしてNew > Scala Classをクリックします。WordCount/src/main/scala
の下に新しいクラスを作成します。- Kind:
Object
- Kind:
正常に設定されたのか確認するには、
WordCount.scala
に以下のサンプルコードを作成して実行します。
object WordCount {
def main(args: Array[String]): Unit = {
println("This is WordCount application")
}
}
以下のように正常に出力されたか確認します。
WordCount.scala
に適用したサンプルコードを削除し、シェイクスピアソネットテキストファイルのワード数をカウントするコードを以下のように作成します。
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]) : Unit = {
//Create a SparkContext to initialize Spark
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("Word Count")
val sc = new SparkContext(conf)
// Load the text into a Spark RDD, which is a distributed representation of each line of text
val textFile = sc.textFile("src/main/resources/shakespeare.txt")
//word count
val counts = textFile.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
counts.foreach(println)
System.out.println("Total words: " + counts.count())
counts.saveAsTextFile("/tmp/sonnetWordCount")
}
}
Master URLs
Sparkの配布環境に応じてMaster URLは変わります。
Local(pseudo-cluster):
local
、local[N]
、local[*]
(使用するthread数に応じて区分、「*」は、JVMで使用できる最大限のプロセッサの分だけのthreadsを使用)Clustered
Spark Standalone:spark://host:port,host1:port1...
Spark on Hadoop YARN:yarn
Spark on Apache Mesos:mesos://
WordCount.scala
を実行して出力結果を確認します。
Jarファイルの作成
- Object Storageバケットにデータセットをアップロードし、ソースコードの
resource
ファイルパスを以下のように変更します。- データセットを
HDFS
にアップロードして使用するには、s3a://
の代わりにhdfs://
を使用します。
- データセットを
// FROM
conf.setMaster("local")
// TO
conf.setMaster("yarn-cluster")
// FROM
val textFile = sc.textFile("src/main/resources/shakespeare.txt")
// TO
val textFile = sc.textFile("s3a://deepdrive-hue/tmp/shakespeare.txt")
// FROM
counts.saveAsTextFile("/tmp/sonnetWordCount");
// TO
counts.saveAsTextFile("s3a://deepdrive-hue/tmp/sonnetWordCount");
ここではSpark 1.6を基準にしているため、conf.setMaster()
でyarn-cluster
と明示する必要があります。Spark 2からはyarn
でも構いません。
- Terminal コンソールで次のコマンドを使用して、更新したコードを Cloud Hadoop クラスターに送信できるように compiled jar でパッケージ化します。
- JARファイルには、アプリケーションコードと
build.sbt
に定義したすべてのdependenciesが含まれています。 sbt package
コマンドは$PROJECT_HOME/target/scala-2.11
の下位にwordcount_2.11-0.1.jar
ファイルを作成します。
- JARファイルには、アプリケーションコードと
> cd ~/IdeaProjects/WordCount # PROJECT_HOME
> sbt package
Cloud HadoopクラスタにSpark Jobを送信
ローカルで作成したSparkアプリケーション(.jar)をCloud Hadoopに配布して送信する方法を説明します。
Object Storageにjarをアップロード
HUEのS3ブラウザまたはObject Storageコンソールを用いてshakespeare.txt
と.jarをObject Storageバケットにコピーしてください。
- HUEへのアクセスや使用に関する詳細は、 HUEを使用するをご参照ください。
- Object Storageバケットに関する詳細は、Object Storageご利用ガイドを参照してください。
Jobの送信
JARファイルをクラスタに送信する2つの方法について説明します。
以下のようなpropertyがspark-defaults.conf
に正しく設定されている必要があります。
spark.hadoop.fs.s3a.access.key=<OBEJCT-STORAGE-ACCESS-KEY>
spark.hadoop.fs.s3a.endpoint=http://kr.objectstorage.ncloud.com
spark.hadoop.fs.s3a.secret.key=<OBJECT-STORAGE-SECRET-KEY>
spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
HUEのSpark Submit Jar利用
Spark Clientsノードから送信
- クラスタで、Spark Clientがインストールされたノードで以下のように
spark-submit
コマンドを実行します。
spark-submit --class WordCount --master yarn-cluster --deploy-mode cluster s3a://deepdrive-hue/tmp/wordcount_2.11-0.1.jar
- Jobの実行が完了すると、以下のように結果が指定したバケットのパスに保存されたことを確認します。
Deploy modeは、配布環境でドライバー(SparkContext)が実行される場所によって決まります。モードには、以下のようなオプションがあります。
client
(default):Sparkアプリケーションが実行されるマシンでドライバーが実行されるcluster
:クラスタ内のランダムなノードでドライバーが実行される
spark-submit
コマンドの--deploy-mode
CLIオプション、またはSpark property構成でspark.submit.deployMode
に変更できます。