Scala/Spark @ TACC

4/20/17

Introduction to Hadoop and Spark

Dr. Weijia Xu, Dr. Ruizhu Huang @ UTA

  • Hadoop and Spark on Wrangler
  • Scala/Spark
  • Data Analysis Using Hadoop/Spark

Webcast Discussion

Big Data vs. HPC

  • Traditional HPC
    • data stored separately from compute
    • MPI code to use multiple nodes
  • MapReduce: programming model, platform for customized computation
    • move computation to the data, reducing data transfer
    • sequential execution, same as an RDBMS
    • scale out, not scale up
    • hardware independent

Hadoop

  • distributed data
    • key-value pairs
  • computation
    • map instances
    • reduce instances

Support at TACC

  • Hadoop: open-source implementation of MapReduce; programmed in Java, with interfaces to other languages
  • Zeppelin: similar to Jupyter, with hundreds of interpreters
  • Spark: in-memory computation

Hadoop and YARN

Procedure

  • input
  • splitting
  • mapping
  • shuffling
  • reducing
  • final result
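
The same stages can be sketched with plain Scala collections; this is only an illustration of the data flow for word count, not the Hadoop Java API:

// word-count data flow, mirroring the stages above
val input = List("the quick brown fox", "the lazy dog")                    // input + splitting
val mapped = input.flatMap(_.split(" ")).map(word => (word, 1))            // mapping: emit (key, value)
val shuffled = mapped.groupBy(_._1)                                        // shuffling: group by key
val reduced = shuffled.map { case (w, ones) => (w, ones.map(_._2).sum) }   // reducing: sum the counts
// final result: Map(the -> 2, quick -> 1, brown -> 1, fox -> 1, lazy -> 1, dog -> 1)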

Wrangler portal

Manage -> create Hadoop reservation

  • VNC job: access VNC at vis.tacc.utexas.edu
    • check cluster info and Hadoop job status
  • idev job
    • manage data in & out
    • submit Hadoop jobs
    • test, develop, debug
  • batch job: submit jobs to the YARN resource manager
    • large analysis jobs
    • sequential jobs
# manage
## check reservation
scontrol show reservation

## load module and idev to compute node
module load hadoop-paths
idev -r hadoop+TRAINING-HPC+2186 -m 240 -p hadoop  # -m for minutes

## file system operations
hadoop fs -ls/mkdir/put/get/stat/cat/tail/setrep #set replication factor

## YARN, run jobs
yarn jar / hadoop jar # for java
yarn application -list/-kill/-appStates/-appTypes
yarn node -list
yarn logs

## upload data
hadoop fs -put local_data data

## running hadoop
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
  wordcount \
  -D mapred.map.tasks=500 \
  -D mapred.reduce.tasks=256 \
  /tmp/data/enwiki-20120104-pages-articles.xml \
  wiki_wc
# wordcount: Java class name to run
# mapred.map.tasks / mapred.reduce.tasks: number of mapper / reducer instances
# last two arguments: input file on HDFS and output folder

## get results
hadoop fs -get /tmp/wiki_wc wiki_wc

Hadoop API

Check example at /Users/domi/Dropbox/Open_Course/TACC webinars/Hadoop_Spark/hadoop-training

# recap of hadoop streaming
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
  -D mapred.map.tasks=500 \
  -D mapred.reduce.tasks=256 \
  -input /path/to/input/in/hdfs \
  -output /path/to/output/in/hdfs \
  -mapper map_implementation \
  -reducer reduce_implementation \
  -file map_implementation \
  -file reduce_implementation
# mapper and reducer can be implemented in other languages (e.g. Python scripts)
# mapred.map.tasks / mapred.reduce.tasks: number of mapper / reducer instances

Hadoop vs. Spark

  • Spark is faster
  • more language support
  • MLlib for machine learning

Zeppelin

  • web-based notebook
  • interactive
  • visualization
# Zeppelin
cp /data/apps/.zeppelin/job.zeppelin .
sbatch --reservation=hadoop+TRAINING-HPC+2186 job.zeppelin

web-UI

# with Scala (default)
%spark

# with Python
%pyspark

# with R
%spark.r

# shell commands
%sh
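
A minimal example of a Zeppelin paragraph (a sketch; sc and spark are predefined by the %spark interpreter):

%spark
// quick sanity check inside a notebook paragraph
val rdd = sc.parallelize(1 to 100)
rdd.sum()   // 5050.0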

Contacts

Data Group @ TACC: Weijia Xu, Ruizhu Huang


04/27/2017

Programming with Scala/Spark

Zhao Zhang

Wrangler Setup

idev -r hadoop+TRAINING-HPC+2187 -t 240 
export PATH=/opt/apps/scala/scala-2.11.8/bin:$PATH 

Scala

Google Scala API

  • Variables
    • Mutable: var
    • Immutable: val, cannot be reassigned
  • Primitive types
    • Double, Float, Long, Int, Short, Byte: val a: Double = 5
    • Char, Boolean, Unit
  • Composite variables
    • List, Map, Seq, Set, Tuple
      • List: immutable in length and elements; mutable counterpart: val l = new ListBuffer[Int]() (likewise for Double, etc.)
      • operations: (), head, last, length, reverse, sorted
      • Tuple: val t = (1,2,3); t._1 gives Int = 1; val (i,j,k) = t
  • String
  • Function (see also the sketch after the code examples below)
    • anonymous: (x: Int) => { println(x); x * 2 }
    • in map: map((x: Int) => x * 2), map(x => x * 2), or map(_ * 2)
    • multiple parameters: (x: (Int, Int)) => x._1 + x._2, or case (x, y) => x + y
    • multiple return values: (x: Int, y: Int) => (x + 3, y + 5)
    • named function: def func(x: Int): Int = x * x, equivalent to (x: Int) => x * x
  • Control flow
    • Loop
      • while (i < l.length) {}
      • for (i <- l)
      • l.foreach(x => ...)
      • l.map(x => ...)
    • If ... else ...: l.foreach(x => { if (x % 2 == 1) println(x) })
    • Pattern matching: l.foreach(x => x % 2 match { case 1 => println(x); case _ => })
  • Iterator
## compile
scalac xxx.scala
## execute
scala xxx

## variable vs value
val l = List(1,2,3)  // value, immutable, cannot be reassigned
var l = List(1,2,3)  // variable, can be reassigned

## mutable collection
import scala.collection.mutable.ListBuffer
var l = new ListBuffer[Any]()
l += 1

## map / foreach (loop alternatives)
val r = l.map((x: Int) => x * 2)  // element type: Int, Float, Double, String, List[_], ...
l.foreach(x => x % 2 match {
  case 1 => println(x)
  case _ =>
})

## iterator
val l = List(1,2,3,4,5,6)
val i = l.toIterator  // i is a pointer into the List
i.hasNext
i.next
i.toList  // converts the remaining elements to a List

Thread.sleep(1000)
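
A few more plain-Scala snippets covering the function and tuple bullets above (runnable in the Scala REPL):

// anonymous vs. named functions
val square = (x: Int) => x * x          // function value
def func(x: Int): Int = x * x           // named function, equivalent here
List(1, 2, 3).map(_ * 2)                // List(2, 4, 6)

// tuples
val t = (1, 2, 3)
t._1                                    // Int = 1
val (i, j, k) = t                       // destructuring assignment

// multiple parameters / multiple return values
List((1, 2), (3, 4)).map { case (x, y) => x + y }   // List(3, 7)
val both = (x: Int, y: Int) => (x + 3, y + 5)       // returns a tuple
both(1, 1)                                          // (4, 6)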

RDD: Resilient Distributed Dataset

Immutable, partitioned collection of elements that can be operated on in parallel.

Programming Models

  • Transformations
    • map: val r = l.map(x => x*2), List(2,4,6,8,10)
    • filter: val r = l.filter(x => x%2 == 0), List(2,4)
    • groupBy: val r = l.groupBy(x => x%2), Map(1 -> List(1, 3, 5), 0 -> List(2, 4))
    • textFile: val lines = sc.textFile("path-to-file")
    • binaryFiles: val rdd = sc.binaryFiles("path-to-file")
  • Actions
    • count: val r = l.count(x => x%2 == 0), 2
    • collect
    • take
    • reduce
    • saveAsTextFile
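
The same operations on an actual RDD, as run in the spark-shell where sc is the predefined SparkContext (a minimal sketch):

val l = sc.parallelize(List(1, 2, 3, 4, 5))
val doubled = l.map(x => x * 2)           // transformation, lazy
val evens   = l.filter(x => x % 2 == 0)   // transformation, lazy
doubled.collect()                         // action: Array(2, 4, 6, 8, 10)
evens.count()                             // action: 2
l.reduce(_ + _)                           // action: 15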


Using Scala/Spark

Weijia Xu

Start Zeppelin on Wrangler: sbatch --reservation hadoop+TRAINING-HPC+2188 -A TRAINING-HPC /data/apps/.zeppelin/job.zeppelin.work -t 240

Data APIs

  • RDD: Resilient Distributed Dataset
    • rdd.map(x => x*2)
    • rdd.reduce(_+_)
    • rdd.filter(_%3 == 0)
  • DataFrame: since Spark 1.3
    • abstract API on top of RDD
    • schema
    • off-heap storage, both memory & hard drive
  • Dataset: since Spark 1.6
    • Spark 2.x: DataFrame = Dataset[Row] (see the sketch below)
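
A small illustration of the DataFrame = Dataset[Row] relationship (a sketch only; the Car case class and sample values are made up from the mtcars example used later):

import spark.implicits._   // auto-imported in spark-shell / Zeppelin

case class Car(model: String, mpg: Double)
val carDS = Seq(Car("Mazda RX4", 21.0), Car("Datsun 710", 22.8)).toDS   // typed: Dataset[Car]
val carDF = carDS.toDF                                                   // untyped: DataFrame = Dataset[Row]
val back  = carDF.as[Car]                                                // back to a typed Dataset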

RDD vs DataFrame vs DataSet

val rdd = sc.parallelize(0 until 100)

// Conversion (toDF / toDS need import spark.implicits._, auto-imported in spark-shell / Zeppelin)
// DF to RDD
val car_rdd = df.rdd
// RDD to DF
val rdd_df = rdd.toDF
// RDD to DS
val rdd_ds = rdd.toDS

// Functions
df.show(n)           // show the first n rows
df.printSchema       // column names and types
df.describe(cols)    // summary statistics for the given columns

rdd.filter(_ < 10).collect
rdd_df.filter("value < 10").show
rdd_ds.filter("value < 10").show
rdd_ds.filter(_ < 10).show

rdd.map(_ * 2).collect
rdd_ds.map(_ * 2).show
rdd_df.map(_ * 2).show // will not work
rdd_df.select('value * 2).show

// Spark SQL
df.filter("mpg > 4").select($"model", $"mpg" * 1.6).show   // filter first, then project
df.groupBy("mpg").count().show
df.createOrReplaceTempView("cars")
spark.sql("""SELECT col1, col2, ...
             FROM table1, table2, ...
             [WHERE condition1 AND|OR condition2 ...]
             [GROUP BY col1, ...]
             [ORDER BY col1, ...]""")
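
For example, querying the registered cars view (a sketch, assuming the mtcars column names model and mpg):

spark.sql("SELECT model, mpg FROM cars WHERE mpg > 20 ORDER BY mpg DESC").show()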

IO in Spark

  • File format
    • JSON: carries the schema
    • Parquet, ORC: schema as in JSON, plus compression (columnar formats)
  • Path prefix
    • default: hdfs:/// -> /tmp/data
    • explicit local files: file:///
  • File/folder
    • a directory path automatically reads all files within it
    • use directories to separate data
  • Existing output: mode(error/append/overwrite/ignore), default is error (see the write-mode sketch after the code below)
// Read CSV file
val df = spark.read.format("csv")
  .option("header", "true")
  .load("/tmp/data/mtcars.csv")
df.show()

// More file formats
df.write.json("cars.json")
df.write.parquet("cars.parquet")
df.write.option("delimiter","\t").csv("cars.tab")
val df_json = spark.read.json("cars.json")
val df_parquet = spark.read.parquet("cars.parquet")
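
The mode mentioned above controls what happens when the output path already exists; a short sketch:

// default mode is "error": fail if the output path already exists
df.write.mode("overwrite").parquet("cars.parquet")   // replace existing output
df.write.mode("append").json("cars.json")            // add to existing output
df.write.mode("ignore").csv("cars.tab")              // do nothing if output exists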

Workflow

  • prepare datasets
    • val cars = spark.read.format("csv").option(...).load("path_to_file").selectExpr("mpg + 0.0 as mpg", "cyl + 0.0 as label"); adding 0.0 is an easy way to convert to Double
    • val training = cars.sample(false, fraction)
    • val test = cars.except(training)
  • assemble features
    • val assembler = new VectorAssembler().setInputCols(Array("key1", ...)).setOutputCol("features")
    • val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.2).setElasticNetParam(0.0)
  • define the analysis
    • val pipeline = new Pipeline().setStages(Array(assembler, lr))
  • run the analysis
    • val lrModel = pipeline.fit(training)
  • results (a consolidated sketch follows below)
    • val result = lrModel.transform(test).select('model, 'label, 'prediction)
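
Putting the workflow together as one runnable sketch (assuming the mtcars CSV used earlier; the feature columns chosen here are illustrative):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.VectorAssembler

// prepare datasets
val cars = spark.read.format("csv").option("header", "true")
  .load("/tmp/data/mtcars.csv")
  .selectExpr("mpg + 0.0 as mpg", "wt + 0.0 as wt", "cyl + 0.0 as label")
val training = cars.sample(false, 0.7)
val test = cars.except(training)

// assemble features and define the analysis
val assembler = new VectorAssembler().setInputCols(Array("mpg", "wt")).setOutputCol("features")
val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.2).setElasticNetParam(0.0)
val pipeline = new Pipeline().setStages(Array(assembler, lr))

// run the analysis and inspect results
val lrModel = pipeline.fit(training)
lrModel.transform(test).select("label", "prediction").show()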

Spark Architecture

  • Create RDDs
    • HadoopRDD: val lines = sc.textFile("hdfs://names")
    • MapPartitionsRDD: val kvp = lines.map(name => (name(0), name))
    • ShuffledRDD: val groups = kvp.groupByKey()
    • MapPartitionsRDD: val res = groups.mapValues(names => names.toSet.size)
    • collect()
  • RDD dependencies
    • narrow: one-to-one
    • shuffle dependency
  • DAG (Directed Acyclic Graph) generation
    • stage 0: HadoopRDD, MapPartitionsRDD
    • stage 1: ShuffledRDD, MapPartitionsRDD
  • Schedule tasks
    • split each stage into tasks based on partitions
    • reversed order: recursively find parent stages
    • blocking between stages
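
The lineage example above as one block (a sketch; "hdfs://names" is the placeholder path from the slides):

val lines = sc.textFile("hdfs://names")                 // HadoopRDD
val kvp = lines.map(name => (name(0), name))            // MapPartitionsRDD: key = first character
val groups = kvp.groupByKey()                           // ShuffledRDD: stage boundary (shuffle dependency)
val res = groups.mapValues(names => names.toSet.size)   // MapPartitionsRDD
res.collect()                                           // action: runs stage 0, then stage 1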

Delay scheduling

spark.locality.wait (default 3s)
spark.locality.wait.process
spark.locality.wait.node
spark.locality.wait.rack

  • Executor Deployment
    • Node
    • Process: share a physical node
    • Thread: share physical memory

    In Spark YARN mode, set in conf/spark-env.sh:

    SPARK_EXECUTOR_INSTANCES (default 2)  # processes
    SPARK_EXECUTOR_CORES (default 1)      # threads

  • Memory Management
    • Reserved: 300MB by default
    • User memory: 25%, user data structures in RDDs (values/variables)
    • Execution: internal storage, shuffle buffer on the mapper
      • export SPARK_WORKER_MEMORY=96G
    • Storage: cache, dynamic, default 0.5 of the 75% unified region
      • spark.memory.storageFraction in conf/spark-defaults.conf
    • YARN: use the flag --executor-memory 2g
  • Increase parallelism (see the sketch below)
    • more reducers by setting spark.default.parallelism
    • set the number of partitions of the largest parent RDD: sc.textFile(path, 128)
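
A short sketch of the parallelism knobs (values are illustrative):

// set the partition count of the largest parent RDD explicitly
val lines = sc.textFile("/tmp/data/enwiki-20120104-pages-articles.xml", 128)
lines.getNumPartitions

// more reducers for shuffle operations (can also be set in conf/spark-defaults.conf
// or via spark-submit --conf spark.default.parallelism=256)
val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _, 256)
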
Published: Thu 04 May 2017. By Dongming Jin.