web3j 订阅区块事件，运行一段时间之后报错：The filter has not been found. Filter id: 109578057392013476276104808887179439712
代码片段
/**
 * Connects to the node over WebSocket and subscribes to the transaction stream.
 *
 * <p>Annotated with {@code @PostConstruct}. The method builds a {@code Web3j} client on a
 * {@code WebSocketService}, connects the socket, and then subscribes to
 * {@code transactionFlowable()}. Every received transaction has its hash logged and passed to
 * {@code getTransactionByHash}; any stream error is logged and its stack trace printed.
 *
 * <p>NOTE(review): the snippet as posted ends after the {@code synchronized} block; the method's
 * closing brace is not shown.
 *
 * @throws OnErrorNotImplementedException declared by the signature; the subscription below does
 *     supply an error handler
 * @throws ConnectException declared by the signature; presumably raised by
 *     {@code socketService.connect()} -- confirm against the web3j API
 */
@PostConstruct
public void start() throws OnErrorNotImplementedException, ConnectException {
// Build the WebSocket transport from the configured endpoint. The web3J() call on the same line
// has its result discarded here.
// NOTE(review): the meaning of the `true` argument is not visible in this snippet -- confirm
// against the WebSocketService constructor.
WebSocketService socketService = new WebSocketService(config.getMainNetSocket(), true); web3J();
log.info("Started >>>>>>> ");
// The client is built first and the socket is connected afterwards, before subscribing.
Web3j web3jSocket = Web3j.build(socketService);
socketService.connect();
synchronized (this) {
// The Disposable is a local of this synchronized block and is not used again in the visible
// code, so nothing shown here can cancel or renew the subscription later.
Disposable subscription = web3jSocket.transactionFlowable().subscribe(
tx ->
{
// onNext: log the transaction hash, then hand the hash to getTransactionByHash.
log.info("Tx Hash > " + tx.getHash());
getTransactionByHash(tx.getHash());
}, throwable -> {
// onError: the failure is only logged (at INFO level) and printed; no re-subscription
// is attempted in this handler.
log.info("Message >>>> " + throwable.getMessage());
throwable.printStackTrace();
});
}
问题日志
2020-06-10 11:27:25.284 INFO 20268 --- [pool-9-thread-1] c.c.e.c.e.services.SubscriptionService : Tx Hash > 0x3ee4adbe9789189662cdc3d61655a4b68845c791f01c37db83c7e53ba8fa0a4c
2020-06-10 11:27:26.758 INFO 20268 --- [pool-9-thread-1] c.c.e.c.e.services.SubscriptionService : Got To Address >>>> 0x72ebd62060f78d91dc4bc33e8d88f39307365f87
2020-06-10 11:27:26.760 INFO 20268 --- [pool-9-thread-1] c.c.e.c.e.services.SubscriptionService : Tx Hash > 0x3d50ebcdb3fa95e81d9ef8f4ee978a80d3aa5a43b8d274d9f4860ee440d36e27
2020-06-10 11:27:28.187 INFO 20268 --- [pool-9-thread-1] c.c.e.c.e.services.SubscriptionService : Got To Address >>>> 0xdac17f958d2ee523a2206206994597c13d831ec7
2020-06-10 11:27:28.189 INFO 20268 --- [pool-9-thread-1] c.c.e.c.e.services.SubscriptionService : Tx Hash > 0xde55b36788acdfcf75fe080f1ca808a24e1e8e70bb266e0c6ce875892959d5d5
2020-06-10 11:27:29.602 INFO 20268 --- [pool-9-thread-1] c.c.e.c.e.services.SubscriptionService : Got To Address >>>> 0x47dcfcaf9aca005cbe955fbaff11165e70d28bca
2020-06-10 11:27:29.603 INFO 20268 --- [pool-9-thread-1] c.c.e.c.e.services.SubscriptionService : Tx Hash > 0x7592438f0a11f6cdcfb2f8374f8ffb5a2e7f6d0b1605b57676bfd0e26f9de2ff
2020-06-10 11:27:31.020 INFO 20268 --- [pool-9-thread-1] c.c.e.c.e.services.SubscriptionService : Got To Address >>>> 0xca9ca2dabf9024e7588ab10c8c2116f5809fa561
2020-06-10 11:27:31.023 INFO 20268 --- [pool-9-thread-1] c.c.e.c.e.services.SubscriptionService : Tx Hash > 0x3f36cd6caaa8b3d158bb3b6535cbe7502925a85ac44c405857473cec9aa8ee9f
2020-06-10 11:27:32.439 INFO 20268 --- [pool-9-thread-1] c.c.e.c.e.services.SubscriptionService : Got To Address >>>> 0xdac17f958d2ee523a2206206994597c13d831ec7
2020-06-10 11:27:32.441 INFO 20268 --- [pool-9-thread-1] c.c.e.c.e.services.SubscriptionService : Tx Hash > 0xd42751dded2588e9152bf3d81b3f84b729ee3aa17e6b46ebc8cbf09ee14e35bd
2020-06-10 11:27:34.639 INFO 20268 --- [pool-9-thread-1] c.c.e.c.e.services.SubscriptionService : Got To Address >>>> 0x98ad263a95f1ab1abff41f4d44b07c3240251a0a
2020-06-10 11:27:34.643 WARN 20268 --- [pool-9-thread-2] org.web3j.protocol.core.filters.Filter : The filter has not been found. Filter id: 66674882484826508490032443079398703497
2020-06-10 11:27:34.654 ERROR 20268 --- [pool-9-thread-2] org.web3j.protocol.core.filters.Filter : Error sending request
org.web3j.protocol.core.filters.FilterException: Error sending request
at org.web3j.protocol.core.filters.Filter.throwException(Filter.java:194) ~[core-5.0.0.jar:na]
at org.web3j.protocol.core.filters.Filter.run(Filter.java:104) ~[core-5.0.0.jar:na]
at org.web3j.protocol.core.filters.Filter.reinstallFilter(Filter.java:155) ~[core-5.0.0.jar:na]
at org.web3j.protocol.core.filters.Filter.pollFilter(Filter.java:137) ~[core-5.0.0.jar:na]
at org.web3j.protocol.core.filters.Filter.lambda$run$0(Filter.java:92) ~[core-5.0.0.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_241]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[na:1.8.0_241]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_241]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[na:1.8.0_241]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_241]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_241]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_241]
Caused by: java.io.IOException: Interrupted WebSocket request
at org.web3j.protocol.websocket.WebSocketService.send(WebSocketService.java:168) ~[core-5.0.0.jar:na]
at org.web3j.protocol.core.Request.send(Request.java:87) ~[core-5.0.0.jar:na]
at org.web3j.protocol.core.filters.BlockFilter.sendRequest(BlockFilter.java:34) ~[core-5.0.0.jar:na]
at org.web3j.protocol.core.filters.Filter.run(Filter.java:59) ~[core-5.0.0.jar:na]
... 10 common frames omitted
Caused by: java.lang.InterruptedException: null
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:347) ~[na:1.8.0_241]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) ~[na:1.8.0_241]
at org.web3j.protocol.websocket.WebSocketService.send(WebSocketService.java:165) ~[core-5.0.0.jar:na]
... 13 common frames omitted
解决方法
// Web3j client used by the workaround below. It is built on HttpService (an HTTPS endpoint)
// instead of the WebSocketService used in the failing snippet earlier in this post.
// "<Infura Key>" is a placeholder to be replaced with a real project key.
Web3j web3j=Web3j.build((new HttpService("https://mainnet.infura.io/v3/<Infura Key>")));
/**
 * Subscribes to transactions, resuming from the last block number stored in the repository
 * when one exists, and re-subscribes by calling itself whenever the stream reports an error.
 *
 * <p>Annotated with {@code @PostConstruct}. Flow visible in this snippet:
 * <ul>
 *   <li>If {@code blockNumberRepository.findTopByIdGreaterThanEqual(1)} returns a value, replay
 *       past and future transactions starting one block before the stored block number.</li>
 *   <li>Otherwise create a fresh {@code BlockNumberEntity} and subscribe to
 *       {@code transactionFlowable()}.</li>
 *   <li>For every transaction: record its block number on the in-memory entity, log the block
 *       number and recipient, and call {@code checkWalletAddress}.</li>
 *   <li>On error: log the message, replace the repository contents with the current entity,
 *       and call {@code getLatestBlock()} again.</li>
 * </ul>
 *
 * <p>NOTE(review): the method's closing brace is not shown in the snippet as posted.
 */
@PostConstruct
public void getLatestBlock(){
// NOTE(review): this Thread is constructed without a Runnable, so starting it runs no work;
// the start()/interrupt() calls below do not appear to affect the subscription -- confirm intent.
Thread thread = new Thread();
BlockNumberEntity entity;
Optional<BlockNumberEntity> optionalBlockNumberEntity=blockNumberRepository.findTopByIdGreaterThanEqual(1);
if (optionalBlockNumberEntity.isPresent()) {
thread.start();
entity = optionalBlockNumberEntity.get();
log.info("Last block number from DB >>>> " +entity.getLastBlockNumber());
// Resume from (stored block number - 1); the handle returned by subscribe() is discarded.
web3j.replayPastAndFutureTransactionsFlowable(DefaultBlockParameter.valueOf(entity.getLastBlockNumber().subtract(BigInteger.ONE)))
.subscribe(tx -> {
// Progress is tracked on the in-memory entity only; it is saved in the error handler below.
entity.setLastBlockNumber(tx.getBlockNumber());
log.info("Block Number >>>> " + tx.getBlockNumber());
log.info("To Address >>>>> " + tx.getTo());
checkWalletAddress(tx.getHash(), tx.getTo());
}, exception -> {
log.info(exception.getMessage());
// exception.getMessage() has already been dereferenced above, so this null check
// cannot be false when reached.
if (exception != null) {
// Persist the last seen block number, then restart by re-entering this method,
// which takes the "present" branch and replays from the saved position.
blockNumberRepository.deleteAll();
blockNumberRepository.save(entity);
thread.interrupt();
getLatestBlock();
}
});
}
else{
thread.start();
// No stored position: start from a new entity and follow newly arriving transactions.
entity=new BlockNumberEntity();
web3j.transactionFlowable().subscribe(tx -> {
entity.setLastBlockNumber(tx.getBlockNumber());
log.info("Block Number >>>> " + tx.getBlockNumber());
log.info("To Address >>>>> " + tx.getTo());
checkWalletAddress(tx.getHash(), tx.getTo());
}, exception -> {
log.info(exception.getMessage());
// Same recovery steps as the branch above.
// NOTE(review): if the error arrives before any transaction, the saved entity has
// whatever last block number BlockNumberEntity starts with -- verify the next run handles it.
if (exception != null) {
blockNumberRepository.deleteAll();
blockNumberRepository.save(entity);
thread.interrupt();
getLatestBlock();
}
});
}