打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
Spark Rdd & DataFrame

学习spark首选要掌握好rdd和dataframe,他是spark中的基础,掌握好了rdd/dataframe以及他们的算子操作,为你学习spark打好基础。  RDD是弹性分布式数据集(resilient distributed dataset) 的简称,是一个可以参与并行操作并且可容错的元素集合;DataFrame的定义与RDD类似,即都是Spark 平台用以分布式并行计算的不可变分布式数据集合。与RDD最大的不同在于,RDD仅仅是一条条数据的集合,并不了解每条数据的内容是怎样的,而DataFrame明确的了解每条数据有几个命名字段组成,即可以形象地理解为RDD是条条数据组成的一维表,DataFrame 是行数据都有共同清晰的列划分的维表,每一行的内容的Row对象组成DF

       下面通过一些测试代码演示下spark在rdd和dataframe上的基本应用,主要包括怎么把文件读到rdd/dataframe的结构中,怎么把rdd/dataframe结构中的数据压缩输出到文件中。






















































































import org.apache.spark.SparkConfimport org.apache.spark.sql.SparkSessionimport org.apache.spark.sql.functions.{udf, _}import org.junit.{Assert, Test}import org.apache.spark.sql.types._   @Test class SparkRddTest extends  Assert{   @Test  def testWriteRdd(): Unit ={    val sparkConf = new SparkConf().setAppName("SparkTestRdd");    sparkConf.setMaster("local[*]");    val ss = SparkSession.builder.config(sparkConf).getOrCreate()    val seq = List("American Person", "Tom", "Jim")    var rdd = ss.sparkContext.makeRDD(seq)    rdd.saveAsTextFile("D:/SparkTest/rdd")  }    @Test  def testWriteRdd1(): Unit ={    val sparkConf = new SparkConf().setAppName("SparkTestRdd");    sparkConf.setMaster("local[*]");    val ss = SparkSession.builder.config(sparkConf).getOrCreate()    val seq = List("American Person Test Spark Rdd, Today is nice  Day", "Tom", "Jim","American", "Tom", "Jim","American", "Tom", "Jim",      "Test" , "Test", "Hello", "Test","Tom", "Jim","American", "Tom", "Jim","American,China", "Tom", "Jim","This is End")    var rdd = ss.sparkContext.parallelize(seq)    rdd.repartition(1).saveAsTextFile("D:/SparkTest/rdd1")  }   @Test  def testReadText(): Unit ={    val sparkConf = new SparkConf().setAppName("SparkTestRdd");    sparkConf.setMaster("local[*]");    val ss = SparkSession.builder.config(sparkConf).getOrCreate()    //读取目录下文本,返回dataset    //val ds = ss.read.textFile("D:/SparkTest/rdd1")    //打印输出dataset,默认显示20个字符,20行记录    //ds.show()     //读取目录下文本,并转化成dataframe给字段命名    //val df = ss.read.textFile("D:/SparkTest/rdd1").toDF("name")    //打印输出dataframe,1000是行数,false是不截取字符全部显示    //df.show(1000,false)     //读取目录下文本,直接读取成dataframe,并给字段重命名    val df = ss.read.text("D:/SparkTest/rdd1").withColumnRenamed("value","name")    df.show()  }   @Test  def testCsv(): Unit ={    val sparkConf = new SparkConf().setAppName("SparkTestRdd");    sparkConf.setMaster("local[*]");    val ss = SparkSession.builder.config(sparkConf).getOrCreate()     val df = ss.read.text("D:/SparkTest/rdd1").withColumnRenamed("value","name")    val coder6: (String => Long) = (arg: String) => {2}    val func = udf(coder6)    val df2 = df.withColumn("addCoumn",col("name")).withColumn("addCoumn2",func(col("name")))    //df2.write.option("delimiter","\t").csv("D:/SparkTest/rddcsv")    //效果和上面一样    //df2.write.format("csv").save("D:/SparkTest/rddcsv")    ss.read.option("delimiter","\t").csv("D:/SparkTest/rddcsv").select(col("_c0").cast(StringType),col("_c1").cast(StringType),col("_c2").cast(IntegerType))      .toDF("col1","col2","col3").groupBy("col1","col2").sum("col3").show  }   @Test  def testOrc(): Unit ={    val sparkConf = new SparkConf().setAppName("SparkTestRdd");    sparkConf.setMaster("local[*]");    val ss = SparkSession.builder.config(sparkConf).getOrCreate()     val df = ss.read.text("D:/SparkTest/rdd1").withColumnRenamed("value","name")    val coder6: (String => Long) = (arg: String) => {2}    val func = udf(coder6)    val df2 = df.withColumn("addCoumn",col("name")).withColumn("addCoumn2",func(col("name")))     df2.write.option("compression","zlib").orc("D:/SparkTest/rddorc")    ss.read.option("compression","zlib").orc("D:/SparkTest/rddcsv").select(col("_c0").cast(StringType),col("_c1").cast(StringType),col("_c2").cast(IntegerType))      .toDF("col1","col2","col3").groupBy("col1","col2").sum("col3")  }}
本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
Spark RDD、DataFrame和DataSet的区别
SparkSQL内置函数
谈谈RDD、DataFrame、Dataset的区别和各自的优势
大数据开发技术之Spark SQL的多种使用方法
pyspark入门教程
深入理解XGBoost:分布式实现
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服