sample_chain/peer/shim/handle.go

281 lines
7.6 KiB
Go

package shim
import (
"fmt"
"log"
"schain/common/signer"
"schain/config"
pb "schain/proto"
"schain/proto/util"
"sync"
"sync/atomic"
cmap "github.com/orcaman/concurrent-map/v2"
"google.golang.org/protobuf/proto"
)
type PeerChaincodeStream interface {
Send(*pb.ChaincodeMessage) error
Recv() (*pb.ChaincodeMessage, error)
}
// ClientStream supports the (original) chaincode-as-client interaction pattern
type ClientStream interface {
PeerChaincodeStream
CloseSend() error
}
type Handler struct {
serialLock sync.Mutex
cc Chaincode
chatStream PeerChaincodeStream
readWriteSet cmap.ConcurrentMap[string, *pb.KVRWSet]
signer *signer.Signer
}
// serialSend 序列化发送
func (h *Handler) serialSend(msg *pb.ChaincodeMessage) error {
h.serialLock.Lock()
defer h.serialLock.Unlock()
return h.chatStream.Send(msg)
}
// 异步发送链码消息
func (h *Handler) serialSendAsync(msg *pb.ChaincodeMessage, errc chan<- error) {
go func() {
errc <- h.serialSend(msg)
}()
}
func newChaincodeHandler(peerChatStream PeerChaincodeStream) *Handler {
sig, err := signer.NewSigner(config.PeerConf)
if err != nil {
log.Fatal(err)
}
return &Handler{
chatStream: peerChatStream,
readWriteSet: cmap.New[*pb.KVRWSet](),
signer: sig,
}
}
type stubHandlerFunc func(*pb.ChaincodeMessage) (*pb.ChaincodeMessage, error)
func (h *Handler) handleStubInteraction(handler stubHandlerFunc, msg *pb.ChaincodeMessage, errc chan<- error) {
resp, err := handler(msg)
if err != nil {
resp = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_Type_ERROR, Payload: []byte(err.Error()), TxID: msg.TxID}
}
h.serialSendAsync(resp, errc)
}
// 查看读集或者写集中是否有重复的元素
func hasDuplicate(rwSet *pb.KVRWSet) bool {
for i := 0; i < len(rwSet.Reads); i++ {
for j := i + 1; j < len(rwSet.Reads); j++ {
if rwSet.Reads[i].Key == rwSet.Reads[j].Key {
return true
}
}
}
for i := 0; i < len(rwSet.Writes); i++ {
for j := i + 1; j < len(rwSet.Writes); j++ {
if rwSet.Writes[i].Key == rwSet.Writes[j].Key {
return true
}
}
}
return false
}
// detectionKVRwSet 检测事务的读写集,让不可序列化的事务提前中止
func detectionKVRwSet(kvRwSet *pb.KVRWSet, txID string) *pb.ChaincodeMessage {
for _, write := range kvRwSet.Writes {
if _, ok := detection.Get(write.Key); ok {
err := fmt.Errorf("this is a Non-serializable transaction")
return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_Type_ERROR, Payload: []byte(err.Error()), TxID: txID}
}
}
for _, write := range kvRwSet.Writes {
detection.Set(write.Key, struct{}{})
}
return nil
}
func (h *Handler) handleTransaction(msg *pb.ChaincodeMessage) (*pb.ChaincodeMessage, error) {
input := &pb.ChaincodeInput{}
//Unmarshal parses the wire-format message in b and places the result in m
err := proto.Unmarshal(msg.Payload, input)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal input: %s", err)
}
stub, err := newChaincodeStub(h, msg.TxID, input, msg.Proposal)
if err != nil {
return nil, fmt.Errorf("failed to create new ChaincodeStub: %s", err)
}
name, err := stub.getChainCodeName()
if err != nil {
return nil, fmt.Errorf("failed to get Chaincode Name: %s", err)
}
if name == "SmallBank" {
h.cc = new(SmallBank)
} else if name == "KvStore" {
h.cc = new(KvStore)
}
//调用链码
res := h.cc.Invoke(stub)
if res.Status != 200 {
err = fmt.Errorf("transaction invoke fail")
return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_Type_ERROR, Payload: []byte(err.Error()), TxID: msg.TxID}, nil
}
t := &timer{startTime: msg.Timestamp}
TxLatency.Set(msg.TxID, t)
// 获取事务的读写集
kvRwSet, ok := h.readWriteSet.Get(msg.TxID)
if !ok {
err = fmt.Errorf("obtain the transaction read/write set fail")
return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_Type_ERROR, Payload: []byte(err.Error()), TxID: msg.TxID}, nil
}
// 检测事务读集或写集是否存在重复地址,如果存在则说明事务错误
if hasDuplicate(kvRwSet) {
err := fmt.Errorf("this is a Non-serializable transaction")
return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_Type_ERROR, Payload: []byte(err.Error())}, nil
}
//检测该事务的读写集,判断它是否为一个不可序列化的事务,如果不可序列化则直接将该事务中止
chaincodeMsg := detectionKVRwSet(kvRwSet, msg.TxID)
if chaincodeMsg != nil {
return chaincodeMsg, nil
}
//创建背书,将背书结果返回给客户端
result := marshalOrPanic(kvRwSet)
//获取提案的头部和负载
shdr := stub.proposal.GetHeader()
payload := stub.proposal.GetPayload()
pr, err := util.CreateProposalResponse(shdr, payload, &res, result, h.signer)
if err != nil {
err = fmt.Errorf("failed to CreateProposalResponse: %s", err)
return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_Type_ERROR, Payload: []byte(err.Error()), TxID: msg.TxID}, nil
}
resBytes, err := proto.Marshal(pr)
if err != nil {
err = fmt.Errorf("failed to marshal ProposalResponse: %s", err)
return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_Type_ERROR, Payload: []byte(err.Error()), TxID: msg.TxID}, nil
}
var msgType pb.ChaincodeMessage_Type
if kvRwSet.Writes == nil {
msgType = pb.ChaincodeMessage_Type_ONLY_READ_TRANSACTION
atomic.AddInt32(&SuccessNum, 1)
} else {
msgType = pb.ChaincodeMessage_Type_COMPLETED
}
return &pb.ChaincodeMessage{Type: msgType, Payload: resBytes, Proposal: msg.Proposal, TxID: msg.TxID}, nil
}
// handleGetState 获取状态数据库中的数据(世界状态)
func (h *Handler) handleGetState(key string, txID string) ([]byte, error) {
s, ok := State.Get(key)
if !ok {
s = &StateDB{}
}
kVRead := &pb.KVRead{Key: key, Version: s.Version}
readWriteSet, ok := h.readWriteSet.Get(txID)
if !ok {
readWriteSet = &pb.KVRWSet{}
}
readWriteSet.Reads = append(readWriteSet.Reads, kVRead)
readWriteSet.TxID = txID
h.readWriteSet.Set(txID, readWriteSet)
return s.Value, nil
}
// handlePutState 更改状态数据库中的数据
func (h *Handler) handlePutState(key string, value []byte, txID string) error {
kVWrite := &pb.KVWrite{Key: key, Value: value}
readWriteSet, ok := h.readWriteSet.Get(txID)
if !ok {
readWriteSet = &pb.KVRWSet{}
}
readWriteSet.Writes = append(readWriteSet.Writes, kVWrite)
readWriteSet.TxID = txID
h.readWriteSet.Set(txID, readWriteSet)
return nil
}
// handleDelState 删除状态数据库中的数据
func (h *Handler) handleDelState(key string, txID string) error {
kVWrite := &pb.KVWrite{Key: key, IsDelete: true}
readWriteSet, _ := h.readWriteSet.Get(txID)
if readWriteSet == nil {
readWriteSet = &pb.KVRWSet{}
}
readWriteSet.Writes = append(readWriteSet.Writes, kVWrite)
readWriteSet.TxID = txID
h.readWriteSet.Set(txID, readWriteSet)
return nil
}
// handleReady 处理已经准备好的消息
func (h *Handler) handleReady(msg *pb.ChaincodeMessage, errc chan error) error {
switch msg.Type {
case pb.ChaincodeMessage_Type_TRANSACTION:
go h.handleStubInteraction(h.handleTransaction, msg, errc)
return nil
default:
return fmt.Errorf("[%s] Chaincode h cannot core message (%s)", msg.TxID, msg.Type)
}
}
// handleMessage 处理发送过来的消息
func (h *Handler) handleMessage(msg *pb.ChaincodeMessage, errc chan error) error {
err := h.handleReady(msg, errc)
if err != nil {
payload := []byte(err.Error())
errorMsg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_Type_ERROR, Payload: payload, TxID: msg.TxID}
return h.serialSend(errorMsg)
}
return nil
}
// marshalOrPanic 对proto消息进行编码
func marshalOrPanic(msg proto.Message) []byte {
bytes, err := proto.Marshal(msg)
if err != nil {
panic(fmt.Sprintf("failed to marshal message: %s", err))
}
return bytes
}