前言
去cad&cg实验室泡汤了,为了准备下周三下午的爱奇艺面试而准备HiveSQL。 估计去面试的不止我一个,还是要做好被刷的准备。虽然我的数据库系统概论拿了4.0,但是SQL好久不用了语法还是有点忘,而且因为没有接触过Hadoop平台,所以不知道HiveSQL和MySQL的语法有没有什么差别,于是就打开尘封已久的Coursera,注册了一门和Hive有关的课程进行学习。是Yandex出品的一个专项课程中的一门课,感觉质量还不错。
四个例子
背景介绍
表格:
- 用户的点击信息 access_log
- 地址 host
- 网页上的请求 request
- 状态码
- User Agent
- 地理位置信息 geo_base
- 层级信息,不同层区域名字 regioncity
- 地址 host
- 机器人点击信息 robot
- 名字 bot_names
- 地址 host
- User Agent
- 用户个人信息 user
- 地址 host
- User Agent
- 性别 gender
- 年龄 age
1. Top-K 问题
问题:选择点击量最高的前K个区域。 思路:根据地址名称regioncity聚类,计算每个地区有几条信息,用IP地址连接两张表,使地区名字能对应。最后用LIMIT 选中前K条记录。 SQL:
SELECT regioncity,COUNT(1) as hit_count
FROM access_log JOIN geo_base
ON (access_log.host = geo_base.host)
GROUP BY regioncity ORDER BY hit_count LIMIT K
2. 真实人vs 机器人[bots]
问题:有两张表,分别是所有用户的点击信息和虚拟人的点击信息。想统计真实用户和虚拟用户的点击量分别是多少。
思路:机器人和用户的IP地址一一对应,对每条用户数据,如果是机器人,则机器人名字非空;否则是空值,连接后的整条记录是悬浮元组。用左连接即可。1. 创建特征 2.左连接 3.分组统计 SQL:
SELECT request,
SUM(IF(robot.bot_name IS NULL, 1, 0)) as user_hit_count,
SUM(IF(robot.bot_name IS NOT NULL,1, 0)) as bot_hit_count
FROM access.log LEFT OUTER JOIN robot ON(
access_log.host = robot.host
AND access_log.user_agent = robot.user_agent)
GROUP BY request
3. 每个地区男性、女性观众的点击数
问题:如题
思路:1. 建立男、女特征 2. 表格 3.分组统计
SQL:
SELECT regioncity,
SUM(IF(user.gender IS 'F', 1, 0)) as female_hit_count,
SUM(IF(user.gender IS 'M', 1, 0)) as male_hit_count
FROM access_log
JOIN geo_base ON (access_log.host = geo_base.host)
JOIN user ON(
access_log.host = user.host
AND access_log.user_agent = user.user_agent
)
GROUP BY regioncity
4. 每个地区观众的平均年龄
问题:如题
思路:和上面的差不多,换个聚类函数而已。注意的是,user不能直接和geo_base连接。
SQL:
SELECT regioncity,AVG(age)
FROM access_log
JOIN geo_base ON (access_log.host = geo_base.host)
JOIN user ON(
access_log.host = user.host
AND access_log.user_agent = user.user_agent
)
GROUP BY regioncity
DDL 数据定义语言
1. 创建表格
1.1 直接创建表格
和MySQL语法一样,注意逗号、分号和括号。 如果在表格名字前面没有指定哪个数据库,则默认为default.
CREATE TABLE dbname.table_name(
column1 STRING,
column2 INT
);
或者先指定数据库和表格存储的具体位置。
USE dbname
CREATE TABLE table_name(
column1 STRING,
column2 INT
)
LOCATION "path/to/hdfs/location";
1.2 找到描述信息
在Hive中怎么找数据库和表格的描述信息? 前一个是关于数据库的信息;后一个是关于表格的信息,样子比较好看,因为带了格式(formatted)
> describe database dbname
> describe formatted/extended table_name
1.3 根据外部数据创建表格
从外部的表中选择数据,然后插入。
CREATE TABLE table_name
AS SELECT attri_name
FROM table_name
WHERE state = 'CA';
2. 输入数据,修改分隔符
在Hadoop中,使用Tab来把列属性分隔开,但是在Hive中,默认使用Ctrl + A作为分隔符。也可以自己指定分隔符delimitor。
从语法中也可以看出field是以delimitor作为终结的。 注意行输入分隔符的定义之间是没有逗号的。
USE dbname
CREATE TABLE table_name(
column1 STRING,
column2 INT
)
ROW FORMAT DELIMINATED
FIELDS TERMINATED BY '\t'#默认Ctrl+A'\001'此处改成Tab
COLLECTION ITEMS TERMINATED BY '\002'#默认Ctrl+B
MAP KEYS TERMINATED BY '\003'#默认Ctrl+C
LINES TERMINATED BY '\n' #默认回车
STORED AS file_format
LOCATION "path/to/hdfs/location";
Field 分隔符是用来把列和列之间的输入分开的;Collection items分隔符是用来把列表内部元素分开的,相当于‘,’(Field不仅可以是字符串、数字等数据结构,也可以是列表、字典、结构体);MAP KEYS是用来做映射的,相当于’:’。
可以使用STORED AS 语法,把定义好的格式存储下来,默认是text文档,也可以变成binary的
同时,也可以选择不输入数据,直接用外部的数据建表。 如果是外部数据的表格,则在删除的时候不会删除数据,只会删除表的样式,可以恢复;如果是直接建立管理表格,则删除的时候数据跟着表格一起删除了。
CREATE TEMPORARY[EXTERNAL] TABLE table_name;
3. 一些函数的用法
- ROW_NUMBER()… OVER(ORDER BY column ASC) 必须要跟Over,否则会出错
- IF(条件,对的数值,错的数值)
- 注意Hive中没有EXIST
DML 数据操作语言
1. 把数据导入表格
如果数据是来自HDFS集群的,语法如下。
LOAD DATA INPATH '/local/path/employees'
INTO TABLE employees;
如果数据是来自本机的,加一个LOCAL,这样这个数据会先加载到HDFS中,然后加入集群。
LOAD DATA LOCAL INPATH '/local/path/employees'
INTO TABLE employees;
如果担心插入的表格中还有别的不想要的数据,在INTO前加一个OVERWRITE覆盖掉。
2. 把数据导出表格
和传统的数据库类似的导出
INSERT OVERWRITE [LOCAL] DIRECTORY '/tmp/emloyees'
SELECT name,salary, address
FROM table_name
where blabla;
Hive特有的多重插入语句,从一个表格中选择不同的子集,存储到不同的位置中。
FROM table_name
INSERT OVERWRITE [LOCAL] DIRECTORY '/tmp/CA_emloyees'
SELECT name,salary,address
WHERE state = 'CA'
INSERT OVERWRITE [LOCAL] DIRECTORY '/tmp/NY_emloyees'
WHERE state = 'NY';
3. MapReduce和Hive的关联
- SELECT FROM 是从一张表里面取子集,Map阶段
- WHERE 和SELECT一样,都是取子集,Map阶段
- GROUP BY 是聚合,在Shuffle&Sort阶段+ Having 在Reduce阶段
- JOIN Map或者Reduce阶段都可以进行
- ORDER BY/ SORT BY 在Reduce阶段,开销比ORDER BY 小
正则表达式
在使用外部数据创建表格的时候,当需要正则表达式来分开某列中复杂表达式的各个项的时候,需要用到正则表达式把复杂的项分开。例如
127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] “GET /apache_pb.gif HTTP/1.0” 200
可以用下面的正则表达式来分开
ROW FORMAT
SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES(
'input.regex' = '^(\\S*)(\\S*)(\\S*)\\[([^\\]]*)\\]"([^"]*)"(\\S*) .*$'
)
SERDE前半部分是’SER’,即serialisation,后半部分是Deserialisation。后面跟的是JAVA的类 在上面的表达式中,’^’代表以字符开始,一个()代表1列,’\‘代表后面的’'是反义的,’\S’代表一个非空格的字符,*代表是任意的。
正则表达式匹配的对象是数据,但是有时候列的标题也被匹配了,从而出现’NULL’行,可以在LOCATION后添加一行,tbl properties,跳过标题行。
TBLPROPERTIES('skip.header.line.count' = '1');
视图
1.1 创建视图
视图是基于基本表创建的,无法修改基本表中的内容,删除也不会影响基本表。
CREATE VIEW apache_log_view(
ip,
request_year,
request,
status_code
)
AS SELECT
ip,
regexp_extract(request_time, "\\d+\\/\\w+\\/(\\d+)", 1),
request, status_code
FROM apache_log;
上面的正则表达式对应的是”10/Oct/2000”,想提取的是2000,所以用括号括起来。
1.2 查询视图的有关信息
SHOW TABLES; #可以显示已经创建的基本表和视图
SHOW CREATE TABLE table_name; #可以显示创建表/视图的语法
1.3 视图的缺点
- 只读,无法修改
- 创建视图比起直接读取会消耗更多的内存
- 当基本表发生变动的时候,现在的表可能会变得没用
用Hive来分析
可以使用自己定义的函数进行分析,流程
- 定义自己的函数UDF(user-defined functions)只能用JAVA写
- 编译成.jar文件
- 把它们应用到集群cluster上
hive> add jar /path/to/lib.jar; hive> create temporary function func_name as 'java.class.name'; #创建暂时的函数 hive> select func_name(...)...; #使用新函数 ... hive> drop temporary function func_name; #删除新函数
如果不加temporary,则是永久的。 UDF(1对1)用在map阶段,UDAF(用户定义聚合函数,n对1)用在reduce阶段,UDTF(用户定义表格生成函数,1对n)可以用在任意阶段。
SELECT maneger_name, employee
FORM management
LATERAL VIEW explode(direct_reports) lateral_table
AS employee
其中的LATERAL VIEW是用来把UDTF函数explode的结果和基本表management结合起来的,如果一个SELECT里面又有column又有UDTF就会报错。
如果想知道每个操作是怎么执行的,只需要在语言(e.g.SELECT)前面加EXPLAIN即可。
Hive streaming
可以不使用JAVA,而使用python或者其他的语言如shell来操作定义的函数
FROM mytable
SELECT TRANSFORM(columnA, columnB)
USING "/bin/cat"
AS new_A, new_B
或者
FROM mytable
SELECT TRANSFORM(columnA, columnB)
USING "/bin/cat -f1"
AS new_A, new_B
在下面的情况中,输出的结果不是两列,因为在cat函数中把后面的砍掉了,所以输出的两列中第二列全部都是NULL。
在python中,WordCount样例中的mapper.py:
from __future__ import print_function
import sys
for line in sys.stdin:
article_id, content = line.split("\t", 1)
words = content.split()
for word in words:
print(word, 1, sep = "\t"
在python中,WordCount样例中的reducer.py
for line in sys.stdin:
word, counts = line.split("\t", 1)
counts = int(counts)
if word == current_word:
word_count += counts
else:
if current_word:
print(current_word, word_count, sep="\t")
current_word = word
word_count = counts
在sql中,整个流程
ADD FILE /path/mapper.py;
ADD FILE /path/reducer.py;
FROM (
FROM wiki_sample
SELECT TRANSFORM (line)
USING "./mapper.py" AS word, counts
DISTRIBUTE BY word SORT BY word
) word_pairs
SELECT TRANSFORM (word_pairs.word, word_pairs.counts)
USING "./reducer.py"
AS word, counts
distribute by是控制在map端如何拆分数据给reduce端的,是一个Partitioner。hive会根据distribute by后面的列,在这里是word,对应reduce的个数进行分发。sort by为每个reduce产生一个排序文件。在有些情况下,你需要控制某个特定行应该到哪个reducer,这通常是为了进行后续的聚集操作。
如果distribute by 和sort by后面跟的是同一列,那么可以直接使用cluster by,但是只能倒序。
不能只使用MAP()或者REDUCE()而不使用DISTRIBUTE BY/CLUSTER BY,因为后者才能保证你的script运行了而前者可能根本没有运行。
总结,可以正确保证reduce阶段的语句有
- DISTRIBUTE BY … REDUCE
- DISTRIBUTE BY … TRANSFORM
- CLUSTER BY … TRANSFORM
窗口函数
PTF(partition table functions),是n:m的映射 有一个动态的窗口,里面显示的就是我们要分析与统计的数据。
SELECT column_A,
ROW_NUMBER() OVER (PARTITION BY column_C),
RANK() OVER (PARTITION BY column_C),
DENSE_RANK() OVER (PARTITION BY column_C)
FROM table_name;
SELECT column_A,
ROW_NUMBER() OVER w,
RANK() OVER w,
DENSE_RANK() OVER w
FROM table_name
WINDOW w AS (PARTITION BY column_C)
上面两种写法都可以,一个是没有定义窗口名字,一个定义了窗口名字。
三个函数的区别:
- ROW_NUMBER() 给每行一个不同的数字,12345这么排列
- RANK() 是秩,相同属性的秩是一样的,如果一个属性对应了好多条,下一个属性的秩会相加,秩不是连续的aabc对应1134
- DENSE_RANK() 就是紧密连续的秩。aabc对应1123
PTF函数接收一整个partition,然后根据特定的条件返回一部分partition,并不会使输出的行数增加。
Hive优化技巧
1. Partition
优化的原理:进行正确的分割,让大部分任务可以通过选取一个小子集来完成。
在选择数据中的一部分进行聚合的时候(WHERE),都是先筛选,然后再把数据读入存储空间进行计算。 当筛选的结构比较多的时候,也可以建立层级结构。
需要注意的是,PARTITION里面出现的列不能出现在CREATE TABLE里面。
CREATE TABLE partitioned(
ip STRING,
...
)
PARTITIONED BY(
year STRING,
month STRING,
day STRING);
从raw_access里面导出数据到新表格partitioned,因为在创建表格的时候有partitioned by,所以需要遵守一些规则。
FROM raw_access
INSERT OVERWRITE TABLE partitioned
PARTITION (year = ?, month=?, day=?)
SELECT ip,...,year,month,day
这种是进行动态分割,让其他部分不受影响。 需要遵守的规则有
- 用来分割的动态的列排在最后
- 用来分割的列的顺序很重要,SELECT和PARTITION中的顺序需要一样
- 使用hive布局的一些参数,如果文件数目很多,需要调上限
- 控制空白分割,如令error.on.empty.partition = True
- 用来分割的列可以是动态和静态混合的,比如一个指定了’2017’,其他没有指定
2. Bucket和Sampling
优化的原理:在存储的时候优化存储结构,让抽样可以进行得比较迅速;在根据Bucket进行连接的时候也比较快;合并排序也是。比如说把一个用户的所有订单放在一个桶里面,这样聚合的时候就比较省时间。(因为对于用户的聚合操作经常发生)
在每个分割的文件夹中,有很多个数据桶bucket,我们可以指定每个文件夹中有多少个桶,按照什么样的顺序来排序。下面是如何设置。
CREATE TABLE granular_access_log(
ip STRING,
...
)
PARTITIONED BY (request_date STRING)
CLUSTERED BY (column_name, ...)
SORTED BY (column_nameA,...)
INTO 200 BUCKETS;
在设置完桶之后,可以对桶进行抽样。下面的就是抽25%的数据。
SELECT ip,...
FROM granular_access_log
TABLESAMPLE(BUCKET 1 OUT OF 4 ON user_id)
在写入的时候,我们需要注意让数据适应表格和桶的布局,设置相对应的reducer数目,DISTRIBUTE BY COLUMNS。 例如”SET mapred.reduce,tasks = 200;”
如果其中有一些参数过时了,那么就会出错,可以通过改变设置来避免这个错误。
SET hive.enforce.bucketing = true;
如果上面这个参数是TRUE,那么MapReduce会自动增加一些阶段避免出错。
在优化中,如果我们一直对固定的一些列做聚合操作,比如在WordCount问题中,只涉及到了Map阶段,没有涉及后面两个阶段,因此在中间建立一个安排好桶的表格可以节约大量的资源。
Hadoop的工作流程
- client node从HDFS系统中下载一个小表到cache中
- 建立哈希表
- 把哈希表上传到分布式系统的cache中
如果两张表在做连接的时候bucket数目不一样,应该怎么连接? 数据一方面是由哈希表决定的,另一方面要除以bucket的数目。 比如一边2个一边4个,4个的那边余数分别是1,0,1,0。???没搞懂
所以希望哈希桶数目总是2的倍数。
在Hive中,最有效率的排序是外部排序-合并排序。
3. Data Skew
第一种选择是List table,会创建很多文件夹。
CREATE TABLE skewed_log(
ip STRING,
...
user_id STRING,
...
)
SKEWED BY (user_id) ON ("unknown", "1")
STORED AS DIRECTORIES
注意,当使用data skew的时候,需要让hive.mapred.supports.subdirectories=true。这样的话,Hive会自动生成子文件夹。
第二种选择是skewed table。
CREATE TABLE skewed_access(
ip STRING,
...
)
SKEWED BY (user_id) ON ("unknown", "1")
在这种情况下,不会创建多的子文件夹,Hive自动扫描好几遍表格。第一遍,提取并处理skewed keys,第二遍,Hive框架处理其他的键,然后合并。
如果skewed keys和普通的键都有很多,第一种会创建很多很多文件夹,浪费空间;但是如果日常工作中需要把skewed和unskewed分开处理的话,第一种方法比较好。