Contents
问题背景
DynamoDB 创建了类似如下的表:
"accessid": "c63b88a3-1503-4c2c-a7c2-3a1ffde7bff9", // Primary key.
"date": "2018-12-12",
"when": "2018-12-12 17:22:15",
"context": "something"
}
这个表用来记录访问记录,以 accessid (随机字符串) 作为 Primary key. Date 记录当条记录的生成日期。
我们希望通过 Hive 每天将前一天的数据从 DynamoDB 备份到 S3 上,具体方法可参考文档[1].
> SELECT * FROM ddb_table WHERE date="2018-12-12";
Dynamodb表里存放了很多天的数据,执行以上操作会消耗很长时间,如何能加快速度?比如让 Hive 进行并发读取?
测试使用的 AWS EMR 版本为 emr-5.20.0 (Amazon 2.8.5, Ganglia 3.7.2, Hive 2.3.4, Hue 4.3.0, Mahout 0.13.0, Pig 0.17.0, Tez 0.9.1). emr-dynamodb connector 的版本是 4.7.
emr-ddb-hadoop-4.7.0-1.amzn1.noarch
emr-ddb-hive-4.7.0-1.amzn1.noarch
emr-ddb-4.7.0-1.amzn1.noarch
DynamoDB 的 Query 和 Scan 操作
DynamoDB 是个 NoSQL 数据库。在 Hive 操作中,如果 WHERE 后面的条件是 Primary Key,比如:
这时,Hive 会进行 Query 操作,能直接拿到结果。可以理解为 O(1) 的操作。
而如果 WHERE 后面的条件只是个普通的 Attribute, 那 Hive 需要调用 Scan 操作,扫描整个表的条目,过滤出符合条件的条件。可以理解为 O(n) 的操作。比如:
> SELECT * FROM ddb_table WHERE date="2018-12-12";
Hive 需要读取整个表,对比每个条目的 date 是否为"2018-12-12". 因此,这个操作需要读取大量数据,消耗大量 DynamoDB 的 RCU.
Scan 操作是可以分 segment 的[2]。在这种情况下,如果 Hive 能在不同节点同时读取不同的 segment,这样可以加快读取速度。
Query 操作能直接读取 Key 对应的 Value, 没有并行的需要。
Hive 对 DynamoDB 的并发读取
按照上面说法,如果能让 Hive 启动多个 container 扫描 dynamodb table 的不同 segment, 可以起到并发加速的效果。
Hive 查询 DynamoDB 的动作实际上由 emr-dynamodb-connector 来实现。它会根据 DynamoDB 当前的 RCU 设定,来决定分多少个 segment 同时读取。一般来说,每 200 RCU 会启动一个 container。所以,如果我们把 DynamoDB 的 RCU 设置为 6000 (在 DynamoDB 侧设定), Hive 会同时启动 30 个 container 来读取数据。
> SELECT * FROM ddb_table;
----------------------------------------------------------------------------------------------
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container SUCCEEDED 30 30 0 0 0 0
----------------------------------------------------------------------------------------------
具体逻辑可参考 emr-dynamodb-connector 相关代码[3].
使用合适的 DynamoDB Key 来用 Query 代替 Scan
尽管并发可以提高速度,但是当 DynamoDB table 增长后,scan 的耗时将会越来越长,而且加大 RCU 会产生不少的费用。
如果可以优化 DynamoDB Table 的设计,将 Scan 的操作转换为 Query,速度能极大地提升,且能节省费用。
考虑到 Hive 需要用 date 进行查询,建立 Global Secondary Index 或许是个方法[4]。简单地说,就是建立除了 Primary Key 之外的索引,使得查询这个 GSI 的时候,也能做到 O(1) 的效果。比如,在上述例子中,可以将 date 设置为 GSI. 这样,使用 date 作为查询条件,也可以调用 query 操作,不需要 scan. 可惜,现在的 emr-dynamodb-connector 还不支持 GSI,Hive 还会傻傻地去扫描整个 Table.[5]
为了绕过这个问题,我们可以把 Date 设置成 Primary Key, 把 when 设置成 sort key (因为一个 key 只能有一个 value, 所以要用一个组合 key 来避免覆盖),示例数据:
"accessid": "c63b88a3-1503-4c2c-a7c2-3a1ffde7bff9", //<-- Global Secondary Index Partition Key.
"date": "2018-12-19", //<-- Primary Key
"context": "8c5d5b6c-1f50-4fc6-9462-c3dc01efe54a",
"when": "2018-12-19 0 0426c743-0d1a-44ef-8bd8-f9765387c981", //<-- Sort Key, 后面加入随机字符串以避免key重复
}
如此一来,使用 Hive 以 date 作为条件进行查询的时候,就会调用 query,加快速度并节省费用。
> SELECT * FROM ddb_tkio_ds WHERE ds="2018-12-18";
----------------------------------------------------------------------------------------------
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container SUCCEEDED 1 1 0 0 0 0
----------------------------------------------------------------------------------------------
VERTICES: 01/01 [==========================>>] 100% ELAPSED TIME: 10.50 s
----------------------------------------------------------------------------------------------
几万条数据,只用了10秒就搞定。
修改 Table 后,应用程序需要相应修改来通过 GSI 提取数据。使用 GSI 需要指定 index-name。例如:
IndexName='accessid-index', ##<---
KeyConditionExpression=Key(''accessid").eq("c63b88a3-1503-4c2c-a7c2-3a1ffde7bff9")
)
具体可参考[6].
后续
前面提到,如果要使 Hive 并发进行 Scan,需要手动调大 RCU。长期调大 RCU 可能会产生大量费用。
目前,emr-dynamodb-connector 正在添加对 DynamoDB on-demand capacity 的支持[7]。如果这个功能最后能实现,用户可以把 DynamoDB table 设置成 On-Demand 模式,按实际的读写数量来收费。Hive 在进行 scan 的时候,也会自动地并发。
参考文档
[1] https://docs.amazonaws.cn/en_us/amazondynamodb/latest/developerguide/EMRforDynamoDB.Tutorial.html
[2] https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Scan.html#Scan.ParallelScan
[3] https://github.com/awslabs/emr-dynamodb-connector/blob/47a1a9f752b71767f981b284b522f6edb50c1370/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/read/AbstractDynamoDBInputFormat.java#L98
[4] https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GSI.html
[5] https://github.com/awslabs/emr-dynamodb-connector/issues/87
[6] https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.query
[7] https://github.com/awslabs/emr-dynamodb-connector/issues/86