文章时效性提示

这是一篇发布于 166 天前的文章,部分内容可能已过时。

链服务结构优化

调整区块链开发架构,优化链服务与主业务的耦合性,以及策略模式的应用场景…

背景

在链服务的开发中,由于交易成功具有一定的滞后性,即交易成功上链,但是最后这比交易的状态失败了。 出现的问题可能有:

  1. 并发问题:同一个钱包地址发起交易,第一笔交易未到pending状态,第二笔交易很快发布,导致使用了相同的nonce,导致交易失败。
  2. 合约设计:同一block中,不允许有重复的函数调用,否则会生成一个相同的链上签名hash,导致交易失败。
  3. 业务层面:需要根据交易状态进行后续的上链业务处理,但是由于交易状态的滞后性,导致第一笔交易未成功,之后的业务全部失败。
  4. 设计层面:由于上链调用的底层方法相同,区别在于函数的名称与构建合约如参数,所有就有大量的if else 逻辑,导致代码维护和扩展性差。
  5. 其他偶发与并发导致的上链、监听失败问题…

解决方案

  1. 参考消息队列机制,将所有需要上链的交易先存入一张表中,通过异步方式起一个定时任务查找需要上链的任务。
  2. 添加错误重发机制,避免出现因并发或者合约问题而交易失败,保证只要参数合法,交易必须完成上链。
  3. 拆分为上链服务与监听服务,上链服务只负责上链,监听服务负责监听链上交易状态,并根据状态进行后续业务处理。
  4. 利用Java的反射机制,以及通过策略优化上链逻辑,减少if else 逻辑,提高代码扩展性。

可能会带来的问题:整个项目流程可能变慢,需要定时任务轮询,才可以完成对应业务的上链处理。数据量大时,需要排队等待上链,但是可以保证业务的稳定性。

代码实现

数据库设计

添加Todo 表,用于存储需要上链的交易信息,包含交易hash、交易参数、交易状态、重试次数等。

1
2
3
4
5
6
7
8
9
10
11
12
13
create table if not exists request_list
(
id bigint auto_increment primary key comment 'key id',
contract_address varchar(66) null comment '合约地址',
function_name varchar(66) not null comment '请求方法名称',
from_address varchar(66) not null comment '交易发起方钱包地址',
blockchain_params text null comment '上链参数',
transaction_hash varchar(66) default null comment '交易hash',
request_status int not null default 0 comment '请求当前状态 0-未上链 1-pending 2-上链成功 3-上链失败',
times int not null default 0 comment '重试次数',
create_time datetime not null default current_timestamp comment '请求创建时间',
update_time datetime default null on update current_timestamp comment '请求更新时间'
)
SQL

说明:

  • contract_address: 合约地址,记录该交易所使用的合约地址
  • function_name: 请求方法名称,用于构建合约方法的名称
  • from_address: 交易发起方钱包地址
  • blockchain_params: 上链参数,用于构建合约方法的参数
  • transaction_hash: 交易hash,上链成功后返回的交易hash,交易失败后存储合约的错误信息
  • request_status: 请求当前状态,实时监听链上交易状态,并更新状态,如果失败,则进行重新上链
  • times: 重试次数,用于控制需要发起的交易,如果超过一定次数,则不再发起交易

定时任务控制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// 伪代码
// 发送待处理请求
@Async
@Scheduled(cron = "0/10 * * * * ? ")
public void requestListHandler() {
try {
// 查询所有需要上链的数据,包括未上链的请求和上链失败需要重试的请求
List<RequestList> requestLists = requestListService.getResultListToSend(RequestState.NOT_ON_CHAIN.getStateCode(),ReTimes);

// 轮训所有待上链的数据
for (RequestList requestList : requestLists) {
// 获取上链数据对应的 实现类名
FunctionNameEnum functionNameEnum = FunctionNameEnum.queryByName(requestList.getFunctionName());

if (functionNameEnum ==null){
...
continue;
}

// 利用 反射机制 获取对应的实现类
FunctionHandleService functionHandleService = SpringBeanUtil.getBeanByName(functionNameEnum.getTradeServiceName(), FunctionHandleService.class);
if (functionHandleService == null){
...
continue;
}

// 处理对应实现类的发送交易逻辑
Result<Object> result = functionHandleService.handleFunc(requestList);
if (result.getResultCode() == ResultDesc.SUCCESS.getResultCode()){
// 交易发送成功,存储hash
// 此时交易的状态为需要修改为 pending
...
}else {
// 交易发送失败,记录失败原因
...

// 重试次数+1,如果重试次数大于一定次数,则修改状态为失败,下一次轮训不再处理该数据
if (requestList.getTimes() == ReTimes){
// 多次处理之后仍然没有成功
logger.warn("send transaction failed:{},reason:{}", JacksonUtil.obj2json(requestList),result.getData());
...
}
}

// 更新数据
this.updateRequestList(updateRequestList,true);
}
} catch (Exception e) {
logger.error("requestListHandler have error:{}", String.valueOf(e));
throw new RuntimeException(e);
}
}


// 伪代码
// 处理交易发送但是未成功的交易
@Async
@Scheduled(cron = "0/10 * * * * ? ")
public void transactionStatusHandler() {
try {
// 获取已经上链成功,但是状态为pending的数据
List<RequestList> requestLists = requestListService.getResultListToSend(RequestState.PENDING.getStateCode(),ReTimes);

// 轮训所有待查询状态的数据
for (RequestList requestList : requestLists) {
// 查询并更新交易的状态
Integer requestStatus = this.getTransactionStatus(requestList.getTransactionHash());
requestList.setRequestStatus(requestStatus);

// 不管什么状态,都不添加重试次数,总不能一直没状态吧,除非链有问题
this.updateRequestList(updateRequestList,false);
}
}catch (Exception e){
logger.error("transactionStatusHandler have error:{}",e);
}
}

JAVA

策略模式引入

定义接口,所有发送交易的实现类都需要实现该接口

1
2
3
4
5
6
// 伪代码

// 定义接口,所有发送交易的实现类都需要实现该接口
public interface FunctionHandleService {
Result<Object> handleFunc(RequestList requestList);
}
JAVA

实现BaseFunctionHandleService公共实现类,用于统一链上处理,避免重复代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 实现公共实现类,避免重复代码
public class BaseFunctionHandleService {

// 发送合约交易
public String callContractFunction(RequestList requestList, Type... args) {
// 根据实现类构造的args参数,构造并发送交易
String fromAddress = CommonUtil.removeHexPrefixIfExists(requestList.getFromAddress());
String privateKey = EthTransactionUtil.decryptKeystore(fromAddress); // 获取交易发起方的私钥
...

EthSendTransaction transaction = ercUtil.ethSendRawTransaction(privateKey, requestList.getContractAddress(), requestList.getFunctionName(), args);
return transaction.getTransactionHash();
}

// 其他业务相同的逻辑,比如:校验是否为hash,处理函数..
...
}

JAVA

实现类根据业务逻辑实现,需要继承BaseFunctionHandleService,并实现FunctionHandleService。
主要用于构建合约方法参数,通过类控制来减少if…else if…else的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 伪代码
@Service
public class DemoHandleService extends BaseFunctionHandleService implements FunctionHandleService {
@Override
public Result<Object> handleFunc(RequestList requestList) {
Result<Object> result = new Result<>();

// 构建合约参数
DemoParam param = JacksonUtil.json2pojo(requestList.getBlockchainParams(), DemoParam.class);
Type<?>[] chainArguments = {
new Address(CommonUtil.removeHexPrefixIfExists(param.getToAddress())),
...
};

// 发送交易
String transactionHash = this.callContractFunction(requestList,chainArguments);
if (StringUtils.isBlank(transactionHash)) {
// 遇到交易失败的情况,打印日志记录失败原因
// 不用更新数据库等逻辑
...
}

return result;
}
}

JAVA

延伸

此方法也可以用于监听区块链事件:

  • 比如A方法抛出了event_A,B方法抛出了event_B
  • 通过topic来区分,获取对应的事件处理实现类,进入对应的业务处理

总结

  • 通过策略模式,实现了业务逻辑的解耦,同时实现了业务逻辑的复用,实现了代码的扩展性
  • 引入队列模式,可以实现异步处理,提高系统的吞吐量,提高系统的可用性

链服务结构优化
https://zhyyao.me/2024/10/22/experience/chain_queue/
作者
zhyyao
发布于
2024年10月22日
许可协议