/* Copyright IBM Corp. All Rights Reserved. SPDX-License-Identifier: Apache-2.0 */ package etcdraft_test import ( "encoding/pem" "fmt" "io/ioutil" "os" "os/user" "path" "sync" "sync/atomic" "time" "code.cloudfoundry.org/clock/fakeclock" "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric-protos-go/common" "github.com/hyperledger/fabric-protos-go/orderer" raftprotos "github.com/hyperledger/fabric-protos-go/orderer/etcdraft" "github.com/hyperledger/fabric/bccsp" "github.com/hyperledger/fabric/bccsp/factory" "github.com/hyperledger/fabric/bccsp/sw" "github.com/hyperledger/fabric/common/channelconfig" "github.com/hyperledger/fabric/common/crypto/tlsgen" "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/orderer/common/cluster" orderer_types "github.com/hyperledger/fabric/orderer/common/types" "github.com/hyperledger/fabric/orderer/consensus/etcdraft" "github.com/hyperledger/fabric/orderer/consensus/etcdraft/mocks" consensusmocks "github.com/hyperledger/fabric/orderer/consensus/mocks" mockblockcutter "github.com/hyperledger/fabric/orderer/mocks/common/blockcutter" "github.com/hyperledger/fabric/protoutil" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/onsi/gomega/types" "github.com/pkg/errors" raft "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3/raftpb" "go.uber.org/zap" ) const ( interval = 100 * time.Millisecond LongEventualTimeout = 10 * time.Second // 10 is the default setting of ELECTION_TICK. // We used to have a small number here (2) to reduce the time for test - we don't // need to tick node 10 times to trigger election - however, we are using another // mechanism to trigger it now which does not depend on time: send an artificial // MsgTimeoutNow to node. ELECTION_TICK = 10 HEARTBEAT_TICK = 1 ) //go:generate counterfeiter -o mocks/halt_callbacker.go --fake-name HaltCallbacker . haltCallbacker type haltCallbacker interface { HaltCallback() } func init() { factory.InitFactories(nil) } func mockOrderer(metadata []byte) *mocks.OrdererConfig { return mockOrdererWithBatchTimeout(time.Second, metadata) } func mockOrdererWithBatchTimeout(batchTimeout time.Duration, metadata []byte) *mocks.OrdererConfig { mockOrderer := &mocks.OrdererConfig{} mockOrderer.BatchTimeoutReturns(batchTimeout) mockOrderer.ConsensusMetadataReturns(metadata) return mockOrderer } func mockOrdererWithTLSRootCert(batchTimeout time.Duration, metadata []byte, tlsCA tlsgen.CA) *mocks.OrdererConfig { mockOrderer := mockOrdererWithBatchTimeout(batchTimeout, metadata) mockOrg := &mocks.OrdererOrg{} mockMSP := &mocks.MSP{} mockMSP.GetTLSRootCertsReturns([][]byte{tlsCA.CertBytes()}) mockOrg.MSPReturns(mockMSP) mockOrderer.OrganizationsReturns(map[string]channelconfig.OrdererOrg{ "fake-org": mockOrg, }) return mockOrderer } // for some test cases we chmod file/dir to test failures caused by exotic permissions. // however this does not work if tests are running as root, i.e. in a container. 
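// (Root bypasses POSIX file permission checks, so chmod-based read/write failures cannot be provoked when the tests run with UID 0.)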
func skipIfRoot() { u, err := user.Current() Expect(err).NotTo(HaveOccurred()) if u.Uid == "0" { Skip("you are running test as root, there's no way to make files unreadable") } } var _ = Describe("Chain", func() { var ( env *common.Envelope channelID string tlsCA tlsgen.CA logger *flogging.FabricLogger ) BeforeEach(func() { tlsCA, _ = tlsgen.NewCA() channelID = "test-channel" logger = flogging.MustGetLogger("test") env = &common.Envelope{ Payload: marshalOrPanic(&common.Payload{ Header: &common.Header{ChannelHeader: marshalOrPanic(&common.ChannelHeader{Type: int32(common.HeaderType_MESSAGE), ChannelId: channelID})}, Data: []byte("TEST_MESSAGE"), }), } }) Describe("Single Raft node", func() { var ( configurator *mocks.FakeConfigurator consenterMetadata *raftprotos.ConfigMetadata consenters map[uint64]*raftprotos.Consenter clock *fakeclock.FakeClock opts etcdraft.Options support *consensusmocks.FakeConsenterSupport cutter *mockblockcutter.Receiver storage *raft.MemoryStorage observeC chan raft.SoftState chain *etcdraft.Chain dataDir string walDir string snapDir string err error fakeFields *fakeMetricsFields cryptoProvider bccsp.BCCSP fakeHaltCallbacker *mocks.HaltCallbacker ) BeforeEach(func() { cryptoProvider, err = sw.NewDefaultSecurityLevelWithKeystore(sw.NewDummyKeyStore()) Expect(err).NotTo(HaveOccurred()) configurator = &mocks.FakeConfigurator{} clock = fakeclock.NewFakeClock(time.Now()) storage = raft.NewMemoryStorage() dataDir, err = ioutil.TempDir("", "wal-") Expect(err).NotTo(HaveOccurred()) walDir = path.Join(dataDir, "wal") snapDir = path.Join(dataDir, "snapshot") observeC = make(chan raft.SoftState, 1) support = &consensusmocks.FakeConsenterSupport{} support.ChannelIDReturns(channelID) consenterMetadata = createMetadata(1, tlsCA) support.SharedConfigReturns(mockOrdererWithTLSRootCert(time.Hour, marshalOrPanic(consenterMetadata), tlsCA)) cutter = mockblockcutter.NewReceiver() support.BlockCutterReturns(cutter) // for block creator initialization support.HeightReturns(1) support.BlockReturns(getSeedBlock()) meta := &raftprotos.BlockMetadata{ ConsenterIds: make([]uint64, len(consenterMetadata.Consenters)), NextConsenterId: 1, } for i := range meta.ConsenterIds { meta.ConsenterIds[i] = meta.NextConsenterId meta.NextConsenterId++ } consenters = map[uint64]*raftprotos.Consenter{} for i, c := range consenterMetadata.Consenters { consenters[meta.ConsenterIds[i]] = c } fakeFields = newFakeMetricsFields() opts = etcdraft.Options{ RPCTimeout: time.Second * 5, RaftID: 1, Clock: clock, TickInterval: interval, ElectionTick: ELECTION_TICK, HeartbeatTick: HEARTBEAT_TICK, MaxSizePerMsg: 1024 * 1024, MaxInflightBlocks: 256, BlockMetadata: meta, Consenters: consenters, Logger: logger, MemoryStorage: storage, WALDir: walDir, SnapDir: snapDir, Metrics: newFakeMetrics(fakeFields), } fakeHaltCallbacker = &mocks.HaltCallbacker{} }) campaign := func(c *etcdraft.Chain, observeC <-chan raft.SoftState) { Eventually(func() <-chan raft.SoftState { c.Consensus(&orderer.ConsensusRequest{Payload: protoutil.MarshalOrPanic(&raftpb.Message{Type: raftpb.MsgTimeoutNow, To: 1})}, 0) return observeC }, LongEventualTimeout).Should(Receive(StateEqual(1, raft.StateLeader))) } JustBeforeEach(func() { rpc := &mocks.FakeRPC{} chain, err = etcdraft.NewChain(support, opts, configurator, rpc, cryptoProvider, noOpBlockPuller, fakeHaltCallbacker.HaltCallback, observeC) Expect(err).NotTo(HaveOccurred()) chain.Start() cRel, status := chain.StatusReport() Expect(cRel).To(Equal(orderer_types.ConsensusRelationConsenter)) 
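// a node listed in the consenter set starts out as (consenter, active); the eviction tests further below assert the transition away from this state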
Expect(status).To(Equal(orderer_types.StatusActive)) // When the Raft node bootstraps, it produces a ConfChange // to add itself, which needs to be consumed with Ready(). // If there are pending configuration changes in raft, // it refuses to campaign, no matter how many ticks elapse. // This is not a problem in the production code because raft.Ready // will be consumed eventually, as the wall clock advances. // // However, this is problematic when using the fake clock and // artificial ticks. Instead of ticking raft indefinitely until // raft.Ready is consumed, this check is added to indirectly guarantee // that the first ConfChange is actually consumed and we can safely // proceed to tick the Raft FSM. Eventually(func() error { _, err := storage.Entries(1, 1, 1) return err }, LongEventualTimeout).ShouldNot(HaveOccurred()) }) AfterEach(func() { chain.Halt() Eventually(chain.Errored, LongEventualTimeout).Should(BeClosed()) // Make sure no timer leak Eventually(clock.WatcherCount, LongEventualTimeout).Should(BeZero()) os.RemoveAll(dataDir) }) Context("when a node starts up", func() { It("properly configures the communication layer", func() { expectedNodeConfig := nodeConfigFromMetadata(consenterMetadata) Eventually(configurator.ConfigureCallCount, LongEventualTimeout).Should(Equal(1)) _, arg2 := configurator.ConfigureArgsForCall(0) Expect(arg2).To(Equal(expectedNodeConfig)) }) It("correctly sets the metrics labels and publishes requisite metrics", func() { type withImplementers interface { WithCallCount() int WithArgsForCall(int) []string } metricsList := []withImplementers{ fakeFields.fakeClusterSize, fakeFields.fakeIsLeader, fakeFields.fakeActiveNodes, fakeFields.fakeCommittedBlockNumber, fakeFields.fakeSnapshotBlockNumber, fakeFields.fakeLeaderChanges, fakeFields.fakeProposalFailures, fakeFields.fakeDataPersistDuration, fakeFields.fakeNormalProposalsReceived, fakeFields.fakeConfigProposalsReceived, } for _, m := range metricsList { Expect(m.WithCallCount()).To(Equal(1)) Expect(func() string { return m.WithArgsForCall(0)[1] }()).To(Equal(channelID)) } Expect(fakeFields.fakeClusterSize.SetCallCount()).To(Equal(1)) Expect(fakeFields.fakeClusterSize.SetArgsForCall(0)).To(Equal(float64(1))) Expect(fakeFields.fakeIsLeader.SetCallCount()).To(Equal(1)) Expect(fakeFields.fakeIsLeader.SetArgsForCall(0)).To(Equal(float64(0))) Expect(fakeFields.fakeActiveNodes.SetCallCount()).To(Equal(1)) Expect(fakeFields.fakeActiveNodes.SetArgsForCall(0)).To(Equal(float64(0))) }) }) Context("when no Raft leader is elected", func() { It("fails to order envelope", func() { err := chain.Order(env, 0) Expect(err).To(MatchError("no Raft leader")) Expect(fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(1)) Expect(fakeFields.fakeNormalProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1))) Expect(fakeFields.fakeConfigProposalsReceived.AddCallCount()).To(Equal(0)) Expect(fakeFields.fakeProposalFailures.AddCallCount()).To(Equal(1)) Expect(fakeFields.fakeProposalFailures.AddArgsForCall(0)).To(Equal(float64(1))) }) It("starts proactive campaign", func() { // assert that even if the ticks supplied are fewer than ELECTION_TICK, // a leader can still be successfully elected.
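// each Increment below fires a single raft tick; the short sleep yields to the FSM goroutine so ticks are processed one at a time rather than coalesced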
for i := 0; i < ELECTION_TICK; i++ { clock.Increment(interval) time.Sleep(10 * time.Millisecond) } Eventually(observeC, LongEventualTimeout).Should(Receive(StateEqual(1, raft.StateLeader))) }) }) Context("when Raft leader is elected", func() { JustBeforeEach(func() { campaign(chain, observeC) }) It("updates metrics upon leader election", func() { Expect(fakeFields.fakeIsLeader.SetCallCount()).To(Equal(2)) Expect(fakeFields.fakeIsLeader.SetArgsForCall(1)).To(Equal(float64(1))) Expect(fakeFields.fakeLeaderChanges.AddCallCount()).To(Equal(1)) Expect(fakeFields.fakeLeaderChanges.AddArgsForCall(0)).To(Equal(float64(1))) }) It("fails to order envelope if chain is halted", func() { chain.Halt() err := chain.Order(env, 0) Expect(err).To(MatchError("chain is stopped")) Expect(fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(1)) Expect(fakeFields.fakeNormalProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1))) Expect(fakeFields.fakeProposalFailures.AddCallCount()).To(Equal(1)) Expect(fakeFields.fakeProposalFailures.AddArgsForCall(0)).To(Equal(float64(1))) }) It("produces blocks following batch rules", func() { close(cutter.Block) By("cutting next batch directly") cutter.SetCutNext(true) err := chain.Order(env, 0) Expect(err).NotTo(HaveOccurred()) Expect(fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(1)) Expect(fakeFields.fakeNormalProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1))) Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) Expect(fakeFields.fakeCommittedBlockNumber.SetCallCount()).Should(Equal(2)) // incl. initial call Expect(fakeFields.fakeCommittedBlockNumber.SetArgsForCall(1)).Should(Equal(float64(1))) // There are three calls to DataPersistDuration by now corresponding to the following three // arriving on the Ready channel: // 1. an EntryConfChange to let this node join the Raft cluster // 2. a SoftState and an associated increase of term in the HardState due to the node being elected leader // 3. a block being committed // The duration being emitted is zero since we don't tick the fake clock during this time Expect(fakeFields.fakeDataPersistDuration.ObserveCallCount()).Should(Equal(3)) Expect(fakeFields.fakeDataPersistDuration.ObserveArgsForCall(0)).Should(Equal(float64(0))) Expect(fakeFields.fakeDataPersistDuration.ObserveArgsForCall(1)).Should(Equal(float64(0))) Expect(fakeFields.fakeDataPersistDuration.ObserveArgsForCall(2)).Should(Equal(float64(0))) By("respecting batch timeout") cutter.SetCutNext(false) timeout := time.Second support.SharedConfigReturns(mockOrdererWithBatchTimeout(timeout, nil)) err = chain.Order(env, 0) Expect(err).NotTo(HaveOccurred()) Expect(fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(2)) Expect(fakeFields.fakeNormalProposalsReceived.AddArgsForCall(1)).To(Equal(float64(1))) clock.WaitForNWatchersAndIncrement(timeout, 2) Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) Expect(fakeFields.fakeCommittedBlockNumber.SetCallCount()).Should(Equal(3)) // incl. 
initial call Expect(fakeFields.fakeCommittedBlockNumber.SetArgsForCall(2)).Should(Equal(float64(2))) Expect(fakeFields.fakeDataPersistDuration.ObserveCallCount()).Should(Equal(4)) Expect(fakeFields.fakeDataPersistDuration.ObserveArgsForCall(3)).Should(Equal(float64(0))) }) It("does not reset timer for every envelope", func() { close(cutter.Block) timeout := time.Second support.SharedConfigReturns(mockOrdererWithBatchTimeout(timeout, nil)) err := chain.Order(env, 0) Expect(err).NotTo(HaveOccurred()) Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1)) clock.WaitForNWatchersAndIncrement(timeout/2, 2) err = chain.Order(env, 0) Expect(err).NotTo(HaveOccurred()) Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(2)) // the second envelope should not reset the timer; it should // therefore expire if we increment it by just timeout/2 clock.Increment(timeout / 2) Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) }) It("does not write a block if halted before timeout", func() { close(cutter.Block) timeout := time.Second support.SharedConfigReturns(mockOrdererWithBatchTimeout(timeout, nil)) err := chain.Order(env, 0) Expect(err).NotTo(HaveOccurred()) Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1)) // wait for timer to start Eventually(clock.WatcherCount, LongEventualTimeout).Should(Equal(2)) chain.Halt() Consistently(support.WriteBlockCallCount).Should(Equal(0)) }) It("stops the timer if a batch is cut", func() { close(cutter.Block) timeout := time.Second support.SharedConfigReturns(mockOrdererWithBatchTimeout(timeout, nil)) err := chain.Order(env, 0) Expect(err).NotTo(HaveOccurred()) Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1)) clock.WaitForNWatchersAndIncrement(timeout/2, 2) By("force a batch to be cut before timer expires") cutter.SetCutNext(true) err = chain.Order(env, 0) Expect(err).NotTo(HaveOccurred()) Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) b, _ := support.WriteBlockArgsForCall(0) Expect(b.Data.Data).To(HaveLen(2)) Expect(cutter.CurBatch()).To(HaveLen(0)) // this should start a fresh timer cutter.SetCutNext(false) err = chain.Order(env, 0) Expect(err).NotTo(HaveOccurred()) Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1)) clock.WaitForNWatchersAndIncrement(timeout/2, 2) Consistently(support.WriteBlockCallCount).Should(Equal(1)) clock.Increment(timeout / 2) Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) b, _ = support.WriteBlockArgsForCall(1) Expect(b.Data.Data).To(HaveLen(1)) }) It("cut two batches if incoming envelope does not fit into first batch", func() { close(cutter.Block) timeout := time.Second support.SharedConfigReturns(mockOrdererWithBatchTimeout(timeout, nil)) err := chain.Order(env, 0) Expect(err).NotTo(HaveOccurred()) Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1)) cutter.SetIsolatedTx(true) err = chain.Order(env, 0) Expect(err).NotTo(HaveOccurred()) Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) }) Context("revalidation", func() { BeforeEach(func() { close(cutter.Block) timeout := time.Hour support.SharedConfigReturns(mockOrdererWithBatchTimeout(timeout, nil)) support.SequenceReturns(1) }) It("enqueue if envelope is still valid", func() { support.ProcessNormalMsgReturns(1, nil) err := chain.Order(env, 0) Expect(err).NotTo(HaveOccurred()) Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1)) Eventually(clock.WatcherCount, 
LongEventualTimeout).Should(Equal(2)) }) It("does not enqueue if envelope is not valid", func() { support.ProcessNormalMsgReturns(1, errors.Errorf("Envelope is invalid")) err := chain.Order(env, 0) Expect(err).NotTo(HaveOccurred()) Consistently(cutter.CurBatch).Should(HaveLen(0)) Consistently(clock.WatcherCount).Should(Equal(1)) }) }) It("unblocks Errored if chain is halted", func() { errorC := chain.Errored() Expect(errorC).NotTo(BeClosed()) chain.Halt() Eventually(errorC, LongEventualTimeout).Should(BeClosed()) }) It("does not call the halt callback function when halting externally", func() { chain.Halt() Consistently(fakeHaltCallbacker.HaltCallbackCallCount).Should(Equal(0)) }) Describe("Config updates", func() { var ( configEnv *common.Envelope configSeq uint64 ) Context("when a type A config update comes", func() { Context("for existing channel", func() { // use to prepare the Orderer Values BeforeEach(func() { newValues := map[string]*common.ConfigValue{ "BatchTimeout": { Version: 1, Value: marshalOrPanic(&orderer.BatchTimeout{ Timeout: "3ms", }), }, "ConsensusType": { Version: 4, }, } oldValues := map[string]*common.ConfigValue{ "ConsensusType": { Version: 4, }, } configEnv = newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, oldValues, newValues), ) configSeq = 0 }) // BeforeEach block Context("without revalidation (i.e. correct config sequence)", func() { Context("without pending normal envelope", func() { It("should create a config block and no normal block", func() { err := chain.Configure(configEnv, configSeq) Expect(err).NotTo(HaveOccurred()) Expect(fakeFields.fakeConfigProposalsReceived.AddCallCount()).To(Equal(1)) Expect(fakeFields.fakeConfigProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1))) Eventually(support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1)) Consistently(support.WriteBlockCallCount).Should(Equal(0)) Expect(fakeFields.fakeCommittedBlockNumber.SetCallCount()).Should(Equal(2)) // incl. initial call Expect(fakeFields.fakeCommittedBlockNumber.SetArgsForCall(1)).Should(Equal(float64(1))) }) }) Context("with pending normal envelope", func() { It("should create a normal block and a config block", func() { // We do not need to block the cutter from ordering in our test case and therefore close this channel. close(cutter.Block) By("adding a normal envelope") err := chain.Order(env, 0) Expect(err).NotTo(HaveOccurred()) Expect(fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(1)) Expect(fakeFields.fakeNormalProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1))) Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1)) By("adding a config envelope") err = chain.Configure(configEnv, configSeq) Expect(err).NotTo(HaveOccurred()) Expect(fakeFields.fakeConfigProposalsReceived.AddCallCount()).To(Equal(1)) Expect(fakeFields.fakeConfigProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1))) Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) Eventually(support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1)) Expect(fakeFields.fakeCommittedBlockNumber.SetCallCount()).Should(Equal(3)) // incl. initial call Expect(fakeFields.fakeCommittedBlockNumber.SetArgsForCall(2)).Should(Equal(float64(2))) }) }) }) Context("with revalidation (i.e. 
incorrect config sequence)", func() { BeforeEach(func() { close(cutter.Block) support.SequenceReturns(1) // this causes the revalidation }) It("should create config block upon correct revalidation", func() { support.ProcessConfigMsgReturns(configEnv, 1, nil) // nil implies correct revalidation Expect(chain.Configure(configEnv, configSeq)).To(Succeed()) Consistently(clock.WatcherCount).Should(Equal(1)) Eventually(support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1)) }) It("should not create config block upon incorrect revalidation", func() { support.ProcessConfigMsgReturns(configEnv, 1, errors.Errorf("Invalid config envelope at changed config sequence")) Expect(chain.Configure(configEnv, configSeq)).To(Succeed()) Consistently(clock.WatcherCount).Should(Equal(1)) Consistently(support.WriteConfigBlockCallCount).Should(Equal(0)) // no call to WriteConfigBlock }) It("should not disturb current running timer upon incorrect revalidation", func() { support.ProcessNormalMsgReturns(1, nil) support.ProcessConfigMsgReturns(configEnv, 1, errors.Errorf("Invalid config envelope at changed config sequence")) Expect(chain.Order(env, configSeq)).To(Succeed()) Eventually(clock.WatcherCount, LongEventualTimeout).Should(Equal(2)) clock.Increment(30 * time.Minute) Consistently(support.WriteBlockCallCount).Should(Equal(0)) Expect(chain.Configure(configEnv, configSeq)).To(Succeed()) Consistently(clock.WatcherCount).Should(Equal(2)) Consistently(support.WriteBlockCallCount).Should(Equal(0)) Consistently(support.WriteConfigBlockCallCount).Should(Equal(0)) clock.Increment(30 * time.Minute) Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) }) }) }) Context("for creating a new channel", func() { // use to prepare the Orderer Values BeforeEach(func() { chainID := "mychannel" values := make(map[string]*common.ConfigValue) configEnv = newConfigEnv(chainID, common.HeaderType_CONFIG, newConfigUpdateEnv(chainID, nil, values), ) configSeq = 0 }) // BeforeEach block It("should be able to create a channel", func() { err := chain.Configure(configEnv, configSeq) Expect(err).NotTo(HaveOccurred()) Eventually(support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1)) }) }) }) // Context block for type A config Context("when a type B config update comes", func() { Context("updating protocol values", func() { // use to prepare the Orderer Values BeforeEach(func() { values := map[string]*common.ConfigValue{ "ConsensusType": { Version: 1, Value: marshalOrPanic(&orderer.ConsensusType{ Metadata: marshalOrPanic(consenterMetadata), }), }, } configEnv = newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, nil, values)) configSeq = 0 }) // BeforeEach block It("should be able to process config update of type B", func() { err := chain.Configure(configEnv, configSeq) Expect(err).NotTo(HaveOccurred()) Expect(fakeFields.fakeConfigProposalsReceived.AddCallCount()).To(Equal(1)) Expect(fakeFields.fakeConfigProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1))) Eventually(support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1)) }) }) Context("updating consenters set by exactly one node", func() { It("should be able to process config update adding single node", func() { metadata := proto.Clone(consenterMetadata).(*raftprotos.ConfigMetadata) metadata.Consenters = append(metadata.Consenters, &raftprotos.Consenter{ Host: "localhost", Port: 7050, ServerTlsCert: serverTLSCert(tlsCA), ClientTlsCert: clientTLSCert(tlsCA), }) values := 
map[string]*common.ConfigValue{ "ConsensusType": { Version: 1, Value: marshalOrPanic(&orderer.ConsensusType{ Metadata: marshalOrPanic(metadata), }), }, } configEnv = newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, nil, values)) configSeq = 0 err := chain.Configure(configEnv, configSeq) Expect(err).NotTo(HaveOccurred()) Eventually(support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1)) }) }) }) }) Describe("Crash Fault Tolerance", func() { var raftMetadata *raftprotos.BlockMetadata BeforeEach(func() { raftMetadata = &raftprotos.BlockMetadata{ ConsenterIds: []uint64{1}, NextConsenterId: 2, } }) Describe("when a chain is started with existing WAL", func() { var ( m1 *raftprotos.BlockMetadata m2 *raftprotos.BlockMetadata ) JustBeforeEach(func() { // to generate WAL data, we start a chain, // order several envelopes and then halt the chain. close(cutter.Block) cutter.SetCutNext(true) // enqueue some data to be persisted on disk by raft err := chain.Order(env, uint64(0)) Expect(err).NotTo(HaveOccurred()) Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) _, metadata := support.WriteBlockArgsForCall(0) m1 = &raftprotos.BlockMetadata{} proto.Unmarshal(metadata, m1) err = chain.Order(env, uint64(0)) Expect(err).NotTo(HaveOccurred()) Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) _, metadata = support.WriteBlockArgsForCall(1) m2 = &raftprotos.BlockMetadata{} proto.Unmarshal(metadata, m2) chain.Halt() }) It("replays blocks from committed entries", func() { c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters, cryptoProvider, nil, nil) c.init() c.Start() defer c.Halt() Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) _, metadata := c.support.WriteBlockArgsForCall(0) m := &raftprotos.BlockMetadata{} proto.Unmarshal(metadata, m) Expect(m.RaftIndex).To(Equal(m1.RaftIndex)) _, metadata = c.support.WriteBlockArgsForCall(1) m = &raftprotos.BlockMetadata{} proto.Unmarshal(metadata, m) Expect(m.RaftIndex).To(Equal(m2.RaftIndex)) // chain should keep functioning campaign(c.Chain, c.observe) c.cutter.SetCutNext(true) err := c.Order(env, uint64(0)) Expect(err).NotTo(HaveOccurred()) Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(3)) }) It("only replays blocks after Applied index", func() { raftMetadata.RaftIndex = m1.RaftIndex c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters, cryptoProvider, nil, nil) c.support.WriteBlock(support.WriteBlockArgsForCall(0)) c.init() c.Start() defer c.Halt() Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) _, metadata := c.support.WriteBlockArgsForCall(1) m := &raftprotos.BlockMetadata{} proto.Unmarshal(metadata, m) Expect(m.RaftIndex).To(Equal(m2.RaftIndex)) // chain should keep functioning campaign(c.Chain, c.observe) c.cutter.SetCutNext(true) err := c.Order(env, uint64(0)) Expect(err).NotTo(HaveOccurred()) Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(3)) }) It("does not replay any block if already in sync", func() { raftMetadata.RaftIndex = m2.RaftIndex c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters, cryptoProvider, nil, nil) c.init() c.Start() defer c.Halt() Consistently(c.support.WriteBlockCallCount).Should(Equal(0)) // chain should keep functioning campaign(c.Chain, c.observe) c.cutter.SetCutNext(true) err := c.Order(env, uint64(0)) Expect(err).NotTo(HaveOccurred())
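// the only write expected now is the block cut from the envelope ordered above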
Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) }) Context("WAL file is not readable", func() { It("fails to load wal", func() { skipIfRoot() files, err := ioutil.ReadDir(walDir) Expect(err).NotTo(HaveOccurred()) for _, f := range files { os.Chmod(path.Join(walDir, f.Name()), 0o300) } c, err := etcdraft.NewChain(support, opts, configurator, nil, cryptoProvider, noOpBlockPuller, nil, observeC) Expect(c).To(BeNil()) Expect(err).To(MatchError(ContainSubstring("permission denied"))) }) }) }) Describe("when snapshotting is enabled (snapshot interval is not zero)", func() { var ( ledgerLock sync.Mutex ledger map[uint64]*common.Block ) countFiles := func() int { files, err := ioutil.ReadDir(snapDir) Expect(err).NotTo(HaveOccurred()) return len(files) } BeforeEach(func() { opts.SnapshotCatchUpEntries = 2 close(cutter.Block) cutter.SetCutNext(true) ledgerLock.Lock() ledger = map[uint64]*common.Block{ 0: getSeedBlock(), // genesis block } ledgerLock.Unlock() support.WriteBlockStub = func(block *common.Block, meta []byte) { b := proto.Clone(block).(*common.Block) bytes, err := proto.Marshal(&common.Metadata{Value: meta}) Expect(err).NotTo(HaveOccurred()) b.Metadata.Metadata[common.BlockMetadataIndex_ORDERER] = bytes ledgerLock.Lock() defer ledgerLock.Unlock() ledger[b.Header.Number] = b } support.HeightStub = func() uint64 { ledgerLock.Lock() defer ledgerLock.Unlock() return uint64(len(ledger)) } }) Context("Small SnapshotInterval", func() { BeforeEach(func() { opts.SnapshotIntervalSize = 1 }) It("writes snapshot file to snapDir", func() { // Scenario: start a chain with SnapInterval = 1 byte, expect it to take // one snapshot for each block i, _ := opts.MemoryStorage.FirstIndex() Expect(chain.Order(env, uint64(0))).To(Succeed()) Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) Eventually(countFiles, LongEventualTimeout).Should(Equal(1)) Eventually(opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", i)) Expect(fakeFields.fakeSnapshotBlockNumber.SetCallCount()).To(Equal(2)) // incl. initial call s, _ := opts.MemoryStorage.Snapshot() b := protoutil.UnmarshalBlockOrPanic(s.Data) Expect(fakeFields.fakeSnapshotBlockNumber.SetArgsForCall(1)).To(Equal(float64(b.Header.Number))) i, _ = opts.MemoryStorage.FirstIndex() Expect(chain.Order(env, uint64(0))).To(Succeed()) Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) Eventually(countFiles, LongEventualTimeout).Should(Equal(2)) Eventually(opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", i)) Expect(fakeFields.fakeSnapshotBlockNumber.SetCallCount()).To(Equal(3)) // incl. 
initial call s, _ = opts.MemoryStorage.Snapshot() b = protoutil.UnmarshalBlockOrPanic(s.Data) Expect(fakeFields.fakeSnapshotBlockNumber.SetArgsForCall(2)).To(Equal(float64(b.Header.Number))) }) It("pauses chain if sync is in progress", func() { // Scenario: // after a snapshot is taken, reboot chain with raftIndex = 0 // chain should attempt to sync upon reboot, and blocks on // `WaitReady` API i, _ := opts.MemoryStorage.FirstIndex() Expect(chain.Order(env, uint64(0))).To(Succeed()) Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) Eventually(countFiles, LongEventualTimeout).Should(Equal(1)) Eventually(opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", i)) i, _ = opts.MemoryStorage.FirstIndex() Expect(chain.Order(env, uint64(0))).To(Succeed()) Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) Eventually(countFiles, LongEventualTimeout).Should(Equal(2)) Eventually(opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", i)) chain.Halt() c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters, cryptoProvider, nil, nil) c.init() signal := make(chan struct{}) c.puller.PullBlockStub = func(i uint64) *common.Block { <-signal // blocking for assertions ledgerLock.Lock() defer ledgerLock.Unlock() if i >= uint64(len(ledger)) { return nil } // This is a false assumption - a single node shouldn't be able to pull blocks from anywhere. // However, this test is mainly to assert that the chain should attempt catchup upon start, // so we can live with it. return ledger[i] } err := c.WaitReady() Expect(err).To(MatchError("chain is not started")) c.Start() defer c.Halt() // pull block is called, so chain should be catching up now, WaitReady should block signal <- struct{}{} done := make(chan error) go func() { done <- c.WaitReady() }() Consistently(done).ShouldNot(Receive()) close(signal) // unblock block puller Eventually(done).Should(Receive(nil)) // WaitReady should be unblocked Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) }) It("commits block from snapshot if it's missing from ledger", func() { // Scenario: // Single node exits right after a snapshot is taken, while the block // in it hasn't been successfully persisted into ledger (there can be one // async block write in-flight). Then the node is restarted, and catches // up using the block in snapshot. Expect(chain.Order(env, uint64(0))).To(Succeed()) Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) Eventually(countFiles, LongEventualTimeout).Should(Equal(1)) chain.Halt() c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters, cryptoProvider, nil, nil) c.init() c.Start() defer c.Halt() Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) }) It("restores snapshot w/o extra entries", func() { // Scenario: // after a snapshot is taken, no more entries are appended. // then the node is restarted, it loads snapshot, finds its term // and index. While replaying WAL to memory storage, it should // not append any entry because no extra entry was appended // after snapshot was taken.
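// with no entries beyond the snapshot, etcd/raft MemoryStorage reports FirstIndex == snapshot index + 1 and LastIndex == snapshot index, as asserted below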
Expect(chain.Order(env, uint64(0))).To(Succeed()) Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) _, metadata := support.WriteBlockArgsForCall(0) m := &raftprotos.BlockMetadata{} proto.Unmarshal(metadata, m) Eventually(countFiles, LongEventualTimeout).Should(Equal(1)) Eventually(opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", 1)) snapshot, err := opts.MemoryStorage.Snapshot() // get the snapshot just created Expect(err).NotTo(HaveOccurred()) i, err := opts.MemoryStorage.FirstIndex() // get the first index in memory Expect(err).NotTo(HaveOccurred()) // expect storage to preserve SnapshotCatchUpEntries entries before snapshot Expect(i).To(Equal(snapshot.Metadata.Index - opts.SnapshotCatchUpEntries + 1)) chain.Halt() raftMetadata.RaftIndex = m.RaftIndex c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters, cryptoProvider, nil, nil) c.opts.SnapshotIntervalSize = 1 c.init() c.Start() // following arithmetic reflects how etcdraft MemoryStorage is implemented // when no entry is appended after snapshot being loaded. Eventually(c.opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(Equal(snapshot.Metadata.Index + 1)) Eventually(c.opts.MemoryStorage.LastIndex, LongEventualTimeout).Should(Equal(snapshot.Metadata.Index)) // chain keeps functioning Eventually(func() <-chan raft.SoftState { c.clock.Increment(interval) return c.observe }, LongEventualTimeout).Should(Receive(StateEqual(1, raft.StateLeader))) c.cutter.SetCutNext(true) err = c.Order(env, uint64(0)) Expect(err).NotTo(HaveOccurred()) Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) Eventually(countFiles, LongEventualTimeout).Should(Equal(2)) c.Halt() _, metadata = c.support.WriteBlockArgsForCall(0) m = &raftprotos.BlockMetadata{} proto.Unmarshal(metadata, m) raftMetadata.RaftIndex = m.RaftIndex cx := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters, cryptoProvider, nil, nil) cx.init() cx.Start() defer cx.Halt() // chain keeps functioning Eventually(func() <-chan raft.SoftState { cx.clock.Increment(interval) return cx.observe }, LongEventualTimeout).Should(Receive(StateEqual(1, raft.StateLeader))) }) }) Context("Large SnapshotInterval", func() { BeforeEach(func() { opts.SnapshotIntervalSize = 1024 }) It("restores snapshot w/ extra entries", func() { // Scenario: // after a snapshot is taken, more entries are appended. // then node is restarted, it loads snapshot, finds its term // and index. While replaying WAL to memory storage, it should // append some entries. 
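// envelopes below carry ~500 bytes of payload so that two of them exceed the 1024-byte SnapshotIntervalSize and trigger a snapshot, while a later small envelope appends entries after it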
largeEnv := &common.Envelope{ Payload: marshalOrPanic(&common.Payload{ Header: &common.Header{ChannelHeader: marshalOrPanic(&common.ChannelHeader{Type: int32(common.HeaderType_MESSAGE), ChannelId: channelID})}, Data: make([]byte, 500), }), } By("Ordering two large envelopes to trigger snapshot") Expect(chain.Order(largeEnv, uint64(0))).To(Succeed()) Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) Expect(chain.Order(largeEnv, uint64(0))).To(Succeed()) Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) _, metadata := support.WriteBlockArgsForCall(1) m := &raftprotos.BlockMetadata{} proto.Unmarshal(metadata, m) // check snapshot does exist Eventually(countFiles, LongEventualTimeout).Should(Equal(1)) Eventually(opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", 1)) snapshot, err := opts.MemoryStorage.Snapshot() // get the snapshot just created Expect(err).NotTo(HaveOccurred()) i, err := opts.MemoryStorage.FirstIndex() // get the first index in memory Expect(err).NotTo(HaveOccurred()) // expect storage to preserve SnapshotCatchUpEntries entries before snapshot Expect(i).To(Equal(snapshot.Metadata.Index - opts.SnapshotCatchUpEntries + 1)) By("Ordering another envelope to append new data to memory after snapshot") Expect(chain.Order(env, uint64(0))).To(Succeed()) Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(3)) lasti, _ := opts.MemoryStorage.LastIndex() chain.Halt() raftMetadata.RaftIndex = m.RaftIndex c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters, cryptoProvider, nil, nil) cnt := support.WriteBlockCallCount() for i := 0; i < cnt; i++ { c.support.WriteBlock(support.WriteBlockArgsForCall(i)) } By("Restarting the node") c.init() c.Start() defer c.Halt() By("Checking latest index is larger than index in snapshot") Eventually(c.opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(Equal(snapshot.Metadata.Index + 1)) Eventually(c.opts.MemoryStorage.LastIndex, LongEventualTimeout).Should(Equal(lasti)) }) When("local ledger is in sync with snapshot", func() { It("does not pull blocks and still respects snapshot interval", func() { // Scenario: // - snapshot is taken at block 2 // - order one more envelope (block 3) // - reboot chain at block 2 // - block 3 should be replayed from wal // - order another envelope to trigger snapshot, containing block 3 & 4 // Assertions: // - block puller should NOT be called // - chain should keep functioning after reboot // - chain should respect snapshot interval to trigger next snapshot largeEnv := &common.Envelope{ Payload: marshalOrPanic(&common.Payload{ Header: &common.Header{ChannelHeader: marshalOrPanic(&common.ChannelHeader{Type: int32(common.HeaderType_MESSAGE), ChannelId: channelID})}, Data: make([]byte, 500), }), } By("Ordering two large envelopes to trigger snapshot") Expect(chain.Order(largeEnv, uint64(0))).To(Succeed()) Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) Expect(chain.Order(largeEnv, uint64(0))).To(Succeed()) Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) Eventually(countFiles, LongEventualTimeout).Should(Equal(1)) _, metadata := support.WriteBlockArgsForCall(1) m := &raftprotos.BlockMetadata{} proto.Unmarshal(metadata, m) By("Cutting block [3]") // order another envelope; this should not trigger snapshot err = chain.Order(largeEnv, uint64(0)) Expect(err).NotTo(HaveOccurred()) Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(3)) chain.Halt() raftMetadata.RaftIndex = m.RaftIndex c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters, cryptoProvider, nil, nil) // replay blocks 1 & 2 c.support.WriteBlock(support.WriteBlockArgsForCall(0)) c.support.WriteBlock(support.WriteBlockArgsForCall(1)) c.opts.SnapshotIntervalSize = 1024 By("Restarting node at block [2]") c.init() c.Start() defer c.Halt() // elect leader campaign(c.Chain, c.observe) By("Ordering one more block to trigger snapshot") c.cutter.SetCutNext(true) err = c.Order(largeEnv, uint64(0)) Expect(err).NotTo(HaveOccurred()) Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(4)) Expect(c.puller.PullBlockCallCount()).Should(BeZero()) // old snapshot file is retained Eventually(countFiles, LongEventualTimeout).Should(Equal(2)) }) }) It("respects snapshot interval after reboot", func() { largeEnv := &common.Envelope{ Payload: marshalOrPanic(&common.Payload{ Header: &common.Header{ChannelHeader: marshalOrPanic(&common.ChannelHeader{Type: int32(common.HeaderType_MESSAGE), ChannelId: channelID})}, Data: make([]byte, 500), }), } Expect(chain.Order(largeEnv, uint64(0))).To(Succeed()) Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) // check no snapshot is taken Consistently(countFiles).Should(Equal(0)) _, metadata := support.WriteBlockArgsForCall(0) m := &raftprotos.BlockMetadata{} proto.Unmarshal(metadata, m) chain.Halt() raftMetadata.RaftIndex = m.RaftIndex c1 := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters, cryptoProvider, nil, nil) cnt := support.WriteBlockCallCount() for i := 0; i < cnt; i++ { c1.support.WriteBlock(support.WriteBlockArgsForCall(i)) } c1.cutter.SetCutNext(true) c1.opts.SnapshotIntervalSize = 1024 By("Restarting chain") c1.init() c1.Start() // chain keeps functioning campaign(c1.Chain, c1.observe) Expect(c1.Order(largeEnv, uint64(0))).To(Succeed()) Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) // check snapshot does exist Eventually(countFiles, LongEventualTimeout).Should(Equal(1)) }) }) }) }) Context("Invalid WAL dir", func() { support := &consensusmocks.FakeConsenterSupport{} BeforeEach(func() { // for block creator initialization support.HeightReturns(1) support.BlockReturns(getSeedBlock()) }) When("WAL dir is a file", func() { It("replaces file with fresh WAL dir", func() { f, err := ioutil.TempFile("", "wal-") Expect(err).NotTo(HaveOccurred()) defer os.RemoveAll(f.Name()) chain, err := etcdraft.NewChain( support, etcdraft.Options{ WALDir: f.Name(), SnapDir: snapDir, Logger: logger, MemoryStorage: storage, BlockMetadata: &raftprotos.BlockMetadata{}, Metrics: newFakeMetrics(newFakeMetricsFields()), }, configurator, nil, cryptoProvider, nil, nil, observeC) Expect(chain).NotTo(BeNil()) Expect(err).NotTo(HaveOccurred()) info, err := os.Stat(f.Name()) Expect(err).NotTo(HaveOccurred()) Expect(info.IsDir()).To(BeTrue()) }) }) When("WAL dir is not writeable", func() { It("replaces it with fresh WAL dir", func() { d, err := ioutil.TempDir("", "wal-") Expect(err).NotTo(HaveOccurred()) defer os.RemoveAll(d) err = os.Chmod(d, 0o500) Expect(err).NotTo(HaveOccurred()) chain, err := etcdraft.NewChain( support, etcdraft.Options{ WALDir: d, SnapDir: snapDir, Logger: logger, MemoryStorage: storage, BlockMetadata: &raftprotos.BlockMetadata{}, Metrics:
newFakeMetrics(newFakeMetricsFields()), }, nil, nil, cryptoProvider, noOpBlockPuller, nil, nil) Expect(chain).NotTo(BeNil()) Expect(err).NotTo(HaveOccurred()) }) }) When("WAL parent dir is not writeable", func() { It("fails to bootstrap fresh raft node", func() { skipIfRoot() d, err := ioutil.TempDir("", "wal-") Expect(err).NotTo(HaveOccurred()) defer os.RemoveAll(d) err = os.Chmod(d, 0o500) Expect(err).NotTo(HaveOccurred()) chain, err := etcdraft.NewChain( support, etcdraft.Options{ WALDir: path.Join(d, "wal-dir"), SnapDir: snapDir, Logger: logger, BlockMetadata: &raftprotos.BlockMetadata{}, }, nil, nil, cryptoProvider, noOpBlockPuller, nil, nil) Expect(chain).To(BeNil()) Expect(err).To(MatchError(ContainSubstring("failed to initialize WAL: mkdir"))) }) }) }) }) }) Describe("2-node Raft cluster", func() { var ( network *network channelID string timeout time.Duration dataDir string c1, c2 *chain raftMetadata *raftprotos.BlockMetadata consenters map[uint64]*raftprotos.Consenter configEnv *common.Envelope cryptoProvider bccsp.BCCSP fakeHaltCallbacker *mocks.HaltCallbacker ) BeforeEach(func() { var err error channelID = "multi-node-channel" timeout = 10 * time.Second dataDir, err = ioutil.TempDir("", "raft-test-") Expect(err).NotTo(HaveOccurred()) cryptoProvider, err = sw.NewDefaultSecurityLevelWithKeystore(sw.NewDummyKeyStore()) Expect(err).NotTo(HaveOccurred()) raftMetadata = &raftprotos.BlockMetadata{ ConsenterIds: []uint64{1, 2}, NextConsenterId: 3, } consenters = map[uint64]*raftprotos.Consenter{ 1: { Host: "localhost", Port: 7051, ClientTlsCert: clientTLSCert(tlsCA), ServerTlsCert: serverTLSCert(tlsCA), }, 2: { Host: "localhost", Port: 7051, ClientTlsCert: clientTLSCert(tlsCA), ServerTlsCert: serverTLSCert(tlsCA), }, } metadata := &raftprotos.ConfigMetadata{ Options: &raftprotos.Options{ TickInterval: "500ms", ElectionTick: 10, HeartbeatTick: 1, MaxInflightBlocks: 5, SnapshotIntervalSize: 200, }, Consenters: []*raftprotos.Consenter{consenters[2]}, } value := map[string]*common.ConfigValue{ "ConsensusType": { Version: 1, Value: marshalOrPanic(&orderer.ConsensusType{ Metadata: marshalOrPanic(metadata), }), }, } // prepare config update to remove node 1 configEnv = newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, nil, value)) fakeHaltCallbacker = &mocks.HaltCallbacker{} network = createNetwork(timeout, channelID, dataDir, raftMetadata, consenters, cryptoProvider, tlsCA, fakeHaltCallbacker.HaltCallback) c1, c2 = network.chains[1], network.chains[2] c1.cutter.SetCutNext(true) network.init() network.start() }) AfterEach(func() { network.stop() network.exec(func(c *chain) { Eventually(c.clock.WatcherCount, LongEventualTimeout).Should(BeZero()) }) os.RemoveAll(dataDir) }) It("can remove leader by reconfiguring cluster", func() { network.elect(1) // trigger status dissemination Eventually(func() int { c1.clock.Increment(interval) return c2.fakeFields.fakeActiveNodes.SetCallCount() }, LongEventualTimeout).Should(Equal(2)) Expect(c1.fakeFields.fakeActiveNodes.SetArgsForCall(1)).To(Equal(float64(2))) Expect(c2.fakeFields.fakeActiveNodes.SetArgsForCall(1)).To(Equal(float64(2))) By("Configuring cluster to remove node") Expect(c1.Configure(configEnv, 0)).To(Succeed()) select { case <-c1.observe: case <-time.After(LongEventualTimeout): // abdicateLeader might fail to transfer the leadership when the next candidate // is busy applying committed entries Fail("Expected a new leader to be present") } Eventually(c2.support.WriteConfigBlockCallCount,
LongEventualTimeout).Should(Equal(1)) Eventually(func() <-chan raft.SoftState { c2.clock.Increment(interval) return c2.observe }, LongEventualTimeout).Should(Receive(StateEqual(2, raft.StateLeader))) By("Asserting the haltCallback is called when the node is removed from the replica set") Eventually(fakeHaltCallbacker.HaltCallbackCallCount).Should(Equal(1)) By("Asserting the StatusReport responds correctly after eviction") Eventually( func() orderer_types.ConsensusRelation { cRel, _ := c1.StatusReport() return cRel }, ).Should(Equal(orderer_types.ConsensusRelationConfigTracker)) _, status := c1.StatusReport() Expect(status).To(Equal(orderer_types.StatusInactive)) By("Asserting leader can still serve requests as single-node cluster") c2.cutter.SetCutNext(true) Expect(c2.Order(env, 0)).To(Succeed()) Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) Expect(c1.fakeFields.fakeActiveNodes.SetArgsForCall(2)).To(Equal(float64(0))) // was halted Expect(c2.fakeFields.fakeActiveNodes.SetArgsForCall(1)).To(Equal(float64(2))) }) It("remove leader by reconfiguring cluster, check haltCallback is called", func() { network.elect(1) // trigger status dissemination Eventually(func() int { c1.clock.Increment(interval) return c2.fakeFields.fakeActiveNodes.SetCallCount() }, LongEventualTimeout).Should(Equal(2)) Expect(c1.fakeFields.fakeActiveNodes.SetArgsForCall(1)).To(Equal(float64(2))) Expect(c2.fakeFields.fakeActiveNodes.SetArgsForCall(1)).To(Equal(float64(2))) By("Configuring cluster to remove node") Expect(c1.Configure(configEnv, 0)).To(Succeed()) c1.clock.WaitForNWatchersAndIncrement((ELECTION_TICK-1)*interval, 2) Eventually(c2.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1)) By("Asserting the haltCallback is called when Halt is called before eviction") c1.clock.Increment(interval) Eventually(fakeHaltCallbacker.HaltCallbackCallCount).Should(Equal(1)) Eventually(func() <-chan raft.SoftState { c2.clock.Increment(interval) return c2.observe }, LongEventualTimeout).Should(Receive(StateEqual(2, raft.StateLeader))) By("Asserting leader can still serve requests as single-node cluster") c2.cutter.SetCutNext(true) Expect(c2.Order(env, 0)).To(Succeed()) Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) // active nodes metric hasn't changed because c.halt() wasn't called Expect(c1.fakeFields.fakeActiveNodes.SetArgsForCall(1)).To(Equal(float64(2))) Expect(c2.fakeFields.fakeActiveNodes.SetArgsForCall(1)).To(Equal(float64(2))) }) It("can remove leader by retrying even if leadership transfer fails at first", func() { network.elect(1) var messageOmission uint32 step1 := c1.getStepFunc() c1.setStepFunc(func(dest uint64, msg *orderer.ConsensusRequest) error { stepMsg := &raftpb.Message{} if err := proto.Unmarshal(msg.Payload, stepMsg); err != nil { return fmt.Errorf("failed to unmarshal StepRequest payload to Raft Message: %s", err) } if stepMsg.Type == raftpb.MsgTimeoutNow && atomic.CompareAndSwapUint32(&messageOmission, 0, 1) { return nil } return step1(dest, msg) }) By("Configuring cluster to remove node") Expect(c1.Configure(configEnv, 0)).To(Succeed()) Eventually(func() <-chan raft.SoftState { c1.clock.Increment(interval) return c1.observe }, LongEventualTimeout).Should(Receive()) Eventually(c2.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1)) Eventually(func() <-chan raft.SoftState { c2.clock.Increment(interval) return c2.observe }, LongEventualTimeout).Should(Receive(StateEqual(2, raft.StateLeader))) 
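// the first MsgTimeoutNow was deliberately dropped above, so eviction completes only because the leadership transfer is retried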
By("Asserting the haltCallback is called when the node is removed from the replica set") Eventually(fakeHaltCallbacker.HaltCallbackCallCount).Should(Equal(1)) By("Asserting the StatusReport responds correctly after eviction") Eventually( func() orderer_types.ConsensusRelation { cRel, _ := c1.StatusReport() return cRel }, ).Should(Equal(orderer_types.ConsensusRelationConfigTracker)) _, status := c1.StatusReport() Expect(status).To(Equal(orderer_types.StatusInactive)) By("Asserting leader can still serve requests as single-node cluster") c2.cutter.SetCutNext(true) Expect(c2.Order(env, 0)).To(Succeed()) Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) lastSetActiveNodes := c1.fakeFields.fakeActiveNodes.SetCallCount() - 1 Expect(c1.fakeFields.fakeActiveNodes.SetArgsForCall(lastSetActiveNodes)).To(Equal(float64(0))) // was halted Expect(c2.fakeFields.fakeActiveNodes.SetArgsForCall(1)).To(Equal(float64(2))) }) It("can remove follower by reconfiguring cluster", func() { network.elect(2) Expect(c1.Configure(configEnv, 0)).To(Succeed()) network.exec(func(c *chain) { Eventually(c.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1)) }) Eventually(c2.configurator.ConfigureCallCount, LongEventualTimeout).Should(Equal(2)) Eventually(c1.Chain.Errored, LongEventualTimeout).Should(BeClosed()) By("Asserting leader can still serve requests as single-node cluster") c2.cutter.SetCutNext(true) Expect(c2.Order(env, 0)).To(Succeed()) Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) }) }) Describe("3-node Raft cluster", func() { var ( network *network channelID string timeout time.Duration dataDir string c1, c2, c3 *chain raftMetadata *raftprotos.BlockMetadata consenters map[uint64]*raftprotos.Consenter cryptoProvider bccsp.BCCSP ) BeforeEach(func() { var err error channelID = "multi-node-channel" timeout = 10 * time.Second dataDir, err = ioutil.TempDir("", "raft-test-") Expect(err).NotTo(HaveOccurred()) raftMetadata = &raftprotos.BlockMetadata{ ConsenterIds: []uint64{1, 2, 3}, NextConsenterId: 4, } cryptoProvider, err = sw.NewDefaultSecurityLevelWithKeystore(sw.NewDummyKeyStore()) Expect(err).NotTo(HaveOccurred()) consenters = map[uint64]*raftprotos.Consenter{ 1: { Host: "localhost", Port: 7051, ClientTlsCert: clientTLSCert(tlsCA), ServerTlsCert: serverTLSCert(tlsCA), }, 2: { Host: "localhost", Port: 7051, ClientTlsCert: clientTLSCert(tlsCA), ServerTlsCert: serverTLSCert(tlsCA), }, 3: { Host: "localhost", Port: 7051, ClientTlsCert: clientTLSCert(tlsCA), ServerTlsCert: serverTLSCert(tlsCA), }, } network = createNetwork(timeout, channelID, dataDir, raftMetadata, consenters, cryptoProvider, tlsCA, nil) c1 = network.chains[1] c2 = network.chains[2] c3 = network.chains[3] }) AfterEach(func() { network.stop() network.exec(func(c *chain) { Eventually(c.clock.WatcherCount, LongEventualTimeout).Should(BeZero()) }) os.RemoveAll(dataDir) }) When("2/3 nodes are running", func() { It("late node can catch up", func() { network.init() network.start(1, 2) network.elect(1) // trigger status dissemination Eventually(func() int { c1.clock.Increment(interval) return c2.fakeFields.fakeActiveNodes.SetCallCount() }, LongEventualTimeout).Should(Equal(2)) Expect(c2.fakeFields.fakeActiveNodes.SetArgsForCall(1)).To(Equal(float64(2))) c1.cutter.SetCutNext(true) err := c1.Order(env, 0) Expect(err).NotTo(HaveOccurred()) Eventually(func() int { return c1.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(1)) Eventually(func() int { return 
c2.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(1)) Eventually(func() int { return c3.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(0)) network.start(3) c1.clock.Increment(interval) Eventually(func() int { return c3.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(1)) network.stop() }) It("late node receives snapshot from leader", func() { c1.opts.SnapshotIntervalSize = 1 c1.opts.SnapshotCatchUpEntries = 1 c1.cutter.SetCutNext(true) var blocksLock sync.Mutex blocks := make(map[uint64]*common.Block) // storing written blocks for block puller c1.support.WriteBlockStub = func(b *common.Block, meta []byte) { blocksLock.Lock() defer blocksLock.Unlock() bytes, err := proto.Marshal(&common.Metadata{Value: meta}) Expect(err).NotTo(HaveOccurred()) b.Metadata.Metadata[common.BlockMetadataIndex_ORDERER] = bytes blocks[b.Header.Number] = b } c3.puller.PullBlockStub = func(i uint64) *common.Block { blocksLock.Lock() defer blocksLock.Unlock() b, exist := blocks[i] if !exist { return nil } return b } network.init() network.start(1, 2) network.elect(1) err := c1.Order(env, 0) Expect(err).NotTo(HaveOccurred()) Eventually(func() int { return c1.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(1)) Eventually(func() int { return c2.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(1)) Eventually(func() int { return c3.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(0)) err = c1.Order(env, 0) Expect(err).NotTo(HaveOccurred()) Eventually(func() int { return c1.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(2)) Eventually(func() int { return c2.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(2)) Eventually(func() int { return c3.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(0)) network.start(3) c1.clock.Increment(interval) Eventually(func() int { return c3.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(2)) network.stop() }) }) When("reconfiguring raft cluster", func() { const ( defaultTimeout = 5 * time.Second ) var ( options = &raftprotos.Options{ TickInterval: "500ms", ElectionTick: 10, HeartbeatTick: 1, MaxInflightBlocks: 5, SnapshotIntervalSize: 200, } updateRaftConfigValue = func(metadata *raftprotos.ConfigMetadata) map[string]*common.ConfigValue { return map[string]*common.ConfigValue{ "ConsensusType": { Version: 1, Value: marshalOrPanic(&orderer.ConsensusType{ Metadata: marshalOrPanic(metadata), }), }, } } addConsenterConfigValue = func() map[string]*common.ConfigValue { metadata := &raftprotos.ConfigMetadata{Options: options} for _, consenter := range consenters { metadata.Consenters = append(metadata.Consenters, consenter) } newConsenter := &raftprotos.Consenter{ Host: "localhost", Port: 7050, ServerTlsCert: serverTLSCert(tlsCA), ClientTlsCert: clientTLSCert(tlsCA), } metadata.Consenters = append(metadata.Consenters, newConsenter) return updateRaftConfigValue(metadata) } removeConsenterConfigValue = func(id uint64) map[string]*common.ConfigValue { metadata := &raftprotos.ConfigMetadata{Options: options} for nodeID, consenter := range consenters { if nodeID == id { continue } metadata.Consenters = append(metadata.Consenters, consenter) } return updateRaftConfigValue(metadata) } ) BeforeEach(func() { network.exec(func(c *chain) { c.opts.EvictionSuspicion = time.Millisecond * 100 c.opts.LeaderCheckInterval = time.Millisecond * 100 }) network.init() network.start() network.elect(1) By("Submitting first tx to cut the 
block") c1.cutter.SetCutNext(true) err := c1.Order(env, 0) Expect(err).NotTo(HaveOccurred()) c1.clock.Increment(interval) network.exec( func(c *chain) { Eventually(c.support.WriteBlockCallCount, defaultTimeout).Should(Equal(1)) }) }) AfterEach(func() { network.stop() }) Context("reconfiguration", func() { It("can rotate certificate by adding and removing 1 node in one config update", func() { metadata := &raftprotos.ConfigMetadata{Options: options} for id, consenter := range consenters { if id == 2 { // remove second consenter continue } metadata.Consenters = append(metadata.Consenters, consenter) } // add new consenter newConsenter := &raftprotos.Consenter{ Host: "localhost", Port: 7050, ServerTlsCert: serverTLSCert(tlsCA), ClientTlsCert: clientTLSCert(tlsCA), } metadata.Consenters = append(metadata.Consenters, newConsenter) value := map[string]*common.ConfigValue{ "ConsensusType": { Version: 1, Value: marshalOrPanic(&orderer.ConsensusType{ Metadata: marshalOrPanic(metadata), }), }, } By("creating new configuration with removed node and new one") configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, nil, value)) c1.cutter.SetCutNext(true) By("sending config transaction") Expect(c1.Configure(configEnv, 0)).To(Succeed()) network.exec(func(c *chain) { Eventually(c.configurator.ConfigureCallCount, LongEventualTimeout).Should(Equal(2)) }) }) It("rotates leader certificate and triggers leadership transfer", func() { metadata := &raftprotos.ConfigMetadata{Options: options} for id, consenter := range consenters { if id == 1 { // remove first consenter, which is the leader continue } metadata.Consenters = append(metadata.Consenters, consenter) } // add new consenter newConsenter := &raftprotos.Consenter{ Host: "localhost", Port: 7050, ServerTlsCert: serverTLSCert(tlsCA), ClientTlsCert: clientTLSCert(tlsCA), } metadata.Consenters = append(metadata.Consenters, newConsenter) value := map[string]*common.ConfigValue{ "ConsensusType": { Version: 1, Value: marshalOrPanic(&orderer.ConsensusType{ Metadata: marshalOrPanic(metadata), }), }, } By("creating new configuration with removed node and new one") configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, nil, value)) c1.cutter.SetCutNext(true) By("sending config transaction") Expect(c1.Configure(configEnv, 0)).To(Succeed()) Eventually(c1.observe, LongEventualTimeout).Should(Receive(BeFollower())) network.exec(func(c *chain) { Eventually(c.configurator.ConfigureCallCount, LongEventualTimeout).Should(Equal(2)) }) }) When("Leader is disconnected after cert rotation", func() { It("still configures communication after failed leader transfer attempt", func() { metadata := &raftprotos.ConfigMetadata{Options: options} for id, consenter := range consenters { if id == 1 { // remove second consenter continue } metadata.Consenters = append(metadata.Consenters, consenter) } // add new consenter newConsenter := &raftprotos.Consenter{ Host: "localhost", Port: 7050, ServerTlsCert: serverTLSCert(tlsCA), ClientTlsCert: clientTLSCert(tlsCA), } metadata.Consenters = append(metadata.Consenters, newConsenter) value := map[string]*common.ConfigValue{ "ConsensusType": { Version: 1, Value: marshalOrPanic(&orderer.ConsensusType{ Metadata: marshalOrPanic(metadata), }), }, } By("creating new configuration with removed node and new one") configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, nil, value)) c1.cutter.SetCutNext(true) step1 := c1.getStepFunc() count := 
c1.rpc.SendConsensusCallCount() // record current step call count c1.setStepFunc(func(dest uint64, msg *orderer.ConsensusRequest) error { // disconnect network after 4 MsgApp are sent by c1: // - 2 MsgApp to c2 & c3 that replicate data to raft followers // - 2 MsgApp to c2 & c3 that instruct followers to commit data if c1.rpc.SendConsensusCallCount() == count+4 { defer network.disconnect(1) } return step1(dest, msg) }) network.exec(func(c *chain) { Consistently(c.clock.WatcherCount).Should(Equal(1)) }) By("sending config transaction") Expect(c1.Configure(configEnv, 0)).To(Succeed()) c1.clock.WaitForNWatchersAndIncrement(time.Duration(ELECTION_TICK)*interval, 2) }) }) It("adding node to the cluster", func() { addConsenterUpdate := addConsenterConfigValue() configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, nil, addConsenterUpdate)) c1.cutter.SetCutNext(true) By("sending config transaction") err := c1.Configure(configEnv, 0) Expect(err).NotTo(HaveOccurred()) Expect(c1.fakeFields.fakeConfigProposalsReceived.AddCallCount()).To(Equal(1)) Expect(c1.fakeFields.fakeConfigProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1))) network.exec(func(c *chain) { Eventually(c.support.WriteConfigBlockCallCount, defaultTimeout).Should(Equal(1)) Eventually(c.fakeFields.fakeClusterSize.SetCallCount, LongEventualTimeout).Should(Equal(2)) Expect(c.fakeFields.fakeClusterSize.SetArgsForCall(1)).To(Equal(float64(4))) }) _, raftmetabytes := c1.support.WriteConfigBlockArgsForCall(0) meta := &common.Metadata{Value: raftmetabytes} raftmeta, err := etcdraft.ReadBlockMetadata(meta, nil) Expect(err).NotTo(HaveOccurred()) c4 := newChain(timeout, channelID, dataDir, 4, raftmeta, consenters, cryptoProvider, nil, nil) // a node joining an existing network MUST have already obtained the blocks // up to the config block that adds it to the cluster. c4.support.WriteBlock(c1.support.WriteBlockArgsForCall(0)) c4.support.WriteConfigBlock(c1.support.WriteConfigBlockArgsForCall(0)) c4.init() network.addChain(c4) c4.Start() // ConfChange is applied to etcd/raft asynchronously, meaning node 4 is not added // to the leader's node list right away. An immediate tick does not trigger a heartbeat // being sent to node 4. Therefore, we repeatedly tick the leader until node 4 joins // the cluster successfully.
Eventually(func() <-chan raft.SoftState { c1.clock.Increment(interval) return c4.observe }, defaultTimeout).Should(Receive(Equal(raft.SoftState{Lead: 1, RaftState: raft.StateFollower}))) Eventually(c4.support.WriteBlockCallCount, defaultTimeout).Should(Equal(1)) Eventually(c4.support.WriteConfigBlockCallCount, defaultTimeout).Should(Equal(1)) By("submitting new transaction to follower") c1.cutter.SetCutNext(true) err = c4.Order(env, 0) Expect(err).NotTo(HaveOccurred()) Expect(c4.fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(1)) Expect(c4.fakeFields.fakeNormalProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1))) network.exec(func(c *chain) { Eventually(c.support.WriteBlockCallCount, defaultTimeout).Should(Equal(2)) }) }) It("disconnecting follower node -> adding new node to the cluster -> writing blocks on the ledger -> reconnecting the follower node", func() { By("Disconnecting a follower node") network.disconnect(c2.id) By("Configuring an additional node") addConsenterUpdate := addConsenterConfigValue() configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, nil, addConsenterUpdate)) c1.cutter.SetCutNext(true) By("Sending config transaction") err := c1.Configure(configEnv, 0) Expect(err).NotTo(HaveOccurred()) Expect(c1.fakeFields.fakeConfigProposalsReceived.AddCallCount()).To(Equal(1)) Expect(c1.fakeFields.fakeConfigProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1))) network.exec(func(c *chain) { if c.id == c2.id { return } Eventually(c.support.WriteConfigBlockCallCount, defaultTimeout).Should(Equal(1)) Eventually(c.fakeFields.fakeClusterSize.SetCallCount, LongEventualTimeout).Should(Equal(2)) Expect(c.fakeFields.fakeClusterSize.SetArgsForCall(1)).To(Equal(float64(4))) }) _, raftmetabytes := c1.support.WriteConfigBlockArgsForCall(0) meta := &common.Metadata{Value: raftmetabytes} raftmeta, err := etcdraft.ReadBlockMetadata(meta, nil) Expect(err).NotTo(HaveOccurred()) By("Starting the new node") c4 := newChain(timeout, channelID, dataDir, 4, raftmeta, consenters, cryptoProvider, nil, nil) // a node joining an existing network MUST have already obtained the blocks // up to the config block that adds it to the cluster. c4.support.WriteBlock(c1.support.WriteBlockArgsForCall(0)) c4.support.WriteConfigBlock(c1.support.WriteConfigBlockArgsForCall(0)) c4.init() network.addChain(c4) c4.Start() // ConfChange is applied to etcd/raft asynchronously, meaning node 4 is not added // to the leader's node list right away. An immediate tick does not trigger a heartbeat // being sent to node 4. Therefore, we repeatedly tick the leader until node 4 joins // the cluster successfully.
Eventually(func() <-chan raft.SoftState { c1.clock.Increment(interval) return c4.observe }, defaultTimeout).Should(Receive(Equal(raft.SoftState{Lead: 1, RaftState: raft.StateFollower}))) Eventually(c4.support.WriteBlockCallCount, defaultTimeout).Should(Equal(1)) Eventually(c4.support.WriteConfigBlockCallCount, defaultTimeout).Should(Equal(1)) By("Sending data blocks to leader") numOfBlocks := 100 for i := 0; i < numOfBlocks; i++ { c1.cutter.SetCutNext(true) err := c1.Order(env, 0) Expect(err).NotTo(HaveOccurred()) } By("Reconnecting the follower node") network.connect(c2.id) c1.clock.Increment(interval) By("Checking correct synchronization") network.exec(func(c *chain) { Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1 + numOfBlocks)) Eventually(c.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1)) }) }) It("stop leader and continue reconfiguration failing over to new leader", func() { // Scenario: Start a replica set of 3 Raft nodes and elect node c1 as the leader. // Configure the chain support mock to disconnect c1 right after it writes the configuration block // into the ledger, to simulate failover. // Next, bootstrap a new node c4 to join the cluster, create a config transaction and submit // it to the leader. Once the leader writes the configuration block it fails, and leadership is transferred to // c2. // The test asserts that the new node c4 joins the cluster and that c2 handles the failover of the // re-configuration. Later we connect c1 back and make sure it is capable of catching up with the // new configuration and successfully rejoins the replica set. configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, nil, addConsenterConfigValue())) c1.cutter.SetCutNext(true) step1 := c1.getStepFunc() count := c1.rpc.SendConsensusCallCount() // record current step call count c1.setStepFunc(func(dest uint64, msg *orderer.ConsensusRequest) error { // disconnect network after 4 MsgApp are sent by c1: // - 2 MsgApp to c2 & c3 that replicate data to raft followers // - 2 MsgApp to c2 & c3 that instruct followers to commit data if c1.rpc.SendConsensusCallCount() == count+4 { defer network.disconnect(1) } return step1(dest, msg) }) By("sending config transaction") err := c1.Configure(configEnv, 0) Expect(err).NotTo(HaveOccurred()) // every node has written config block to the OSN ledger network.exec( func(c *chain) { Eventually(c.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1)) }) Eventually(c1.rpc.SendConsensusCallCount, LongEventualTimeout).Should(Equal(count + 6)) c1.setStepFunc(step1) // elect node with higher index i2, _ := c2.storage.LastIndex() // err is always nil i3, _ := c3.storage.LastIndex() candidate := uint64(2) if i3 > i2 { candidate = 3 } network.chains[candidate].cutter.SetCutNext(true) network.elect(candidate) _, raftmetabytes := c1.support.WriteConfigBlockArgsForCall(0) meta := &common.Metadata{Value: raftmetabytes} raftmeta, err := etcdraft.ReadBlockMetadata(meta, nil) Expect(err).NotTo(HaveOccurred()) c4 := newChain(timeout, channelID, dataDir, 4, raftmeta, consenters, cryptoProvider, nil, nil) // a node joining an existing network MUST have already obtained the blocks // up to the config block that adds it to the cluster.
c4.support.WriteBlock(c1.support.WriteBlockArgsForCall(0)) c4.support.WriteConfigBlock(c1.support.WriteConfigBlockArgsForCall(0)) c4.init() network.addChain(c4) c4.start() Expect(c4.WaitReady()).To(Succeed()) network.join(4, true) Eventually(c4.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) Eventually(c4.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1)) By("submitting new transaction to follower") err = c4.Order(env, 0) Expect(err).NotTo(HaveOccurred()) // the remaining nodes, including the newly added one, are alive, hence each should write 2 blocks Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) Eventually(c4.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) // node 1 has been stopped and should not write any block Consistently(c1.support.WriteBlockCallCount).Should(Equal(1)) network.join(1, true) Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) }) It("stop cluster quorum and continue reconfiguration after the restart", func() { // Scenario: Start a replica set of 3 Raft nodes and elect node c1 as the leader. // Configure the chain support mock to stop the cluster after the config block is committed. // Restart the cluster and ensure it picks up the updates and is capable of finishing the reconfiguration. configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, nil, addConsenterConfigValue())) c1.cutter.SetCutNext(true) step1 := c1.getStepFunc() count := c1.rpc.SendConsensusCallCount() // record current step call count c1.setStepFunc(func(dest uint64, msg *orderer.ConsensusRequest) error { // disconnect network after 4 MsgApp are sent by c1: // - 2 MsgApp to c2 & c3 that replicate data to raft followers // - 2 MsgApp to c2 & c3 that instruct followers to commit data if c1.rpc.SendConsensusCallCount() == count+4 { defer func() { network.disconnect(1) network.disconnect(2) network.disconnect(3) }() } return step1(dest, msg) }) By("sending config transaction") err := c1.Configure(configEnv, 0) Expect(err).NotTo(HaveOccurred()) // every node has written config block to the OSN ledger network.exec( func(c *chain) { Eventually(c.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1)) }) // assert conf change proposals have been dropped before proceeding to reconnect the network Eventually(c1.rpc.SendConsensusCallCount, LongEventualTimeout).Should(Equal(count + 6)) c1.setStepFunc(step1) _, raftmetabytes := c1.support.WriteConfigBlockArgsForCall(0) meta := &common.Metadata{Value: raftmetabytes} raftmeta, err := etcdraft.ReadBlockMetadata(meta, nil) Expect(err).NotTo(HaveOccurred()) c4 := newChain(timeout, channelID, dataDir, 4, raftmeta, consenters, cryptoProvider, nil, nil) // a node joining an existing network MUST have already obtained the blocks // up to the config block that adds it to the cluster.
c4.support.WriteBlock(c1.support.WriteBlockArgsForCall(0)) c4.support.WriteConfigBlock(c1.support.WriteConfigBlockArgsForCall(0)) c4.init() network.addChain(c4) By("reconnecting nodes back") for i := uint64(1); i < 4; i++ { network.connect(i) } // elect node with higher index i2, _ := c2.storage.LastIndex() // err is always nil i3, _ := c3.storage.LastIndex() candidate := uint64(2) if i3 > i2 { candidate = 3 } network.chains[candidate].cutter.SetCutNext(true) network.elect(candidate) c4.start() Expect(c4.WaitReady()).To(Succeed()) network.join(4, false) Eventually(c4.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) Eventually(c4.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1)) By("submitting new transaction to follower") err = c4.Order(env, 0) Expect(err).NotTo(HaveOccurred()) // the remaining nodes, including the newly added one, are alive, hence each should write 2 blocks Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) Eventually(c4.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) }) It("removes leader from replica set", func() { // Scenario: Start a replica set of 3 nodes and elect nodeID = 1 as the leader. // Prepare a config update transaction which removes the leader (nodeID = 1), to // ensure we handle re-configuration with node removal correctly: the remaining two // nodes must still be capable of forming a functional quorum, and Raft must be capable of making further progress. // Moreover, the test asserts that the removed node stops Rafting with the rest of the cluster, i.e. // it should not be able to get updates or forward transactions. configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, nil, removeConsenterConfigValue(1))) // remove nodeID == 1 c1.cutter.SetCutNext(true) By("sending config transaction") err := c1.Configure(configEnv, 0) Expect(err).NotTo(HaveOccurred()) time.Sleep(time.Duration(ELECTION_TICK) * interval) Eventually(c2.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1)) Eventually(c2.fakeFields.fakeClusterSize.SetCallCount, LongEventualTimeout).Should(Equal(2)) // Assert c1 has exited Eventually(c1.Errored, LongEventualTimeout).Should(BeClosed()) var newLeader, remainingFollower *chain var c2state raft.SoftState var c3state raft.SoftState retry := 1 for newLeader == nil || remainingFollower == nil { select { case c2state = <-c2.observe: case c3state = <-c3.observe: case <-time.After(LongEventualTimeout): // abdicateLeader might fail to transfer the leadership when the next candidate is // busy applying committed entries; in that case, // send an artificial MsgTimeoutNow to the node to pick the next leader if retry > 0 { retry -= 1 By("leadership transfer not complete, hence retrying") c2.Consensus(&orderer.ConsensusRequest{Payload: protoutil.MarshalOrPanic(&raftpb.Message{Type: raftpb.MsgTimeoutNow, To: 2})}, 0) continue } Fail("Expected a new leader to be present") } // an agreed leader among the two, which is one of the two remaining nodes if ((c2state.RaftState == raft.StateFollower && c3state.RaftState == raft.StateLeader) || (c2state.RaftState == raft.StateLeader && c3state.RaftState == raft.StateFollower)) && c2state.Lead == c3state.Lead && c2state.Lead != raft.None { newLeader = network.chains[c2state.Lead] if c2state.RaftState == raft.StateFollower { remainingFollower = c2 } else { remainingFollower = c3 } } }
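// At this point the two remaining nodes agree on a leader, and c1 is no longer part of the replica set.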
By("submitting transaction to new leader") newLeader.cutter.SetCutNext(true) err = newLeader.Order(env, 0) Expect(err).NotTo(HaveOccurred()) Eventually(newLeader.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) Eventually(remainingFollower.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) By("trying to submit to new node, expected to fail") c1.cutter.SetCutNext(true) err = c1.Order(env, 0) Expect(err).To(HaveOccurred()) // number of block writes should remain the same Consistently(newLeader.support.WriteBlockCallCount).Should(Equal(2)) Consistently(remainingFollower.support.WriteBlockCallCount).Should(Equal(2)) Consistently(c1.support.WriteBlockCallCount).Should(Equal(1)) }) It("does not deadlock if leader steps down while config block is in-flight", func() { configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, nil, addConsenterConfigValue())) c1.cutter.SetCutNext(true) signal := make(chan struct{}) stub := c1.support.WriteConfigBlockStub c1.support.WriteConfigBlockStub = func(b *common.Block, meta []byte) { signal <- struct{}{} <-signal stub(b, meta) } By("Sending config transaction") Expect(c1.Configure(configEnv, 0)).To(Succeed()) Eventually(signal, LongEventualTimeout).Should(Receive()) network.disconnect(1) By("Ticking leader till it steps down") Eventually(func() raft.SoftState { c1.clock.Increment(interval) return c1.Node.Status().SoftState }, LongEventualTimeout).Should(StateEqual(0, raft.StateFollower)) close(signal) Eventually(c1.observe, LongEventualTimeout).Should(Receive(StateEqual(0, raft.StateFollower))) By("Re-electing 1 as leader") network.connect(1) network.elect(1) _, raftmetabytes := c1.support.WriteConfigBlockArgsForCall(0) meta := &common.Metadata{Value: raftmetabytes} raftmeta, err := etcdraft.ReadBlockMetadata(meta, nil) Expect(err).NotTo(HaveOccurred()) c4 := newChain(timeout, channelID, dataDir, 4, raftmeta, consenters, cryptoProvider, nil, nil) // if we join a node to existing network, it MUST already obtained blocks // till the config block that adds this node to cluster. 
c4.support.WriteBlock(c1.support.WriteBlockArgsForCall(0)) c4.support.WriteConfigBlock(c1.support.WriteConfigBlockArgsForCall(0)) c4.init() network.addChain(c4) c4.Start() Eventually(func() <-chan raft.SoftState { c1.clock.Increment(interval) return c4.observe }, LongEventualTimeout).Should(Receive(StateEqual(1, raft.StateFollower))) By("Submitting tx to confirm network is still working") Expect(c1.Order(env, 0)).To(Succeed()) network.exec(func(c *chain) { Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) Eventually(c.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1)) }) }) }) }) When("3/3 nodes are running", func() { JustBeforeEach(func() { network.init() network.start() network.elect(1) }) AfterEach(func() { network.stop() }) It("correctly sets the cluster size and leadership metrics", func() { // the network should see only one leadership change network.exec(func(c *chain) { Expect(c.fakeFields.fakeLeaderChanges.AddCallCount()).Should(Equal(1)) Expect(c.fakeFields.fakeLeaderChanges.AddArgsForCall(0)).Should(Equal(float64(1))) Expect(c.fakeFields.fakeClusterSize.SetCallCount()).Should(Equal(1)) Expect(c.fakeFields.fakeClusterSize.SetArgsForCall(0)).To(Equal(float64(3))) }) // c1 should be the leader Expect(c1.fakeFields.fakeIsLeader.SetCallCount()).Should(Equal(2)) Expect(c1.fakeFields.fakeIsLeader.SetArgsForCall(1)).Should(Equal(float64(1))) // c2 and c3 should continue to remain followers Expect(c2.fakeFields.fakeIsLeader.SetCallCount()).Should(Equal(1)) Expect(c2.fakeFields.fakeIsLeader.SetArgsForCall(0)).Should(Equal(float64(0))) Expect(c3.fakeFields.fakeIsLeader.SetCallCount()).Should(Equal(1)) Expect(c3.fakeFields.fakeIsLeader.SetArgsForCall(0)).Should(Equal(float64(0))) }) It("orders envelope on leader", func() { By("instructed to cut next block") c1.cutter.SetCutNext(true) err := c1.Order(env, 0) Expect(err).NotTo(HaveOccurred()) Expect(c1.fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(1)) Expect(c1.fakeFields.fakeNormalProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1))) network.exec( func(c *chain) { Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) }) By("respect batch timeout") c1.cutter.SetCutNext(false) err = c1.Order(env, 0) Expect(err).NotTo(HaveOccurred()) Expect(c1.fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(2)) Expect(c1.fakeFields.fakeNormalProposalsReceived.AddArgsForCall(1)).To(Equal(float64(1))) Eventually(c1.cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1)) c1.clock.WaitForNWatchersAndIncrement(timeout, 2) network.exec( func(c *chain) { Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) }) }) It("orders envelope on follower", func() { By("instructed to cut next block") c1.cutter.SetCutNext(true) err := c2.Order(env, 0) Expect(err).NotTo(HaveOccurred()) Expect(c2.fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(1)) Expect(c2.fakeFields.fakeNormalProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1))) Expect(c1.fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(0)) network.exec( func(c *chain) { Eventually(func() int { return c.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(1)) }) By("respect batch timeout") c1.cutter.SetCutNext(false) err = c2.Order(env, 0) Expect(err).NotTo(HaveOccurred()) Expect(c2.fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(2)) Expect(c2.fakeFields.fakeNormalProposalsReceived.AddArgsForCall(1)).To(Equal(float64(1))) 
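// the proposals-received metric is recorded on the node that receives the envelope (c2 here), not on the leader it forwards to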
Expect(c1.fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(0)) Eventually(c1.cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1)) c1.clock.WaitForNWatchersAndIncrement(timeout, 2) network.exec( func(c *chain) { Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) }) }) When("MaxInflightBlocks is reached", func() { BeforeEach(func() { network.exec(func(c *chain) { c.opts.MaxInflightBlocks = 1 }) }) It("waits for in flight blocks to be committed", func() { c1.cutter.SetCutNext(true) // disconnect c1 to disrupt consensus network.disconnect(1) Expect(c1.Order(env, 0)).To(Succeed()) doneProp := make(chan struct{}) go func() { defer GinkgoRecover() Expect(c1.Order(env, 0)).To(Succeed()) close(doneProp) }() // expect second `Order` to block Consistently(doneProp).ShouldNot(BeClosed()) network.exec(func(c *chain) { Consistently(c.support.WriteBlockCallCount).Should(BeZero()) }) network.connect(1) c1.clock.Increment(interval) Eventually(doneProp, LongEventualTimeout).Should(BeClosed()) network.exec(func(c *chain) { Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) }) }) It("resets block in flight when steps down from leader", func() { c1.cutter.SetCutNext(true) c2.cutter.SetCutNext(true) // disconnect c1 to disrupt consensus network.disconnect(1) Expect(c1.Order(env, 0)).To(Succeed()) doneProp := make(chan struct{}) go func() { defer GinkgoRecover() Expect(c1.Order(env, 0)).To(Succeed()) close(doneProp) }() // expect second `Order` to block Consistently(doneProp).ShouldNot(BeClosed()) network.exec(func(c *chain) { Consistently(c.support.WriteBlockCallCount).Should(BeZero()) }) network.elect(2) Expect(c3.Order(env, 0)).To(Succeed()) Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(0)) Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) network.connect(1) c2.clock.Increment(interval) Eventually(doneProp, LongEventualTimeout).Should(BeClosed()) network.exec(func(c *chain) { Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) }) }) }) When("gRPC stream to leader is stuck", func() { BeforeEach(func() { c2.opts.RPCTimeout = time.Second network.Lock() network.delayWG.Add(1) network.Unlock() }) It("correctly times out", func() { err := c2.Order(env, 0) Expect(err).To(MatchError("timed out (1s) waiting on forwarding to 1")) network.delayWG.Done() }) }) When("leader is disconnected", func() { It("correctly returns a failure to the client when forwarding from a follower", func() { network.disconnect(1) err := c2.Order(env, 0) Expect(err).To(MatchError("connection lost")) }) It("proactively steps down to follower", func() { network.disconnect(1) By("Ticking leader until it steps down") Eventually(func() <-chan raft.SoftState { c1.clock.Increment(interval) return c1.observe }, LongEventualTimeout).Should(Receive(Equal(raft.SoftState{Lead: 0, RaftState: raft.StateFollower}))) By("Ensuring it does not accept message due to the cluster being leaderless") err := c1.Order(env, 0) Expect(err).To(MatchError("no Raft leader")) network.elect(2) // c1 should have lost leadership Expect(c1.fakeFields.fakeIsLeader.SetCallCount()).Should(Equal(3)) Expect(c1.fakeFields.fakeIsLeader.SetArgsForCall(2)).Should(Equal(float64(0))) // c2 should become the leader Expect(c2.fakeFields.fakeIsLeader.SetCallCount()).Should(Equal(2)) 
Expect(c2.fakeFields.fakeIsLeader.SetArgsForCall(1)).Should(Equal(float64(1))) // c3 should remain a follower Expect(c3.fakeFields.fakeIsLeader.SetCallCount()).Should(Equal(1)) network.join(1, true) network.exec(func(c *chain) { Expect(c.fakeFields.fakeLeaderChanges.AddCallCount()).Should(Equal(3)) Expect(c.fakeFields.fakeLeaderChanges.AddArgsForCall(2)).Should(Equal(float64(1))) }) err = c1.Order(env, 0) Expect(err).NotTo(HaveOccurred()) }) It("does not deadlock if propose is blocked", func() { signal := make(chan struct{}) c1.cutter.SetCutNext(true) c1.support.SequenceStub = func() uint64 { signal <- struct{}{} <-signal return 0 } By("Sending a normal transaction") Expect(c1.Order(env, 0)).To(Succeed()) Eventually(signal).Should(Receive()) network.disconnect(1) By("Ticking leader till it steps down") Eventually(func() raft.SoftState { c1.clock.Increment(interval) return c1.Node.Status().SoftState }).Should(StateEqual(0, raft.StateFollower)) close(signal) Eventually(c1.observe).Should(Receive(StateEqual(0, raft.StateFollower))) c1.support.SequenceStub = nil network.exec(func(c *chain) { Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(0)) }) By("Re-electing 1 as leader") network.connect(1) network.elect(1) By("Sending another normal transaction") Expect(c1.Order(env, 0)).To(Succeed()) network.exec(func(c *chain) { Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) }) }) }) When("follower is disconnected", func() { It("should return error when receiving an env", func() { network.disconnect(2) errorC := c2.Errored() Consistently(errorC).ShouldNot(BeClosed()) // assert that errorC is not closed By("Ticking node 2 until it becomes pre-candidate") Eventually(func() <-chan raft.SoftState { c2.clock.Increment(interval) return c2.observe }, LongEventualTimeout).Should(Receive(Equal(raft.SoftState{Lead: 0, RaftState: raft.StatePreCandidate}))) Eventually(errorC).Should(BeClosed()) err := c2.Order(env, 0) Expect(err).To(HaveOccurred()) Expect(c2.fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(1)) Expect(c2.fakeFields.fakeNormalProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1))) Expect(c1.fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(0)) network.connect(2) c1.clock.Increment(interval) Expect(errorC).To(BeClosed()) Eventually(c2.Errored).ShouldNot(BeClosed()) }) }) It("leader retransmits lost messages", func() { // This tests that heartbeats will trigger the leader to retransmit lost MsgApp c1.cutter.SetCutNext(true) network.disconnect(1) // drop MsgApp err := c1.Order(env, 0) Expect(err).NotTo(HaveOccurred()) network.exec( func(c *chain) { Consistently(func() int { return c.support.WriteBlockCallCount() }).Should(Equal(0)) }) network.connect(1) // reconnect leader c1.clock.Increment(interval) // trigger a heartbeat network.exec( func(c *chain) { Eventually(func() int { return c.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(1)) }) }) It("allows the leader to create multiple normal blocks without having to wait for them to be written out", func() { // this ensures that the created blocks are not written out network.disconnect(1) c1.cutter.SetCutNext(true) for i := 0; i < 3; i++ { Expect(c1.Order(env, 0)).To(Succeed()) } Consistently(c1.support.WriteBlockCallCount).Should(Equal(0)) network.connect(1) // After FAB-13722, the leader pauses replication if it gets notified that message // delivery to a certain node has failed, i.e. connection refused.
Replication to that // follower is resumed if the leader receives a MsgHeartbeatResp from it. // We could certainly tick the leader repeatedly to trigger a heartbeat broadcast, but we // would also risk a slow leader stepping down due to excessive ticks. // // Instead, we can simply send an artificial MsgHeartbeatResp to the leader to resume. m2 := &raftpb.Message{To: c1.id, From: c2.id, Type: raftpb.MsgHeartbeatResp} c1.Consensus(&orderer.ConsensusRequest{Channel: channelID, Payload: protoutil.MarshalOrPanic(m2)}, c2.id) m3 := &raftpb.Message{To: c1.id, From: c3.id, Type: raftpb.MsgHeartbeatResp} c1.Consensus(&orderer.ConsensusRequest{Channel: channelID, Payload: protoutil.MarshalOrPanic(m3)}, c3.id) network.exec(func(c *chain) { Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(3)) }) }) It("new leader should wait for in-flight blocks to commit before accepting new env", func() { // Scenario: when a node is elected as new leader and there are still in-flight blocks, // it should not immediately start accepting new envelopes; instead it should wait for // those in-flight blocks to be committed, otherwise we may create an uncle block which // forks and panics the chain. // // Steps: // - start raft cluster with three nodes and genesis block0 // - order env1 on c1, which creates block1 // - drop MsgApp from 1 to 3 // - drop second round of MsgApp sent from 1 to 2, so that block1 is only committed on c1 // - disconnect c1 and elect c2 // - order env2 on c2. This env must NOT be immediately accepted, otherwise c2 would create // an uncle block1 based on block0. // - c2 commits block1 // - c2 accepts env2, and creates block2 // - c2 commits block2 c1.cutter.SetCutNext(true) c2.cutter.SetCutNext(true) step1 := c1.getStepFunc() c1.setStepFunc(func(dest uint64, msg *orderer.ConsensusRequest) error { stepMsg := &raftpb.Message{} Expect(proto.Unmarshal(msg.Payload, stepMsg)).NotTo(HaveOccurred()) if dest == 3 { return nil } if stepMsg.Type == raftpb.MsgApp && len(stepMsg.Entries) == 0 { return nil } return step1(dest, msg) }) Expect(c1.Order(env, 0)).NotTo(HaveOccurred()) Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) Consistently(c2.support.WriteBlockCallCount).Should(Equal(0)) Consistently(c3.support.WriteBlockCallCount).Should(Equal(0)) network.disconnect(1) step2 := c2.getStepFunc() c2.setStepFunc(func(dest uint64, msg *orderer.ConsensusRequest) error { stepMsg := &raftpb.Message{} Expect(proto.Unmarshal(msg.Payload, stepMsg)).NotTo(HaveOccurred()) if stepMsg.Type == raftpb.MsgApp && len(stepMsg.Entries) != 0 && dest == 3 { for _, ent := range stepMsg.Entries { if len(ent.Data) != 0 { return nil } } } return step2(dest, msg) }) network.elect(2) go func() { defer GinkgoRecover() Expect(c2.Order(env, 0)).NotTo(HaveOccurred()) }() Consistently(c2.support.WriteBlockCallCount).Should(Equal(0)) Consistently(c3.support.WriteBlockCallCount).Should(Equal(0)) c2.setStepFunc(step2) c2.clock.Increment(interval) Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) b, _ := c2.support.WriteBlockArgsForCall(0) Expect(b.Header.Number).To(Equal(uint64(1))) b, _ = c2.support.WriteBlockArgsForCall(1) Expect(b.Header.Number).To(Equal(uint64(2))) }) Context("handling config blocks", func() { var configEnv *common.Envelope BeforeEach(func() { values := map[string]*common.ConfigValue{ "BatchTimeout": { Version: 1, Value: marshalOrPanic(&orderer.BatchTimeout{ Timeout: "3ms",
}), }, } configEnv = newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, nil, values), ) }) It("holds up block creation on leader once a config block has been created and not written out", func() { // this ensures that the created blocks are not written out network.disconnect(1) c1.cutter.SetCutNext(true) // config block err := c1.Order(configEnv, 0) Expect(err).NotTo(HaveOccurred()) // to avoid data races since we are accessing these within a goroutine tempEnv := env tempC1 := c1 done := make(chan struct{}) // normal block go func() { defer GinkgoRecover() // This should be blocked if config block is not committed err := tempC1.Order(tempEnv, 0) Expect(err).NotTo(HaveOccurred()) close(done) }() Consistently(done).ShouldNot(BeClosed()) network.connect(1) c1.clock.Increment(interval) network.exec( func(c *chain) { Eventually(func() int { return c.support.WriteConfigBlockCallCount() }, LongEventualTimeout).Should(Equal(1)) }) network.exec( func(c *chain) { Eventually(func() int { return c.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(1)) }) }) It("continues creating blocks on leader after a config block has been successfully written out", func() { c1.cutter.SetCutNext(true) // config block err := c1.Configure(configEnv, 0) Expect(err).NotTo(HaveOccurred()) network.exec( func(c *chain) { Eventually(func() int { return c.support.WriteConfigBlockCallCount() }, LongEventualTimeout).Should(Equal(1)) }) // normal block following config block err = c1.Order(env, 0) Expect(err).NotTo(HaveOccurred()) network.exec( func(c *chain) { Eventually(func() int { return c.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(1)) }) }) }) When("Snapshotting is enabled", func() { BeforeEach(func() { c1.opts.SnapshotIntervalSize = 1 c1.opts.SnapshotCatchUpEntries = 1 }) It("takes snapshot when the accumulated-bytes condition is met", func() { // change the SnapshotIntervalSize on the chains c3.opts.SnapshotIntervalSize = 1 c3.opts.SnapshotCatchUpEntries = 1 c2.opts.SnapshotIntervalSize = 1 c2.opts.SnapshotCatchUpEntries = 1 countSnapShotsForChain := func(cn *chain) int { files, err := ioutil.ReadDir(cn.opts.SnapDir) Expect(err).NotTo(HaveOccurred()) return len(files) } Expect(countSnapShotsForChain(c1)).Should(Equal(0)) Expect(countSnapShotsForChain(c3)).Should(Equal(0)) By("order envelope on node 1 to accumulate bytes") c1.cutter.SetCutNext(true) err := c1.Order(env, 0) Expect(err).NotTo(HaveOccurred()) // all three nodes should take snapshots network.exec( func(c *chain) { Eventually(func() int { return c.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(1)) }) // order data on all nodes except node 3, empty the raft message directed to node 3 // node 1 should take a snapshot but node 3 should not snapshotsOnNode3 := countSnapShotsForChain(c3) step1 := c1.getStepFunc() c1.setStepFunc(func(dest uint64, msg *orderer.ConsensusRequest) error { stepMsg := &raftpb.Message{} Expect(proto.Unmarshal(msg.Payload, stepMsg)).NotTo(HaveOccurred()) if dest == 3 && stepMsg.Type == raftpb.MsgApp && len(stepMsg.Entries) > 0 { stepMsg.Entries = stepMsg.Entries[0:1] stepMsg.Entries[0].Data = nil msg.Payload = protoutil.MarshalOrPanic(stepMsg) } return step1(dest, msg) }) err = c1.Order(env, 0) Expect(err).NotTo(HaveOccurred()) Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) // order data on all nodes except node 3, send a raftpb.EntryConfChange
message to node 3 // node 1 should take a snapshot but node 3 should not c1.setStepFunc(func(dest uint64, msg *orderer.ConsensusRequest) error { stepMsg := &raftpb.Message{} Expect(proto.Unmarshal(msg.Payload, stepMsg)).NotTo(HaveOccurred()) if dest == 3 && stepMsg.Type == raftpb.MsgApp && len(stepMsg.Entries) > 0 { stepMsg.Entries = stepMsg.Entries[0:1] // change message type to raftpb.EntryConfChange stepMsg.Entries[0].Type = raftpb.EntryConfChange cc := &raftpb.ConfChange{NodeID: uint64(3), Type: raftpb.ConfChangeRemoveNode} data, err := cc.Marshal() Expect(err).NotTo(HaveOccurred()) stepMsg.Entries[0].Data = data msg.Payload = protoutil.MarshalOrPanic(stepMsg) } return step1(dest, msg) }) err = c1.Order(env, 0) Expect(err).NotTo(HaveOccurred()) Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(3)) Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) countSnapShotsForC1 := func() int { return countSnapShotsForChain(c1) } Eventually(countSnapShotsForC1, LongEventualTimeout).Should(Equal(3)) // No snapshot should be taken for node 3 after this orderer request additionalSnapshotsForNode3 := countSnapShotsForChain(c3) - snapshotsOnNode3 Expect(additionalSnapshotsForNode3).Should(Equal(0)) }) It("keeps running if some entries in memory are purged", func() { // Scenario: snapshotting is enabled on node 1 and it purges memory storage // with every snapshot. The cluster should keep functioning correctly. i, err := c1.opts.MemoryStorage.FirstIndex() Expect(err).NotTo(HaveOccurred()) Expect(i).To(Equal(uint64(1))) c1.cutter.SetCutNext(true) err = c1.Order(env, 0) Expect(err).NotTo(HaveOccurred()) network.exec( func(c *chain) { Eventually(func() int { return c.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(1)) }) Eventually(c1.opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", i)) i, err = c1.opts.MemoryStorage.FirstIndex() Expect(err).NotTo(HaveOccurred()) err = c1.Order(env, 0) Expect(err).NotTo(HaveOccurred()) network.exec( func(c *chain) { Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) }) Eventually(c1.opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", i)) i, err = c1.opts.MemoryStorage.FirstIndex() Expect(err).NotTo(HaveOccurred()) err = c1.Order(env, 0) Expect(err).NotTo(HaveOccurred()) network.exec( func(c *chain) { Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(3)) }) Eventually(c1.opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", i)) }) It("lagged node can catch up using snapshot", func() { network.disconnect(2) c1.cutter.SetCutNext(true) c2Lasti, _ := c2.opts.MemoryStorage.LastIndex() var blockCnt int // Order blocks until the first index of c1's memory storage is greater than the last index of c2's, // so a snapshot will be sent to c2 when it rejoins the network Eventually(func() bool { c1Firsti, _ := c1.opts.MemoryStorage.FirstIndex() if c1Firsti > c2Lasti+1 { return true } Expect(c1.Order(env, 0)).To(Succeed()) blockCnt++ Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(blockCnt)) Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(blockCnt)) return false }, LongEventualTimeout).Should(BeTrue()) Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(0)) network.join(2, false) Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(blockCnt)) indices := etcdraft.ListSnapshots(logger,
c2.opts.SnapDir) Expect(indices).To(HaveLen(1)) gap := indices[0] - c2Lasti // TODO In theory, "equal" is the accurate behavior we expect. However, the eviction suspector, // which calls the block puller, is still relying on the real clock, and sometimes increments the puller // call count. Therefore we are being more lenient here until the suspector starts using a fake clock, // so we have more deterministic control over it. Expect(c2.puller.PullBlockCallCount()).To(BeNumerically(">=", int(gap))) // chain should keep functioning Expect(c2.Order(env, 0)).To(Succeed()) network.exec( func(c *chain) { Eventually(func() int { return c.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(blockCnt + 1)) }) }) }) Context("failover", func() { It("follower should step up as leader upon failover", func() { network.stop(1) network.elect(2) By("order envelope on new leader") c2.cutter.SetCutNext(true) err := c2.Order(env, 0) Expect(err).NotTo(HaveOccurred()) // block should not be produced on chain 1 Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(0)) // block should be produced on chain 2 & 3 Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) By("order envelope on follower") err = c3.Order(env, 0) Expect(err).NotTo(HaveOccurred()) // block should not be produced on chain 1 Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(0)) // block should be produced on chain 2 & 3 Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) }) It("follower cannot be elected if its log is not up-to-date", func() { network.disconnect(2) c1.cutter.SetCutNext(true) err := c1.Order(env, 0) Expect(err).NotTo(HaveOccurred()) Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(0)) Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) network.disconnect(1) network.connect(2) // node 2 has not caught up with other nodes for tick := 0; tick < 2*ELECTION_TICK-1; tick++ { c2.clock.Increment(interval) Consistently(c2.observe).ShouldNot(Receive(Equal(2))) } // When PreVote is enabled, node 2 would fail to collect enough // PreVotes because its index is not up-to-date. Therefore, it // does not cause a leader change on other nodes.
Consistently(c3.observe).ShouldNot(Receive()) network.elect(3) // node 3 has the newest log among nodes 2 & 3, so it can be elected }) It("PreVote prevents reconnected node from disturbing network", func() { network.disconnect(2) c1.cutter.SetCutNext(true) err := c1.Order(env, 0) Expect(err).NotTo(HaveOccurred()) Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(0)) Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) network.connect(2) for tick := 0; tick < 2*ELECTION_TICK-1; tick++ { c2.clock.Increment(interval) Consistently(c2.observe).ShouldNot(Receive(Equal(2))) } Consistently(c1.observe).ShouldNot(Receive()) Consistently(c3.observe).ShouldNot(Receive()) }) It("follower can catch up and then campaign with success", func() { network.disconnect(2) c1.cutter.SetCutNext(true) for i := 0; i < 10; i++ { err := c1.Order(env, 0) Expect(err).NotTo(HaveOccurred()) } Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(10)) Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(0)) Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(10)) network.join(2, false) Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(10)) network.disconnect(1) network.elect(2) }) It("purges blockcutter, stops timer and discards created blocks if leadership is lost", func() { // enqueue one transaction into 1's blockcutter to test for purging of the block cutter c1.cutter.SetCutNext(false) err := c1.Order(env, 0) Expect(err).NotTo(HaveOccurred()) Eventually(c1.cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1)) // no block should be written because env is not cut into block yet c1.clock.WaitForNWatchersAndIncrement(interval, 2) Consistently(c1.support.WriteBlockCallCount).Should(Equal(0)) network.disconnect(1) network.elect(2) network.join(1, true) Eventually(c1.clock.WatcherCount, LongEventualTimeout).Should(Equal(1)) // blockcutter timer is stopped Eventually(c1.cutter.CurBatch, LongEventualTimeout).Should(HaveLen(0)) // the created block should be discarded since there is a leadership change Consistently(c1.support.WriteBlockCallCount).Should(Equal(0)) network.disconnect(2) network.elect(1) err = c1.Order(env, 0) Expect(err).NotTo(HaveOccurred()) // The following group of assertions is redundant - it's here for completeness. // If the blockcutter has not been reset, fast-forwarding 1's clock to 'timeout' should result in the blockcutter firing. // If the blockcutter has been reset, fast-forwarding won't do anything.
// // Put differently: // // correct: // stop start fire // |--------------|---------------------------| // n*intervals timeout // (advanced in election) // // wrong: // unstop fire // |---------------------------| // timeout // // timeout-n*interval n*interval // |-----------|----------------| // ^ ^ // at this point of time it should fire // timer should not fire at this point c1.clock.WaitForNWatchersAndIncrement(timeout-interval, 2) Eventually(func() int { return c1.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(0)) Eventually(func() int { return c3.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(0)) c1.clock.Increment(interval) Eventually(func() int { return c1.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(1)) Eventually(func() int { return c3.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(1)) }) It("stale leader should not be able to propose block because of lagged term", func() { network.disconnect(1) network.elect(2) network.connect(1) c1.cutter.SetCutNext(true) err := c1.Order(env, 0) Expect(err).NotTo(HaveOccurred()) network.exec( func(c *chain) { Consistently(c.support.WriteBlockCallCount).Should(Equal(0)) }) }) It("aborts waiting for block to be committed upon leadership lost", func() { network.disconnect(1) c1.cutter.SetCutNext(true) err := c1.Order(env, 0) Expect(err).NotTo(HaveOccurred()) network.exec( func(c *chain) { Consistently(c.support.WriteBlockCallCount).Should(Equal(0)) }) network.elect(2) network.connect(1) c2.clock.Increment(interval) // this check guarantees that signal on resignC is consumed in commitBatches method. Eventually(c1.observe, LongEventualTimeout).Should(Receive(Equal(raft.SoftState{Lead: 2, RaftState: raft.StateFollower}))) }) }) }) }) }) func nodeConfigFromMetadata(consenterMetadata *raftprotos.ConfigMetadata) []cluster.RemoteNode { var nodes []cluster.RemoteNode for i, consenter := range consenterMetadata.Consenters { // For now, skip ourselves if i == 0 { continue } serverDER, _ := pem.Decode(consenter.ServerTlsCert) clientDER, _ := pem.Decode(consenter.ClientTlsCert) node := cluster.RemoteNode{ NodeAddress: cluster.NodeAddress{ ID: uint64(i + 1), Endpoint: "localhost:7050", }, NodeCerts: cluster.NodeCerts{ ServerTLSCert: serverDER.Bytes, ClientTLSCert: clientDER.Bytes, }, } nodes = append(nodes, node) } return nodes } func createMetadata(nodeCount int, tlsCA tlsgen.CA) *raftprotos.ConfigMetadata { md := &raftprotos.ConfigMetadata{Options: &raftprotos.Options{ TickInterval: time.Duration(interval).String(), ElectionTick: ELECTION_TICK, HeartbeatTick: HEARTBEAT_TICK, MaxInflightBlocks: 5, }} for i := 0; i < nodeCount; i++ { md.Consenters = append(md.Consenters, &raftprotos.Consenter{ Host: "localhost", Port: 7050, ServerTlsCert: serverTLSCert(tlsCA), ClientTlsCert: clientTLSCert(tlsCA), }) } return md } func serverTLSCert(tlsCA tlsgen.CA) []byte { cert, err := tlsCA.NewServerCertKeyPair("localhost") if err != nil { panic(err) } return cert.Cert } func clientTLSCert(tlsCA tlsgen.CA) []byte { cert, err := tlsCA.NewClientCertKeyPair() if err != nil { panic(err) } return cert.Cert } // marshalOrPanic serializes a protobuf message and panics if this // operation fails func marshalOrPanic(pb proto.Message) []byte { data, err := proto.Marshal(pb) if err != nil { panic(err) } return data } // helpers to facilitate tests type stepFunc func(dest uint64, msg *orderer.ConsensusRequest) error type chain struct { id uint64 stepLock sync.Mutex step stepFunc // msgBuffer serializes 
ingress messages for a chain // so they are delivered in the same order msgBuffer chan *msg support *consensusmocks.FakeConsenterSupport cutter *mockblockcutter.Receiver configurator *mocks.FakeConfigurator rpc *mocks.FakeRPC storage *raft.MemoryStorage clock *fakeclock.FakeClock opts etcdraft.Options puller *mocks.FakeBlockPuller // store written blocks to be returned by mock block puller ledgerLock sync.RWMutex ledger map[uint64]*common.Block ledgerHeight uint64 lastConfigBlockNumber uint64 observe chan raft.SoftState unstarted chan struct{} stopped chan struct{} haltCallback func() fakeFields *fakeMetricsFields *etcdraft.Chain cryptoProvider bccsp.BCCSP } type msg struct { req *orderer.ConsensusRequest sender uint64 } func newChain( timeout time.Duration, channel, dataDir string, id uint64, raftMetadata *raftprotos.BlockMetadata, consenters map[uint64]*raftprotos.Consenter, cryptoProvider bccsp.BCCSP, support *consensusmocks.FakeConsenterSupport, haltCallback func(), ) *chain { rpc := &mocks.FakeRPC{} clock := fakeclock.NewFakeClock(time.Now()) storage := raft.NewMemoryStorage() fakeFields := newFakeMetricsFields() opts := etcdraft.Options{ RPCTimeout: timeout, RaftID: uint64(id), Clock: clock, TickInterval: interval, ElectionTick: ELECTION_TICK, HeartbeatTick: HEARTBEAT_TICK, MaxSizePerMsg: 1024 * 1024, MaxInflightBlocks: 256, BlockMetadata: raftMetadata, LeaderCheckInterval: 500 * time.Millisecond, Consenters: consenters, Logger: flogging.NewFabricLogger(zap.NewExample()), MemoryStorage: storage, WALDir: path.Join(dataDir, "wal"), SnapDir: path.Join(dataDir, "snapshot"), Metrics: newFakeMetrics(fakeFields), } if support == nil { support = &consensusmocks.FakeConsenterSupport{} support.ChannelIDReturns(channel) support.SharedConfigReturns(mockOrdererWithBatchTimeout(timeout, nil)) } cutter := mockblockcutter.NewReceiver() close(cutter.Block) support.BlockCutterReturns(cutter) // upon leader change, lead is reset to 0 before set to actual // new leader, i.e. 1 -> 0 -> 2. 
Therefore 2 numbers will be // sent on this chan, so we need size to be 2 observe := make(chan raft.SoftState, 2) configurator := &mocks.FakeConfigurator{} puller := &mocks.FakeBlockPuller{} ch := make(chan struct{}) close(ch) c := &chain{ id: id, support: support, cutter: cutter, rpc: rpc, storage: storage, observe: observe, clock: clock, opts: opts, unstarted: ch, stopped: make(chan struct{}), configurator: configurator, puller: puller, ledger: map[uint64]*common.Block{ 0: getSeedBlock(), // Very first block }, ledgerHeight: 1, fakeFields: fakeFields, cryptoProvider: cryptoProvider, msgBuffer: make(chan *msg, 500), haltCallback: haltCallback, } // receives normal blocks and metadata and appends it into // the ledger struct to simulate write behaviour appendNormalBlockToLedger := func(b *common.Block, meta []byte) { c.ledgerLock.Lock() defer c.ledgerLock.Unlock() b = proto.Clone(b).(*common.Block) bytes, err := proto.Marshal(&common.Metadata{Value: meta}) Expect(err).NotTo(HaveOccurred()) b.Metadata.Metadata[common.BlockMetadataIndex_ORDERER] = bytes lastConfigValue := protoutil.MarshalOrPanic(&common.LastConfig{Index: c.lastConfigBlockNumber}) b.Metadata.Metadata[common.BlockMetadataIndex_LAST_CONFIG] = protoutil.MarshalOrPanic(&common.Metadata{ Value: lastConfigValue, }) c.ledger[b.Header.Number] = b if c.ledgerHeight < b.Header.Number+1 { c.ledgerHeight = b.Header.Number + 1 } } // receives config blocks and metadata and appends it into // the ledger struct to simulate write behaviour appendConfigBlockToLedger := func(b *common.Block, meta []byte) { c.ledgerLock.Lock() defer c.ledgerLock.Unlock() b = proto.Clone(b).(*common.Block) bytes, err := proto.Marshal(&common.Metadata{Value: meta}) Expect(err).NotTo(HaveOccurred()) b.Metadata.Metadata[common.BlockMetadataIndex_ORDERER] = bytes c.lastConfigBlockNumber = b.Header.Number lastConfigValue := protoutil.MarshalOrPanic(&common.LastConfig{Index: c.lastConfigBlockNumber}) b.Metadata.Metadata[common.BlockMetadataIndex_LAST_CONFIG] = protoutil.MarshalOrPanic(&common.Metadata{ Value: lastConfigValue, }) c.ledger[b.Header.Number] = b if c.ledgerHeight < b.Header.Number+1 { c.ledgerHeight = b.Header.Number + 1 } } c.support.WriteBlockStub = appendNormalBlockToLedger c.support.WriteConfigBlockStub = appendConfigBlockToLedger // returns current ledger height c.support.HeightStub = func() uint64 { c.ledgerLock.RLock() defer c.ledgerLock.RUnlock() return c.ledgerHeight } // reads block from the ledger c.support.BlockStub = func(number uint64) *common.Block { c.ledgerLock.RLock() defer c.ledgerLock.RUnlock() return c.ledger[number] } // consume ingress messages for chain go func() { for msg := range c.msgBuffer { c.Consensus(msg.req, msg.sender) } }() return c } func (c *chain) init() { ch, err := etcdraft.NewChain( c.support, c.opts, c.configurator, c.rpc, c.cryptoProvider, func() (etcdraft.BlockPuller, error) { return c.puller, nil }, c.haltCallback, c.observe, ) Expect(err).NotTo(HaveOccurred()) c.Chain = ch } func (c *chain) start() { c.unstarted = nil c.Start() } func (c *chain) setStepFunc(f stepFunc) { c.stepLock.Lock() c.step = f c.stepLock.Unlock() } func (c *chain) getStepFunc() stepFunc { c.stepLock.Lock() defer c.stepLock.Unlock() return c.step } type network struct { delayWG sync.WaitGroup sync.RWMutex leader uint64 chains map[uint64]*chain // links simulates the configuration of comm layer (link is bi-directional). // if links[left][right] == true, right can send msg to left. 
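// For example, links[2][1] == true means node 1 is allowed to deliver messages to node 2.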
links map[uint64]map[uint64]bool // connectivity determines if a node is connected to network. This is used for tests // to simulate network partition. connectivity map[uint64]bool } func (n *network) link(from []uint64, to uint64) { links := make(map[uint64]bool) for _, id := range from { links[id] = true } n.Lock() defer n.Unlock() n.links[to] = links } func (n *network) linked(from, to uint64) bool { n.RLock() defer n.RUnlock() return n.links[to][from] } func (n *network) connect(id uint64) { n.Lock() defer n.Unlock() n.connectivity[id] = true } func (n *network) disconnect(id uint64) { n.Lock() defer n.Unlock() n.connectivity[id] = false } func (n *network) connected(id uint64) bool { n.RLock() defer n.RUnlock() return n.connectivity[id] } func (n *network) addChain(c *chain) { n.connect(c.id) // chain is connected by default c.step = func(dest uint64, req *orderer.ConsensusRequest) error { if !n.linked(c.id, dest) { return errors.Errorf("connection refused") } if !n.connected(c.id) || !n.connected(dest) { return errors.Errorf("connection lost") } n.RLock() target := n.chains[dest] n.RUnlock() target.msgBuffer <- &msg{req: req, sender: c.id} return nil } c.rpc.SendConsensusStub = func(dest uint64, msg *orderer.ConsensusRequest) error { c.stepLock.Lock() defer c.stepLock.Unlock() return c.step(dest, msg) } c.rpc.SendSubmitStub = func(dest uint64, msg *orderer.SubmitRequest, f func(error)) error { if !n.linked(c.id, dest) { err := errors.Errorf("connection refused") f(err) return err } if !n.connected(c.id) || !n.connected(dest) { err := errors.Errorf("connection lost") f(err) return err } n.RLock() target := n.chains[dest] n.RUnlock() go func() { n.Lock() n.delayWG.Wait() n.Unlock() defer GinkgoRecover() target.Submit(msg, c.id) f(nil) }() return nil } c.puller.PullBlockStub = func(i uint64) *common.Block { n.RLock() leaderChain := n.chains[n.leader] n.RUnlock() leaderChain.ledgerLock.RLock() defer leaderChain.ledgerLock.RUnlock() block := leaderChain.ledger[i] return block } c.puller.HeightsByEndpointsStub = func() (map[string]uint64, error) { n.RLock() leader := n.chains[n.leader] n.RUnlock() if leader == nil { return nil, errors.Errorf("ledger not available") } leader.ledgerLock.RLock() defer leader.ledgerLock.RUnlock() return map[string]uint64{"leader": leader.ledgerHeight}, nil } c.configurator.ConfigureCalls(func(channel string, nodes []cluster.RemoteNode) { var ids []uint64 for _, node := range nodes { ids = append(ids, node.ID) } n.link(ids, c.id) }) n.Lock() defer n.Unlock() n.chains[c.id] = c } func createNetwork( timeout time.Duration, channel, dataDir string, raftMetadata *raftprotos.BlockMetadata, consenters map[uint64]*raftprotos.Consenter, cryptoProvider bccsp.BCCSP, tlsCA tlsgen.CA, haltCallback func(), ) *network { n := &network{ chains: make(map[uint64]*chain), connectivity: make(map[uint64]bool), links: make(map[uint64]map[uint64]bool), } for _, nodeID := range raftMetadata.ConsenterIds { dir, err := ioutil.TempDir(dataDir, fmt.Sprintf("node-%d-", nodeID)) Expect(err).NotTo(HaveOccurred()) m := proto.Clone(raftMetadata).(*raftprotos.BlockMetadata) support := &consensusmocks.FakeConsenterSupport{} support.ChannelIDReturns(channel) support.SharedConfigReturns(mockOrdererWithBatchTimeout(timeout, nil)) mockOrdererConfig := mockOrdererWithTLSRootCert(timeout, nil, tlsCA) support.SharedConfigReturns(mockOrdererConfig) n.addChain(newChain(timeout, channel, dir, nodeID, m, consenters, cryptoProvider, support, haltCallback)) } return n } // tests could alter configuration of 
a chain before creating it func (n *network) init() { n.exec(func(c *chain) { c.init() }) } func (n *network) start(ids ...uint64) { nodes := ids if len(nodes) == 0 { for i := range n.chains { nodes = append(nodes, i) } } for _, id := range nodes { n.chains[id].start() // When the Raft node bootstraps, it produces a ConfChange // to add itself, which needs to be consumed with Ready(). // If there are pending configuration changes in raft, // it refuses to campaign, no matter how many ticks are supplied. // This is not a problem in production code because eventually // raft.Ready will be consumed as real time goes by. // // However, this is problematic when using a fake clock and artificial // ticks. Instead of ticking raft indefinitely until raft.Ready is // consumed, this check is added to indirectly guarantee // that the first ConfChange is actually consumed and we can safely // proceed to tick raft. Eventually(func() error { _, err := n.chains[id].storage.Entries(1, 1, 1) return err }, LongEventualTimeout).ShouldNot(HaveOccurred()) Eventually(n.chains[id].WaitReady, LongEventualTimeout).ShouldNot(HaveOccurred()) } } func (n *network) stop(ids ...uint64) { nodes := ids if len(nodes) == 0 { for i := range n.chains { nodes = append(nodes, i) } } for _, id := range nodes { c := n.chains[id] c.Halt() Eventually(c.Errored).Should(BeClosed()) select { case <-c.stopped: default: close(c.stopped) } } } func (n *network) exec(f func(c *chain), ids ...uint64) { if len(ids) == 0 { for _, c := range n.chains { f(c) } return } for _, i := range ids { f(n.chains[i]) } } // connect a node to the network and tick the leader to trigger // a heartbeat so the newly joined node can detect the leader. // // expectLeaderChange controls whether a leader change should // be observed on the newly joined node. // - it should be true if the newly joined node was the leader // - it should be false if the newly joined node was a follower, and // already knows the leader. func (n *network) join(id uint64, expectLeaderChange bool) { n.connect(id) n.RLock() leader, follower := n.chains[n.leader], n.chains[id] n.RUnlock() step := leader.getStepFunc() signal := make(chan struct{}) leader.setStepFunc(func(dest uint64, msg *orderer.ConsensusRequest) error { if dest == id { // close signal channel when a message targeting newly // joined node is observed on wire. select { case <-signal: default: close(signal) } } return step(dest, msg) }) // Tick leader so it sends out a heartbeat to the new node. // One tick _may_ not be enough because the leader might be busy // and this tick is dropped on the floor. Eventually(func() <-chan struct{} { leader.clock.Increment(interval) return signal }, LongEventualTimeout, 100*time.Millisecond).Should(BeClosed()) leader.setStepFunc(step) if expectLeaderChange { Eventually(follower.observe, LongEventualTimeout).Should(Receive(Equal(raft.SoftState{Lead: n.leader, RaftState: raft.StateFollower}))) } // wait for newly joined node to catch up with leader i, err := n.chains[n.leader].opts.MemoryStorage.LastIndex() Expect(err).NotTo(HaveOccurred()) Eventually(n.chains[id].opts.MemoryStorage.LastIndex, LongEventualTimeout).Should(Equal(i)) } // elect deterministically elects a node as leader func (n *network) elect(id uint64) { n.RLock() // skip observing leader change on followers if the same leader is elected as the previous one, // because this may happen too quickly from a slow follower's point of view, and the 0 -> X transition // may not be emitted at all.
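// e.g. when the current leader is re-elected, a slow follower may observe no SoftState event at all.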
// elect deterministically elects a node as leader.
func (n *network) elect(id uint64) {
	n.RLock()
	// Skip observing the leader change on followers if the same leader is
	// elected as the previous one, because this may happen too quickly from
	// a slow follower's point of view, and the 0 -> X transition may not be
	// emitted at all.
	observeFollowers := id != n.leader
	candidate := n.chains[id]
	var followers []*chain
	for _, c := range n.chains {
		if c.id != id {
			followers = append(followers, c)
		}
	}
	n.RUnlock()

	// Send the node an artificial MsgTimeoutNow to emulate a leadership transfer.
	fmt.Fprintf(GinkgoWriter, "Send artificial MsgTimeoutNow to elect node %d\n", id)
	candidate.Consensus(&orderer.ConsensusRequest{Payload: protoutil.MarshalOrPanic(&raftpb.Message{Type: raftpb.MsgTimeoutNow, To: id})}, 0)
	Eventually(candidate.observe, LongEventualTimeout).Should(Receive(StateEqual(id, raft.StateLeader)))

	n.Lock()
	n.leader = id
	n.Unlock()

	if !observeFollowers {
		return
	}

	// now observe the leader change on the other nodes
	for _, c := range followers {
		select {
		case <-c.stopped: // skip check if node is stopped
		case <-c.unstarted: // skip check if node is not started yet
		default:
			if n.linked(c.id, id) && n.connected(c.id) {
				Eventually(c.observe, LongEventualTimeout).Should(Receive(StateEqual(id, raft.StateFollower)))
			}
		}
	}
}

// newConfigEnv wraps the given config update in a CONFIG envelope of the
// given header type for the given channel.
func newConfigEnv(chainID string, headerType common.HeaderType, configUpdateEnv *common.ConfigUpdateEnvelope) *common.Envelope {
	return &common.Envelope{
		Payload: marshalOrPanic(&common.Payload{
			Header: &common.Header{
				ChannelHeader: marshalOrPanic(&common.ChannelHeader{
					Type:      int32(headerType),
					ChannelId: chainID,
				}),
			},
			Data: marshalOrPanic(&common.ConfigEnvelope{
				LastUpdate: &common.Envelope{
					Payload: marshalOrPanic(&common.Payload{
						Header: &common.Header{
							ChannelHeader: marshalOrPanic(&common.ChannelHeader{
								Type:      int32(common.HeaderType_CONFIG_UPDATE),
								ChannelId: chainID,
							}),
						},
						Data: marshalOrPanic(configUpdateEnv),
					}), // common.Payload
				}, // LastUpdate
			}),
		}),
	}
}

func newConfigUpdateEnv(chainID string, oldValues, newValues map[string]*common.ConfigValue) *common.ConfigUpdateEnvelope {
	return &common.ConfigUpdateEnvelope{
		ConfigUpdate: marshalOrPanic(&common.ConfigUpdate{
			ChannelId: chainID,
			ReadSet: &common.ConfigGroup{
				Groups: map[string]*common.ConfigGroup{
					"Orderer": {
						Values: oldValues,
					},
				},
			},
			WriteSet: &common.ConfigGroup{
				Groups: map[string]*common.ConfigGroup{
					"Orderer": {
						Values: newValues,
					},
				},
			}, // WriteSet
		}),
	}
}
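// newBatchTimeoutConfigEnv is an illustrative sketch of how the two helpers
// above compose; it is not invoked by any spec in this file, and the version
// and timeout values are placeholders. It yields a CONFIG envelope that
// rewrites the Orderer BatchTimeout value on the given channel.
func newBatchTimeoutConfigEnv(chainID string) *common.Envelope {
	newValues := map[string]*common.ConfigValue{
		"BatchTimeout": {
			Version: 1,
			Value: marshalOrPanic(&orderer.BatchTimeout{
				Timeout: "3s",
			}),
		},
	}
	return newConfigEnv(chainID, common.HeaderType_CONFIG, newConfigUpdateEnv(chainID, nil, newValues))
}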
func getSeedBlock() *common.Block {
	return &common.Block{
		Header:   &common.BlockHeader{},
		Data:     &common.BlockData{Data: [][]byte{[]byte("foo")}},
		Metadata: &common.BlockMetadata{Metadata: make([][]byte, 4)},
	}
}

func StateEqual(lead uint64, state raft.StateType) types.GomegaMatcher {
	return Equal(raft.SoftState{Lead: lead, RaftState: state})
}

func BeFollower() types.GomegaMatcher {
	return &StateMatcher{expect: raft.StateFollower}
}

type StateMatcher struct {
	expect raft.StateType
}

func (stmatcher *StateMatcher) Match(actual interface{}) (success bool, err error) {
	state, ok := actual.(raft.SoftState)
	if !ok {
		return false, errors.Errorf("StateMatcher expects a raft SoftState")
	}

	return state.RaftState == stmatcher.expect, nil
}

func (stmatcher *StateMatcher) FailureMessage(actual interface{}) (message string) {
	state, ok := actual.(raft.SoftState)
	if !ok {
		return "StateMatcher expects a raft SoftState"
	}

	return fmt.Sprintf("Expected %s to be %s", state.RaftState, stmatcher.expect)
}

func (stmatcher *StateMatcher) NegatedFailureMessage(actual interface{}) (message string) {
	state, ok := actual.(raft.SoftState)
	if !ok {
		return "StateMatcher expects a raft SoftState"
	}

	return fmt.Sprintf("Expected %s not to be %s", state.RaftState, stmatcher.expect)
}

func noOpBlockPuller() (etcdraft.BlockPuller, error) {
	bp := &mocks.FakeBlockPuller{}
	return bp, nil
}
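// blockPullerReturning is an illustrative sketch, not invoked by any spec in
// this file: counterfeiter fakes expose ...Returns setters alongside the
// ...Stub fields wired up in addChain, so a test can prime the puller with a
// canned block instead of routing pulls through the fake network.
func blockPullerReturning(block *common.Block) (etcdraft.BlockPuller, error) {
	bp := &mocks.FakeBlockPuller{}
	bp.PullBlockReturns(block) // every PullBlock call yields `block`
	return bp, nil
}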