hive-server2 keeps 2k+ s3n-worker threads after jobs finish


tl;dr: This is a bug in EMR 5.6. Upgrading to EMR 5.8 or above solves the issue.

Issue

The user reported seeing 2000+ s3n-worker threads in hive-server2 after jobs finished, and had to restart the hive-server2 service every day to mitigate the issue.
[cc lang="text"]
# sudo -u hive jstack 11089 | grep s3n-worker | wc -l
2000
[/cc]

The thread names repeat from s3n-worker-0 to s3n-worker-19. In other words, there are 100 * 20 s3n-worker threads.
[cc lang="text"]
"s3n-worker-19" #70 daemon prio=5 os_prio=0 tid=0x00007f5ac4cf0800 nid=0x10ad waiting on condition [0x00007f5ac1dee000]
......
"s3n-worker-1" #52 daemon prio=5 os_prio=0 tid=0x00007f5ac5462000 nid=0x109b waiting on condition [0x00007f5aca23f000]
"s3n-worker-0" #51 daemon prio=5 os_prio=0 tid=0x00007f5ac5480000 nid=0x109a waiting on condition [0x00007f5aca641000]
......
[/cc]

Environment

AWS EMR 5.6

Analysis

1. For a "remaining threads" issue, we might first check the "lsof" output to see if there are any CLOSE_WAIT connections. However, in the user's environment there were only 2~3 CLOSE_WAIT connections, so we can rule that out.

2. From hive-server2.log, I can see the user runs 3 kinds of commands (grep for the keyword "Executing command"): "load data inpath 's3://xxxxxx' OVERWRITE into table xxxxx", "insert overwrite table xxxxx select xxxxx" and "select xxx from xxxx".

I did a test in my environment. If I run "load data inpath" to move a large file into a table, it triggers 20 s3n-worker threads. However, if I run the same "load data inpath" again, those 20 s3n-worker threads are reused.

If I create a table in another S3 bucket and run "load data inpath" again, another 20 s3n-worker threads appear. From this, we can see that the number of s3n-worker threads is related to the number of S3 buckets.
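The get-or-create behavior observed above can be sketched in a few lines. This is a toy model, not Hadoop code: FsCacheSketch, ToyFs and the bucket-keyed map are hypothetical names, and the point is only that one cached object (and hence one group of 20 worker threads) exists per distinct bucket.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the observation: one cached "filesystem" per distinct S3
// bucket; repeated access to the same bucket reuses the cached object.
public class FsCacheSketch {
    static class ToyFs {
        static final int WORKERS = 20; // each new instance would start 20 s3n-worker threads
    }

    private static final Map<String, ToyFs> cache = new HashMap<>();

    // Return the cached instance for this bucket, creating one on first access.
    static ToyFs get(String bucket) {
        return cache.computeIfAbsent(bucket, b -> new ToyFs());
    }

    static int totalWorkers() {
        return cache.size() * ToyFs.WORKERS;
    }

    public static void main(String[] args) {
        ToyFs a1 = get("bucket-a"); // first access: new instance, 20 workers
        ToyFs a2 = get("bucket-a"); // same bucket: cache hit, no new workers
        get("bucket-b");            // second bucket: 20 more workers
        System.out.println(a1 == a2);       // prints true
        System.out.println(totalWorkers()); // prints 40
    }
}
```

With two buckets, the sketch ends up with 2 * 20 workers, matching what the test cluster showed.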

3. However, I only saw 8 tables in hive-server2.log. Also, the user reported that only 2 S3 buckets are used by this cluster.
[cc lang="text"]
$ grep "load data inpath" hive-server2.log | grep -o -E 'table [^ ]*' | sort | uniq
table xxxx
table yyyy
table zzzz
[/cc]

4. Checking the Hadoop 2.7.3 source code, we can find the reuse logic.

When Hive performs file-related operations, it needs a FileSystem object to deal with the underlying filesystem. The FileSystem object is reused if its cache key matches.
[cc lang="java"]
// File: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java:352
/** Returns the FileSystem for this URI's scheme and authority. The scheme
 * of the URI determines a configuration property name,
 * fs.scheme.class whose value names the FileSystem class.
 * The entire URI is passed to the FileSystem instance's initialize method.
 */
public static FileSystem get(URI uri, Configuration conf) throws IOException {
  String scheme = uri.getScheme();
  String authority = uri.getAuthority();

  LOG.info("feichashao: Test7 - Dig more info ugi");
  LOG.info("scheme: " + scheme);
  LOG.info("authority: " + authority);
  LOG.info("uri: " + uri.toString());
  LOG.info("conf: " + conf.toString());
  LOG.info("CACHE: " + CACHE.hashCode());

  if (scheme == null && authority == null) {     // use default FS
    return get(conf);
  }

  if (scheme != null && authority == null) {     // no authority
    URI defaultUri = getDefaultUri(conf);
    if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
        && defaultUri.getAuthority() != null) {  // & default has authority
      return get(defaultUri, conf);              // return default
    }
  }

  String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme); // <----- If I turn the cache off, I can see 20 new s3n-worker threads whenever I execute an S3 operation.
  if (conf.getBoolean(disableCacheName, false)) {
    return createFileSystem(uri, conf);
  }

  return CACHE.get(uri, conf); // <----- By default, it returns a FileSystem object from CACHE. If CACHE has no matching FileSystem object, it creates a new one.
}
[/cc]

Here we can see the logic of the Cache:
[cc lang="java"]
/** Caching FileSystem objects */
static class Cache {
  private final ClientFinalizer clientFinalizer = new ClientFinalizer();

  private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
  private final Set<Key> toAutoClose = new HashSet<Key>();

  FileSystem get(URI uri, Configuration conf) throws IOException{
    Key key = new Key(uri, conf);
    LOG.info("KEY: " + key.toString());
    return getInternal(uri, conf, key); // <----
  }

  private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
    FileSystem fs;
    synchronized (this) {
      fs = map.get(key); // <---- Check if the key matches any FileSystem in the cache.
    }
    if (fs != null) {
      LOG.info("feichashao - Hit Cache");
      return fs; // <---- Return the existing FileSystem object.
    }

    LOG.info("feichashao - Miss Cache");
    fs = createFileSystem(uri, conf); // <---- If the cache is missed, create a new FileSystem.
    synchronized (this) { // refetch the lock again
      FileSystem oldfs = map.get(key);
      if (oldfs != null) { // a file system is created while lock is releasing
        fs.close(); // close the new file system
        return oldfs;  // return the old file system
      }

      // now insert the new file system into the map
      if (map.isEmpty()
          && !ShutdownHookManager.get().isShutdownInProgress()) {
        ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
      }
      fs.key = key;
      map.put(key, fs);
      if (conf.getBoolean("fs.automatic.close", true)) {
        toAutoClose.add(key);
      }
      return fs;
    }
  }
[/cc]

How is a key determined to match? We can focus on the equals() part. In short, there are 3 factors that determine whether a key matches: scheme, authority and UGI. For example, if user feichashao is running "load data inpath xxxx overwrite into table table1" via Hue, and table1 is located at s3://feichashao-hadoop/tmp1/, then the 3 factors would be:

Scheme: s3
Authority: feichashao-hadoop
ugi: feichashao (auth:PROXY) via hive (auth:SIMPLE)

[cc lang="java"]
// File: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

/** FileSystem.Cache.Key */
static class Key {
  final String scheme;
  final String authority;
  final UserGroupInformation ugi;
  final long unique;   // an artificial way to make a key unique

  Key(URI uri, Configuration conf) throws IOException {
    this(uri, conf, 0);
  }

  // Configuration is not used actually.
  Key(URI uri, Configuration conf, long unique) throws IOException {
    scheme = uri.getScheme()==null ?
        "" : StringUtils.toLowerCase(uri.getScheme());
    authority = uri.getAuthority()==null ?
        "" : StringUtils.toLowerCase(uri.getAuthority());
    this.unique = unique;

    this.ugi = UserGroupInformation.getCurrentUser();
  }

  static boolean isEqual(Object a, Object b) {
    return a == b || (a != null && a.equals(b));
  }

  @Override
  public boolean equals(Object obj) {
    if (obj == this) {
      return true;
    }
    if (obj != null && obj instanceof Key) {
      Key that = (Key)obj;
      return isEqual(this.scheme, that.scheme)       // <----- e.g. s3
          && isEqual(this.authority, that.authority) // <----- S3 bucket name
          && isEqual(this.ugi, that.ugi)             // <---- current user
          && (this.unique == that.unique);           // <---- always 0
    }
    return false;
  }

  @Override
  public String toString() {
    return "("+ugi.toString() + ")@" + scheme + "://" + authority;
  }
}
[/cc]

5. So far, we know that if the combination matches, the corresponding 20 s3n-worker threads will be reused.

In the user's environment, only 2 S3 buckets are in use, and only 3 users are using hive-server2. How can there be 100 * 20 s3n-worker threads?

6. From hive-server2.log, I noticed a lot of IOExceptions like the one below (about 500 per day):
[cc lang="text"]
java.io.IOException: There is no primary group for UGI xxxxx (auth:PROXY) via hive (auth:SIMPLE)
at org.apache.hadoop.security.UserGroupInformation.getPrimaryGroupName(UserGroupInformation.java:1432) ~[hadoop-common-2.7.3-amzn-2.jar:?]
at com.amazon.ws.emr.hadoop.fs.util.CurrentUserGroupInformation.getGroupNameFromUser(CurrentUserGroupInformation.java:49) [emrfs-hadoop-assembly-2.17.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.util.CurrentUserGroupInformation.(CurrentUserGroupInformation.java:30) [emrfs-hadoop-assembly-2.17.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.util.CurrentUserGroupInformation.getCurrentUserGroupInformation(CurrentUserGroupInformation.java:67) [emrfs-hadoop-assembly-2.17.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.initialize(S3NativeFileSystem.java:436) [emrfs-hadoop-assembly-2.17.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.initialize(EmrFileSystem.java:109) [emrfs-hadoop-assembly-2.17.0.jar:?]
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2717) [hadoop-common-2.7.3-amzn-2.jar:?] // <----------
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:93) [hadoop-common-2.7.3-amzn-2.jar:?]
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2751) [hadoop-common-2.7.3-amzn-2.jar:?]
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2733) [hadoop-common-2.7.3-amzn-2.jar:?]
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:377) [hadoop-common-2.7.3-amzn-2.jar:?]
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295) [hadoop-common-2.7.3-amzn-2.jar:?]
[/cc]

The IOException itself is not a problem; I sometimes see this exception in my test environment as well. However, each time we see such an exception, a FileSystem object has been created (see the last 6 lines of the stack). That means the cache was not hit. What?! Let's review the key-matching logic:
[cc lang="java"]
return isEqual(this.scheme, that.scheme)       // <----- e.g. s3
    && isEqual(this.authority, that.authority) // <----- S3 bucket name
    && isEqual(this.ugi, that.ugi)             // <---- current user
    && (this.unique == that.unique);           // <---- always 0
[/cc]

Scheme and authority are strings, so they should not be a problem. The UGI is more complicated: it contains not only the user name, but also things like principals and credentials.

7. In the meanwhile, a colleague found that connecting to hive-server2 with multiple beeline sessions and running some "load data inpath" commands produces multiple groups of 20 s3n-worker threads. That is, we can reproduce the issue with multiple beeline sessions!
[cc lang="text"]
### Start a beeline session.
beeline> !connect jdbc:hive2://localhost:10000 scott tiger
0: jdbc:hive2://localhost:10000> load data inpath 's3://feichashao-hadoop/tmp1/sampletable/' into table tmptbl9;

### This time, there will be 20 s3n-worker threads in hive-server2.

### Start another beeline session with the same user name and run the same load data command.
beeline> !connect jdbc:hive2://localhost:10000 scott tiger
0: jdbc:hive2://localhost:10000> load data inpath 's3://feichashao-hadoop/tmp1/sampletable/' into table tmptbl9;

### This time, there will be 40 s3n-worker threads in hive-server2.

### Quit the beeline sessions; the 40 s3n-worker threads remain.
0: jdbc:hive2://localhost:10000> !quit
[/cc]

Why is the cache not hit when I use multiple beeline sessions with the same username? We can insert some LOG.info calls into hadoop-common to dig for more information.
[cc lang="java"]
public synchronized
static UserGroupInformation getCurrentUser() throws IOException {
  AccessControlContext context = AccessController.getContext();
  Subject subject = Subject.getSubject(context);

  if (context == null) {
    LOG.info("feichashao UGI context is null");
  } else {
    LOG.info("feichashao UGI context: " + context.toString());
  }

  if (subject == null) {
    LOG.info("feichashao UGI subject is null");
  } else {
    LOG.info("feichashao UGI subject: " + subject.toString());
  }

  if (subject == null || subject.getPrincipals(User.class).isEmpty()) {
    LOG.info("feichashao UGI position 1");
    return getLoginUser();
  } else {
    LOG.info("feichashao UGI position 2");
    return new UserGroupInformation(subject);
  }
}
[/cc]

[cc lang="text"]
### 1st beeline session
fs.FileSystem (FileSystem.java:get(361)) - CACHE: 1563634025
security.UserGroupInformation (UserGroupInformation.java:getCurrentUser(636)) - feichashao UGI context: java.security.AccessControlContext@2450b102
security.UserGroupInformation (UserGroupInformation.java:getCurrentUser(642)) - feichashao UGI subject: Subject:
Principal: scott
Principal: hive (auth:SIMPLE)
Private Credential: org.apache.hadoop.security.Credentials@6c89490c

### 2nd beeline session
fs.FileSystem (FileSystem.java:get(361)) - CACHE: 1563634025
security.UserGroupInformation (UserGroupInformation.java:getCurrentUser(636)) - feichashao UGI context: java.security.AccessControlContext@1b768401
security.UserGroupInformation (UserGroupInformation.java:getCurrentUser(642)) - feichashao UGI subject: Subject:
Principal: scott
Principal: hive (auth:SIMPLE)
Private Credential: org.apache.hadoop.security.Credentials@10fc6de
[/cc]
The CACHE object is the same, so the two beeline sessions read from the same cache. However, the UGIs (specifically, the Credentials) differ between the two sessions, so their cache keys never match.
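The cache-miss mechanism can be sketched as follows. SessionUgi is a hypothetical stand-in, but the key property is real: in Hadoop 2.7, UserGroupInformation.equals() compares the wrapped Subject by reference, so a fresh Subject per session means a fresh, never-equal cache key even for the same user name.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model: a cache key with identity-based equality grows the cache by
// one entry per session, even when every session is for the same user.
public class UgiMissSketch {
    static final class SessionUgi {
        final String user;
        SessionUgi(String user) { this.user = user; }
        // No equals()/hashCode() override: two instances never compare equal,
        // just like two UGIs wrapping different Subject instances.
    }

    static int cacheEntriesAfterSessions(int sessions) {
        Map<SessionUgi, String> cache = new HashMap<>();
        for (int i = 0; i < sessions; i++) {
            // Each beeline session builds a fresh UGI for the same user...
            SessionUgi ugi = new SessionUgi("scott");
            // ...and every lookup misses, so a new "FileSystem" entry is cached.
            cache.putIfAbsent(ugi, "new FileSystem + 20 s3n-workers");
        }
        return cache.size();
    }

    public static void main(String[] args) {
        // 100 sessions against one bucket: 100 cache entries, i.e. 100 * 20 threads.
        System.out.println(cacheEntriesAfterSessions(100)); // prints 100
    }
}
```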

8. OK, that answers the question about the 2000+ s3n-worker threads: with X beeline sessions touching Y S3 buckets, there can be up to X * Y * 20 s3n-worker threads.

Then why do those 2000+ threads remain after the job finishes? We would expect those threads to be closed after the beeline sessions end, since they can no longer be reused.

9. A colleague found that this is an EMRFS bug in EMR 5.6: there is a thread-leaking problem after the filesystem is closed. As I don't have access to the EMRFS code, I don't have detailed information on this.
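Since the EMRFS source is not available, here is only a toy illustration of how this kind of leak can happen in general (ToyFs and every name below are hypothetical): if a close() method forgets to shut down the object's worker pool, the pool's threads stay parked forever after the object is closed.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Toy illustration (NOT the actual EMRFS code) of a worker-thread leak.
public class LeakDemo {
    static class ToyFs implements AutoCloseable {
        private final ExecutorService workers;

        ToyFs(String prefix) {
            AtomicInteger n = new AtomicInteger();
            workers = Executors.newFixedThreadPool(20, r -> {
                Thread t = new Thread(r, prefix + "-worker-" + n.getAndIncrement());
                t.setDaemon(true);
                return t;
            });
            // Submit 20 no-op tasks so all 20 core threads actually start.
            for (int i = 0; i < 20; i++) workers.submit(() -> {});
        }

        @Override
        public void close() {
            // Deliberate bug for illustration: no workers.shutdown() here,
            // so the 20 worker threads outlive the closed "filesystem".
        }
    }

    // Count live threads whose name starts with the given prefix.
    static long liveWorkers(String prefix) {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> t.getName().startsWith(prefix))
                .count();
    }

    // Create and immediately close a ToyFs, then report surviving workers.
    static long demo(String prefix) {
        try (ToyFs fs = new ToyFs(prefix)) { /* pretend to use fs */ }
        return liveWorkers(prefix);
    }

    public static void main(String[] args) {
        System.out.println(demo("toy-s3n")); // prints 20: all workers leaked
    }
}
```

A correct close() would call workers.shutdown() (or shutdownNow()), after which the pool threads exit and the count drops to zero.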

Solution

To solve the thread-leaking issue, upgrade to EMR 5.8 or a later version.

If we need to keep using EMR 5.6, we can update the emrfs package on EMR 5.6.

First, copy a newer emrfs rpm from an EMR 5.8 cluster; the package is located at /var/aws/emr/packages/bigtop/emrfs/noarch/emrfs-2.18.0-1.amzn1.noarch.rpm. Then, update the package on the EMR 5.6 master node:
[cc lang="text"]
$ sudo rpm -Uvh emrfs-2.18.0-1.amzn1.noarch.rpm
Preparing...                          ################################# [100%]
Updating / installing...
   1:emrfs-2.18.0-1.amzn1             ################################# [ 50%]
Cleaning up / removing...
   2:emrfs-2.17.0-1.amzn1             ################################# [100%]
[/cc]
Then, restart the hive-server2 service:
[cc lang="text"]
$ sudo initctl stop hive-server2
$ sudo initctl start hive-server2
[/cc]

Note

To compile Hadoop, we can use the command below:
[cc lang="text"]
hadoop-2.7.3-src]$ mvn clean package
[/cc]
Then, replace /usr/lib/hadoop/hadoop-common.jar with the compiled jar.