Two ways to convert a Spark RDD into a DataFrame, in Java and Scala
I. Prepare the data source
Create a student.txt file under the project directory with the following content (note that the code samples below load this data under slightly different file names, such as stuInfo.txt and student2.txt; adjust the path to wherever you actually put the file):
1,zhangsan,20
2,lisi,21
3,wanger,19
4,fangliu,18
II. Implementation
Java version:
1. First create a Student bean that implements Serializable and overrides toString(). Spark infers the DataFrame schema from the bean's getters, so the getter/setter pairs below are required. The code is as follows:
package com.cxd.sql;
import java.io.Serializable;

@SuppressWarnings("serial")
public class Student implements Serializable {
    String sid;
    String sname;
    int sage;

    public String getSid() {
        return sid;
    }
    public void setSid(String sid) {
        this.sid = sid;
    }
    public String getSname() {
        return sname;
    }
    public void setSname(String sname) {
        this.sname = sname;
    }
    public int getSage() {
        return sage;
    }
    public void setSage(int sage) {
        this.sage = sage;
    }
    @Override
    public String toString() {
        return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";
    }
}
2. Do the conversion. The code is as follows:
package com.cxd.sql;
import java.util.ArrayList;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class TxtToParquetDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("TxtToParquet").setMaster("local");
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
        reflectTransform(spark);   // via Java reflection
        dynamicTransform(spark);   // via a dynamically built schema
    }

    /**
     * Conversion via Java reflection.
     * @param spark
     */
    private static void reflectTransform(SparkSession spark) {
        JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();
        // Map each input line to a Student bean; Spark infers the schema from the bean's getters.
        JavaRDD<Student> rowRDD = source.map(line -> {
            String[] parts = line.split(",");
            Student stu = new Student();
            stu.setSid(parts[0]);
            stu.setSname(parts[1]);
            stu.setSage(Integer.valueOf(parts[2]));
            return stu;
        });
        Dataset<Row> df = spark.createDataFrame(rowRDD, Student.class);
        df.select("sid", "sname", "sage")
          .coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res");
    }

    /**
     * Conversion via a dynamically built schema.
     * @param spark
     */
    private static void dynamicTransform(SparkSession spark) {
        JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();
        // Map each input line to a generic Row; the matching schema is supplied separately below.
        JavaRDD<Row> rowRDD = source.map(line -> {
            String[] parts = line.split(",");
            String sid = parts[0];
            String sname = parts[1];
            int sage = Integer.parseInt(parts[2]);
            return RowFactory.create(sid, sname, sage);
        });
        ArrayList<StructField> fields = new ArrayList<>();
        StructField field = null;
        field = DataTypes.createStructField("sid", DataTypes.StringType, true);
        fields.add(field);
        field = DataTypes.createStructField("sname", DataTypes.StringType, true);
        fields.add(field);
        field = DataTypes.createStructField("sage", DataTypes.IntegerType, true);
        fields.add(field);
        StructType schema = DataTypes.createStructType(fields);
        Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
        df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");
    }
}
Scala version:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.IntegerType

object RDD2Dataset {
  case class Student(id: Int, name: String, age: Int)

  def main(args: Array[String]) {
    val spark = SparkSession.builder().master("local").appName("RDD2Dataset").getOrCreate()
    import spark.implicits._
    reflectCreate(spark)
    dynamicCreate(spark)
  }

  /**
   * Conversion via reflection (case class).
   * @param spark
   */
  private def reflectCreate(spark: SparkSession): Unit = {
    import spark.implicits._
    val stuRDD = spark.sparkContext.textFile("student2.txt")
    // toDF() is an implicit conversion brought in by spark.implicits._
    val stuDf = stuRDD.map(_.split(","))
      .map(parts => Student(parts(0).trim.toInt, parts(1), parts(2).trim.toInt))
      .toDF()
    //stuDf.select("id", "name", "age").write.text("result") // write out with the given column names
    stuDf.printSchema()
    stuDf.createOrReplaceTempView("student")
    val nameDf = spark.sql("select name from student where age < 20")
    //nameDf.write.text("result") // write the query result to a file
    nameDf.show()
  }

  /**
   * Conversion via a dynamically built schema.
   * @param spark
   */
  private def dynamicCreate(spark: SparkSession): Unit = {
    val stuRDD = spark.sparkContext.textFile("student.txt")
    import spark.implicits._
    val schemaString = "id,name,age"
    val fields = schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    val rowRDD = stuRDD.map(_.split(",")).map(parts => Row(parts(0), parts(1), parts(2)))
    val stuDf = spark.createDataFrame(rowRDD, schema)
    stuDf.printSchema()
    stuDf.createOrReplaceTempView("student")
    val nameDf = spark.sql("select name from student where age < 20")
    //nameDf.write.text("result") // write the query result to a file
    nameDf.show()
  }
}
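One thing worth noting about dynamicCreate above: every field, including age, is declared as StringType, so the IntegerType import is never actually used and age ends up stored as a string. If you want the dynamically built schema to carry a real integer column, a minimal sketch (reusing the same field names and input file; not part of the original example) could look like this:
// Sketch: dynamic schema with age as an integer column instead of a string.
val fields = Seq(
  StructField("id", StringType, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true))
val schema = StructType(fields)
// The Row values must match the schema, so age is converted to Int when building each Row.
val rowRDD = stuRDD.map(_.split(",")).map(parts => Row(parts(0), parts(1), parts(2).trim.toInt))
val stuDf = spark.createDataFrame(rowRDD, schema)
With age declared as IntegerType, the where age < 20 filter compares integers rather than strings.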
Notes:
1. All of the code above has been tested; the test environment was Spark 2.1.0 with JDK 1.8.
2. This code does not work on Spark versions before 2.0.
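If you want to double-check the Parquet files written by the Java program, one simple option is to read them back, for example in spark-shell (the paths assume the parquet.res and parquet.res1 directories produced above):
// Read the Parquet output back and inspect its schema and contents.
val res = spark.read.parquet("parquet.res")
res.printSchema()  // should show sid, sname and sage
res.show()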