hive-server2 keeps 2k+ s3n-worker threads after jobs finish


tl;dr: This is caused by a bug in EMR 5.6. Upgrading to EMR 5.8 or above solves the issue.

Issue

A user reported that he sees 2000+ s3n-worker threads in hive-server2 after jobs finish. He has to restart the hive-server2 service every day to mitigate the issue.

# sudo -u hive jstack 11089 | grep s3n-worker | wc -l
2000

The thread names repeat from s3n-worker-0 to s3n-worker-19. In other words, there are 100 * 20 s3n-worker threads.

"s3n-worker-19" #70 daemon prio=5 os_prio=0 tid=0x00007f5ac4cf0800 nid=0x10ad waiting on condition [0x00007f5ac1dee000]
......
"s3n-worker-1" #52 daemon prio=5 os_prio=0 tid=0x00007f5ac5462000 nid=0x109b waiting on condition [0x00007f5aca23f000]
"s3n-worker-0" #51 daemon prio=5 os_prio=0 tid=0x00007f5ac5480000 nid=0x109a waiting on condition [0x00007f5aca641000]
......

Environment

AWS EMR 5.6

Analysis

1. For "remaining threads" issue, first we might check the "lsof" output to see if there's any CLOSE_WAIT connections. However, in user's environment, there are only 2~3 CLOSE_WAIT connections. We can rule out this one.

2. From hive-server2.log (grepping for the keyword "Executing command"), I can see the user runs 3 kinds of commands: "load data inpath 's3://xxxxxx' OVERWRITE into table xxxxx", "insert overwrite table xxxxx select xxxxx" and "select xxx from xxxx".

I did a test in my environment. If I run "load data inpath" to move a large file into a table, it triggers 20 s3n-worker threads. However, if I run the same "load data inpath" again, those 20 s3n-worker threads are reused.

If I create a table in another S3 bucket and run "load data inpath" against it, another 20 s3n-worker threads appear. From this, we can see that the number of s3n-worker threads is related to the number of S3 buckets.
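As a rough sketch of that test (the bucket, table and PID below are placeholders, not the user's actual names):

### the first bucket was loaded earlier, so 20 s3n-worker threads already exist
0: jdbc:hive2://localhost:10000> load data inpath 's3://bucket-b/tmp/sample/' overwrite into table tmptbl_b;

### counting the workers again shows a second batch of 20
# sudo -u hive jstack <hive-server2 pid> | grep s3n-worker | wc -l
40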

3. However, I only saw 8 tables in hive-server2.log. Also, the user reported that only 2 S3 buckets are used by this cluster.

$ grep "load data inpath" hive-server2.log | grep -o -E 'table [^ ]*' | sort | uniq
table xxxx
table yyyy
table zzzz

4. Checking the Hadoop 2.7.3 source code, we can find the reuse logic.

When Hive performs file-related operations, it needs a FileSystem object to talk to the underlying filesystem. A FileSystem object is reused if its cache key matches.
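As a minimal illustration of that caching behavior (this is my own sketch, not from the Hive code; the bucket names are made up and it assumes an environment where the s3 scheme is wired to EMRFS):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class FsCacheDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Same scheme + authority + current user => same cached FileSystem instance.
    FileSystem fs1 = FileSystem.get(URI.create("s3://bucket-a/"), conf);
    FileSystem fs2 = FileSystem.get(URI.create("s3://bucket-a/"), conf);
    System.out.println(fs1 == fs2);   // expected: true (cache hit)

    // Different authority (another bucket) => different key => new FileSystem,
    // which on EMR brings up another batch of s3n-worker threads.
    FileSystem fs3 = FileSystem.get(URI.create("s3://bucket-b/"), conf);
    System.out.println(fs1 == fs3);   // expected: false (cache miss)
  }
}

The reuse logic itself lives in FileSystem.get(); the LOG.info lines in the excerpt below are my own debugging additions.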

// File: /hadoop/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java:352
  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
   * of the URI determines a configuration property name,
   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
   * The entire URI is passed to the FileSystem instance's initialize method.
   */

  public static FileSystem get(URI uri, Configuration conf) throws IOException {
    String scheme = uri.getScheme();
    String authority = uri.getAuthority();

    LOG.info("feichashao: Test7 - Dig more info ugi");
    LOG.info("scheme: " + scheme);
    LOG.info("authority: " + authority);
    LOG.info("uri: " + uri.toString());
    LOG.info("conf: " + conf.toString());
    LOG.info("CACHE: " + CACHE.hashCode());

    if (scheme == null && authority == null) {     // use default FS
      return get(conf);
    }

    if (scheme != null && authority == null) {     // no authority
      URI defaultUri = getDefaultUri(conf);
      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
          && defaultUri.getAuthority() != null) {  // & default has authority
        return get(defaultUri, conf);              // return default
      }
    }

    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme); //<----- If I turn cache off, I can see 20 new s3n-worker threads whenever I execute an S3 operation.
    if (conf.getBoolean(disableCacheName, false)) {
      return createFileSystem(uri, conf);
    }

    return CACHE.get(uri, conf);   //<----- By default, it returns a FileSystem object from CACHE. If CACHE does not have a matching FileSystem object, it creates a new one.
  }

Here we can see the logic of Cache.

  /** Caching FileSystem objects */
  static class Cache {
    private final ClientFinalizer clientFinalizer = new ClientFinalizer();

    private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
    private final Set<Key> toAutoClose = new HashSet<Key>();

    FileSystem get(URI uri, Configuration conf) throws IOException{
      Key key = new Key(uri, conf);
      LOG.info("KEY: " + key.toString());
      return getInternal(uri, conf, key);   //<----
    }

    private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
      FileSystem fs;
      synchronized (this) {
        fs = map.get(key);  //<---- Check if key matches any FS in cache.
      }
      if (fs != null) {
        LOG.info("feichashao - Hit Cache");
        return fs; //<---- return the existing FS obj.
      }

      LOG.info("feichashao - Miss Cache");
      fs = createFileSystem(uri, conf);   //<---- If cache is missed, create a new FileSystem.
      synchronized (this) { // refetch the lock again
        FileSystem oldfs = map.get(key);
        if (oldfs != null) { // a file system is created while lock is releasing
          fs.close(); // close the new file system
          return oldfs;  // return the old file system
        }

        // now insert the new file system into the map
        if (map.isEmpty()
                && !ShutdownHookManager.get().isShutdownInProgress()) {
          ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
        }
        fs.key = key;
        map.put(key, fs);
        if (conf.getBoolean("fs.automatic.close", true)) {
          toAutoClose.add(key);
        }
        return fs;
      }
    }

How is a key determined to match? We can focus on the equals() method. In short, there are 3 factors that determine whether keys match: scheme, authority and ugi.

For example, if user feichashao is running "load data inpath xxxx overwrite into table table1" via Hue, and table1 is located at s3://feichashao-hadoop/tmp1/, then the 3 factors would be:
Scheme: s3
Authority: feichashao-hadoop
ugi: feichashao (auth:PROXY) via hive (auth:SIMPLE)

// File: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

    /** FileSystem.Cache.Key */
    static class Key {
      final String scheme;
      final String authority;
      final UserGroupInformation ugi;
      final long unique;   // an artificial way to make a key unique

      Key(URI uri, Configuration conf) throws IOException {
        this(uri, conf, 0);
      }

     // The Configuration parameter is not actually used here.
      Key(URI uri, Configuration conf, long unique) throws IOException {
        scheme = uri.getScheme()==null ?
            "" : StringUtils.toLowerCase(uri.getScheme());
        authority = uri.getAuthority()==null ?
            "" : StringUtils.toLowerCase(uri.getAuthority());
        this.unique = unique;

        this.ugi = UserGroupInformation.getCurrentUser();
      }

      static boolean isEqual(Object a, Object b) {
        return a == b || (a != null && a.equals(b));
      }

      @Override
      public boolean equals(Object obj) {
        if (obj == this) {
          return true;
        }
        if (obj != null && obj instanceof Key) {
          Key that = (Key)obj;
          return isEqual(this.scheme, that.scheme)  //<----- S3
                 && isEqual(this.authority, that.authority) // <----- S3 bucket name
                 && isEqual(this.ugi, that.ugi)  // <---- current user.
                 && (this.unique == that.unique); // <---- Always 0.
        }
        return false;
      }
     
      @Override
      public String toString() {
        return "("+ugi.toString() + ")@" + scheme + "://" + authority;
      }

5. So far, we know that if this key combination matches, the cached FileSystem and its 20 s3n-worker threads will be reused.

In the user's environment, only 2 S3 buckets are in use, and only 3 users are using hive-server2. How do we end up with 100 * 20 s3n-worker threads?

6. From hive-server2.log, I noticed there are a lot of IOExceptions like the one below (about 500 per day):

java.io.IOException: There is no primary group for UGI xxxxx (auth:PROXY) via hive (auth:SIMPLE)
        at org.apache.hadoop.security.UserGroupInformation.getPrimaryGroupName(UserGroupInformation.java:1432) ~[hadoop-common-2.7.3-amzn-2.jar:?]
        at com.amazon.ws.emr.hadoop.fs.util.CurrentUserGroupInformation.getGroupNameFromUser(CurrentUserGroupInformation.java:49) [emrfs-hadoop-assembly-
2.17.0.jar:?]
        at com.amazon.ws.emr.hadoop.fs.util.CurrentUserGroupInformation.<init>(CurrentUserGroupInformation.java:30) [emrfs-hadoop-assembly-2.17.0.jar:?]
        at com.amazon.ws.emr.hadoop.fs.util.CurrentUserGroupInformation.getCurrentUserGroupInformation(CurrentUserGroupInformation.java:67) [emrfs-hadoop
-assembly-2.17.0.jar:?]
        at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.initialize(S3NativeFileSystem.java:436) [emrfs-hadoop-assembly-2.17.0.jar:?]
        at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.initialize(EmrFileSystem.java:109) [emrfs-hadoop-assembly-2.17.0.jar:?]
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2717) [hadoop-common-2.7.3-amzn-2.jar:?] //<----------
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:93) [hadoop-common-2.7.3-amzn-2.jar:?]
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2751) [hadoop-common-2.7.3-amzn-2.jar:?]
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2733) [hadoop-common-2.7.3-amzn-2.jar:?]
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:377) [hadoop-common-2.7.3-amzn-2.jar:?]
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295) [hadoop-common-2.7.3-amzn-2.jar:?]

The IOException itself is not a problem; I sometimes see this exception in my test environment as well. However, each time we see such an exception, a FileSystem object has been created, see the last 6 lines of the stack trace. That means the cache was not hit.

What the ?!

Let's review the logic of key matching:

 return isEqual(this.scheme, that.scheme)  //<----- S3
       && isEqual(this.authority, that.authority) // <----- S3 bucket name
       && isEqual(this.ugi, that.ugi)  // <---- current user.
      && (this.unique == that.unique); // <---- Always 0.

Scheme and authority are plain strings, so they should not be a problem. The ugi is more complicated: it contains not only the user name, but also things like "principals" and "credentials".

7. Meanwhile, a colleague found that when connecting to hive-server2 from multiple beeline sessions and running some "load data inpath" commands, there are multiple batches of 20 s3n-worker threads. In other words, with multiple beeline sessions we can reproduce the issue!

### Start a beeline session.
beeline> !connect jdbc:hive2://localhost:10000 scott tiger
0: jdbc:hive2://localhost:10000> load data inpath 's3://feichashao-hadoop/tmp1/sampletable/' into table tmptbl9;

### This time, there will be 20 s3n-worker threads in hive-server2.

### Start another beeline session with the same user name and run the same load data command.
beeline> !connect jdbc:hive2://localhost:10000 scott tiger
0: jdbc:hive2://localhost:10000> load data inpath 's3://feichashao-hadoop/tmp1/sampletable/' into table tmptbl9;

### This time, there will be 40 s3n-worker threads in hive-server2.

### Quit the beeline sessions, and the 40 s3n-worker threads remain.
0: jdbc:hive2://localhost:10000> !quit

Why is the cache not hit if I use multiple beeline sessions with the same username? We can insert some LOG.info calls into hadoop-common to dig up more information.

  public synchronized
  static UserGroupInformation getCurrentUser() throws IOException {
    AccessControlContext context = AccessController.getContext();
    Subject subject = Subject.getSubject(context);
    if (context == null) {
        LOG.info("feichashao UGI context is null");
    } else {
        LOG.info("feichashao UGI context: " + context.toString());
    }

    if (subject == null) {
        LOG.info("feichashao UGI subject is null");
    } else {
        LOG.info("feichashao UGI subject: " + subject.toString());
    }
    if (subject == null || subject.getPrincipals(User.class).isEmpty()) {
      LOG.info("feichashao UGI position 1");
      return getLoginUser();
    } else {
      LOG.info("feichashao UGI position 2");
      return new UserGroupInformation(subject);
    }
  }
### 1st beeline session
fs.FileSystem (FileSystem.java:get(361)) - CACHE: 1563634025
security.UserGroupInformation (UserGroupInformation.java:getCurrentUser(636)) - feichashao UGI context: java.security.AccessControlContext@2450b102
security.UserGroupInformation (UserGroupInformation.java:getCurrentUser(642)) - feichashao UGI subject: Subject:
        Principal: scott
        Principal: hive (auth:SIMPLE)
        Private Credential: org.apache.hadoop.security.Credentials@6c89490c

### 2nd beeline session
fs.FileSystem (FileSystem.java:get(361)) - CACHE: 1563634025
security.UserGroupInformation (UserGroupInformation.java:getCurrentUser(636)) - feichashao UGI context: java.security.AccessControlContext@1b768401
security.UserGroupInformation (UserGroupInformation.java:getCurrentUser(642)) - feichashao UGI subject: Subject:
        Principal: scott
        Principal: hive (auth:SIMPLE)
        Private Credential: org.apache.hadoop.security.Credentials@10fc6de

The CACHE object is the same, so the two beeline sessions are reading from the same cache. But the UGIs (specifically, the Credentials) are different between the two beeline sessions.
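This makes sense once we look at how UserGroupInformation compares itself. In hadoop-common (paraphrased from the Hadoop 2.7.x source, so treat it as a sketch rather than an exact quote), equals() compares the underlying Subject by reference rather than by content:

  // UserGroupInformation.java (hadoop-common), paraphrased
  @Override
  public boolean equals(Object o) {
    if (o == this) {
      return true;
    } else if (o == null || getClass() != o.getClass()) {
      return false;
    } else {
      return subject == ((UserGroupInformation) o).subject;  // reference comparison
    }
  }

  @Override
  public int hashCode() {
    return System.identityHashCode(subject);  // identity hash of the Subject
  }

Each beeline session carries its own Subject (with its own Credentials object), so even with the same username, the Cache.Key built in one session never equals the Key built in another session.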

8. OK, that answers the question about the 2000+ s3n-worker threads. If we run X beeline sessions that touch Y S3 buckets, there can be up to X * Y * 20 s3n-worker threads. For example, 50 sessions accumulated over a day against 2 buckets already gives 50 * 2 * 20 = 2000 threads.

Then why do those 2000+ threads remain after the jobs finish? We would expect those threads to be closed once the beeline sessions end, as they will never be reused again.

9. A colleague found that this is an emrfs bug in EMR 5.6: there is a thread-leak problem after the FileSystem is closed. As I don't have access to the emrfs code, I don't have detailed information on this.

Solution

To solve the thread-leak issue, we can use EMR 5.8 or a later version.

If we need to continue using EMR 5.6, we can update the emrfs package on the EMR 5.6 cluster.

First, copy a newer version of the emrfs rpm from an EMR 5.8 cluster; the package is located at /var/aws/emr/packages/bigtop/emrfs/noarch/emrfs-2.18.0-1.amzn1.noarch.rpm. Then, update the package on the EMR 5.6 master node:

$ sudo rpm -Uvh emrfs-2.18.0-1.amzn1.noarch.rpm
 Preparing...                          ################################# [100%]
 Updating / installing...
   1:emrfs-2.18.0-1.amzn1             ################################# [ 50%]
 Cleaning up / removing...
   2:emrfs-2.17.0-1.amzn1             ################################# [100%]

Then, restart the hive-server2 service:

$ sudo initctl stop hive-server2
$ sudo initctl start hive-server2

Note

To compile hadoop, we can use the command below:

hadoop-2.7.3-src]$ mvn clean package

Then, replace /usr/lib/hadoop/hadoop-common.jar with the compiled jar.
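With the default maven layout, the rebuilt jar should land under the module's target directory. The paths below are what I'd expect on an EMR master node, so double-check them before use:

$ ls hadoop-common-project/hadoop-common/target/hadoop-common-2.7.3.jar
$ sudo cp /usr/lib/hadoop/hadoop-common.jar /usr/lib/hadoop/hadoop-common.jar.bak
$ sudo cp hadoop-common-project/hadoop-common/target/hadoop-common-2.7.3.jar /usr/lib/hadoop/hadoop-common.jar
$ sudo initctl stop hive-server2
$ sudo initctl start hive-server2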