如何使 Hive 对 DynamoDB 进行并发读取?

问题背景

DynamoDB 创建了类似如下的表:

{
"accessid": "c63b88a3-1503-4c2c-a7c2-3a1ffde7bff9", // Primary key.
"date": "2018-12-12",
"when": "2018-12-12 17:22:15",
"context": "something"
}

这个表用来记录访问记录,以 accessid (随机字符串) 作为 Primary key. Date 记录当条记录的生成日期。

我们希望通过 Hive 每天将前一天的数据从 DynamoDB 备份到 S3 上,具体方法可参考文档[1].

hive> INSERT OVERWRITE TABLE  s3_table
    > SELECT * FROM ddb_table WHERE date="2018-12-12";

Dynamodb表里存放了很多天的数据,执行以上操作会消耗很长时间,如何能加快速度?比如让 Hive 进行并发读取?

测试使用的 AWS EMR 版本为 emr-5.20.0 (Amazon 2.8.5, Ganglia 3.7.2, Hive 2.3.4, Hue 4.3.0, Mahout 0.13.0, Pig 0.17.0, Tez 0.9.1). emr-dynamodb connector 的版本是 4.7.

$ sudo rpm -qa | grep emr-ddb
emr-ddb-hadoop-4.7.0-1.amzn1.noarch
emr-ddb-hive-4.7.0-1.amzn1.noarch
emr-ddb-4.7.0-1.amzn1.noarch

DynamoDB 的 Query 和 Scan 操作

DynamoDB 是个 NoSQL 数据库。在 Hive 操作中,如果 WHERE 后面的条件是 Primary Key,比如:

hive > SELECT * FROM ddb_table WHERE accessid="c63b88a3-1503-4c2c-a7c2-3a1ffde7bff9";

这时,Hive 会进行 Query 操作,能直接拿到结果。可以理解为 O(1) 的操作。

而如果 WHERE 后面的条件只是个普通的 Attribute, 那 Hive 需要调用 Scan 操作,扫描整个表的条目,过滤出符合条件的条件。可以理解为 O(n) 的操作。比如:

hive> INSERT OVERWRITE TABLE  s3_table
    > SELECT * FROM ddb_table WHERE date="2018-12-12";

Hive 需要读取整个表,对比每个条目的 date 是否为"2018-12-12". 因此,这个操作需要读取大量数据,消耗大量 DynamoDB 的 RCU.

Scan 操作是可以分 segment 的[2]。在这种情况下,如果 Hive 能在不同节点同时读取不同的 segment,这样可以加快读取速度。

Query 操作能直接读取 Key 对应的 Value, 没有并行的需要。

Hive 对 DynamoDB 的并发读取

按照上面说法,如果能让 Hive 启动多个 container 扫描 dynamodb table 的不同 segment, 可以起到并发加速的效果。

Hive 查询 DynamoDB 的动作实际上由 emr-dynamodb-connector 来实现。它会根据 DynamoDB 当前的 RCU 设定,来决定分多少个 segment 同时读取。一般来说,每 200 RCU 会启动一个 container。所以,如果我们把 DynamoDB 的 RCU 设置为 6000 (在 DynamoDB 侧设定), Hive 会同时启动 30 个 container 来读取数据。

hive> INSERT OVERWRITE TABLE s3_table
    > SELECT * FROM ddb_table;
----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container     SUCCEEDED     30         30        0        0       0       0
----------------------------------------------------------------------------------------------

具体逻辑可参考 emr-dynamodb-connector 相关代码[3].

使用合适的 DynamoDB Key 来用 Query 代替 Scan

尽管并发可以提高速度,但是当 DynamoDB table 增长后,scan 的耗时将会越来越长,而且加大 RCU 会产生不少的费用。

如果可以优化 DynamoDB Table 的设计,将 Scan 的操作转换为 Query,速度能极大地提升,且能节省费用。

考虑到 Hive 需要用 date 进行查询,建立 Global Secondary Index 或许是个方法[4]。简单地说,就是建立除了 Primary Key 之外的索引,使得查询这个 GSI 的时候,也能做到 O(1) 的效果。比如,在上述例子中,可以将 date 设置为 GSI. 这样,使用 date 作为查询条件,也可以调用 query 操作,不需要 scan. 可惜,现在的 emr-dynamodb-connector 还不支持 GSI,Hive 还会傻傻地去扫描整个 Table.[5]

为了绕过这个问题,我们可以把 Date 设置成 Primary Key, 把 when 设置成 sort key (因为一个 key 只能有一个 value, 所以要用一个组合 key 来避免覆盖),示例数据:

{
  "accessid": "c63b88a3-1503-4c2c-a7c2-3a1ffde7bff9", //<-- Global Secondary Index Partition Key.
  "date": "2018-12-19",  //<-- Primary Key
  "context": "8c5d5b6c-1f50-4fc6-9462-c3dc01efe54a",
  "when": "2018-12-19 0 0426c743-0d1a-44ef-8bd8-f9765387c981", //<-- Sort Key, 后面加入随机字符串以避免key重复
}

如此一来,使用 Hive 以 date 作为条件进行查询的时候,就会调用 query,加快速度并节省费用。

hive> INSERT OVERWRITE TABLE  ddb_table_1
    > SELECT * FROM ddb_tkio_ds WHERE ds="2018-12-18";
----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container     SUCCEEDED      1          1        0        0       0       0
----------------------------------------------------------------------------------------------
VERTICES: 01/01  [==========================>>] 100%  ELAPSED TIME: 10.50 s
----------------------------------------------------------------------------------------------

几万条数据,只用了10秒就搞定。

修改 Table 后,应用程序需要相应修改来通过 GSI 提取数据。使用 GSI 需要指定 index-name。例如:

response = table.query(
    IndexName='accessid-index',  ##<---
    KeyConditionExpression=Key(''accessid").eq("c63b88a3-1503-4c2c-a7c2-3a1ffde7bff9")
)

具体可参考[6].

后续

前面提到,如果要使 Hive 并发进行 Scan,需要手动调大 RCU。长期调大 RCU 可能会产生大量费用。
目前,emr-dynamodb-connector 正在添加对 DynamoDB on-demand capacity 的支持[7]。如果这个功能最后能实现,用户可以把 DynamoDB table 设置成 On-Demand 模式,按实际的读写数量来收费。Hive 在进行 scan 的时候,也会自动地并发。

参考文档

[1] https://docs.amazonaws.cn/en_us/amazondynamodb/latest/developerguide/EMRforDynamoDB.Tutorial.html
[2] https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Scan.html#Scan.ParallelScan
[3] https://github.com/awslabs/emr-dynamodb-connector/blob/47a1a9f752b71767f981b284b522f6edb50c1370/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/read/AbstractDynamoDBInputFormat.java#L98
[4] https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GSI.html
[5] https://github.com/awslabs/emr-dynamodb-connector/issues/87
[6] https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.query
[7] https://github.com/awslabs/emr-dynamodb-connector/issues/86