package com.test

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object PeopleInfoGenParquet {
  private val schemaString = "id,gender,height"

  def main(args: Array[String]) {
    println(util.Properties.versionString)
    if (args.length < 1) {
      println("Usage: PeopleInfoGenParquet filePath")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("Spark Exercise:People Data Statistics 2")
    val sc = new SparkContext(conf)
    val peopleDataRDD = sc.textFile(args(0))
    val sqlCtx = new SQLContext(sc)
    // Build the schema explicitly; every column is kept as a nullable StringType.
    val schemaArray = schemaString.split(",")
    val schema = StructType(schemaArray.map(fieldName => StructField(fieldName, StringType, true)))
    val rowRDD: RDD[Row] = peopleDataRDD.map(_.split(" ")).map(
      eachRow => Row(eachRow(0), eachRow(1), eachRow(2)))
    val peopleDF = sqlCtx.createDataFrame(rowRDD, schema)
    peopleDF.write.parquet("/sample_people_info")
  }
}
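A quick way to sanity-check the generated directory is to read it back and inspect the schema and row count. The following is only a sketch, assuming the same cluster and the /sample_people_info path written above:

package com.test

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: read the Parquet directory written by PeopleInfoGenParquet and verify it.
object PeopleInfoParquetCheck {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Spark Exercise:Parquet Sanity Check")
    val sc = new SparkContext(conf)
    val sqlCtx = new SQLContext(sc)
    val checkDF = sqlCtx.read.parquet("/sample_people_info")
    checkDF.printSchema()                         // id, gender, height, all written as string
    println("rows written: " + checkDF.count())   // should equal the line count of the input file
    sc.stop()
  }
}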
[root@master spark]# su hdfs -c 'hadoop fs -du -h -s /sample_people_info.txt'
1.4 G 2.8 G /sample_people_info.txt
[root@master spark]# su hdfs -c 'hadoop fs -du -h -s /sample_people_info'
517.2 M 1.0 G /sample_people_info
[root@master spark]# su hdfs -c 'hdfs dfs -ls /sample_people_info'
Found 12 items
-rw-r--r-- 2 lizhiyong supergroup 0 2019-09-17 15:01 /sample_people_info/_SUCCESS
-rw-r--r-- 2 lizhiyong supergroup 52343381 2019-09-17 15:00 /sample_people_info/part-00000-8c0b1962-8f2b-444b-b20f-085f4cea33b9-c000.snappy.parquet
-rw-r--r-- 2 lizhiyong supergroup 48659495 2019-09-17 15:00 /sample_people_info/part-00001-8c0b1962-8f2b-444b-b20f-085f4cea33b9-c000.snappy.parquet
-rw-r--r-- 2 lizhiyong supergroup 48539321 2019-09-17 15:00 /sample_people_info/part-00002-8c0b1962-8f2b-444b-b20f-085f4cea33b9-c000.snappy.parquet
-rw-r--r-- 2 lizhiyong supergroup 48539522 2019-09-17 15:00 /sample_people_info/part-00003-8c0b1962-8f2b-444b-b20f-085f4cea33b9-c000.snappy.parquet
-rw-r--r-- 2 lizhiyong supergroup 48539392 2019-09-17 15:00 /sample_people_info/part-00004-8c0b1962-8f2b-444b-b20f-085f4cea33b9-c000.snappy.parquet
-rw-r--r-- 2 lizhiyong supergroup 48541478 2019-09-17 15:00 /sample_people_info/part-00005-8c0b1962-8f2b-444b-b20f-085f4cea33b9-c000.snappy.parquet
-rw-r--r-- 2 lizhiyong supergroup 48538826 2019-09-17 15:01 /sample_people_info/part-00006-8c0b1962-8f2b-444b-b20f-085f4cea33b9-c000.snappy.parquet
-rw-r--r-- 2 lizhiyong supergroup 48537173 2019-09-17 15:01 /sample_people_info/part-00007-8c0b1962-8f2b-444b-b20f-085f4cea33b9-c000.snappy.parquet
-rw-r--r-- 2 lizhiyong supergroup 48541052 2019-09-17 15:01 /sample_people_info/part-00008-8c0b1962-8f2b-444b-b20f-085f4cea33b9-c000.snappy.parquet
-rw-r--r-- 2 lizhiyong supergroup 48539090 2019-09-17 15:01 /sample_people_info/part-00009-8c0b1962-8f2b-444b-b20f-085f4cea33b9-c000.snappy.parquet
-rw-r--r-- 2 lizhiyong supergroup 53056415 2019-09-17 15:00 /sample_people_info/part-00010-8c0b1962-8f2b-444b-b20f-085f4cea33b9-c000.snappy.parquet
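The Parquet output is roughly a third of the original text file (517.2 M versus 1.4 G before replication), and the eleven part files simply mirror the input splits of the source text file. If fewer, larger files are preferred, the DataFrame can be coalesced before writing; a minimal sketch to drop into PeopleInfoGenParquet in place of the plain write (the partition count of 4 and the output path are arbitrary examples):

// Sketch: produce fewer output files by reducing the number of partitions first.
peopleDF.coalesce(4).write.parquet("/sample_people_info_coalesced")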
package com.test

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object PeopleDataStatistics {
  private val schemaString = "id,gender,height"

  def main(args: Array[String]) {
    println(util.Properties.versionString)
    if (args.length < 1) {
      println("Usage: PeopleDataStatistics filePath")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("Spark Exercise:People Data Statistics 2")
    val sc = new SparkContext(conf)
    val peopleDataRDD = sc.textFile(args(0))
    val sqlCtx = new SQLContext(sc)
    // Needed for the $"column" syntax used below.
    import sqlCtx.implicits._
    val schemaArray = schemaString.split(",")
    val schema = StructType(schemaArray.map(fieldName => StructField(fieldName, StringType, true)))
    val rowRDD: RDD[Row] = peopleDataRDD.map(_.split(" ")).map(
      eachRow => Row(eachRow(0), eachRow(1), eachRow(2)))
    val peopleDF = sqlCtx.createDataFrame(rowRDD, schema)
    peopleDF.createOrReplaceTempView("people")

    // Men whose height is greater than 180
    // (equivalent count query: select count(*) from people where height > 180 and gender = 'M')
    val higherMale180 = sqlCtx.sql(
      "select id, gender, height from people where height > 180 and gender = 'M'")
    println("Men whose height is more than 180: " + higherMale180.count())
    println("<Display #1>")

    // Women whose height is greater than 170
    val higherFemale170 = sqlCtx.sql(
      "select id, gender, height from people where height > 170 and gender = 'F'")
    println("Women whose height is more than 170: " + higherFemale170.count())
    println("<Display #2>")

    // Group the people by gender and count each group
    peopleDF.groupBy(peopleDF("gender")).count().show()
    println("People count grouped by gender")
    println("<Display #3>")

    // Men whose height is greater than 210
    peopleDF
      .filter(peopleDF("gender").equalTo("M"))
      .filter(peopleDF("height") > 210)
      .show(50)
    println("Men whose height is more than 210")
    println("<Display #4>")

    // Sort the people by height in descending order and show the top 50
    peopleDF.sort($"height".desc).take(50)
      .foreach { row => println(row(0) + "," + row(1) + "," + row(2)) }
    println("Sorted the people by height in descending order, top 50 shown")
    println("<Display #5>")

    // The average height of men
    peopleDF
      .filter(peopleDF("gender").equalTo("M"))
      .agg(Map("height" -> "avg")).show()
    println("The average height of men")
    println("<Display #6>")

    // The maximum height of women
    peopleDF
      .filter(peopleDF("gender").equalTo("F"))
      .agg("height" -> "max").show()
    println("The maximum height of women:")
    println("<Display #7>")

    // ......
    println("All the statistics actions are finished on structured People data.")
  }
}
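Since every column in the schema is declared as StringType, the height comparisons above rely on Spark casting the string column against the numeric literal. To make the numeric intent explicit, a small sketch (to be placed right after createDataFrame in the object above) casts height once and filters on the typed column:

import org.apache.spark.sql.types.IntegerType

// Sketch: cast height to an integer column instead of relying on implicit casts.
val typedPeopleDF = peopleDF.withColumn("height", peopleDF("height").cast(IntegerType))
typedPeopleDF
  .filter(typedPeopleDF("gender").equalTo("M"))
  .filter(typedPeopleDF("height") > 210)
  .show(50)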
package com.test

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object PeopleDataStatisticsParquet {
  private val schemaString = "id,gender,height"

  def main(args: Array[String]) {
    println(util.Properties.versionString)
    if (args.length < 1) {
      println("Usage: PeopleDataStatisticsParquet filePath")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("Spark Exercise:People Data Statistics 2")
    val sc = new SparkContext(conf)
    // val peopleDataRDD = sc.textFile(args(0))
    val sqlCtx = new SQLContext(sc)
    // Needed for the $"column" syntax used below.
    import sqlCtx.implicits._
    // No explicit StructType is needed here: the schema is read from the Parquet metadata.
    // "/sample_people_info" is the directory written by PeopleInfoGenParquet above.
    val peopleDF = sqlCtx.read.parquet("/sample_people_info")
    peopleDF.createOrReplaceTempView("people")

    // Men whose height is greater than 180
    val higherMale180 = sqlCtx.sql(
      "select id, gender, height from people where height > 180 and gender = 'M'")
    println("Men whose height is more than 180: " + higherMale180.count())
    println("<Display #1>")

    // Women whose height is greater than 170
    val higherFemale170 = sqlCtx.sql(
      "select id, gender, height from people where height > 170 and gender = 'F'")
    println("Women whose height is more than 170: " + higherFemale170.count())
    println("<Display #2>")

    // Group the people by gender and count each group
    peopleDF.groupBy(peopleDF("gender")).count().show()
    println("People count grouped by gender")
    println("<Display #3>")

    // Men whose height is greater than 210
    peopleDF
      .filter(peopleDF("gender").equalTo("M"))
      .filter(peopleDF("height") > 210)
      .show(50)
    println("Men whose height is more than 210")
    println("<Display #4>")

    // Sort the people by height in descending order and show the top 50
    peopleDF.sort($"height".desc).take(50)
      .foreach { row => println(row(0) + "," + row(1) + "," + row(2)) }
    println("Sorted the people by height in descending order, top 50 shown")
    println("<Display #5>")

    // The average height of men
    peopleDF
      .filter(peopleDF("gender").equalTo("M"))
      .agg(Map("height" -> "avg")).show()
    println("The average height of men")
    println("<Display #6>")

    // The maximum height of women
    peopleDF
      .filter(peopleDF("gender").equalTo("F"))
      .agg("height" -> "max").show()
    println("The maximum height of women:")
    println("<Display #7>")

    println("All the statistics actions are finished on structured People data.")
  }
}
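One advantage of the Parquet source is that simple predicates can be pushed down to the reader instead of being applied after a full scan. A quick way to check this on the peopleDF read above is to print the physical plan; whether a pushed-filters entry appears in the scan node depends on the Spark version in use:

// Sketch: inspect the physical plan of a filtered read on the Parquet-backed DataFrame.
peopleDF
  .filter(peopleDF("gender").equalTo("M"))
  .filter(peopleDF("height") > 210)
  .explain()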
In this case we will analyze 10 million users and 100 million transaction records. The user data is a text file with six columns (ID, gender, age, register date, role (industry), region), in the following format:
1 F 30 2015-10-9 ROLE001 REG002
2 F 15 2005-4-2 ROLE001 REG004
3 F 26 2000-11-18 ROLE005 REG002
4 M 27 2012-6-1 ROLE005 REG005
5 M 11 2001-2-23 ROLE001 REG005
6 M 49 2002-3-20 ROLE003 REG001
7 M 39 2014-12-22 ROLE001 REG005
8 M 43 2012-1-3 ROLE005 REG004
9 F 11 2014-10-11 ROLE001 REG005
The transaction (consuming) data is a text file with five columns (order ID, purchase date, product ID, price, user ID), in the following format:
1 2000-8-9 5 312 1823359
2 2003-1-3 8 266 4426761
3 2011-11-16 7 1197 2504036
4 2007-9-27 3 1013 9093075
5 2015-9-18 7 1064 5729462
6 2008-11-16 1 985 5921470
7 2003-4-15 5 1464 5516412
8 2005-10-13 2 691 4493409
9 2009-5-8 8 1339 4353873
10 2009-9-23 2 1976 2144924
import java.io.FileWriter

import scala.util.Random

object UserDataGenerator {
  private val FILE_PATH = "sample_user_data.txt"
  private val ROLE_ID_ARRAY = Array[String]("ROLE001", "ROLE002", "ROLE003", "ROLE004", "ROLE005")
  private val REGION_ID_ARRAY = Array[String]("REG001", "REG002", "REG003", "REG004", "REG005")
  private val MAX_USER_AGE = 60
  // how many records to generate
  private val MAX_RECORDS = 10000000

  def main(args: Array[String]): Unit = {
    generateDataFile(FILE_PATH, MAX_RECORDS)
  }

  private def generateDataFile(filePath: String, recordNum: Int): Unit = {
    var writer: FileWriter = null
    try {
      writer = new FileWriter(filePath, true)
      val rand = new Random()
      for (i <- 1 to recordNum) {
        // generate the gender of the user
        val gender = getRandomGender()
        var age = rand.nextInt(MAX_USER_AGE)
        if (age < 10) {
          age = age + 10
        }
        // generate the register date; the day is capped at 28 so every month is valid
        val year = rand.nextInt(16) + 2000
        val month = rand.nextInt(12) + 1
        val day = rand.nextInt(28) + 1
        val registerDate = year + "-" + month + "-" + day
        // generate the role of the user
        val roleIndex: Int = rand.nextInt(ROLE_ID_ARRAY.length)
        val role = ROLE_ID_ARRAY(roleIndex)
        // generate the region where the user is located
        val regionIndex: Int = rand.nextInt(REGION_ID_ARRAY.length)
        val region = REGION_ID_ARRAY(regionIndex)
        writer.write(i + " " + gender + " " + age + " " + registerDate
          + " " + role + " " + region)
        writer.write(System.getProperty("line.separator"))
      }
      writer.flush()
    } catch {
      case e: Exception => println("Error occurred:" + e)
    } finally {
      if (writer != null) writer.close()
    }
    println("User Data File generated successfully.")
  }

  private def getRandomGender(): String = {
    val rand = new Random()
    val randNum = rand.nextInt(2) + 1
    if (randNum % 2 == 0) "M" else "F"
  }
}
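Writing ten million lines through a single FileWriter is strictly serial. If the cluster is already up, the same layout can be generated in parallel and written straight to HDFS. The sketch below assumes an existing SparkContext named sc; the partition count, output path, and the exact age distribution are only illustrative, and the result is a directory of part files rather than a single text file:

import scala.util.Random

// Sketch: generate user records in parallel with Spark; 16 partitions is an arbitrary choice.
val userLines = sc.parallelize(1 to 10000000, 16).map { i =>
  val rand = new Random()
  val gender = if (rand.nextInt(2) == 0) "M" else "F"
  val age = rand.nextInt(50) + 10                     // 10..59, roughly like the generator above
  val registerDate = (rand.nextInt(16) + 2000) + "-" + (rand.nextInt(12) + 1) + "-" + (rand.nextInt(28) + 1)
  val role = "ROLE00" + (rand.nextInt(5) + 1)
  val region = "REG00" + (rand.nextInt(5) + 1)
  i + " " + gender + " " + age + " " + registerDate + " " + role + " " + region
}
userLines.saveAsTextFile("/sample_user_data_generated")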
import java.io.FileWriter

import scala.util.Random

object ConsumingDataGenerator {
  private val FILE_PATH = "sample_consuming_data.txt"
  // how many records to generate
  private val MAX_RECORDS = 100000000
  // we assume there are only 10 kinds of products in the consuming data
  private val PRODUCT_ID_ARRAY = Array[Int](1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  // we assume the most expensive product does not exceed 2000 RMB
  private val MAX_PRICE = 2000
  // we assume the cheapest product is not below 10 RMB
  private val MIN_PRICE = 10
  // the number of users; must match MAX_RECORDS in the UserDataGenerator object
  private val USERS_NUM = 10000000

  def main(args: Array[String]): Unit = {
    generateDataFile(FILE_PATH, MAX_RECORDS)
  }

  private def generateDataFile(filePath: String, recordNum: Int): Unit = {
    var writer: FileWriter = null
    try {
      writer = new FileWriter(filePath, true)
      val rand = new Random()
      for (i <- 1 to recordNum) {
        // generate the buying date; to avoid checking whether a day is valid for a
        // specific month, we always generate a day that is no greater than 28
        val year = rand.nextInt(16) + 2000
        val month = rand.nextInt(12) + 1
        val day = rand.nextInt(28) + 1
        val recordDate = year + "-" + month + "-" + day
        // generate the product ID
        val index: Int = rand.nextInt(PRODUCT_ID_ARRAY.length)
        val productID = PRODUCT_ID_ARRAY(index)
        // generate the product price, clamped to the minimum price
        var price: Int = rand.nextInt(MAX_PRICE)
        if (price < MIN_PRICE) {
          price = MIN_PRICE
        }
        // which user buys this product
        val userID = rand.nextInt(USERS_NUM) + 1
        writer.write(i + " " + recordDate + " " + productID
          + " " + price + " " + userID)
        writer.write(System.getProperty("line.separator"))
      }
      writer.flush()
    } catch {
      case e: Exception => println("Error occurred:" + e)
    } finally {
      if (writer != null) writer.close()
    }
    println("Consuming Data File generated successfully.")
  }
}
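The statistics job below maps every text line onto two case classes, User and Order, whose definitions are not shown in the original listing. A reconstruction based on the generated column layout, the names referenced by the queries below, and the Hive DDL at the end of this article would look like this:

package com.test

// Reconstructed case classes required by UserConsumingDataStatistics below; the field
// names follow the columns referenced in its queries and in the Hive DDL at the end.
case class User(userID: String, gender: String, age: Int,
                registerDate: String, role: String, region: String)

case class Order(orderID: String, orderDate: String, productID: Int,
                 price: Int, userID: String)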
package com.test

import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

object UserConsumingDataStatistics {
  def main(args: Array[String]) {
    println("args.length:" + args.length)
    if (args.length < 2) {
      println("Usage: UserConsumingDataStatistics userDataFilePath consumingDataFilePath")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("Spark Exercise:User Consuming Data Statistics")
    // The Kryo serializer is faster than the default Java serializer
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val ctx = new SparkContext(conf)
    val sqlCtx = new SQLContext(ctx)
    import sqlCtx.implicits._
    // Convert the user data RDD to a DataFrame and register it as a temp view
    val userDF = ctx.textFile(args(0)).map(_.split(" ")).map(
      u => User(u(0), u(1), u(2).toInt, u(3), u(4), u(5))).toDF()
    userDF.createOrReplaceTempView("user")
    // Convert the consuming data RDD to a DataFrame and register it as a temp view
    val orderDF = ctx.textFile(args(1)).map(_.split(" ")).map(o => Order(
      o(0), o(1), o(2).toInt, o(3).toInt, o(4))).toDF()
    orderDF.createOrReplaceTempView("orders")
    // Convert both DataFrames to Parquet format
    if (args.length == 3) {
      // just generate the Parquet data and exit
      userDF.write.parquet("/sample_user_data")
      orderDF.write.parquet("/sample_consuming_data")
      return
    }
    // Caching the DataFrames in memory (serialized) should make the program run much faster
    userDF.persist(StorageLevel.MEMORY_ONLY_SER)
    orderDF.persist(StorageLevel.MEMORY_ONLY_SER)
    // The number of people who have orders in the year 2015
    val count = orderDF.filter(orderDF("orderDate").contains("2015")).join(
      userDF, orderDF("userID").equalTo(userDF("userID"))).count()
    println("The number of people who have orders in the year 2015:" + count)
    // Total orders produced in the year 2014
    val countOfOrders2014 = sqlCtx.sql("SELECT * FROM orders where orderDate like '2014%'").count()
    println("total orders produced in the year 2014:" + countOfOrders2014)
    // Overview of the orders produced by the user with ID 1
    sqlCtx.sql("SELECT o.orderID, o.productID, o.price, u.userID FROM orders o, user u " +
      "where u.userID = 1 and u.userID = o.userID").show()
    println("Orders produced by user with ID 1 showed.")
    // Calculate the max, min and avg prices of the orders produced by the user with ID 10
    val orderStatsForUser10 = sqlCtx.sql("SELECT max(o.price) as maxPrice, min(o.price) as minPrice, " +
      "avg(o.price) as avgPrice, u.userID FROM orders o, user u " +
      "where u.userID = 10 and u.userID = o.userID group by u.userID")
    println("Order statistic result for user with ID 10:")
    orderStatsForUser10.collect().map(order => "Minimum Price=" + order.getAs("minPrice")
      + ";Maximum Price=" + order.getAs("maxPrice")
      + ";Average Price=" + order.getAs("avgPrice")
    ).foreach(result => println(result))
  }
}
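Note that the first query counts matched order rows rather than distinct buyers, so the label "number of people" is really "number of 2015 orders whose user exists". If the goal is literally the number of distinct people, deduplicating on userID gives that; a sketch against the orderDF defined above:

// Sketch: count distinct buyers in 2015 instead of matched order rows.
val buyers2015 = orderDF.filter(orderDF("orderDate").contains("2015"))
  .select("userID").distinct().count()
println("Distinct users with at least one order in 2015: " + buyers2015)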
Job 0 finished: count at UserConsumingDataStatistics.scala:45, took 103.472787 s
The number of people who have orders in the year 2015:6249355
Job 1 finished: count at UserConsumingDataStatistics.scala:50, took 74.912479 s
total orders produced in the year 2014:6251708
Job 2 finished: show at UserConsumingDataStatistics.scala:56, took 74.572010 s
Job 3 finished: show at UserConsumingDataStatistics.scala:56, took 0.111601 s
Job 4 finished: show at UserConsumingDataStatistics.scala:56, took 0.109862 s
Job 5 finished: show at UserConsumingDataStatistics.scala:56, took 0.481688 s
2019-09-19 16:08:25,699 INFO scheduler.DAGScheduler: Job 6 finished: show at UserConsumingDataStatistics.scala:56, took 0.309017 s
+--------+---------+-----+------+
| orderID|productID|price|userID|
+--------+---------+-----+------+
|25038167| 6| 1239| 1|
|64521701| 8| 1428| 1|
|75084459| 2| 101| 1|
| 8477710| 9| 425| 1|
+--------+---------+-----+------+
Orders produced by user with ID 1 showed.
Job 7 finished: collect at UserConsumingDataStatistics.scala:64, took 82.077812 s
Minimum Price=461;Maximum Price=1512;Average Price=955.6
package com.test

import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

object UserConsumingDataStatisticsParquet {
  def main(args: Array[String]) {
    if (args.length < 2) {
      println("Usage: UserConsumingDataStatisticsParquet userDataFilePath consumingDataFilePath")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("Spark Exercise:User Consuming Data Statistics")
    // The Kryo serializer is faster than the default Java serializer
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val ctx = new SparkContext(conf)
    val sqlCtx = new SQLContext(ctx)
    import sqlCtx.implicits._
    val userDF = sqlCtx.read.parquet(args(0))
    userDF.createOrReplaceTempView("user")
    val orderDF = sqlCtx.read.parquet(args(1))
    orderDF.createOrReplaceTempView("orders")
    // Caching the DataFrames in memory (serialized) should make the program run much faster
    userDF.persist(StorageLevel.MEMORY_ONLY_SER)
    orderDF.persist(StorageLevel.MEMORY_ONLY_SER)
    // The number of people who have orders in the year 2015
    val count = orderDF.filter(orderDF("orderDate").contains("2015")).join(
      userDF, orderDF("userID").equalTo(userDF("userID"))).count()
    println("The number of people who have orders in the year 2015:" + count)
    // Total orders produced in the year 2014
    val countOfOrders2014 = sqlCtx.sql("SELECT * FROM orders where orderDate like '2014%'").count()
    println("total orders produced in the year 2014:" + countOfOrders2014)
    // Overview of the orders produced by the user with ID 1
    sqlCtx.sql("SELECT o.orderID, o.productID, o.price, u.userID FROM orders o, user u " +
      "where u.userID = 1 and u.userID = o.userID").show()
    println("Orders produced by user with ID 1 showed.")
    // Calculate the max, min and avg prices of the orders produced by the user with ID 10
    val orderStatsForUser10 = sqlCtx.sql("SELECT max(o.price) as maxPrice, min(o.price) as minPrice, " +
      "avg(o.price) as avgPrice, u.userID FROM orders o, user u " +
      "where u.userID = 10 and u.userID = o.userID group by u.userID")
    println("Order statistic result for user with ID 10:")
    orderStatsForUser10.collect().map(order => "Minimum Price=" + order.getAs("minPrice")
      + ";Maximum Price=" + order.getAs("maxPrice")
      + ";Average Price=" + order.getAs("avgPrice")
    ).foreach(result => println(result))
  }
}
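Both statistics jobs call createOrReplaceTempView, which only exists from Spark 2.x onward, where SparkSession supersedes SQLContext as the entry point. An equivalent setup sketch for the Parquet variant, to be dropped into main in place of the SparkConf/SparkContext/SQLContext trio above (the rest of the job stays the same):

import org.apache.spark.sql.SparkSession

// Sketch: the Spark 2.x entry point that replaces SparkConf + SparkContext + SQLContext.
val spark = SparkSession.builder()
  .appName("Spark Exercise:User Consuming Data Statistics")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()
import spark.implicits._

val userDF = spark.read.parquet(args(0))
userDF.createOrReplaceTempView("user")
val orderDF = spark.read.parquet(args(1))
orderDF.createOrReplaceTempView("orders")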
Job 0 finished: parquet at UserConsumingDataStatisticsParquet.scala:21, took 2.768159 s
Job 1 finished: parquet at UserConsumingDataStatisticsParquet.scala:25, took 1.965211 s
Job 2 finished: count at UserConsumingDataStatisticsParquet.scala:36, took 88.654541 s
The number of people who have orders in the year 2015:6249355
Job 3 finished: count at UserConsumingDataStatisticsParquet.scala:41, took 60.676918 s
total orders produced in the year 2014:6251708
Job 4 finished: show at UserConsumingDataStatisticsParquet.scala:47, took 70.043645 s
Job 5 finished: show at UserConsumingDataStatisticsParquet.scala:47, took 0.105641 s
Job 6 finished: show at UserConsumingDataStatisticsParquet.scala:47, took 0.123206 s
Job 7 finished: show at UserConsumingDataStatisticsParquet.scala:47, took 0.502567 s
Job 8 finished: show at UserConsumingDataStatisticsParquet.scala:47, took 0.397689 s
+--------+---------+-----+------+
| orderID|productID|price|userID|
+--------+---------+-----+------+
|75084459| 2| 101| 1|
| 8477710| 9| 425| 1|
|25038167| 6| 1239| 1|
|64521701| 8| 1428| 1|
+--------+---------+-----+------+
Orders produced by user with ID 1 showed.
Job 9 finished: collect at UserConsumingDataStatisticsParquet.scala:55, took 65.390210 s
Minimum Price=461;Maximum Price=1512;Average Price=955.6
su hive -l -s /bin/bash -c '/opt/hive/bin/beeline'
> !connect jdbc:hive2://node2.test.com:10000/default
drop table users;
create table users(userID STRING, gender STRING, age INT, registerDate STRING, role STRING, region STRING) row format delimited fields terminated by ' ' stored as textfile;
load data inpath '/sample_user_data.txt' overwrite into table users;
select count(*) from users;
drop table orders;
create table orders (orderID STRING, orderDate STRING, productID INT, price INT, userID STRING) row format delimited fields terminated by ' ' stored as textfile;
load data inpath '/sample_consuming_data.txt' overwrite into table orders;
select count(*) from orders;
drop table users2;
create EXTERNAL table users2(userID STRING, gender STRING, age INT, registerDate STRING, role STRING, region STRING) stored as PARQUETFILE LOCATION '/sample_user_data';
select count(*) from users2;
drop table orders2;
create EXTERNAL table orders2 (orderID STRING, orderDate STRING, productID INT, price INT, userID STRING) stored as PARQUETFILE LOCATION '/sample_consuming_data';
select count(*) from orders2;
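The same external tables can also be queried from Spark rather than beeline, provided Spark is built with Hive support and hive-site.xml is visible on its classpath. A sketch using a HiveContext (an existing SparkContext named sc is assumed):

import org.apache.spark.sql.hive.HiveContext

// Sketch: query the Hive tables defined above directly from Spark.
val hiveCtx = new HiveContext(sc)
hiveCtx.sql("select count(*) from users2").show()
hiveCtx.sql("select region, count(*) from users2 group by region").show()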