Spark Scala Job の送信

Prev Next

VPC環境で利用できます。

このガイドでは、Spark Scala Jobを作成してCloud Hadoopクラスタに送信する方法について説明します。

Scalaコードの作成とコンパイル

SparkアプリケーションをScalaで作成してjarにパッケージングする方法は、以下の2つです。

  1. ターミナルでScalaを使用
  2. IntelliJ SBTプラグインを使用

1. ターミナルでScalaを使用

ターミナルでHelloScalaを出力するScalaコードを作成してコンパイルし、.jarにパッケージングする例を挙げて説明します。

Scalaバイナリファイルのダウンロード

Scalaバイナリをダウンロードして圧縮を展開してください。

macOSでHomebrewを使用する場合、以下のようにインストールします。

brew install scala

環境変数の設定

以下のコマンドを実行して実行ファイル(例:.bashrc)にSCALA_HOME環境変数を設定し、PATH$SCALA_HOMEを追加してください。

export SCALA_HOME=/usr/local/share/scala
export PATH=$PATH:$SCALA_HOME/

Sparkアプリケーションの作成

Sparkアプリケーションを作成する方法は以下のとおりです。

  1. Scala REPLを実行し、scalaを実行します。
❯ scala
# Welcome to Scala version ...
# Type in expressions to have them evaluated.
# Type :help for more information.
# scala>
  1. 以下のようにHelloWorld.scala classを作成して保存します。
object HelloWorld {
def main(args: Array[String]): Unit = {
println("Hello, world!")
  }
}
scala> :save HelloWorld.scala
scala> :q
  1. 以下のコマンドを用いてscalacにコンパイルします。
❯ scalac  HelloWorld.scala
  1. lsコマンドを使用して正常にコンパイルされたか、.classファイルを確認します。
❯ ls HelloWorld*.class
HelloWorld$.class HelloWorld.class

JARファイルの作成

JARファイルを作成する方法は以下のとおりです。

参考

jarコマンドを実行するには、Java SE、JREがインストールされている必要があります。

  1. HelloWorld*.classファイルがあるディレクトリに移動し、以下のコマンドを使用してクラスファイルを.jarにパッケージングします。
❯ jar cvfe HelloWorld.jar HelloWorld HelloWorld*.class
added manifest
adding: HelloWorld$.class(in = 670) (out= 432)(deflated 35%)
adding: HelloWorld.class(in = 645) (out= 524)(deflated 18%)
  1. パッケージングされたJARファイルでHelloWorld classがアプリケーションのentry pointとして設定されたことをMANIFEST.MFで確認します。
❯ unzip -q -c HelloWorld.jar META-INF/MANIFEST.MF
Manifest-Version: 1.0
Created-By: 1.8.0_181 (Oracle Corporation)
Main-Class: HelloWorld # entry point

2. IntelliJ SBTプラグインを使用

このガイドでは、Sparkアプリケーションの開発・デバッグを行うための環境をIntelliJに設定し、Hello ScalaというWordCount Jobをビルドする方法を例を挙げて説明します。

  • ビルドマネージャー:SBT
  • 例の作成環境:Windows OS, IntelliJ Ultimate 2022.1.4

プロジェクトの作成

プロジェクトを作成する方法は次のとおりです。

IntelliJを実行してください。

  1. 左側のPluginsメニューからScalaを検索してインストールしてください。
    chadoop-4-6-002_ko.png

  2. プラグインを反映するには再起動が必要です。 [Restart IDE] ボタンをクリックしてIntelliJを再起動してください。
    chadoop-4-6-restart_ja.png

  3. ホーム画面の左メニューでProjectsをクリックし、New Projectをクリックしてください。
    chadoop-4-6-003_ja.png

  4. 次のようにScalasbtを選択し、[Create] ボタンをクリックしてください。

    • プロジェクト名: WordCountで指定
    • Scalasbtのバージョンを選択
      chadoop-4-6-004_ja.png
  5. プロジェクトが正常に作成されたか確認します。

    • プロジェクトが作成されると、基本的に以下のような構造を持つディレクトリとファイル情報が確認できます。
      • .idea:IntelliJの構成ファイル
      • project:コンパイルに用いられるファイル
      • src:ソースコード。アプリケーションコードの大半はsrc/mainに位置する必要がある。src/testはテストスクリプト用のスペース。
      • target:プロジェクトをコンパイルすると、この場所に保存
      • build.sbt:SBTの構成ファイル
        chadoop-4-6-006_ja.png

SBTライブラリをインポートする

IntelliJがSparkコードを認識するには、Spark-coreライブラリと文書をインポートする必要があります。

参考
  • Spark-coreライブラリは特定のバージョンのScalaと互換性があります。ライブラリをインポートする際は、Spark-coreとScalaのバージョンをそれぞれ確認してください。
  1. mvn repositoryで、Spark-coreライブラリとArtifact IDとの互換性のあるScalaのバージョンも確認します。

chadoop-4-6-008_ja.png

  1. Target > build.sbtをクリックし、スクリプト画面に以下のような内容を追加します。
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
  1. ライブラリが正常にインポートされたのか、Buildコンソールで確認します。
    chadoop-4-6-007_ja.png
参考

SBTでライブラリをインポート際には、以下のような構文(syntax)を使用してください。

Group Id %% Artifact Id % Revision

Sparkアプリケーションの作成

ここではシェイクスピアのソネットのテキストファイル(shakespeare.txt)をデータセットとして使用し、ソネットに含まれたワードをカウントするWordCountアプリケーションを作成する方法を例に挙げて説明します。

  1. shakespeare.txtをダウンロードしてsrc/main/resourcesに保存します。

    • Cloud Hadoopクラスタでこのアプリケーションを実行する際には、S3バケットまたはHDFSにデータセットをアップロードして使用します。
      chadoop-4-6-009_ja.png
  2. src > mainを選択してディレクトリを拡張し、scalaディレクトリをマウス右クリックしてNew > Scala Classをクリックします。

  3. WordCount/src/main/scalaの下に新しいクラスを作成します。

    • Kind: Object
      chadoop-4-6-010_ja.png
  4. 正常に設定されたのか確認するには、WordCount.scalaに以下のサンプルコードを作成して実行します。

object WordCount {
    def main(args: Array[String]): Unit = {
      println("This is WordCount application")
    }
}
  1. 以下のように正常に出力されたか確認します。
    chadoop-4-6-011_ja.png

  2. WordCount.scalaに適用したサンプルコードを削除し、シェイクスピアソネットテキストファイルのワード数をカウントするコードを以下のように作成します。

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {

def main(args: Array[String]) : Unit = {

  //Create a SparkContext to initialize Spark
  val conf = new SparkConf()
  conf.setMaster("local")
  conf.setAppName("Word Count")
  val sc = new SparkContext(conf)

  // Load the text into a Spark RDD, which is a distributed representation of each line of text
  val textFile = sc.textFile("src/main/resources/shakespeare.txt")

  //word count
  val counts = textFile.flatMap(line => line.split(" "))
    .map(word => (word, 1))
    .reduceByKey(_ + _)

  counts.foreach(println)
  System.out.println("Total words: " + counts.count())
  counts.saveAsTextFile("/tmp/sonnetWordCount")
 }

}
参考

Master URLs
Sparkの配布環境に応じてMaster URLは変わります。

  • Local(pseudo-cluster)locallocal[N]local[*](使用するthread数に応じて区分、「*」は、JVMで使用できる最大限のプロセッサの分だけのthreadsを使用)

  • Clustered
    Spark Standalone: spark://host:port,host1:port1...
    Spark on Hadoop YARN: yarn
    Spark on Apache Mesos: mesos://

  1. WordCount.scalaを実行して出力結果を確認します。
    chadoop-4-6-012_ja.png

Jarファイルの作成

  1. Object Storageバケットにデータセットをアップロードし、ソースコードのresourceファイルパスを以下のように変更します。
    • データセットをHDFSにアップロードして使用するには、s3a://の代わりにhdfs://を使用します。
// FROM
conf.setMaster("local")
// TO
conf.setMaster("yarn-cluster")

// FROM
val textFile = sc.textFile("src/main/resources/shakespeare.txt")
  // TO
val textFile = sc.textFile("s3a://deepdrive-hue/tmp/shakespeare.txt")

// FROM
counts.saveAsTextFile("/tmp/sonnetWordCount");
// TO
counts.saveAsTextFile("s3a://deepdrive-hue/tmp/sonnetWordCount");
参考

ここではSpark 1.6を基準にしているため、conf.setMaster()yarn-clusterと明示する必要があります。Spark 2からはyarnでも構いません。

  1. Terminal コンソールで次のコマンドを使用して、更新したコードを Cloud Hadoop クラスターに送信できるように compiled jar でパッケージ化します。
    • JARファイルには、アプリケーションコードとbuild.sbtに定義したすべてのdependenciesが含まれています。
    • sbt packageコマンドは$PROJECT_HOME/target/scala-2.11の下位にwordcount_2.11-0.1.jarファイルを作成します。
> cd ~/IdeaProjects/WordCount # PROJECT_HOME
> sbt package

chadoop-4-6-terminal_ja.png

Cloud HadoopクラスタにSpark Jobを送信

ローカルで作成したSparkアプリケーション(.jar)をCloud Hadoopに配布して送信する方法を説明します。

Object Storageにjarをアップロード

HUEのS3ブラウザまたはObject Storageコンソールを用いてshakespeare.txtと.jarをObject Storageバケットにコピーしてください。

Jobの送信

JARファイルをクラスタに送信する2つの方法について説明します。

参考

以下のようなpropertyがspark-defaults.confに正しく設定されている必要があります。

spark.hadoop.fs.s3a.access.key=<OBEJCT-STORAGE-ACCESS-KEY>
spark.hadoop.fs.s3a.endpoint=http://kr.objectstorage.ncloud.com
spark.hadoop.fs.s3a.secret.key=<OBJECT-STORAGE-SECRET-KEY>
spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
  • HUEのSpark Submit Jar利用
    chadoop-4-6-014-vpc_ja.png

  • Spark Clientsノードから送信

  1. クラスタで、Spark Clientがインストールされたノードで以下のようにspark-submitコマンドを実行します。
spark-submit --class WordCount --master yarn-cluster --deploy-mode cluster s3a://deepdrive-hue/tmp/wordcount_2.11-0.1.jar
  1. Jobの実行が完了すると、以下のように結果が指定したバケットのパスに保存されたことを確認します。
    chadoop-4-6-015-vpc_ja.png
参考

Deploy modeは、配布環境でドライバー(SparkContext)が実行される場所によって決まります。モードには、以下のようなオプションがあります。

  • client(default):Sparkアプリケーションが実行されるマシンでドライバーが実行される
  • cluster:クラスタ内のランダムなノードでドライバーが実行される

spark-submitコマンドの--deploy-mode CLIオプション、またはSpark property構成でspark.submit.deployModeに変更できます。