Spark简介
什么是Spark?
Apache Spark是用于大规模数据处理的快速(fast)和通用(general)引擎,由加州伯克利分校AMP(Algorithms、Machines and People Lab,在算法、机器和人之间通过大规模集成来展现大数据的应用平台)实验室开发的大数据处理框架。
Spark提供了大数据处理的一站式解决方案,以Spark Core为基础推出了Spark SQL、Spark Streaming、MLlib、GraphX、SparkR等组件。整个Spark生态体系称为BDAS,即:伯克利数据分析栈。
Spark特点
Spark具有运行速度快、易用性好、通用型强和随处运行的特点。
运行速度快(Speed)
如果Spark基于内存读取,速度是Hadoop的100倍;使用磁盘读取,也是Hadoop的十倍。spark之所以能够比Hadoop快,有两点主要原因:内存计算和引入DAG执行引擎。
易用性好(Ease of Use)
Spark支持Scala、Java、Python、R语言编写应用程序,并且提供了Scala、Python、R的交互式操作shell。
通用型强(generality)
Spark提供了一站式的大数据解决方案,生态圈BADS包含了:提供内存计算框架的Spark Core、用于结构化查询的Spark SQL、用于实时计算的Spark Streaming、用于机器学习的MLlib和用于图计算的GraphX。
随处运行(Runs Everywhere)
Spark提供了本地Local运行模式,用来学习和测试(当然还有许多用途,比如我们正在做的一个项目就是基于Local模式的)。对于集群部署模式,Spark能够以YARN、Mesos和自身提供的Standalone作为资源管理调度框架来执行作业。对于数据源,Spark能够读取HDFS、Cassandra、HBase、S3、Alluxio等数据源数据。
资源调度器YARN
YARN简介
YARN是Spark的3种调度器之一,主要是用来管理和分配集群资源
需要根据情况修改YARN默认参数配置(修改yarn-site.xml或者CDH设置):
提交任务到集群,根据计算类型分配各自参数:
- IO密集型:每个exector的cores为1就好;
- driver-memory:主要取决于最后聚合输出(如collect操作)的结果大小;
- num-executors:最好与代码里面的partition或coalesce操作的分区数量相匹配;
- executor-memory:取决于每个stage需要的内存大小;
- 为了使代码更加清晰可读,shell代码可以使用\来分行
1
2
3
4
5
6
7
8
9
10
11
12
13#!/usr/bin/env sh
sbt package &&\
scp target/scala-2.11/metro-data_2.11-1.0.jar \
root@c1:~/chengsluo/ &&\
ssh root@c1 -o StrictHostKeyChecking=no "
cd chengsluo && \
spark2-submit \
--master yarn \
--deploy-mode cluster \
--executor-memory 1g \
--executor-cores 2 \
metro-data_2.11-1.0.jar
"
提前中断任务
1 | yarn application -kill application_1516444072266_0002 |
HIVE和Spark-SQL性能对比
逻辑介绍
- 生成OD断面中间表,通过历史运行数据来得到运行线路;
- 由一个历史OD比例表与一个子查询链接,共生成130441786条数据。
性能对比
— Spark with Python Spark with Scala HIVE On MapReduce 耗时 4.7min 4.4min 20min 其中HIVE on MapReduce可能受大屏项目的其他查询影响,导致速度很可能比正常要慢一些。 ### 三种实现 HIVE-SQL语句: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34insert overwrite table sparktest.tbl_dm_od_cust_mid1 select
dm_id
,a.od_begin_cd
,a.od_end_cd
,a.time_id
,line_id
,GD_begin_cd
,gd_end_cd
,lead(gd_end_cd,1,'0000') over(partition by substr(cast(dm_id as string),1,9) order by run_time) as gd_end_cd_nxt
,gd_begin_nm
,gd_end_nm
,a.time_id+run_time/3 as sta_time_id
,a.time_id+max_time/3 end_time_id
,cast(b.percent*a.cust_rate as float) precent
from
sptcc_dm.tbl_dm_od_rate a,
(select
dm_id
,line_id
,od_begin_cd
,od_end_cd
,gd_begin_cd
,gd_end_cd
,lead(gd_end_cd,1,'0000') over(partition by substr(cast(dm_id as string),1,9) order by run_time) as gd_end_cd_nxt
,gd_begin_nm
,gd_end_nm
,run_time
,percent
,max(run_time) over(partition by substr(cast(dm_id as string),1,9)) as max_time
from sptcc_dpa.tbl_dim_od_dm
) b
where
a.od_begin_cd=b.od_begin_cd
and a.od_end_cd=b.od_end_cd
Spark实现代码:
1 | package org.apache.spark.examples.sql.hive |
pyspark实现代码:
1 | from pyspark.sql import HiveContext |
评价
在本次测试中,只是简单的使用了一下Spark中SQL查询API的使用。实际上并没有体现出Spark的优势。个人觉得Spark的优势在于在一系列的业务处理和查询过程中,可以很方便的把子查询的中间结果大批量的缓存在内存中,这样就给传统的SQL查询带来了很多优化的空间。另外,Spark也可以快速的对处理结果进行RDD编程和流式处理,这样Spark平台就可以支撑起绝大多数业务类型了。
附录
测试组件版本
- hadoop:2.6.0
- spark:2.1.0
- hive:1.1.0
- java:1.8
网页资料
- Spark-SQL,DataFrames and Datasets 官方文档
- hadoop HDFS常用文件操作命令
- YARN的内存和CPU配置
- JavaChen的博客里面的示例很好
- Spark的一些调优经验
学习中的一些坑
- 对于有结果的任务,下一次重新运行时要记得清空结果
- 对于输入输出路径最好参数化,本地测试和集群运行使用不同的参数即可
- 对于yarn的参数最好不要写死在代码里面,而是在运行的脚本里面附件参数,这样调试更灵活
- 编译时要注意各种组件的版本号,本地测试时要配置与集群一致的版本号.
- 像我这样对API不熟悉的初学者,测试时应该尽量用spark-shell调试代码原型,在本地编辑器中调试不是很方便。
- 要分清Spark中的RDD/DF和DS区别和相互转化。