HiveSQL

Yandex专项课程笔记

Posted by Susie on July 3, 2019

前言

去cad&cg实验室泡汤了,为了准备下周三下午的爱奇艺面试而准备HiveSQL。 估计去面试的不止我一个,还是要做好被刷的准备。虽然我的数据库系统概论拿了4.0,但是SQL好久不用了语法还是有点忘,而且因为没有接触过Hadoop平台,所以不知道HiveSQL和MySQL的语法有没有什么差别,于是就打开尘封已久的Coursera,注册了一门和Hive有关的课程进行学习。是Yandex出品的一个专项课程中的一门课,感觉质量还不错。

四个例子

背景介绍

表格:

  • 用户的点击信息 access_log
    • 地址 host
    • 网页上的请求 request
    • 状态码
    • User Agent
  • 地理位置信息 geo_base
    • 层级信息,不同层区域名字 regioncity
    • 地址 host
  • 机器人点击信息 robot
    • 名字 bot_names
    • 地址 host
    • User Agent
  • 用户个人信息 user
    • 地址 host
    • User Agent
    • 性别 gender
    • 年龄 age

1. Top-K 问题

问题:选择点击量最高的前K个区域。 思路:根据地址名称regioncity聚类,计算每个地区有几条信息,用IP地址连接两张表,使地区名字能对应。最后用LIMIT 选中前K条记录。 SQL:

SELECT regioncity,COUNT(1) as hit_count
FROM access_log JOIN geo_base
ON (access_log.host = geo_base.host)
GROUP BY regioncity ORDER BY hit_count LIMIT K

2. 真实人vs 机器人[bots]

问题:有两张表,分别是所有用户的点击信息和虚拟人的点击信息。想统计真实用户和虚拟用户的点击量分别是多少。

思路:机器人和用户的IP地址一一对应,对每条用户数据,如果是机器人,则机器人名字非空;否则是空值,连接后的整条记录是悬浮元组。用左连接即可。1. 创建特征 2.左连接 3.分组统计 SQL:

SELECT request,
    SUM(IF(robot.bot_name IS NULL, 1, 0)) as user_hit_count,
    SUM(IF(robot.bot_name IS NOT NULL,1, 0)) as bot_hit_count
FROM access.log LEFT OUTER JOIN robot ON(
    access_log.host = robot.host
    AND access_log.user_agent = robot.user_agent)
GROUP BY request

3. 每个地区男性、女性观众的点击数

问题:如题

思路:1. 建立男、女特征 2. 表格 3.分组统计

SQL:

SELECT regioncity,
    SUM(IF(user.gender IS 'F', 1, 0)) as female_hit_count,
    SUM(IF(user.gender IS 'M', 1, 0)) as male_hit_count
FROM access_log
    JOIN geo_base ON (access_log.host = geo_base.host)
    JOIN user ON(
    access_log.host = user.host
    AND access_log.user_agent = user.user_agent
    )
GROUP BY regioncity

4. 每个地区观众的平均年龄

问题:如题

思路:和上面的差不多,换个聚类函数而已。注意的是,user不能直接和geo_base连接。

SQL:

SELECT regioncity,AVG(age)
FROM access_log
    JOIN geo_base ON (access_log.host = geo_base.host)
    JOIN user ON(
    access_log.host = user.host
    AND access_log.user_agent = user.user_agent
    )
GROUP BY regioncity

DDL 数据定义语言

1. 创建表格

1.1 直接创建表格

和MySQL语法一样,注意逗号、分号和括号。 如果在表格名字前面没有指定哪个数据库,则默认为default.

CREATE TABLE dbname.table_name(
    column1 STRING,
    column2 INT
);

或者先指定数据库和表格存储的具体位置。

USE dbname
CREATE TABLE table_name(
    column1 STRING,
    column2 INT
)
LOCATION "path/to/hdfs/location";

1.2 找到描述信息

在Hive中怎么找数据库和表格的描述信息? 前一个是关于数据库的信息;后一个是关于表格的信息,样子比较好看,因为带了格式(formatted)

> describe database dbname
> describe formatted/extended table_name

1.3 根据外部数据创建表格

从外部的表中选择数据,然后插入。

CREATE TABLE table_name
AS SELECT attri_name
FROM table_name
WHERE state = 'CA';

2. 输入数据,修改分隔符

在Hadoop中,使用Tab来把列属性分隔开,但是在Hive中,默认使用Ctrl + A作为分隔符。也可以自己指定分隔符delimitor。

从语法中也可以看出field是以delimitor作为终结的。 注意行输入分隔符的定义之间是没有逗号的。

USE dbname
CREATE TABLE table_name(
    column1 STRING,
    column2 INT
)
ROW FORMAT DELIMINATED
    FIELDS TERMINATED BY '\t'#默认Ctrl+A'\001'此处改成Tab
    COLLECTION ITEMS TERMINATED BY '\002'#默认Ctrl+B
    MAP KEYS TERMINATED BY '\003'#默认Ctrl+C
    LINES TERMINATED BY '\n' #默认回车
STORED AS file_format
LOCATION "path/to/hdfs/location";

Field 分隔符是用来把列和列之间的输入分开的;Collection items分隔符是用来把列表内部元素分开的,相当于‘,’(Field不仅可以是字符串、数字等数据结构,也可以是列表、字典、结构体);MAP KEYS是用来做映射的,相当于’:’。

可以使用STORED AS 语法,把定义好的格式存储下来,默认是text文档,也可以变成binary的

同时,也可以选择不输入数据,直接用外部的数据建表。 如果是外部数据的表格,则在删除的时候不会删除数据,只会删除表的样式,可以恢复;如果是直接建立管理表格,则删除的时候数据跟着表格一起删除了。

CREATE TEMPORARY[EXTERNAL] TABLE table_name;

3. 一些函数的用法

  • ROW_NUMBER()… OVER(ORDER BY column ASC) 必须要跟Over,否则会出错
  • IF(条件,对的数值,错的数值)
  • 注意Hive中没有EXIST

DML 数据操作语言

1. 把数据导入表格

如果数据是来自HDFS集群的,语法如下。

LOAD DATA INPATH '/local/path/employees'
INTO TABLE employees;

如果数据是来自本机的,加一个LOCAL,这样这个数据会先加载到HDFS中,然后加入集群。

LOAD DATA LOCAL INPATH '/local/path/employees'
INTO TABLE employees;

如果担心插入的表格中还有别的不想要的数据,在INTO前加一个OVERWRITE覆盖掉。

2. 把数据导出表格

和传统的数据库类似的导出

INSERT OVERWRITE [LOCAL] DIRECTORY '/tmp/emloyees'
SELECT name,salary, address
FROM table_name
where blabla;

Hive特有的多重插入语句,从一个表格中选择不同的子集,存储到不同的位置中。

FROM table_name
INSERT OVERWRITE [LOCAL] DIRECTORY '/tmp/CA_emloyees'
SELECT name,salary,address
WHERE state = 'CA'
INSERT OVERWRITE [LOCAL] DIRECTORY '/tmp/NY_emloyees'
WHERE state = 'NY';

3. MapReduce和Hive的关联

  • SELECT FROM 是从一张表里面取子集,Map阶段
  • WHERE 和SELECT一样,都是取子集,Map阶段
  • GROUP BY 是聚合,在Shuffle&Sort阶段+ Having 在Reduce阶段
  • JOIN Map或者Reduce阶段都可以进行
  • ORDER BY/ SORT BY 在Reduce阶段,开销比ORDER BY 小

正则表达式

在使用外部数据创建表格的时候,当需要正则表达式来分开某列中复杂表达式的各个项的时候,需要用到正则表达式把复杂的项分开。例如

127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] “GET /apache_pb.gif HTTP/1.0” 200

可以用下面的正则表达式来分开

ROW FORMAT
    SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
    WITH SERDEPROPERTIES(
    'input.regex' = '^(\\S*)(\\S*)(\\S*)\\[([^\\]]*)\\]"([^"]*)"(\\S*) .*$'
    )

SERDE前半部分是’SER’,即serialisation,后半部分是Deserialisation。后面跟的是JAVA的类 在上面的表达式中,’^’代表以字符开始,一个()代表1列,’\‘代表后面的’'是反义的,’\S’代表一个非空格的字符,*代表是任意的。

正则表达式匹配的对象是数据,但是有时候列的标题也被匹配了,从而出现’NULL’行,可以在LOCATION后添加一行,tbl properties,跳过标题行。

TBLPROPERTIES('skip.header.line.count' = '1');

视图

1.1 创建视图

视图是基于基本表创建的,无法修改基本表中的内容,删除也不会影响基本表。

CREATE VIEW apache_log_view(
    ip,
    request_year,
    request,
    status_code
)
AS SELECT
    ip,
    regexp_extract(request_time, "\\d+\\/\\w+\\/(\\d+)", 1),
    request, status_code
FROM apache_log;

上面的正则表达式对应的是”10/Oct/2000”,想提取的是2000,所以用括号括起来。

1.2 查询视图的有关信息

SHOW TABLES; #可以显示已经创建的基本表和视图
SHOW CREATE TABLE table_name; #可以显示创建表/视图的语法

1.3 视图的缺点

  • 只读,无法修改
  • 创建视图比起直接读取会消耗更多的内存
  • 当基本表发生变动的时候,现在的表可能会变得没用

用Hive来分析

可以使用自己定义的函数进行分析,流程

  1. 定义自己的函数UDF(user-defined functions)只能用JAVA写
  2. 编译成.jar文件
  3. 把它们应用到集群cluster上
    hive> add jar /path/to/lib.jar;
    hive> create temporary function func_name as 'java.class.name'; #创建暂时的函数
    hive> select func_name(...)...; #使用新函数
    ...
    hive> drop temporary function func_name; #删除新函数
    

    如果不加temporary,则是永久的。 UDF(1对1)用在map阶段,UDAF(用户定义聚合函数,n对1)用在reduce阶段,UDTF(用户定义表格生成函数,1对n)可以用在任意阶段。

SELECT maneger_name, employee
FORM management
    LATERAL VIEW explode(direct_reports) lateral_table
    AS employee

其中的LATERAL VIEW是用来把UDTF函数explode的结果和基本表management结合起来的,如果一个SELECT里面又有column又有UDTF就会报错。

如果想知道每个操作是怎么执行的,只需要在语言(e.g.SELECT)前面加EXPLAIN即可。

Hive streaming

可以不使用JAVA,而使用python或者其他的语言如shell来操作定义的函数

FROM mytable
SELECT TRANSFORM(columnA, columnB)
USING "/bin/cat"
AS new_A, new_B

或者

FROM mytable
SELECT TRANSFORM(columnA, columnB)
USING "/bin/cat -f1"
AS new_A, new_B

在下面的情况中,输出的结果不是两列,因为在cat函数中把后面的砍掉了,所以输出的两列中第二列全部都是NULL。

在python中,WordCount样例中的mapper.py:

from __future__ import print_function
import sys

for line in sys.stdin:
    article_id, content = line.split("\t", 1)
    words = content.split()
    for word in words:
        print(word, 1, sep = "\t"

在python中,WordCount样例中的reducer.py

for line in sys.stdin:
    word, counts = line.split("\t", 1)
    counts = int(counts)
    if word == current_word:
        word_count += counts
    else:
        if current_word:
            print(current_word, word_count, sep="\t")
        current_word = word
        word_count = counts

在sql中,整个流程

ADD FILE /path/mapper.py;
ADD FILE /path/reducer.py;
FROM (
    FROM wiki_sample
    SELECT TRANSFORM (line)
    USING "./mapper.py" AS word, counts
    DISTRIBUTE BY word SORT BY word
    ) word_pairs
    SELECT TRANSFORM (word_pairs.word, word_pairs.counts)
    USING "./reducer.py"
    AS word, counts

distribute by是控制在map端如何拆分数据给reduce端的,是一个Partitioner。hive会根据distribute by后面的列,在这里是word,对应reduce的个数进行分发。sort by为每个reduce产生一个排序文件。在有些情况下,你需要控制某个特定行应该到哪个reducer,这通常是为了进行后续的聚集操作。

如果distribute by 和sort by后面跟的是同一列,那么可以直接使用cluster by,但是只能倒序。

不能只使用MAP()或者REDUCE()而不使用DISTRIBUTE BY/CLUSTER BY,因为后者才能保证你的script运行了而前者可能根本没有运行。

总结,可以正确保证reduce阶段的语句有

  1. DISTRIBUTE BY … REDUCE
  2. DISTRIBUTE BY … TRANSFORM
  3. CLUSTER BY … TRANSFORM

窗口函数

PTF(partition table functions),是n:m的映射 有一个动态的窗口,里面显示的就是我们要分析与统计的数据。

SELECT column_A, 
    ROW_NUMBER() OVER (PARTITION BY column_C),
    RANK() OVER (PARTITION BY column_C),
    DENSE_RANK() OVER (PARTITION BY column_C)
FROM table_name;

SELECT column_A, 
    ROW_NUMBER() OVER w,
    RANK() OVER w,
    DENSE_RANK() OVER w
FROM table_name
WINDOW w AS (PARTITION BY column_C)

上面两种写法都可以,一个是没有定义窗口名字,一个定义了窗口名字。

三个函数的区别:

  1. ROW_NUMBER() 给每行一个不同的数字,12345这么排列
  2. RANK() 是秩,相同属性的秩是一样的,如果一个属性对应了好多条,下一个属性的秩会相加,秩不是连续的aabc对应1134
  3. DENSE_RANK() 就是紧密连续的秩。aabc对应1123

PTF函数接收一整个partition,然后根据特定的条件返回一部分partition,并不会使输出的行数增加。

Hive优化技巧

1. Partition

优化的原理:进行正确的分割,让大部分任务可以通过选取一个小子集来完成。

在选择数据中的一部分进行聚合的时候(WHERE),都是先筛选,然后再把数据读入存储空间进行计算。 当筛选的结构比较多的时候,也可以建立层级结构。

需要注意的是,PARTITION里面出现的列不能出现在CREATE TABLE里面。

CREATE TABLE partitioned(
    ip STRING,
    ...
    )
PARTITIONED BY( 
    year STRING,
    month STRING,
    day STRING);

从raw_access里面导出数据到新表格partitioned,因为在创建表格的时候有partitioned by,所以需要遵守一些规则。

FROM raw_access
INSERT OVERWRITE TABLE partitioned
PARTITION (year = ?, month=?, day=?)
SELECT ip,...,year,month,day

这种是进行动态分割,让其他部分不受影响。 需要遵守的规则有

  • 用来分割的动态的列排在最后
  • 用来分割的列的顺序很重要,SELECT和PARTITION中的顺序需要一样
  • 使用hive布局的一些参数,如果文件数目很多,需要调上限
  • 控制空白分割,如令error.on.empty.partition = True
  • 用来分割的列可以是动态和静态混合的,比如一个指定了’2017’,其他没有指定

2. Bucket和Sampling

优化的原理:在存储的时候优化存储结构,让抽样可以进行得比较迅速;在根据Bucket进行连接的时候也比较快;合并排序也是。比如说把一个用户的所有订单放在一个桶里面,这样聚合的时候就比较省时间。(因为对于用户的聚合操作经常发生)

在每个分割的文件夹中,有很多个数据桶bucket,我们可以指定每个文件夹中有多少个桶,按照什么样的顺序来排序。下面是如何设置。

CREATE TABLE granular_access_log(
    ip STRING,
    ...
)
PARTITIONED BY (request_date STRING)
CLUSTERED BY (column_name, ...)
    SORTED BY (column_nameA,...)
    INTO 200 BUCKETS;

在设置完桶之后,可以对桶进行抽样。下面的就是抽25%的数据。

SELECT ip,...
FROM granular_access_log
    TABLESAMPLE(BUCKET 1 OUT OF 4 ON user_id)

在写入的时候,我们需要注意让数据适应表格和桶的布局,设置相对应的reducer数目,DISTRIBUTE BY COLUMNS。 例如”SET mapred.reduce,tasks = 200;”

如果其中有一些参数过时了,那么就会出错,可以通过改变设置来避免这个错误。

SET hive.enforce.bucketing = true;

如果上面这个参数是TRUE,那么MapReduce会自动增加一些阶段避免出错。

在优化中,如果我们一直对固定的一些列做聚合操作,比如在WordCount问题中,只涉及到了Map阶段,没有涉及后面两个阶段,因此在中间建立一个安排好桶的表格可以节约大量的资源。

Hadoop的工作流程

  1. client node从HDFS系统中下载一个小表到cache中
  2. 建立哈希表
  3. 把哈希表上传到分布式系统的cache中

如果两张表在做连接的时候bucket数目不一样,应该怎么连接? 数据一方面是由哈希表决定的,另一方面要除以bucket的数目。 比如一边2个一边4个,4个的那边余数分别是1,0,1,0。???没搞懂

所以希望哈希桶数目总是2的倍数。

在Hive中,最有效率的排序是外部排序-合并排序。

3. Data Skew

第一种选择是List table,会创建很多文件夹。

CREATE TABLE skewed_log(
    ip STRING,
    ...
    user_id STRING,
    ...
)
SKEWED BY (user_id) ON ("unknown", "1")
    STORED AS DIRECTORIES

注意,当使用data skew的时候,需要让hive.mapred.supports.subdirectories=true。这样的话,Hive会自动生成子文件夹。

第二种选择是skewed table。

CREATE TABLE skewed_access(
    ip STRING,
    ...
)
SKEWED BY (user_id) ON ("unknown", "1")

在这种情况下,不会创建多的子文件夹,Hive自动扫描好几遍表格。第一遍,提取并处理skewed keys,第二遍,Hive框架处理其他的键,然后合并。

如果skewed keys和普通的键都有很多,第一种会创建很多很多文件夹,浪费空间;但是如果日常工作中需要把skewed和unskewed分开处理的话,第一种方法比较好。