go_study/fabric-main/integration/ledger/snapshot_test.go

1098 lines
50 KiB
Go

/*
Copyright IBM Corp All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package ledger
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"math"
"os"
"path/filepath"
"strconv"
"strings"
"syscall"
"time"
docker "github.com/fsouza/go-dockerclient"
cb "github.com/hyperledger/fabric-protos-go/common"
ab "github.com/hyperledger/fabric-protos-go/orderer"
pb "github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/integration/chaincode/kvexecutor"
"github.com/hyperledger/fabric/integration/channelparticipation"
"github.com/hyperledger/fabric/integration/nwo"
"github.com/hyperledger/fabric/integration/nwo/commands"
"github.com/hyperledger/fabric/integration/nwo/runner"
"github.com/hyperledger/fabric/integration/pvtdata/marblechaincodeutil"
"github.com/hyperledger/fabric/protoutil"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
"github.com/onsi/gomega/gexec"
"github.com/tedsuo/ifrit"
)
const testchannelID = "testchannel"
var _ = Describe("Snapshot Generation and Bootstrap", func() {
var (
setup *setup
helper *marblesTestHelper
couchProcess []ifrit.Process
legacyChaincode nwo.Chaincode
newlifecycleChaincode nwo.Chaincode
)
BeforeEach(func() {
By("initializing and starting the network")
setup = initAndStartFourOrgsNetwork()
helper = &marblesTestHelper{
networkHelper: &networkHelper{
Network: setup.network,
orderer: setup.orderer,
peers: setup.peers,
testDir: setup.testDir,
channelID: setup.channelID,
},
}
})
AfterEach(func() {
setup.cleanup()
for _, proc := range couchProcess {
proc.Signal(syscall.SIGTERM)
Eventually(proc.Wait(), setup.network.EventuallyTimeout).Should(Receive())
}
os.RemoveAll(setup.network.RootDir)
})
When("chaincode has no private data collections", func() {
BeforeEach(func() {
legacyChaincode = nwo.Chaincode{
Name: "marbles",
Version: "0.0",
Path: chaincodePathWithIndex,
Ctor: `{"Args":[]}`,
Policy: `OR ('Org1MSP.member','Org2MSP.member','Org3MSP.member','Org4MSP.member')`,
PackageFile: filepath.Join(setup.testDir, "marbles_legacy.tar.gz"),
}
newlifecycleChaincode = nwo.Chaincode{
Name: "marbles",
Version: "0.0",
Path: components.Build(chaincodePathWithIndex),
Lang: "binary",
CodeFiles: filesWithIndex,
PackageFile: filepath.Join(setup.testDir, "marbles.tar.gz"),
SignaturePolicy: `OR ('Org1MSP.member','Org2MSP.member','Org3MSP.member','Org4MSP.member')`,
Sequence: "1",
Label: "marbles",
}
By("deploying legacy chaincode on initial peers (stateleveldb)")
nwo.DeployChaincodeLegacy(setup.network, testchannelID, setup.orderer, legacyChaincode)
})
// Below test does the following when peers are using either leveldb or couchdb.
// Note that we do not support a channel with mixed DBs. However, for testing,
// it would be fine to use mixed DBs to test with both couchdb and leveldb
// - create snapshots on the 2 peers and verify they are same
// - bootstrap a peer (couchdb) in an existing org from the snapshot
// - bootstrap a peer (leveldb) in a new org from the snapshot
// - verify couchdb index exists
// - verify chaincode invocation, history, qscc, channel config update
// - upgrade to new lifecycle chaincode
// - create a new snapshot again from a peer (couchdb) bootstrapped from a snapshot
// - bootstrap peers (couchdb) in existing org and new org from the new snapshot
// - verify couchdb index exists
// - verify chaincode invocation, history, qscc
// - verify chaincode upgrade and new chaincode install on all peers
It("generates snapshot and bootstraps from snapshots", func() {
org1peer0 := setup.network.Peer("Org1", "peer0")
org2peer0 := setup.network.Peer("Org2", "peer0")
By("invoking marbles chaincode")
testKey := "marble-0"
helper.invokeMarblesChaincode(legacyChaincode.Name, org1peer0, "initMarble", "marble-0", "blue", "35", "tom")
helper.invokeMarblesChaincode(legacyChaincode.Name, org1peer0, "initMarble", "marble-1", "red", "100", "tom")
By("getting an existing transaction from a block before snapshot is generated")
txenvBeforeSnapshot, txidBeforeSnapshot := getTxFromLastBlock(setup.network, org1peer0)
// verify snapshot commands with different parameters")
blockNum := nwo.GetLedgerHeight(setup.network, org1peer0, testchannelID) - 1
verifySnapshotRequestCmds(setup.network, org1peer0, testchannelID, blockNum)
// test 1: generate snapshots on 2 peers for the same blockNum and verify they are same
_, snapshotDir := generateAndCompareSnapshots(setup.network, org1peer0, org2peer0, blockNum)
// test 2: bootstrap a peer in an existing org from snapshot and verify
By("starting new peer org2peer1 in existing org2 (couchdb)")
org2peer1, couchProc := startPeer(setup, "Org2", "peer1", testchannelID, true)
couchProcess = append(couchProcess, couchProc)
By("installing legacy chaincode on new peer org2peer1")
nwo.InstallChaincodeLegacy(setup.network, legacyChaincode, org2peer1)
By("joining new peer org2peer1 to the channel")
joinBySnapshot(setup.network, setup.orderer, org2peer1, testchannelID, snapshotDir, blockNum)
By("verifying index created on org2peer1")
verifySizeIndexExists(setup.network, testchannelID, setup.orderer, org2peer1, "marbles")
By("invoking marbles chaincode on bootstrapped peer org2peer1")
helper.invokeMarblesChaincode(legacyChaincode.Name, org2peer1, "transferMarble", testKey, "newowner2")
By("verifying history on peer org2peer1")
expectedHistory := []*marbleHistoryResult{
{IsDelete: "false", Value: newMarble(testKey, "blue", 35, "newowner2")},
}
helper.assertGetHistoryForMarble(legacyChaincode.Name, org2peer1, expectedHistory, testKey)
verifyQSCC(setup.network, org2peer1, testchannelID, blockNum, txidBeforeSnapshot)
// test 3: bootstrap a peer in a new org from snapshot and verify
By("starting a peer Org3.peer0 in new org3 (stateleveldb)")
org3peer0, _ := startPeer(setup, "Org3", "peer0", testchannelID, false)
By("installing legacy chaincode on new peer org3peer0")
nwo.InstallChaincodeLegacy(setup.network, legacyChaincode, org3peer0)
By("joining new peer org3peer0 to the channel")
joinBySnapshot(setup.network, setup.orderer, org3peer0, testchannelID, snapshotDir, blockNum)
By("invoking marbles chaincode on bootstrapped peer org3peer0")
helper.invokeMarblesChaincode(legacyChaincode.Name, org3peer0, "transferMarble", testKey, "newowner3")
By("getting an existing transaction from a block after snapshot is generated")
txenvAfterSnapshot, txidAfterSnapshot := getTxFromLastBlock(setup.network, org1peer0)
By("verifying history on peer org3peer0")
expectedHistory = []*marbleHistoryResult{
{IsDelete: "false", Value: newMarble(testKey, "blue", 35, "newowner3")},
{IsDelete: "false", Value: newMarble(testKey, "blue", 35, "newowner2")},
}
helper.assertGetHistoryForMarble(legacyChaincode.Name, org3peer0, expectedHistory, testKey)
verifyQSCC(setup.network, org3peer0, testchannelID, blockNum, txidBeforeSnapshot)
// verify DUPLICATE_TXID error when resubmitting old tx on a peer bootstrapped from snapshot (v1_4 capability)
By("resubmitting an old transaction committed before snapshot, expecting duplicated txid error")
err := commitTx(setup.network, setup.orderer, org3peer0, testchannelID, txenvBeforeSnapshot, txidBeforeSnapshot)
Expect(err.Error()).To(ContainSubstring("transaction invalidated with status (DUPLICATE_TXID)"))
By("resubmitting an old transaction committed after snapshot, expecting duplicated txid error")
err = commitTx(setup.network, setup.orderer, org3peer0, testchannelID, txenvAfterSnapshot, txidAfterSnapshot)
Expect(err.Error()).To(Equal("transaction invalidated with status (DUPLICATE_TXID)"))
// test 4: upgrade legacy chaincode to new lifecycle
By("enabling V2_0 capabilities")
channelPeers := setup.network.PeersWithChannel(testchannelID)
nwo.EnableCapabilities(setup.network, testchannelID, "Application", "V2_0", setup.orderer, channelPeers...)
By("upgrading legacy chaincode to new lifecycle chaincode")
nwo.DeployChaincode(setup.network, testchannelID, setup.orderer, newlifecycleChaincode, channelPeers...)
By("invoking chaincode after upgraded to new lifecycle chaincode")
helper.invokeMarblesChaincode(newlifecycleChaincode.Name, org1peer0, "initMarble", "marble-upgrade", "blue", "35", "tom")
// test 5: generate snapshot again on a peer bootstrapped from a snapshot and upgraded to new lifecycle chaincode
blockNumForNextSnapshot := nwo.GetLedgerHeight(setup.network, org2peer1, testchannelID)
By(fmt.Sprintf("generating a snapshot at blockNum %d on org2peer1 that was bootstrapped by a snapshot", blockNumForNextSnapshot))
submitSnapshotRequest(setup.network, testchannelID, blockNumForNextSnapshot, org2peer1, false, "Snapshot request submitted successfully")
// invoke chaincode to trigger snapshot generation
// 1st call should be committed before snapshot generation, 2nd call should be committed after snapshot generation
helper.invokeMarblesChaincode(newlifecycleChaincode.Name, org2peer1, "transferMarble", testKey, "newowner_beforesnapshot")
helper.invokeMarblesChaincode(newlifecycleChaincode.Name, org2peer1, "transferMarble", testKey, "newowner_aftersnapshot")
By("verifying snapshot completed on org2peer1")
verifyNoPendingSnapshotRequest(setup.network, org2peer1, testchannelID)
nextSnapshotDir := filepath.Join(setup.network.PeerDir(org2peer1), "filesystem", "snapshots", "completed", testchannelID, strconv.Itoa(blockNumForNextSnapshot))
By("getting an existing transaction from a block after new snapshot is generated")
helper.invokeMarblesChaincode(legacyChaincode.Name, org2peer1, "initMarble", "marble-3", "red", "100", "tom")
txenvAfterSnapshot, txidAfterSnapshot = getTxFromLastBlock(setup.network, org1peer0)
// test 6: bootstrap a peer in a different org from the new snapshot
By("starting a peer (org1peer1) in existing org1 (couchdb)")
org1peer1, couchProc := startPeer(setup, "Org1", "peer1", testchannelID, true)
couchProcess = append(couchProcess, couchProc)
By("installing new lifecycle chaincode on peer org1peer1")
nwo.InstallChaincode(setup.network, newlifecycleChaincode, org1peer1)
By("joining new peer org1peer1 to the channel")
joinBySnapshot(setup.network, setup.orderer, org1peer1, testchannelID, nextSnapshotDir, blockNumForNextSnapshot)
By("verifying index created on org1peer1")
verifySizeIndexExists(setup.network, testchannelID, setup.orderer, org1peer1, "marbles")
By("verifying history on peer org1peer1")
expectedHistory = []*marbleHistoryResult{
{IsDelete: "false", Value: newMarble(testKey, "blue", 35, "newowner_aftersnapshot")},
}
helper.assertGetHistoryForMarble(newlifecycleChaincode.Name, org1peer1, expectedHistory, testKey)
verifyQSCC(setup.network, org1peer1, testchannelID, blockNumForNextSnapshot, txidBeforeSnapshot)
// test 7: bootstrap a peer in a new org from the new snapshot
By("starting a peer (org4peer0) in new org4 (couchdb)")
org4peer0, couchProc := startPeer(setup, "Org4", "peer0", testchannelID, true)
couchProcess = append(couchProcess, couchProc)
By("joining new peer org4peer0 to the channel")
joinBySnapshot(setup.network, setup.orderer, org4peer0, testchannelID, nextSnapshotDir, blockNumForNextSnapshot)
By("installing and approving chaincode on new peer org4peer0")
installAndApproveChaincode(setup.network, setup.orderer, org4peer0, testchannelID, newlifecycleChaincode, []string{"Org1", "Org2", "Org3", "Org4"})
By("verifying index created on org4peer0")
verifySizeIndexExists(setup.network, testchannelID, setup.orderer, org2peer1, "marbles")
By("invoking chaincode on bootstrapped peer org4peer0")
helper.invokeMarblesChaincode(newlifecycleChaincode.Name, org4peer0, "delete", testKey)
By("verifying history on peer org4peer0")
expectedHistory = []*marbleHistoryResult{
{IsDelete: "true"},
{IsDelete: "false", Value: newMarble(testKey, "blue", 35, "newowner_aftersnapshot")},
}
helper.assertGetHistoryForMarble(newlifecycleChaincode.Name, org4peer0, expectedHistory, testKey)
verifyQSCC(setup.network, org4peer0, testchannelID, blockNumForNextSnapshot, txidBeforeSnapshot)
// verify DUPLICATE_TXID error when resubmitting old tx on a peer bootstrapped from snapshot (v_20 capability)
By("resubmitting an old transaction committed before snapshot, expecting duplicated txid error")
err = commitTx(setup.network, setup.orderer, org4peer0, testchannelID, txenvBeforeSnapshot, txidBeforeSnapshot)
Expect(err.Error()).To(ContainSubstring("transaction invalidated with status (DUPLICATE_TXID)"))
By("resubmitting an old transaction committed after snapshot, expecting duplicated txid error")
err = commitTx(setup.network, setup.orderer, org4peer0, testchannelID, txenvAfterSnapshot, txidAfterSnapshot)
Expect(err.Error()).To(Equal("transaction invalidated with status (DUPLICATE_TXID)"))
// test 8: verify cscc works correctly to get an orderer endpoint from the channel config
// even if the peer does not have a channel config block when bootstrapped from snapshot
By("invoking chaincode without passing orderer endpoint on org4peer0")
invokeWithoutPassingOrdererEndPoint(setup.network, org4peer0, testchannelID, newlifecycleChaincode.Name, "initMarble", "marble-cscctest", "blue", "35", "tom")
// test 9: verify chaincode upgrade and install after bootstrapping
By("upgrading chaincode to version 2.0 on all peers after bootstrapping from snapshot")
newlifecycleChaincode.Version = "2.0"
newlifecycleChaincode.Sequence = "2"
nwo.DeployChaincode(setup.network, testchannelID, setup.orderer, newlifecycleChaincode)
By("deploying a new chaincode on all the peers after bootstrapping from snapshot")
cc2 := nwo.Chaincode{
Name: "kvexecutor",
Version: "1.0",
Path: components.Build("github.com/hyperledger/fabric/integration/chaincode/kvexecutor/cmd"),
Lang: "binary",
SignaturePolicy: `OR ('Org1MSP.member','Org2MSP.member', 'Org3MSP.member', 'Org4MSP.member')`,
PackageFile: filepath.Join(setup.testDir, "kvexecutor20.tar.gz"),
Label: "kvexecutor-20",
Sequence: "1",
}
nwo.DeployChaincode(setup.network, testchannelID, setup.orderer, cc2)
By("invoking the new chaincode")
kvdata := []kvexecutor.KVData{
{Key: "key1", Value: "value1"},
{Key: "key2", Value: "value2"},
}
invokeAndQueryKVExecutorChaincode(setup.network, setup.orderer, testchannelID, cc2, kvdata, setup.network.PeersWithChannel(testchannelID)...)
})
})
When("chaincode has private data collections", func() {
BeforeEach(func() {
newlifecycleChaincode = nwo.Chaincode{
Name: "marblesp",
Version: "1.0",
Path: components.Build("github.com/hyperledger/fabric/integration/chaincode/marbles_private/cmd"),
Lang: "binary",
PackageFile: filepath.Join(setup.testDir, "marbles-pvtdata.tar.gz"),
Label: "marbles-private-20",
SignaturePolicy: `OR ('Org1MSP.member','Org2MSP.member')`,
CollectionsConfig: filepath.Join("testdata", "collection_configs", "collections_config1.json"),
Sequence: "1",
}
// start org3peer0 so that we have majority number of orgs (3 out of 4) to satify the channel config update policy
org3peer0, _ := startPeer(setup, "Org3", "peer0", testchannelID, false)
setup.network.JoinChannel(testchannelID, setup.orderer, org3peer0)
By("ensuring new peer has joined gossip network")
setup.network.VerifyMembership(setup.network.PeersWithChannel(testchannelID), testchannelID)
By("enabling V2_0 capabilities")
channelPeers := setup.network.PeersWithChannel(testchannelID)
nwo.EnableCapabilities(setup.network, testchannelID, "Application", "V2_0", setup.orderer, channelPeers...)
By("deploying newlifecycle chaincode on initial peers (leveldb)")
nwo.DeployChaincode(setup.network, testchannelID, setup.orderer, newlifecycleChaincode)
})
// This test verifies the following:
// bootstrapped peer can pull private data
// bootstrapped peer can supply private data to other bootstrapped peer
It("generates snapshot and bootstraps from snapshots", func() {
org1peer0 := setup.network.Peer("Org1", "peer0")
org2peer0 := setup.network.Peer("Org2", "peer0")
channelPeers := setup.network.PeersWithChannel(testchannelID)
// prepare test data: add and delete marble1, add and transfer marble1
By("adding marble1")
marblechaincodeutil.AddMarble(setup.network, setup.orderer, testchannelID, newlifecycleChaincode.Name,
`{"name":"marble1", "color":"blue", "size":35, "owner":"tom", "price":99}`, org2peer0)
By("deleting marble1")
marblechaincodeutil.DeleteMarble(setup.network, setup.orderer, testchannelID, newlifecycleChaincode.Name,
`{"name":"marble1"}`, org2peer0)
By("verifying the deletion of marble1")
marblechaincodeutil.AssertDoesNotExistInCollectionM(setup.network, testchannelID, newlifecycleChaincode.Name, "marble1", channelPeers...)
marblechaincodeutil.AssertDoesNotExistInCollectionMPD(setup.network, testchannelID, newlifecycleChaincode.Name, "marble1", channelPeers...)
By("adding marble2")
marblechaincodeutil.AddMarble(setup.network, setup.orderer, testchannelID, newlifecycleChaincode.Name,
`{"name":"marble2", "color":"blue", "size":35, "owner":"tom", "price":99}`, org2peer0)
By("transferring marble2")
marblechaincodeutil.TransferMarble(setup.network, setup.orderer, testchannelID, newlifecycleChaincode.Name,
`{"name":"marble2", "owner":"jerry"}`, org2peer0)
assertPvtdataPresencePerCollectionConfig1(setup.network, newlifecycleChaincode.Name, "marble2")
By("verifying the new ownership of marble2")
marblechaincodeutil.AssertOwnershipInCollectionM(setup.network, testchannelID, newlifecycleChaincode.Name, "marble2", "jerry", org1peer0, org2peer0)
// test 1: generate snapshots on 2 peers for the same blockNum and verify they are same
blockNum := nwo.GetLedgerHeight(setup.network, org2peer0, testchannelID) - 1
_, snapshotDir := generateAndCompareSnapshots(setup.network, org1peer0, org2peer0, blockNum)
// test 2: bootstrap a new peer org2peer1 from snapshot and verify pvtdata
By("starting new peer org2peer1 (couchdb)")
org2peer1, couchProc := startPeer(setup, "Org2", "peer1", testchannelID, true)
couchProcess = append(couchProcess, couchProc)
By("installing chaincode on peer org2peer1")
nwo.InstallChaincode(setup.network, newlifecycleChaincode, org2peer1)
By("joining peer org2peer1 to the channel by snapshot")
joinBySnapshot(setup.network, setup.orderer, org2peer1, testchannelID, snapshotDir, blockNum)
By("waiting for pvtdata to be reconciled on org2peer1")
waitForMarblePvtdataReconciliation(setup.network, org2peer1, testchannelID, newlifecycleChaincode.Name, []string{"marble2"})
// verify pvtdata reconciliation after joinbysnapshot
By("verifying marble2 pvtdata reconciliation on org2peer1")
assertPvtdataPresencePerCollectionConfig1(setup.network, newlifecycleChaincode.Name, "marble2", org2peer1)
By("verifying the new ownership of marble2")
marblechaincodeutil.AssertOwnershipInCollectionM(setup.network, testchannelID, newlifecycleChaincode.Name, "marble2", "jerry", org2peer1)
By("verifying marble1 does not exist")
marblechaincodeutil.AssertDoesNotExistInCollectionM(setup.network, testchannelID, newlifecycleChaincode.Name, "marble1", org2peer1)
marblechaincodeutil.AssertDoesNotExistInCollectionMPD(setup.network, testchannelID, newlifecycleChaincode.Name, "marble1", org2peer1)
// test 3: submit a request to generate snapshot again on a peer (org2peer1) bootstrapped from a snapshot
blockNumForNextSnapshot := nwo.GetLedgerHeight(setup.network, org2peer1, testchannelID)
By(fmt.Sprintf("generating a snapshot at blockNum %d on org2peer1 that was bootstrapped by a snapshot", blockNumForNextSnapshot))
submitSnapshotRequest(setup.network, testchannelID, blockNumForNextSnapshot, org2peer1, false, "Snapshot request submitted successfully")
// block for marble3 tx is in snapshot, but block for marble4 tx is post snapshot
By("adding marble3")
marblechaincodeutil.AddMarble(setup.network, setup.orderer, testchannelID, newlifecycleChaincode.Name,
`{"name":"marble3", "color":"blue", "size":35, "owner":"tom", "price":99}`, org2peer1)
assertPvtdataPresencePerCollectionConfig1(setup.network, newlifecycleChaincode.Name, "marble3")
By("adding marble4")
marblechaincodeutil.AddMarble(setup.network, setup.orderer, testchannelID, newlifecycleChaincode.Name,
`{"name":"marble4", "color":"blue", "size":35, "owner":"tom", "price":99}`, org2peer1)
assertPvtdataPresencePerCollectionConfig1(setup.network, newlifecycleChaincode.Name, "marble4")
By("verifying snapshot completed on org2peer1")
verifyNoPendingSnapshotRequest(setup.network, org2peer1, testchannelID)
nextSnapshotDir := filepath.Join(setup.network.PeerDir(org2peer1), "filesystem", "snapshots", "completed", testchannelID, strconv.Itoa(blockNumForNextSnapshot))
// stop all the peers and only restart org2peer1
setup.stopPeers()
setup.startPeer(org2peer1)
setup.peers = []*nwo.Peer{org2peer1}
setup.network.Peers = setup.peers
// test 4: bootstrap a new peer org2peer2 by snapshot and verify pvtdata reconciliation
By("starting a peer (org2peer2) in existing org (leveldb)")
org2peer2, _ := startPeer(setup, "Org2", "peer2", testchannelID, false)
By("installing new lifecycle chaincode2 on peer org2peer2")
nwo.InstallChaincode(setup.network, newlifecycleChaincode, org2peer2)
By("joining peer org2peer2 to the channel by snapshot")
joinBySnapshot(setup.network, setup.orderer, org2peer2, testchannelID, nextSnapshotDir, blockNumForNextSnapshot)
By("waiting for pvtdata to be reconciled on org2peer2")
waitForMarblePvtdataReconciliation(setup.network, org2peer2, testchannelID, newlifecycleChaincode.Name, []string{"marble2", "marble3", "marble4"})
By("verifying marble4 pvtdata reconciliation on org2peer2")
assertPvtdataPresencePerCollectionConfig1(setup.network, newlifecycleChaincode.Name, "marble4", org2peer2)
By("verifying marble3 pvtdata reconciliation on org2peer2")
assertPvtdataPresencePerCollectionConfig1(setup.network, newlifecycleChaincode.Name, "marble3", org2peer2)
By("verifying marble2 pvtdata reconciliation on org2peer2")
assertPvtdataPresencePerCollectionConfig1(setup.network, newlifecycleChaincode.Name, "marble2", org2peer2)
By("verifying the new ownership of marble2")
marblechaincodeutil.AssertOwnershipInCollectionM(setup.network, testchannelID, newlifecycleChaincode.Name, "marble2", "jerry", org2peer2)
By("verifying marble1 does not exist")
marblechaincodeutil.AssertDoesNotExistInCollectionM(setup.network, testchannelID, newlifecycleChaincode.Name, "marble1", org2peer2)
marblechaincodeutil.AssertDoesNotExistInCollectionMPD(setup.network, testchannelID, newlifecycleChaincode.Name, "marble1", org2peer2)
// test 5: bootstrap a new peer Org2peer3 from genesis block to verify pvtdata reconciliation
By("startinging a peer Org2peer3 in an new org (leveldb)")
org2peer3, _ := startPeer(setup, "Org2", "peer3", testchannelID, false)
By("installing newlifecycleChaincode on new peer Org2peer3")
nwo.InstallChaincode(setup.network, newlifecycleChaincode, org2peer3)
By("joining peer Org2peer3 to the channel by genesis block")
setup.network.JoinChannel(testchannelID, setup.orderer, org2peer3)
By("waiting for the new peer to have the same ledger height")
channelHeight := nwo.GetMaxLedgerHeight(setup.network, testchannelID, org2peer1)
nwo.WaitUntilEqualLedgerHeight(setup.network, testchannelID, channelHeight, org2peer3)
By("waiting for pvtdata to be reconciled on org2peer3")
waitForMarblePvtdataReconciliation(setup.network, org2peer3, testchannelID, newlifecycleChaincode.Name, []string{"marble2", "marble3", "marble4"})
By("verifying marble4 pvtdata reconciliation on org2peer3")
assertPvtdataPresencePerCollectionConfig1(setup.network, newlifecycleChaincode.Name, "marble4", org2peer3)
By("verifying marble3 pvtdata reconciliation on org2peer3")
assertPvtdataPresencePerCollectionConfig1(setup.network, newlifecycleChaincode.Name, "marble3", org2peer3)
By("verifying marble2 pvtdata reconciliation on org2peer3")
assertPvtdataPresencePerCollectionConfig1(setup.network, newlifecycleChaincode.Name, "marble2", org2peer3)
By("verifying the new ownership of marble2")
marblechaincodeutil.AssertOwnershipInCollectionM(setup.network, testchannelID, newlifecycleChaincode.Name, "marble2", "jerry", org2peer3)
By("verifying marble1 does not exist")
marblechaincodeutil.AssertDoesNotExistInCollectionM(setup.network, testchannelID, newlifecycleChaincode.Name, "marble1", org2peer3)
marblechaincodeutil.AssertDoesNotExistInCollectionMPD(setup.network, testchannelID, newlifecycleChaincode.Name, "marble1", org2peer3)
// verify pvtdata hash on bootstrapped peers
peers := []*nwo.Peer{org2peer1, org2peer2, org2peer3}
for i := 2; i <= 4; i++ {
name := fmt.Sprintf("marble%d", i)
owner := "tom"
if name == "marble2" {
owner = "jerry"
}
By(fmt.Sprintf("verifying getMarbleHash for %s from all peers that has the chaincode instantiated", name))
expectedBytes := util.ComputeStringHash(fmt.Sprintf(`{"docType":"marble","name":"%s","color":"blue","size":35,"owner":"%s"}`, name, owner))
marblechaincodeutil.AssertMarblesPrivateHashM(setup.network, testchannelID, newlifecycleChaincode.Name, name, expectedBytes, peers)
By(fmt.Sprintf("verifying getMarblePrivateDetailsHash for %s from all peers that has the chaincode instantiated", name))
expectedBytes = util.ComputeStringHash(fmt.Sprintf(`{"docType":"marblePrivateDetails","name":"%s","price":99}`, name))
marblechaincodeutil.AssertMarblesPrivateDetailsHashMPD(setup.network, testchannelID, newlifecycleChaincode.Name, name, expectedBytes, peers)
}
})
})
})
func configPeerWithCouchDB(s *setup, peer *nwo.Peer) ifrit.Process {
couchDB := &runner.CouchDB{}
couchProc := ifrit.Invoke(couchDB)
Eventually(couchProc.Ready(), runner.DefaultStartTimeout).Should(BeClosed())
Consistently(couchProc.Wait()).ShouldNot(Receive())
core := s.network.ReadPeerConfig(peer)
core.Ledger.State.StateDatabase = "CouchDB"
core.Ledger.State.CouchDBConfig.CouchDBAddress = couchDB.Address()
By("configuring peer to couchdb address " + couchDB.Address())
s.network.WritePeerConfig(peer, core)
return couchProc
}
// initAndStartFourOrgsNetwork creates a network with multiple orgs.
// Initially only start Org1.peer0 and Org2.peer0 are started and join the channel.
func initAndStartFourOrgsNetwork() *setup {
var err error
testDir, err := ioutil.TempDir("", "snapshot")
Expect(err).NotTo(HaveOccurred())
client, err := docker.NewClientFromEnv()
Expect(err).NotTo(HaveOccurred())
config := nwo.BasicEtcdRaft()
for _, peer := range config.Peers {
peer.Channels = []*nwo.PeerChannel{
{Name: testchannelID, Anchor: true},
}
}
// add more peers to Org1 and Org2
config.Peers = append(
config.Peers,
&nwo.Peer{
Name: "peer1",
Organization: "Org1",
Channels: []*nwo.PeerChannel{},
},
&nwo.Peer{
Name: "peer1",
Organization: "Org2",
Channels: []*nwo.PeerChannel{},
},
&nwo.Peer{
Name: "peer2",
Organization: "Org2",
Channels: []*nwo.PeerChannel{},
},
&nwo.Peer{
Name: "peer3",
Organization: "Org2",
Channels: []*nwo.PeerChannel{},
},
)
// add org3 with one peer
config.Organizations = append(config.Organizations, &nwo.Organization{
Name: "Org3",
MSPID: "Org3MSP",
Domain: "org3.example.com",
EnableNodeOUs: true,
Users: 2,
CA: &nwo.CA{Hostname: "ca"},
})
config.Profiles[0].Organizations = append(config.Profiles[0].Organizations, "Org3")
config.Peers = append(config.Peers, &nwo.Peer{
Name: "peer0",
Organization: "Org3",
Channels: []*nwo.PeerChannel{
{Name: testchannelID, Anchor: true},
},
})
// add org4 with one peer
config.Organizations = append(config.Organizations, &nwo.Organization{
Name: "Org4",
MSPID: "Org4MSP",
Domain: "org4.example.com",
EnableNodeOUs: true,
Users: 2,
CA: &nwo.CA{Hostname: "ca"},
})
config.Profiles[0].Organizations = append(config.Profiles[0].Organizations, "Org4")
config.Peers = append(config.Peers, &nwo.Peer{
Name: "peer0",
Organization: "Org4",
Channels: []*nwo.PeerChannel{
{Name: testchannelID, Anchor: true},
},
})
n := nwo.New(config, testDir, client, StartPort(), components)
n.GenerateConfigTree()
n.Bootstrap()
// set ReconcileSleepInterval to 1 second to reconcile pvtdata faster
// set DeprioritizedDataReconcilerInterval to 2 seconds to resume reconciliation quickly
// to prevent CI flake in case peer connection is temporarily lost.
for _, p := range n.Peers {
core := n.ReadPeerConfig(p)
core.Peer.Gossip.PvtData.ReconcileSleepInterval = 1 * time.Second
core.Ledger.PvtdataStore.DeprioritizedDataReconcilerInterval = 2 * time.Second
n.WritePeerConfig(p, core)
}
// set org2peer2 and org2peer3's gossip bootstrap endpoints pointing to org2peer1
org2peer1 := n.Peer("Org2", "peer1")
for _, p := range []*nwo.Peer{n.Peer("Org2", "peer2"), n.Peer("Org2", "peer3")} {
core := n.ReadPeerConfig(p)
core.Peer.Gossip.Bootstrap = n.PeerAddress(org2peer1, nwo.ListenPort)
n.WritePeerConfig(p, core)
}
// only keep Org1.peer0 and Org2.peer0 so we can add other peers back later to test join channel by snapshot
peers := []*nwo.Peer{}
for _, p := range n.Peers {
if p.ID() == "Org1.peer0" || p.ID() == "Org2.peer0" {
peers = append(peers, p)
}
}
n.Peers = peers
setup := &setup{
testDir: testDir,
network: n,
peers: peers,
channelID: testchannelID,
orderer: n.Orderer("orderer"),
}
Expect(setup.testDir).To(Equal(setup.network.RootDir))
By("starting orderer")
setup.startOrderer()
By("starting peers")
setup.startPeers()
By("creating and joining testchannel")
channelparticipation.JoinOrdererJoinPeersAppChannel(setup.network, "testchannel", setup.orderer, setup.ordererRunner)
By("verifying membership for testchannel")
n.VerifyMembership(n.PeersWithChannel(testchannelID), testchannelID)
return setup
}
// verifySnapshotRequestCmds invokes snapshot commands and verify the expected rersults.
// At the end, there will be no pending request.
func verifySnapshotRequestCmds(n *nwo.Network, peer *nwo.Peer, channel string, blockNum int) {
By("submitting snaphost request for a future blockNum, expecting success")
submitSnapshotRequest(n, channel, blockNum+10, peer, false, "Snapshot request submitted successfully")
By("submitting snaphost request at same blockNum again, expecting error")
submitSnapshotRequest(n, channel, blockNum+10, peer, true,
fmt.Sprintf("duplicate snapshot request for block number %d", blockNum+10))
By("submitting snaphost request for a previous blockNum, expecting error")
submitSnapshotRequest(n, channel, blockNum-1, peer, true,
fmt.Sprintf("requested snapshot for block number %d cannot be less than the last committed block number %d", blockNum-1, blockNum))
By("listing pending snaphost requests, expecting success")
pendingRequests := listPendingSnapshotRequests(n, channel, peer, n.PeerAddress(peer, nwo.ListenPort), false)
Expect(pendingRequests).To(ContainSubstring(fmt.Sprintf("Successfully got pending snapshot requests: [%d]\n", blockNum+10)))
By("canceling a pending snaphost request, expecting success")
cancelSnapshotRequest(n, channel, blockNum+10, peer, n.PeerAddress(peer, nwo.ListenPort), false, "Snapshot request cancelled successfully")
By("canceling the same snaphost request, expecting error")
cancelSnapshotRequest(n, channel, blockNum+10, peer, n.PeerAddress(peer, nwo.ListenPort), true,
fmt.Sprintf("no snapshot request exists for block number %d", blockNum+10))
By("listing pending snaphost requests, expecting success")
pendingRequests = listPendingSnapshotRequests(n, channel, peer, n.PeerAddress(peer, nwo.ListenPort), false)
Expect(pendingRequests).To(ContainSubstring("Successfully got pending snapshot requests: []\n"))
}
func generateAndCompareSnapshots(n *nwo.Network, peer1, peer2 *nwo.Peer, blockNumForSnapshot int) (string, string) {
By(fmt.Sprintf("submitting snapshot request at blockNum %d on peer %s", blockNumForSnapshot, peer1.ID()))
submitSnapshotRequest(n, testchannelID, blockNumForSnapshot, peer1, false, "Snapshot request submitted successfully")
By(fmt.Sprintf("submitting snaphost request at blockNum %d on peer %s", blockNumForSnapshot, peer2.ID()))
submitSnapshotRequest(n, testchannelID, blockNumForSnapshot, peer2, false, "Snapshot request submitted successfully")
By("verifying snapshot completed on peer1")
verifyNoPendingSnapshotRequest(n, peer1, testchannelID)
By("verifying snapshot completed on peer2")
verifyNoPendingSnapshotRequest(n, peer2, testchannelID)
By("comparing snapshot metadata generated on different peers for the same block number")
snapshotDir1 := filepath.Join(n.PeerDir(peer1), "filesystem", "snapshots", "completed", testchannelID, strconv.Itoa(blockNumForSnapshot))
snapshotDir2 := filepath.Join(n.PeerDir(peer2), "filesystem", "snapshots", "completed", testchannelID, strconv.Itoa(blockNumForSnapshot))
compareSnapshotMetadata(snapshotDir1, snapshotDir2)
return snapshotDir1, snapshotDir2
}
func verifyNoPendingSnapshotRequest(n *nwo.Network, peer *nwo.Peer, channelID string) {
checkPending := func() []byte {
return listPendingSnapshotRequests(n, channelID, peer, n.PeerAddress(peer, nwo.ListenPort), false)
}
Eventually(checkPending, n.EventuallyTimeout, 10*time.Second).Should(ContainSubstring("Successfully got pending snapshot requests: []\n"))
}
func submitSnapshotRequest(n *nwo.Network, channel string, blockNum int, peer *nwo.Peer, expectedError bool, expectedMsg string) {
sess, err := n.PeerAdminSession(peer, commands.SnapshotSubmitRequest{
ChannelID: channel,
BlockNumber: strconv.Itoa(blockNum),
ClientAuth: n.ClientAuthRequired,
PeerAddress: n.PeerAddress(peer, nwo.ListenPort),
})
Expect(err).NotTo(HaveOccurred())
if !expectedError {
Eventually(sess, n.EventuallyTimeout).Should(gexec.Exit(0))
Expect(sess).To(gbytes.Say(expectedMsg))
} else {
Eventually(sess, n.EventuallyTimeout).Should(gexec.Exit(1))
Expect(sess.Err).To(gbytes.Say(expectedMsg))
}
}
func cancelSnapshotRequest(n *nwo.Network, channel string, blockNum int, peer *nwo.Peer, peerAddress string, expectedError bool, expectedMsg string) {
sess, err := n.PeerAdminSession(peer, commands.SnapshotCancelRequest{
ChannelID: channel,
BlockNumber: strconv.Itoa(blockNum),
ClientAuth: n.ClientAuthRequired,
PeerAddress: peerAddress,
})
Expect(err).NotTo(HaveOccurred())
if !expectedError {
Eventually(sess, n.EventuallyTimeout).Should(gexec.Exit(0))
Expect(sess).To(gbytes.Say(expectedMsg))
} else {
Eventually(sess, n.EventuallyTimeout).Should(gexec.Exit(1))
Expect(sess.Err).To(gbytes.Say(expectedMsg))
}
}
func listPendingSnapshotRequests(n *nwo.Network, channel string, peer *nwo.Peer, peerAddress string, expectedError bool) []byte {
sess, err := n.PeerAdminSession(peer, commands.SnapshotListPending{
ChannelID: channel,
ClientAuth: n.ClientAuthRequired,
PeerAddress: peerAddress,
})
Expect(err).NotTo(HaveOccurred())
if expectedError {
Eventually(sess, n.EventuallyTimeout).Should(gexec.Exit(1))
return sess.Err.Contents()
}
Eventually(sess, n.EventuallyTimeout).Should(gexec.Exit(0))
return sess.Buffer().Contents()
}
func compareSnapshotMetadata(snapshotDir1, snapshotDir2 string) {
for _, snapshotDir := range []string{snapshotDir1, snapshotDir2} {
By("verifying snapshot dir exists: " + snapshotDir)
Expect(snapshotDir).To(BeADirectory())
}
// compare metadata files
for _, file := range []string{"_snapshot_signable_metadata.json", "_snapshot_additional_metadata.json"} {
By("comparing metadata file from snapshots on multiple peers: " + file)
fileContent1, err := ioutil.ReadFile(filepath.Join(snapshotDir1, file))
Expect(err).NotTo(HaveOccurred())
fileContent2, err := ioutil.ReadFile(filepath.Join(snapshotDir2, file))
Expect(err).NotTo(HaveOccurred())
Expect(fileContent1).To(Equal(fileContent2))
}
}
// startPeer starts a peer to prepare for join channel test
func startPeer(s *setup, orgName, peerName, channelID string, useCouchDB bool) (*nwo.Peer, ifrit.Process) {
peer := &nwo.Peer{
Name: peerName,
Organization: orgName,
Channels: []*nwo.PeerChannel{
{Name: channelID},
},
}
s.network.Peers = append(s.network.Peers, peer)
s.peers = append(s.peers, peer)
var couchProc ifrit.Process
if useCouchDB {
By("starting couch process and configuring it for peer " + peer.ID())
couchProc = configPeerWithCouchDB(s, peer)
}
By("starting the new peer " + peer.ID())
s.startPeer(peer)
return peer, couchProc
}
func joinBySnapshot(n *nwo.Network, orderer *nwo.Orderer, peer *nwo.Peer, channelID string, snapshotDir string, lastBlockInSnapshot int) {
channelHeight := nwo.GetMaxLedgerHeight(n, channelID, n.PeersWithChannel(channelID)...)
By(fmt.Sprintf("joining a peer via snapshot %s", snapshotDir))
n.JoinChannelBySnapshot(snapshotDir, peer)
By("calling JoinBySnapshotStatus")
checkStatus := func() string { return n.JoinBySnapshotStatus(peer) }
Eventually(checkStatus, n.EventuallyTimeout, 10*time.Second).Should(ContainSubstring("No joinbysnapshot operation is in progress"))
By("waiting for the new peer to have the same ledger height")
nwo.WaitUntilEqualLedgerHeight(n, channelID, channelHeight, peer)
By("verifying blockchain info on peer " + peer.ID())
sess, err := n.PeerUserSession(peer, "Admin", commands.ChannelInfo{
ChannelID: channelID,
})
Expect(err).NotTo(HaveOccurred())
Eventually(sess, n.EventuallyTimeout).Should(gexec.Exit(0))
channelInfoStr := strings.TrimPrefix(string(sess.Buffer().Contents()[:]), "Blockchain info:")
bcInfo := cb.BlockchainInfo{}
err = json.Unmarshal([]byte(channelInfoStr), &bcInfo)
Expect(err).NotTo(HaveOccurred())
Expect(bcInfo.Height).To(Equal(uint64(channelHeight)))
Expect(bcInfo.BootstrappingSnapshotInfo.LastBlockInSnapshot).To(Equal(uint64(lastBlockInSnapshot)))
}
func verifyQSCC(n *nwo.Network, peer *nwo.Peer, channelID string, lastBlockInSnapshot int, txidBeforeSnapshot string) {
peerID := peer.ID()
By("verifying qscc GetBlockByNumber returns an error for block number before snapshot on peer " + peerID)
resp := callQSCC(n, peer, "qscc", "GetBlockByNumber", 1, channelID, strconv.Itoa(lastBlockInSnapshot))
Expect(resp).To(ContainSubstring(fmt.Sprintf("The ledger is bootstrapped from a snapshot. First available block = [%d]", lastBlockInSnapshot+1)))
By("verifying qscc GetBlockByNumber succeeds for a block number after snapshot on peer " + peerID)
callQSCC(n, peer, "qscc", "GetBlockByNumber", 0, channelID, fmt.Sprintf("%d", lastBlockInSnapshot+1))
By("verifying qscc GetBlockByTxID returns an error for a txid before snapshot on peer " + peerID)
resp = callQSCC(n, peer, "qscc", "GetBlockByTxID", 1, channelID, txidBeforeSnapshot)
Expect(resp).To(ContainSubstring(fmt.Sprintf("Failed to get block for txID %s, error details for the TXID [%s] not available. Ledger bootstrapped from a snapshot. First available block = [%d]",
txidBeforeSnapshot, txidBeforeSnapshot, lastBlockInSnapshot+1)))
By("verifying qscc GetBlockByTxID succeeds for a txid after snapshot on peer " + peerID)
_, newTxid := getTxFromLastBlock(n, peer)
callQSCC(n, peer, "qscc", "GetBlockByTxID", 0, channelID, newTxid)
By("verifying qscc GetTransactionByID returns an error for a txid before snapshot on peer " + peerID)
resp = callQSCC(n, peer, "qscc", "GetTransactionByID", 1, channelID, txidBeforeSnapshot)
Expect(resp).To(ContainSubstring(fmt.Sprintf("Failed to get transaction with id %s, error details for the TXID [%s] not available. Ledger bootstrapped from a snapshot. First available block = [%d]",
txidBeforeSnapshot, txidBeforeSnapshot, lastBlockInSnapshot+1)))
By("verifying qscc GetTransactionByID succeeds for a txid after snapshot on peer " + peerID)
callQSCC(n, peer, "qscc", "GetTransactionByID", 0, channelID, newTxid)
}
func installAndApproveChaincode(n *nwo.Network, orderer *nwo.Orderer, peer *nwo.Peer, channelID string, chaincode nwo.Chaincode, orgNames []string) {
nwo.InstallChaincode(n, chaincode, peer)
checkOrgs := make([]*nwo.Organization, len(orgNames))
for i, orgName := range orgNames {
checkOrgs[i] = n.Organization(orgName)
}
nwo.ApproveChaincodeForMyOrg(n, channelID, orderer, chaincode, n.PeersInOrg(peer.Organization)...)
nwo.EnsureChaincodeCommitted(n, channelID, chaincode.Name, chaincode.Version, chaincode.Sequence, checkOrgs, peer)
}
// getTxFromLastBlock gets a transaction id from the latest block that has been
// marshaled and stored on the filesystem
func getTxFromLastBlock(n *nwo.Network, peer *nwo.Peer) (*cb.Envelope, string) {
blockfile := filepath.Join(n.RootDir, "newest_block.pb")
fetchNewest := commands.ChannelFetch{
ChannelID: "testchannel",
Block: "newest",
OutputFile: blockfile,
}
sess, err := n.PeerAdminSession(peer, fetchNewest)
Expect(err).NotTo(HaveOccurred())
Eventually(sess, n.EventuallyTimeout).Should(gexec.Exit(0))
Expect(sess.Err).To(gbytes.Say("Received block: "))
block := nwo.UnmarshalBlockFromFile(blockfile)
txEnvelope, err := protoutil.UnmarshalEnvelope(block.Data.Data[0])
Expect(err).NotTo(HaveOccurred())
txID, err := protoutil.GetOrComputeTxIDFromEnvelope(block.Data.Data[0])
Expect(err).NotTo(HaveOccurred())
return txEnvelope, txID
}
func invokeAndQueryKVExecutorChaincode(n *nwo.Network, orderer *nwo.Orderer, channelID string, chaincode nwo.Chaincode, kvdata []kvexecutor.KVData, peers ...*nwo.Peer) {
By("invoking kvexecutor chaincode")
writeInputBytes, err := json.Marshal(kvdata)
Expect(err).NotTo(HaveOccurred())
writeInputBase64 := base64.StdEncoding.EncodeToString(writeInputBytes)
peerAddresses := make([]string, 0)
for _, peer := range peers {
peerAddresses = append(peerAddresses, n.PeerAddress(peer, nwo.ListenPort))
}
invokeCommand := commands.ChaincodeInvoke{
ChannelID: channelID,
Orderer: n.OrdererAddress(orderer, nwo.ListenPort),
Name: chaincode.Name,
Ctor: fmt.Sprintf(`{"Args":["readWriteKVs","%s","%s"]}`, "", writeInputBase64),
PeerAddresses: peerAddresses,
WaitForEvent: true,
}
invokeChaincode(n, peers[0], invokeCommand)
channelPeers := n.PeersWithChannel(channelID)
nwo.WaitUntilEqualLedgerHeight(n, channelID, nwo.GetLedgerHeight(n, peers[0], channelID), channelPeers...)
By("querying kvexecutor chaincode")
expectedMsg, err := json.Marshal(kvdata)
Expect(err).NotTo(HaveOccurred())
readInputBytes, err := json.Marshal(kvdata)
Expect(err).NotTo(HaveOccurred())
readInputBase64 := base64.StdEncoding.EncodeToString(readInputBytes)
querycommand := commands.ChaincodeQuery{
ChannelID: channelID,
Name: chaincode.Name,
Ctor: fmt.Sprintf(`{"Args":["readWriteKVs","%s","%s"]}`, readInputBase64, ""),
}
queryChaincode(n, peers[0], querycommand, string(expectedMsg), true)
}
func invokeChaincode(n *nwo.Network, peer *nwo.Peer, command commands.ChaincodeInvoke) {
sess, err := n.PeerUserSession(peer, "User1", command)
Expect(err).NotTo(HaveOccurred())
Eventually(sess, n.EventuallyTimeout).Should(gexec.Exit(0))
Expect(sess.Err).To(gbytes.Say("Chaincode invoke successful."))
}
func queryChaincode(n *nwo.Network, peer *nwo.Peer, command commands.ChaincodeQuery, expectedMessage string, expectSuccess bool) {
sess, err := n.PeerUserSession(peer, "User1", command)
Expect(err).NotTo(HaveOccurred())
if expectSuccess {
Eventually(sess, n.EventuallyTimeout).Should(gexec.Exit(0))
Expect(sess).To(gbytes.Say(expectedMessage))
} else {
Eventually(sess, n.EventuallyTimeout).Should(gexec.Exit())
Expect(sess.Err).To(gbytes.Say(expectedMessage))
}
}
func callQSCC(n *nwo.Network, peer *nwo.Peer, scc, operation string, retCode int, args ...string) []byte {
args = append([]string{operation}, args...)
chaincodeQuery := commands.ChaincodeQuery{
ChannelID: testchannelID,
Name: scc,
Ctor: toCLIChaincodeArgs(args...),
}
sess, err := n.PeerAdminSession(peer, chaincodeQuery)
Expect(err).NotTo(HaveOccurred())
Eventually(sess, n.EventuallyTimeout).Should(gexec.Exit(retCode))
if retCode != 0 {
return sess.Err.Contents()
}
return sess.Out.Contents()
}
func toCLIChaincodeArgs(args ...string) string {
type cliArgs struct {
Args []string
}
cArgs := &cliArgs{Args: args}
cArgsJSON, err := json.Marshal(cArgs)
Expect(err).NotTo(HaveOccurred())
return string(cArgsJSON)
}
// waitForMarblePvtdataReconciliation queries the chaincode until it returns a success exit code, which means the data is available.
func waitForMarblePvtdataReconciliation(n *nwo.Network, peer *nwo.Peer, channelID, chaincodeName string, marbleNames []string) {
for _, marbleName := range marbleNames {
for _, funcName := range []string{"readMarble", "readMarblePrivateDetails"} {
query := fmt.Sprintf(`{"Args":["%s","%s"]}`, funcName, marbleName)
command := commands.ChaincodeQuery{
ChannelID: channelID,
Name: chaincodeName,
Ctor: query,
}
queryData := func() int {
sess, err := n.PeerUserSession(peer, "User1", command)
Expect(err).NotTo(HaveOccurred())
return sess.Wait(n.EventuallyTimeout).ExitCode()
}
Eventually(queryData, n.EventuallyTimeout).Should(Equal(0))
}
}
}
func assertPvtdataPresencePerCollectionConfig1(n *nwo.Network, chaincodeName, marbleName string, peers ...*nwo.Peer) {
if len(peers) == 0 {
peers = n.Peers
}
for _, peer := range peers {
switch peer.Organization {
case "Org1":
By("asserting collection data M in org1 peer " + peer.ID() + " for " + marbleName)
marblechaincodeutil.AssertPresentInCollectionM(n, testchannelID, chaincodeName, marbleName, peer)
By("asserting no collection data MPD in org1 peer " + peer.ID() + " for " + marbleName)
marblechaincodeutil.AssertNotPresentInCollectionMPD(n, testchannelID, chaincodeName, marbleName, peer)
case "Org2":
By("asserting collection data M in org2 peer " + peer.ID() + " for " + marbleName)
marblechaincodeutil.AssertPresentInCollectionM(n, testchannelID, chaincodeName, marbleName, peer)
By("asserting collection data MPD in org2 peer " + peer.ID() + " for " + marbleName)
marblechaincodeutil.AssertPresentInCollectionMPD(n, testchannelID, chaincodeName, marbleName, peer)
case "Org3":
By("asserting no collection data M in org3 peer " + peer.ID() + " for " + marbleName)
marblechaincodeutil.AssertNotPresentInCollectionM(n, testchannelID, chaincodeName, marbleName, peer)
By("asserting collection data MPD in org3 peer " + peer.ID() + " for " + marbleName)
marblechaincodeutil.AssertPresentInCollectionMPD(n, testchannelID, chaincodeName, marbleName, peer)
}
}
}
// invokeWithoutPassingOrdererEndPoint does not pass orderer endpoint to a chaincode invoke command.
// As a result, the command will send a cscc query to the peer and cscc will return the orderer endpoint from the channel config.
func invokeWithoutPassingOrdererEndPoint(n *nwo.Network, peer *nwo.Peer, channelID, chaincodeName string, funcAndArgs ...string) {
command := commands.ChaincodeInvoke{
ChannelID: channelID,
Name: chaincodeName,
Ctor: prepareChaincodeInvokeArgs(funcAndArgs...),
PeerAddresses: []string{
n.PeerAddress(peer, nwo.ListenPort),
},
WaitForEvent: true,
}
invokeChaincode(n, peer, command)
nwo.WaitUntilEqualLedgerHeight(n, channelID, nwo.GetLedgerHeight(n, peer, channelID), n.PeersWithChannel(channelID)...)
}
// commitTx commits a transaction for a given transaction envelope
func commitTx(n *nwo.Network, orderer *nwo.Orderer, peer *nwo.Peer, channelID string, tx *cb.Envelope, txid string) error {
By("getting the signer for user1 on peer " + peer.ID())
signer := n.PeerUserSigner(peer, "User1")
By("creating the deliver client to peer " + peer.ID())
pcc := n.PeerClientConn(peer)
defer pcc.Close()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
df, err := pb.NewDeliverClient(pcc).DeliverFiltered(ctx)
Expect(err).NotTo(HaveOccurred())
defer df.CloseSend()
By("starting filtered delivery on peer " + peer.ID())
deliverEnvelope, err := protoutil.CreateSignedEnvelope(
cb.HeaderType_DELIVER_SEEK_INFO,
channelID,
signer,
&ab.SeekInfo{
Behavior: ab.SeekInfo_BLOCK_UNTIL_READY,
Start: &ab.SeekPosition{
Type: &ab.SeekPosition_Newest{Newest: &ab.SeekNewest{}},
},
Stop: &ab.SeekPosition{
Type: &ab.SeekPosition_Specified{
Specified: &ab.SeekSpecified{Number: math.MaxUint64},
},
},
},
0,
0,
)
Expect(err).NotTo(HaveOccurred())
err = df.Send(deliverEnvelope)
Expect(err).NotTo(HaveOccurred())
By("creating orderer client and send transaction to the orderer" + orderer.ID())
occ := n.OrdererClientConn(orderer)
defer occ.Close()
broadcastClient, err := ab.NewAtomicBroadcastClient(occ).Broadcast(context.Background())
Expect(err).NotTo(HaveOccurred())
err = broadcastClient.Send(tx)
Expect(err).NotTo(HaveOccurred())
By("waiting for deliver event on peer " + peer.ID())
for {
resp, err := df.Recv()
if err != nil {
return err
}
fb, ok := resp.Type.(*pb.DeliverResponse_FilteredBlock)
if !ok {
return fmt.Errorf("unexpected filtered block, received %T", resp.Type)
}
for _, tx := range fb.FilteredBlock.FilteredTransactions {
if tx.Txid != txid {
continue
}
if tx.TxValidationCode != pb.TxValidationCode_VALID {
return fmt.Errorf("transaction invalidated with status (%s)", tx.TxValidationCode)
}
return nil
}
}
}