Sorting rule: sort by age in ascending order (younger first); if ages are equal, sort by the number of works in descending order.
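To make the rule concrete, here is a minimal local (non-Spark) sketch that sorts the same sample data with a composite key; the expected order in the comment is inferred from the rule above, not output taken from the original article.

object SortRuleSketch {
  def main(args: Array[String]): Unit = {
    // (id, name, age, works)
    val people = Seq(
      ("1", "苍老师", 35, 300),
      ("2", "小泽", 32, 299),
      ("3", "吉泽", 32, 298)
    )
    // Age ascending; negate works so that ties on age fall back to works descending
    val sorted = people.sortBy(p => (p._3, -p._4))
    sorted.foreach(println)
    // Expected order: 小泽 (32, 299), 吉泽 (32, 298), 苍老师 (35, 300)
  }
}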
Method 1

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CustomSortDemo1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CustomSortDemo1").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val array: Array[String] = Array("1,苍老师,35,300", "2,小泽,32,299", "3,吉泽,32,298")
    // Parallelize the local collection into an RDD
    val value: RDD[String] = sc.makeRDD(array, 2)
    // Split each line into fields
    val tuples: RDD[CustomSortDemo1] = value.map(t => {
      val s = t.split(",")
      val id = s(0)
      val name = s(1)
      val age = s(2).toInt
      val works = s(3).toInt
      // A case class does not need new; apply is generated automatically
      CustomSortDemo1(id, name, age, works)
    })
    val result: RDD[CustomSortDemo1] = tuples.sortBy(x => x)
    result.collect().foreach(println)
    sc.stop()
  }
}

// For an ordinary class: a constructor parameter marked val gets a getter but no setter,
// var gets both, and a plain parameter gets neither.
//class CustomSortDemo1(val id: String, val name: String, val age: Int, val works: Int)
//  extends Ordered[CustomSortDemo1] with Serializable {
//
//  override def compare(that: CustomSortDemo1): Int = {
//    if (that.age == this.age) {
//      that.works - this.works
//    } else {
//      this.age - that.age
//    }
//  }
//
//  override def toString = s"CustomSortDemo1($id, $name, $age, $works)"
//}

// A case class is serializable by default; constructor parameters are val unless declared otherwise
case class CustomSortDemo1(val id: String, name: String, age: Int, works: Int) extends Ordered[CustomSortDemo1] {
  override def compare(that: CustomSortDemo1): Int = {
    if (that.age == this.age) {
      that.works - this.works
    } else {
      this.age - that.age
    }
  }
}
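sortBy(x => x) compiles here because Spark's sortBy takes an implicit Ordering[K], and Scala derives one for any type that extends Ordered (via scala.math.Ordering.ordered). One caveat with the subtraction-based compare above is that it can overflow for extreme Int values; a sketch of an overflow-safe variant (illustrative names, not from the original) could look like this:

object SafeCompareSketch {
  // Same ordering as Method 1, but using Integer.compare to avoid Int overflow
  case class Person(id: String, name: String, age: Int, works: Int) extends Ordered[Person] {
    override def compare(that: Person): Int = {
      val byAge = Integer.compare(this.age, that.age)        // age ascending
      if (byAge != 0) byAge
      else Integer.compare(that.works, this.works)           // works descending
    }
  }

  def main(args: Array[String]): Unit = {
    val people = List(Person("1", "苍老师", 35, 300), Person("2", "小泽", 32, 299), Person("3", "吉泽", 32, 298))
    println(people.sorted)  // the implicit Ordering is derived from Ordered
  }
}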
Method 2
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CustomSortDemo2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("CustomSortDemo2")
    val sc = new SparkContext(conf)
    val array = Array("1,苍老师,35,300", "2,小泽,32,299", "3,吉泽,32,298")
    val lines: RDD[String] = sc.makeRDD(array, 2)
    val tuple: RDD[(String, String, Int, Int)] = lines.map(t => {
      val s = t.split(",")
      val id = s(0)
      val name = s(1)
      val age = s(2).toInt
      val works = s(3).toInt
      (id, name, age, works)
    })
    // sortBy does not change the element type of the original tuples; it only changes their order
    val sorted = tuple.sortBy(x => CustomSortDemo2(x._1, x._2, x._3, x._4))
    sorted.collect().foreach(println)
    sc.stop()
  }
}

// Case class used only as the sort key
case class CustomSortDemo2(val id: String, name: String, age: Int, works: Int) extends Ordered[CustomSortDemo2] {
  override def compare(that: CustomSortDemo2): Int = {
    if (that.age == this.age) {
      that.works - this.works
    } else {
      this.age - that.age
    }
  }
}
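The same "sort key only" idea can be seen on a plain Scala collection: sortBy uses the key for ordering but returns elements of the original tuple type. A minimal local sketch (SortKey is an illustrative name, not from the original):

object KeepTupleTypeSketch {
  // Key class that carries only the fields relevant for ordering
  case class SortKey(age: Int, works: Int) extends Ordered[SortKey] {
    override def compare(that: SortKey): Int =
      if (this.age != that.age) Integer.compare(this.age, that.age)
      else Integer.compare(that.works, this.works)
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(("1", "苍老师", 35, 300), ("2", "小泽", 32, 299), ("3", "吉泽", 32, 298))
    // The result is still Seq[(String, String, Int, Int)]; only the order changes
    val sorted: Seq[(String, String, Int, Int)] = rows.sortBy(r => SortKey(r._3, r._4))
    sorted.foreach(println)
  }
}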
Method 3
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CustomSortDemo3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("CustomSortDemo3")
    val sc = new SparkContext(conf)
    val array = Array("1,苍老师,35,300", "2,小泽,32,299", "3,吉泽,32,298")
    val lines: RDD[String] = sc.makeRDD(array, 2)
    val tuple: RDD[(String, String, Int, Int)] = lines.map(t => {
      val s = t.split(",")
      val id = s(0)
      val name = s(1)
      val age = s(2).toInt
      val works = s(3).toInt
      (id, name, age, works)
    })
    // sortBy compares by the first element of the key tuple, then the second
    // (only two here; add more fields to the tuple if more criteria are needed);
    // negating works makes that component descending
    val sorted = tuple.sortBy(x => (x._3, -x._4))
    sorted.collect().foreach(println)
    sc.stop()
  }
}
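Negating a field, as in (x._3, -x._4), is a quick way to get a descending component, but it only applies to numeric fields (and Int.MinValue cannot be negated safely). A hedged alternative sketch, assuming the same tuple layout, builds the composite Ordering explicitly with a reversed second component:

object DescendingWithoutNegationSketch {
  def main(args: Array[String]): Unit = {
    val rows = Seq(("1", "苍老师", 35, 300), ("2", "小泽", 32, 299), ("3", "吉泽", 32, 298))
    // Age ascending, works descending, without negating the value
    implicit val ageAscWorksDesc: Ordering[(Int, Int)] =
      Ordering.Tuple2(Ordering.Int, Ordering.Int.reverse)
    val sorted = rows.sortBy(r => (r._3, r._4))
    sorted.foreach(println)
  }
}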
Method 4
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CustomSortDemo4 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("CustomSortDemo4")
    val sc = new SparkContext(conf)
    val array = Array("1,苍老师,35,300", "2,小泽,32,299", "3,吉泽,32,298")
    val lines: RDD[String] = sc.makeRDD(array, 2)
    val tuple: RDD[(String, String, Int, Int)] = lines.map(t => {
      val s = t.split(",")
      val id = s(0)
      val name = s(1)
      val age = s(2).toInt
      val works = s(3).toInt
      (id, name, age, works)
    })
    // Define a custom Ordering and let sortBy pick it up implicitly
    // (the key tuple must be wrapped in parentheses)
    implicit val rules: Ordering[(String, String, Int, Int)] =
      Ordering[(Int, Int)].on[(String, String, Int, Int)](t => (t._3, -t._4))
    val sorted = tuple.sortBy(x => x)
    sorted.collect().foreach(println)
    sc.stop()
  }
}
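The implicit rules value works because Spark's sortBy declares an implicit Ordering[K] parameter, and an implicit defined in the enclosing scope takes precedence over the default tuple orderings. The same Ordering-with-on trick can be tried on a plain Scala collection, as in this minimal sketch (names are illustrative):

object ImplicitOrderingSketch {
  def main(args: Array[String]): Unit = {
    // Map each row to an (age, -works) key; sorted picks this Ordering up implicitly
    implicit val rules: Ordering[(String, String, Int, Int)] =
      Ordering[(Int, Int)].on[(String, String, Int, Int)](t => (t._3, -t._4))
    val rows = Seq(("1", "苍老师", 35, 300), ("2", "小泽", 32, 299), ("3", "吉泽", 32, 298))
    println(rows.sorted)  // age ascending, works descending on ties
  }
}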
Reprinted from: http://jykni.baihongyu.com/