tldr: This is a bug in EMR 5.6. Upgrading to EMR 5.8 or above can solve the issue.
Issue
A user reported seeing 2000+ s3n-worker threads after the jobs finished. He has to restart the hive-server2 service every day to mitigate the issue.
2000
The thread names repeat from s3n-worker-0 to s3n-worker-19. In other words, there are 100 * 20 s3n-worker threads.
......
"s3n-worker-1" #52 daemon prio=5 os_prio=0 tid=0x00007f5ac5462000 nid=0x109b waiting on condition [0x00007f5aca23f000]
"s3n-worker-0" #51 daemon prio=5 os_prio=0 tid=0x00007f5ac5480000 nid=0x109a waiting on condition [0x00007f5aca641000]
......
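To confirm the "100 * 20" pattern, one option is to count how often each s3n-worker name appears in a thread dump. The following is a minimal sketch, assuming the dump has already been read into a list of lines; the class and method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class S3nWorkerCounter {
    // Matches the quoted thread name at the start of a thread header line.
    private static final Pattern NAME = Pattern.compile("^\"(s3n-worker-\\d+)\"");

    /** Returns how many times each s3n-worker thread name appears in the dump. */
    static Map<String, Integer> countByName(List<String> dumpLines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : dumpLines) {
            Matcher m = NAME.matcher(line);
            if (m.find()) {
                counts.merge(m.group(1), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
            "\"s3n-worker-1\" #52 daemon prio=5 os_prio=0 tid=0x00007f5ac5462000 nid=0x109b waiting on condition [0x00007f5aca23f000]",
            "\"s3n-worker-0\" #51 daemon prio=5 os_prio=0 tid=0x00007f5ac5480000 nid=0x109a waiting on condition [0x00007f5aca641000]");
        // Prints one count per distinct thread name.
        System.out.println(countByName(sample));
    }
}
```

If every name from s3n-worker-0 to s3n-worker-19 shows a count of about 100, the threads come from about 100 separate pools of 20 workers each.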
Environment
AWS EMR 5.6
Analysis
1. For a "remaining threads" issue, we might first check the "lsof" output for CLOSE_WAIT connections. However, in the user's environment there are only 2~3 CLOSE_WAIT connections, so we can rule this out.
2. From hive-server2.log, I can see that the user runs 3 kinds of commands (grep for the keyword "Executing command"): "load data inpath 's3://xxxxxx' OVERWRITE into table xxxxx", "insert overwrite table xxxxx select xxxxx" and "select xxx from xxxx".
I did a test in my environment. If I run "load data inpath" to move a large file into a table, it triggers 20 s3n-worker threads. However, if I run "load data inpath" again, those 20 s3n-worker threads are reused.
If I create a table in another S3 bucket and run "load data inpath" again, another 20 s3n-worker threads appear. From this, we can see that the number of s3n-worker threads is related to the number of S3 buckets.
3. However, I only saw 8 tables in hive-server2.log. Also, the user reported that only 2 S3 buckets are used by this cluster.
table xxxx
table yyyy
table zzzz
4. Checking the Hadoop 2.7.3 source code, we can find the reuse logic.
When hive performs file-related operations, it needs a FileSystem object to deal with the filesystem. The FileSystem object is reused if the cache key matches.
/** Returns the FileSystem for this URI's scheme and authority. The scheme
 * of the URI determines a configuration property name,
 * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
 * The entire URI is passed to the FileSystem instance's initialize method.
 */
public static FileSystem get(URI uri, Configuration conf) throws IOException {
  String scheme = uri.getScheme();
  String authority = uri.getAuthority();
  LOG.info("feichashao: Test7 - Dig more info ugi");
  LOG.info("scheme: " + scheme);
  LOG.info("authority: " + authority);
  LOG.info("uri: " + uri.toString());
  LOG.info("conf: " + conf.toString());
  LOG.info("CACHE: " + CACHE.hashCode());
  if (scheme == null && authority == null) { // use default FS
    return get(conf);
  }
  if (scheme != null && authority == null) { // no authority
    URI defaultUri = getDefaultUri(conf);
    if (scheme.equals(defaultUri.getScheme()) // if scheme matches default
        && defaultUri.getAuthority() != null) { // & default has authority
      return get(defaultUri, conf); // return default
    }
  }
  String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme); //<----- If I turn the cache off, I can see 20 new s3n-worker threads whenever I execute an S3 operation.
  if (conf.getBoolean(disableCacheName, false)) {
    return createFileSystem(uri, conf);
  }
  return CACHE.get(uri, conf); //<----- By default, it returns a FileSystem object from CACHE. If CACHE has no matching FileSystem object, it creates a new one.
}
Here we can see the logic of Cache.
static class Cache {
  private final ClientFinalizer clientFinalizer = new ClientFinalizer();
  private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
  private final Set<Key> toAutoClose = new HashSet<Key>();
  FileSystem get(URI uri, Configuration conf) throws IOException{
    Key key = new Key(uri, conf);
    LOG.info("KEY: " + key.toString());
    return getInternal(uri, conf, key); //<----
  }
  private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
    FileSystem fs;
    synchronized (this) {
      fs = map.get(key); //<---- Check if key matches any FS in cache.
    }
    if (fs != null) {
      LOG.info("feichashao - Hit Cache");
      return fs; //<---- return the existing FS obj.
    }
    LOG.info("feichashao - Miss Cache");
    fs = createFileSystem(uri, conf); //<---- If cache is missed, create a new FileSystem.
    synchronized (this) { // refetch the lock again
      FileSystem oldfs = map.get(key);
      if (oldfs != null) { // a file system is created while lock is releasing
        fs.close(); // close the new file system
        return oldfs; // return the old file system
      }
      // now insert the new file system into the map
      if (map.isEmpty()
          && !ShutdownHookManager.get().isShutdownInProgress()) {
        ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
      }
      fs.key = key;
      map.put(key, fs);
      if (conf.getBoolean("fs.automatic.close", true)) {
        toAutoClose.add(key);
      }
      return fs;
    }
  }
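The lookup / create / re-check flow of getInternal() can be modelled without Hadoop. The following is a simplified sketch, not the Hadoop implementation: FakeFs is a hypothetical stand-in for a FileSystem, and the cache class and its names are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for a FileSystem: it only remembers whether close() was called. */
class FakeFs implements AutoCloseable {
    final String name;
    boolean closed = false;

    FakeFs(String name) {
        this.name = name;
    }

    @Override
    public void close() {
        closed = true;
    }
}

/** Simplified model of the lookup / create / re-check flow. */
class CreateOutsideLockCache<K, V extends AutoCloseable> {
    private final Map<K, V> map = new HashMap<>();

    V get(K key, Function<K, V> factory) {
        synchronized (this) {
            V cached = map.get(key);
            if (cached != null) {
                return cached; // cache hit: reuse the existing object
            }
        }
        // Cache miss: create without holding the lock, since creation may be slow.
        V created = factory.apply(key);
        synchronized (this) {
            V winner = map.get(key);
            if (winner != null) {
                // Another thread inserted first: discard ours and reuse theirs.
                closeUnchecked(created);
                return winner;
            }
            map.put(key, created);
            return created;
        }
    }

    synchronized int size() {
        return map.size();
    }

    private static void closeUnchecked(AutoCloseable c) {
        try {
            c.close();
        } catch (Exception e) {
            throw new IllegalStateException("close failed", e);
        }
    }
}

class CacheFlowDemo {
    public static void main(String[] args) {
        CreateOutsideLockCache<String, FakeFs> cache = new CreateOutsideLockCache<>();
        FakeFs first = cache.get("s3://bucket-a", FakeFs::new);
        FakeFs again = cache.get("s3://bucket-a", FakeFs::new);
        cache.get("s3://bucket-b", FakeFs::new);
        System.out.println("same key reused: " + (first == again));
        System.out.println("entries: " + cache.size());
    }
}
```

The point of the model is that every distinct key adds one long-lived entry. Whatever resources that entry owns (here, the s3n-worker threads of a FileSystem) stay around until the entry is closed.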
How is a key matched? We can focus on the equals() part. In short, 3 factors determine whether a key matches: scheme, authority and ugi.
For example, if user feichashao runs "load data inpath xxxx overwrite into table table1" via Hue, and table1 is located at s3://feichashao-hadoop/tmp1/, then the 3 factors would be:
Scheme: s3
Authority: feichashao-hadoop
ugi: feichashao (auth:PROXY) via hive (auth:SIMPLE)
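The scheme and authority parts can be checked with plain java.net.URI, which is what the Key constructor reads from. This is a small sketch with a made-up class name; it lower-cases both parts in the same spirit as the Key constructor and ignores the ugi part entirely.

```java
import java.net.URI;
import java.util.Locale;

class CacheKeyParts {
    /** Returns {scheme, authority} for a location, lower-cased, with "" for missing parts. */
    static String[] schemeAndAuthority(String location) {
        URI uri = URI.create(location);
        String scheme = uri.getScheme() == null
            ? "" : uri.getScheme().toLowerCase(Locale.ENGLISH);
        String authority = uri.getAuthority() == null
            ? "" : uri.getAuthority().toLowerCase(Locale.ENGLISH);
        return new String[] {scheme, authority};
    }

    public static void main(String[] args) {
        String[] parts = schemeAndAuthority("s3://feichashao-hadoop/tmp1/");
        System.out.println("scheme=" + parts[0] + " authority=" + parts[1]);
    }
}
```

Note that the path (/tmp1/) is not part of the key, so two tables in the same bucket share the same scheme and authority.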
/** FileSystem.Cache.Key */
static class Key {
  final String scheme;
  final String authority;
  final UserGroupInformation ugi;
  final long unique; // an artificial way to make a key unique
  Key(URI uri, Configuration conf) throws IOException {
    this(uri, conf, 0);
  }
  // Configuration is not used actually.
  Key(URI uri, Configuration conf, long unique) throws IOException {
    scheme = uri.getScheme()==null ?
        "" : StringUtils.toLowerCase(uri.getScheme());
    authority = uri.getAuthority()==null ?
        "" : StringUtils.toLowerCase(uri.getAuthority());
    this.unique = unique;
    this.ugi = UserGroupInformation.getCurrentUser();
  }
  static boolean isEqual(Object a, Object b) {
    return a == b || (a != null && a.equals(b));
  }
  @Override
  public boolean equals(Object obj) {
    if (obj == this) {
      return true;
    }
    if (obj != null && obj instanceof Key) {
      Key that = (Key)obj;
      return isEqual(this.scheme, that.scheme) //<----- S3
          && isEqual(this.authority, that.authority) // <----- S3 bucket name
          && isEqual(this.ugi, that.ugi) // <---- current user.
          && (this.unique == that.unique); // <---- Always 0.
    }
    return false;
  }
  @Override
  public String toString() {
    return "("+ugi.toString() + ")@" + scheme + "://" + authority;
  }
}
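5. So far, we know that if the combination of scheme, authority and ugi matches an existing key, the cached FileSystem object is reused.
In the user's environment, only 2 S3 buckets are in use, and only 3 users are using hive-server2. How do we end up with 100 * 20 s3n-worker threads?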
6. From hive-server2.log, I noticed a lot of IOExceptions like the one below (about 500 per day):
at org.apache.hadoop.security.UserGroupInformation.getPrimaryGroupName(UserGroupInformation.java:1432) ~[hadoop-common-2.7.3-amzn-2.jar:?]
at com.amazon.ws.emr.hadoop.fs.util.CurrentUserGroupInformation.getGroupNameFromUser(CurrentUserGroupInformation.java:49) [emrfs-hadoop-assembly-2.17.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.util.CurrentUserGroupInformation.<init>(CurrentUserGroupInformation.java:30) [emrfs-hadoop-assembly-2.17.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.util.CurrentUserGroupInformation.getCurrentUserGroupInformation(CurrentUserGroupInformation.java:67) [emrfs-hadoop-assembly-2.17.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.initialize(S3NativeFileSystem.java:436) [emrfs-hadoop-assembly-2.17.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.initialize(EmrFileSystem.java:109) [emrfs-hadoop-assembly-2.17.0.jar:?]
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2717) [hadoop-common-2.7.3-amzn-2.jar:?] //<----------
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:93) [hadoop-common-2.7.3-amzn-2.jar:?]
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2751) [hadoop-common-2.7.3-amzn-2.jar:?]
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2733) [hadoop-common-2.7.3-amzn-2.jar:?]
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:377) [hadoop-common-2.7.3-amzn-2.jar:?]
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295) [hadoop-common-2.7.3-amzn-2.jar:?]
The IOException itself is not a problem; I sometimes see it in my test environment too. However, each time we see such an exception, a FileSystem object is being created (see the last 6 lines of the stack trace). That means the cache was not hit.
What the ?!
Let's review the logic of key matching:
return isEqual(this.scheme, that.scheme) //<----- S3
&& isEqual(this.authority, that.authority) // <----- S3 bucket name
&& isEqual(this.ugi, that.ugi) // <---- current user.
&& (this.unique == that.unique); // <---- Always 0.
Scheme and authority are strings, so they should not be a problem. UGI is more complicated. It contains not only the user name, but also things like "principals" and "credentials".
7. Meanwhile, a colleague found that connecting to hive-server2 with multiple beeline sessions and running "load data inpath" in each of them produces multiple sets of 20 s3n-worker threads. In other words, we can reproduce the issue by using multiple beeline sessions!
beeline> !connect jdbc:hive2://localhost:10000 scott tiger
0: jdbc:hive2://localhost:10000> load data inpath 's3://feichashao-hadoop/tmp1/sampletable/' into table tmptbl9;
### At this point, there are 20 s3n-worker threads in hive-server2.
### Start another beeline session with the same user name and run a load data command.
beeline> !connect jdbc:hive2://localhost:10000 scott tiger
0: jdbc:hive2://localhost:10000> load data inpath 's3://feichashao-hadoop/tmp1/sampletable/' into table tmptbl9;
### At this point, there are 40 s3n-worker threads in hive-server2.
### Quit the beeline sessions, and the 40 s3n-worker threads remain.
0: jdbc:hive2://localhost:10000> !quit
Why is the cache not hit when I use multiple beeline sessions with the same user name? We can insert some LOG.info calls into hadoop-common to dig out more information.
static UserGroupInformation getCurrentUser() throws IOException {
  AccessControlContext context = AccessController.getContext();
  Subject subject = Subject.getSubject(context);
  if (context == null) {
    LOG.info("feichashao UGI context is null");
  } else {
    LOG.info("feichashao UGI context: " + context.toString());
  }
  if (subject == null) {
    LOG.info("feichashao UGI subject is null");
  } else {
    LOG.info("feichashao UGI subject: " + subject.toString());
  }
  if (subject == null || subject.getPrincipals(User.class).isEmpty()) {
    LOG.info("feichashao UGI position 1");
    return getLoginUser();
  } else {
    LOG.info("feichashao UGI position 2");
    return new UserGroupInformation(subject);
  }
}
fs.FileSystem (FileSystem.java:get(361)) - CACHE: 1563634025
security.UserGroupInformation (UserGroupInformation.java:getCurrentUser(636)) - feichashao UGI context: java.security.AccessControlContext@2450b102
security.UserGroupInformation (UserGroupInformation.java:getCurrentUser(642)) - feichashao UGI subject: Subject:
Principal: scott
Principal: hive (auth:SIMPLE)
Private Credential: org.apache.hadoop.security.Credentials@6c89490c
### 2nd beeline session
fs.FileSystem (FileSystem.java:get(361)) - CACHE: 1563634025
security.UserGroupInformation (UserGroupInformation.java:getCurrentUser(636)) - feichashao UGI context: java.security.AccessControlContext@1b768401
security.UserGroupInformation (UserGroupInformation.java:getCurrentUser(642)) - feichashao UGI subject: Subject:
Principal: scott
Principal: hive (auth:SIMPLE)
Private Credential: org.apache.hadoop.security.Credentials@10fc6de
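The CACHE object is the same, so the 2 beeline sessions are reading from the same cache. But the UGI (the Credentials specifically) differs between the two beeline sessions. Since the UGI is part of the cache key, the second session misses the cache and gets a new FileSystem object with its own 20 s3n-worker threads.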
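The effect can be modelled in a few lines. This sketch rests on an assumption that should be checked against the Hadoop source: that two UGIs are only equal when they wrap the very same Subject object, so each session's UGI is distinct even for the same user name. SessionIdentity, FsKey and SessionCacheDemo are made-up names, not Hadoop classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical per-session identity: equal only if it wraps the same subject object. */
final class SessionIdentity {
    final String userName;
    private final Object subject = new Object(); // stand-in for a JAAS Subject (assumption)

    SessionIdentity(String userName) {
        this.userName = userName;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof SessionIdentity && ((SessionIdentity) o).subject == subject;
    }

    @Override
    public int hashCode() {
        return System.identityHashCode(subject);
    }
}

/** Simplified cache key: scheme + authority + identity. */
final class FsKey {
    final String scheme;
    final String authority;
    final SessionIdentity who;

    FsKey(String scheme, String authority, SessionIdentity who) {
        this.scheme = scheme;
        this.authority = authority;
        this.who = who;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof FsKey)) {
            return false;
        }
        FsKey that = (FsKey) o;
        return scheme.equals(that.scheme)
            && authority.equals(that.authority)
            && who.equals(that.who);
    }

    @Override
    public int hashCode() {
        return Objects.hash(scheme, authority, who);
    }
}

class SessionCacheDemo {
    /** Counts cache entries after each session touches each bucket twice. */
    static int cachedFileSystems(int sessions, String... buckets) {
        Map<FsKey, String> cache = new HashMap<>();
        for (int s = 0; s < sessions; s++) {
            // Same user name every time, but a fresh identity per session.
            SessionIdentity id = new SessionIdentity("scott");
            for (String bucket : buckets) {
                cache.putIfAbsent(new FsKey("s3", bucket, id), "fs"); // miss: new entry
                cache.putIfAbsent(new FsKey("s3", bucket, id), "fs"); // hit: reused
            }
        }
        return cache.size();
    }

    public static void main(String[] args) {
        System.out.println(cachedFileSystems(2, "feichashao-hadoop"));
    }
}
```

Within one session the second operation on a bucket hits the cache, which matches the reuse seen in the single-session test. Across sessions the identity differs, so every session adds its own entry per bucket.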
8. OK, that answers the question about the 2000+ s3n-worker threads. If we run X beeline sessions against Y S3 buckets, there could be X*Y*20 s3n-worker threads.
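As a worked example of the X*Y*20 estimate, the sketch below plugs in numbers. The 20 workers per FileSystem and the 2 buckets come from the observations above; the figure of 50 sessions is only an illustrative value that would produce 2000 threads, not something measured in the user's cluster.

```java
class S3nWorkerEstimate {
    // Each FileSystem object was observed to start 20 s3n-worker threads.
    static final int WORKERS_PER_FILESYSTEM = 20;

    /** Upper-bound estimate: every session touches every bucket once or more. */
    static long estimate(int sessions, int buckets) {
        return (long) sessions * buckets * WORKERS_PER_FILESYSTEM;
    }

    public static void main(String[] args) {
        // Reproduction case: 2 beeline sessions, 1 bucket.
        System.out.println(estimate(2, 1));
        // Illustrative case: 50 sessions (assumed), 2 buckets.
        System.out.println(estimate(50, 2));
    }
}
```

The first call gives 40, matching the two-session reproduction. The second gives 2000, which shows that a few dozen sessions per day are enough to reach the reported thread count.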
Then why do those 2000+ threads remain after the jobs have finished? We would expect the threads to be closed when the beeline sessions end, as they can no longer be reused.
9. A colleague found that this is an emrfs bug in EMR 5.6: threads leak after the filesystem is closed. As I don't have access to the emrfs code, I don't have detailed information on this.
Solution
To solve the thread-leaking issue, we can use EMR 5.8 or above.
If we need to keep using EMR 5.6, we can update the emrfs package on EMR 5.6.
First, copy a newer version of the emrfs rpm from EMR 5.8; the package is located at /var/aws/emr/packages/bigtop/emrfs/noarch/emrfs-2.18.0-1.amzn1.noarch.rpm. Then, update the package on the EMR 5.6 master node:
Preparing... ################################# [100%]
Updating / installing...
1:emrfs-2.18.0-1.amzn1 ################################# [ 50%]
Cleaning up / removing...
2:emrfs-2.17.0-1.amzn1 ################################# [100%]
Then, restart hive-server2 service:
$ sudo initctl start hive-server2
Note
To compile hadoop, we can use the command below:
Then, replace /usr/lib/hadoop/hadoop-common.jar with the compiled jar.