tldr: This is a bug in EMR 5.6. Upgrading to EMR 5.8 or above can solve the issue.
A user reported seeing 2000+ s3n-worker threads after his jobs had finished. He had to restart the hive-server2 service every day to mitigate the issue.
The thread names repeat from s3n-worker-0 to s3n-worker-19. In other words, there are about 100 * 20 s3n-worker threads.
"s3n-worker-1" #52 daemon prio=5 os_prio=0 tid=0x00007f5ac5462000 nid=0x109b waiting on condition [0x00007f5aca23f000]
"s3n-worker-0" #51 daemon prio=5 os_prio=0 tid=0x00007f5ac5480000 nid=0x109a waiting on condition [0x00007f5aca641000]
1. For a "remaining threads" issue, the first thing to check is the "lsof" output, to see whether there are many CLOSE_WAIT connections. However, in the user's environment there were only 2~3 CLOSE_WAIT connections, so we can rule this one out.
2. From hive-server2.log, I can see the user runs 3 kinds of commands (grep the keyword "Executing command"). "load data inpath 's3://xxxxxx' OVERWRITE into table xxxxx", "insert overwrite table xxxxx select xxxxx" and "select xxx from xxxx".
I did a test in my environment. If I run "load data inpath" to move a large file into a table, it triggers 20 s3n-worker threads. However, if I run "load data inpath" again, those 20 s3n-worker threads are reused.
If I create a table in another S3 bucket and run "load data inpath" again, another 20 s3n-worker threads appear. From this we can see that the number of s3n-worker threads is related to the number of S3 buckets.
3. However, I only saw 8 tables in hive-server2.log. Also, the user reported that only 2 S3 buckets are used by this cluster.
table xxxx
table yyyy
table zzzz
4. Checking the Hadoop 2.7.3 source code, we can find the reuse logic.
When Hive performs file-related operations, it needs a FileSystem object to talk to the underlying filesystem. The FileSystem object is reused if the cache key matches.
  /** Returns the FileSystem for this URI's scheme and authority. The scheme
   * of the URI determines a configuration property name,
   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
   * The entire URI is passed to the FileSystem instance's initialize method.
   */
  public static FileSystem get(URI uri, Configuration conf) throws IOException {
    String scheme = uri.getScheme();
    String authority = uri.getAuthority();
    LOG.info("feichashao: Test7 - Dig more info ugi");
    LOG.info("scheme: " + scheme);
    LOG.info("authority: " + authority);
    LOG.info("uri: " + uri.toString());
    LOG.info("conf: " + conf.toString());
    LOG.info("CACHE: " + CACHE.hashCode());
    if (scheme == null && authority == null) { // use default FS
      return get(conf);
    }
    if (scheme != null && authority == null) { // no authority
      URI defaultUri = getDefaultUri(conf);
      if (scheme.equals(defaultUri.getScheme()) // if scheme matches default
          && defaultUri.getAuthority() != null) { // & default has authority
        return get(defaultUri, conf); // return default
      }
    }
    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme); //<----- If I turn cache off, I can see 20 new s3n-worker threads whenever I execute an S3 operation.
    if (conf.getBoolean(disableCacheName, false)) {
      return createFileSystem(uri, conf);
    }
    return CACHE.get(uri, conf); //<----- By default, it returns a FileSystem object from CACHE. If CACHE has no matching FileSystem object, it creates a new one.
  }
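The per-scheme switch checked above can be illustrated on its own. This is only a sketch: DisableCacheFlag is a made-up helper, and java.util.Properties stands in for Hadoop's Configuration (an assumption, so that the snippet runs without Hadoop on the classpath).

```java
import java.util.Properties;

/** Hypothetical helper; Properties stands in for Hadoop's Configuration. */
final class DisableCacheFlag {
    /** Builds the property name the same way FileSystem.get() does. */
    static String propertyName(String scheme) {
        return String.format("fs.%s.impl.disable.cache", scheme);
    }

    /** True if caching is switched off for the scheme; defaults to false. */
    static boolean cacheDisabled(Properties conf, String scheme) {
        return Boolean.parseBoolean(conf.getProperty(propertyName(scheme), "false"));
    }
}
```

With the flag left at its default, every call goes through CACHE.get(); with it set to true, every call creates a new FileSystem, which matches the "20 new s3n-worker threads per S3 operation" observation noted in the code comment.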
Here we can see the logic of Cache.
  static class Cache {
    private final ClientFinalizer clientFinalizer = new ClientFinalizer();
    private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
    private final Set<Key> toAutoClose = new HashSet<Key>();

    FileSystem get(URI uri, Configuration conf) throws IOException{
      Key key = new Key(uri, conf);
      LOG.info("KEY: " + key.toString());
      return getInternal(uri, conf, key); //<----
    }

    private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
      FileSystem fs;
      synchronized (this) {
        fs = map.get(key); //<---- Check if key matches any FS in cache.
      }
      if (fs != null) {
        LOG.info("feichashao - Hit Cache");
        return fs; //<---- return the existing FS obj.
      }
      LOG.info("feichashao - Miss Cache");
      fs = createFileSystem(uri, conf); //<---- If cache is missed, create a new FileSystem.
      synchronized (this) { // refetch the lock again
        FileSystem oldfs = map.get(key);
        if (oldfs != null) { // a file system is created while lock is releasing
          fs.close(); // close the new file system
          return oldfs; // return the old file system
        }
        // now insert the new file system into the map
        if (map.isEmpty()
            && !ShutdownHookManager.get().isShutdownInProgress()) {
          ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
        }
        fs.key = key;
        map.put(key, fs);
        if (conf.getBoolean("fs.automatic.close", true)) {
          toAutoClose.add(key);
        }
        return fs;
      }
    }
  }
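The check, create, re-check pattern in getInternal() can be sketched without Hadoop. This is a simplified illustration, not the Hadoop class: CheckCreateRecheckCache, its factory parameter and the createdCount() counter are names made up for this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for FileSystem.Cache; not the Hadoop class. */
final class CheckCreateRecheckCache<K, V> {
    private final Map<K, V> map = new HashMap<>();
    private int created = 0; // how many values were built, for illustration only

    V get(K key, Function<K, V> factory) {
        V value;
        synchronized (this) {
            value = map.get(key); // first check, under the lock
        }
        if (value != null) {
            return value; // cache hit: reuse the existing value
        }
        V fresh = factory.apply(key); // slow creation, done without holding the lock
        synchronized (this) {
            created++;
            V old = map.get(key); // re-check: another thread may have inserted meanwhile
            if (old != null) {
                return old; // Hadoop closes the fresh FileSystem here; this sketch just drops it
            }
            map.put(key, fresh);
            return fresh;
        }
    }

    synchronized int createdCount() { return created; }

    synchronized int size() { return map.size(); }
}
```

The point to take away is that a new value (and, in the real code, a new set of worker threads) is created on every cache miss, so everything depends on how often the key matches.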
How does a key match? We can focus on the equals() method. In short, three factors determine whether two keys match: scheme, authority and ugi.
For example, if user feichashao runs "load data inpath xxxx overwrite into table table1" via Hue, and table1 is located at s3://feichashao-hadoop/tmp1/, then the three factors would be:
Scheme: S3
Authority: feichashao-hadoop
ugi: feichashao (auth:PROXY) via hive (auth:SIMPLE)
    /** FileSystem.Cache.Key */
    static class Key {
      final String scheme;
      final String authority;
      final UserGroupInformation ugi;
      final long unique; // an artificial way to make a key unique

      Key(URI uri, Configuration conf) throws IOException {
        this(uri, conf, 0);
      }

      // Configuration is not used actually.
      Key(URI uri, Configuration conf, long unique) throws IOException {
        scheme = uri.getScheme()==null ?
            "" : StringUtils.toLowerCase(uri.getScheme());
        authority = uri.getAuthority()==null ?
            "" : StringUtils.toLowerCase(uri.getAuthority());
        this.unique = unique;
        this.ugi = UserGroupInformation.getCurrentUser();
      }

      static boolean isEqual(Object a, Object b) {
        return a == b || (a != null && a.equals(b));
      }

      public boolean equals(Object obj) {
        if (obj == this) {
          return true;
        }
        if (obj != null && obj instanceof Key) {
          Key that = (Key)obj;
          return isEqual(this.scheme, that.scheme) //<----- S3
              && isEqual(this.authority, that.authority) // <----- S3 bucket name
              && isEqual(this.ugi, that.ugi) // <---- current user.
              && (this.unique == that.unique); // <---- Always 0.
        }
        return false;
      }

      public String toString() {
        return "("+ugi.toString() + ")@" + scheme + "://" + authority;
      }
    }
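To make the matching rule concrete, here is a minimal sketch of a key that compares only scheme, authority and user. SimpleKey is a made-up class, the user is a plain String standing in for the UserGroupInformation (an assumption that hides the UGI details), and "another-bucket" is a hypothetical bucket name.

```java
import java.net.URI;
import java.util.Locale;
import java.util.Objects;

/** Simplified illustration of FileSystem.Cache.Key; not the Hadoop class. */
final class SimpleKey {
    final String scheme;
    final String authority;
    final String user; // assumption: a plain name instead of a UserGroupInformation

    SimpleKey(URI uri, String user) {
        this.scheme = uri.getScheme() == null
            ? "" : uri.getScheme().toLowerCase(Locale.ENGLISH);
        this.authority = uri.getAuthority() == null
            ? "" : uri.getAuthority().toLowerCase(Locale.ENGLISH);
        this.user = user;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == this) return true;
        if (!(obj instanceof SimpleKey)) return false;
        SimpleKey that = (SimpleKey) obj;
        // The path of the URI plays no part in the comparison.
        return scheme.equals(that.scheme)
            && authority.equals(that.authority)
            && Objects.equals(user, that.user);
    }

    @Override
    public int hashCode() {
        return Objects.hash(scheme, authority, user);
    }
}
```

Under these assumptions, s3://feichashao-hadoop/tmp1/ and s3://feichashao-hadoop/tmp2/ produce equal keys for the same user, while s3://another-bucket/tmp1/ does not. That is consistent with the earlier test where a second bucket brought another 20 threads.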
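5. So far, we know that if the combination of scheme, authority and ugi matches an existing key, the cached FileSystem object is reused.
In the user's environment, only 2 S3 buckets are in use, and only 3 users are using hive-server2. How come there are 100 * 20 s3n-worker threads?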
6. From hive-server2.log, I noticed a lot of IOExceptions like the one below (about 500 per day):
at ~[hadoop-common-2.7.3-amzn-2.jar:?]
at [emrfs-hadoop-assembly-
at<init>( [emrfs-hadoop-assembly-2.17.0.jar:?]
at [emrfs-hadoop
at [emrfs-hadoop-assembly-2.17.0.jar:?]
at [emrfs-hadoop-assembly-2.17.0.jar:?]
at org.apache.hadoop.fs.FileSystem.createFileSystem( [hadoop-common-2.7.3-amzn-2.jar:?] //<----------
at org.apache.hadoop.fs.FileSystem.access$200( [hadoop-common-2.7.3-amzn-2.jar:?]
at org.apache.hadoop.fs.FileSystem$Cache.getInternal( [hadoop-common-2.7.3-amzn-2.jar:?]
at org.apache.hadoop.fs.FileSystem$Cache.get( [hadoop-common-2.7.3-amzn-2.jar:?]
at org.apache.hadoop.fs.FileSystem.get( [hadoop-common-2.7.3-amzn-2.jar:?]
at org.apache.hadoop.fs.Path.getFileSystem( [hadoop-common-2.7.3-amzn-2.jar:?]
The IOException itself is not a problem; I sometimes see this exception in my test environment as well. However, each time such an exception appears, a FileSystem object is being created (see the last 6 lines of the stack trace). That means the cache was not hit.
What the ?!
Let's review the logic of key matching:
          return isEqual(this.scheme, that.scheme) //<----- S3
              && isEqual(this.authority, that.authority) // <----- S3 bucket name
              && isEqual(this.ugi, that.ugi) // <---- current user.
              && (this.unique == that.unique); // <---- Always 0.
Scheme and authority are strings, so they should not be a problem. UGI is more complicated. It contains not only the user name, but also things like "principals" and "credentials".
7. In the meantime, a colleague found that when multiple beeline sessions connect to hive-server2 and each runs some "load data inpath" commands, multiple sets of 20 s3n-worker threads appear. In other words, we can reproduce the issue by using multiple beeline sessions!
beeline> !connect jdbc:hive2://localhost:10000 scott tiger
0: jdbc:hive2://localhost:10000> load data inpath 's3://feichashao-hadoop/tmp1/sampletable/' into table tmptbl9;
### This time, there will be 20 s3n-worker threads in hive-server2.
### Start another beeline session with the same user name and some load data command.
beeline> !connect jdbc:hive2://localhost:10000 scott tiger
0: jdbc:hive2://localhost:10000> load data inpath 's3://feichashao-hadoop/tmp1/sampletable/' into table tmptbl9;
### This time, there will be 40 s3n-worker threads in hive-server2.
### Quit the beeline sessions, and the 40 s3n-worker threads remain.
0: jdbc:hive2://localhost:10000> !quit
Why is the cache not hit when I use multiple beeline sessions with the same username? We can insert some logging into hadoop-common to dig out more information.
  static UserGroupInformation getCurrentUser() throws IOException {
    AccessControlContext context = AccessController.getContext();
    Subject subject = Subject.getSubject(context);
    if (context == null) {
      LOG.info("feichashao UGI context is null");
    } else {
      LOG.info("feichashao UGI context: " + context.toString());
    }
    if (subject == null) {
      LOG.info("feichashao UGI subject is null");
    } else {
      LOG.info("feichashao UGI subject: " + subject.toString());
    }
    if (subject == null || subject.getPrincipals(User.class).isEmpty()) {
      LOG.info("feichashao UGI position 1");
      return getLoginUser();
    } else {
      LOG.info("feichashao UGI position 2");
      return new UserGroupInformation(subject);
    }
  }
fs.FileSystem ( - CACHE: 1563634025
security.UserGroupInformation ( - feichashao UGI context:
security.UserGroupInformation ( - feichashao UGI subject: Subject:
Principal: scott
Principal: hive (auth:SIMPLE)
Private Credential:
### 2nd beeline session
fs.FileSystem ( - CACHE: 1563634025
security.UserGroupInformation ( - feichashao UGI context:
security.UserGroupInformation ( - feichashao UGI subject: Subject:
Principal: scott
Principal: hive (auth:SIMPLE)
Private Credential:
The CACHE object is the same, so the two beeline sessions are reading from the same cache. However, the UGI (specifically, the credentials) differs between the two beeline sessions.
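Why would two sessions with the same user name fail to match? As far as I can tell from the Hadoop 2.7.3 source, UserGroupInformation.equals() compares the underlying Subject by reference, so two UGIs built from different Subject objects are never equal, even if they print the same principals. A minimal sketch of that behaviour follows; SessionUgi and SessionCacheDemo are made-up stand-ins, not Hadoop classes, and the String values only represent cached FileSystem objects.

```java
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.Subject;

/** Hypothetical stand-in for UserGroupInformation; equality is Subject identity. */
final class SessionUgi {
    private final String userName;
    private final Subject subject;

    SessionUgi(String userName, Subject subject) {
        this.userName = userName;
        this.subject = subject;
    }

    @Override
    public boolean equals(Object o) {
        if (o == this) return true;
        if (o == null || getClass() != o.getClass()) return false;
        return subject == ((SessionUgi) o).subject; // same object, not same name
    }

    @Override
    public int hashCode() {
        return System.identityHashCode(subject);
    }

    @Override
    public String toString() {
        return userName;
    }
}

final class SessionCacheDemo {
    /** Number of distinct cache entries after each UGI is looked up once. */
    static int distinctEntries(SessionUgi... ugis) {
        Map<SessionUgi, String> cache = new HashMap<>();
        for (SessionUgi ugi : ugis) {
            cache.putIfAbsent(ugi, "FileSystem for " + ugi);
        }
        return cache.size();
    }
}
```

If each beeline session ends up with its own Subject, which is what the logs above suggest, then two sessions for "scott" occupy two cache entries, and each entry brings its own FileSystem object.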
8. OK, that answers the questions about 2000+ s3n-worker threads. If we run X beeline sessions with Y S3 buckets, there could be X*Y*20 s3n-worker threads.
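The X*Y*20 estimate can be written down as a quick calculation. This is only back-of-the-envelope arithmetic: ThreadEstimate is a made-up helper, and the constant 20 comes from the observed thread names s3n-worker-0 to s3n-worker-19.

```java
/** Hypothetical helper for the thread-count estimate; not part of Hadoop or EMR. */
final class ThreadEstimate {
    // One FileSystem object was observed to own 20 s3n-worker threads.
    static final int WORKERS_PER_FILESYSTEM = 20;

    /** Upper bound on threads if every session touches every bucket. */
    static int expectedThreads(int sessions, int buckets) {
        return sessions * buckets * WORKERS_PER_FILESYSTEM;
    }

    /** How many FileSystem objects an observed thread count corresponds to. */
    static int filesystemsFor(int observedThreads) {
        return observedThreads / WORKERS_PER_FILESYSTEM;
    }
}
```

Two sessions against one bucket give 2 * 1 * 20 = 40 threads, which matches the beeline reproduction above. In the other direction, 2000 threads correspond to 100 FileSystem objects, or roughly 50 sessions if each one used both of the user's buckets.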
Then, why do those 2000+ threads remain after the jobs have finished? We would expect those threads to be closed after the beeline sessions end, as they can no longer be reused.
9. A colleague found that this is an EMRFS bug in EMR 5.6: threads leak after the filesystem is closed. As I don't have access to the EMRFS code, I don't have detailed information on this.
To solve the thread-leaking issue, we can use EMR 5.8 or above.
If we need to keep using EMR 5.6, we can update the emrfs package on EMR 5.6.
First, copy a newer emrfs rpm from EMR 5.8; the package is located at /var/aws/emr/packages/bigtop/emrfs/noarch/emrfs-2.18.0-1.amzn1.noarch.rpm. Then, update the package on the EMR 5.6 master node:
Preparing... ################################# [100%]
Updating / installing...
1:emrfs-2.18.0-1.amzn1 ################################# [ 50%]
Cleaning up / removing...
2:emrfs-2.17.0-1.amzn1 ################################# [100%]
Then, restart the hive-server2 service:
$ sudo initctl stop hive-server2
$ sudo initctl start hive-server2
To compile Hadoop, we can use the command below:
Then, replace /usr/lib/hadoop/hadoop-common.jar with the compiled jar.