397 lines
16 KiB
Go
397 lines
16 KiB
Go
/*
 * Copyright IBM Corp. All Rights Reserved.
 *
 * SPDX-License-Identifier: Apache-2.0
 */
|
|
|
|
package gossip
|
|
|
|
import (
|
|
"fmt"
|
|
"io/ioutil"
|
|
"os"
|
|
"syscall"
|
|
"time"
|
|
|
|
docker "github.com/fsouza/go-dockerclient"
|
|
"github.com/hyperledger/fabric/integration/channelparticipation"
|
|
"github.com/hyperledger/fabric/integration/nwo"
|
|
"github.com/hyperledger/fabric/integration/nwo/commands"
|
|
. "github.com/onsi/ginkgo/v2"
|
|
. "github.com/onsi/gomega"
|
|
"github.com/onsi/gomega/gbytes"
|
|
"github.com/onsi/gomega/gexec"
|
|
"github.com/tedsuo/ifrit"
|
|
ginkgomon "github.com/tedsuo/ifrit/ginkgomon_v2"
|
|
)
|
|
|
|
var _ = Describe("Gossip State Transfer and Membership", func() {
|
|
var (
|
|
testDir string
|
|
network *nwo.Network
|
|
nwprocs *networkProcesses
|
|
chaincode nwo.Chaincode
|
|
channelName string
|
|
)
|
|
|
|
BeforeEach(func() {
|
|
var err error
|
|
testDir, err = ioutil.TempDir("", "gossip-statexfer")
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
dockerClient, err := docker.NewClientFromEnv()
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
channelName = "testchannel"
|
|
network = nwo.New(nwo.FullEtcdRaft(), testDir, dockerClient, StartPort(), components)
|
|
network.GenerateConfigTree()
|
|
|
|
nwprocs = &networkProcesses{
|
|
network: network,
|
|
peerRunners: map[string]*ginkgomon.Runner{},
|
|
peerProcesses: map[string]ifrit.Process{},
|
|
}
|
|
|
|
chaincode = nwo.Chaincode{
|
|
Name: "mycc",
|
|
Version: "0.0",
|
|
Path: "github.com/hyperledger/fabric/integration/chaincode/simple/cmd",
|
|
Ctor: `{"Args":["init","a","100","b","200"]}`,
|
|
Policy: `OR ('Org1MSP.member','Org2MSP.member')`,
|
|
}
|
|
})
|
|
|
|
AfterEach(func() {
|
|
if nwprocs != nil {
|
|
nwprocs.terminateAll()
|
|
}
|
|
if network != nil {
|
|
network.Cleanup()
|
|
}
|
|
os.RemoveAll(testDir)
|
|
})
|
|
|
|
It("syncs blocks from the peer via state transfer when no orderer is available", func() {
|
|
// modify peer config to enable state transfer on all peers, and configure leaders as follows:
|
|
// Org1: leader election
|
|
// Org2: no leader election
|
|
// peer0: follower
|
|
// peer1: leader
|
|
for _, peer := range network.Peers {
|
|
if peer.Organization == "Org1" {
|
|
if peer.Name == "peer0" {
|
|
core := network.ReadPeerConfig(peer)
|
|
core.Peer.Gossip.State.Enabled = true
|
|
core.Peer.Gossip.UseLeaderElection = true
|
|
core.Peer.Gossip.OrgLeader = false
|
|
network.WritePeerConfig(peer, core)
|
|
}
|
|
if peer.Name == "peer1" {
|
|
core := network.ReadPeerConfig(peer)
|
|
core.Peer.Gossip.State.Enabled = true
|
|
core.Peer.Gossip.UseLeaderElection = true
|
|
core.Peer.Gossip.OrgLeader = false
|
|
core.Peer.Gossip.Bootstrap = fmt.Sprintf("127.0.0.1:%d", network.ReservePort())
|
|
network.WritePeerConfig(peer, core)
|
|
}
|
|
}
|
|
if peer.Organization == "Org2" {
|
|
core := network.ReadPeerConfig(peer)
|
|
core.Peer.Gossip.State.Enabled = true
|
|
core.Peer.Gossip.UseLeaderElection = false
|
|
core.Peer.Gossip.OrgLeader = peer.Name == "peer1"
|
|
network.WritePeerConfig(peer, core)
|
|
}
|
|
}
|
|
|
|
network.Bootstrap()
|
|
orderer := network.Orderer("orderer")
|
|
nwprocs.ordererRunner = network.OrdererRunner(orderer)
|
|
nwprocs.ordererProcess = ifrit.Invoke(nwprocs.ordererRunner)
|
|
Eventually(nwprocs.ordererProcess.Ready(), network.EventuallyTimeout).Should(BeClosed())
|
|
|
|
peer0Org1, peer1Org1 := network.Peer("Org1", "peer0"), network.Peer("Org1", "peer1")
|
|
peer0Org2, peer1Org2 := network.Peer("Org2", "peer0"), network.Peer("Org2", "peer1")
|
|
|
|
By("bringing up all four peers")
|
|
startPeers(nwprocs, false, peer0Org1, peer1Org1, peer0Org2, peer1Org2)
|
|
|
|
channelparticipation.JoinOrdererAppChannel(network, "testchannel", orderer, nwprocs.ordererRunner)
|
|
|
|
By("joining all peers to channel")
|
|
network.JoinChannel(channelName, orderer, peer0Org1, peer1Org1, peer0Org2, peer1Org2)
|
|
|
|
// base peer will be used for chaincode interactions
|
|
basePeerForTransactions := peer0Org1
|
|
nwo.DeployChaincodeLegacy(network, channelName, orderer, chaincode, basePeerForTransactions)
|
|
|
|
By("verifying peer0Org1 discovers all the peers and the legacy chaincode before starting the tests")
|
|
Eventually(nwo.DiscoverPeers(network, peer0Org1, "User1", "testchannel"), network.EventuallyTimeout).Should(ConsistOf(
|
|
network.DiscoveredPeer(peer0Org1, "_lifecycle", "mycc"),
|
|
network.DiscoveredPeer(peer1Org1, "_lifecycle"),
|
|
network.DiscoveredPeer(peer0Org2, "_lifecycle"),
|
|
network.DiscoveredPeer(peer1Org2, "_lifecycle"),
|
|
))
|
|
|
|
By("STATE TRANSFER TEST 1: newly joined peers should receive blocks from the peers that are already up")
|
|
|
|
// Note, a better test would be to bring orderer down before joining the two peers.
|
|
// However, network.JoinChannel() requires orderer to be up so that genesis block can be fetched from orderer before joining peers.
|
|
// Therefore, for now we've joined all four peers and stop the two peers that should be synced up.
|
|
stopPeers(nwprocs, peer1Org1, peer1Org2)
|
|
|
|
By("confirming peer0Org1 was elected to be a leader")
|
|
expectedMsg := "Elected as a leader, starting delivery service for channel testchannel"
|
|
Eventually(nwprocs.peerRunners[peer0Org1.ID()].Err(), network.EventuallyTimeout).Should(gbytes.Say(expectedMsg))
|
|
|
|
sendTransactionsAndSyncUpPeers(nwprocs, orderer, basePeerForTransactions, channelName, peer1Org1, peer1Org2)
|
|
|
|
By("STATE TRANSFER TEST 2: restarted peers should receive blocks from the peers that are already up")
|
|
basePeerForTransactions = peer1Org1
|
|
nwo.InstallChaincodeLegacy(network, chaincode, basePeerForTransactions)
|
|
|
|
By("verifying peer0Org1 discovers all the peers and the additional legacy chaincode installed on peer1Org1")
|
|
Eventually(nwo.DiscoverPeers(network, peer0Org1, "User1", "testchannel"), network.EventuallyTimeout).Should(ConsistOf(
|
|
network.DiscoveredPeer(peer0Org1, "_lifecycle", "mycc"),
|
|
network.DiscoveredPeer(peer1Org1, "_lifecycle", "mycc"),
|
|
network.DiscoveredPeer(peer0Org2, "_lifecycle"),
|
|
network.DiscoveredPeer(peer1Org2, "_lifecycle"),
|
|
))
|
|
|
|
By("stopping peer0Org1 (currently elected leader in Org1) and peer1Org2 (static leader in Org2)")
|
|
stopPeers(nwprocs, peer0Org1, peer1Org2)
|
|
|
|
By("confirming peer1Org1 was elected to be a leader")
|
|
Eventually(nwprocs.peerRunners[peer1Org1.ID()].Err(), network.EventuallyTimeout).Should(gbytes.Say(expectedMsg))
|
|
|
|
// Note that with the static leader in Org2 down, the static follower peer0Org2 will also get blocks via state transfer
|
|
// This effectively tests leader election as well, since the newly elected leader in Org1 (peer1Org1) will be the only peer
|
|
// that receives blocks from orderer and will therefore serve as the provider of blocks to all other peers.
|
|
sendTransactionsAndSyncUpPeers(nwprocs, orderer, basePeerForTransactions, channelName, peer0Org1, peer1Org2)
|
|
|
|
By("verifying peer0Org1 can still discover all the peers and the legacy chaincode after it has been restarted")
|
|
Eventually(nwo.DiscoverPeers(network, peer0Org1, "User1", "testchannel"), network.EventuallyTimeout).Should(ConsistOf(
|
|
network.DiscoveredPeer(peer0Org1, "_lifecycle", "mycc"),
|
|
network.DiscoveredPeer(peer1Org1, "_lifecycle", "mycc"),
|
|
network.DiscoveredPeer(peer0Org2, "_lifecycle"),
|
|
network.DiscoveredPeer(peer1Org2, "_lifecycle"),
|
|
))
|
|
})
|
|
|
|
When("gossip connection is lost and restored", func() {
|
|
var (
|
|
orderer *nwo.Orderer
|
|
peerEndpoints map[string]string = map[string]string{}
|
|
)
|
|
|
|
BeforeEach(func() {
|
|
// modify peer config
|
|
for _, peer := range network.Peers {
|
|
core := network.ReadPeerConfig(peer)
|
|
core.Peer.Gossip.AliveTimeInterval = 1 * time.Second
|
|
core.Peer.Gossip.AliveExpirationTimeout = 2 * core.Peer.Gossip.AliveTimeInterval
|
|
core.Peer.Gossip.ReconnectInterval = 2 * time.Second
|
|
core.Peer.Gossip.MsgExpirationFactor = 2
|
|
core.Peer.Gossip.MaxConnectionAttempts = 10
|
|
network.WritePeerConfig(peer, core)
|
|
peerEndpoints[peer.ID()] = core.Peer.Address
|
|
}
|
|
|
|
network.Bootstrap()
|
|
orderer = network.Orderer("orderer")
|
|
nwprocs.ordererRunner = network.OrdererRunner(orderer)
|
|
nwprocs.ordererProcess = ifrit.Invoke(nwprocs.ordererRunner)
|
|
Eventually(nwprocs.ordererProcess.Ready(), network.EventuallyTimeout).Should(BeClosed())
|
|
})
|
|
|
|
It("updates membership when peers in the same org are stopped and restarted", func() {
|
|
peer0Org1 := network.Peer("Org1", "peer0")
|
|
peer1Org1 := network.Peer("Org1", "peer1")
|
|
|
|
By("bringing up all peers")
|
|
startPeers(nwprocs, false, peer0Org1, peer1Org1)
|
|
|
|
By("creating and joining a channel")
|
|
channelparticipation.JoinOrdererAppChannel(network, "testchannel", orderer, nwprocs.ordererRunner)
|
|
|
|
network.JoinChannel(channelName, orderer, peer0Org1, peer1Org1)
|
|
|
|
By("verifying peer1Org1 discovers all the peers before testing membership change on it")
|
|
Eventually(nwo.DiscoverPeers(network, peer1Org1, "User1", "testchannel"), network.EventuallyTimeout).Should(ConsistOf(
|
|
network.DiscoveredPeer(peer0Org1, "_lifecycle"),
|
|
network.DiscoveredPeer(peer1Org1, "_lifecycle"),
|
|
))
|
|
|
|
By("verifying membership change on peer1Org1 when an anchor peer in the same org is stopped and restarted")
|
|
expectedMsgFromExpirationCallback := fmt.Sprintf("Do not remove bootstrap or anchor peer endpoint %s from membership", peerEndpoints[peer0Org1.ID()])
|
|
assertPeerMembershipUpdate(network, peer1Org1, []*nwo.Peer{peer0Org1}, nwprocs, expectedMsgFromExpirationCallback)
|
|
|
|
By("verifying peer0Org1 discovers all the peers before testing membership change on it")
|
|
Eventually(nwo.DiscoverPeers(network, peer0Org1, "User1", "testchannel"), network.EventuallyTimeout).Should(ConsistOf(
|
|
network.DiscoveredPeer(peer0Org1, "_lifecycle"),
|
|
network.DiscoveredPeer(peer1Org1, "_lifecycle"),
|
|
))
|
|
|
|
By("verifying membership change on peer0Org1 when a non-anchor peer in the same org is stopped and restarted")
|
|
expectedMsgFromExpirationCallback = fmt.Sprintf("Removing member: Endpoint: %s", peerEndpoints[peer1Org1.ID()])
|
|
assertPeerMembershipUpdate(network, peer0Org1, []*nwo.Peer{peer1Org1}, nwprocs, expectedMsgFromExpirationCallback)
|
|
})
|
|
|
|
It("updates peer membership when peers in another org are stopped and restarted", func() {
|
|
peer0Org1, peer1Org1 := network.Peer("Org1", "peer0"), network.Peer("Org1", "peer1")
|
|
peer0Org2, peer1Org2 := network.Peer("Org2", "peer0"), network.Peer("Org2", "peer1")
|
|
|
|
By("bringing up all peers")
|
|
startPeers(nwprocs, false, peer0Org1, peer1Org1, peer0Org2, peer1Org2)
|
|
|
|
By("creating and joining a channel")
|
|
channelparticipation.JoinOrdererAppChannel(network, "testchannel", orderer, nwprocs.ordererRunner)
|
|
network.JoinChannel(channelName, orderer, peer0Org1, peer1Org1, peer0Org2, peer1Org2)
|
|
|
|
By("verifying membership on peer1Org1")
|
|
Eventually(nwo.DiscoverPeers(network, peer1Org1, "User1", "testchannel"), network.EventuallyTimeout).Should(ConsistOf(
|
|
network.DiscoveredPeer(peer0Org1, "_lifecycle"),
|
|
network.DiscoveredPeer(peer1Org1, "_lifecycle"),
|
|
network.DiscoveredPeer(peer0Org2, "_lifecycle"),
|
|
network.DiscoveredPeer(peer1Org2, "_lifecycle"),
|
|
))
|
|
|
|
By("stopping anchor peer peer0Org1 to have only one peer in org1")
|
|
stopPeers(nwprocs, peer0Org1)
|
|
|
|
By("verifying peer membership update when peers in another org are stopped and restarted")
|
|
expectedMsgFromExpirationCallback := fmt.Sprintf("Do not remove bootstrap or anchor peer endpoint %s from membership", peerEndpoints[peer0Org2.ID()])
|
|
assertPeerMembershipUpdate(network, peer1Org1, []*nwo.Peer{peer0Org2, peer1Org2}, nwprocs, expectedMsgFromExpirationCallback)
|
|
})
|
|
})
|
|
})
|
|
|
|
func runTransactions(n *nwo.Network, orderer *nwo.Orderer, peer *nwo.Peer, chaincodeName string, channelID string) {
|
|
for i := 0; i < 5; i++ {
|
|
sess, err := n.PeerUserSession(peer, "User1", commands.ChaincodeInvoke{
|
|
ChannelID: channelID,
|
|
Orderer: n.OrdererAddress(orderer, nwo.ListenPort),
|
|
Name: chaincodeName,
|
|
Ctor: `{"Args":["invoke","a","b","10"]}`,
|
|
PeerAddresses: []string{
|
|
n.PeerAddress(peer, nwo.ListenPort),
|
|
},
|
|
WaitForEvent: true,
|
|
})
|
|
Expect(err).NotTo(HaveOccurred())
|
|
Eventually(sess, n.EventuallyTimeout).Should(gexec.Exit(0))
|
|
Expect(sess.Err).To(gbytes.Say("Chaincode invoke successful. result: status:200"))
|
|
}
|
|
}
|
|
|
|
// networkProcesses holds references to the network, its runners, and processes.
// It lets the specs and helpers in this file start, stop, and restart the
// orderer and individual peers, and tear everything down in terminateAll.
type networkProcesses struct {
	// network is the nwo network the runners and processes belong to.
	network *nwo.Network

	// ordererRunner and ordererProcess track the single orderer. ordererProcess
	// is set to nil while the orderer is deliberately stopped.
	ordererRunner  *ginkgomon.Runner
	ordererProcess ifrit.Process

	// peerRunners and peerProcesses are keyed by peer ID. stopPeers removes the
	// entry from peerProcesses only; the runner entry is kept until the peer is
	// started again and overwritten.
	peerRunners   map[string]*ginkgomon.Runner
	peerProcesses map[string]ifrit.Process
}
|
|
|
|
func (n *networkProcesses) terminateAll() {
|
|
if n.ordererProcess != nil {
|
|
n.ordererProcess.Signal(syscall.SIGTERM)
|
|
Eventually(n.ordererProcess.Wait(), n.network.EventuallyTimeout).Should(Receive())
|
|
}
|
|
for _, process := range n.peerProcesses {
|
|
process.Signal(syscall.SIGTERM)
|
|
Eventually(process.Wait(), n.network.EventuallyTimeout).Should(Receive())
|
|
}
|
|
}
|
|
|
|
func startPeers(n *networkProcesses, forceStateTransfer bool, peersToStart ...*nwo.Peer) {
|
|
env := []string{"FABRIC_LOGGING_SPEC=info:gossip.state=debug:gossip.discovery=debug"}
|
|
|
|
// Setting CORE_PEER_GOSSIP_STATE_CHECKINTERVAL to 200ms (from default of 10s) will ensure that state transfer happens quickly,
|
|
// before blocks are gossipped through normal mechanisms
|
|
if forceStateTransfer {
|
|
env = append(env, "CORE_PEER_GOSSIP_STATE_CHECKINTERVAL=200ms")
|
|
}
|
|
|
|
for _, peer := range peersToStart {
|
|
runner := n.network.PeerRunner(peer, env...)
|
|
process := ifrit.Invoke(runner)
|
|
Eventually(process.Ready(), n.network.EventuallyTimeout).Should(BeClosed())
|
|
|
|
n.peerProcesses[peer.ID()] = process
|
|
n.peerRunners[peer.ID()] = runner
|
|
}
|
|
}
|
|
|
|
func stopPeers(n *networkProcesses, peersToStop ...*nwo.Peer) {
|
|
for _, peer := range peersToStop {
|
|
id := peer.ID()
|
|
proc := n.peerProcesses[id]
|
|
proc.Signal(syscall.SIGTERM)
|
|
Eventually(proc.Wait(), n.network.EventuallyTimeout).Should(Receive())
|
|
delete(n.peerProcesses, id)
|
|
}
|
|
}
|
|
|
|
func assertPeersLedgerHeight(n *nwo.Network, peersToSyncUp []*nwo.Peer, expectedVal int, channelID string) {
|
|
for _, peer := range peersToSyncUp {
|
|
Eventually(func() int {
|
|
return nwo.GetLedgerHeight(n, peer, channelID)
|
|
}, n.EventuallyTimeout).Should(Equal(expectedVal))
|
|
}
|
|
}
|
|
|
|
// send transactions, stop orderering server, then start peers to ensure they received blcoks via state transfer
|
|
func sendTransactionsAndSyncUpPeers(n *networkProcesses, orderer *nwo.Orderer, basePeer *nwo.Peer, channelName string, peersToSyncUp ...*nwo.Peer) {
|
|
By("creating transactions")
|
|
runTransactions(n.network, orderer, basePeer, "mycc", channelName)
|
|
basePeerLedgerHeight := nwo.GetLedgerHeight(n.network, basePeer, channelName)
|
|
|
|
By("stopping orderer")
|
|
n.ordererProcess.Signal(syscall.SIGTERM)
|
|
Eventually(n.ordererProcess.Wait(), n.network.EventuallyTimeout).Should(Receive())
|
|
n.ordererProcess = nil
|
|
|
|
By("starting the peers contained in the peersToSyncUp list")
|
|
startPeers(n, true, peersToSyncUp...)
|
|
|
|
By("ensuring the peers are synced up")
|
|
assertPeersLedgerHeight(n.network, peersToSyncUp, basePeerLedgerHeight, channelName)
|
|
|
|
By("restarting orderer")
|
|
n.ordererRunner = n.network.OrdererRunner(orderer)
|
|
n.ordererProcess = ifrit.Invoke(n.ordererRunner)
|
|
Eventually(n.ordererProcess.Ready(), n.network.EventuallyTimeout).Should(BeClosed())
|
|
}
|
|
|
|
// assertPeerMembershipUpdate stops and restart peersToRestart and verify peer membership
|
|
func assertPeerMembershipUpdate(network *nwo.Network, peer *nwo.Peer, peersToRestart []*nwo.Peer, nwprocs *networkProcesses, expectedMsgFromExpirationCallback string) {
|
|
stopPeers(nwprocs, peersToRestart...)
|
|
|
|
// timeout is the same amount of time as it takes to remove a message from the aliveMsgStore, and add a second as buffer
|
|
core := network.ReadPeerConfig(peer)
|
|
timeout := core.Peer.Gossip.AliveExpirationTimeout*time.Duration(core.Peer.Gossip.MsgExpirationFactor) + time.Second
|
|
By("verifying peer membership after all other peers are stopped")
|
|
Eventually(nwo.DiscoverPeers(network, peer, "User1", "testchannel"), timeout, 100*time.Millisecond).Should(ConsistOf(
|
|
network.DiscoveredPeer(peer, "_lifecycle"),
|
|
))
|
|
|
|
By("verifying expected log message from expiration callback")
|
|
runner := nwprocs.peerRunners[peer.ID()]
|
|
Eventually(runner.Err(), network.EventuallyTimeout).Should(gbytes.Say(expectedMsgFromExpirationCallback))
|
|
|
|
By("restarting peers")
|
|
startPeers(nwprocs, false, peersToRestart...)
|
|
|
|
By("verifying peer membership, expected to discover restarted peers")
|
|
expectedPeers := make([]nwo.DiscoveredPeer, len(peersToRestart)+1)
|
|
expectedPeers[0] = network.DiscoveredPeer(peer, "_lifecycle")
|
|
for i, p := range peersToRestart {
|
|
expectedPeers[i+1] = network.DiscoveredPeer(p, "_lifecycle")
|
|
}
|
|
timeout = 3 * core.Peer.Gossip.ReconnectInterval
|
|
Eventually(nwo.DiscoverPeers(network, peer, "User1", "testchannel"), timeout, 100*time.Millisecond).Should(ConsistOf(expectedPeers))
|
|
}
|