Symptom
21/12/10 16:24:13 WARN hdfs.BucketWriter: Closing file: hdfs://xxx/origin_data/kafka/topicBoxLauncher/2021/12/10/09/log.1639099219793.tmp failed. Will retry again in 180 seconds.
java.nio.channels.ClosedChannelException
at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1993)
at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:2404)
at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:2349)
at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flume.sink.hdfs.AbstractHDFSWriter.hflushOrSync(AbstractHDFSWriter.java:266)
at org.apache.flume.sink.hdfs.HDFSDataStream.close(HDFSDataStream.java:134)
at org.apache.flume.sink.hdfs.BucketWriter$3.call(BucketWriter.java:319)
at org.apache.flume.sink.hdfs.BucketWriter$3.call(BucketWriter.java:316)
at org.apache.flume.sink.hdfs.BucketWriter$8$1.run(BucketWriter.java:727)
at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
at org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:724)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
21/12/10 16:54:34 ERROR hdfs.HDFSEventSink: process failed
java.lang.InterruptedException: Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.
at org.apache.flume.sink.hdfs.BucketWriter.checkAndThrowInterruptedException(BucketWriter.java:708)
at org.apache.flume.sink.hdfs.BucketWriter.flush(BucketWriter.java:477)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:441)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:748)
21/12/10 16:54:34 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.InterruptedException: Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:464)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException: Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.
at org.apache.flume.sink.hdfs.BucketWriter.checkAndThrowInterruptedException(BucketWriter.java:708)
at org.apache.flume.sink.hdfs.BucketWriter.flush(BucketWriter.java:477)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:441)
... 3 more
Cause
When HDFSWriter.close() on the .tmp file (e.g. .aaa.lzo.tmp) timed out, the call was cancelled, which produced the InterruptedException, and the close was handed to a thread pool to be retried. The rename still goes ahead anyway, because that close retry is put onto the pool asynchronously via submit() and nothing waits for it to complete.
In other words, the file has already been renamed from .aaa.lzo.tmp to aaa.lzo, while the retry task keeps trying to close it under the old name .aaa.lzo.tmp and keeps failing.
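To make the ordering concrete, here is a minimal standalone sketch (this is not Flume's code; CloseRenameRace, closeRetry and currentName are invented for illustration) of why a close retry submitted asynchronously ends up targeting a path the caller has already renamed:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CloseRenameRace {

  // Current name of the "file"; it starts out as the in-progress .tmp name.
  static final AtomicReference<String> currentName = new AtomicReference<>(".aaa.lzo.tmp");

  // Stand-in for the close retry task: it captured the old .tmp name.
  static Runnable closeRetry(String staleName) {
    return () -> {
      boolean stillThere = staleName.equals(currentName.get());
      System.out.println("retry close(" + staleName + ") -> "
          + (stillThere ? "ok" : "fails: file was already renamed"));
    };
  }

  public static void main(String[] args) throws InterruptedException {
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);

    // 1. close() times out; the retry is handed to the pool and nobody waits for it.
    pool.schedule(closeRetry(currentName.get()), 200, TimeUnit.MILLISECONDS);

    // 2. Meanwhile the caller keeps going and renames the file immediately.
    currentName.set("aaa.lzo");
    System.out.println("renamed .aaa.lzo.tmp -> " + currentName.get());

    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.SECONDS);
  }
}

Running it prints the rename first and then a close attempt against the stale .tmp name, which is exactly the loop seen in the log above.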
Solution
Manual close
List the files HDFS still considers open for write, then recover the lease on each one:
hdfs fsck /xxx/20190813 -openforwrite | grep OPENFORWRITE | awk -F ' ' '{print $1}'
hdfs debug recoverLease -path /xxx/20190813/aaa.lzo
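If many files are stuck, the same lease recovery can be driven from Java instead of running hdfs debug recoverLease once per path. A rough sketch, assuming the classpath configuration points the default FileSystem at HDFS (the class name RecoverLeaseTool is made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class RecoverLeaseTool {
  public static void main(String[] args) throws Exception {
    // Each argument is a path reported as OPENFORWRITE by the fsck command above.
    Configuration conf = new Configuration();   // reads core-site.xml / hdfs-site.xml
    FileSystem fs = FileSystem.get(conf);
    if (!(fs instanceof DistributedFileSystem)) {
      throw new IllegalStateException("default FileSystem is not HDFS: " + fs.getUri());
    }
    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    for (String p : args) {
      // Ask the NameNode to take the lease back; if recovery has not finished yet
      // this returns false and can simply be called again later.
      boolean done = dfs.recoverLease(new Path(p));
      System.out.println(p + " -> " + (done ? "lease recovered" : "recovery started, check again later"));
    }
    fs.close();
  }
}

This only cleans up files that have already leaked; it does not address the root cause below.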
Adjust the Flume configuration
# Give HDFS open/write/flush/close calls more time before they are cancelled (milliseconds)
app.sinks.k3.hdfs.callTimeout = 600000
# Retry the close/rename more times instead of giving up (0 would retry indefinitely)
app.sinks.k3.hdfs.closeTries = 20
# Close files that have received no events for this many seconds
app.sinks.k3.hdfs.idleTimeout = 10
Root fix: patch the source
Modify BucketWriter.close() so that the close is retried synchronously with a bounded number of attempts, and the rename only runs after that loop finishes, instead of handing the retry to the thread pool and renaming right away:
public synchronized void close(boolean callCloseCallback)
    throws IOException, InterruptedException {
  checkAndThrowInterruptedException();
  try {
    flush();
  } catch (IOException e) {
    LOG.warn("pre-close flush failed", e);
  }
  boolean failedToClose = false;
  LOG.info("Closing {}", bucketPath);
  CallRunner<Void> closeCallRunner = createCloseCallRunner();
  int tryTime = 1;
  // Retry the close synchronously, up to 5 times, instead of handing the retry
  // to the thread pool; the rename below only runs after this loop finishes.
  while (isOpen && tryTime <= 5) {
    failedToClose = false; // reset before each attempt
    try {
      callWithTimeout(closeCallRunner);
      sinkCounter.incrementConnectionClosedCount();
    } catch (IOException e) {
      LOG.warn("failed to close() HDFSWriter for file (try times:" + tryTime + "): "
          + bucketPath + ". Exception follows.", e);
      sinkCounter.incrementConnectionFailedCount();
      failedToClose = true;
    }
    if (failedToClose) {
      isOpen = true;
      tryTime++;
      Thread.sleep(this.callTimeout);
    } else {
      isOpen = false;
    }
  }
  // still open after all retries: give up and log an error
  if (isOpen) {
    LOG.error("failed to close file: " + bucketPath + " after " + tryTime + " tries.");
  } else {
    LOG.info("HDFSWriter is already closed: {}", bucketPath);
  }
  // NOTE: timed rolls go through this codepath as well as other roll types
  if (timedRollFuture != null && !timedRollFuture.isDone()) {
    timedRollFuture.cancel(false); // do not cancel myself if running!
    timedRollFuture = null;
  }
  if (idleFuture != null && !idleFuture.isDone()) {
    idleFuture.cancel(false); // do not cancel myself if running!
    idleFuture = null;
  }
  if (bucketPath != null && fileSystem != null) {
    // could block or throw IOException
    try {
      renameBucket(bucketPath, targetPath, fileSystem);
    } catch (Exception e) {
      LOG.warn("failed to rename() file (" + bucketPath + "). Exception follows.", e);
      sinkCounter.incrementConnectionFailedCount();
      final Callable<Void> scheduledRename = createScheduledRenameCallable();
      timedRollerPool.schedule(scheduledRename, retryInterval, TimeUnit.SECONDS);
    }
  }
  if (callCloseCallback) {
    runCloseAction();
    closed = true;
  }
}