628 lines
18 KiB
Go
628 lines
18 KiB
Go
/*
|
|
Copyright IBM Corp. 2018 All Rights Reserved.
|
|
|
|
SPDX-License-Identifier: Apache-2.0
|
|
*/
|
|
|
|
package cluster
|
|
|
|
import (
|
|
"context"
|
|
"math"
|
|
"math/rand"
|
|
"reflect"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/hyperledger/fabric-protos-go/common"
|
|
"github.com/hyperledger/fabric-protos-go/orderer"
|
|
"github.com/hyperledger/fabric/common/flogging"
|
|
"github.com/hyperledger/fabric/common/util"
|
|
"github.com/hyperledger/fabric/internal/pkg/identity"
|
|
"github.com/hyperledger/fabric/protoutil"
|
|
"github.com/pkg/errors"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
// BlockPuller pulls blocks from remote ordering nodes.
|
|
// Its operations are not thread safe.
|
|
type BlockPuller struct {
|
|
// Configuration
|
|
MaxPullBlockRetries uint64
|
|
MaxTotalBufferBytes int
|
|
Signer identity.SignerSerializer
|
|
TLSCert []byte
|
|
Channel string
|
|
FetchTimeout time.Duration
|
|
RetryTimeout time.Duration
|
|
Logger *flogging.FabricLogger
|
|
Dialer Dialer
|
|
VerifyBlockSequence BlockSequenceVerifier
|
|
Endpoints []EndpointCriteria
|
|
|
|
// A 'stopper' goroutine may signal the go-routine servicing PullBlock & HeightsByEndpoints to stop by closing this
|
|
// channel. Note: all methods of the BlockPuller must be serviced by a single goroutine, it is not thread safe.
|
|
// It is the responsibility of the 'stopper' not to close the channel more then once.
|
|
StopChannel chan struct{}
|
|
|
|
// Internal state
|
|
stream *ImpatientStream
|
|
blockBuff []*common.Block
|
|
latestSeq uint64
|
|
endpoint string
|
|
conn *grpc.ClientConn
|
|
cancelStream func()
|
|
}
|
|
|
|
// Clone returns a copy of this BlockPuller initialized
|
|
// for the given channel
|
|
func (p *BlockPuller) Clone() *BlockPuller {
|
|
// Clone by value
|
|
copy := *p
|
|
// Reset internal state
|
|
copy.stream = nil
|
|
copy.blockBuff = nil
|
|
copy.latestSeq = 0
|
|
copy.endpoint = ""
|
|
copy.conn = nil
|
|
copy.cancelStream = nil
|
|
return ©
|
|
}
|
|
|
|
// Close makes the BlockPuller close the connection and stream
|
|
// with the remote endpoint, and wipe the internal block buffer.
|
|
func (p *BlockPuller) Close() {
|
|
p.disconnect()
|
|
p.blockBuff = nil
|
|
}
|
|
|
|
func (p *BlockPuller) disconnect() {
|
|
if p.cancelStream != nil {
|
|
p.cancelStream()
|
|
}
|
|
p.cancelStream = nil
|
|
|
|
if p.conn != nil {
|
|
p.conn.Close()
|
|
}
|
|
p.conn = nil
|
|
p.endpoint = ""
|
|
p.latestSeq = 0
|
|
}
|
|
|
|
// PullBlock blocks until a block with the given sequence is fetched
|
|
// from some remote ordering node, or until consecutive failures
|
|
// of fetching the block exceed MaxPullBlockRetries.
|
|
func (p *BlockPuller) PullBlock(seq uint64) *common.Block {
|
|
retriesLeft := p.MaxPullBlockRetries
|
|
for {
|
|
block := p.tryFetchBlock(seq)
|
|
if block != nil {
|
|
return block
|
|
}
|
|
retriesLeft--
|
|
if retriesLeft == 0 && p.MaxPullBlockRetries > 0 {
|
|
p.Logger.Errorf("Failed pulling block [%d]: retry count exhausted(%d)", seq, p.MaxPullBlockRetries)
|
|
return nil
|
|
}
|
|
|
|
if waitOnStop(p.RetryTimeout, p.StopChannel) {
|
|
p.Logger.Info("Received a stop signal")
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// HeightsByEndpoints returns the block heights by endpoints of orderers
|
|
func (p *BlockPuller) HeightsByEndpoints() (map[string]uint64, error) {
|
|
endpointsInfo := p.probeEndpoints(0)
|
|
res := make(map[string]uint64)
|
|
for endpoint, endpointInfo := range endpointsInfo.byEndpoints() {
|
|
endpointInfo.conn.Close()
|
|
res[endpoint] = endpointInfo.lastBlockSeq + 1
|
|
}
|
|
p.Logger.Info("Returning the heights of OSNs mapped by endpoints", res)
|
|
return res, endpointsInfo.err
|
|
}
|
|
|
|
// UpdateEndpoints assigns the new endpoints and disconnects from the current one.
|
|
func (p *BlockPuller) UpdateEndpoints(endpoints []EndpointCriteria) {
|
|
p.Logger.Debugf("Updating endpoints: %v", endpoints)
|
|
p.Endpoints = endpoints
|
|
// TODO FAB-18121 Disconnect only if the currently connected endpoint was dropped or has changes in its TLSRootCAs
|
|
p.disconnect()
|
|
}
|
|
|
|
// waitOnStop waits duration, but returns immediately with true if the stop channel fires first.
|
|
func waitOnStop(duration time.Duration, stop <-chan struct{}) bool {
|
|
select {
|
|
case <-stop:
|
|
return true
|
|
case <-time.After(duration):
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (p *BlockPuller) tryFetchBlock(seq uint64) *common.Block {
|
|
block := p.popBlock(seq)
|
|
if block != nil {
|
|
return block
|
|
}
|
|
|
|
var reConnected bool
|
|
|
|
for retriesLeft := p.MaxPullBlockRetries; p.isDisconnected(); retriesLeft-- {
|
|
reConnected = true
|
|
p.connectToSomeEndpoint(seq)
|
|
if p.isDisconnected() {
|
|
p.Logger.Debugf("Failed to connect to some endpoint, going to try again in %v", p.RetryTimeout)
|
|
|
|
if waitOnStop(p.RetryTimeout, p.StopChannel) {
|
|
p.Logger.Info("Received a stop signal")
|
|
return nil
|
|
}
|
|
}
|
|
if retriesLeft == 0 && p.MaxPullBlockRetries > 0 {
|
|
p.Logger.Errorf("Failed to connect to some endpoint, attempts exhausted(%d), seq: %d, endpoints: %v",
|
|
p.MaxPullBlockRetries, seq, p.Endpoints)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Else, buffer is empty. So we need to pull blocks
|
|
// to re-fill it.
|
|
if err := p.pullBlocks(seq, reConnected); err != nil {
|
|
p.Logger.Errorf("Failed pulling blocks: %v", err)
|
|
// Something went wrong, disconnect. and return nil
|
|
p.Close()
|
|
// If we have a block in the buffer, return it.
|
|
if len(p.blockBuff) > 0 {
|
|
return p.blockBuff[0]
|
|
}
|
|
return nil
|
|
}
|
|
|
|
if err := p.VerifyBlockSequence(p.blockBuff, p.Channel); err != nil {
|
|
p.Close()
|
|
p.Logger.Errorf("Failed verifying received blocks: %v", err)
|
|
return nil
|
|
}
|
|
|
|
// At this point, the buffer is full, so shift it and return the first block.
|
|
return p.popBlock(seq)
|
|
}
|
|
|
|
func (p *BlockPuller) setCancelStreamFunc(f func()) {
|
|
p.cancelStream = f
|
|
}
|
|
|
|
func (p *BlockPuller) pullBlocks(seq uint64, reConnected bool) error {
|
|
env, err := p.seekNextEnvelope(seq)
|
|
if err != nil {
|
|
p.Logger.Errorf("Failed creating seek envelope: %v", err)
|
|
return err
|
|
}
|
|
|
|
stream, err := p.obtainStream(reConnected, env, seq)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var totalSize int
|
|
p.blockBuff = nil
|
|
nextExpectedSequence := seq
|
|
for totalSize < p.MaxTotalBufferBytes && nextExpectedSequence <= p.latestSeq {
|
|
resp, err := stream.Recv()
|
|
if err != nil {
|
|
p.Logger.Errorf("Failed receiving next block from %s: %v", p.endpoint, err)
|
|
return err
|
|
}
|
|
|
|
block, err := extractBlockFromResponse(resp)
|
|
if err != nil {
|
|
p.Logger.Errorf("Received a bad block from %s: %v", p.endpoint, err)
|
|
return err
|
|
}
|
|
seq := block.Header.Number
|
|
if seq != nextExpectedSequence {
|
|
p.Logger.Errorf("Expected to receive sequence %d but got %d instead", nextExpectedSequence, seq)
|
|
return errors.Errorf("got unexpected sequence from %s - (%d) instead of (%d)", p.endpoint, seq, nextExpectedSequence)
|
|
}
|
|
size := blockSize(block)
|
|
totalSize += size
|
|
p.blockBuff = append(p.blockBuff, block)
|
|
nextExpectedSequence++
|
|
p.Logger.Infof("Got block [%d] of size %d KB from %s", seq, size/1024, p.endpoint)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (p *BlockPuller) obtainStream(reConnected bool, env *common.Envelope, seq uint64) (*ImpatientStream, error) {
|
|
var stream *ImpatientStream
|
|
var err error
|
|
if reConnected {
|
|
p.Logger.Infof("Sending request for block [%d] to %s", seq, p.endpoint)
|
|
stream, err = p.requestBlocks(p.endpoint, NewImpatientStream(p.conn, p.FetchTimeout), env)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// Stream established successfully.
|
|
// In next iterations of this function, reuse it.
|
|
p.stream = stream
|
|
} else {
|
|
// Reuse previous stream
|
|
stream = p.stream
|
|
}
|
|
|
|
p.setCancelStreamFunc(stream.cancelFunc)
|
|
return stream, nil
|
|
}
|
|
|
|
// popBlock pops a block from the in-memory buffer and returns it,
|
|
// or returns nil if the buffer is empty or the block doesn't match
|
|
// the given wanted sequence.
|
|
func (p *BlockPuller) popBlock(seq uint64) *common.Block {
|
|
if len(p.blockBuff) == 0 {
|
|
return nil
|
|
}
|
|
block, rest := p.blockBuff[0], p.blockBuff[1:]
|
|
p.blockBuff = rest
|
|
// If the requested block sequence is the wrong one, discard the buffer
|
|
// to start fetching blocks all over again.
|
|
if seq != block.Header.Number {
|
|
p.blockBuff = nil
|
|
return nil
|
|
}
|
|
return block
|
|
}
|
|
|
|
func (p *BlockPuller) isDisconnected() bool {
|
|
return p.conn == nil
|
|
}
|
|
|
|
// connectToSomeEndpoint makes the BlockPuller connect to some endpoint that has
|
|
// the given minimum block sequence.
|
|
func (p *BlockPuller) connectToSomeEndpoint(minRequestedSequence uint64) {
|
|
// Probe all endpoints in parallel, searching an endpoint with a given minimum block sequence
|
|
// and then sort them by their endpoints to a map.
|
|
endpointsInfo := p.probeEndpoints(minRequestedSequence).byEndpoints()
|
|
if len(endpointsInfo) == 0 {
|
|
p.Logger.Warningf("Could not connect to any endpoint of %v", p.Endpoints)
|
|
return
|
|
}
|
|
|
|
// Choose a random endpoint out of the available endpoints
|
|
chosenEndpoint := randomEndpoint(endpointsInfo)
|
|
// Disconnect all connections but this endpoint
|
|
for endpoint, endpointInfo := range endpointsInfo {
|
|
if endpoint == chosenEndpoint {
|
|
continue
|
|
}
|
|
endpointInfo.conn.Close()
|
|
}
|
|
|
|
p.conn = endpointsInfo[chosenEndpoint].conn
|
|
p.endpoint = chosenEndpoint
|
|
p.latestSeq = endpointsInfo[chosenEndpoint].lastBlockSeq
|
|
|
|
p.Logger.Infof("Connected to %s with last block seq of %d", p.endpoint, p.latestSeq)
|
|
}
|
|
|
|
// probeEndpoints reaches to all endpoints known and returns the latest block sequences
|
|
// of the endpoints, as well as gRPC connections to them.
|
|
func (p *BlockPuller) probeEndpoints(minRequestedSequence uint64) *endpointInfoBucket {
|
|
endpointsInfo := make(chan *endpointInfo, len(p.Endpoints))
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(len(p.Endpoints))
|
|
|
|
var forbiddenErr uint32
|
|
var unavailableErr uint32
|
|
|
|
for _, endpoint := range p.Endpoints {
|
|
go func(endpoint EndpointCriteria) {
|
|
defer wg.Done()
|
|
ei, err := p.probeEndpoint(endpoint, minRequestedSequence)
|
|
if err != nil {
|
|
p.Logger.Warningf("Received error of type '%v' from %s", err, endpoint.Endpoint)
|
|
p.Logger.Debugf("%s's TLSRootCAs are %s", endpoint.Endpoint, endpoint.TLSRootCAs)
|
|
if err == ErrForbidden {
|
|
atomic.StoreUint32(&forbiddenErr, 1)
|
|
}
|
|
if err == ErrServiceUnavailable {
|
|
atomic.StoreUint32(&unavailableErr, 1)
|
|
}
|
|
return
|
|
}
|
|
endpointsInfo <- ei
|
|
}(endpoint)
|
|
}
|
|
wg.Wait()
|
|
|
|
close(endpointsInfo)
|
|
eib := &endpointInfoBucket{
|
|
bucket: endpointsInfo,
|
|
logger: p.Logger,
|
|
}
|
|
|
|
if unavailableErr == 1 && len(endpointsInfo) == 0 {
|
|
eib.err = ErrServiceUnavailable
|
|
}
|
|
if forbiddenErr == 1 && len(endpointsInfo) == 0 {
|
|
eib.err = ErrForbidden
|
|
}
|
|
return eib
|
|
}
|
|
|
|
// probeEndpoint returns a gRPC connection and the latest block sequence of an endpoint with the given
|
|
// requires minimum sequence, or error if something goes wrong.
|
|
func (p *BlockPuller) probeEndpoint(endpoint EndpointCriteria, minRequestedSequence uint64) (*endpointInfo, error) {
|
|
conn, err := p.Dialer.Dial(endpoint)
|
|
if err != nil {
|
|
p.Logger.Warningf("Failed connecting to %s: %v", endpoint, err)
|
|
return nil, err
|
|
}
|
|
|
|
lastBlockSeq, err := p.fetchLastBlockSeq(minRequestedSequence, endpoint.Endpoint, conn)
|
|
if err != nil {
|
|
conn.Close()
|
|
return nil, err
|
|
}
|
|
|
|
return &endpointInfo{conn: conn, lastBlockSeq: lastBlockSeq, endpoint: endpoint.Endpoint}, nil
|
|
}
|
|
|
|
// randomEndpoint returns a random endpoint of the given endpointInfo
|
|
func randomEndpoint(endpointsToHeight map[string]*endpointInfo) string {
|
|
var candidates []string
|
|
for endpoint := range endpointsToHeight {
|
|
candidates = append(candidates, endpoint)
|
|
}
|
|
|
|
rand.Seed(time.Now().UnixNano())
|
|
return candidates[rand.Intn(len(candidates))]
|
|
}
|
|
|
|
// fetchLastBlockSeq returns the last block sequence of an endpoint with the given gRPC connection.
|
|
func (p *BlockPuller) fetchLastBlockSeq(minRequestedSequence uint64, endpoint string, conn *grpc.ClientConn) (uint64, error) {
|
|
env, err := p.seekLastEnvelope()
|
|
if err != nil {
|
|
p.Logger.Errorf("Failed creating seek envelope for %s: %v", endpoint, err)
|
|
return 0, err
|
|
}
|
|
|
|
stream, err := p.requestBlocks(endpoint, NewImpatientStream(conn, p.FetchTimeout), env)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
defer stream.abort()
|
|
|
|
resp, err := stream.Recv()
|
|
if err != nil {
|
|
p.Logger.Errorf("Failed receiving the latest block from %s: %v", endpoint, err)
|
|
return 0, err
|
|
}
|
|
|
|
block, err := extractBlockFromResponse(resp)
|
|
if err != nil {
|
|
p.Logger.Warningf("Received %v from %s: %v", resp, endpoint, err)
|
|
return 0, err
|
|
}
|
|
stream.CloseSend()
|
|
|
|
seq := block.Header.Number
|
|
if seq < minRequestedSequence {
|
|
err := errors.Errorf("minimum requested sequence is %d but %s is at sequence %d", minRequestedSequence, endpoint, seq)
|
|
p.Logger.Infof("Skipping pulling from %s: %v", endpoint, err)
|
|
return 0, err
|
|
}
|
|
|
|
p.Logger.Infof("%s is at block sequence of %d", endpoint, seq)
|
|
return block.Header.Number, nil
|
|
}
|
|
|
|
// requestBlocks starts requesting blocks from the given endpoint, using the given ImpatientStreamCreator by sending
|
|
// the given envelope.
|
|
// It returns a stream that is used to pull blocks, or error if something goes wrong.
|
|
func (p *BlockPuller) requestBlocks(endpoint string, newStream ImpatientStreamCreator, env *common.Envelope) (*ImpatientStream, error) {
|
|
stream, err := newStream()
|
|
if err != nil {
|
|
p.Logger.Warningf("Failed establishing deliver stream with %s", endpoint)
|
|
return nil, err
|
|
}
|
|
|
|
if err := stream.Send(env); err != nil {
|
|
p.Logger.Errorf("Failed sending seek envelope to %s: %v", endpoint, err)
|
|
stream.abort()
|
|
return nil, err
|
|
}
|
|
return stream, nil
|
|
}
|
|
|
|
func extractBlockFromResponse(resp *orderer.DeliverResponse) (*common.Block, error) {
|
|
switch t := resp.Type.(type) {
|
|
case *orderer.DeliverResponse_Block:
|
|
block := t.Block
|
|
if block == nil {
|
|
return nil, errors.New("block is nil")
|
|
}
|
|
if block.Data == nil {
|
|
return nil, errors.New("block data is nil")
|
|
}
|
|
if block.Header == nil {
|
|
return nil, errors.New("block header is nil")
|
|
}
|
|
if block.Metadata == nil || len(block.Metadata.Metadata) == 0 {
|
|
return nil, errors.New("block metadata is empty")
|
|
}
|
|
return block, nil
|
|
case *orderer.DeliverResponse_Status:
|
|
if t.Status == common.Status_FORBIDDEN {
|
|
return nil, ErrForbidden
|
|
}
|
|
if t.Status == common.Status_SERVICE_UNAVAILABLE {
|
|
return nil, ErrServiceUnavailable
|
|
}
|
|
return nil, errors.Errorf("faulty node, received: %v", resp)
|
|
default:
|
|
return nil, errors.Errorf("response is of type %v, but expected a block", reflect.TypeOf(resp.Type))
|
|
}
|
|
}
|
|
|
|
func (p *BlockPuller) seekLastEnvelope() (*common.Envelope, error) {
|
|
return protoutil.CreateSignedEnvelopeWithTLSBinding(
|
|
common.HeaderType_DELIVER_SEEK_INFO,
|
|
p.Channel,
|
|
p.Signer,
|
|
last(),
|
|
int32(0),
|
|
uint64(0),
|
|
util.ComputeSHA256(p.TLSCert),
|
|
)
|
|
}
|
|
|
|
func (p *BlockPuller) seekNextEnvelope(startSeq uint64) (*common.Envelope, error) {
|
|
return protoutil.CreateSignedEnvelopeWithTLSBinding(
|
|
common.HeaderType_DELIVER_SEEK_INFO,
|
|
p.Channel,
|
|
p.Signer,
|
|
nextSeekInfo(startSeq),
|
|
int32(0),
|
|
uint64(0),
|
|
util.ComputeSHA256(p.TLSCert),
|
|
)
|
|
}
|
|
|
|
func last() *orderer.SeekInfo {
|
|
return &orderer.SeekInfo{
|
|
Start: &orderer.SeekPosition{Type: &orderer.SeekPosition_Newest{Newest: &orderer.SeekNewest{}}},
|
|
Stop: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: math.MaxUint64}}},
|
|
Behavior: orderer.SeekInfo_BLOCK_UNTIL_READY,
|
|
ErrorResponse: orderer.SeekInfo_BEST_EFFORT,
|
|
}
|
|
}
|
|
|
|
func nextSeekInfo(startSeq uint64) *orderer.SeekInfo {
|
|
return &orderer.SeekInfo{
|
|
Start: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: startSeq}}},
|
|
Stop: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: math.MaxUint64}}},
|
|
Behavior: orderer.SeekInfo_BLOCK_UNTIL_READY,
|
|
ErrorResponse: orderer.SeekInfo_BEST_EFFORT,
|
|
}
|
|
}
|
|
|
|
func blockSize(block *common.Block) int {
|
|
return len(protoutil.MarshalOrPanic(block))
|
|
}
|
|
|
|
type endpointInfo struct {
|
|
endpoint string
|
|
conn *grpc.ClientConn
|
|
lastBlockSeq uint64
|
|
}
|
|
|
|
type endpointInfoBucket struct {
|
|
bucket <-chan *endpointInfo
|
|
logger *flogging.FabricLogger
|
|
err error
|
|
}
|
|
|
|
func (eib endpointInfoBucket) byEndpoints() map[string]*endpointInfo {
|
|
infoByEndpoints := make(map[string]*endpointInfo)
|
|
for endpointInfo := range eib.bucket {
|
|
if _, exists := infoByEndpoints[endpointInfo.endpoint]; exists {
|
|
eib.logger.Warningf("Duplicate endpoint found(%s), skipping it", endpointInfo.endpoint)
|
|
endpointInfo.conn.Close()
|
|
continue
|
|
}
|
|
infoByEndpoints[endpointInfo.endpoint] = endpointInfo
|
|
}
|
|
return infoByEndpoints
|
|
}
|
|
|
|
// ImpatientStreamCreator creates an ImpatientStream
|
|
type ImpatientStreamCreator func() (*ImpatientStream, error)
|
|
|
|
// ImpatientStream aborts the stream if it waits for too long for a message.
|
|
type ImpatientStream struct {
|
|
waitTimeout time.Duration
|
|
orderer.AtomicBroadcast_DeliverClient
|
|
cancelFunc func()
|
|
}
|
|
|
|
func (stream *ImpatientStream) abort() {
|
|
stream.cancelFunc()
|
|
}
|
|
|
|
// Recv blocks until a response is received from the stream or the
|
|
// timeout expires.
|
|
func (stream *ImpatientStream) Recv() (*orderer.DeliverResponse, error) {
|
|
// Initialize a timeout to cancel the stream when it expires
|
|
timeout := time.NewTimer(stream.waitTimeout)
|
|
defer timeout.Stop()
|
|
|
|
responseChan := make(chan errorAndResponse, 1)
|
|
|
|
// receive waitGroup ensures the goroutine below exits before
|
|
// this function exits.
|
|
var receive sync.WaitGroup
|
|
receive.Add(1)
|
|
defer receive.Wait()
|
|
|
|
go func() {
|
|
defer receive.Done()
|
|
resp, err := stream.AtomicBroadcast_DeliverClient.Recv()
|
|
responseChan <- errorAndResponse{err: err, resp: resp}
|
|
}()
|
|
|
|
select {
|
|
case <-timeout.C:
|
|
stream.cancelFunc()
|
|
return nil, errors.Errorf("didn't receive a response within %v", stream.waitTimeout)
|
|
case respAndErr := <-responseChan:
|
|
return respAndErr.resp, respAndErr.err
|
|
}
|
|
}
|
|
|
|
// NewImpatientStream returns a ImpatientStreamCreator that creates impatientStreams.
|
|
func NewImpatientStream(conn *grpc.ClientConn, waitTimeout time.Duration) ImpatientStreamCreator {
|
|
return func() (*ImpatientStream, error) {
|
|
abc := orderer.NewAtomicBroadcastClient(conn)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
stream, err := abc.Deliver(ctx)
|
|
if err != nil {
|
|
cancel()
|
|
return nil, err
|
|
}
|
|
|
|
once := &sync.Once{}
|
|
return &ImpatientStream{
|
|
waitTimeout: waitTimeout,
|
|
// The stream might be canceled while Close() is being called, but also
|
|
// while a timeout expires, so ensure it's only called once.
|
|
cancelFunc: func() {
|
|
once.Do(cancel)
|
|
},
|
|
AtomicBroadcast_DeliverClient: stream,
|
|
}, nil
|
|
}
|
|
}
|
|
|
|
type errorAndResponse struct {
|
|
err error
|
|
resp *orderer.DeliverResponse
|
|
}
|
|
|
|
// ErrForbidden denotes that an ordering node refuses sending blocks due to access control.
|
|
var ErrForbidden = errors.New("forbidden pulling the channel")
|
|
|
|
// ErrServiceUnavailable denotes that an ordering node is not servicing at the moment.
|
|
var ErrServiceUnavailable = errors.New("service unavailable")
|
|
|
|
// ErrNotInChannel denotes that an ordering node is not in the channel
|
|
var ErrNotInChannel = errors.New("not in the channel")
|
|
|
|
var ErrRetryCountExhausted = errors.New("retry attempts exhausted")
|