Spark SQL: A First Look
Published: 2019-06-21




Spark SQL was already being presented back at Spark Summit 2013, though mostly in terms of the Catalyst query optimization framework. After a year of development, Databricks announced at Spark Summit 2014 that it was abandoning Shark in favor of Spark SQL, the reason being that Shark inherited too much from Hive and its optimization had hit a bottleneck, as shown in the figure:

Today I checked out the latest Spark code and gave it a quick test.

1. Building Spark SQL

-bash-3.2$ git config --global http.sslVerify false
-bash-3.2$ git clone https://github.com/apache/spark.git
Cloning into 'spark'...
remote: Reusing existing pack: 107821, done.
remote: Counting objects: 103, done.
remote: Compressing objects: 100% (72/72), done.
remote: Total 107924 (delta 20), reused 64 (delta 16)
Receiving objects: 100% (107924/107924), 69.06 MiB | 3.39 MiB/s, done.
Resolving deltas: 100% (50174/50174), done.

You still need to build first with sbt/sbt assembly (for how to build a matching version, see …)

Running sbt/sbt hive/console will also trigger a build.

The latest Spark SQL provides a console where interactive queries can be run directly, and it ships with several examples as well.

2. Running Spark SQL

The project gives us a ready-made test harness. By checking the logs and running find . -name TestHive*, I located it at:

/app/hadoop/shengli/spark/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala — if you are interested, open it, build it, and step through it yourself.
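Below is a minimal sketch of driving this harness from plain Scala code instead of the console. It is my own example, not from the original post; it assumes the 2014-era spark-hive test classes are on the classpath and uses only members that appear in the completion list shown further down (loadTestTable, hql, reset):

import org.apache.spark.sql.hive.test.TestHive

// TestHive is a ready-made HiveContext wired to a temporary Derby metastore and a
// temporary warehouse directory, so it works outside sbt/sbt hive/console as well.
object TestHiveSketch {
  def main(args: Array[String]): Unit = {
    TestHive.loadTestTable("src")        // force one of the lazily registered test tables to load
    val rows = TestHive.hql("SELECT key, value FROM src LIMIT 3").collect()
    rows.foreach(println)                // collect() is the action that actually runs the job
    TestHive.reset()                     // drop the temporary tables/state between runs
  }
}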

First, start the console:

sbt/sbt hive/console
[info] Starting scala interpreter...
[info]
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.dsl._
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.parquet.ParquetTestData
Welcome to Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_20).
Type in expressions to have them evaluated.
Type :help for more information.

scala>

Let's see which methods the current runtime provides (tab completion at the prompt):

scala> 
DslAttribute DslExpression DslString DslSymbol ParquetTestData SqlCmd analyzer autoConvertJoinSize binaryToLiteral booleanToLiteral byteToLiteral cacheTable cacheTables catalog classOf clear clone configure contains createParquetFile createSchemaRDD createTable decimalToLiteral describedTable doubleToLiteral emptyResult eq equals executePlan executeSql execution finalize floatToLiteral get getAll getClass getHiveFile getOption hashCode hiveDevHome hiveFilesTemp hiveHome hivePlanner hiveQTestUtilTables hiveconf hiveql hql inRepoTests inferSchema intToLiteral isCached joinBroadcastTables jsonFile jsonRDD loadTestTable logger logicalPlanToSparkQuery longToLiteral metastorePath ne notify notifyAll numShufflePartitions optimizer originalUdfs outputBuffer parquetFile parseSql parser planner prepareForExecution registerRDDAsTable registerTestTable reset runHive runSqlHive sessionState set shortToLiteral sparkContext sql stringToLiteral symbolToUnresolvedAttribute synchronized table testTables timestampToLiteral toDebugString toString uncacheTable wait warehousePath

We can see that the harness has a testTables member; since these members are all lazy, nothing is loaded at the start.
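As a plain-Scala illustration of that laziness (my own sketch, not Spark code): the body of a lazy val runs only on first access, which is why the metastore and tables are not created until testTables, or a query, first touches them.

class TestTables {
  lazy val tables = {                              // initializer runs on first access, exactly once
    println("loading test tables ...")
    Map("sales" -> "TestTable(sales)", "src" -> "TestTable(src)")
  }
}

val t = new TestTables                             // prints nothing: tables is not initialized yet
t.tables                                           // "loading test tables ..." is printed here
t.tables                                           // cached: the initializer does not run again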

Check which tables the harness is going to load:

scala> testTables
14/07/02 18:45:59 INFO spark.SecurityManager: Changing view acls to: hadoop
14/07/02 18:45:59 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop)
14/07/02 18:46:00 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/07/02 18:46:00 INFO Remoting: Starting remoting
14/07/02 18:46:00 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@web02.dw:42984]
14/07/02 18:46:00 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@web02.dw:42984]
14/07/02 18:46:00 INFO spark.SparkEnv: Registering MapOutputTracker
14/07/02 18:46:00 INFO spark.SparkEnv: Registering BlockManagerMaster
14/07/02 18:46:00 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20140702184600-9e16
14/07/02 18:46:00 INFO network.ConnectionManager: Bound socket to port 48348 with id = ConnectionManagerId(web02.dw,48348)
14/07/02 18:46:00 INFO storage.MemoryStore: MemoryStore started with capacity 1097.0 MB
14/07/02 18:46:00 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/07/02 18:46:00 INFO storage.BlockManagerInfo: Registering block manager web02.dw:48348 with 1097.0 MB RAM
14/07/02 18:46:00 INFO storage.BlockManagerMaster: Registered BlockManager
14/07/02 18:46:00 INFO spark.HttpServer: Starting HTTP Server
14/07/02 18:46:01 INFO server.Server: jetty-8.1.14.v20131031
14/07/02 18:46:01 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:36260
14/07/02 18:46:01 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.1.8.207:36260
14/07/02 18:46:01 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-ca40f66c-edc3-484f-b317-d3f512aab244
14/07/02 18:46:01 INFO spark.HttpServer: Starting HTTP Server
14/07/02 18:46:01 INFO server.Server: jetty-8.1.14.v20131031
14/07/02 18:46:01 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:57821
14/07/02 18:46:01 INFO server.Server: jetty-8.1.14.v20131031
14/07/02 18:46:02 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
14/07/02 18:46:02 INFO ui.SparkUI: Started SparkUI at http://web02.dw:4040
metastore path is /tmp/sparkHiveMetastore8060064816530828092
warehousePath path is /tmp/sparkHiveWarehouse5366068035857129261
hiveHome path is Some(/home/hadoop/Java/lib/hive-0.6.0)
hiveDevHome path is None
res0: scala.collection.mutable.HashMap[String,org.apache.spark.sql.hive.test.TestHive.TestTable] = Map(sales -> TestTable(sales,WrappedArray(...)), src -> TestTable(src,WrappedArray(...)), src1 -> TestTable(src1,WrappedArray(...)), serdeins -> TestTable(serdeins,WrappedArray(...)), src_thrift -> TestTable(src_thrift,WrappedArray(...)), srcpart -> TestTable(srcpart,WrappedArray(...)), episodes -> TestTable(episodes,WrappedArray(...)), srcpart1 -> TestTable(srcpart1,WrappedArray(...)))

Testing a SELECT statement

1. First, declare a SQL statement.

2. At this point the test harness uses Hive's metastore and creates a Derby database.

3. It creates all of the tables listed above and loads their data.

4. It parses the select * from sales statement.

5. It generates a SchemaRDD and produces a query plan.

6. When an action is run against the querySales RDD, the SQL is actually executed.

The detailed output of the run follows (the log shows the rough execution steps; a condensed, log-free version of the same session appears after the result below):

scala> val querySales = sql("select * from sales")
14/07/02 18:51:19 INFO test.TestHive$: Loading test table sales
14/07/02 18:51:19 INFO parse.ParseDriver: Parsing command: CREATE TABLE IF NOT EXISTS sales (key STRING, value INT) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe' WITH SERDEPROPERTIES ("input.regex" = "([^ ]*)  ([^ ]*)")
14/07/02 18:51:19 INFO parse.ParseDriver: Parse Completed
14/07/02 18:51:19 INFO analysis.Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
14/07/02 18:51:19 INFO analysis.Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
14/07/02 18:51:19 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange
14/07/02 18:51:19 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions
14/07/02 18:51:19 INFO ql.Driver:
14/07/02 18:51:19 INFO ql.Driver:
14/07/02 18:51:19 INFO ql.Driver:
14/07/02 18:51:19 INFO ql.Driver:
14/07/02 18:51:19 INFO parse.ParseDriver: Parsing command: CREATE TABLE IF NOT EXISTS sales (key STRING, value INT)ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'WITH SERDEPROPERTIES ("input.regex" = "([^ ]*) ([^ ]*)") 14/07/02 18:51:19 INFO parse.ParseDriver: Parse Completed14/07/02 18:51:19 INFO ql.Driver:
14/07/02 18:51:19 INFO ql.Driver:
14/07/02 18:51:19 INFO parse.SemanticAnalyzer: Starting Semantic Analysis14/07/02 18:51:19 INFO parse.SemanticAnalyzer: Creating table sales position=2714/07/02 18:51:20 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore14/07/02 18:51:20 INFO metastore.ObjectStore: ObjectStore, initialize called14/07/02 18:51:20 INFO DataNucleus.Persistence: Property datanucleus.cache.level2 unknown - will be ignored14/07/02 18:51:21 WARN bonecp.BoneCPConfig: Max Connections < 1. Setting to 20 14/07/02 18:51:25 INFO metastore.ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order" 14/07/02 18:51:25 INFO metastore.ObjectStore: Initialized ObjectStore 14/07/02 18:51:26 WARN bonecp.BoneCPConfig: Max Connections < 1. Setting to 20 14/07/02 18:51:26 WARN metastore.ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 0.12.0 14/07/02 18:51:27 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=sales 14/07/02 18:51:27 INFO HiveMetaStore.audit: ugi=hadoop ip=unknown-ip-addr cmd=get_table : db=default tbl=sales 14/07/02 18:51:27 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table. 14/07/02 18:51:27 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table. 14/07/02 18:51:28 INFO ql.Driver: Semantic Analysis Completed 14/07/02 18:51:28 INFO ql.Driver:
14/07/02 18:51:28 INFO ql.Driver: Returning Hive schema: Schema(fieldSchemas:null, properties:null)14/07/02 18:51:28 INFO ql.Driver:
14/07/02 18:51:28 INFO ql.Driver:
14/07/02 18:51:28 INFO ql.Driver: Starting command: CREATE TABLE IF NOT EXISTS sales (key STRING, value INT)ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'WITH SERDEPROPERTIES ("input.regex" = "([^ ]*) ([^ ]*)") 14/07/02 18:51:28 INFO ql.Driver:
14/07/02 18:51:28 INFO ql.Driver:
14/07/02 18:51:28 INFO ql.Driver:
14/07/02 18:51:28 INFO metastore.HiveMetaStore: 0: create_table: Table(tableName:sales, dbName:default, owner:hadoop, createTime:1404298288, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:key, type:string, comment:null), FieldSchema(name:value, type:int, comment:null)], location:null, inputFormat:org.apache.hadoop.mapred.TextInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.RegexSerDe, parameters:{serialization.format=1, input.regex=([^ ]*) ([^ ]*)}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{}), storedAsSubDirectories:false), partitionKeys:[], parameters:{}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE, privileges:PrincipalPrivilegeSet(userPrivileges:null, groupPrivileges:null, rolePrivileges:null))14/07/02 18:51:28 INFO HiveMetaStore.audit: ugi=hadoop ip=unknown-ip-addr cmd=create_table: Table(tableName:sales, dbName:default, owner:hadoop, createTime:1404298288, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:key, type:string, comment:null), FieldSchema(name:value, type:int, comment:null)], location:null, inputFormat:org.apache.hadoop.mapred.TextInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.RegexSerDe, parameters:{serialization.format=1, input.regex=([^ ]*) ([^ ]*)}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{}), storedAsSubDirectories:false), partitionKeys:[], parameters:{}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE, privileges:PrincipalPrivilegeSet(userPrivileges:null, groupPrivileges:null, rolePrivileges:null))14/07/02 18:51:28 INFO ql.Driver:
14/07/02 18:51:28 INFO ql.Driver:
14/07/02 18:51:28 INFO ql.Driver:
14/07/02 18:51:28 INFO ql.Driver: OK14/07/02 18:51:28 INFO ql.Driver:
14/07/02 18:51:28 INFO ql.Driver:
14/07/02 18:51:28 INFO ql.Driver:
14/07/02 18:51:28 INFO ql.Driver:
14/07/02 18:51:28 INFO ql.Driver:
14/07/02 18:51:28 INFO parse.ParseDriver: Parsing command: LOAD DATA LOCAL INPATH 'sql/hive/src/test/resources/data/files/sales.txt' INTO TABLE sales14/07/02 18:51:28 INFO parse.ParseDriver: Parse Completed14/07/02 18:51:28 INFO analysis.Analyzer: Max iterations (2) reached for batch MultiInstanceRelations14/07/02 18:51:28 INFO analysis.Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences14/07/02 18:51:28 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange14/07/02 18:51:28 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions14/07/02 18:51:28 INFO ql.Driver:
14/07/02 18:51:28 INFO ql.Driver:
14/07/02 18:51:28 INFO ql.Driver:
14/07/02 18:51:28 INFO ql.Driver:
14/07/02 18:51:28 INFO parse.ParseDriver: Parsing command: LOAD DATA LOCAL INPATH 'sql/hive/src/test/resources/data/files/sales.txt' INTO TABLE sales14/07/02 18:51:28 INFO parse.ParseDriver: Parse Completed14/07/02 18:51:28 INFO ql.Driver:
14/07/02 18:51:28 INFO ql.Driver:
14/07/02 18:51:28 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=sales14/07/02 18:51:28 INFO HiveMetaStore.audit: ugi=hadoop ip=unknown-ip-addr cmd=get_table : db=default tbl=sales14/07/02 18:51:28 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable14/07/02 18:51:28 INFO ql.Driver: Semantic Analysis Completed14/07/02 18:51:28 INFO ql.Driver:
14/07/02 18:51:28 INFO ql.Driver: Returning Hive schema: Schema(fieldSchemas:null, properties:null)14/07/02 18:51:28 INFO ql.Driver:
14/07/02 18:51:28 INFO ql.Driver:
14/07/02 18:51:28 INFO ql.Driver: Starting command: LOAD DATA LOCAL INPATH 'sql/hive/src/test/resources/data/files/sales.txt' INTO TABLE sales14/07/02 18:51:28 INFO ql.Driver:
14/07/02 18:51:28 INFO ql.Driver:
14/07/02 18:51:28 INFO ql.Driver:
14/07/02 18:51:28 INFO exec.Task: Copying data from file:/app/hadoop/spark/sql/hive/src/test/resources/data/files/sales.txt to file:/tmp/hive-hadoop/hive_2014-07-02_18-51-28_629_2309366591646930035-1/-ext-1000014/07/02 18:51:28 INFO exec.Task: Copying file: file:/app/hadoop/spark/sql/hive/src/test/resources/data/files/sales.txt14/07/02 18:51:29 INFO ql.Driver:
14/07/02 18:51:29 INFO ql.Driver:
14/07/02 18:51:29 INFO exec.Task: Loading data to table default.sales from file:/tmp/hive-hadoop/hive_2014-07-02_18-51-28_629_2309366591646930035-1/-ext-1000014/07/02 18:51:29 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=sales14/07/02 18:51:29 INFO HiveMetaStore.audit: ugi=hadoop ip=unknown-ip-addr cmd=get_table : db=default tbl=sales14/07/02 18:51:29 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=sales14/07/02 18:51:29 INFO HiveMetaStore.audit: ugi=hadoop ip=unknown-ip-addr cmd=get_table : db=default tbl=sales14/07/02 18:51:29 INFO metastore.HiveMetaStore: 0: alter_table: db=default tbl=sales newtbl=sales14/07/02 18:51:29 INFO HiveMetaStore.audit: ugi=hadoop ip=unknown-ip-addr cmd=alter_table: db=default tbl=sales newtbl=sales14/07/02 18:51:29 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=sales14/07/02 18:51:29 INFO HiveMetaStore.audit: ugi=hadoop ip=unknown-ip-addr cmd=get_table : db=default tbl=sales14/07/02 18:51:29 INFO ql.Driver:
14/07/02 18:51:29 INFO ql.Driver:
14/07/02 18:51:29 INFO exec.StatsTask: Executing stats task14/07/02 18:51:29 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=sales14/07/02 18:51:29 INFO HiveMetaStore.audit: ugi=hadoop ip=unknown-ip-addr cmd=get_table : db=default tbl=sales14/07/02 18:51:29 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=sales14/07/02 18:51:29 INFO HiveMetaStore.audit: ugi=hadoop ip=unknown-ip-addr cmd=get_table : db=default tbl=sales14/07/02 18:51:29 INFO metastore.HiveMetaStore: 0: alter_table: db=default tbl=sales newtbl=sales14/07/02 18:51:29 INFO HiveMetaStore.audit: ugi=hadoop ip=unknown-ip-addr cmd=alter_table: db=default tbl=sales newtbl=sales14/07/02 18:51:29 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=sales14/07/02 18:51:29 INFO HiveMetaStore.audit: ugi=hadoop ip=unknown-ip-addr cmd=get_table : db=default tbl=sales14/07/02 18:51:29 INFO exec.Task: Table default.sales stats: [num_partitions: 0, num_files: 1, num_rows: 0, total_size: 13, raw_data_size: 0]14/07/02 18:51:29 INFO ql.Driver:
14/07/02 18:51:29 INFO ql.Driver:
14/07/02 18:51:29 INFO ql.Driver:
14/07/02 18:51:29 INFO ql.Driver: OK14/07/02 18:51:29 INFO ql.Driver:
14/07/02 18:51:29 INFO ql.Driver:
14/07/02 18:51:29 INFO ql.Driver:
14/07/02 18:51:29 INFO ql.Driver:
14/07/02 18:51:29 INFO ql.Driver:
14/07/02 18:51:29 INFO analysis.Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
14/07/02 18:51:29 INFO analysis.Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
14/07/02 18:51:29 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=sales
14/07/02 18:51:29 INFO HiveMetaStore.audit: ugi=hadoop ip=unknown-ip-addr cmd=get_table : db=default tbl=sales
14/07/02 18:51:29 INFO storage.MemoryStore: ensureFreeSpace(355913) called with curMem=0, maxMem=1150314086
14/07/02 18:51:29 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 347.6 KB, free 1096.7 MB)
14/07/02 18:51:29 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange
14/07/02 18:51:29 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions
querySales: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:100
== Query Plan ==
HiveTableScan [key#2,value#3], (MetastoreRelation default, sales, None), None
Execute the Spark SQL query:
scala> querySales.collect()
14/07/02 18:57:32 WARN snappy.LoadSnappy: Snappy native library is available
14/07/02 18:57:32 WARN snappy.LoadSnappy: Snappy native library not loaded
14/07/02 18:57:32 INFO mapred.FileInputFormat: Total input paths to process : 1
14/07/02 18:57:32 INFO spark.SparkContext: Starting job: collect at SparkPlan.scala:52
14/07/02 18:57:32 INFO scheduler.DAGScheduler: Got job 0 (collect at SparkPlan.scala:52) with 3 output partitions (allowLocal=false)
14/07/02 18:57:32 INFO scheduler.DAGScheduler: Final stage: Stage 0(collect at SparkPlan.scala:52)
14/07/02 18:57:32 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/07/02 18:57:32 INFO scheduler.DAGScheduler: Missing parents: List()
14/07/02 18:57:32 INFO scheduler.DAGScheduler: Submitting Stage 0 (MappedRDD[5] at map at SparkPlan.scala:52), which has no missing parents
14/07/02 18:57:32 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from Stage 0 (MappedRDD[5] at map at SparkPlan.scala:52)
14/07/02 18:57:32 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
14/07/02 18:57:32 INFO scheduler.TaskSetManager: Re-computing pending task lists.
14/07/02 18:57:32 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
14/07/02 18:57:32 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 3606 bytes in 20 ms
14/07/02 18:57:32 INFO executor.Executor: Running task ID 0
14/07/02 18:57:32 INFO storage.BlockManager: Found block broadcast_0 locally
14/07/02 18:57:32 INFO rdd.HadoopRDD: Input split: file:/tmp/sparkHiveWarehouse5366068035857129261/sales/sales.txt:0+6
14/07/02 18:57:32 INFO executor.Executor: Serialized size of result for 0 is 1947
14/07/02 18:57:32 INFO executor.Executor: Sending result for 0 directly to driver
14/07/02 18:57:32 INFO executor.Executor: Finished task ID 0
14/07/02 18:57:32 INFO scheduler.TaskSetManager: Starting task 0.0:1 as TID 1 on executor localhost: localhost (PROCESS_LOCAL)
14/07/02 18:57:32 INFO scheduler.TaskSetManager: Serialized task 0.0:1 as 3606 bytes in 0 ms
14/07/02 18:57:32 INFO executor.Executor: Running task ID 1
14/07/02 18:57:32 INFO storage.BlockManager: Found block broadcast_0 locally
14/07/02 18:57:32 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
14/07/02 18:57:32 INFO scheduler.TaskSetManager: Finished TID 0 in 243 ms on localhost (progress: 1/3)
14/07/02 18:57:32 INFO rdd.HadoopRDD: Input split: file:/tmp/sparkHiveWarehouse5366068035857129261/sales/sales.txt:6+6
14/07/02 18:57:32 INFO executor.Executor: Serialized size of result for 1 is 1948
14/07/02 18:57:32 INFO executor.Executor: Sending result for 1 directly to driver
14/07/02 18:57:32 INFO executor.Executor: Finished task ID 1
14/07/02 18:57:32 INFO scheduler.TaskSetManager: Starting task 0.0:2 as TID 2 on executor localhost: localhost (PROCESS_LOCAL)
14/07/02 18:57:32 INFO scheduler.TaskSetManager: Serialized task 0.0:2 as 3606 bytes in 1 ms
14/07/02 18:57:32 INFO executor.Executor: Running task ID 2
14/07/02 18:57:32 INFO scheduler.TaskSetManager: Finished TID 1 in 36 ms on localhost (progress: 2/3)
14/07/02 18:57:32 INFO scheduler.DAGScheduler: Completed ResultTask(0, 1)
14/07/02 18:57:32 INFO storage.BlockManager: Found block broadcast_0 locally
14/07/02 18:57:32 INFO rdd.HadoopRDD: Input split: file:/tmp/sparkHiveWarehouse5366068035857129261/sales/sales.txt:12+1
14/07/02 18:57:32 INFO executor.Executor: Serialized size of result for 2 is 1721
14/07/02 18:57:32 INFO executor.Executor: Sending result for 2 directly to driver
14/07/02 18:57:32 INFO executor.Executor: Finished task ID 2
14/07/02 18:57:32 INFO scheduler.TaskSetManager: Finished TID 2 in 96 ms on localhost (progress: 3/3)
14/07/02 18:57:32 INFO scheduler.DAGScheduler: Completed ResultTask(0, 2)
14/07/02 18:57:32 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/07/02 18:57:32 INFO scheduler.DAGScheduler: Stage 0 (collect at SparkPlan.scala:52) finished in 0.366 s
14/07/02 18:57:32 INFO spark.SparkContext: Job finished: collect at SparkPlan.scala:52, took 0.454512333 s
res1: Array[org.apache.spark.sql.Row] = Array([Joe,2], [Hank,2])
The result:
Array([Joe,2], [Hank,2])
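Stripped of the log output, the whole round trip boils down to a few lines at the console. This is a condensed restatement of the session above, assuming the same hive/console environment where TestHive's members are already imported; registerAsTable is taken from the completion list shown earlier:

val querySales = sql("select * from sales")   // parse + analyze: returns a SchemaRDD carrying a query plan
querySales.registerAsTable("sales_copy")      // optional: expose the RDD as a table for further SQL
val rows = querySales.collect()               // the action: only now is the Spark job executed
// rows: Array[org.apache.spark.sql.Row] = Array([Joe,2], [Hank,2])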

Query plan optimization: the wrapping subquery is optimized away, so the final plan is a single HiveTableScan:

scala> val query = sql("SELECT * FROM (SELECT * FROM src) a")
query: org.apache.spark.sql.SchemaRDD = SchemaRDD[6] at RDD at SchemaRDD.scala:100
== Query Plan ==
HiveTableScan [key#6,value#7], (MetastoreRelation default, src, None), None
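To see what Catalyst actually did with the nested query, the SchemaRDD exposes its query execution; a small sketch of how I would inspect it (queryExecution and toDebugString both appear in the completion lists in this post):

println(query.queryExecution)    // the analyzed, optimized and physical plans
println(query.toDebugString)     // RDD lineage of the physical execution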

3. Spark SQL LINQ

In the Spark family of frameworks the RDD is the core abstraction of everything, and SchemaRDD provides a LINQ-like API on top of it,

with methods such as take, where, etc.:

scala> query.++                         aggregate                  as                         asInstanceOf               baseLogicalPlan            baseSchemaRDD              cache                      cartesian                  checkpoint                 coalesce                   collect                    compute                    context                    count                      countApprox                countApproxDistinct        countByValue               countByValueApprox         dependencies               distinct                   filter                     filterWith                 first                      flatMap                    flatMapWith                fold                       foreach                    foreachPartition           foreachWith                generate                   getCheckpointFile          getPartitions              getStorageLevel            glom                       groupBy                    id                         insertInto                 intersection               isCheckpointed             isInstanceOf               iterator                   join                       keyBy                      limit                      map                        mapPartitions              mapPartitionsWithContext   mapPartitionsWithIndex     mapPartitionsWithSplit     mapWith                    max                        min                        name                       name_=                     orderBy                    partitioner                partitions                 persist                    pipe                       preferredLocations         printSchema                queryExecution             randomSplit                reduce                     registerAsTable            repartition                sample                     saveAsObjectFile           saveAsParquetFile          saveAsTable                saveAsTextFile             schemaString               select                     setName                    sortBy                     sparkContext               sqlContext                 subtract                   take                       takeOrdered                takeSample                 toArray                    toDebugString              toJavaRDD                  toJavaSchemaRDD            toLocalIterator            toSchemaRDD                toString                   top                        union                      unionAll                   unpersist                  where                      zip                        zipPartitions              zipWithIndex               zipWithUniqueId

Note the single quote in front of key: this is Catalyst's query DSL (the Scala Symbol is converted into an attribute reference), which I will cover in detail in a later post:

scala> query.where('key === 100).collect()
14/07/02 19:07:55 INFO analysis.Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
14/07/02 19:07:55 INFO analysis.Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
14/07/02 19:07:55 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=src
14/07/02 19:07:55 INFO HiveMetaStore.audit: ugi=hadoop ip=unknown-ip-addr cmd=get_table : db=default tbl=src
14/07/02 19:07:55 INFO storage.MemoryStore: ensureFreeSpace(358003) called with curMem=713876, maxMem=1150314086
14/07/02 19:07:55 INFO storage.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 349.6 KB, free 1096.0 MB)
14/07/02 19:07:55 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange
14/07/02 19:07:55 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions
14/07/02 19:07:55 INFO mapred.FileInputFormat: Total input paths to process : 1
14/07/02 19:07:55 INFO spark.SparkContext: Starting job: collect at SparkPlan.scala:52
14/07/02 19:07:55 INFO scheduler.DAGScheduler: Got job 2 (collect at SparkPlan.scala:52) with 2 output partitions (allowLocal=false)
14/07/02 19:07:55 INFO scheduler.DAGScheduler: Final stage: Stage 2(collect at SparkPlan.scala:52)
14/07/02 19:07:55 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/07/02 19:07:55 INFO scheduler.DAGScheduler: Missing parents: List()
14/07/02 19:07:55 INFO scheduler.DAGScheduler: Submitting Stage 2 (MappedRDD[20] at map at SparkPlan.scala:52), which has no missing parents
14/07/02 19:07:55 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 2 (MappedRDD[20] at map at SparkPlan.scala:52)
14/07/02 19:07:55 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 2 tasks
14/07/02 19:07:55 INFO scheduler.TaskSetManager: Starting task 2.0:0 as TID 5 on executor localhost: localhost (PROCESS_LOCAL)
14/07/02 19:07:55 INFO scheduler.TaskSetManager: Serialized task 2.0:0 as 3854 bytes in 0 ms
14/07/02 19:07:55 INFO executor.Executor: Running task ID 5
14/07/02 19:07:55 INFO storage.BlockManager: Found block broadcast_3 locally
14/07/02 19:07:55 INFO rdd.HadoopRDD: Input split: file:/tmp/sparkHiveWarehouse5366068035857129261/src/kv1.txt:0+2906
14/07/02 19:07:55 INFO executor.Executor: Serialized size of result for 5 is 1951
14/07/02 19:07:55 INFO executor.Executor: Sending result for 5 directly to driver
14/07/02 19:07:55 INFO scheduler.TaskSetManager: Starting task 2.0:1 as TID 6 on executor localhost: localhost (PROCESS_LOCAL)
14/07/02 19:07:55 INFO scheduler.TaskSetManager: Serialized task 2.0:1 as 3854 bytes in 0 ms
14/07/02 19:07:55 INFO executor.Executor: Finished task ID 5
14/07/02 19:07:55 INFO executor.Executor: Running task ID 6
14/07/02 19:07:55 INFO scheduler.TaskSetManager: Finished TID 5 in 44 ms on localhost (progress: 1/2)
14/07/02 19:07:55 INFO scheduler.DAGScheduler: Completed ResultTask(2, 0)
14/07/02 19:07:55 INFO storage.BlockManager: Found block broadcast_3 locally
14/07/02 19:07:55 INFO rdd.HadoopRDD: Input split: file:/tmp/sparkHiveWarehouse5366068035857129261/src/kv1.txt:2906+2906
14/07/02 19:07:55 INFO executor.Executor: Serialized size of result for 6 is 1951
14/07/02 19:07:55 INFO executor.Executor: Sending result for 6 directly to driver
14/07/02 19:07:55 INFO executor.Executor: Finished task ID 6
14/07/02 19:07:55 INFO scheduler.TaskSetManager: Finished TID 6 in 19 ms on localhost (progress: 2/2)
14/07/02 19:07:55 INFO scheduler.DAGScheduler: Completed ResultTask(2, 1)
14/07/02 19:07:55 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
14/07/02 19:07:55 INFO scheduler.DAGScheduler: Stage 2 (collect at SparkPlan.scala:52) finished in 0.062 s
14/07/02 19:07:55 INFO spark.SparkContext: Job finished: collect at SparkPlan.scala:52, took 0.06947625 s
res6: Array[org.apache.spark.sql.Row] = Array([100,val_100], [100,val_100])

The query returns the two rows whose key is 100.
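These DSL combinators can also be chained on the SchemaRDD. A small sketch of my own (same console session assumed; where, select and take all appear in the completion list above):

query.where('key === 100).select('value).collect()   // project just the value column of the matching rows
query.select('key).take(3)                           // grab the first few keys, LINQ-style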

4. Summary

Spark SQL provides the Catalyst query optimization framework: SQL is parsed into a logical execution plan, the plan is optimized, and the result is finally turned into RDD operations. Many frameworks, one API — simple and uniform.

That is all for now; follow-up posts will continue digging deeper into the related internals.

http://www.tuicool.com/articles/2Efi22

 

Reposted from: https://my.oschina.net/xiaominmin/blog/1597228
