Web3j监听功能代码研究
高洪涛 2021-03-19
本周深入研究了web3j工具包实现以太坊的监听功能,实现了交易监听和代币监听的方法,对监听过程中的常见问题进行了处理,本文就是对这部分开发经验的总结。
1 web3j 版本
Web3官网:https://www.web3labs.com/web3j-sdk
Docs: https://docs.web3j.io/latest/quickstart/
我使用了3个版本的web3j, 3.6、4.5.5、4.8.4,分别进行说明。
1.1 Web3j 3.6版本
3.6版本可以实现各种监听。来源已经搞不清楚了,是一个文件夹,包含有许多jar文件。
这个文件夹打包链接:
使用时需要添加到编译路径中。
1.2 4.5.5版本
我下载了4.5.5版本工具包,只有一个文件,里面包含了各种jar包:console-4.5.5-all.jar
经过测试,该包可以正常的完成查询和交易,但是无法监听,提示缺少rxjava相关jar,我就没有再折腾,放弃了。
1.3 4.8.4版本
该版本可以实现监听。从官网自动下载,maven工程中添加依赖:
<dependency>
<groupId>org.web3j</groupId>
<artifactId>core</artifactId>
<version>4.8.4</version>
</dependency>
要实现监听,还需要添加另一个依赖:
<dependency>
<groupId>io.reactivex.rxjava3</groupId>
<artifactId>rxjava</artifactId>
<version>3.0.11</version>
</dependency>
使用到了json需要添加依赖:
<!-- JSONObject对象依赖的jar包 开始-->
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.3</version>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>net.sf.ezmorph</groupId>
<artifactId>ezmorph</artifactId>
<version>1.0.6</version>
</dependency>
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<version>2.2.3</version>
<classifier>jdk15</classifier>
<!-- jdk版本-->
</dependency>
<!-- Json依赖架包下载结束-->
2 3.6版本监听
我认为监听有3种类型,分别是:
代币监听:监听ERC20代币交易,从startBlock区块开始监听token转账事件
重放交易:监听过往交易,需要指定开始和结束区块号
交易监听:从当前区块开始监听交易
其中交易监听收到的交易事件最多,包含了代币交易。代币监听优点是直接过滤指定的代币转账事件,用起来方便。重放交易是查询历史交易记录,可以针对某段时间查询交易。
2.1 代币监听
一般步骤:
- 指定监听事件event;
- 指定过滤器filter,包含起始区块,代币合约列表;
- 启动监听,
- 检查交易地址是否是自己需要的,是的话就调用具体的事件处理函数。
说明: 再检查交易地址是否是自己需要的这一步,一般做法是采用地址字符串比较,这样非常费时间,我把关注的地址保存在hashMap中,查找时直接调用htAddress.containsKey(fromAddress),这样速度最快。
publicList<String>contracts; //代币合约地址列表,可以存放多个地址
publicSubscriptiontokenSubscription; //token事件订阅对象
publicSubscriptionethMissSubscription;//ETH交易空档事件订阅对象
publicSubscriptionethSubscription; //ETH交易事件订阅对象
/*启动监听, 从startBlock区块开始监听token转账事件
代币监听会出现的问题: 如果启动区块距离当前区块稍远,非常可能的情况是中间出现的交易太多,监视代码内部出现空指针异常。
如果监听启动时接近当前区块问题出现概率小。
*/
public voidstartTransferListen_Token(BigIntegerstartBlock) {
//要监听的合约事件
Eventevent=newEvent("Transfer",
Arrays.asList(
newTypeReference<Address>() {},
newTypeReference<Address>() {},
newTypeReference<Uint>(){}));
//过滤器
EthFilterfilter=newEthFilter(
DefaultBlockParameter.valueOf(startBlock),
DefaultBlockParameterName.LATEST,
contracts);
filter.addSingleTopic(EventEncoder.encode(event));
//注册监听,解析日志中的事件
block_TokenSub=startBlock.intValue();
tokenSubscription=web3j.ethLogObservable(filter).subscribe(log-> {
block_TokenSub=log.getBlockNumber().intValue();
Stringtoken=log.getAddress(); //这是Token合约地址
StringtxHash=log.getTransactionHash();
List<String>topics=log.getTopics(); //提取转账记录
StringfromAddress="0x"+topics.get(1).substring(26);
StringtoAddress="0x"+topics.get(2).substring(26);
System.out.println(" ---token ="+token+", txHash ="+txHash);
//检查发送地址、接收地址是否属于系统用户, 不是系统用户就不予处理
if(htAddress.containsKey(fromAddress) ||htAddress.containsKey(toAddress)) {
Stringvalue1=log.getData();
BigIntegerbig=newBigInteger(value1.substring(2), 16);
BigDecimalvalue= Convert.fromWei(big.toString(), Convert.Unit.ETHER);
// System.out.println("value="+value);
Stringtimestamp="";
try{
EthBlockethBlock=web3j.ethGetBlockByNumber(DefaultBlockParameter.valueOf(log.getBlockNumber()),false).send();
timestamp= String.valueOf(ethBlock.getBlock().getTimestamp());
}catch(IOExceptione) {
System.out.println("Block timestamp get failure,block number is {}"+log.getBlockNumber());
System.out.println("Block timestamp get failure,{}"+ e.getMessage());
}
//执行关键的回调函数
callBack_Token(token,txHash,fromAddress,toAddress,value,timestamp);
}
},error->{
System.out.println(" ### tokenSubscription error= "+error);
error.printStackTrace();
});
System.out.println("tokenSubscription ="+tokenSubscription);
System.out.println(tokenSubscription.isUnsubscribed());
}
2.2 重放交易
重放交易功能很重要,尤其涉及充币业务时,如果充币运行服务器停机维护,那么在此期间的代币充值就无法知晓造成遗漏损失。解决方法时充币运行服务器实时记录自己监听的区块高度,记录在数据库中,下次启动时查找这个区块到最新区块之间的交易。
说明: 当指定的区块交易重放完毕,该监听就自动终止。ethMissSubscription.isUnsubscribed()返回值就是false。
//启动监听以太坊上的过往交易
public voidstartReplayListen_ETH(BigIntegerstartBlockNum) {
System.out.println(" startReplayListen_ETH: startBlockNum="+startBlockNum);
//回放空档期间的交易
BigIntegercurrentBlockNum=null;
try{
//获取当前区块号
currentBlockNum=web3j.ethBlockNumber().send().getBlockNumber();
System.out.println(" 000 currentBlockNum= "+currentBlockNum.intValue());
if(startBlockNum.compareTo(currentBlockNum) > 0) {
return; //测试曾经出现currentBlockNum得到错误数字,比startBlockNum还小,这时不能启动监听
}
}catch(IOExceptione) {
//TODOAuto-generated catch block
System.out.println(" 111 getBlockNumber() Error: ");
e.printStackTrace();
return; //出现异常不能启动监听
}
//创建开始与结束区块, 重放这段时间内的交易,防止遗漏
DefaultBlockParameterstartBlock=newDefaultBlockParameterNumber(startBlockNum);
DefaultBlockParameterendBlock=newDefaultBlockParameterNumber(currentBlockNum);
System.out.println("[ startTransferListen_ETH: miss startBlock="+startBlockNum+", endBlock="+currentBlockNum+"]");
block_EthMissSub=startBlockNum.intValue();
ethMissSubscription=web3j.replayTransactionsObservable(startBlock,endBlock)
.subscribe(tx-> {
//更新检查过的区块高度
block_EthMissSub=tx.getBlockNumber().intValue();
System.out.println(" ---replayPastTransactionsFlowable block_EthMissSub = "+block_EthMissSub);
StringfromAddress=tx.getFrom();
StringtoAddress =tx.getTo();
// System.out.println("toAddress="+toAddress);
if(htAddress.containsKey(fromAddress) ||htAddress.containsKey(toAddress)) { //发现了指定地址上的交易
StringtxHash=tx.getHash();
BigDecimalvalue= Convert.fromWei(tx.getValue().toString(), Convert.Unit.ETHER);
Stringtimestamp="";
try{
EthBlockethBlock=web3j.ethGetBlockByNumber(DefaultBlockParameter.valueOf(tx.getBlockNumber()),false).send();
timestamp= String.valueOf(ethBlock.getBlock().getTimestamp());
}catch(IOExceptione) {
System.out.println("Block timestamp get failure,block number is {}"+tx.getBlockNumber());
System.out.println("Block timestamp get failure,{}"+ e.getMessage());
}
//监听以太坊上是否有系统生成地址的交易
callBack_ETH(txHash,fromAddress,toAddress,value,timestamp);
}
},error->{
System.out.println(" ### replayPastTransactionsFlowable error= "+error);
error.printStackTrace();
});
}
2.3 交易监听
这种方式监听每一笔交易,以太坊上交易量太大,只能自己过滤出关注的交易进行处理。要尽可能的快速处理。可以考虑线程池模型进行处理。
//启动监听以太坊上的交易
public voidstartTransactionListen_ETH() {
//监听当前区块以后的交易
ethSubscription=web3j.transactionObservable().subscribe(tx-> {
//更新检查过的区块高度
block_EthSub=tx.getBlockNumber().intValue();
System.out.println(" ---transactionFlowable block_EthSub = "+block_EthSub);
StringtxHash=tx.getHash();
StringfromAddress=tx.getFrom();
StringtoAddress=tx.getTo();
if(htAddress.containsKey(fromAddress) ||htAddress.containsKey(toAddress)) { //发现了指定地址上的交易
BigDecimalvalue= Convert.fromWei(tx.getValue().toString(), Convert.Unit.ETHER);
Stringtimestamp="";
try{
EthBlockethBlock=web3j.ethGetBlockByNumber(DefaultBlockParameter.valueOf(tx.getBlockNumber()),false).send();
timestamp= String.valueOf(ethBlock.getBlock().getTimestamp());
}catch(IOExceptione) {
System.out.println("Block timestamp get failure,block number is {}"+tx.getBlockNumber());
System.out.println("Block timestamp get failure,{}"+ e.getMessage());
}
//监听以太坊上是否有系统生成地址的交易
callBack_ETH(txHash,fromAddress,toAddress,value,timestamp);
}
},error->{
System.out.println(" ### transactionFlowable error= "+error);
error.printStackTrace();
});
}
最后回调函数示例:
//token转账事件的处理函数
public void callBack_Token(Stringtoken, StringtxHash, Stringfrom, Stringto, BigDecimalvalue, Stringtimestamp) {
System.out.println("----callBack_Token:");
System.out.println(" token = "+token);
System.out.println(" txHash = "+token);
System.out.println(" from = "+from);
System.out.println(" to = "+to);
System.out.println(" value = "+value.doubleValue());
}
3 4.8.4版本监听
版本升级后原来的监听函数改变了,用法如下:
publicDisposable tokenSubscription; //token事件订阅对象, 如果监视启动成功,isDisposed()返回false;否则监视失败返回true
publicDisposable ethMissSubscription;//ETH交易空档事件订阅对象
publicDisposable ethSubscription; //ETH交易事件订阅对象
tokenSubscription=web3j.ethLogFlowable(filter)
.subscribe(log-> {……});
ethMissSubscription=web3j.replayPastTransactionsFlowable(startBlock,endBlock)
.subscribe(tx-> {……});
ethSubscription=web3j.transactionFlowable()
.subscribe(tx-> {……});
判断监听对象是否运行:
tokenSubscription.isDisposed()
原来通过监听对象取消监听:
ethSubscription.cancel();
现在没有这个方法啦, 就是不能主动停止监听啦。
4 常见问题
4.1 监听无法启动
指定监听开始区块高度后,出现启动监听失败,监听对象为false。原因未知,我多次实践经验:
开始区块距离最新区块越远越容易失败;
一个开始区块启动监视成功,以后该区块重新监听也大概率成功,小概率失败;
即使监听成功,持续运行期间内部常常出现空指针异常,可能导致监视停止运行;
对于监听成功启动后出现的停止运行问题,我的做法是另开一个线程专门检查监听对象的状态,一旦发现停止运行就立即重新启动监听,该方法有效。
-----End-----