To learn Spark, you first need a solid grasp of RDDs and DataFrames: they are the foundation of Spark, and mastering them together with their operators lays the groundwork for everything that follows. An RDD (resilient distributed dataset) is a fault-tolerant collection of elements that can be operated on in parallel. A DataFrame is defined similarly: both are immutable, distributed datasets that Spark computes over in parallel. The biggest difference is that an RDD is merely a collection of records and knows nothing about what each record contains, while a DataFrame knows exactly which named fields make up each record. Intuitively, an RDD is a one-column table of raw records, whereas a DataFrame is a table whose rows share a clear, common set of columns; each row of a DataFrame is a Row object.
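To make the distinction concrete, here is a minimal standalone sketch (the object name and sample data are invented for illustration) that builds the same records first as a plain RDD and then as a DataFrame with an explicit schema:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object RddVsDataFrameDemo {
  def main(args: Array[String]): Unit = {
    val ss = SparkSession.builder.appName("RddVsDataFrameDemo").master("local[*]").getOrCreate()
    // To the RDD, each element is an opaque record with no field information.
    val rdd = ss.sparkContext.parallelize(Seq(Row("Tom", 20), Row("Jim", 30)))
    // The DataFrame attaches a schema, so every Row has named, typed fields.
    val schema = StructType(Seq(
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true)))
    val df = ss.createDataFrame(rdd, schema)
    df.printSchema() // name: string, age: integer
    df.show()
    ss.stop()
  }
}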
The test code below demonstrates basic Spark operations on RDDs and DataFrames: how to read files into an RDD or DataFrame, and how to write the data in an RDD or DataFrame back out to files with compression.
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.junit.{Assert, Test}
import org.apache.spark.sql.types._
class SparkRddTest extends Assert {
  @Test
  def testWriteRdd(): Unit = {
    val sparkConf = new SparkConf().setAppName("SparkTestRdd")
    sparkConf.setMaster("local[*]")
    val ss = SparkSession.builder.config(sparkConf).getOrCreate()
    val seq = List("American Person", "Tom", "Jim")
    // Build an RDD from a local collection and write it out as plain text.
    // saveAsTextFile fails if the target directory already exists.
    val rdd = ss.sparkContext.makeRDD(seq)
    rdd.saveAsTextFile("D:/SparkTest/rdd")
  }
  @Test
  def testWriteRdd1(): Unit = {
    val sparkConf = new SparkConf().setAppName("SparkTestRdd")
    sparkConf.setMaster("local[*]")
    val ss = SparkSession.builder.config(sparkConf).getOrCreate()
    val seq = List("American Person Test Spark Rdd, Today is nice Day", "Tom", "Jim", "American", "Tom", "Jim", "American", "Tom", "Jim",
      "Test", "Test", "Hello", "Test", "Tom", "Jim", "American", "Tom", "Jim", "American,China", "Tom", "Jim", "This is End")
    // parallelize is equivalent to makeRDD; repartition(1) collapses the data
    // into a single partition so only one part file is written.
    val rdd = ss.sparkContext.parallelize(seq)
    rdd.repartition(1).saveAsTextFile("D:/SparkTest/rdd1")
  }
  @Test
  def testReadText(): Unit = {
    val sparkConf = new SparkConf().setAppName("SparkTestRdd")
    sparkConf.setMaster("local[*]")
    val ss = SparkSession.builder.config(sparkConf).getOrCreate()
    // Read the text files under the directory as a Dataset[String]:
    //val ds = ss.read.textFile("D:/SparkTest/rdd1")
    // show() prints at most 20 rows and truncates each cell to 20 characters by default:
    //ds.show()
    // Read as a Dataset[String], then convert to a DataFrame with a named column:
    //val df = ss.read.textFile("D:/SparkTest/rdd1").toDF("name")
    // show(1000, false): print up to 1000 rows without truncating the values:
    //df.show(1000, false)
    // Read directly as a DataFrame (one column named "value") and rename that column:
    val df = ss.read.text("D:/SparkTest/rdd1").withColumnRenamed("value", "name")
    df.show()
  }
  @Test
  def testCsv(): Unit = {
    val sparkConf = new SparkConf().setAppName("SparkTestRdd")
    sparkConf.setMaster("local[*]")
    val ss = SparkSession.builder.config(sparkConf).getOrCreate()
    val df = ss.read.text("D:/SparkTest/rdd1").withColumnRenamed("value", "name")
    // UDF that returns the constant 2 for every input row.
    val coder6: (String => Long) = (arg: String) => 2L
    val func = udf(coder6)
    val df2 = df.withColumn("addColumn", col("name")).withColumn("addColumn2", func(col("name")))
    // Write tab-delimited CSV; the target directory must not already exist.
    df2.write.option("delimiter", "\t").csv("D:/SparkTest/rddcsv")
    // Equivalent to the line above when given the same options
    // (without them it falls back to the default comma delimiter):
    //df2.write.format("csv").option("delimiter", "\t").save("D:/SparkTest/rddcsv")
    // CSV carries no schema, so the columns come back as _c0, _c1, _c2
    // and have to be cast and renamed by hand.
    ss.read.option("delimiter", "\t").csv("D:/SparkTest/rddcsv")
      .select(col("_c0").cast(StringType), col("_c1").cast(StringType), col("_c2").cast(IntegerType))
      .toDF("col1", "col2", "col3").groupBy("col1", "col2").sum("col3").show
  }
  @Test
  def testOrc(): Unit = {
    val sparkConf = new SparkConf().setAppName("SparkTestRdd")
    sparkConf.setMaster("local[*]")
    val ss = SparkSession.builder.config(sparkConf).getOrCreate()
    val df = ss.read.text("D:/SparkTest/rdd1").withColumnRenamed("value", "name")
    // UDF that returns the constant 2 for every input row.
    val coder6: (String => Long) = (arg: String) => 2L
    val func = udf(coder6)
    val df2 = df.withColumn("addColumn", col("name")).withColumn("addColumn2", func(col("name")))
    // Write ORC with zlib compression; the target directory must not already exist.
    df2.write.option("compression", "zlib").orc("D:/SparkTest/rddorc")
    // Unlike CSV, ORC stores the schema, so the columns keep their original names
    // and no compression option is needed on read (the codec is detected automatically).
    ss.read.orc("D:/SparkTest/rddorc")
      .select(col("name"), col("addColumn"), col("addColumn2").cast(IntegerType))
      .toDF("col1", "col2", "col3").groupBy("col1", "col2").sum("col3").show
  }
}
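The ORC test above uses zlib compression; the text and CSV writers support compressed output through the same option mechanism. Here is a minimal standalone sketch, assuming the D:/SparkTest/rdd1 directory written earlier exists and the invented output path does not:

import org.apache.spark.sql.SparkSession

object CompressedWriteDemo {
  def main(args: Array[String]): Unit = {
    val ss = SparkSession.builder.appName("CompressedWriteDemo").master("local[*]").getOrCreate()
    val df = ss.read.text("D:/SparkTest/rdd1").withColumnRenamed("value", "name")
    // The text and CSV writers accept a codec (none, gzip, bzip2, snappy, lz4, deflate)
    // via the "compression" option; gzip yields part files ending in .gz.
    df.write.option("compression", "gzip").text("D:/SparkTest/rddtxtgz")
    // On read, Spark infers the codec from the file extension; no option is needed.
    ss.read.text("D:/SparkTest/rddtxtgz").show()
    ss.stop()
  }
}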