80 lines
2.8 KiB
Go
80 lines
2.8 KiB
Go
/*
|
|
Copyright IBM Corp. All Rights Reserved.
|
|
|
|
SPDX-License-Identifier: Apache-2.0
|
|
*/
|
|
|
|
package node
|
|
|
|
import (
|
|
"context"
|
|
"strings"
|
|
|
|
"github.com/hyperledger/fabric/common/semaphore"
|
|
"github.com/hyperledger/fabric/core/peer"
|
|
"github.com/pkg/errors"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
func initGrpcSemaphores(config *peer.Config) map[string]semaphore.Semaphore {
|
|
semaphores := make(map[string]semaphore.Semaphore)
|
|
endorserConcurrency := config.LimitsConcurrencyEndorserService
|
|
deliverConcurrency := config.LimitsConcurrencyDeliverService
|
|
gatewayConcurrency := config.LimitsConcurrencyGatewayService
|
|
|
|
// Currently concurrency limit is applied to endorser service, deliver service and gateway service.
|
|
// These services are defined in fabric-protos and fabric-protos-go (generated from fabric-protos).
|
|
// Below service names must match their definitions.
|
|
if endorserConcurrency != 0 {
|
|
logger.Infof("concurrency limit for endorser service is %d", endorserConcurrency)
|
|
semaphores["/protos.Endorser"] = semaphore.New(endorserConcurrency)
|
|
}
|
|
if deliverConcurrency != 0 {
|
|
logger.Infof("concurrency limit for deliver service is %d", deliverConcurrency)
|
|
semaphores["/protos.Deliver"] = semaphore.New(deliverConcurrency)
|
|
}
|
|
if gatewayConcurrency != 0 {
|
|
logger.Infof("concurrency limit for gateway service is %d", gatewayConcurrency)
|
|
semaphores["/gateway.Gateway"] = semaphore.New(gatewayConcurrency)
|
|
}
|
|
|
|
return semaphores
|
|
}
|
|
|
|
func unaryGrpcLimiter(semaphores map[string]semaphore.Semaphore) grpc.UnaryServerInterceptor {
|
|
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
|
|
serviceName := getServiceName(info.FullMethod)
|
|
sema, ok := semaphores[serviceName]
|
|
if !ok {
|
|
return handler(ctx, req)
|
|
}
|
|
if !sema.TryAcquire() {
|
|
logger.Errorf("Too many requests for %s, exceeding concurrency limit (%d)", serviceName, cap(sema))
|
|
return nil, errors.Errorf("too many requests for %s, exceeding concurrency limit (%d)", serviceName, cap(sema))
|
|
}
|
|
defer sema.Release()
|
|
return handler(ctx, req)
|
|
}
|
|
}
|
|
|
|
func streamGrpcLimiter(semaphores map[string]semaphore.Semaphore) grpc.StreamServerInterceptor {
|
|
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
|
|
serviceName := getServiceName(info.FullMethod)
|
|
sema, ok := semaphores[serviceName]
|
|
if !ok {
|
|
return handler(srv, ss)
|
|
}
|
|
if !sema.TryAcquire() {
|
|
logger.Errorf("Too many requests for %s, exceeding concurrency limit (%d)", serviceName, cap(sema))
|
|
return errors.Errorf("too many requests for %s, exceeding concurrency limit (%d)", serviceName, cap(sema))
|
|
}
|
|
defer sema.Release()
|
|
return handler(srv, ss)
|
|
}
|
|
}
|
|
|
|
// getServiceName returns the service portion of a gRPC full method name, that
// is, everything before the final "/". For example,
// "/protos.Endorser/ProcessProposal" yields "/protos.Endorser".
//
// If methodName contains no "/" there is no service portion and the empty
// string is returned.
func getServiceName(methodName string) string {
	index := strings.LastIndex(methodName, "/")
	if index < 0 {
		// LastIndex reports -1 when the separator is absent; slicing with
		// it would panic, so treat the name as having no service.
		return ""
	}
	return methodName[:index]
}
|