FISCO-BCOS中间件

Seven 2022-01-08 17:58:34
Categories: > > Tags:

区块链中间件是在区块链节点和区块链应用间的通用组件,由于多数情况下会存在多个应用共用一条链的情况,因此中间件功能的通用性和完备性就非常重要。针对不同的链,中间件跟链的接口是不同的,但是提供给上层应用的功能和接口应该是稳定的,而且理想状态下应该尽可能的不与具体的链特性做绑定(实际上很多情况下做不到完全的屏蔽底层链特性),即适配层。一般来说,中间件需要提供有以下的功能:

其他的还有跨链WeCross链上数据导出WeBASE-Collect-Bee等。设计遵从模块独立可扩展,分布式按需部署的理念。在后端设计上通常使用http restful接口服务的形式,减少对调用方的要求,同时屏蔽底层,使应用开发方专注于业务开发。下面具体分析各个组件。

目录:

  1. 合约事件监听
  2. WeBankBlockchain-Governance链治理通用组件
  3. WeBankBlockchain-Data数据治理通用组件

1. 合约事件监听

合约里经常会包含有事件(event关键字),应用通过监听链上指定合约的指定事件,从而得到相应的数据触发相应的处理,也就是不同的系统不再需要链下的交互(例如api调用等),在这里,链扮演了类似持久化的消息队列的角色。在多数场景下,对于链上事件,应用方会有着 可以重复读,可以从指定块开始读,不能丢失事件 等等的需求,这些在设计中间件时都需要考虑在内。

1.1 基于MQ的通知方案

该实现在WeBASE-Front:支持链上事件订阅和通知内,大致流程(摘录):

  1. WeBASE-Front连接到MQ-Server(目前支持RabbitMQ-Server);
  2. WeBASE-Front接收节点的事件Push后,如出块通知,WeBASE-Front将出块消息发送到消息队列中;
  3. 区块链应用连接MQ-Server,获取消息队列中待消费的消息,即可获得事件通知;

接口文档在订阅合约event事件通知(基于版本v1.4.1,下同)。其中主要参数有(部分)

中文 参数名 说明
应用编号 appId 注册事件通知的应用的唯一编号,仅支持数字字母和下划线
合约abi contractAbi 合约的ABI,用于合约event解析
event起始区块 fromBlock 默认latest,表示一直监听最新区块,最小值为1
event末区块 toBlock 最小值为1,最大值为当前区块高度,需 要大于等于fromBlock;填写latest,表示一直监听最新区块
合约地址 contractAddress 合约地址
合约event列表 topicList List类型,合约Event事件列表,Event参数之间不带空格

大致流程如下:
img

  1. 应用通过API请求订阅合约event事件通知接口
  2. EventService类构造了ContractEventCallback用于后续收到异步推送的回掉实现,同时生成UUID作为filterID,然后将(filterID-callback)这键值对记录到eventLogFilterManager(本地map结构)内。然后向FISCO BCOS节点同步发送ChannelMessageType.CLIENT_REGISTER_EVENT_LOG类型的注册请求。(客户端与节点的交互基于Channel协议。交互分为三个阶段:注册请求,节点回复,Event Log数据推送。)实际实现是基于netty通信框架,这里的webase-front中间件与链节点间维持着长链接。参考Fisco Bcos的合约事件推送。然后同步注册成功/失败后将结果返回给API调用方。
  3. 中间件收到链节点推送的ChannelMessageType.EVENT_LOG_PUSH类型消息,根据消息体里携带的filterID从eventLogFilterManager里取出相应的callback,然后调用ContractEventCallback#onPushMessageLog
  4. 在callback里调用ContractEventCallback#pushMeessage2MQ,将监听到的log推送到MQ。在此之前,应用已经订阅了MQ的指定主题,后续的管理交由MQ。
观点:
  1. 在多consumer场景协同消费等情况下,传统MQ的管理是相对复杂的,而且也有很成熟的方案和组件。因此,区块链中间件负责监听链上数据并导入MQ,后续管理消费交由MQ,各司其职,也充分利用MQ的特性,值得借鉴。
  2. 该方案没有持久化,每次front中间件重启之后需要重新注册。因此,需要在应用层持久化当前消费到的blockNumber,下次重启后指定fromBlock。下次重启后,很可能会造成重复消费。
  3. 如果同一个应用的多个实例向多个front中间件注册,由于没有持久化,并且都是记录在进程map数据结构内,不同的filterID,就会造成多个front中间件独立注册(例如向不同的链节点),造成同一个事件的数据被重复推送(例如,总共3个front,各自向3个不同的链节点注册同一个事件,则3个front中间件都会收到同一个事件,也就是被3次写入MQ)。因此,需要在应用层做好多实例下的同步,包括实例的加入/退出场景,避免重复注册监听器。
  4. 监听的事件通知是通过PUSH的模式实现的,也就是链上监听的lastBlockNumber是记录在链节点端的。存在一种极端的情况,例如ContractEventCallback#pushMeessage2MQ异常,没法推送到MQ里,当前只是记录到日志里error级别。在链节点端看来推送成功(无论是否处理成功),lastBlockNumber继续往前推进,而实际上MQ里没有这条数据,导致数据的丢失。即使后期通过日志来补发MQ,对于处理事件有先后顺序的特殊应用可能会引起连锁反应,(例如某个合约里在block N有事件一但是丢失了,block N+10里事件二,由于事件一未处理,状态对应不上,导致事件二也失败)。这种PUSH的方式在某些场景下难以接受。
解决思路:
  1. 仍然参考引入第三方MQ的思路,将链上事件导入MQ,由MQ来管理队列消费。此处考虑引入zk + kafka。
  2. 改用PULL的方式。在front中间件持久化当前已经成功发送到MQ的lastBlockNumber,每次在成功发送MQ后再更新持久化的lastBlockNumber。能保证数据不会丢失,但是可能会重复加入MQ。这里的持久化可以考虑使用zk,将消费信息记录到zk上,如果交互可能会比较频繁,也可以考虑使用mysql等关系型数据库。
  3. 多个front节点只需要一个实例来PULL链上的event log,而且需要有且仅有唯一。可以使用zk来做分布式锁,解决多实例的加入退出下的动态选主。
  4. 由于在front中间件有持久化监听信息,因此即使在多个应用实例同时注册时,也可以通过判断同一个监听是否已被注册而且在运行中等状态(这里需要适当的锁),从而避免多实例重复注册的问题。

1.2 WeEvent框架

基于WeEvent v1.4.0版本。概念及模型参考核心概念。consumer的实现逻辑如下图:
img

  1. 通过stomp协议订阅topic,这个是websocket的使用比较广泛的报文格式。这个在weevent-broker包里。里面有mqtt(用于IoT)和stomp协议用于consumer,另外有jsonrpc和rest用于producer。参看接入说明,其中,**暂时不支持消息确认ACK和事务Transaction语义。(**摘录)
  2. 针对每个订阅,新建notifyTask并且注册。(weevent-core包)
  1. 链节点在出块时通过AMOP协议向Consummer推送BLOCK_NOTIFY消息。(该话题是consumer初始化时订阅的)
  2. BlockEventListenerBlockNotifyQueue队列加入最新的blockNumer。
  1. 常驻线程MainEventLoopBlockNotifyQueuepoll队列内的blockNumber,获得最新的blockNumer
  2. MainEventLoop本地维护上次消费到的lastBlock,比较lastBlock + 1 <= blockNumber,通过getBlock接口向链节点获取下一个区块(lastBlock + 1,每次只消费一个区块),从而获取区块包含的交易回执,从而decode出具体的event。
  3. 然后dispatch提交到各自到eventQueue队列内,然后更新lastBlock += 1。此处lastBlock是线程内部变量,不持久化。
  4. NotifyTask从自身的eventQueuepoll队列内的event,然后回调BrokerStomp.EventListener#onEvent方法。
  5. BrokerStomp使用stomp协议里SEND向应用推送event。
观点:
  1. 链节点到event中间件间的AMOP协议是PUSH方式,event中间件向链节点获取区块时是PULL方式。因此,即使PULL方式偶尔失败,例如丢失若干个中间出块事件的块高,最后还是会以最大的块高为准。而获取区块getBlock使用pull的方式,可重复读,保证数据不会丢失。
  2. 消费lastBlock没有持久化,应用每次subscribe后都是按照从最新的block开始取,不支持指定(fromBlock, toBlock)消费历史数据。
  3. BrokerStomp到应用这一段的推送不支持ACK及事务,因此这部分推送有可能丢失。
  4. weevent-processor包里主要是链上的流事件处理。(这个与大数据流计算的对比有待后续分析)

2. WeBankBlockchain-Governance链治理通用组件

微众银行提出面向区块链的多方协作治理框架—— Multilateral Collaborative Governance Framework(MCGF)。具体见白皮书

2.1 WeBankBlockchain-Governance-Key 私钥管理组件

  1. 助记词生成私钥
    参考类PkeyByMnemonicService.java。通过助记词mnemonic与密码passphrase进行PBKDF2WithHmacSha512哈希算法生成确定性的公私钥对keyPair和用于安全派生子密钥的chaincode
  2. 私钥托管
    参考类KeyHandler.java。以数据库存储为例,私钥 *加密* 后存储,加密的方式例如Ethereum wallet file(适配Ethereum的模式), 或者p12的KeyStore(密码访问模式)等。按照加密口令根据是否同时托管也分为半托管(加密口令在用户手中,加密后的私钥存放在数据库中,需要使用私钥签名时候需要同时传入加密口令解密)和全托管。全托管方式下,加密口令和加密后的私钥分库存储,降低安全风险。
  3. 私钥派生
  4. 私钥分片

2.2 WeBankBlockchain-Governance-Account 账户治理组件

增加普通账户(Normal Account)的概念,与外部账户一对一绑定,在链上的身份用的是普通账户的地址。即

1
2
3
 ------------------------      --------       --------
| 外部账户(用户手持的公私钥) | -> | 普通账户 | -> | 应用合约 |
------------------------ -------- --------

因此,管理员可以添加,替换,冻结,解冻普通账户(也可以由用户自身提出),例如用户遗失/泄露私钥的场景下,只需要更新<外部, 普通>账户对应关系,而链上的应用合约对应的普通账户不变。应用合约需要引入AccountManager.sol,使用_accountManager里记录的普通用户地址来代替msg.sender来判断身份权限。

管理员的角色可以分类两类,一类是超级管理员(唯一),一类是管员委员会(投票决策)。部署的合约分别是AdminGovernBuilder.sol(Weight)VoteGovernBuilder.sol。以AdminGovernBuilder.sol为例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// AdminGovernBuilder.sol
// 部署合约,目的是为了实例化以下的合约。
contract AdminGovernBuilder {
address public _governance;

constructor() public {
WEGovernance governance = new WEGovernance(0);
governance.setOwner(msg.sender);
_governance = address(governance);
address _accountManager = governance.getAccountManager();
AccountManager accountManager = AccountManager(_accountManager);
accountManager.newAccount(msg.sender);
}

function getGovernance() public view returns (address) {...}

function getAccountManager() public view returns (address) {...}
}

// WEGovernance.sol
// 该合约是管理规则,例如在管理委员会方式下,发起投票requestVote和各个委员vote的汇总计算逻辑都是在此合约里。
contract WEGovernance is WEBoardVoteGuard {
address _accountManager;
// 0: admin-mode; 1: vote-mode.
uint8 public _mode = 0;

constructor(uint8 mode) public {
_mode = mode;
AccountManager accountManager = new AccountManager();
_accountManager = address(accountManager);
}

// 发起投票,登记该投票信息
// return id : 唯一标志提案,与vote方法的入参相对应
function register(uint8 txType, ...) external validVoter returns (bool b, uint256 id) {...}

// 投票
function vote(uint256 id, bool flag) external validVoter returns (bool b){...}
}

// AccountManager.sol
// <外部, 普通>账户对应关系的存放
contract AccountManager is WEBasicAuth {
// 外部账户 -> 普通账户
mapping(address => address) _externalAccountMapping;

// 普通账户 -> 外部账户
mapping(address => address) _userAccountMapping;
}

// EvidenceDemo.sol
// 应用合约示例。引入AccountManager即可
contract EvidenceDemo {
address public _owner;
AccountManager _accountManager;

constructor(address accountManager) public {
_accountManager = AccountManager(accountManager);
_owner = _accountManager.getUserAccount(msg.sender);
}

// 使用_accountManager里记录的普通用户地址来代替 msg.sender
modifier onlyOwner() {
address userAccountAddress = _accountManager.getUserAccount(msg.sender);
require(userAccountAddress == _owner, "Not admin");
_;
}
}

2.3 WeBankBlockchain-Governance-Auth 权限治理组件

与账户治理组件非常类似,也是引入了权限管理合约,由超级管理员或者管理员委员会进行管理。在应用合约里访问权限管理合约,调用canCallFunction来判断是否可以访问指定的合约方法。架构图如下:
img


3. WeBankBlockchain-Data数据治理通用组件

参考文档

3.1 WeBankBlockchain-Data-Reconcile对账服务

金融机构特别是央行/商业银行间清结算,而且在当前区块链的性能限制条件下,批量定时的对账是常采用的方式。这个组件其实是比较完成的对账项目代码,饱含了基本流程。具体参考文档。流程如下图所示:
img

以正向定时任务为例,代码里主要采用了责任链的方式,具体配置在ReconcileHandlerFactory.java类中。具体的hanlder chain如下

1
2
3
4
5
6
 ------------------------------------------      ------------------------------------------------------
| ReconcileTaskHandler(创建任务,记录在数据库) | -> | ReconcileFileObtainHandler(从FTP/区块链上获取对账文件) |
------------------------------------------ ------------------------------------------------------
------------------------------------------------------ ------------------------------------------
-> | ReconcileExecuteHandler(对账过程,具体每条数据以索引对齐) | -> | ReconcileResultHandler(将对账结果上传到FTP)|
------------------------------------------------------ ------------------------------------------

实际过程中,需要考虑数据/文件的加密,特别是在多方对账的场景,需要在多个层面做好数据加密和隔离,这点在项目中没有体现。例如,上链方生成对称密钥,对每笔数据(例如涉及到金额的敏感字段等)使用对称密钥加密,并且在上链的数据中附带上该对称密钥的标志。上链方采用对账方的公钥对对称密钥进行加密,并且通过报文发送给对账方。对账方使用自己的私钥解密后,进而对链上数据进行对称解密。另外,上链方在FTP中存放的对账文件,也需要相应的进行加密写入,并且,不同的对账方也需要相互隔离,在FTP上使用不同的路径,并且配置可访问权限。

引用文章: