博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RDD编程初级实践
阅读量:2121 次
发布时间:2019-04-30

本文共 4494 字,大约阅读时间需要 14 分钟。

RDD编程初级实践

文章目录


spark的特点

1.快:与Hadoop的MapReduce相比,Spark基于内存的运算要快100倍以上;而基于磁盘的运算也要快10倍以上。Spark实现了高效的DAG执行引擎,可以通过基于内存来高效地处理数据流。

2.容易使用:Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同应用。而且Spark支持交互式的Python和Scala的Shell,这意味着可以非常方便的在这些Shell中使用Spark集群来验证解决问题的方法,而不是像以前一样,需要打包、上传集群、验证等。这对于原型开发非常重要。
3.通用性:Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(通用Spark SQL)、实时流处理(通过Spark Streaming)、机器学习(通过Spark MLlib)和图计算(通过Spark GraphX)。

一、数据来源、环境介绍

本次实验使用到的数据集:data.txt; A.txt; B.txt; Algorithm.txt; Database.txt; Python.txt。

实验环境:Ubuntu Kylin 16.04;Spark 2.4.8 ;Python 3.1.4

二、实验步骤

1.将文件弄到虚拟机里

因为没有下载安装FinaShell,所以要想把文件弄到虚拟机内只能用qq邮箱传送下载的方法。

在这里插入图片描述

2.配置pyspark环境

放两个链接

http://dblab.xmu.edu.cn/blog/2501-2/
http://dblab.xmu.edu.cn/blog/2427-2/
按照里面的做就可以了

代码如下(示例):

data = pd.read_csv(    'https://labfile.oss.aliyuncs.com/courses/1283/adult.data.csv')print(data.head())

3.数据处理过程与结果

该处使用的url网络请求的数据。


(1)处理data.txt文件

该数据集包含了某大学计算机系的成绩,我们提取成绩要在pyspark shell界面中通过编程来操作,并且要通过输入pyspark来直接进入,不能通过上面例子中检测pyspark能否运行的方法:

在这里插入图片描述
这种事不行的

在这里插入图片描述

*要用这种

下面开始处理文件:

1.该系总共有多少学生

在这里插入图片描述

代码如下(示例):

lines = sc.textFile("file:///usr/local/spark/data.txt")res = lines.map(lambda x:x.split(",")).map(lambda x:x[0])sum = res.distinct()sum.count()

2.该系共开设了多少门课程

代码如下(示例):

在这里插入图片描述

lines = sc.textFile("file:///usr/local/spark/data.txt")res = lines.map(lambda x:x.split(",")).map(lambda x:x[1])sum = res.distinct()sum.count()

3.Tom同学的总成绩平均分是多少

在这里插入图片描述

代码如下(示例):

lines = sc.textFile("file:///usr/local/spark/data.txt")res = lines.map(lambda x:x.split(",")).filter(lambda x:x[0] == 'Tom')score = res.map(lambda x:int(x[2]))sum_score = score.reduce(lambda x,y:x+y)num = res.count()avg = sum_score/numprint(avg)

4.求每名同学的选修的课程门数

在这里插入图片描述

代码如下(示例):

lines = sc.textFile("file:///usr/local/spark/data.txt")res = lines.map(lambda x:x.split(",")).map(lambda x:(x[0],1))each_res = res.reduceByKey(lambda x,y:x+y)each_res.foreach(print)

5.该系DataBase课程共有多少人选修

在这里插入图片描述

代码如下(示例):

lines = sc.textFile("file:///usr/local/spark/data.txt")res = lines.map(lambda x:x.split(",")).filter(lambda x:x[1] == 'DataBase')res.count()

6.各门课程的平均分是多少

在这里插入图片描述

代码如下(示例):

lines = sc.textFile("file:///usr/local/spark/data.txt")res = lines.map(lambda x:x.split(",")).map(lambda x:(x[1],(int(x[2]),1)))temp = res.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))avg = temp.map(lambda x:(x[0],round(x[1][0]/x[1][1],2)))avg.foreach(print)

7 使用累加器计算共有多少人选了DataBase这门课

在这里插入图片描述

代码如下(示例):

lines = sc.textFile("file:///usr/local/spark/data.txt")res = lines.map(lambda x:x.split(",")).filter(lambda x:x[1] == 'DataBase')accum = sc.accumulator(0)res.foreach(lambda x:accum.add(1))accum.value

(2)合并A.txt,B.txt数据集

我们要对两个数据集进行集合与去重,将结果输出成一个新的数据集。

进入/usr/local/spark/目录,在目录下创建一个脚本文件remdup.py。
在这里插入图片描述
然后在remdup.py中输入代码
在这里插入图片描述

代码如下(示例):

from pyspark import SparkContextsc = SparkContext('local','remdup')lines1 = sc.textFile("file:///usr/local/spark/A.txt")lines2 = sc.textFile("file:///usr/local/spark/B.txt")lines = lines1.union(lines2)distinct_lines = lines.distinct()res = distinct_lines.sortBy(lambda x:x)res.repartition(1).saveAsTextFile("file:///usr/local/spark/result")

使用命令python3 remdup.py运行脚本文件,注意执行程序时先退出spark shell,否则会出现“地址已在使用”的错误。

在这里插入图片描述
然后在/usr/local/spark/spark/result路径下会看到part-00000文件的生成。
在这里插入图片描述
打开part-00000文件查看结果

在这里插入图片描述

或者在/usr/local/spark/result目录下输入vim part-00000 来查看
在这里插入图片描述

(3)求Algorithm.txt; Database.txt; Python.txt平均数

三个数据我们要编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中,此项实验与第二个实验几乎一样,只有处理数据的程序不同。

首先在路径/usr/local/spark/mtcode中创建avgscore.py脚本文件

在这里插入图片描述
在avgscore.py中编译代码

from pyspark import SparkContextsc = SparkContext('local','avgscore')lines1 = sc.textFile("file:///usr/local/spark/Algorithm.txt")lines2 = sc.textFile("file:///usr/local/spark/Database.txt")lines3 = sc.textFile("file:///usr/local/spark/Python.txt")lines = lines1.union(lines2).union(lines3)distinct_lines = lines.distinct()lines4 = distinct_lines.sortBy(lambda x:x).filter(bool)data = lines4.map(lambda x:x.split(" ")).map(lambda x:(x[0],(int(x[1]),1)))res = data.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))result = res.map(lambda x:(x[0],round(x[1][0]/x[1][1],2)))result.repartition(1).saveAsTextFile("file:///usr/local/spark/result")

注意:如果输出文件和之前实验二的文件名一样那要将之前的文件删除,不然会出错

然后使用命令python3 avgscore.py运行脚本文件,注意执行程序时先退出spark shell,否则会出现“地址已在使用”的错误。

在这里插入图片描述

在/usr/local/spark/spark//result路径下会有结果文件part-00000

在这里插入图片描述
查看结果
在这里插入图片描述

总结

本次实验需要在Ubuntu系统环境下,对与本地数据进行处理,查询,计算,统计等操作。首先RDD的创建分为,文件系统加载创建,HDFS加载上传和通过并行集合(列表)创建三种方法,为了简便,本实验实验第一种方法。接着便是RDD转换操作,通过不同的转换语句可以对数据进行排列去重等操作,从而实现对数据的处理。转换操作完成后要接着行动操作,行动操作才是真正触发计算的地方,Spark程序执行到行动操作时,才会执行真正的计算,从文件中加载数据,完成一次又一 次转换操作,最终,完成行动操作得到结果。除了以上两种操作,我们还要了解惰性机制,所谓的“惰性机制”是指,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会触发“从头到尾”的真正的计算。

转载地址:http://loyrf.baihongyu.com/

你可能感兴趣的文章
运行springboot项目出现:Type javax.xml.bind.JAXBContext not present
查看>>
Java中多线程向mysql插入同一条数据冲突问题
查看>>
Idea Maven项目使用jar包,添加到本地库使用
查看>>
FastDFS集群架构配置搭建(转载)
查看>>
HTM+CSS实现立方体图片旋转展示效果
查看>>
FFmpeg 命令操作音视频
查看>>
问题:Opencv(3.1.0/3.4)找不到 /opencv2/gpu/gpu.hpp 问题
查看>>
目的:使用CUDA环境变量CUDA_VISIBLE_DEVICES来限定CUDA程序所能使用的GPU设备
查看>>
问题:Mysql中字段类型为text的值, java使用selectByExample查询为null
查看>>
程序员--学习之路--技巧
查看>>
解决问题之 MySQL慢查询日志设置
查看>>
contOS6 部署 lnmp、FTP、composer、ThinkPHP5、docker详细步骤
查看>>
TP5.1模板布局中遇到的坑,配置完不生效解决办法
查看>>
PHPstudy中遇到的坑No input file specified,以及传到linux环境下遇到的坑,模板文件不存在
查看>>
TP5.1事务操作和TP5事务回滚操作多表
查看>>
composer install或composer update 或 composer require phpoffice/phpexcel 失败解决办法
查看>>
TP5.1项目从windows的Apache服务迁移到linux的Nginx服务需要注意几点。
查看>>
win10安装软件 打开时报错 找不到 msvcp120.dll
查看>>
PHPunit+Xdebug代码覆盖率以及遇到的问题汇总
查看>>
PHPUnit安装及使用
查看>>