/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package etcdraft_test
import (
"encoding/pem"
"fmt"
"io/ioutil"
"os"
"os/user"
"path"
"sync"
"sync/atomic"
"time"
"code.cloudfoundry.org/clock/fakeclock"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/orderer"
raftprotos "github.com/hyperledger/fabric-protos-go/orderer/etcdraft"
"github.com/hyperledger/fabric/bccsp"
"github.com/hyperledger/fabric/bccsp/factory"
"github.com/hyperledger/fabric/bccsp/sw"
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/crypto/tlsgen"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/orderer/common/cluster"
orderer_types "github.com/hyperledger/fabric/orderer/common/types"
"github.com/hyperledger/fabric/orderer/consensus/etcdraft"
"github.com/hyperledger/fabric/orderer/consensus/etcdraft/mocks"
consensusmocks "github.com/hyperledger/fabric/orderer/consensus/mocks"
mockblockcutter "github.com/hyperledger/fabric/orderer/mocks/common/blockcutter"
"github.com/hyperledger/fabric/protoutil"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/types"
"github.com/pkg/errors"
raft "go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.uber.org/zap"
)
const (
interval = 100 * time.Millisecond
LongEventualTimeout = 10 * time.Second
// 10 is the default setting of ELECTION_TICK.
// We used to use a smaller value here (2) to reduce test time, since we don't
// need to tick the node 10 times to trigger an election. However, we now use a
// different mechanism to trigger elections that does not depend on time: sending
// an artificial MsgTimeoutNow to the node.
ELECTION_TICK = 10
HEARTBEAT_TICK = 1
)
//go:generate counterfeiter -o mocks/halt_callbacker.go --fake-name HaltCallbacker . haltCallbacker
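// haltCallbacker is mocked in these tests to observe whether the chain invokes the halt callback when it is evicted from the consenter set.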
type haltCallbacker interface {
HaltCallback()
}
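// Initialize the default BCCSP factories used by the crypto providers in these tests.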
func init() {
factory.InitFactories(nil)
}
func mockOrderer(metadata []byte) *mocks.OrdererConfig {
return mockOrdererWithBatchTimeout(time.Second, metadata)
}
func mockOrdererWithBatchTimeout(batchTimeout time.Duration, metadata []byte) *mocks.OrdererConfig {
mockOrderer := &mocks.OrdererConfig{}
mockOrderer.BatchTimeoutReturns(batchTimeout)
mockOrderer.ConsensusMetadataReturns(metadata)
return mockOrderer
}
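// mockOrdererWithTLSRootCert builds on mockOrdererWithBatchTimeout and adds a single fake org whose MSP returns the test CA certificate as its TLS root cert.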
func mockOrdererWithTLSRootCert(batchTimeout time.Duration, metadata []byte, tlsCA tlsgen.CA) *mocks.OrdererConfig {
mockOrderer := mockOrdererWithBatchTimeout(batchTimeout, metadata)
mockOrg := &mocks.OrdererOrg{}
mockMSP := &mocks.MSP{}
mockMSP.GetTLSRootCertsReturns([][]byte{tlsCA.CertBytes()})
mockOrg.MSPReturns(mockMSP)
mockOrderer.OrganizationsReturns(map[string]channelconfig.OrdererOrg{
"fake-org": mockOrg,
})
return mockOrderer
}
// Some test cases chmod a file/dir to test failures caused by exotic permissions.
// However, this does not work if the tests are running as root, e.g. in a container.
func skipIfRoot() {
u, err := user.Current()
Expect(err).NotTo(HaveOccurred())
if u.Uid == "0" {
Skip("you are running test as root, there's no way to make files unreadable")
}
}
var _ = Describe("Chain", func() {
var (
env *common.Envelope
channelID string
tlsCA tlsgen.CA
logger *flogging.FabricLogger
)
BeforeEach(func() {
tlsCA, _ = tlsgen.NewCA()
channelID = "test-channel"
logger = flogging.MustGetLogger("test")
env = &common.Envelope{
Payload: marshalOrPanic(&common.Payload{
Header: &common.Header{ChannelHeader: marshalOrPanic(&common.ChannelHeader{Type: int32(common.HeaderType_MESSAGE), ChannelId: channelID})},
Data: []byte("TEST_MESSAGE"),
}),
}
})
Describe("Single Raft node", func() {
var (
configurator *mocks.FakeConfigurator
consenterMetadata *raftprotos.ConfigMetadata
consenters map[uint64]*raftprotos.Consenter
clock *fakeclock.FakeClock
opts etcdraft.Options
support *consensusmocks.FakeConsenterSupport
cutter *mockblockcutter.Receiver
storage *raft.MemoryStorage
observeC chan raft.SoftState
chain *etcdraft.Chain
dataDir string
walDir string
snapDir string
err error
fakeFields *fakeMetricsFields
cryptoProvider bccsp.BCCSP
fakeHaltCallbacker *mocks.HaltCallbacker
)
BeforeEach(func() {
cryptoProvider, err = sw.NewDefaultSecurityLevelWithKeystore(sw.NewDummyKeyStore())
Expect(err).NotTo(HaveOccurred())
configurator = &mocks.FakeConfigurator{}
clock = fakeclock.NewFakeClock(time.Now())
storage = raft.NewMemoryStorage()
dataDir, err = ioutil.TempDir("", "wal-")
Expect(err).NotTo(HaveOccurred())
walDir = path.Join(dataDir, "wal")
snapDir = path.Join(dataDir, "snapshot")
observeC = make(chan raft.SoftState, 1)
support = &consensusmocks.FakeConsenterSupport{}
support.ChannelIDReturns(channelID)
consenterMetadata = createMetadata(1, tlsCA)
support.SharedConfigReturns(mockOrdererWithTLSRootCert(time.Hour, marshalOrPanic(consenterMetadata), tlsCA))
cutter = mockblockcutter.NewReceiver()
support.BlockCutterReturns(cutter)
// for block creator initialization
support.HeightReturns(1)
support.BlockReturns(getSeedBlock())
meta := &raftprotos.BlockMetadata{
ConsenterIds: make([]uint64, len(consenterMetadata.Consenters)),
NextConsenterId: 1,
}
for i := range meta.ConsenterIds {
meta.ConsenterIds[i] = meta.NextConsenterId
meta.NextConsenterId++
}
consenters = map[uint64]*raftprotos.Consenter{}
for i, c := range consenterMetadata.Consenters {
consenters[meta.ConsenterIds[i]] = c
}
fakeFields = newFakeMetricsFields()
opts = etcdraft.Options{
RPCTimeout: time.Second * 5,
RaftID: 1,
Clock: clock,
TickInterval: interval,
ElectionTick: ELECTION_TICK,
HeartbeatTick: HEARTBEAT_TICK,
MaxSizePerMsg: 1024 * 1024,
MaxInflightBlocks: 256,
BlockMetadata: meta,
Consenters: consenters,
Logger: logger,
MemoryStorage: storage,
WALDir: walDir,
SnapDir: snapDir,
Metrics: newFakeMetrics(fakeFields),
}
fakeHaltCallbacker = &mocks.HaltCallbacker{}
})
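// campaign artificially triggers an election by injecting a MsgTimeoutNow message (see the ELECTION_TICK comment above) and waits until the node observes itself becoming leader.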
campaign := func(c *etcdraft.Chain, observeC <-chan raft.SoftState) {
Eventually(func() <-chan raft.SoftState {
c.Consensus(&orderer.ConsensusRequest{Payload: protoutil.MarshalOrPanic(&raftpb.Message{Type: raftpb.MsgTimeoutNow, To: 1})}, 0)
return observeC
}, LongEventualTimeout).Should(Receive(StateEqual(1, raft.StateLeader)))
}
JustBeforeEach(func() {
rpc := &mocks.FakeRPC{}
chain, err = etcdraft.NewChain(support, opts, configurator, rpc, cryptoProvider, noOpBlockPuller, fakeHaltCallbacker.HaltCallback, observeC)
Expect(err).NotTo(HaveOccurred())
chain.Start()
cRel, status := chain.StatusReport()
Expect(cRel).To(Equal(orderer_types.ConsensusRelationConsenter))
Expect(status).To(Equal(orderer_types.StatusActive))
// When the Raft node bootstraps, it produces a ConfChange
// to add itself, which needs to be consumed with Ready().
// If there are pending configuration changes in raft,
// it refuses to campaign, no matter how many ticks elapse.
// This is not a problem in the production code because raft.Ready
// will be consumed eventually, as the wall clock advances.
//
// However, this is problematic when using the fake clock and
// artificial ticks. Instead of ticking raft indefinitely until
// raft.Ready is consumed, this check is added to indirectly guarantee
// that the first ConfChange is actually consumed and we can safely
// proceed to tick the Raft FSM.
Eventually(func() error {
_, err := storage.Entries(1, 1, 1)
return err
}, LongEventualTimeout).ShouldNot(HaveOccurred())
})
AfterEach(func() {
chain.Halt()
Eventually(chain.Errored, LongEventualTimeout).Should(BeClosed())
// Make sure no timer leak
Eventually(clock.WatcherCount, LongEventualTimeout).Should(BeZero())
os.RemoveAll(dataDir)
})
Context("when a node starts up", func() {
It("properly configures the communication layer", func() {
expectedNodeConfig := nodeConfigFromMetadata(consenterMetadata)
Eventually(configurator.ConfigureCallCount, LongEventualTimeout).Should(Equal(1))
_, arg2 := configurator.ConfigureArgsForCall(0)
Expect(arg2).To(Equal(expectedNodeConfig))
})
It("correctly sets the metrics labels and publishes requisite metrics", func() {
type withImplementers interface {
WithCallCount() int
WithArgsForCall(int) []string
}
metricsList := []withImplementers{
fakeFields.fakeClusterSize,
fakeFields.fakeIsLeader,
fakeFields.fakeActiveNodes,
fakeFields.fakeCommittedBlockNumber,
fakeFields.fakeSnapshotBlockNumber,
fakeFields.fakeLeaderChanges,
fakeFields.fakeProposalFailures,
fakeFields.fakeDataPersistDuration,
fakeFields.fakeNormalProposalsReceived,
fakeFields.fakeConfigProposalsReceived,
}
for _, m := range metricsList {
Expect(m.WithCallCount()).To(Equal(1))
Expect(func() string {
return m.WithArgsForCall(0)[1]
}()).To(Equal(channelID))
}
Expect(fakeFields.fakeClusterSize.SetCallCount()).To(Equal(1))
Expect(fakeFields.fakeClusterSize.SetArgsForCall(0)).To(Equal(float64(1)))
Expect(fakeFields.fakeIsLeader.SetCallCount()).To(Equal(1))
Expect(fakeFields.fakeIsLeader.SetArgsForCall(0)).To(Equal(float64(0)))
Expect(fakeFields.fakeActiveNodes.SetCallCount()).To(Equal(1))
Expect(fakeFields.fakeActiveNodes.SetArgsForCall(0)).To(Equal(float64(0)))
})
})
Context("when no Raft leader is elected", func() {
It("fails to order envelope", func() {
err := chain.Order(env, 0)
Expect(err).To(MatchError("no Raft leader"))
Expect(fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(1))
Expect(fakeFields.fakeNormalProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1)))
Expect(fakeFields.fakeConfigProposalsReceived.AddCallCount()).To(Equal(0))
Expect(fakeFields.fakeProposalFailures.AddCallCount()).To(Equal(1))
Expect(fakeFields.fakeProposalFailures.AddArgsForCall(0)).To(Equal(float64(1)))
})
It("starts proactive campaign", func() {
// assert that even if the ticks supplied are fewer than the election timeout,
// a leader can still be successfully elected.
for i := 0; i < ELECTION_TICK; i++ {
clock.Increment(interval)
time.Sleep(10 * time.Millisecond)
}
Eventually(observeC, LongEventualTimeout).Should(Receive(StateEqual(1, raft.StateLeader)))
})
})
Context("when Raft leader is elected", func() {
JustBeforeEach(func() {
campaign(chain, observeC)
})
It("updates metrics upon leader election", func() {
Expect(fakeFields.fakeIsLeader.SetCallCount()).To(Equal(2))
Expect(fakeFields.fakeIsLeader.SetArgsForCall(1)).To(Equal(float64(1)))
Expect(fakeFields.fakeLeaderChanges.AddCallCount()).To(Equal(1))
Expect(fakeFields.fakeLeaderChanges.AddArgsForCall(0)).To(Equal(float64(1)))
})
It("fails to order envelope if chain is halted", func() {
chain.Halt()
err := chain.Order(env, 0)
Expect(err).To(MatchError("chain is stopped"))
Expect(fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(1))
Expect(fakeFields.fakeNormalProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1)))
Expect(fakeFields.fakeProposalFailures.AddCallCount()).To(Equal(1))
Expect(fakeFields.fakeProposalFailures.AddArgsForCall(0)).To(Equal(float64(1)))
})
It("produces blocks following batch rules", func() {
close(cutter.Block)
By("cutting next batch directly")
cutter.SetCutNext(true)
err := chain.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
Expect(fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(1))
Expect(fakeFields.fakeNormalProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1)))
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
Expect(fakeFields.fakeCommittedBlockNumber.SetCallCount()).Should(Equal(2)) // incl. initial call
Expect(fakeFields.fakeCommittedBlockNumber.SetArgsForCall(1)).Should(Equal(float64(1)))
// There are three calls to DataPersistDuration by now, corresponding to the following three
// items arriving on the Ready channel:
// 1. an EntryConfChange to let this node join the Raft cluster
// 2. a SoftState and an associated increase of term in the HardState due to the node being elected leader
// 3. a block being committed
// The duration being emitted is zero since we don't tick the fake clock during this time
Expect(fakeFields.fakeDataPersistDuration.ObserveCallCount()).Should(Equal(3))
Expect(fakeFields.fakeDataPersistDuration.ObserveArgsForCall(0)).Should(Equal(float64(0)))
Expect(fakeFields.fakeDataPersistDuration.ObserveArgsForCall(1)).Should(Equal(float64(0)))
Expect(fakeFields.fakeDataPersistDuration.ObserveArgsForCall(2)).Should(Equal(float64(0)))
By("respecting batch timeout")
cutter.SetCutNext(false)
timeout := time.Second
support.SharedConfigReturns(mockOrdererWithBatchTimeout(timeout, nil))
err = chain.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
Expect(fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(2))
Expect(fakeFields.fakeNormalProposalsReceived.AddArgsForCall(1)).To(Equal(float64(1)))
clock.WaitForNWatchersAndIncrement(timeout, 2)
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
Expect(fakeFields.fakeCommittedBlockNumber.SetCallCount()).Should(Equal(3)) // incl. initial call
Expect(fakeFields.fakeCommittedBlockNumber.SetArgsForCall(2)).Should(Equal(float64(2)))
Expect(fakeFields.fakeDataPersistDuration.ObserveCallCount()).Should(Equal(4))
Expect(fakeFields.fakeDataPersistDuration.ObserveArgsForCall(3)).Should(Equal(float64(0)))
})
It("does not reset timer for every envelope", func() {
close(cutter.Block)
timeout := time.Second
support.SharedConfigReturns(mockOrdererWithBatchTimeout(timeout, nil))
err := chain.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))
clock.WaitForNWatchersAndIncrement(timeout/2, 2)
err = chain.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(2))
// the second envelope should not have reset the timer; the timer should
// therefore expire when we advance the clock by just timeout/2
clock.Increment(timeout / 2)
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
})
It("does not write a block if halted before timeout", func() {
close(cutter.Block)
timeout := time.Second
support.SharedConfigReturns(mockOrdererWithBatchTimeout(timeout, nil))
err := chain.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))
// wait for timer to start
Eventually(clock.WatcherCount, LongEventualTimeout).Should(Equal(2))
chain.Halt()
Consistently(support.WriteBlockCallCount).Should(Equal(0))
})
It("stops the timer if a batch is cut", func() {
close(cutter.Block)
timeout := time.Second
support.SharedConfigReturns(mockOrdererWithBatchTimeout(timeout, nil))
err := chain.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))
clock.WaitForNWatchersAndIncrement(timeout/2, 2)
By("force a batch to be cut before timer expires")
cutter.SetCutNext(true)
err = chain.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
b, _ := support.WriteBlockArgsForCall(0)
Expect(b.Data.Data).To(HaveLen(2))
Expect(cutter.CurBatch()).To(HaveLen(0))
// this should start a fresh timer
cutter.SetCutNext(false)
err = chain.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))
clock.WaitForNWatchersAndIncrement(timeout/2, 2)
Consistently(support.WriteBlockCallCount).Should(Equal(1))
clock.Increment(timeout / 2)
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
b, _ = support.WriteBlockArgsForCall(1)
Expect(b.Data.Data).To(HaveLen(1))
})
It("cut two batches if incoming envelope does not fit into first batch", func() {
close(cutter.Block)
timeout := time.Second
support.SharedConfigReturns(mockOrdererWithBatchTimeout(timeout, nil))
err := chain.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))
cutter.SetIsolatedTx(true)
err = chain.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
})
Context("revalidation", func() {
BeforeEach(func() {
close(cutter.Block)
timeout := time.Hour
support.SharedConfigReturns(mockOrdererWithBatchTimeout(timeout, nil))
support.SequenceReturns(1)
})
It("enqueue if envelope is still valid", func() {
support.ProcessNormalMsgReturns(1, nil)
err := chain.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))
Eventually(clock.WatcherCount, LongEventualTimeout).Should(Equal(2))
})
It("does not enqueue if envelope is not valid", func() {
support.ProcessNormalMsgReturns(1, errors.Errorf("Envelope is invalid"))
err := chain.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
Consistently(cutter.CurBatch).Should(HaveLen(0))
Consistently(clock.WatcherCount).Should(Equal(1))
})
})
It("unblocks Errored if chain is halted", func() {
errorC := chain.Errored()
Expect(errorC).NotTo(BeClosed())
chain.Halt()
Eventually(errorC, LongEventualTimeout).Should(BeClosed())
})
It("does not call the halt callback function when halting externally", func() {
chain.Halt()
Consistently(fakeHaltCallbacker.HaltCallbackCallCount).Should(Equal(0))
})
Describe("Config updates", func() {
var (
configEnv *common.Envelope
configSeq uint64
)
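// In these tests, a "type A" config update leaves the ConsensusType value untouched, while a "type B" update (further below) changes the consensus metadata, e.g. the consenter set.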
Context("when a type A config update comes", func() {
Context("for existing channel", func() {
// used to prepare the Orderer Values
BeforeEach(func() {
newValues := map[string]*common.ConfigValue{
"BatchTimeout": {
Version: 1,
Value: marshalOrPanic(&orderer.BatchTimeout{
Timeout: "3ms",
}),
},
"ConsensusType": {
Version: 4,
},
}
oldValues := map[string]*common.ConfigValue{
"ConsensusType": {
Version: 4,
},
}
configEnv = newConfigEnv(channelID,
common.HeaderType_CONFIG,
newConfigUpdateEnv(channelID, oldValues, newValues),
)
configSeq = 0
}) // BeforeEach block
Context("without revalidation (i.e. correct config sequence)", func() {
Context("without pending normal envelope", func() {
It("should create a config block and no normal block", func() {
err := chain.Configure(configEnv, configSeq)
Expect(err).NotTo(HaveOccurred())
Expect(fakeFields.fakeConfigProposalsReceived.AddCallCount()).To(Equal(1))
Expect(fakeFields.fakeConfigProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1)))
Eventually(support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
Consistently(support.WriteBlockCallCount).Should(Equal(0))
Expect(fakeFields.fakeCommittedBlockNumber.SetCallCount()).Should(Equal(2)) // incl. initial call
Expect(fakeFields.fakeCommittedBlockNumber.SetArgsForCall(1)).Should(Equal(float64(1)))
})
})
Context("with pending normal envelope", func() {
It("should create a normal block and a config block", func() {
// We do not need to block the cutter from ordering in our test case and therefore close this channel.
close(cutter.Block)
By("adding a normal envelope")
err := chain.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
Expect(fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(1))
Expect(fakeFields.fakeNormalProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1)))
Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))
By("adding a config envelope")
err = chain.Configure(configEnv, configSeq)
Expect(err).NotTo(HaveOccurred())
Expect(fakeFields.fakeConfigProposalsReceived.AddCallCount()).To(Equal(1))
Expect(fakeFields.fakeConfigProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1)))
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
Eventually(support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
Expect(fakeFields.fakeCommittedBlockNumber.SetCallCount()).Should(Equal(3)) // incl. initial call
Expect(fakeFields.fakeCommittedBlockNumber.SetArgsForCall(2)).Should(Equal(float64(2)))
})
})
})
Context("with revalidation (i.e. incorrect config sequence)", func() {
BeforeEach(func() {
close(cutter.Block)
support.SequenceReturns(1) // this causes the revalidation
})
It("should create config block upon correct revalidation", func() {
support.ProcessConfigMsgReturns(configEnv, 1, nil) // nil implies correct revalidation
Expect(chain.Configure(configEnv, configSeq)).To(Succeed())
Consistently(clock.WatcherCount).Should(Equal(1))
Eventually(support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
})
It("should not create config block upon incorrect revalidation", func() {
support.ProcessConfigMsgReturns(configEnv, 1, errors.Errorf("Invalid config envelope at changed config sequence"))
Expect(chain.Configure(configEnv, configSeq)).To(Succeed())
Consistently(clock.WatcherCount).Should(Equal(1))
Consistently(support.WriteConfigBlockCallCount).Should(Equal(0)) // no call to WriteConfigBlock
})
It("should not disturb current running timer upon incorrect revalidation", func() {
support.ProcessNormalMsgReturns(1, nil)
support.ProcessConfigMsgReturns(configEnv, 1, errors.Errorf("Invalid config envelope at changed config sequence"))
Expect(chain.Order(env, configSeq)).To(Succeed())
Eventually(clock.WatcherCount, LongEventualTimeout).Should(Equal(2))
clock.Increment(30 * time.Minute)
Consistently(support.WriteBlockCallCount).Should(Equal(0))
Expect(chain.Configure(configEnv, configSeq)).To(Succeed())
Consistently(clock.WatcherCount).Should(Equal(2))
Consistently(support.WriteBlockCallCount).Should(Equal(0))
Consistently(support.WriteConfigBlockCallCount).Should(Equal(0))
clock.Increment(30 * time.Minute)
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
})
})
})
Context("for creating a new channel", func() {
// used to prepare the Orderer Values
BeforeEach(func() {
chainID := "mychannel"
values := make(map[string]*common.ConfigValue)
configEnv = newConfigEnv(chainID,
common.HeaderType_CONFIG,
newConfigUpdateEnv(chainID, nil, values),
)
configSeq = 0
}) // BeforeEach block
It("should be able to create a channel", func() {
err := chain.Configure(configEnv, configSeq)
Expect(err).NotTo(HaveOccurred())
Eventually(support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
})
})
}) // Context block for type A config
Context("when a type B config update comes", func() {
Context("updating protocol values", func() {
// used to prepare the Orderer Values
BeforeEach(func() {
values := map[string]*common.ConfigValue{
"ConsensusType": {
Version: 1,
Value: marshalOrPanic(&orderer.ConsensusType{
Metadata: marshalOrPanic(consenterMetadata),
}),
},
}
configEnv = newConfigEnv(channelID,
common.HeaderType_CONFIG,
newConfigUpdateEnv(channelID, nil, values))
configSeq = 0
}) // BeforeEach block
It("should be able to process config update of type B", func() {
err := chain.Configure(configEnv, configSeq)
Expect(err).NotTo(HaveOccurred())
Expect(fakeFields.fakeConfigProposalsReceived.AddCallCount()).To(Equal(1))
Expect(fakeFields.fakeConfigProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1)))
Eventually(support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
})
})
Context("updating consenters set by exactly one node", func() {
It("should be able to process config update adding single node", func() {
metadata := proto.Clone(consenterMetadata).(*raftprotos.ConfigMetadata)
metadata.Consenters = append(metadata.Consenters, &raftprotos.Consenter{
Host: "localhost",
Port: 7050,
ServerTlsCert: serverTLSCert(tlsCA),
ClientTlsCert: clientTLSCert(tlsCA),
})
values := map[string]*common.ConfigValue{
"ConsensusType": {
Version: 1,
Value: marshalOrPanic(&orderer.ConsensusType{
Metadata: marshalOrPanic(metadata),
}),
},
}
configEnv = newConfigEnv(channelID,
common.HeaderType_CONFIG,
newConfigUpdateEnv(channelID, nil, values))
configSeq = 0
err := chain.Configure(configEnv, configSeq)
Expect(err).NotTo(HaveOccurred())
Eventually(support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
})
})
})
})
Describe("Crash Fault Tolerance", func() {
var raftMetadata *raftprotos.BlockMetadata
BeforeEach(func() {
raftMetadata = &raftprotos.BlockMetadata{
ConsenterIds: []uint64{1},
NextConsenterId: 2,
}
})
Describe("when a chain is started with existing WAL", func() {
var (
m1 *raftprotos.BlockMetadata
m2 *raftprotos.BlockMetadata
)
JustBeforeEach(func() {
// to generate WAL data, we start a chain,
// order several envelopes and then halt the chain.
close(cutter.Block)
cutter.SetCutNext(true)
// enqueue some data to be persisted on disk by raft
err := chain.Order(env, uint64(0))
Expect(err).NotTo(HaveOccurred())
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
_, metadata := support.WriteBlockArgsForCall(0)
m1 = &raftprotos.BlockMetadata{}
proto.Unmarshal(metadata, m1)
err = chain.Order(env, uint64(0))
Expect(err).NotTo(HaveOccurred())
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
_, metadata = support.WriteBlockArgsForCall(1)
m2 = &raftprotos.BlockMetadata{}
proto.Unmarshal(metadata, m2)
chain.Halt()
})
It("replays blocks from committed entries", func() {
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters, cryptoProvider, nil, nil)
c.init()
c.Start()
defer c.Halt()
Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
_, metadata := c.support.WriteBlockArgsForCall(0)
m := &raftprotos.BlockMetadata{}
proto.Unmarshal(metadata, m)
Expect(m.RaftIndex).To(Equal(m1.RaftIndex))
_, metadata = c.support.WriteBlockArgsForCall(1)
m = &raftprotos.BlockMetadata{}
proto.Unmarshal(metadata, m)
Expect(m.RaftIndex).To(Equal(m2.RaftIndex))
// chain should keep functioning
campaign(c.Chain, c.observe)
c.cutter.SetCutNext(true)
err := c.Order(env, uint64(0))
Expect(err).NotTo(HaveOccurred())
Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(3))
})
It("only replays blocks after Applied index", func() {
raftMetadata.RaftIndex = m1.RaftIndex
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters, cryptoProvider, nil, nil)
c.support.WriteBlock(support.WriteBlockArgsForCall(0))
c.init()
c.Start()
defer c.Halt()
Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
_, metadata := c.support.WriteBlockArgsForCall(1)
m := &raftprotos.BlockMetadata{}
proto.Unmarshal(metadata, m)
Expect(m.RaftIndex).To(Equal(m2.RaftIndex))
// chain should keep functioning
campaign(c.Chain, c.observe)
c.cutter.SetCutNext(true)
err := c.Order(env, uint64(0))
Expect(err).NotTo(HaveOccurred())
Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(3))
})
It("does not replay any block if already in sync", func() {
raftMetadata.RaftIndex = m2.RaftIndex
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters, cryptoProvider, nil, nil)
c.init()
c.Start()
defer c.Halt()
Consistently(c.support.WriteBlockCallCount).Should(Equal(0))
// chain should keep functioning
campaign(c.Chain, c.observe)
c.cutter.SetCutNext(true)
err := c.Order(env, uint64(0))
Expect(err).NotTo(HaveOccurred())
Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
})
Context("WAL file is not readable", func() {
It("fails to load wal", func() {
skipIfRoot()
files, err := ioutil.ReadDir(walDir)
Expect(err).NotTo(HaveOccurred())
for _, f := range files {
os.Chmod(path.Join(walDir, f.Name()), 0o300)
}
c, err := etcdraft.NewChain(support, opts, configurator, nil, cryptoProvider, noOpBlockPuller, nil, observeC)
Expect(c).To(BeNil())
Expect(err).To(MatchError(ContainSubstring("permission denied")))
})
})
})
Describe("when snapshotting is enabled (snapshot interval is not zero)", func() {
var (
ledgerLock sync.Mutex
ledger map[uint64]*common.Block
)
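// countFiles returns the number of snapshot files currently present in snapDir.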
countFiles := func() int {
files, err := ioutil.ReadDir(snapDir)
Expect(err).NotTo(HaveOccurred())
return len(files)
}
BeforeEach(func() {
opts.SnapshotCatchUpEntries = 2
close(cutter.Block)
cutter.SetCutNext(true)
ledgerLock.Lock()
ledger = map[uint64]*common.Block{
0: getSeedBlock(), // genesis block
}
ledgerLock.Unlock()
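// Mirror every block written by the chain into the in-memory ledger map (with the orderer metadata attached) so the Height stub below reflects the committed chain.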
support.WriteBlockStub = func(block *common.Block, meta []byte) {
b := proto.Clone(block).(*common.Block)
bytes, err := proto.Marshal(&common.Metadata{Value: meta})
Expect(err).NotTo(HaveOccurred())
b.Metadata.Metadata[common.BlockMetadataIndex_ORDERER] = bytes
ledgerLock.Lock()
defer ledgerLock.Unlock()
ledger[b.Header.Number] = b
}
support.HeightStub = func() uint64 {
ledgerLock.Lock()
defer ledgerLock.Unlock()
return uint64(len(ledger))
}
})
Context("Small SnapshotInterval", func() {
BeforeEach(func() {
opts.SnapshotIntervalSize = 1
})
It("writes snapshot file to snapDir", func() {
// Scenario: start a chain with SnapshotIntervalSize = 1 byte and expect it to take
// one snapshot for each block
i, _ := opts.MemoryStorage.FirstIndex()
Expect(chain.Order(env, uint64(0))).To(Succeed())
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
Eventually(countFiles, LongEventualTimeout).Should(Equal(1))
Eventually(opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", i))
Expect(fakeFields.fakeSnapshotBlockNumber.SetCallCount()).To(Equal(2)) // incl. initial call
s, _ := opts.MemoryStorage.Snapshot()
b := protoutil.UnmarshalBlockOrPanic(s.Data)
Expect(fakeFields.fakeSnapshotBlockNumber.SetArgsForCall(1)).To(Equal(float64(b.Header.Number)))
i, _ = opts.MemoryStorage.FirstIndex()
Expect(chain.Order(env, uint64(0))).To(Succeed())
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
Eventually(countFiles, LongEventualTimeout).Should(Equal(2))
Eventually(opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", i))
Expect(fakeFields.fakeSnapshotBlockNumber.SetCallCount()).To(Equal(3)) // incl. initial call
s, _ = opts.MemoryStorage.Snapshot()
b = protoutil.UnmarshalBlockOrPanic(s.Data)
Expect(fakeFields.fakeSnapshotBlockNumber.SetArgsForCall(2)).To(Equal(float64(b.Header.Number)))
})
It("pauses chain if sync is in progress", func() {
// Scenario:
// after a snapshot is taken, reboot chain with raftIndex = 0
// chain should attempt to sync upon reboot, and blocks on
// `WaitReady` API
i, _ := opts.MemoryStorage.FirstIndex()
Expect(chain.Order(env, uint64(0))).To(Succeed())
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
Eventually(countFiles, LongEventualTimeout).Should(Equal(1))
Eventually(opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", i))
i, _ = opts.MemoryStorage.FirstIndex()
Expect(chain.Order(env, uint64(0))).To(Succeed())
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
Eventually(countFiles, LongEventualTimeout).Should(Equal(2))
Eventually(opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", i))
chain.Halt()
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters, cryptoProvider, nil, nil)
c.init()
signal := make(chan struct{})
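// Stub the puller to block on the signal channel before serving blocks from the in-memory ledger, so the test can assert that WaitReady blocks while catch-up is in progress.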
c.puller.PullBlockStub = func(i uint64) *common.Block {
<-signal // blocking for assertions
ledgerLock.Lock()
defer ledgerLock.Unlock()
if i >= uint64(len(ledger)) {
return nil
}
// This is a false assumption - a single node shouldn't be able to pull blocks from anywhere.
// However, this test mainly asserts that the chain attempts to catch up upon start,
// so we can live with it.
return ledger[i]
}
err := c.WaitReady()
Expect(err).To(MatchError("chain is not started"))
c.Start()
defer c.Halt()
// PullBlock is called, so the chain should be catching up now; WaitReady should block
signal <- struct{}{}
done := make(chan error)
go func() {
done <- c.WaitReady()
}()
Consistently(done).ShouldNot(Receive())
close(signal) // unblock block puller
Eventually(done).Should(Receive(nil)) // WaitReady should be unblocked
Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
})
It("commits block from snapshot if it's missing from ledger", func() {
// Scenario:
// Single node exits right after a snapshot is taken, while the block
// in it hasn't been successfully persisted into ledger (there can be one
// async block write in-flight). Then the node is restarted, and catches
// up using the block in snapshot.
Expect(chain.Order(env, uint64(0))).To(Succeed())
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
Eventually(countFiles, LongEventualTimeout).Should(Equal(1))
chain.Halt()
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters, cryptoProvider, nil, nil)
c.init()
c.Start()
defer c.Halt()
Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
})
It("restores snapshot w/o extra entries", func() {
// Scenario:
// after a snapshot is taken, no more entries are appended.
// then the node is restarted; it loads the snapshot and finds its term
// and index. While replaying the WAL into memory storage, it should
// not append any entry, because no extra entry was appended
// after the snapshot was taken.
Expect(chain.Order(env, uint64(0))).To(Succeed())
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
_, metadata := support.WriteBlockArgsForCall(0)
m := &raftprotos.BlockMetadata{}
proto.Unmarshal(metadata, m)
Eventually(countFiles, LongEventualTimeout).Should(Equal(1))
Eventually(opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", 1))
snapshot, err := opts.MemoryStorage.Snapshot() // get the snapshot just created
Expect(err).NotTo(HaveOccurred())
i, err := opts.MemoryStorage.FirstIndex() // get the first index in memory
Expect(err).NotTo(HaveOccurred())
// expect storage to preserve SnapshotCatchUpEntries entries before snapshot
Expect(i).To(Equal(snapshot.Metadata.Index - opts.SnapshotCatchUpEntries + 1))
chain.Halt()
raftMetadata.RaftIndex = m.RaftIndex
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters, cryptoProvider, nil, nil)
c.opts.SnapshotIntervalSize = 1
c.init()
c.Start()
// The following arithmetic reflects how the etcdraft MemoryStorage is implemented
// when no entry is appended after the snapshot is loaded.
Eventually(c.opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(Equal(snapshot.Metadata.Index + 1))
Eventually(c.opts.MemoryStorage.LastIndex, LongEventualTimeout).Should(Equal(snapshot.Metadata.Index))
// chain keeps functioning
Eventually(func() <-chan raft.SoftState {
c.clock.Increment(interval)
return c.observe
}, LongEventualTimeout).Should(Receive(StateEqual(1, raft.StateLeader)))
c.cutter.SetCutNext(true)
err = c.Order(env, uint64(0))
Expect(err).NotTo(HaveOccurred())
Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
Eventually(countFiles, LongEventualTimeout).Should(Equal(2))
c.Halt()
_, metadata = c.support.WriteBlockArgsForCall(0)
m = &raftprotos.BlockMetadata{}
proto.Unmarshal(metadata, m)
raftMetadata.RaftIndex = m.RaftIndex
cx := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters, cryptoProvider, nil, nil)
cx.init()
cx.Start()
defer cx.Halt()
// chain keeps functioning
Eventually(func() <-chan raft.SoftState {
cx.clock.Increment(interval)
return cx.observe
}, LongEventualTimeout).Should(Receive(StateEqual(1, raft.StateLeader)))
})
})
Context("Large SnapshotInterval", func() {
BeforeEach(func() {
opts.SnapshotIntervalSize = 1024
})
It("restores snapshot w/ extra entries", func() {
// Scenario:
// after a snapshot is taken, more entries are appended.
// then the node is restarted; it loads the snapshot and finds its term
// and index. While replaying the WAL into memory storage, it should
// append some entries.
largeEnv := &common.Envelope{
Payload: marshalOrPanic(&common.Payload{
Header: &common.Header{ChannelHeader: marshalOrPanic(&common.ChannelHeader{Type: int32(common.HeaderType_MESSAGE), ChannelId: channelID})},
Data: make([]byte, 500),
}),
}
By("Ordering two large envelopes to trigger snapshot")
Expect(chain.Order(largeEnv, uint64(0))).To(Succeed())
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
Expect(chain.Order(largeEnv, uint64(0))).To(Succeed())
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
_, metadata := support.WriteBlockArgsForCall(1)
m := &raftprotos.BlockMetadata{}
proto.Unmarshal(metadata, m)
// check snapshot does exist
Eventually(countFiles, LongEventualTimeout).Should(Equal(1))
Eventually(opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", 1))
snapshot, err := opts.MemoryStorage.Snapshot() // get the snapshot just created
Expect(err).NotTo(HaveOccurred())
i, err := opts.MemoryStorage.FirstIndex() // get the first index in memory
Expect(err).NotTo(HaveOccurred())
// expect storage to preserve SnapshotCatchUpEntries entries before snapshot
Expect(i).To(Equal(snapshot.Metadata.Index - opts.SnapshotCatchUpEntries + 1))
By("Ordering another envlope to append new data to memory after snaphost")
Expect(chain.Order(env, uint64(0))).To(Succeed())
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(3))
lasti, _ := opts.MemoryStorage.LastIndex()
chain.Halt()
raftMetadata.RaftIndex = m.RaftIndex
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters, cryptoProvider, nil, nil)
cnt := support.WriteBlockCallCount()
for i := 0; i < cnt; i++ {
c.support.WriteBlock(support.WriteBlockArgsForCall(i))
}
By("Restarting the node")
c.init()
c.Start()
defer c.Halt()
By("Checking latest index is larger than index in snapshot")
Eventually(c.opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(Equal(snapshot.Metadata.Index + 1))
Eventually(c.opts.MemoryStorage.LastIndex, LongEventualTimeout).Should(Equal(lasti))
})
When("local ledger is in sync with snapshot", func() {
It("does not pull blocks and still respects snapshot interval", func() {
// Scenario:
// - snapshot is taken at block 2
// - order one more envelope (block 3)
// - reboot chain at block 2
// - block 3 should be replayed from wal
// - order another envelope to trigger snapshot, containing block 3 & 4
// Assertions:
// - block puller should NOT be called
// - chain should keep functioning after reboot
// - chain should respect snapshot interval to trigger next snapshot
largeEnv := &common.Envelope{
Payload: marshalOrPanic(&common.Payload{
Header: &common.Header{ChannelHeader: marshalOrPanic(&common.ChannelHeader{Type: int32(common.HeaderType_MESSAGE), ChannelId: channelID})},
Data: make([]byte, 500),
}),
}
By("Ordering two large envelopes to trigger snapshot")
Expect(chain.Order(largeEnv, uint64(0))).To(Succeed())
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
Expect(chain.Order(largeEnv, uint64(0))).To(Succeed())
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
Eventually(countFiles, LongEventualTimeout).Should(Equal(1))
_, metadata := support.WriteBlockArgsForCall(1)
m := &raftprotos.BlockMetadata{}
proto.Unmarshal(metadata, m)
By("Cutting block [3]")
// order another envelope. this should not trigger snapshot
err = chain.Order(largeEnv, uint64(0))
Expect(err).NotTo(HaveOccurred())
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(3))
chain.Halt()
raftMetadata.RaftIndex = m.RaftIndex
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters, cryptoProvider, nil, nil)
// replay block 1&2
c.support.WriteBlock(support.WriteBlockArgsForCall(0))
c.support.WriteBlock(support.WriteBlockArgsForCall(1))
c.opts.SnapshotIntervalSize = 1024
By("Restarting node at block [2]")
c.init()
c.Start()
defer c.Halt()
// elect leader
campaign(c.Chain, c.observe)
By("Ordering one more block to trigger snapshot")
c.cutter.SetCutNext(true)
err = c.Order(largeEnv, uint64(0))
Expect(err).NotTo(HaveOccurred())
Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(4))
Expect(c.puller.PullBlockCallCount()).Should(BeZero())
// old snapshot file is retained
Eventually(countFiles, LongEventualTimeout).Should(Equal(2))
})
})
It("respects snapshot interval after reboot", func() {
largeEnv := &common.Envelope{
Payload: marshalOrPanic(&common.Payload{
Header: &common.Header{ChannelHeader: marshalOrPanic(&common.ChannelHeader{Type: int32(common.HeaderType_MESSAGE), ChannelId: channelID})},
Data: make([]byte, 500),
}),
}
Expect(chain.Order(largeEnv, uint64(0))).To(Succeed())
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
// check no snapshot is taken
Consistently(countFiles).Should(Equal(0))
_, metadata := support.WriteBlockArgsForCall(0)
m := &raftprotos.BlockMetadata{}
proto.Unmarshal(metadata, m)
chain.Halt()
raftMetadata.RaftIndex = m.RaftIndex
c1 := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters, cryptoProvider, nil, nil)
cnt := support.WriteBlockCallCount()
for i := 0; i < cnt; i++ {
c1.support.WriteBlock(support.WriteBlockArgsForCall(i))
}
c1.cutter.SetCutNext(true)
c1.opts.SnapshotIntervalSize = 1024
By("Restarting chain")
c1.init()
c1.Start()
// chain keeps functioning
campaign(c1.Chain, c1.observe)
Expect(c1.Order(largeEnv, uint64(0))).To(Succeed())
Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
// check snapshot does exist
Eventually(countFiles, LongEventualTimeout).Should(Equal(1))
})
})
})
})
Context("Invalid WAL dir", func() {
support := &consensusmocks.FakeConsenterSupport{}
BeforeEach(func() {
// for block creator initialization
support.HeightReturns(1)
support.BlockReturns(getSeedBlock())
})
When("WAL dir is a file", func() {
It("replaces file with fresh WAL dir", func() {
f, err := ioutil.TempFile("", "wal-")
Expect(err).NotTo(HaveOccurred())
defer os.RemoveAll(f.Name())
chain, err := etcdraft.NewChain(
support,
etcdraft.Options{
WALDir: f.Name(),
SnapDir: snapDir,
Logger: logger,
MemoryStorage: storage,
BlockMetadata: &raftprotos.BlockMetadata{},
Metrics: newFakeMetrics(newFakeMetricsFields()),
},
configurator,
nil,
cryptoProvider,
nil,
nil,
observeC)
Expect(chain).NotTo(BeNil())
Expect(err).NotTo(HaveOccurred())
info, err := os.Stat(f.Name())
Expect(err).NotTo(HaveOccurred())
Expect(info.IsDir()).To(BeTrue())
})
})
When("WAL dir is not writeable", func() {
It("replace it with fresh WAL dir", func() {
d, err := ioutil.TempDir("", "wal-")
Expect(err).NotTo(HaveOccurred())
defer os.RemoveAll(d)
err = os.Chmod(d, 0o500)
Expect(err).NotTo(HaveOccurred())
chain, err := etcdraft.NewChain(
support,
etcdraft.Options{
WALDir: d,
SnapDir: snapDir,
Logger: logger,
MemoryStorage: storage,
BlockMetadata: &raftprotos.BlockMetadata{},
Metrics: newFakeMetrics(newFakeMetricsFields()),
},
nil,
nil,
cryptoProvider,
noOpBlockPuller,
nil,
nil)
Expect(chain).NotTo(BeNil())
Expect(err).NotTo(HaveOccurred())
})
})
When("WAL parent dir is not writeable", func() {
It("fails to bootstrap fresh raft node", func() {
skipIfRoot()
d, err := ioutil.TempDir("", "wal-")
Expect(err).NotTo(HaveOccurred())
defer os.RemoveAll(d)
err = os.Chmod(d, 0o500)
Expect(err).NotTo(HaveOccurred())
chain, err := etcdraft.NewChain(
support,
etcdraft.Options{
WALDir: path.Join(d, "wal-dir"),
SnapDir: snapDir,
Logger: logger,
BlockMetadata: &raftprotos.BlockMetadata{},
},
nil,
nil,
cryptoProvider,
noOpBlockPuller,
nil,
nil)
Expect(chain).To(BeNil())
Expect(err).To(MatchError(ContainSubstring("failed to initialize WAL: mkdir")))
})
})
})
})
})
Describe("2-node Raft cluster", func() {
var (
network *network
channelID string
timeout time.Duration
dataDir string
c1, c2 *chain
raftMetadata *raftprotos.BlockMetadata
consenters map[uint64]*raftprotos.Consenter
configEnv *common.Envelope
cryptoProvider bccsp.BCCSP
fakeHaltCallbacker *mocks.HaltCallbacker
)
BeforeEach(func() {
var err error
channelID = "multi-node-channel"
timeout = 10 * time.Second
dataDir, err = ioutil.TempDir("", "raft-test-")
Expect(err).NotTo(HaveOccurred())
cryptoProvider, err = sw.NewDefaultSecurityLevelWithKeystore(sw.NewDummyKeyStore())
Expect(err).NotTo(HaveOccurred())
raftMetadata = &raftprotos.BlockMetadata{
ConsenterIds: []uint64{1, 2},
NextConsenterId: 3,
}
consenters = map[uint64]*raftprotos.Consenter{
1: {
Host: "localhost",
Port: 7051,
ClientTlsCert: clientTLSCert(tlsCA),
ServerTlsCert: serverTLSCert(tlsCA),
},
2: {
Host: "localhost",
Port: 7051,
ClientTlsCert: clientTLSCert(tlsCA),
ServerTlsCert: serverTLSCert(tlsCA),
},
}
metadata := &raftprotos.ConfigMetadata{
Options: &raftprotos.Options{
TickInterval: "500ms",
ElectionTick: 10,
HeartbeatTick: 1,
MaxInflightBlocks: 5,
SnapshotIntervalSize: 200,
},
Consenters: []*raftprotos.Consenter{consenters[2]},
}
value := map[string]*common.ConfigValue{
"ConsensusType": {
Version: 1,
Value: marshalOrPanic(&orderer.ConsensusType{
Metadata: marshalOrPanic(metadata),
}),
},
}
// prepare config update to remove node 1
configEnv = newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, nil, value))
fakeHaltCallbacker = &mocks.HaltCallbacker{}
network = createNetwork(timeout, channelID, dataDir, raftMetadata, consenters, cryptoProvider, tlsCA, fakeHaltCallbacker.HaltCallback)
c1, c2 = network.chains[1], network.chains[2]
c1.cutter.SetCutNext(true)
network.init()
network.start()
})
AfterEach(func() {
network.stop()
network.exec(func(c *chain) {
Eventually(c.clock.WatcherCount, LongEventualTimeout).Should(BeZero())
})
os.RemoveAll(dataDir)
})
It("can remove leader by reconfiguring cluster", func() {
network.elect(1)
// trigger status dissemination
Eventually(func() int {
c1.clock.Increment(interval)
return c2.fakeFields.fakeActiveNodes.SetCallCount()
}, LongEventualTimeout).Should(Equal(2))
Expect(c1.fakeFields.fakeActiveNodes.SetArgsForCall(1)).To(Equal(float64(2)))
Expect(c2.fakeFields.fakeActiveNodes.SetArgsForCall(1)).To(Equal(float64(2)))
By("Configuring cluster to remove node")
Expect(c1.Configure(configEnv, 0)).To(Succeed())
select {
case <-c1.observe:
case <-time.After(LongEventualTimeout):
// abdicateLeader might fail to transfer leadership when the next candidate is
// busy applying committed entries
Fail("Expected a new leader to be present")
}
Eventually(c2.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
Eventually(func() <-chan raft.SoftState {
c2.clock.Increment(interval)
return c2.observe
}, LongEventualTimeout).Should(Receive(StateEqual(2, raft.StateLeader)))
By("Asserting the haltCallback is called when the node is removed from the replica set")
Eventually(fakeHaltCallbacker.HaltCallbackCallCount).Should(Equal(1))
By("Asserting the StatusReport responds correctly after eviction")
Eventually(
func() orderer_types.ConsensusRelation {
cRel, _ := c1.StatusReport()
return cRel
},
).Should(Equal(orderer_types.ConsensusRelationConfigTracker))
_, status := c1.StatusReport()
Expect(status).To(Equal(orderer_types.StatusInactive))
By("Asserting leader can still serve requests as single-node cluster")
c2.cutter.SetCutNext(true)
Expect(c2.Order(env, 0)).To(Succeed())
Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
Expect(c1.fakeFields.fakeActiveNodes.SetArgsForCall(2)).To(Equal(float64(0))) // was halted
Expect(c2.fakeFields.fakeActiveNodes.SetArgsForCall(1)).To(Equal(float64(2)))
})
It("remove leader by reconfiguring cluster, check haltCallback is called", func() {
network.elect(1)
// trigger status dissemination
Eventually(func() int {
c1.clock.Increment(interval)
return c2.fakeFields.fakeActiveNodes.SetCallCount()
}, LongEventualTimeout).Should(Equal(2))
Expect(c1.fakeFields.fakeActiveNodes.SetArgsForCall(1)).To(Equal(float64(2)))
Expect(c2.fakeFields.fakeActiveNodes.SetArgsForCall(1)).To(Equal(float64(2)))
By("Configuring cluster to remove node")
Expect(c1.Configure(configEnv, 0)).To(Succeed())
c1.clock.WaitForNWatchersAndIncrement((ELECTION_TICK-1)*interval, 2)
Eventually(c2.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
By("Asserting the haltCallback is called when Halt is called before eviction")
c1.clock.Increment(interval)
Eventually(fakeHaltCallbacker.HaltCallbackCallCount).Should(Equal(1))
Eventually(func() <-chan raft.SoftState {
c2.clock.Increment(interval)
return c2.observe
}, LongEventualTimeout).Should(Receive(StateEqual(2, raft.StateLeader)))
By("Asserting leader can still serve requests as single-node cluster")
c2.cutter.SetCutNext(true)
Expect(c2.Order(env, 0)).To(Succeed())
Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
// active nodes metric hasn't changed because c.halt() wasn't called
Expect(c1.fakeFields.fakeActiveNodes.SetArgsForCall(1)).To(Equal(float64(2)))
Expect(c2.fakeFields.fakeActiveNodes.SetArgsForCall(1)).To(Equal(float64(2)))
})
It("can remove leader by retrying even if leadership transfer fails at first", func() {
network.elect(1)
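// Intercept outgoing consensus messages and drop the first MsgTimeoutNow, so the initial leadership-transfer attempt fails and the chain has to retry.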
var messageOmission uint32
step1 := c1.getStepFunc()
c1.setStepFunc(func(dest uint64, msg *orderer.ConsensusRequest) error {
stepMsg := &raftpb.Message{}
if err := proto.Unmarshal(msg.Payload, stepMsg); err != nil {
return fmt.Errorf("failed to unmarshal StepRequest payload to Raft Message: %s", err)
}
if stepMsg.Type == raftpb.MsgTimeoutNow && atomic.CompareAndSwapUint32(&messageOmission, 0, 1) {
return nil
}
return step1(dest, msg)
})
By("Configuring cluster to remove node")
Expect(c1.Configure(configEnv, 0)).To(Succeed())
Eventually(func() <-chan raft.SoftState {
c1.clock.Increment(interval)
return c1.observe
}, LongEventualTimeout).Should(Receive())
Eventually(c2.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
Eventually(func() <-chan raft.SoftState {
c2.clock.Increment(interval)
return c2.observe
}, LongEventualTimeout).Should(Receive(StateEqual(2, raft.StateLeader)))
By("Asserting the haltCallback is called when the node is removed from the replica set")
Eventually(fakeHaltCallbacker.HaltCallbackCallCount).Should(Equal(1))
By("Asserting the StatusReport responds correctly after eviction")
Eventually(
func() orderer_types.ConsensusRelation {
cRel, _ := c1.StatusReport()
return cRel
},
).Should(Equal(orderer_types.ConsensusRelationConfigTracker))
_, status := c1.StatusReport()
Expect(status).To(Equal(orderer_types.StatusInactive))
By("Asserting leader can still serve requests as single-node cluster")
c2.cutter.SetCutNext(true)
Expect(c2.Order(env, 0)).To(Succeed())
Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
lastSetActiveNodes := c1.fakeFields.fakeActiveNodes.SetCallCount() - 1
Expect(c1.fakeFields.fakeActiveNodes.SetArgsForCall(lastSetActiveNodes)).To(Equal(float64(0))) // was halted
Expect(c2.fakeFields.fakeActiveNodes.SetArgsForCall(1)).To(Equal(float64(2)))
})
It("can remove follower by reconfiguring cluster", func() {
network.elect(2)
Expect(c1.Configure(configEnv, 0)).To(Succeed())
network.exec(func(c *chain) {
Eventually(c.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
})
Eventually(c2.configurator.ConfigureCallCount, LongEventualTimeout).Should(Equal(2))
Eventually(c1.Chain.Errored, LongEventualTimeout).Should(BeClosed())
By("Asserting leader can still serve requests as single-node cluster")
c2.cutter.SetCutNext(true)
Expect(c2.Order(env, 0)).To(Succeed())
Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
})
})
Describe("3-node Raft cluster", func() {
var (
network *network
channelID string
timeout time.Duration
dataDir string
c1, c2, c3 *chain
raftMetadata *raftprotos.BlockMetadata
consenters map[uint64]*raftprotos.Consenter
cryptoProvider bccsp.BCCSP
)
BeforeEach(func() {
var err error
channelID = "multi-node-channel"
timeout = 10 * time.Second
dataDir, err = ioutil.TempDir("", "raft-test-")
Expect(err).NotTo(HaveOccurred())
raftMetadata = &raftprotos.BlockMetadata{
ConsenterIds: []uint64{1, 2, 3},
NextConsenterId: 4,
}
cryptoProvider, err = sw.NewDefaultSecurityLevelWithKeystore(sw.NewDummyKeyStore())
Expect(err).NotTo(HaveOccurred())
consenters = map[uint64]*raftprotos.Consenter{
1: {
Host: "localhost",
Port: 7051,
ClientTlsCert: clientTLSCert(tlsCA),
ServerTlsCert: serverTLSCert(tlsCA),
},
2: {
Host: "localhost",
Port: 7051,
ClientTlsCert: clientTLSCert(tlsCA),
ServerTlsCert: serverTLSCert(tlsCA),
},
3: {
Host: "localhost",
Port: 7051,
ClientTlsCert: clientTLSCert(tlsCA),
ServerTlsCert: serverTLSCert(tlsCA),
},
}
network = createNetwork(timeout, channelID, dataDir, raftMetadata, consenters, cryptoProvider, tlsCA, nil)
c1 = network.chains[1]
c2 = network.chains[2]
c3 = network.chains[3]
})
AfterEach(func() {
network.stop()
network.exec(func(c *chain) {
Eventually(c.clock.WatcherCount, LongEventualTimeout).Should(BeZero())
})
os.RemoveAll(dataDir)
})
When("2/3 nodes are running", func() {
It("late node can catch up", func() {
network.init()
network.start(1, 2)
network.elect(1)
// trigger status dissemination
Eventually(func() int {
c1.clock.Increment(interval)
return c2.fakeFields.fakeActiveNodes.SetCallCount()
}, LongEventualTimeout).Should(Equal(2))
Expect(c2.fakeFields.fakeActiveNodes.SetArgsForCall(1)).To(Equal(float64(2)))
c1.cutter.SetCutNext(true)
err := c1.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
Eventually(func() int { return c1.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(1))
Eventually(func() int { return c2.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(1))
Eventually(func() int { return c3.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(0))
network.start(3)
c1.clock.Increment(interval)
Eventually(func() int { return c3.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(1))
network.stop()
})
It("late node receives snapshot from leader", func() {
c1.opts.SnapshotIntervalSize = 1
c1.opts.SnapshotCatchUpEntries = 1
c1.cutter.SetCutNext(true)
var blocksLock sync.Mutex
blocks := make(map[uint64]*common.Block) // storing written blocks for block puller
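// The leader records every written block (with orderer metadata attached) so that c3's stubbed puller below can serve them during snapshot catch-up.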
c1.support.WriteBlockStub = func(b *common.Block, meta []byte) {
blocksLock.Lock()
defer blocksLock.Unlock()
bytes, err := proto.Marshal(&common.Metadata{Value: meta})
Expect(err).NotTo(HaveOccurred())
b.Metadata.Metadata[common.BlockMetadataIndex_ORDERER] = bytes
blocks[b.Header.Number] = b
}
c3.puller.PullBlockStub = func(i uint64) *common.Block {
blocksLock.Lock()
defer blocksLock.Unlock()
b, exist := blocks[i]
if !exist {
return nil
}
return b
}
network.init()
network.start(1, 2)
network.elect(1)
err := c1.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
Eventually(func() int { return c1.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(1))
Eventually(func() int { return c2.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(1))
Eventually(func() int { return c3.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(0))
err = c1.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
Eventually(func() int { return c1.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(2))
Eventually(func() int { return c2.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(2))
Eventually(func() int { return c3.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(0))
network.start(3)
c1.clock.Increment(interval)
Eventually(func() int { return c3.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(2))
network.stop()
})
})
When("reconfiguring raft cluster", func() {
const (
defaultTimeout = 5 * time.Second
)
var (
options = &raftprotos.Options{
TickInterval: "500ms",
ElectionTick: 10,
HeartbeatTick: 1,
MaxInflightBlocks: 5,
SnapshotIntervalSize: 200,
}
updateRaftConfigValue = func(metadata *raftprotos.ConfigMetadata) map[string]*common.ConfigValue {
return map[string]*common.ConfigValue{
"ConsensusType": {
Version: 1,
Value: marshalOrPanic(&orderer.ConsensusType{
Metadata: marshalOrPanic(metadata),
}),
},
}
}
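// addConsenterConfigValue keeps the existing consenters and appends one new consenter to the config.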
addConsenterConfigValue = func() map[string]*common.ConfigValue {
metadata := &raftprotos.ConfigMetadata{Options: options}
for _, consenter := range consenters {
metadata.Consenters = append(metadata.Consenters, consenter)
}
newConsenter := &raftprotos.Consenter{
Host: "localhost",
Port: 7050,
ServerTlsCert: serverTLSCert(tlsCA),
ClientTlsCert: clientTLSCert(tlsCA),
}
metadata.Consenters = append(metadata.Consenters, newConsenter)
return updateRaftConfigValue(metadata)
}
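// removeConsenterConfigValue drops the consenter with the given id from the config.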
removeConsenterConfigValue = func(id uint64) map[string]*common.ConfigValue {
metadata := &raftprotos.ConfigMetadata{Options: options}
for nodeID, consenter := range consenters {
if nodeID == id {
continue
}
metadata.Consenters = append(metadata.Consenters, consenter)
}
return updateRaftConfigValue(metadata)
}
)
BeforeEach(func() {
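// Shorten eviction suspicion and leader-check intervals so eviction-related behaviour surfaces quickly in these tests.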
network.exec(func(c *chain) {
c.opts.EvictionSuspicion = time.Millisecond * 100
c.opts.LeaderCheckInterval = time.Millisecond * 100
})
network.init()
network.start()
network.elect(1)
By("Submitting first tx to cut the block")
c1.cutter.SetCutNext(true)
err := c1.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
c1.clock.Increment(interval)
network.exec(
func(c *chain) {
Eventually(c.support.WriteBlockCallCount, defaultTimeout).Should(Equal(1))
})
})
AfterEach(func() {
network.stop()
})
Context("reconfiguration", func() {
It("can rotate certificate by adding and removing 1 node in one config update", func() {
metadata := &raftprotos.ConfigMetadata{Options: options}
for id, consenter := range consenters {
if id == 2 {
// remove second consenter
continue
}
metadata.Consenters = append(metadata.Consenters, consenter)
}
// add new consenter
newConsenter := &raftprotos.Consenter{
Host: "localhost",
Port: 7050,
ServerTlsCert: serverTLSCert(tlsCA),
ClientTlsCert: clientTLSCert(tlsCA),
}
metadata.Consenters = append(metadata.Consenters, newConsenter)
value := map[string]*common.ConfigValue{
"ConsensusType": {
Version: 1,
Value: marshalOrPanic(&orderer.ConsensusType{
Metadata: marshalOrPanic(metadata),
}),
},
}
By("creating new configuration with removed node and new one")
configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, nil, value))
c1.cutter.SetCutNext(true)
By("sending config transaction")
Expect(c1.Configure(configEnv, 0)).To(Succeed())
network.exec(func(c *chain) {
Eventually(c.configurator.ConfigureCallCount, LongEventualTimeout).Should(Equal(2))
})
})
It("rotates leader certificate and triggers leadership transfer", func() {
metadata := &raftprotos.ConfigMetadata{Options: options}
for id, consenter := range consenters {
if id == 1 {
// remove first consenter, which is the leader
continue
}
metadata.Consenters = append(metadata.Consenters, consenter)
}
// add new consenter
newConsenter := &raftprotos.Consenter{
Host: "localhost",
Port: 7050,
ServerTlsCert: serverTLSCert(tlsCA),
ClientTlsCert: clientTLSCert(tlsCA),
}
metadata.Consenters = append(metadata.Consenters, newConsenter)
value := map[string]*common.ConfigValue{
"ConsensusType": {
Version: 1,
Value: marshalOrPanic(&orderer.ConsensusType{
Metadata: marshalOrPanic(metadata),
}),
},
}
By("creating new configuration with removed node and new one")
configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, nil, value))
c1.cutter.SetCutNext(true)
By("sending config transaction")
Expect(c1.Configure(configEnv, 0)).To(Succeed())
Eventually(c1.observe, LongEventualTimeout).Should(Receive(BeFollower()))
network.exec(func(c *chain) {
Eventually(c.configurator.ConfigureCallCount, LongEventualTimeout).Should(Equal(2))
})
})
When("Leader is disconnected after cert rotation", func() {
It("still configures communication after failed leader transfer attempt", func() {
metadata := &raftprotos.ConfigMetadata{Options: options}
for id, consenter := range consenters {
if id == 1 {
// remove first consenter, which is the leader
continue
}
metadata.Consenters = append(metadata.Consenters, consenter)
}
// add new consenter
newConsenter := &raftprotos.Consenter{
Host: "localhost",
Port: 7050,
ServerTlsCert: serverTLSCert(tlsCA),
ClientTlsCert: clientTLSCert(tlsCA),
}
metadata.Consenters = append(metadata.Consenters, newConsenter)
value := map[string]*common.ConfigValue{
"ConsensusType": {
Version: 1,
Value: marshalOrPanic(&orderer.ConsensusType{
Metadata: marshalOrPanic(metadata),
}),
},
}
By("creating new configuration with removed node and new one")
configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, nil, value))
c1.cutter.SetCutNext(true)
step1 := c1.getStepFunc()
count := c1.rpc.SendConsensusCallCount() // record current step call count
c1.setStepFunc(func(dest uint64, msg *orderer.ConsensusRequest) error {
// disconnect network after 4 MsgApp are sent by c1:
// - 2 MsgApp to c2 & c3 that replicate data to raft followers
// - 2 MsgApp to c2 & c3 that instruct followers to commit data
if c1.rpc.SendConsensusCallCount() == count+4 {
defer network.disconnect(1)
}
return step1(dest, msg)
})
network.exec(func(c *chain) {
Consistently(c.clock.WatcherCount).Should(Equal(1))
})
By("sending config transaction")
Expect(c1.Configure(configEnv, 0)).To(Succeed())
c1.clock.WaitForNWatchersAndIncrement(time.Duration(ELECTION_TICK)*interval, 2)
})
})
It("adding node to the cluster", func() {
addConsenterUpdate := addConsenterConfigValue()
configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, nil, addConsenterUpdate))
c1.cutter.SetCutNext(true)
By("sending config transaction")
err := c1.Configure(configEnv, 0)
Expect(err).NotTo(HaveOccurred())
Expect(c1.fakeFields.fakeConfigProposalsReceived.AddCallCount()).To(Equal(1))
Expect(c1.fakeFields.fakeConfigProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1)))
network.exec(func(c *chain) {
Eventually(c.support.WriteConfigBlockCallCount, defaultTimeout).Should(Equal(1))
Eventually(c.fakeFields.fakeClusterSize.SetCallCount, LongEventualTimeout).Should(Equal(2))
Expect(c.fakeFields.fakeClusterSize.SetArgsForCall(1)).To(Equal(float64(4)))
})
_, raftmetabytes := c1.support.WriteConfigBlockArgsForCall(0)
meta := &common.Metadata{Value: raftmetabytes}
raftmeta, err := etcdraft.ReadBlockMetadata(meta, nil)
Expect(err).NotTo(HaveOccurred())
c4 := newChain(timeout, channelID, dataDir, 4, raftmeta, consenters, cryptoProvider, nil, nil)
// if we join a node to an existing network, it MUST have already obtained blocks
// up to the config block that adds this node to the cluster.
c4.support.WriteBlock(c1.support.WriteBlockArgsForCall(0))
c4.support.WriteConfigBlock(c1.support.WriteConfigBlockArgsForCall(0))
c4.init()
network.addChain(c4)
c4.Start()
// ConfChange is applied to etcd/raft asynchronously, meaning node 4 is not added
// to leader's node list right away. An immediate tick does not trigger a heartbeat
// being sent to node 4. Therefore, we repeatedly tick the leader until node 4 joins
// the cluster successfully.
Eventually(func() <-chan raft.SoftState {
c1.clock.Increment(interval)
return c4.observe
}, defaultTimeout).Should(Receive(Equal(raft.SoftState{Lead: 1, RaftState: raft.StateFollower})))
Eventually(c4.support.WriteBlockCallCount, defaultTimeout).Should(Equal(1))
Eventually(c4.support.WriteConfigBlockCallCount, defaultTimeout).Should(Equal(1))
By("submitting new transaction to follower")
c1.cutter.SetCutNext(true)
err = c4.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
Expect(c4.fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(1))
Expect(c4.fakeFields.fakeNormalProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1)))
network.exec(func(c *chain) {
Eventually(c.support.WriteBlockCallCount, defaultTimeout).Should(Equal(2))
})
})
It("disconnecting follower node -> adding new node to the cluster -> writing blocks on the ledger -> reconnecting the follower node", func() {
By("Disconnecting a follower node")
network.disconnect(c2.id)
By("Configuring an additional node")
addConsenterUpdate := addConsenterConfigValue()
configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, nil, addConsenterUpdate))
c1.cutter.SetCutNext(true)
By("Sending config transaction")
err := c1.Configure(configEnv, 0)
Expect(err).NotTo(HaveOccurred())
Expect(c1.fakeFields.fakeConfigProposalsReceived.AddCallCount()).To(Equal(1))
Expect(c1.fakeFields.fakeConfigProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1)))
network.exec(func(c *chain) {
if c.id == c2.id {
return
}
Eventually(c.support.WriteConfigBlockCallCount, defaultTimeout).Should(Equal(1))
Eventually(c.fakeFields.fakeClusterSize.SetCallCount, LongEventualTimeout).Should(Equal(2))
Expect(c.fakeFields.fakeClusterSize.SetArgsForCall(1)).To(Equal(float64(4)))
})
_, raftmetabytes := c1.support.WriteConfigBlockArgsForCall(0)
meta := &common.Metadata{Value: raftmetabytes}
raftmeta, err := etcdraft.ReadBlockMetadata(meta, nil)
Expect(err).NotTo(HaveOccurred())
By("Starting the new node")
c4 := newChain(timeout, channelID, dataDir, 4, raftmeta, consenters, cryptoProvider, nil, nil)
// if we join a node to an existing network, it MUST have already obtained blocks
// up to the config block that adds this node to the cluster.
c4.support.WriteBlock(c1.support.WriteBlockArgsForCall(0))
c4.support.WriteConfigBlock(c1.support.WriteConfigBlockArgsForCall(0))
c4.init()
network.addChain(c4)
c4.Start()
// ConfChange is applied to etcd/raft asynchronously, meaning node 4 is not added
// to leader's node list right away. An immediate tick does not trigger a heartbeat
// being sent to node 4. Therefore, we repeatedly tick the leader until node 4 joins
// the cluster successfully.
Eventually(func() <-chan raft.SoftState {
c1.clock.Increment(interval)
return c4.observe
}, defaultTimeout).Should(Receive(Equal(raft.SoftState{Lead: 1, RaftState: raft.StateFollower})))
Eventually(c4.support.WriteBlockCallCount, defaultTimeout).Should(Equal(1))
Eventually(c4.support.WriteConfigBlockCallCount, defaultTimeout).Should(Equal(1))
By("Sending data blocks to leader")
numOfBlocks := 100
for i := 0; i < numOfBlocks; i++ {
c1.cutter.SetCutNext(true)
err := c1.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
}
By("Reconnecting the follower node")
network.connect(c2.id)
c1.clock.Increment(interval)
By("Checking correct synchronization")
network.exec(func(c *chain) {
Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1 + numOfBlocks))
Eventually(c.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
})
})
It("stop leader and continue reconfiguration failing over to new leader", func() {
// Scenario: Start a replica set of 3 Raft nodes and elect node c1 as the leader.
// Configure the chain support mock to disconnect c1 right after it writes the configuration
// block into the ledger, to simulate failover.
// Next, bootstrap a new node c4 to join the cluster, create a config transaction and submit
// it to the leader. Once the leader writes the configuration block it fails, and leadership is
// transferred to c2.
// The test asserts that the new node c4 joins the cluster and that c2 handles the failover of
// the re-configuration. Later we connect c1 back and make sure it is capable of catching up
// with the new configuration and successfully rejoins the replica set.
configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, nil, addConsenterConfigValue()))
c1.cutter.SetCutNext(true)
step1 := c1.getStepFunc()
count := c1.rpc.SendConsensusCallCount() // record current step call count
c1.setStepFunc(func(dest uint64, msg *orderer.ConsensusRequest) error {
// disconnect network after 4 MsgApp are sent by c1:
// - 2 MsgApp to c2 & c3 that replicate data to raft followers
// - 2 MsgApp to c2 & c3 that instruct followers to commit data
if c1.rpc.SendConsensusCallCount() == count+4 {
defer network.disconnect(1)
}
return step1(dest, msg)
})
By("sending config transaction")
err := c1.Configure(configEnv, 0)
Expect(err).NotTo(HaveOccurred())
// every node has written the config block to the OSN ledger
network.exec(
func(c *chain) {
Eventually(c.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
})
Eventually(c1.rpc.SendConsensusCallCount, LongEventualTimeout).Should(Equal(count + 6))
c1.setStepFunc(step1)
// elect node with higher index
i2, _ := c2.storage.LastIndex() // err is always nil
i3, _ := c3.storage.LastIndex()
candidate := uint64(2)
if i3 > i2 {
candidate = 3
}
network.chains[candidate].cutter.SetCutNext(true)
network.elect(candidate)
_, raftmetabytes := c1.support.WriteConfigBlockArgsForCall(0)
meta := &common.Metadata{Value: raftmetabytes}
raftmeta, err := etcdraft.ReadBlockMetadata(meta, nil)
Expect(err).NotTo(HaveOccurred())
c4 := newChain(timeout, channelID, dataDir, 4, raftmeta, consenters, cryptoProvider, nil, nil)
// if we join a node to an existing network, it MUST have already obtained blocks
// up to the config block that adds this node to the cluster.
c4.support.WriteBlock(c1.support.WriteBlockArgsForCall(0))
c4.support.WriteConfigBlock(c1.support.WriteConfigBlockArgsForCall(0))
c4.init()
network.addChain(c4)
c4.start()
Expect(c4.WaitReady()).To(Succeed())
network.join(4, true)
Eventually(c4.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
Eventually(c4.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
By("submitting new transaction to follower")
err = c4.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
// the remaining nodes are alive, including the newly added one, hence they should write 2 blocks
Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
Eventually(c4.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
// node 1 has been stopped and should not write any block
Consistently(c1.support.WriteBlockCallCount).Should(Equal(1))
network.join(1, true)
Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
})
It("stop cluster quorum and continue reconfiguration after the restart", func() {
// Scenario: Start a replica set of 3 Raft nodes and elect node c1 as the leader.
// Configure the chain support mock to stop the cluster after the config block is committed.
// Restart the cluster and ensure it picks up the updates and is capable of finishing the reconfiguration.
configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, nil, addConsenterConfigValue()))
c1.cutter.SetCutNext(true)
step1 := c1.getStepFunc()
count := c1.rpc.SendConsensusCallCount() // record current step call count
c1.setStepFunc(func(dest uint64, msg *orderer.ConsensusRequest) error {
// disconnect network after 4 MsgApp are sent by c1:
// - 2 MsgApp to c2 & c3 that replicate data to raft followers
// - 2 MsgApp to c2 & c3 that instruct followers to commit data
if c1.rpc.SendConsensusCallCount() == count+4 {
defer func() {
network.disconnect(1)
network.disconnect(2)
network.disconnect(3)
}()
}
return step1(dest, msg)
})
By("sending config transaction")
err := c1.Configure(configEnv, 0)
Expect(err).NotTo(HaveOccurred())
// every node has written the config block to the OSN ledger
network.exec(
func(c *chain) {
Eventually(c.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
})
// assert conf change proposals have been dropped before proceeding to reconnect the network
Eventually(c1.rpc.SendConsensusCallCount, LongEventualTimeout).Should(Equal(count + 6))
c1.setStepFunc(step1)
_, raftmetabytes := c1.support.WriteConfigBlockArgsForCall(0)
meta := &common.Metadata{Value: raftmetabytes}
raftmeta, err := etcdraft.ReadBlockMetadata(meta, nil)
Expect(err).NotTo(HaveOccurred())
c4 := newChain(timeout, channelID, dataDir, 4, raftmeta, consenters, cryptoProvider, nil, nil)
// if we join a node to an existing network, it MUST have already obtained blocks
// up to the config block that adds this node to the cluster.
c4.support.WriteBlock(c1.support.WriteBlockArgsForCall(0))
c4.support.WriteConfigBlock(c1.support.WriteConfigBlockArgsForCall(0))
c4.init()
network.addChain(c4)
By("reconnecting nodes back")
for i := uint64(1); i < 4; i++ {
network.connect(i)
}
// elect node with higher index
i2, _ := c2.storage.LastIndex() // err is always nil
i3, _ := c3.storage.LastIndex()
candidate := uint64(2)
if i3 > i2 {
candidate = 3
}
network.chains[candidate].cutter.SetCutNext(true)
network.elect(candidate)
c4.start()
Expect(c4.WaitReady()).To(Succeed())
network.join(4, false)
Eventually(c4.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
Eventually(c4.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
By("submitting new transaction to follower")
err = c4.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
// the remaining nodes are alive, including the newly added one, hence they should write 2 blocks
Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
Eventually(c4.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
})
It("removes leader from replica set", func() {
// Scenario: Start a replica set of 3 nodes and elect nodeID = 1 as the leader.
// Prepare a config update transaction which removes the leader (nodeID = 1), to ensure
// we handle re-configuration for node removal correctly: the remaining two nodes must
// still be capable of forming a functional quorum and Raft of making further progress.
// Moreover, the test asserts that the removed node stops Rafting with the rest of the
// cluster, i.e. it should not be able to get updates or forward transactions.
configEnv := newConfigEnv(channelID,
common.HeaderType_CONFIG,
newConfigUpdateEnv(channelID, nil, removeConsenterConfigValue(1))) // remove nodeID == 1
c1.cutter.SetCutNext(true)
By("sending config transaction")
err := c1.Configure(configEnv, 0)
Expect(err).NotTo(HaveOccurred())
time.Sleep(time.Duration(ELECTION_TICK) * interval)
Eventually(c2.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
Eventually(c2.fakeFields.fakeClusterSize.SetCallCount, LongEventualTimeout).Should(Equal(2))
// Assert c1 has exited
Eventually(c1.Errored, LongEventualTimeout).Should(BeClosed())
var newLeader, remainingFollower *chain
var c2state raft.SoftState
var c3state raft.SoftState
retry := 1
for newLeader == nil || remainingFollower == nil {
select {
case c2state = <-c2.observe:
case c3state = <-c3.observe:
case <-time.After(LongEventualTimeout):
// abdicateleader might fail to transfer the leadership when the next candidate
// is busy applying committed entries; in that case,
// send an artificial MsgTimeoutNow to the node to pick the next leader
if retry > 0 {
retry -= 1
By("leadership transfer not complete, hence retrying")
c2.Consensus(&orderer.ConsensusRequest{Payload: protoutil.MarshalOrPanic(&raftpb.Message{Type: raftpb.MsgTimeoutNow, To: 2})}, 0)
continue
}
Fail("Expected a new leader to present")
}
// the two remaining nodes agree on a leader, which is one of them
if ((c2state.RaftState == raft.StateFollower && c3state.RaftState == raft.StateLeader) ||
(c2state.RaftState == raft.StateLeader && c3state.RaftState == raft.StateFollower)) &&
c2state.Lead == c3state.Lead && c2state.Lead != raft.None {
newLeader = network.chains[c2state.Lead]
if c2state.RaftState == raft.StateFollower {
remainingFollower = c2
} else {
remainingFollower = c3
}
}
}
By("submitting transaction to new leader")
newLeader.cutter.SetCutNext(true)
err = newLeader.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
Eventually(newLeader.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
Eventually(remainingFollower.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
By("trying to submit to new node, expected to fail")
c1.cutter.SetCutNext(true)
err = c1.Order(env, 0)
Expect(err).To(HaveOccurred())
// number of block writes should remain the same
Consistently(newLeader.support.WriteBlockCallCount).Should(Equal(2))
Consistently(remainingFollower.support.WriteBlockCallCount).Should(Equal(2))
Consistently(c1.support.WriteBlockCallCount).Should(Equal(1))
})
It("does not deadlock if leader steps down while config block is in-flight", func() {
configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, nil, addConsenterConfigValue()))
c1.cutter.SetCutNext(true)
signal := make(chan struct{})
stub := c1.support.WriteConfigBlockStub
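// Wrap WriteConfigBlock so it notifies the test and then blocks until `signal`
// is closed, keeping the config block in-flight while the leader steps down.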
c1.support.WriteConfigBlockStub = func(b *common.Block, meta []byte) {
signal <- struct{}{}
<-signal
stub(b, meta)
}
By("Sending config transaction")
Expect(c1.Configure(configEnv, 0)).To(Succeed())
Eventually(signal, LongEventualTimeout).Should(Receive())
network.disconnect(1)
By("Ticking leader till it steps down")
Eventually(func() raft.SoftState {
c1.clock.Increment(interval)
return c1.Node.Status().SoftState
}, LongEventualTimeout).Should(StateEqual(0, raft.StateFollower))
close(signal)
Eventually(c1.observe, LongEventualTimeout).Should(Receive(StateEqual(0, raft.StateFollower)))
By("Re-electing 1 as leader")
network.connect(1)
network.elect(1)
_, raftmetabytes := c1.support.WriteConfigBlockArgsForCall(0)
meta := &common.Metadata{Value: raftmetabytes}
raftmeta, err := etcdraft.ReadBlockMetadata(meta, nil)
Expect(err).NotTo(HaveOccurred())
c4 := newChain(timeout, channelID, dataDir, 4, raftmeta, consenters, cryptoProvider, nil, nil)
// if we join a node to an existing network, it MUST have already obtained blocks
// up to the config block that adds this node to the cluster.
c4.support.WriteBlock(c1.support.WriteBlockArgsForCall(0))
c4.support.WriteConfigBlock(c1.support.WriteConfigBlockArgsForCall(0))
c4.init()
network.addChain(c4)
c4.Start()
Eventually(func() <-chan raft.SoftState {
c1.clock.Increment(interval)
return c4.observe
}, LongEventualTimeout).Should(Receive(StateEqual(1, raft.StateFollower)))
By("Submitting tx to confirm network is still working")
Expect(c1.Order(env, 0)).To(Succeed())
network.exec(func(c *chain) {
Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
Eventually(c.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
})
})
})
})
When("3/3 nodes are running", func() {
JustBeforeEach(func() {
network.init()
network.start()
network.elect(1)
})
AfterEach(func() {
network.stop()
})
It("correctly sets the cluster size and leadership metrics", func() {
// the network should see only one leadership change
network.exec(func(c *chain) {
Expect(c.fakeFields.fakeLeaderChanges.AddCallCount()).Should(Equal(1))
Expect(c.fakeFields.fakeLeaderChanges.AddArgsForCall(0)).Should(Equal(float64(1)))
Expect(c.fakeFields.fakeClusterSize.SetCallCount()).Should(Equal(1))
Expect(c.fakeFields.fakeClusterSize.SetArgsForCall(0)).To(Equal(float64(3)))
})
// c1 should be the leader
Expect(c1.fakeFields.fakeIsLeader.SetCallCount()).Should(Equal(2))
Expect(c1.fakeFields.fakeIsLeader.SetArgsForCall(1)).Should(Equal(float64(1)))
// c2 and c3 should continue to remain followers
Expect(c2.fakeFields.fakeIsLeader.SetCallCount()).Should(Equal(1))
Expect(c2.fakeFields.fakeIsLeader.SetArgsForCall(0)).Should(Equal(float64(0)))
Expect(c3.fakeFields.fakeIsLeader.SetCallCount()).Should(Equal(1))
Expect(c3.fakeFields.fakeIsLeader.SetArgsForCall(0)).Should(Equal(float64(0)))
})
It("orders envelope on leader", func() {
By("instructed to cut next block")
c1.cutter.SetCutNext(true)
err := c1.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
Expect(c1.fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(1))
Expect(c1.fakeFields.fakeNormalProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1)))
network.exec(
func(c *chain) {
Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
})
By("respect batch timeout")
c1.cutter.SetCutNext(false)
err = c1.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
Expect(c1.fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(2))
Expect(c1.fakeFields.fakeNormalProposalsReceived.AddArgsForCall(1)).To(Equal(float64(1)))
Eventually(c1.cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))
c1.clock.WaitForNWatchersAndIncrement(timeout, 2)
network.exec(
func(c *chain) {
Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
})
})
It("orders envelope on follower", func() {
By("instructed to cut next block")
c1.cutter.SetCutNext(true)
err := c2.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
Expect(c2.fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(1))
Expect(c2.fakeFields.fakeNormalProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1)))
Expect(c1.fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(0))
network.exec(
func(c *chain) {
Eventually(func() int { return c.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(1))
})
By("respect batch timeout")
c1.cutter.SetCutNext(false)
err = c2.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
Expect(c2.fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(2))
Expect(c2.fakeFields.fakeNormalProposalsReceived.AddArgsForCall(1)).To(Equal(float64(1)))
Expect(c1.fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(0))
Eventually(c1.cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))
c1.clock.WaitForNWatchersAndIncrement(timeout, 2)
network.exec(
func(c *chain) {
Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
})
})
When("MaxInflightBlocks is reached", func() {
BeforeEach(func() {
network.exec(func(c *chain) { c.opts.MaxInflightBlocks = 1 })
})
It("waits for in flight blocks to be committed", func() {
c1.cutter.SetCutNext(true)
// disconnect c1 to disrupt consensus
network.disconnect(1)
Expect(c1.Order(env, 0)).To(Succeed())
doneProp := make(chan struct{})
go func() {
defer GinkgoRecover()
Expect(c1.Order(env, 0)).To(Succeed())
close(doneProp)
}()
// expect second `Order` to block
Consistently(doneProp).ShouldNot(BeClosed())
network.exec(func(c *chain) {
Consistently(c.support.WriteBlockCallCount).Should(BeZero())
})
network.connect(1)
c1.clock.Increment(interval)
Eventually(doneProp, LongEventualTimeout).Should(BeClosed())
network.exec(func(c *chain) {
Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
})
})
It("resets block in flight when steps down from leader", func() {
c1.cutter.SetCutNext(true)
c2.cutter.SetCutNext(true)
// disconnect c1 to disrupt consensus
network.disconnect(1)
Expect(c1.Order(env, 0)).To(Succeed())
doneProp := make(chan struct{})
go func() {
defer GinkgoRecover()
Expect(c1.Order(env, 0)).To(Succeed())
close(doneProp)
}()
// expect second `Order` to block
Consistently(doneProp).ShouldNot(BeClosed())
network.exec(func(c *chain) {
Consistently(c.support.WriteBlockCallCount).Should(BeZero())
})
network.elect(2)
Expect(c3.Order(env, 0)).To(Succeed())
Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(0))
Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
network.connect(1)
c2.clock.Increment(interval)
Eventually(doneProp, LongEventualTimeout).Should(BeClosed())
network.exec(func(c *chain) {
Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
})
})
})
When("gRPC stream to leader is stuck", func() {
BeforeEach(func() {
c2.opts.RPCTimeout = time.Second
network.Lock()
network.delayWG.Add(1)
network.Unlock()
})
It("correctly times out", func() {
err := c2.Order(env, 0)
Expect(err).To(MatchError("timed out (1s) waiting on forwarding to 1"))
network.delayWG.Done()
})
})
When("leader is disconnected", func() {
It("correctly returns a failure to the client when forwarding from a follower", func() {
network.disconnect(1)
err := c2.Order(env, 0)
Expect(err).To(MatchError("connection lost"))
})
It("proactively steps down to follower", func() {
network.disconnect(1)
By("Ticking leader until it steps down")
Eventually(func() <-chan raft.SoftState {
c1.clock.Increment(interval)
return c1.observe
}, LongEventualTimeout).Should(Receive(Equal(raft.SoftState{Lead: 0, RaftState: raft.StateFollower})))
By("Ensuring it does not accept message due to the cluster being leaderless")
err := c1.Order(env, 0)
Expect(err).To(MatchError("no Raft leader"))
network.elect(2)
// c1 should have lost leadership
Expect(c1.fakeFields.fakeIsLeader.SetCallCount()).Should(Equal(3))
Expect(c1.fakeFields.fakeIsLeader.SetArgsForCall(2)).Should(Equal(float64(0)))
// c2 should become the leader
Expect(c2.fakeFields.fakeIsLeader.SetCallCount()).Should(Equal(2))
Expect(c2.fakeFields.fakeIsLeader.SetArgsForCall(1)).Should(Equal(float64(1)))
// c3 should continue to remain a follower
Expect(c3.fakeFields.fakeIsLeader.SetCallCount()).Should(Equal(1))
network.join(1, true)
network.exec(func(c *chain) {
Expect(c.fakeFields.fakeLeaderChanges.AddCallCount()).Should(Equal(3))
Expect(c.fakeFields.fakeLeaderChanges.AddArgsForCall(2)).Should(Equal(float64(1)))
})
err = c1.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
})
It("does not deadlock if propose is blocked", func() {
signal := make(chan struct{})
c1.cutter.SetCutNext(true)
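// Stall Sequence() until `signal` is closed, so the proposal path is blocked
// while the leader is disconnected and ticked into stepping down.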
c1.support.SequenceStub = func() uint64 {
signal <- struct{}{}
<-signal
return 0
}
By("Sending a normal transaction")
Expect(c1.Order(env, 0)).To(Succeed())
Eventually(signal).Should(Receive())
network.disconnect(1)
By("Ticking leader till it steps down")
Eventually(func() raft.SoftState {
c1.clock.Increment(interval)
return c1.Node.Status().SoftState
}).Should(StateEqual(0, raft.StateFollower))
close(signal)
Eventually(c1.observe).Should(Receive(StateEqual(0, raft.StateFollower)))
c1.support.SequenceStub = nil
network.exec(func(c *chain) {
Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(0))
})
By("Re-electing 1 as leader")
network.connect(1)
network.elect(1)
By("Sending another normal transaction")
Expect(c1.Order(env, 0)).To(Succeed())
network.exec(func(c *chain) {
Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
})
})
})
When("follower is disconnected", func() {
It("should return error when receiving an env", func() {
network.disconnect(2)
errorC := c2.Errored()
Consistently(errorC).ShouldNot(BeClosed()) // assert that errorC is not closed
By("Ticking node 2 until it becomes pre-candidate")
Eventually(func() <-chan raft.SoftState {
c2.clock.Increment(interval)
return c2.observe
}, LongEventualTimeout).Should(Receive(Equal(raft.SoftState{Lead: 0, RaftState: raft.StatePreCandidate})))
Eventually(errorC).Should(BeClosed())
err := c2.Order(env, 0)
Expect(err).To(HaveOccurred())
Expect(c2.fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(1))
Expect(c2.fakeFields.fakeNormalProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1)))
Expect(c1.fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(0))
network.connect(2)
c1.clock.Increment(interval)
Expect(errorC).To(BeClosed())
Eventually(c2.Errored).ShouldNot(BeClosed())
})
})
It("leader retransmits lost messages", func() {
// This tests that heartbeats will trigger the leader to retransmit lost MsgApp
c1.cutter.SetCutNext(true)
network.disconnect(1) // drop MsgApp
err := c1.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
network.exec(
func(c *chain) {
Consistently(func() int { return c.support.WriteBlockCallCount() }).Should(Equal(0))
})
network.connect(1) // reconnect leader
c1.clock.Increment(interval) // trigger a heartbeat
network.exec(
func(c *chain) {
Eventually(func() int { return c.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(1))
})
})
It("allows the leader to create multiple normal blocks without having to wait for them to be written out", func() {
// this ensures that the created blocks are not written out
network.disconnect(1)
c1.cutter.SetCutNext(true)
for i := 0; i < 3; i++ {
Expect(c1.Order(env, 0)).To(Succeed())
}
Consistently(c1.support.WriteBlockCallCount).Should(Equal(0))
network.connect(1)
// After FAB-13722, the leader pauses replication if it gets notified that message
// delivery to a certain node has failed, i.e. connection refused. Replication to that
// follower is resumed once the leader receives a MsgHeartbeatResp from it.
// We could certainly repeatedly tick leader to trigger heartbeat broadcast, but we
// would also risk a slow leader stepping down due to excessive ticks.
//
// Instead, we can simply send artificial MsgHeartbeatResp to leader to resume.
m2 := &raftpb.Message{To: c1.id, From: c2.id, Type: raftpb.MsgHeartbeatResp}
c1.Consensus(&orderer.ConsensusRequest{Channel: channelID, Payload: protoutil.MarshalOrPanic(m2)}, c2.id)
m3 := &raftpb.Message{To: c1.id, From: c3.id, Type: raftpb.MsgHeartbeatResp}
c1.Consensus(&orderer.ConsensusRequest{Channel: channelID, Payload: protoutil.MarshalOrPanic(m3)}, c3.id)
network.exec(func(c *chain) {
Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(3))
})
})
It("new leader should wait for in-fight blocks to commit before accepting new env", func() {
// Scenario: when a node is elected as the new leader and there are still in-flight blocks,
// it should not immediately start accepting new envelopes; instead it should wait for
// those in-flight blocks to be committed, otherwise we may create an uncle block which
// forks and panics the chain.
//
// Steps:
// - start raft cluster with three nodes and genesis block0
// - order env1 on c1, which creates block1
// - drop MsgApp from 1 to 3
// - drop second round of MsgApp sent from 1 to 2, so that block1 is only committed on c1
// - disconnect c1 and elect c2
// - order env2 on c2. This env must NOT be immediately accepted, otherwise c2 would create
// an uncle block1 based on block0.
// - c2 commits block1
// - c2 accepts env2, and creates block2
// - c2 commits block2
c1.cutter.SetCutNext(true)
c2.cutter.SetCutNext(true)
step1 := c1.getStepFunc()
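// Filter c1's outbound traffic: drop everything destined to node 3, and drop
// empty MsgApp (the second round that advances the commit index) so that
// block1 is committed only on c1.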
c1.setStepFunc(func(dest uint64, msg *orderer.ConsensusRequest) error {
stepMsg := &raftpb.Message{}
Expect(proto.Unmarshal(msg.Payload, stepMsg)).NotTo(HaveOccurred())
if dest == 3 {
return nil
}
if stepMsg.Type == raftpb.MsgApp && len(stepMsg.Entries) == 0 {
return nil
}
return step1(dest, msg)
})
Expect(c1.Order(env, 0)).NotTo(HaveOccurred())
Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
Consistently(c2.support.WriteBlockCallCount).Should(Equal(0))
Consistently(c3.support.WriteBlockCallCount).Should(Equal(0))
network.disconnect(1)
step2 := c2.getStepFunc()
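// Filter c2's outbound traffic: drop MsgApp carrying data that is destined to
// node 3, so the in-flight block is not yet replicated there.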
c2.setStepFunc(func(dest uint64, msg *orderer.ConsensusRequest) error {
stepMsg := &raftpb.Message{}
Expect(proto.Unmarshal(msg.Payload, stepMsg)).NotTo(HaveOccurred())
if stepMsg.Type == raftpb.MsgApp && len(stepMsg.Entries) != 0 && dest == 3 {
for _, ent := range stepMsg.Entries {
if len(ent.Data) != 0 {
return nil
}
}
}
return step2(dest, msg)
})
network.elect(2)
go func() {
defer GinkgoRecover()
Expect(c2.Order(env, 0)).NotTo(HaveOccurred())
}()
Consistently(c2.support.WriteBlockCallCount).Should(Equal(0))
Consistently(c3.support.WriteBlockCallCount).Should(Equal(0))
c2.setStepFunc(step2)
c2.clock.Increment(interval)
Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
b, _ := c2.support.WriteBlockArgsForCall(0)
Expect(b.Header.Number).To(Equal(uint64(1)))
b, _ = c2.support.WriteBlockArgsForCall(1)
Expect(b.Header.Number).To(Equal(uint64(2)))
})
Context("handling config blocks", func() {
var configEnv *common.Envelope
BeforeEach(func() {
values := map[string]*common.ConfigValue{
"BatchTimeout": {
Version: 1,
Value: marshalOrPanic(&orderer.BatchTimeout{
Timeout: "3ms",
}),
},
}
configEnv = newConfigEnv(channelID,
common.HeaderType_CONFIG,
newConfigUpdateEnv(channelID, nil, values),
)
})
It("holds up block creation on leader once a config block has been created and not written out", func() {
// this ensures that the created blocks are not written out
network.disconnect(1)
c1.cutter.SetCutNext(true)
// config block
err := c1.Order(configEnv, 0)
Expect(err).NotTo(HaveOccurred())
// to avoid data races since we are accessing these within a goroutine
tempEnv := env
tempC1 := c1
done := make(chan struct{})
// normal block
go func() {
defer GinkgoRecover()
// This should be blocked if config block is not committed
err := tempC1.Order(tempEnv, 0)
Expect(err).NotTo(HaveOccurred())
close(done)
}()
Consistently(done).ShouldNot(BeClosed())
network.connect(1)
c1.clock.Increment(interval)
network.exec(
func(c *chain) {
Eventually(func() int { return c.support.WriteConfigBlockCallCount() }, LongEventualTimeout).Should(Equal(1))
})
network.exec(
func(c *chain) {
Eventually(func() int { return c.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(1))
})
})
It("continues creating blocks on leader after a config block has been successfully written out", func() {
c1.cutter.SetCutNext(true)
// config block
err := c1.Configure(configEnv, 0)
Expect(err).NotTo(HaveOccurred())
network.exec(
func(c *chain) {
Eventually(func() int { return c.support.WriteConfigBlockCallCount() }, LongEventualTimeout).Should(Equal(1))
})
// normal block following config block
err = c1.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
network.exec(
func(c *chain) {
Eventually(func() int { return c.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(1))
})
})
})
When("Snapshotting is enabled", func() {
BeforeEach(func() {
c1.opts.SnapshotIntervalSize = 1
c1.opts.SnapshotCatchUpEntries = 1
})
It("take snapshot on accumlated bytes condition met", func() {
// change the SnapshotIntervalSize on the chains
c3.opts.SnapshotIntervalSize = 1
c3.opts.SnapshotCatchUpEntries = 1
c2.opts.SnapshotIntervalSize = 1
c2.opts.SnapshotCatchUpEntries = 1
countSnapShotsForChain := func(cn *chain) int {
files, err := ioutil.ReadDir(cn.opts.SnapDir)
Expect(err).NotTo(HaveOccurred())
return len(files)
}
Expect(countSnapShotsForChain(c1)).Should(Equal(0))
Expect(countSnapShotsForChain(c3)).Should(Equal(0))
By("order envelop on node 1 to accumulate bytes")
c1.cutter.SetCutNext(true)
err := c1.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
// all three nodes should take snapshots
network.exec(
func(c *chain) {
Eventually(func() int { return c.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(1))
})
// order data on all nodes except node 3, empty the raft message directed to node 3
// node 1 should take a snapshot but node 3 should not
snapshots_on_node3 := countSnapShotsForChain(c3)
step1 := c1.getStepFunc()
c1.setStepFunc(func(dest uint64, msg *orderer.ConsensusRequest) error {
stepMsg := &raftpb.Message{}
Expect(proto.Unmarshal(msg.Payload, stepMsg)).NotTo(HaveOccurred())
if dest == 3 && stepMsg.Type == raftpb.MsgApp && len(stepMsg.Entries) > 0 {
stepMsg.Entries = stepMsg.Entries[0:1]
stepMsg.Entries[0].Data = nil
msg.Payload = protoutil.MarshalOrPanic(stepMsg)
}
return step1(dest, msg)
})
err = c1.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
// order data on all nodes except node 3, sending a raftpb.EntryConfChange message to node 3 instead
// node 1 should take a snapshot but node 3 should not
c1.setStepFunc(func(dest uint64, msg *orderer.ConsensusRequest) error {
stepMsg := &raftpb.Message{}
Expect(proto.Unmarshal(msg.Payload, stepMsg)).NotTo(HaveOccurred())
if dest == 3 && stepMsg.Type == raftpb.MsgApp && len(stepMsg.Entries) > 0 {
stepMsg.Entries = stepMsg.Entries[0:1]
// change message type to raftpb.EntryConfChange
stepMsg.Entries[0].Type = raftpb.EntryConfChange
cc := &raftpb.ConfChange{NodeID: uint64(3), Type: raftpb.ConfChangeRemoveNode}
data, err := cc.Marshal()
Expect(err).NotTo(HaveOccurred())
stepMsg.Entries[0].Data = data
msg.Payload = protoutil.MarshalOrPanic(stepMsg)
}
return step1(dest, msg)
})
err = c1.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(3))
Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
countSnapShotsForc1 := func() int { return countSnapShotsForChain(c1) }
Eventually(countSnapShotsForc1, LongEventualTimeout).Should(Equal(3))
// No snapshot should be taken for node 3 after this orderer request
additional_snapshots_for_node3 := countSnapShotsForChain(c3) - snapshots_on_node3
Expect(additional_snapshots_for_node3).Should(Equal(0))
})
It("keeps running if some entries in memory are purged", func() {
// Scenario: snapshotting is enabled on node 1 and it purges memory storage
// with every snapshot. The cluster should keep functioning correctly.
i, err := c1.opts.MemoryStorage.FirstIndex()
Expect(err).NotTo(HaveOccurred())
Expect(i).To(Equal(uint64(1)))
c1.cutter.SetCutNext(true)
err = c1.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
network.exec(
func(c *chain) {
Eventually(func() int { return c.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(1))
})
Eventually(c1.opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", i))
i, err = c1.opts.MemoryStorage.FirstIndex()
Expect(err).NotTo(HaveOccurred())
err = c1.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
network.exec(
func(c *chain) {
Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
})
Eventually(c1.opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", i))
i, err = c1.opts.MemoryStorage.FirstIndex()
Expect(err).NotTo(HaveOccurred())
err = c1.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
network.exec(
func(c *chain) {
Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(3))
})
Eventually(c1.opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", i))
})
It("lagged node can catch up using snapshot", func() {
network.disconnect(2)
c1.cutter.SetCutNext(true)
c2Lasti, _ := c2.opts.MemoryStorage.LastIndex()
var blockCnt int
// Order blocks until the first index of c1's memory storage is greater than the last index of c2,
// so that a snapshot will be sent to c2 when it rejoins the network
Eventually(func() bool {
c1Firsti, _ := c1.opts.MemoryStorage.FirstIndex()
if c1Firsti > c2Lasti+1 {
return true
}
Expect(c1.Order(env, 0)).To(Succeed())
blockCnt++
Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(blockCnt))
Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(blockCnt))
return false
}, LongEventualTimeout).Should(BeTrue())
Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(0))
network.join(2, false)
Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(blockCnt))
indices := etcdraft.ListSnapshots(logger, c2.opts.SnapDir)
Expect(indices).To(HaveLen(1))
gap := indices[0] - c2Lasti
// TODO In theory, "equal" is the accurate behavior we expect. However, the eviction suspector,
// which calls the block puller, still relies on the real clock, and sometimes increments the
// puller call count. Therefore we are being more lenient here until the suspector starts using
// a fake clock, so we have more deterministic control over it.
Expect(c2.puller.PullBlockCallCount()).To(BeNumerically(">=", int(gap)))
// chain should keep functioning
Expect(c2.Order(env, 0)).To(Succeed())
network.exec(
func(c *chain) {
Eventually(func() int { return c.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(blockCnt + 1))
})
})
})
Context("failover", func() {
It("follower should step up as leader upon failover", func() {
network.stop(1)
network.elect(2)
By("order envelope on new leader")
c2.cutter.SetCutNext(true)
err := c2.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
// block should not be produced on chain 1
Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(0))
// block should be produced on chain 2 & 3
Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
By("order envelope on follower")
err = c3.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
// block should not be produced on chain 1
Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(0))
// block should be produced on chain 2 & 3
Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
})
It("follower cannot be elected if its log is not up-to-date", func() {
network.disconnect(2)
c1.cutter.SetCutNext(true)
err := c1.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(0))
Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
network.disconnect(1)
network.connect(2)
// node 2 has not caught up with other nodes
for tick := 0; tick < 2*ELECTION_TICK-1; tick++ {
c2.clock.Increment(interval)
Consistently(c2.observe).ShouldNot(Receive(Equal(2)))
}
// When PreVote is enabled, node 2 fails to collect enough
// PreVotes because its index is not up-to-date. Therefore, it
// does not cause a leader change on the other nodes.
Consistently(c3.observe).ShouldNot(Receive())
network.elect(3) // node 3 has newest logs among 2&3, so it can be elected
})
It("PreVote prevents reconnected node from disturbing network", func() {
network.disconnect(2)
c1.cutter.SetCutNext(true)
err := c1.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(0))
Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
network.connect(2)
for tick := 0; tick < 2*ELECTION_TICK-1; tick++ {
c2.clock.Increment(interval)
Consistently(c2.observe).ShouldNot(Receive(Equal(2)))
}
Consistently(c1.observe).ShouldNot(Receive())
Consistently(c3.observe).ShouldNot(Receive())
})
It("follower can catch up and then campaign with success", func() {
network.disconnect(2)
c1.cutter.SetCutNext(true)
for i := 0; i < 10; i++ {
err := c1.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
}
Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(10))
Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(0))
Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(10))
network.join(2, false)
Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(10))
network.disconnect(1)
network.elect(2)
})
It("purges blockcutter, stops timer and discards created blocks if leadership is lost", func() {
// enqueue one transaction into node 1's blockcutter to test that the blockcutter gets purged
c1.cutter.SetCutNext(false)
err := c1.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
Eventually(c1.cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))
// no block should be written because the env has not been cut into a block yet
c1.clock.WaitForNWatchersAndIncrement(interval, 2)
Consistently(c1.support.WriteBlockCallCount).Should(Equal(0))
network.disconnect(1)
network.elect(2)
network.join(1, true)
Eventually(c1.clock.WatcherCount, LongEventualTimeout).Should(Equal(1)) // blockcutter timer is stopped
Eventually(c1.cutter.CurBatch, LongEventualTimeout).Should(HaveLen(0))
// the created block should be discarded since there is a leadership change
Consistently(c1.support.WriteBlockCallCount).Should(Equal(0))
network.disconnect(2)
network.elect(1)
err = c1.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
// The following group of assertions is redundant - it's here for completeness.
// If the blockcutter has not been reset, fast-forwarding node 1's clock to 'timeout' should result in the blockcutter firing.
// If the blockcutter has been reset, fast-forwarding won't do anything.
//
// Put differently:
//
// correct:
// stop start fire
// |--------------|---------------------------|
// n*intervals timeout
// (advanced in election)
//
// wrong:
// unstop fire
// |---------------------------|
// timeout
//
// timeout-n*interval n*interval
// |-----------|----------------|
// ^ ^
// at this point of time it should fire
// timer should not fire at this point
c1.clock.WaitForNWatchersAndIncrement(timeout-interval, 2)
Eventually(func() int { return c1.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(0))
Eventually(func() int { return c3.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(0))
c1.clock.Increment(interval)
Eventually(func() int { return c1.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(1))
Eventually(func() int { return c3.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(1))
})
It("stale leader should not be able to propose block because of lagged term", func() {
network.disconnect(1)
network.elect(2)
network.connect(1)
c1.cutter.SetCutNext(true)
err := c1.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
network.exec(
func(c *chain) {
Consistently(c.support.WriteBlockCallCount).Should(Equal(0))
})
})
It("aborts waiting for block to be committed upon leadership lost", func() {
network.disconnect(1)
c1.cutter.SetCutNext(true)
err := c1.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
network.exec(
func(c *chain) {
Consistently(c.support.WriteBlockCallCount).Should(Equal(0))
})
network.elect(2)
network.connect(1)
c2.clock.Increment(interval)
// this check guarantees that the signal on resignC is consumed in the commitBatches method.
Eventually(c1.observe, LongEventualTimeout).Should(Receive(Equal(raft.SoftState{Lead: 2, RaftState: raft.StateFollower})))
})
})
})
})
})
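// nodeConfigFromMetadata converts consenter metadata into cluster.RemoteNode
// entries, skipping the first consenter (ourselves).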
func nodeConfigFromMetadata(consenterMetadata *raftprotos.ConfigMetadata) []cluster.RemoteNode {
var nodes []cluster.RemoteNode
for i, consenter := range consenterMetadata.Consenters {
// For now, skip ourselves
if i == 0 {
continue
}
serverDER, _ := pem.Decode(consenter.ServerTlsCert)
clientDER, _ := pem.Decode(consenter.ClientTlsCert)
node := cluster.RemoteNode{
NodeAddress: cluster.NodeAddress{
ID: uint64(i + 1),
Endpoint: "localhost:7050",
},
NodeCerts: cluster.NodeCerts{
ServerTLSCert: serverDER.Bytes,
ClientTLSCert: clientDER.Bytes,
},
}
nodes = append(nodes, node)
}
return nodes
}
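// createMetadata builds Raft ConfigMetadata with nodeCount consenters, each
// carrying freshly generated server and client TLS certs from the given CA.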
func createMetadata(nodeCount int, tlsCA tlsgen.CA) *raftprotos.ConfigMetadata {
md := &raftprotos.ConfigMetadata{Options: &raftprotos.Options{
TickInterval: time.Duration(interval).String(),
ElectionTick: ELECTION_TICK,
HeartbeatTick: HEARTBEAT_TICK,
MaxInflightBlocks: 5,
}}
for i := 0; i < nodeCount; i++ {
md.Consenters = append(md.Consenters, &raftprotos.Consenter{
Host: "localhost",
Port: 7050,
ServerTlsCert: serverTLSCert(tlsCA),
ClientTlsCert: clientTLSCert(tlsCA),
})
}
return md
}
func serverTLSCert(tlsCA tlsgen.CA) []byte {
cert, err := tlsCA.NewServerCertKeyPair("localhost")
if err != nil {
panic(err)
}
return cert.Cert
}
func clientTLSCert(tlsCA tlsgen.CA) []byte {
cert, err := tlsCA.NewClientCertKeyPair()
if err != nil {
panic(err)
}
return cert.Cert
}
// marshalOrPanic serializes a protobuf message and panics if this
// operation fails
func marshalOrPanic(pb proto.Message) []byte {
data, err := proto.Marshal(pb)
if err != nil {
panic(err)
}
return data
}
// helpers to facilitate tests
type stepFunc func(dest uint64, msg *orderer.ConsensusRequest) error
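// chain bundles an etcdraft.Chain under test together with its mocked
// dependencies (consenter support, blockcutter, RPC, block puller, fake clock,
// metrics) and an in-memory ledger backing the WriteBlock/WriteConfigBlock stubs.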
type chain struct {
id uint64
stepLock sync.Mutex
step stepFunc
// msgBuffer serializes ingress messages for a chain
// so they are delivered in the order they arrive
msgBuffer chan *msg
support *consensusmocks.FakeConsenterSupport
cutter *mockblockcutter.Receiver
configurator *mocks.FakeConfigurator
rpc *mocks.FakeRPC
storage *raft.MemoryStorage
clock *fakeclock.FakeClock
opts etcdraft.Options
puller *mocks.FakeBlockPuller
// stores written blocks to be returned by the mock block puller
ledgerLock sync.RWMutex
ledger map[uint64]*common.Block
ledgerHeight uint64
lastConfigBlockNumber uint64
observe chan raft.SoftState
unstarted chan struct{}
stopped chan struct{}
haltCallback func()
fakeFields *fakeMetricsFields
*etcdraft.Chain
cryptoProvider bccsp.BCCSP
}
type msg struct {
req *orderer.ConsensusRequest
sender uint64
}
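// newChain creates a chain with a fake clock, in-memory raft storage, fake RPC
// and metrics, and ledger stubs seeded with the seed block. If support is nil,
// a default FakeConsenterSupport is created for the given channel. A goroutine
// drains msgBuffer and feeds the messages into Consensus.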
func newChain(
timeout time.Duration,
channel, dataDir string,
id uint64,
raftMetadata *raftprotos.BlockMetadata,
consenters map[uint64]*raftprotos.Consenter,
cryptoProvider bccsp.BCCSP,
support *consensusmocks.FakeConsenterSupport,
haltCallback func(),
) *chain {
rpc := &mocks.FakeRPC{}
clock := fakeclock.NewFakeClock(time.Now())
storage := raft.NewMemoryStorage()
fakeFields := newFakeMetricsFields()
opts := etcdraft.Options{
RPCTimeout: timeout,
RaftID: uint64(id),
Clock: clock,
TickInterval: interval,
ElectionTick: ELECTION_TICK,
HeartbeatTick: HEARTBEAT_TICK,
MaxSizePerMsg: 1024 * 1024,
MaxInflightBlocks: 256,
BlockMetadata: raftMetadata,
LeaderCheckInterval: 500 * time.Millisecond,
Consenters: consenters,
Logger: flogging.NewFabricLogger(zap.NewExample()),
MemoryStorage: storage,
WALDir: path.Join(dataDir, "wal"),
SnapDir: path.Join(dataDir, "snapshot"),
Metrics: newFakeMetrics(fakeFields),
}
if support == nil {
support = &consensusmocks.FakeConsenterSupport{}
support.ChannelIDReturns(channel)
support.SharedConfigReturns(mockOrdererWithBatchTimeout(timeout, nil))
}
cutter := mockblockcutter.NewReceiver()
close(cutter.Block)
support.BlockCutterReturns(cutter)
// upon leader change, lead is reset to 0 before being set to the actual
// new leader, i.e. 1 -> 0 -> 2. Therefore 2 values will be
// sent on this chan, so we need its size to be 2
observe := make(chan raft.SoftState, 2)
configurator := &mocks.FakeConfigurator{}
puller := &mocks.FakeBlockPuller{}
ch := make(chan struct{})
close(ch)
c := &chain{
id: id,
support: support,
cutter: cutter,
rpc: rpc,
storage: storage,
observe: observe,
clock: clock,
opts: opts,
unstarted: ch,
stopped: make(chan struct{}),
configurator: configurator,
puller: puller,
ledger: map[uint64]*common.Block{
0: getSeedBlock(), // Very first block
},
ledgerHeight: 1,
fakeFields: fakeFields,
cryptoProvider: cryptoProvider,
msgBuffer: make(chan *msg, 500),
haltCallback: haltCallback,
}
// receives normal blocks and metadata and appends them to
// the in-memory ledger map to simulate write behaviour
appendNormalBlockToLedger := func(b *common.Block, meta []byte) {
c.ledgerLock.Lock()
defer c.ledgerLock.Unlock()
b = proto.Clone(b).(*common.Block)
bytes, err := proto.Marshal(&common.Metadata{Value: meta})
Expect(err).NotTo(HaveOccurred())
b.Metadata.Metadata[common.BlockMetadataIndex_ORDERER] = bytes
lastConfigValue := protoutil.MarshalOrPanic(&common.LastConfig{Index: c.lastConfigBlockNumber})
b.Metadata.Metadata[common.BlockMetadataIndex_LAST_CONFIG] = protoutil.MarshalOrPanic(&common.Metadata{
Value: lastConfigValue,
})
c.ledger[b.Header.Number] = b
if c.ledgerHeight < b.Header.Number+1 {
c.ledgerHeight = b.Header.Number + 1
}
}
// receives config blocks and metadata and appends them to
// the in-memory ledger map to simulate write behaviour
appendConfigBlockToLedger := func(b *common.Block, meta []byte) {
c.ledgerLock.Lock()
defer c.ledgerLock.Unlock()
b = proto.Clone(b).(*common.Block)
bytes, err := proto.Marshal(&common.Metadata{Value: meta})
Expect(err).NotTo(HaveOccurred())
b.Metadata.Metadata[common.BlockMetadataIndex_ORDERER] = bytes
c.lastConfigBlockNumber = b.Header.Number
lastConfigValue := protoutil.MarshalOrPanic(&common.LastConfig{Index: c.lastConfigBlockNumber})
b.Metadata.Metadata[common.BlockMetadataIndex_LAST_CONFIG] = protoutil.MarshalOrPanic(&common.Metadata{
Value: lastConfigValue,
})
c.ledger[b.Header.Number] = b
if c.ledgerHeight < b.Header.Number+1 {
c.ledgerHeight = b.Header.Number + 1
}
}
c.support.WriteBlockStub = appendNormalBlockToLedger
c.support.WriteConfigBlockStub = appendConfigBlockToLedger
// returns current ledger height
c.support.HeightStub = func() uint64 {
c.ledgerLock.RLock()
defer c.ledgerLock.RUnlock()
return c.ledgerHeight
}
// reads block from the ledger
c.support.BlockStub = func(number uint64) *common.Block {
c.ledgerLock.RLock()
defer c.ledgerLock.RUnlock()
return c.ledger[number]
}
// consume ingress messages for chain
go func() {
for msg := range c.msgBuffer {
c.Consensus(msg.req, msg.sender)
}
}()
return c
}
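// init constructs the underlying etcdraft.Chain from the chain's options,
// mocks and block puller factory.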
func (c *chain) init() {
ch, err := etcdraft.NewChain(
c.support,
c.opts,
c.configurator,
c.rpc,
c.cryptoProvider,
func() (etcdraft.BlockPuller, error) { return c.puller, nil },
c.haltCallback,
c.observe,
)
Expect(err).NotTo(HaveOccurred())
c.Chain = ch
}
func (c *chain) start() {
c.unstarted = nil
c.Start()
}
func (c *chain) setStepFunc(f stepFunc) {
c.stepLock.Lock()
c.step = f
c.stepLock.Unlock()
}
func (c *chain) getStepFunc() stepFunc {
c.stepLock.Lock()
defer c.stepLock.Unlock()
return c.step
}
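// network wires test chains together: consensus and submit traffic is routed
// in-memory through per-chain step functions, while the links and connectivity
// maps simulate the comm layer configuration and network partitions.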
type network struct {
delayWG sync.WaitGroup
sync.RWMutex
leader uint64
chains map[uint64]*chain
// links simulates the configuration of the comm layer (a link is bi-directional).
// if links[left][right] == true, right can send a msg to left.
links map[uint64]map[uint64]bool
// connectivity determines if a node is connected to the network. This is used by tests
// to simulate a network partition.
connectivity map[uint64]bool
}
func (n *network) link(from []uint64, to uint64) {
links := make(map[uint64]bool)
for _, id := range from {
links[id] = true
}
n.Lock()
defer n.Unlock()
n.links[to] = links
}
func (n *network) linked(from, to uint64) bool {
n.RLock()
defer n.RUnlock()
return n.links[to][from]
}
func (n *network) connect(id uint64) {
n.Lock()
defer n.Unlock()
n.connectivity[id] = true
}
func (n *network) disconnect(id uint64) {
n.Lock()
defer n.Unlock()
n.connectivity[id] = false
}
func (n *network) connected(id uint64) bool {
n.RLock()
defer n.RUnlock()
return n.connectivity[id]
}
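// addChain registers a chain with the network and installs its step function,
// SendConsensus/SendSubmit stubs, block puller stubs (served from the current
// leader's ledger) and a configurator callback that re-links the node.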
func (n *network) addChain(c *chain) {
n.connect(c.id) // chain is connected by default
c.step = func(dest uint64, req *orderer.ConsensusRequest) error {
if !n.linked(c.id, dest) {
return errors.Errorf("connection refused")
}
if !n.connected(c.id) || !n.connected(dest) {
return errors.Errorf("connection lost")
}
n.RLock()
target := n.chains[dest]
n.RUnlock()
target.msgBuffer <- &msg{req: req, sender: c.id}
return nil
}
c.rpc.SendConsensusStub = func(dest uint64, msg *orderer.ConsensusRequest) error {
c.stepLock.Lock()
defer c.stepLock.Unlock()
return c.step(dest, msg)
}
c.rpc.SendSubmitStub = func(dest uint64, msg *orderer.SubmitRequest, f func(error)) error {
if !n.linked(c.id, dest) {
err := errors.Errorf("connection refused")
f(err)
return err
}
if !n.connected(c.id) || !n.connected(dest) {
err := errors.Errorf("connection lost")
f(err)
return err
}
n.RLock()
target := n.chains[dest]
n.RUnlock()
		go func() {
			defer GinkgoRecover()
			n.Lock()
			n.delayWG.Wait()
			n.Unlock()
target.Submit(msg, c.id)
f(nil)
}()
return nil
}
c.puller.PullBlockStub = func(i uint64) *common.Block {
n.RLock()
leaderChain := n.chains[n.leader]
n.RUnlock()
leaderChain.ledgerLock.RLock()
defer leaderChain.ledgerLock.RUnlock()
block := leaderChain.ledger[i]
return block
}
c.puller.HeightsByEndpointsStub = func() (map[string]uint64, error) {
n.RLock()
leader := n.chains[n.leader]
n.RUnlock()
if leader == nil {
return nil, errors.Errorf("ledger not available")
}
leader.ledgerLock.RLock()
defer leader.ledgerLock.RUnlock()
return map[string]uint64{"leader": leader.ledgerHeight}, nil
}
c.configurator.ConfigureCalls(func(channel string, nodes []cluster.RemoteNode) {
var ids []uint64
for _, node := range nodes {
ids = append(ids, node.ID)
}
n.link(ids, c.id)
})
n.Lock()
defer n.Unlock()
n.chains[c.id] = c
}
func createNetwork(
timeout time.Duration,
channel, dataDir string,
raftMetadata *raftprotos.BlockMetadata,
consenters map[uint64]*raftprotos.Consenter,
cryptoProvider bccsp.BCCSP,
tlsCA tlsgen.CA,
haltCallback func(),
) *network {
n := &network{
chains: make(map[uint64]*chain),
connectivity: make(map[uint64]bool),
links: make(map[uint64]map[uint64]bool),
}
for _, nodeID := range raftMetadata.ConsenterIds {
dir, err := ioutil.TempDir(dataDir, fmt.Sprintf("node-%d-", nodeID))
Expect(err).NotTo(HaveOccurred())
m := proto.Clone(raftMetadata).(*raftprotos.BlockMetadata)
support := &consensusmocks.FakeConsenterSupport{}
support.ChannelIDReturns(channel)
		support.SharedConfigReturns(mockOrdererWithTLSRootCert(timeout, nil, tlsCA))
n.addChain(newChain(timeout, channel, dir, nodeID, m, consenters, cryptoProvider, support, haltCallback))
}
return n
}
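// Hedged sketch of how a spec might assemble the inputs for createNetwork.
// The channel name, node count, host/port values and the nil haltCallback are
// illustrative; real specs build the consenter certificates with dedicated
// helpers, which is approximated here with plain tlsgen key pairs.
func exampleCreateNetwork(dataDir string, cryptoProvider bccsp.BCCSP, tlsCA tlsgen.CA) *network {
	raftMetadata := &raftprotos.BlockMetadata{
		ConsenterIds:    []uint64{1, 2, 3},
		NextConsenterId: 4,
	}
	consenters := map[uint64]*raftprotos.Consenter{}
	for _, id := range raftMetadata.ConsenterIds {
		client, err := tlsCA.NewClientCertKeyPair()
		Expect(err).NotTo(HaveOccurred())
		server, err := tlsCA.NewServerCertKeyPair("localhost")
		Expect(err).NotTo(HaveOccurred())
		consenters[id] = &raftprotos.Consenter{
			Host:          "localhost",
			Port:          7050 + uint32(id),
			ClientTlsCert: client.Cert,
			ServerTlsCert: server.Cert,
		}
	}
	n := createNetwork(time.Second, "example-channel", dataDir, raftMetadata, consenters, cryptoProvider, tlsCA, nil)
	n.init()
	n.start()
	n.elect(1)
	return n
}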
// tests can alter a chain's configuration (e.g. its Raft options) before
// init() creates the underlying chain; see the sketch after start below.
func (n *network) init() {
n.exec(func(c *chain) { c.init() })
}
func (n *network) start(ids ...uint64) {
nodes := ids
if len(nodes) == 0 {
for i := range n.chains {
nodes = append(nodes, i)
}
}
for _, id := range nodes {
n.chains[id].start()
		// When the Raft node bootstraps, it produces a ConfChange
		// to add itself, which needs to be consumed with Ready().
		// If there are pending configuration changes in raft,
		// it refuses to campaign, no matter how many ticks are supplied.
		// This is not a problem in production code because eventually
		// raft.Ready will be consumed as real time goes by.
		//
		// However, this is problematic when using a fake clock and artificial
		// ticks. Instead of ticking raft indefinitely until raft.Ready is
		// consumed, this check is added to indirectly guarantee
		// that the first ConfChange is actually consumed, so we can safely
		// proceed to tick raft.
Eventually(func() error {
_, err := n.chains[id].storage.Entries(1, 1, 1)
return err
}, LongEventualTimeout).ShouldNot(HaveOccurred())
Eventually(n.chains[id].WaitReady, LongEventualTimeout).ShouldNot(HaveOccurred())
}
}
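// Hedged sketch of the "alter configuration before init" pattern mentioned
// above network.init: tune every chain's Raft options, then create and start
// the chains. SnapshotIntervalSize is just one illustrative knob.
func exampleTuneThenStart(n *network) {
	n.exec(func(c *chain) {
		c.opts.SnapshotIntervalSize = 1 // force frequent snapshotting
	})
	n.init()
	n.start()
}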
func (n *network) stop(ids ...uint64) {
nodes := ids
if len(nodes) == 0 {
for i := range n.chains {
nodes = append(nodes, i)
}
}
for _, id := range nodes {
c := n.chains[id]
c.Halt()
Eventually(c.Errored).Should(BeClosed())
select {
case <-c.stopped:
default:
close(c.stopped)
}
}
}
func (n *network) exec(f func(c *chain), ids ...uint64) {
if len(ids) == 0 {
for _, c := range n.chains {
f(c)
}
return
}
for _, i := range ids {
f(n.chains[i])
}
}
// connect a node to the network and tick the leader to trigger
// a heartbeat so the newly joined node can detect the leader.
//
// expectLeaderChange controls whether a leader change should
// be observed on the newly joined node:
// - it should be true if the newly joined node was the leader
// - it should be false if the newly joined node was a follower and
//   already knows the leader.
func (n *network) join(id uint64, expectLeaderChange bool) {
n.connect(id)
n.RLock()
leader, follower := n.chains[n.leader], n.chains[id]
n.RUnlock()
step := leader.getStepFunc()
signal := make(chan struct{})
leader.setStepFunc(func(dest uint64, msg *orderer.ConsensusRequest) error {
if dest == id {
			// close the signal channel when a message targeting the newly
			// joined node is observed on the wire.
select {
case <-signal:
default:
close(signal)
}
}
return step(dest, msg)
})
	// Tick the leader so it sends out a heartbeat to the new node.
	// One tick _may_ not be enough because the leader might be busy
	// and this tick is dropped on the floor.
Eventually(func() <-chan struct{} {
leader.clock.Increment(interval)
return signal
}, LongEventualTimeout, 100*time.Millisecond).Should(BeClosed())
leader.setStepFunc(step)
if expectLeaderChange {
Eventually(follower.observe, LongEventualTimeout).Should(Receive(Equal(raft.SoftState{Lead: n.leader, RaftState: raft.StateFollower})))
}
// wait for newly joined node to catch up with leader
i, err := n.chains[n.leader].opts.MemoryStorage.LastIndex()
Expect(err).NotTo(HaveOccurred())
Eventually(n.chains[id].opts.MemoryStorage.LastIndex, LongEventualTimeout).Should(Equal(i))
}
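// Hedged usage sketch for join: partition a follower away and let it rejoin.
// Because the node already knows the leader, no leader change is expected on
// it (node ID 3 is illustrative).
func exampleRejoinFollower(n *network) {
	n.disconnect(3) // simulate a network partition of node 3
	// ... the remaining nodes keep ordering blocks ...
	n.join(3, false) // reconnect; node 3 was a follower and still knows the leader
}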
// elect deterministically elects a node as leader
func (n *network) elect(id uint64) {
n.RLock()
	// skip observing leader change on followers if the same leader is elected as the previous one,
	// because this may happen too quickly from a slow follower's point of view, and the 0 -> X transition
	// may not be emitted at all.
observeFollowers := id != n.leader
candidate := n.chains[id]
var followers []*chain
for _, c := range n.chains {
if c.id != id {
followers = append(followers, c)
}
}
n.RUnlock()
// Send node an artificial MsgTimeoutNow to emulate leadership transfer.
fmt.Fprintf(GinkgoWriter, "Send artificial MsgTimeoutNow to elect node %d\n", id)
candidate.Consensus(&orderer.ConsensusRequest{Payload: protoutil.MarshalOrPanic(&raftpb.Message{Type: raftpb.MsgTimeoutNow, To: id})}, 0)
Eventually(candidate.observe, LongEventualTimeout).Should(Receive(StateEqual(id, raft.StateLeader)))
n.Lock()
n.leader = id
n.Unlock()
if !observeFollowers {
return
}
// now observe leader change on other nodes
for _, c := range followers {
select {
		case <-c.stopped: // skip check if the node is stopped
case <-c.unstarted: // skip check if node is not started yet
default:
if n.linked(c.id, id) && n.connected(c.id) {
Eventually(c.observe, LongEventualTimeout).Should(Receive(StateEqual(id, raft.StateFollower)))
}
}
}
}
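// Hedged sketch combining elect and join: fail over from node 1 to node 2 and
// let the deposed leader rejoin, in which case it must observe the leadership
// change (node IDs are illustrative; node 1 is assumed to be the current leader).
func exampleFailover(n *network) {
	n.disconnect(1) // the current leader drops off the network
	n.elect(2)      // deterministically transfer leadership to node 2
	n.join(1, true) // node 1 rejoins and observes the new leader
}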
// builds a config envelope of the given header type that wraps the supplied
// config update; specs assign the result to the configEnv var declared above.
func newConfigEnv(chainID string, headerType common.HeaderType, configUpdateEnv *common.ConfigUpdateEnvelope) *common.Envelope {
return &common.Envelope{
Payload: marshalOrPanic(&common.Payload{
Header: &common.Header{
ChannelHeader: marshalOrPanic(&common.ChannelHeader{
Type: int32(headerType),
ChannelId: chainID,
}),
},
Data: marshalOrPanic(&common.ConfigEnvelope{
LastUpdate: &common.Envelope{
Payload: marshalOrPanic(&common.Payload{
Header: &common.Header{
ChannelHeader: marshalOrPanic(&common.ChannelHeader{
Type: int32(common.HeaderType_CONFIG_UPDATE),
ChannelId: chainID,
}),
},
Data: marshalOrPanic(configUpdateEnv),
}), // common.Payload
}, // LastUpdate
}),
}),
}
}
func newConfigUpdateEnv(chainID string, oldValues, newValues map[string]*common.ConfigValue) *common.ConfigUpdateEnvelope {
return &common.ConfigUpdateEnvelope{
ConfigUpdate: marshalOrPanic(&common.ConfigUpdate{
ChannelId: chainID,
ReadSet: &common.ConfigGroup{
Groups: map[string]*common.ConfigGroup{
"Orderer": {
Values: oldValues,
},
},
},
WriteSet: &common.ConfigGroup{
Groups: map[string]*common.ConfigGroup{
"Orderer": {
Values: newValues,
},
},
}, // WriteSet
}),
}
}
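// Hedged sketch of how the two helpers above compose: wrap a BatchTimeout
// change into a CONFIG envelope that can be submitted to a chain via
// Configure. The channel name and timeout value are illustrative.
func exampleBatchTimeoutConfigEnv() *common.Envelope {
	newValues := map[string]*common.ConfigValue{
		"BatchTimeout": {
			Version: 1,
			Value:   marshalOrPanic(&orderer.BatchTimeout{Timeout: "3ms"}),
		},
	}
	return newConfigEnv("example-channel", common.HeaderType_CONFIG, newConfigUpdateEnv("example-channel", nil, newValues))
}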
func getSeedBlock() *common.Block {
return &common.Block{
Header: &common.BlockHeader{},
Data: &common.BlockData{Data: [][]byte{[]byte("foo")}},
Metadata: &common.BlockMetadata{Metadata: make([][]byte, 4)},
}
}
func StateEqual(lead uint64, state raft.StateType) types.GomegaMatcher {
return Equal(raft.SoftState{Lead: lead, RaftState: state})
}
func BeFollower() types.GomegaMatcher {
return &StateMatcher{expect: raft.StateFollower}
}
type StateMatcher struct {
expect raft.StateType
}
func (stmatcher *StateMatcher) Match(actual interface{}) (success bool, err error) {
state, ok := actual.(raft.SoftState)
if !ok {
return false, errors.Errorf("StateMatcher expects a raft SoftState")
}
return state.RaftState == stmatcher.expect, nil
}
func (stmatcher *StateMatcher) FailureMessage(actual interface{}) (message string) {
state, ok := actual.(raft.SoftState)
if !ok {
return "StateMatcher expects a raft SoftState"
}
return fmt.Sprintf("Expected %s to be %s", state.RaftState, stmatcher.expect)
}
func (stmatcher *StateMatcher) NegatedFailureMessage(actual interface{}) (message string) {
state, ok := actual.(raft.SoftState)
if !ok {
return "StateMatcher expects a raft SoftState"
}
return fmt.Sprintf("Expected %s not to be %s", state.RaftState, stmatcher.expect)
}
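// Hedged usage sketch for the matchers above: block until a chain reports the
// expected Raft soft state on its observe channel (c and leaderID are
// illustrative arguments).
func exampleExpectFollower(c *chain, leaderID uint64) {
	// Match the exact leader and role:
	Eventually(c.observe, LongEventualTimeout).Should(Receive(StateEqual(leaderID, raft.StateFollower)))
	// Or, when only the role matters:
	// Eventually(c.observe, LongEventualTimeout).Should(Receive(BeFollower()))
}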
func noOpBlockPuller() (etcdraft.BlockPuller, error) {
bp := &mocks.FakeBlockPuller{}
return bp, nil
}