Flume exception: Closing file: log.xxx.tmp failed. Will retry again in 180 seconds

Symptom

21/12/10 16:24:13 WARN hdfs.BucketWriter: Closing file: hdfs://xxx/origin_data/kafka/topicBoxLauncher/2021/12/10/09/log.1639099219793.tmp failed. Will retry again in 180 seconds.
java.nio.channels.ClosedChannelException
        at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1993)
        at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:2404)
        at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:2349)
        at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
        at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flume.sink.hdfs.AbstractHDFSWriter.hflushOrSync(AbstractHDFSWriter.java:266)
        at org.apache.flume.sink.hdfs.HDFSDataStream.close(HDFSDataStream.java:134)
        at org.apache.flume.sink.hdfs.BucketWriter$3.call(BucketWriter.java:319)
        at org.apache.flume.sink.hdfs.BucketWriter$3.call(BucketWriter.java:316)
        at org.apache.flume.sink.hdfs.BucketWriter$8$1.run(BucketWriter.java:727)
        at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
        at org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:724)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

21/12/10 16:54:34 ERROR hdfs.HDFSEventSink: process failed
java.lang.InterruptedException: Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.
        at org.apache.flume.sink.hdfs.BucketWriter.checkAndThrowInterruptedException(BucketWriter.java:708)
        at org.apache.flume.sink.hdfs.BucketWriter.flush(BucketWriter.java:477)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:441)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
        at java.lang.Thread.run(Thread.java:748)
21/12/10 16:54:34 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.InterruptedException: Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:464)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException: Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.
        at org.apache.flume.sink.hdfs.BucketWriter.checkAndThrowInterruptedException(BucketWriter.java:708)
        at org.apache.flume.sink.hdfs.BucketWriter.flush(BucketWriter.java:477)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:441)
        ... 3 more

Cause

When HDFSWriter.close() on .aaa.lzo.tmp hit a TimeoutException, the call was cancelled (producing the InterruptedException), and the file close was handed off to a thread pool for retries. But because that retry is submitted asynchronously via submit(), close() does not wait for it: it proceeds straight to the rename step.
In other words, the file has already been renamed from .aaa.lzo.tmp to aaa.lzo, while the background task keeps retrying close() against the stale name .aaa.lzo.tmp, which can never succeed — hence the endless "Will retry again in 180 seconds" loop.
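The race can be sketched with plain java.util.concurrent, without any Flume or HDFS classes (names and paths here are illustrative, not Flume's):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of the race described above: the close retry is submitted
// asynchronously, so the rename proceeds while the retry task keeps holding
// the old ".tmp" path it captured at submit() time.
public class StaleCloseRetry {

    // What the rename step effectively does: "/dir/.aaa.lzo.tmp" -> "/dir/aaa.lzo"
    // (strip the leading dot and the ".tmp" suffix).
    static String renameTarget(String tmpPath) {
        int slash = tmpPath.lastIndexOf('/');
        String name = tmpPath.substring(slash + 1);
        String finalName = name.substring(1, name.length() - ".tmp".length());
        return tmpPath.substring(0, slash + 1) + finalName;
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        String captured = "/data/.aaa.lzo.tmp";   // path held by the retry task
        AtomicInteger staleAttempts = new AtomicInteger();

        Runnable retryClose = new Runnable() {
            public void run() {
                // close(captured) would fail here: the file was renamed away.
                if (staleAttempts.incrementAndGet() < 3) {
                    pool.schedule(this, 10, TimeUnit.MILLISECONDS);
                }
            }
        };
        pool.submit(retryClose);   // fire-and-forget, like the async close retry

        // Meanwhile close() does not wait for the retry; it renames immediately:
        System.out.println("renamed to " + renameTarget(captured));

        Thread.sleep(100);         // let the stale retries run
        pool.shutdown();
        System.out.println("stale close attempts: " + staleAttempts.get());
    }
}
```

The retry task captured `/data/.aaa.lzo.tmp` at submit() time, so every later attempt targets a name that no longer exists.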

Solutions

Workaround: close the files by hand
List the files still open for write, then recover the lease on each:
hdfs fsck /xxx/20190813 -openforwrite | grep OPENFORWRITE | awk -F ' ' '{print $1}'
hdfs debug recoverLease -path /xxx/20190813/aaa.lzo

Mitigation: adjust the conf

Give HDFS calls more time (600000 ms = 10 minutes), allow more close retries, and close idle bucket writers after 10 seconds:

app.sinks.k3.hdfs.callTimeout=600000
app.sinks.k3.hdfs.closeTries=20
app.sinks.k3.hdfs.idleTimeout=10

Root-cause fix: modify the source

The patch below (against org.apache.flume.sink.hdfs.BucketWriter) retries the close synchronously inside close() itself, before the rename, instead of submitting the retry to the thread pool:

public synchronized void close(boolean callCloseCallback)
        throws IOException, InterruptedException {
    checkAndThrowInterruptedException();
    try {
        flush();
    } catch (IOException e) {
        LOG.warn("pre-close flush failed", e);
    }
    LOG.info("Closing {}", bucketPath);
    CallRunner<Void> closeCallRunner = createCloseCallRunner();
    int tryTime = 1;
    // Retry the close synchronously, so the rename below only runs once the
    // close outcome is known.
    while (isOpen && tryTime <= 5) {
        // reset per attempt, so one failed try cannot poison a later success
        boolean failedToClose = false;
        try {
            callWithTimeout(closeCallRunner);
            sinkCounter.incrementConnectionClosedCount();
        } catch (IOException e) {
            LOG.warn("failed to close() HDFSWriter for file (try times: " + tryTime +
                    "): " + bucketPath + ". Exception follows.", e);
            sinkCounter.incrementConnectionFailedCount();
            failedToClose = true;
        }
        if (failedToClose) {
            isOpen = true;
            tryTime++;
            Thread.sleep(this.callTimeout);
        } else {
            isOpen = false;
        }
    }
    // the file is still open after all retries
    if (isOpen) {
        LOG.error("failed to close file: " + bucketPath + " after " + (tryTime - 1) + " tries.");
    } else {
        LOG.info("HDFSWriter is closed: {}", bucketPath);
    }

    // NOTE: timed rolls go through this codepath as well as other roll types
    if (timedRollFuture != null && !timedRollFuture.isDone()) {
        timedRollFuture.cancel(false); // do not cancel myself if running!
        timedRollFuture = null;
    }

    if (idleFuture != null && !idleFuture.isDone()) {
        idleFuture.cancel(false); // do not cancel myself if running!
        idleFuture = null;
    }

    if (bucketPath != null && fileSystem != null) {
        // could block or throw IOException
        try {
            renameBucket(bucketPath, targetPath, fileSystem);
        } catch (Exception e) {
            LOG.warn("failed to rename() file (" + bucketPath +
                    "). Exception follows.", e);
            sinkCounter.incrementConnectionFailedCount();
            final Callable<Void> scheduledRename = createScheduledRenameCallable();
            timedRollerPool.schedule(scheduledRename, retryInterval,
                    TimeUnit.SECONDS);
        }
    }
    if (callCloseCallback) {
        runCloseAction();
        closed = true;
    }
}

Copyright notice: this is an original article by qq_16038125, released under the CC 4.0 BY-SA license; reproduction must include a link to the original and this notice.