打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
SparkSQL内置函数
使用Spark SQL中的内置函数对数据进行分析,Spark SQL API不同的是,DataFrame中的内置函数操作的结果是返回一个Column对象,而DataFrame天生就是"A distributed collection of data organized into named columns.",这就为数据的复杂分析建立了坚实的基础并提供了极大的方便性,例如说,我们在操作DataFrame的方法中可以随时调用内置函数进行业务需要的处理,这之于我们构建附件的业务逻辑而言是可以极大的减少不必须的时间消耗(基于上就是实际模型的映射),让我们聚焦在数据分析上,在Spark 1.5.x版本,增加了一系列内置函数到DataFrame API中,并且实现了code-generation的优化。与普通的函数不同,DataFrame的函数并不会执行后立即返回一个结果值,而是返回一个Column对象,用于在并行作业中进行求值。Column可以用在DataFrame的操作之中,比如select,filter,groupBy等。函数的输入值,也可以是Column。
  • 1
  • 2
  • 3

聚合函数

approxCountDistinct, avg, count, countDistinct, first, last, max, mean, min, sum, sumDistinct
  • 1
  • 2

集合函数

array_contains, explode, size, sort_array
  • 1
  • 2

日期/时间函数

日期时间转换:unix_timestamp, from_unixtime, to_date, quarter, day, dayofyear, weekofyear, from_utc_timestamp, to_utc_timestamp从日期时间中提取字段:year, month, dayofmonth, hour, minute, second日期/时间计算:datediff, date_add, date_sub, add_months, last_day, next_day, months_between获取当前时间等:current_date, current_timestamp, trunc, date_format
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

数学函数

abs, acros, asin, atan, atan2, bin, cbrt, ceil, conv, cos, sosh, exp, expm1, factorial, floor, hex, hypot, log, log10, log1p, log2, pmod, pow, rint, round, shiftLeft, shiftRight, shiftRightUnsigned, signum, sin, sinh, sqrt, tan, tanh, toDegrees, toRadians, unhex
  • 1
  • 2

混合函数

array, bitwiseNOT, callUDF, coalesce, crc32, greatest, if, inputFileName, isNaN, isnotnull, isnull, least, lit, md5, monotonicallyIncreasingId, nanvl, negate, not, rand, randn, sha, sha1, sparkPartitionId, struct, when
  • 1
  • 2

字符串函数

ascii, base64, concat, concat_ws, decode, encode, format_number, format_string, get_json_object, initcap, instr, length, levenshtein, locate, lower, lpad, ltrim, printf, regexp_extract, regexp_replace, repeat, reverse, rpad, rtrim, soundex, space, split, substring, substring_index, translate, trim, unbase64, upper
  • 1
  • 2

窗口函数

cumeDist, denseRank, lag, lead, ntile, percentRank, rank, rowNumber
  • 1
  • 2

案例实战:

第一步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,例如说通过setMaster来设置程序要链接的Spark集群的Master的URL,Spark程序在本地运行
  • 1
  • 2
    val conf = new SparkConf() //创建SparkConf对象      conf.setAppName("SparkSQL") //设置应用程序的名称,在程序运行的监控界面可以看到名称      //conf.setMaster("spark://DaShuJu-040:7077") //此时,程序在Spark集群      conf.setMaster("local")  
  • 1
  • 2
  • 3
  • 4
第二步:创建SparkContext对象  SparkContext是Spark程序所有功能的唯一入口,无论是采用Scala、Java、Python、R等都必须有一个SparkContext  SparkContext核心作用:初始化Spark应用程序运行所需要的核心组件,包括DAGScheduler、TaskScheduler、SchedulerBackend  同时还会负责Spark程序往Master注册程序等  SparkContext是整个Spark应用程序中最为至关重要的一个对象。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
val sc = new SparkContext(conf) //创建SparkContext对象,通过传入SparkConf实例来定制Spark运行的具体参数和配置信息  val sqlContext = new SQLContext(sc) //注意:要使用Spark SQL的内置函数,就一定要导入SQLContext下的隐式转换  import sqlContext.implicits._  
  • 1
  • 2
  • 3
  • 4
  • 5
第三步:模拟数据,最后生成RDD
  • 1
  • 2
val userData = Array(        "2016-3-27,001,http://spark.apache.org/,1000",        "2016-3-27,001,http://hadoop.apache.org/,1001",        "2016-3-27,002,http://fink.apache.org/,1002",        "2016-3-28,003,http://kafka.apache.org/,1020",        "2016-3-28,004,http://spark.apache.org/,1010",        "2016-3-28,002,http://hive.apache.org/,1200",        "2016-3-28,001,http://parquet.apache.org/,1500",        "2016-3-28,001,http://spark.apache.org/,1800"      )    //并行化成RDD    val userDataRDD = sc.parallelize(userData)  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
第四步:根据业务需要对数据进行预处理生成DataFrame,要想把RDD转换成DataFrame,需要先把RDD中的元素类型变成Row类型,于此同时要提供DataFrame中的Columns的元数据信息描述。
  • 1
  • 2
    val userDataRDDRow = userDataRDD.map(row => {val splited = row.split(",") ;Row(splited(0),splited(1).toInt,splited(2),splited(3).toInt)})      val structTypes = StructType(Array(            StructField("time", StringType, true),            StructField("id", IntegerType, true),            StructField("url", StringType, true),            StructField("amount", IntegerType, true)      ))      val userDataDF = sqlContext.createDataFrame(userDataRDDRow,structTypes)    第五步:使用Spark SQL提供的内置函数对DataFrame进行操作,特别注意:内置函数生成的Column对象且自定进行CG
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
userDataDF.groupBy("time").agg('time, countDistinct('id)).map(row=>Row(row(1),row(2))).collect.foreach {  println  } userDataDF.groupBy("time").agg('time, sum('amount)).show()  
  • 1
  • 2
  • 3

“`

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
Spark计算引擎之SparkSQL详解
大数据IMF传奇行动绝密课程第70课:Spark SQL内置函数解密与实战
SparkSession简单介绍
Spark RDD、DataFrame和DataSet的区别
独家 | PySpark和SparkSQL基础:如何利用Python编程执行Spark(附代码)
070 DStream中的transform和foreachRDD函数
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服