154 lines
4.1 KiB
Go
154 lines
4.1 KiB
Go
// Copyright the Hyperledger Fabric contributors. All rights reserved.
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
// Package shim provides APIs for the chaincode to access its state
|
|
// variables, transaction context and call other chaincodes.
|
|
package shim
|
|
|
|
import (
|
|
"errors"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"unicode/utf8"
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
"github.com/hyperledger/fabric-chaincode-go/shim/internal"
|
|
peerpb "github.com/hyperledger/fabric-protos-go/peer"
|
|
)
|
|
|
|
const (
|
|
minUnicodeRuneValue = 0 //U+0000
|
|
maxUnicodeRuneValue = utf8.MaxRune //U+10FFFF - maximum (and unallocated) code point
|
|
compositeKeyNamespace = "\x00"
|
|
emptyKeySubstitute = "\x01"
|
|
)
|
|
|
|
// peer as server
|
|
var peerAddress = flag.String("peer.address", "", "peer address")
|
|
|
|
//this separates the chaincode stream interface establishment
|
|
//so we can replace it with a mock peer stream
|
|
type peerStreamGetter func(name string) (ClientStream, error)
|
|
|
|
//UTs to setup mock peer stream getter
|
|
var streamGetter peerStreamGetter
|
|
|
|
//the non-mock user CC stream establishment func
|
|
func userChaincodeStreamGetter(name string) (ClientStream, error) {
|
|
if *peerAddress == "" {
|
|
return nil, errors.New("flag 'peer.address' must be set")
|
|
}
|
|
|
|
conf, err := internal.LoadConfig()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
conn, err := internal.NewClientConn(*peerAddress, conf.TLS, conf.KaOpts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return internal.NewRegisterClient(conn)
|
|
}
|
|
|
|
// Start chaincodes
|
|
func Start(cc Chaincode) error {
|
|
flag.Parse()
|
|
chaincodename := os.Getenv("CORE_CHAINCODE_ID_NAME")
|
|
if chaincodename == "" {
|
|
return errors.New("'CORE_CHAINCODE_ID_NAME' must be set")
|
|
}
|
|
|
|
//mock stream not set up ... get real stream
|
|
if streamGetter == nil {
|
|
streamGetter = userChaincodeStreamGetter
|
|
}
|
|
|
|
stream, err := streamGetter(chaincodename)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = chaincodeAsClientChat(chaincodename, stream, cc)
|
|
|
|
return err
|
|
}
|
|
|
|
// StartInProc is an entry point for system chaincodes bootstrap. It is not an
|
|
// API for chaincodes.
|
|
func StartInProc(chaincodename string, stream ClientStream, cc Chaincode) error {
|
|
return chaincodeAsClientChat(chaincodename, stream, cc)
|
|
}
|
|
|
|
// this is the chat stream resulting from the chaincode-as-client model where the chaincode initiates connection
|
|
func chaincodeAsClientChat(chaincodename string, stream ClientStream, cc Chaincode) error {
|
|
defer stream.CloseSend()
|
|
return chatWithPeer(chaincodename, stream, cc)
|
|
}
|
|
|
|
// chat stream for peer-chaincode interactions post connection
|
|
func chatWithPeer(chaincodename string, stream PeerChaincodeStream, cc Chaincode) error {
|
|
// Create the shim handler responsible for all control logic
|
|
handler := newChaincodeHandler(stream, cc)
|
|
|
|
// Send the ChaincodeID during register.
|
|
chaincodeID := &peerpb.ChaincodeID{Name: chaincodename}
|
|
payload, err := proto.Marshal(chaincodeID)
|
|
if err != nil {
|
|
return fmt.Errorf("error marshalling chaincodeID during chaincode registration: %s", err)
|
|
}
|
|
|
|
// Register on the stream
|
|
if err = handler.serialSend(&peerpb.ChaincodeMessage{Type: peerpb.ChaincodeMessage_REGISTER, Payload: payload}); err != nil {
|
|
return fmt.Errorf("error sending chaincode REGISTER: %s", err)
|
|
|
|
}
|
|
|
|
// holds return values from gRPC Recv below
|
|
type recvMsg struct {
|
|
msg *peerpb.ChaincodeMessage
|
|
err error
|
|
}
|
|
msgAvail := make(chan *recvMsg, 1)
|
|
errc := make(chan error)
|
|
|
|
receiveMessage := func() {
|
|
in, err := stream.Recv()
|
|
msgAvail <- &recvMsg{in, err}
|
|
}
|
|
|
|
go receiveMessage()
|
|
for {
|
|
select {
|
|
case rmsg := <-msgAvail:
|
|
switch {
|
|
case rmsg.err == io.EOF:
|
|
return errors.New("received EOF, ending chaincode stream")
|
|
case rmsg.err != nil:
|
|
err := fmt.Errorf("receive failed: %s", rmsg.err)
|
|
return err
|
|
case rmsg.msg == nil:
|
|
err := errors.New("received nil message, ending chaincode stream")
|
|
return err
|
|
default:
|
|
err := handler.handleMessage(rmsg.msg, errc)
|
|
if err != nil {
|
|
err = fmt.Errorf("error handling message: %s", err)
|
|
return err
|
|
}
|
|
|
|
go receiveMessage()
|
|
}
|
|
|
|
case sendErr := <-errc:
|
|
if sendErr != nil {
|
|
err := fmt.Errorf("error sending: %s", sendErr)
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|