调整区块链开发架构,优化链服务与主业务的耦合性,以及策略模式的应用场景…
背景
在链服务的开发中,由于交易成功具有一定的滞后性,即交易成功上链,但是最后这比交易的状态失败了。 出现的问题可能有:
- 并发问题:同一个钱包地址发起交易,第一笔交易未到pending状态,第二笔交易很快发布,导致使用了相同的nonce,导致交易失败。
- 合约设计:同一block中,不允许有重复的函数调用,否则会生成一个相同的链上签名hash,导致交易失败。
- 业务层面:需要根据交易状态进行后续的上链业务处理,但是由于交易状态的滞后性,导致第一笔交易未成功,之后的业务全部失败。
- 设计层面:由于上链调用的底层方法相同,区别在于函数的名称与构建合约如参数,所有就有大量的if else 逻辑,导致代码维护和扩展性差。
- 其他偶发与并发导致的上链、监听失败问题…
解决方案
- 参考消息队列机制,将所有需要上链的交易先存入一张表中,通过异步方式起一个定时任务查找需要上链的任务。
- 添加错误重发机制,避免出现因并发或者合约问题而交易失败,保证只要参数合法,交易必须完成上链。
- 拆分为上链服务与监听服务,上链服务只负责上链,监听服务负责监听链上交易状态,并根据状态进行后续业务处理。
- 利用Java的反射机制,以及通过策略优化上链逻辑,减少if else 逻辑,提高代码扩展性。
可能会带来的问题:整个项目流程可能变慢,需要定时任务轮询,才可以完成对应业务的上链处理。数据量大时,需要排队等待上链,但是可以保证业务的稳定性。
代码实现
数据库设计
添加Todo 表,用于存储需要上链的交易信息,包含交易hash、交易参数、交易状态、重试次数等。
1 2 3 4 5 6 7 8 9 10 11 12 13
| create table if not exists request_list ( id bigint auto_increment primary key comment 'key id', contract_address varchar(66) null comment '合约地址', function_name varchar(66) not null comment '请求方法名称', from_address varchar(66) not null comment '交易发起方钱包地址', blockchain_params text null comment '上链参数', transaction_hash varchar(66) default null comment '交易hash', request_status int not null default 0 comment '请求当前状态 0-未上链 1-pending 2-上链成功 3-上链失败', times int not null default 0 comment '重试次数', create_time datetime not null default current_timestamp comment '请求创建时间', update_time datetime default null on update current_timestamp comment '请求更新时间' )
SQL
|
说明:
- contract_address: 合约地址,记录该交易所使用的合约地址
- function_name: 请求方法名称,用于构建合约方法的名称
- from_address: 交易发起方钱包地址
- blockchain_params: 上链参数,用于构建合约方法的参数
- transaction_hash: 交易hash,上链成功后返回的交易hash,交易失败后存储合约的错误信息
- request_status: 请求当前状态,实时监听链上交易状态,并更新状态,如果失败,则进行重新上链
- times: 重试次数,用于控制需要发起的交易,如果超过一定次数,则不再发起交易
定时任务控制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
|
@Async @Scheduled(cron = "0/10 * * * * ? ") public void requestListHandler() { try { List<RequestList> requestLists = requestListService.getResultListToSend(RequestState.NOT_ON_CHAIN.getStateCode(),ReTimes); for (RequestList requestList : requestLists) { FunctionNameEnum functionNameEnum = FunctionNameEnum.queryByName(requestList.getFunctionName());
if (functionNameEnum ==null){ ... continue; }
FunctionHandleService functionHandleService = SpringBeanUtil.getBeanByName(functionNameEnum.getTradeServiceName(), FunctionHandleService.class); if (functionHandleService == null){ ... continue; }
Result<Object> result = functionHandleService.handleFunc(requestList); if (result.getResultCode() == ResultDesc.SUCCESS.getResultCode()){ ... }else { ... if (requestList.getTimes() == ReTimes){ logger.warn("send transaction failed:{},reason:{}", JacksonUtil.obj2json(requestList),result.getData()); ... } } this.updateRequestList(updateRequestList,true); } } catch (Exception e) { logger.error("requestListHandler have error:{}", String.valueOf(e)); throw new RuntimeException(e); } }
@Async @Scheduled(cron = "0/10 * * * * ? ") public void transactionStatusHandler() { try { List<RequestList> requestLists = requestListService.getResultListToSend(RequestState.PENDING.getStateCode(),ReTimes);
for (RequestList requestList : requestLists) { Integer requestStatus = this.getTransactionStatus(requestList.getTransactionHash()); requestList.setRequestStatus(requestStatus); this.updateRequestList(updateRequestList,false); } }catch (Exception e){ logger.error("transactionStatusHandler have error:{}",e); } }
JAVA
|
策略模式引入
定义接口,所有发送交易的实现类都需要实现该接口
1 2 3 4 5 6
|
public interface FunctionHandleService { Result<Object> handleFunc(RequestList requestList); }
JAVA
|
实现BaseFunctionHandleService公共实现类,用于统一链上处理,避免重复代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class BaseFunctionHandleService { public String callContractFunction(RequestList requestList, Type... args) { String fromAddress = CommonUtil.removeHexPrefixIfExists(requestList.getFromAddress()); String privateKey = EthTransactionUtil.decryptKeystore(fromAddress); ... EthSendTransaction transaction = ercUtil.ethSendRawTransaction(privateKey, requestList.getContractAddress(), requestList.getFunctionName(), args); return transaction.getTransactionHash(); } ... }
JAVA
|
实现类根据业务逻辑实现,需要继承BaseFunctionHandleService,并实现FunctionHandleService。
主要用于构建合约方法参数,通过类控制来减少if…else if…else的逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @Service public class DemoHandleService extends BaseFunctionHandleService implements FunctionHandleService { @Override public Result<Object> handleFunc(RequestList requestList) { Result<Object> result = new Result<>();
DemoParam param = JacksonUtil.json2pojo(requestList.getBlockchainParams(), DemoParam.class); Type<?>[] chainArguments = { new Address(CommonUtil.removeHexPrefixIfExists(param.getToAddress())), ... };
String transactionHash = this.callContractFunction(requestList,chainArguments); if (StringUtils.isBlank(transactionHash)) { ... } return result; } }
JAVA
|
延伸
此方法也可以用于监听区块链事件:
- 比如A方法抛出了event_A,B方法抛出了event_B
- 通过topic来区分,获取对应的事件处理实现类,进入对应的业务处理
总结
- 通过策略模式,实现了业务逻辑的解耦,同时实现了业务逻辑的复用,实现了代码的扩展性
- 引入队列模式,可以实现异步处理,提高系统的吞吐量,提高系统的可用性