4/20/17
Introduction to Hadoop and Spark
Dr. Weijia Xu, Dr. Ruizhu Huang @ UTA
- Hadoop and Spark on Wrangler
- Scala/Spark
- Data Analysis Using Hadoop/Spark
Big Data vs. HPC
- Traditional HPC
  - data kept separate from compute
  - MPI code to use multiple nodes
- MapReduce: programming model and platform for customized computation
  - move computation to the data, reduce data transfer
  - sequential execution, as in an RDBMS
  - scale out, not scale up
  - hardware independent
Hadoop
- distributed data
- key-value pairs
- computation
  - map instances
  - reduce instances
Support at TACC
- Hadoop: open-source implementation of MapReduce; programming in Java, with interfaces to other languages
- Zeppelin: similar to Jupyter, with many interpreters
- Spark: in-memory computation
Hadoop and YARN
Procedure
- input
- splitting
- mapping
- shuffling
- reducing
- final result
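The same phases can be traced on a plain Scala collection; a minimal illustrative sketch (local collections only, not the Hadoop API; the input strings are made up):
val input    = List("the quick brown fox", "the lazy dog")                 // input + splitting
val mapped   = input.flatMap(_.split(" ")).map(w => (w, 1))                // mapping: emit (word, 1)
val shuffled = mapped.groupBy(_._1)                                        // shuffling: group by key
val reduced  = shuffled.map { case (w, ones) => (w, ones.map(_._2).sum) }  // reducing: sum the counts
// reduced: Map(the -> 2, quick -> 1, brown -> 1, fox -> 1, lazy -> 1, dog -> 1)  // final result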
Manage -> create Hadoop reservation
- VNC job: access VNC at vis.tacc.utexas.edu
  - check cluster info and Hadoop job status
- idev job
  - manage data in & out
  - submit Hadoop jobs
  - test, develop, debug
- batch job: submit jobs to the YARN resource manager
  - large analysis jobs
  - sequential jobs
# manage
## check reservation
scontrol show reservation
## load module and idev to compute node
module load hadoop-paths
idev -r hadoop+TRAINING-HPC+2186 -m 240 -p hadoop # -m for minutes
## file system operations
hadoop fs -ls/mkdir/put/get/stat/cat/tail/setrep #set replication factor
## YARN, run jobs
yarn jar / hadoop jar # submit a Java job
yarn application -list / -kill / -appStates / -appTypes
yarn node -list # list cluster nodes
yarn logs -applicationId <application_id> # view logs for an application
## upload data
hadoop fs -put local_data data
## running hadoop
# wordcount: Java class name to run
# -D mapred.map.tasks / mapred.reduce.tasks: number of mapper / reducer instances
# last two arguments: input file on HDFS, folder to store the output
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar wordcount \
    -D mapred.map.tasks=500 \
    -D mapred.reduce.tasks=256 \
    /tmp/data/enwiki-20120104-pages-articles.xml \
    wiki_wc
## get results
hadoop fs -get /tmp/wiki_wc wiki_wc
Hadoop API
Check example at /Users/domi/Dropbox/Open_Course/TACC webinars/Hadoop_Spark/hadoop-training
# recap of hadoop streaming
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D mapred.map.tasks=500 \
    -D mapred.reduce.tasks=256 \
    -input /path/to/input/in/hdfs \
    -output /path/to/output/in/hdfs \
    -mapper map_implementation \
    -reducer reduce_implementation \
    -file map_implementation \
    -file reduce_implementation
# -D: number of mapper / reducer instances (generic options go first)
# -mapper / -reducer: map and reduce implementations, can be written in any language
# -file: ship the mapper / reducer scripts to the cluster nodes
Hadoop vs. Spark
- Spark is faster
- more language support
- MLlib for machine learning
Zeppelin
- web-based notebook
- interactive
- visualization
# Zeppelin
cp /data/apps/.zeppelin/job.zeppelin .
sbatch --reservation=hadoop+TRAINING-HPC+2186 job.zeppelin
## interpreter directives
%spark     # Scala (default)
%pyspark   # Python
%spark.r   # R
%sh        # shell
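A minimal Zeppelin paragraph using the default %spark (Scala) interpreter; sc is the SparkContext the interpreter provides, and the numbers are made up:
%spark
val nums = sc.parallelize(1 to 10)   // distribute a small local collection
println(nums.reduce(_ + _))          // 55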
Contacts
Data Group @ TACC: Weijia Xu, Ruizhu Huang
04/27/2017
Programming with Scala/Spark
Zhao Zhang
Wrangler Setup
idev -r hadoop+TRAINING-HPC+2187 -t 240
export PATH=/opt/apps/scala/scala-2.11.8/bin:$PATH
Scala
- Variables
  - mutable: var
  - immutable: val (cannot be reassigned)
- Primitive types
  - Double, Float, Long, Int, Short, Byte: val a: Double = 5
  - Char, Boolean, Unit
- Composite variables
  - List, Map, Seq, Set, Tuple
  - List: immutable in length and elements; mutable counterpart is ListBuffer: val l = new ListBuffer[Int]() (or Double, etc.)
    - operations: l(i), head, last, length, reverse, sorted
  - Tuple: val t = (1,2,3); t._1 // Int = 1; val (i,j,k) = t
  - String
- Function
  - anonymous function: (x: Int) => { println(x); x * 2 }
  - used with map: map((x: Int) => x * 2), map(x => x * 2), or map(_ * 2)
  - multiple parameters (tuple argument): (x: (Int, Int)) => x._1 + x._2, or { case (x, y) => x + y }
  - multiple return values: (x: Int, y: Int) => (x + 3, y + 5)
  - named function: def func(x: Int): Int = x * x, equivalent to (x: Int) => x * x
- Control Flow
  - Loop
    - while ( i < l.length ) {}
    - for ( i <- l ), where <- is the generator arrow
    - l.foreach(x => ...), where => introduces the function body
    - l.map(x => ...)
  - If ... else ...: l.foreach(x => { if (x % 2 == 1) println(x) })
  - Pattern matching: l.foreach(x => x % 2 match { case 1 => println(x) case _ => })
  - Iterator
## compile
scalac xxx.scala
## execute
scala xxx
## variable vs. value
val l = List(1,2,3) // value, immutable reference
var l = List(1,2,3) // variable, can be reassigned
## mutable collection
import scala.collection.mutable.ListBuffer
var l = new ListBuffer[Any]()
l += 1
## map / foreach / pattern matching
val r = l.map((x: Int) => x * 2) // element type: Int, Float, Double, String, List[_], ...
l.foreach(x => x % 2 match {
  case 1 => println(x)
  case _ =>
})
## iterator
val l = List(1,2,3,4,5,6)
val i = l.toIterator // i is a cursor over the List
i.hasNext
i.next
i.toList // collect the remaining elements into a List
Thread.sleep(1000)
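A short sketch tying the function forms above together (the names square and square2 are my own):
## named vs. anonymous functions
def square(x: Int): Int = x * x                 // named function
val square2 = (x: Int) => x * x                 // equivalent anonymous function bound to a value
List(1,2,3).map(square)                         // List(1, 4, 9)
List(1,2,3).map(square2)                        // List(1, 4, 9)
List(1,2,3).map(_ * 2)                          // placeholder syntax: List(2, 4, 6)
List((1,2),(3,4)).map { case (x, y) => x + y }  // tuple parameters: List(3, 7)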
RDD: Resilient Distributed Dataset
immutable, partitioned collection of elements that can be operated on in parallel
Programming Models
* Transformations
* map: val r = l.map(x => x*2) // List(2,4,6,8,10)
* filter: val r = l.filter(x => x%2 == 0) // List(2,4)
* groupBy: val r = l.groupBy(x => x%2) // Map(1 -> List(1, 3, 5), 0 -> List(2, 4))
* textFile: val lines = sc.textFile("path-to-file")
* binaryFiles: val rdd = sc.binaryFiles("path-to-file")
* Actions
* count: val r = l.count(x => x%2 == 0) // 2
* collect
* take
* reduce
* saveAsTextFile
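A minimal spark-shell sketch chaining a transformation with two actions (assumes sc, the SparkContext provided by the shell):
val rdd  = sc.parallelize(1 to 6)   // distribute a small local collection
val even = rdd.filter(_ % 2 == 0)   // transformation: lazy, nothing runs yet
even.count()                        // action: triggers a job, returns 3
even.collect()                      // action: Array(2, 4, 6)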
Using Scala/Spark
Weijia Xu
Start Zeppelin on Wrangler:
sbatch --reservation hadoop+TRAINING-HPC+2188 -A TRAINING-HPC /data/apps/.zeppelin/job.zeppelin.work -t 240
Data APIs
- RDD: Resilient Distributed Dataset
rdd.map(x => x*2)
rdd.reduce(_+_)
rdd.filter(_%3 == 0)
- DataFrame: since 1.3
- Abstract API, on top of RDD
- Schema
- off-heap storage, both memory & hard drive
- Dataset: since 1.6
- Spark 2.x: DataFrame = Dataset[Row]
RDD vs DataFrame vs DataSet
val rdd = sc.parallelize(0 until 100)
# Conversion
## DF to RDD
val car_rdd = df.rdd
## RDD to DF
val rdd_df = rdd.toDF
## RDD to DS
val rdd_ds = rdd.toDS
# Functions
df.show(n) // show the first n rows
df.printSchema // column names and types
df.describe(cols) // summary statistics for the given columns
rdd.filter(_ < 10).collect
rdd_df.filter("value < 10").show
rdd_ds.filter("value < 10").show
rdd_ds.filter(_ < 10).show
rdd.map(_ * 2).collect
rdd_ds.map(_ * 2).show
rdd_df.map(_ * 2).show // will not work: DataFrame rows are Row objects, not numbers
rdd_df.select('value * 2).show
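For the typed side of the API, a hedged sketch with a made-up Car case class (values are illustrative; spark.implicits._ is already in scope in the spark-shell and Zeppelin, the import is shown for completeness):
import spark.implicits._
case class Car(model: String, mpg: Double)          // hypothetical schema
val ds = Seq(Car("a", 21.0), Car("b", 33.9)).toDS
ds.filter(_.mpg > 25).show                          // typed filter, checked at compile time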
# Spark SQL
df.select($"model", $"mpg" * 1.6).filter("mpg > 4").show
df.groupBy("mpg").count().show
df.createOrReplaceTempView("cars")
spark.sql("SELECT col1, col2, ...
FROM table1, table2, ...
[WHERE condition1 AND|OR condition2 ...]
[GROUP BY col1, ...]
[ORDER BY col1, ...]")
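A concrete query against the cars view registered above; the column names assume the mtcars.csv schema used elsewhere in these notes, and "+ 0.0" is the same cast-to-double trick used in the workflow section, since CSV columns are read as strings:
spark.sql("SELECT cyl, COUNT(*) AS n, AVG(mpg + 0.0) AS avg_mpg FROM cars GROUP BY cyl ORDER BY cyl").show()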
IO in Spark
- File format
  - JSON: self-describing, carries the schema
  - Parquet, ORC: columnar formats with compression
- Path prefix
  - default: hdfs:/// -> /tmp/data
  - specify local files with file:///
- File/folder
  - a directory path automatically reads all files within it
  - use directories to separate data
- Existing output file: mode(error/append/overwrite/ignore), default is error
# Read CSV file
val df = spark.read.format("csv")
  .option("header", "true")
  .load("/tmp/data/mtcars.csv")
df.show()
# More Files
df.write.json("cars.json")
df.write.parquet("cars.parquet")
df.write.option("delimiter","\t").csv("cars.tab")
val df_json = spark.read.json("cars.json")
val df_parquet = spark.read.parquet("cars.parquet")
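The save modes listed above can be set explicitly when the output path may already exist; a short sketch:
df.write.mode("overwrite").parquet("cars.parquet") // replace existing output
df.write.mode("append").json("cars.json")          // add to existing output
df.write.mode("ignore").csv("cars.tab")            // silently do nothing if output exists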
Workflow
- prepare datasets
val cars = spark.read.format("csv").option(...).load("path_to_file")
  .selectExpr("mpg + 0.0 as mpg", "cyl + 0.0 as label") // "+ 0.0" is an easy cast to double
val training = cars.sample(false, fraction)
val test = cars.except(training)
- assemble features
val assembler = new VectorAssembler().setInputCols(Array("key1", ...)).setOutputCol("features")
val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.2).setElasticNetParam(0.0)
- define analysis
val pipeline = new Pipeline().setStages(Array(assembler, lr))
- run analysis
val lrModel = pipeline.fit(training)
- results
val result = lrModel.transform(test).select('model, 'label, 'prediction)
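A hedged follow-up (not from the webinar): scoring the fitted pipeline on the held-out test set with spark.ml's multiclass evaluator; the accuracy metric is my choice.
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
val predictions = lrModel.transform(test)
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
println(evaluator.evaluate(predictions)) // fraction of test rows classified correctly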
Spark Architecture
* Create RDDs
* HadoopRDD: val lines = sc.textFile("hdfs://names")
* MapPartitionsRDD: val kvp = lines.map(name => (name(0), name))
* ShuffledRDD: val groups = kvp.groupByKey()
* MapPartitionsRDD: val res = groups.mapValues(names => names.toSet.size)
* collect()
* RDD Dependency
* Narrow dependency: one-to-one, no data movement between partitions
* Shuffle (wide) dependency: data must be repartitioned across the cluster
* DAG(Directed Acyclic Graph) Generation
* stage 0: HadoopRDD, MapPartitionsRDD
* stage 1: ShuffledRDD, MapPartitionsRDD
* Schedule Tasks
* split each stage into tasks based on partition
* reversed order, recursively find parent stages
* blocking between stages
* Delay scheduling
  * spark.locality.wait (default 3s)
  * spark.locality.wait.process
  * spark.locality.wait.node
  * spark.locality.wait.rack
- Executor Deployment
- Node
- Process: share physical node
- Thread: share physical memory
Spark YARN mode, in conf/spark-env.sh:
SPARK_EXECUTOR_INSTANCES (default 2) # processes
SPARK_EXECUTOR_CORES (default 1) # threads per process
- Memory Management
  - Reserved: 300MB by default
  - User memory: 25%, user data structures in RDDs (values/variables)
  - Execution: internal storage, shuffle buffer on the mapper side
    - export SPARK_WORKER_MEMORY=96G
  - Storage: cache, dynamic, default 0.5 of the 75% unified region
    - spark.memory.storageFraction in conf/spark-defaults.conf
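Rough arithmetic with the fractions above, for a hypothetical 4 GB executor heap:
- usable = 4096 MB - 300 MB reserved ≈ 3796 MB
- user memory ≈ 25% of usable ≈ 949 MB
- unified execution + storage ≈ 75% of usable ≈ 2847 MB
- storage (cache) ≈ 0.5 × 2847 ≈ 1423 MB by default; execution can borrow it when the cache is not full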
- Increase parallelism
  - more reducers by setting spark.default.parallelism (sketch below)
  - set the number of partitions of the largest parent RDD: sc.textFile(path, 128)
  - YARN: use the flag --executor-memory 2g
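A short sketch of the two knobs above (values and the app name are illustrative; in spark-shell or Zeppelin the context already exists, so spark.default.parallelism would instead go in conf/spark-defaults.conf or on the submit command line):
import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf().setAppName("wc").set("spark.default.parallelism", "128")
val sc   = new SparkContext(conf)
val pairs = sc.textFile("path-to-file").flatMap(_.split(" ")).map(w => (w, 1))
pairs.reduceByKey(_ + _, 64).saveAsTextFile("out") // 64 reduce-side partitions for this shuffle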