go_study/fabric-main/integration/nwo/runner/couchdb.go

290 lines
6.0 KiB
Go

/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package runner
import (
"context"
"fmt"
"io"
"net"
"net/http"
"os"
"runtime/debug"
"strconv"
"sync"
"time"
docker "github.com/fsouza/go-dockerclient"
"github.com/pkg/errors"
"github.com/tedsuo/ifrit"
)
const (
// CouchDBDefaultImage is the container image Run uses when CouchDB.Image
// is left empty.
CouchDBDefaultImage = "couchdb:3.3.2"
// CouchDBUsername is the user name passed to the container as COUCHDB_USER
// and embedded in the URL used by the readiness probe.
CouchDBUsername = "admin"
// CouchDBPassword is the password passed to the container as
// COUCHDB_PASSWORD and embedded in the URL used by the readiness probe.
CouchDBPassword = "adminpw"
)
// CouchDB manages the execution of an instance of a dockerized CouchDB
// for tests.
//
// The exported fields are configuration. Run fills in defaults for Image,
// Name, HostIP, ContainerPort, StartTimeout and Client when they are left at
// their zero value. The unexported fields are populated by Start and Run.
type CouchDB struct {
// Client is the Docker client used for every container operation. Run
// creates one from the environment when it is nil.
Client *docker.Client
// Image is the container image to run; defaults to CouchDBDefaultImage.
Image string
// HostIP is the host interface the container port is bound to; defaults
// to "127.0.0.1".
HostIP string
// HostPort is the host port requested in the port binding. It is passed
// to Docker unchanged; presumably 0 lets Docker pick a free port — TODO
// confirm.
HostPort int
// ContainerPort is the port inside the container that is published;
// defaults to "5984/tcp".
ContainerPort docker.Port
// Name is the container name; defaults to the result of DefaultNamer().
Name string
// StartTimeout bounds how long Run waits for the database to answer the
// readiness probe; defaults to DefaultStartTimeout.
StartTimeout time.Duration
// Binds is passed through to the Docker host configuration's Binds.
Binds []string
// ErrorStream, when non-nil, receives the container's stderr log stream.
ErrorStream io.Writer
// OutputStream, when non-nil, receives the container's stdout log stream.
OutputStream io.Writer
// creator holds the stack trace captured by Start; Run passes it to the
// container as the _creator environment variable.
creator string
// containerID is the ID of the container created by Run.
containerID string
// hostAddress is the host IP and port of the first published binding, as
// reported by inspecting the running container.
hostAddress string
// containerAddress is the container's own IP address joined with the
// container port.
containerAddress string
// address is whichever of hostAddress or containerAddress answered the
// readiness probe in Run.
address string
// mutex guards stopped.
mutex sync.Mutex
// stopped records that Stop has already been called.
stopped bool
}
// Run runs a CouchDB container. It implements the ifrit.Runner interface.
//
// Run applies defaults to any unset configuration, creates and starts the
// container, and then polls both the published host address and the
// container's own address until one of them answers over HTTP. The first
// address to answer is recorded and ready is closed. After that Run blocks
// until the container exits; a signal on sigCh triggers Stop, after which Run
// keeps waiting for the exit to be reported.
//
// Run returns an error if the container cannot be created, started or
// inspected, if it publishes no binding for ContainerPort, if it exits before
// becoming ready, or if StartTimeout elapses first. Once ready, the returned
// error is whatever the container-exit watcher reports.
func (c *CouchDB) Run(sigCh <-chan os.Signal, ready chan<- struct{}) error {
	if c.Image == "" {
		c.Image = CouchDBDefaultImage
	}
	if c.Name == "" {
		c.Name = DefaultNamer()
	}
	if c.HostIP == "" {
		c.HostIP = "127.0.0.1"
	}
	if c.ContainerPort == docker.Port("") {
		c.ContainerPort = docker.Port("5984/tcp")
	}
	if c.StartTimeout == 0 {
		c.StartTimeout = DefaultStartTimeout
	}
	if c.Client == nil {
		client, err := docker.NewClientFromEnv()
		if err != nil {
			return err
		}
		c.Client = client
	}

	hostConfig := &docker.HostConfig{
		AutoRemove: true,
		PortBindings: map[docker.Port][]docker.PortBinding{
			c.ContainerPort: {{
				HostIP:   c.HostIP,
				HostPort: strconv.Itoa(c.HostPort),
			}},
		},
		Binds: c.Binds,
	}

	container, err := c.Client.CreateContainer(
		docker.CreateContainerOptions{
			Name: c.Name,
			Config: &docker.Config{
				Image: c.Image,
				Env: []string{
					fmt.Sprintf("_creator=%s", c.creator),
					fmt.Sprintf("COUCHDB_USER=%s", CouchDBUsername),
					fmt.Sprintf("COUCHDB_PASSWORD=%s", CouchDBPassword),
				},
			},
			HostConfig: hostConfig,
		},
	)
	if err != nil {
		return err
	}
	c.containerID = container.ID

	err = c.Client.StartContainer(container.ID, nil)
	if err != nil {
		// AutoRemove only takes effect when a started container exits, so a
		// container that never started would otherwise be left behind and
		// keep its name reserved. Removal is best-effort: the start failure
		// is the error the caller needs to see.
		_ = c.Client.RemoveContainer(docker.RemoveContainerOptions{
			ID:    container.ID,
			Force: true,
		})
		return err
	}
	// From here on every return path must stop the container. The error is
	// deliberately ignored: Stop reports "already stopped" when the signal
	// path below has already called it.
	defer c.Stop()

	container, err = c.Client.InspectContainer(container.ID)
	if err != nil {
		return err
	}

	// Guard the lookup instead of indexing blindly: a missing binding would
	// otherwise panic with an index-out-of-range.
	var bindings []docker.PortBinding
	if container.NetworkSettings != nil {
		bindings = container.NetworkSettings.Ports[c.ContainerPort]
	}
	if len(bindings) == 0 {
		return errors.Errorf("container %s has no host binding for port %s", c.containerID, c.ContainerPort)
	}
	c.hostAddress = net.JoinHostPort(bindings[0].HostIP, bindings[0].HostPort)
	c.containerAddress = net.JoinHostPort(
		container.NetworkSettings.IPAddress,
		c.ContainerPort.Port(),
	)

	streamCtx, streamCancel := context.WithCancel(context.Background())
	defer streamCancel()
	go c.streamLogs(streamCtx)

	containerExit := c.wait()
	ctx, cancel := context.WithTimeout(context.Background(), c.StartTimeout)
	defer cancel()

	// Probe both addresses concurrently; whichever answers first wins.
	select {
	case <-ctx.Done():
		return errors.Wrapf(ctx.Err(), "database in container %s did not start", c.containerID)
	case <-containerExit:
		return errors.New("container exited before ready")
	case <-c.ready(ctx, c.hostAddress):
		c.address = c.hostAddress
	case <-c.ready(ctx, c.containerAddress):
		c.address = c.containerAddress
	}

	// Stop the probe that did not win before signalling readiness.
	cancel()
	close(ready)

	for {
		select {
		case err := <-containerExit:
			return err
		case <-sigCh:
			if err := c.Stop(); err != nil {
				return err
			}
		}
	}
}
// endpointReady reports whether a GET request to url completes with HTTP 200
// within 500ms (or before ctx is done, whichever comes first).
//
// Any failure to build or perform the request is reported as "not ready".
func endpointReady(ctx context.Context, url string) bool {
	ctx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return false
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return false
	}
	// This function is called in a tight polling loop; an unclosed body would
	// leak one connection per poll that gets a response.
	defer resp.Body.Close()

	return resp.StatusCode == http.StatusOK
}
// ready starts a background poller for the CouchDB root endpoint at addr and
// returns a channel that is closed once the endpoint reports ready. The
// endpoint is probed immediately and then again on every 100ms tick. If ctx
// is done first the poller exits and the channel is never closed.
func (c *CouchDB) ready(ctx context.Context, addr string) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		probeURL := "http://" + CouchDBUsername + ":" + CouchDBPassword + "@" + addr + "/"
		poll := time.NewTicker(100 * time.Millisecond)
		defer poll.Stop()

		for !endpointReady(ctx, probeURL) {
			select {
			case <-ctx.Done():
				return
			case <-poll.C:
				// Next attempt.
			}
		}
		close(done)
	}()
	return done
}
// wait starts a goroutine that blocks until the container exits and returns
// a channel on which the outcome is delivered exactly once. The delivered
// error is never nil: when the Docker client reports no error, one describing
// the process exit code is synthesized instead.
func (c *CouchDB) wait() <-chan error {
	// Buffered so the goroutine can always deliver its result and finish,
	// even when the caller has already returned (for example on a start
	// timeout) and nobody is receiving any more.
	exitCh := make(chan error, 1)
	go func() {
		exitCode, err := c.Client.WaitContainer(c.containerID)
		if err == nil {
			err = fmt.Errorf("couchdb: process exited with %d", exitCode)
		}
		exitCh <- err
	}()
	return exitCh
}
// streamLogs follows the container's logs, copying stderr to ErrorStream and
// stdout to OutputStream until ctx is cancelled or the stream ends. It
// returns immediately when neither stream is configured.
//
// If the log call ends with an error, a message is written to ErrorStream,
// or to OutputStream when ErrorStream is not set.
func (c *CouchDB) streamLogs(ctx context.Context) {
	if c.ErrorStream == nil && c.OutputStream == nil {
		return
	}

	logOptions := docker.LogsOptions{
		Context:      ctx,
		Container:    c.containerID,
		Follow:       true,
		ErrorStream:  c.ErrorStream,
		OutputStream: c.OutputStream,
		Stderr:       c.ErrorStream != nil,
		Stdout:       c.OutputStream != nil,
	}

	if err := c.Client.Logs(logOptions); err != nil {
		// ErrorStream may be nil when only OutputStream was configured;
		// writing to a nil io.Writer would panic, so fall back to the stream
		// that is known to be set.
		sink := c.ErrorStream
		if sink == nil {
			sink = c.OutputStream
		}
		fmt.Fprintf(sink, "log stream ended with error: %s", err)
	}
}
// Address returns the address that answered the readiness check in Run:
// either the host address or the container address, whichever responded
// first. It is empty until Run has recorded a ready address.
func (c *CouchDB) Address() string {
return c.address
}
// HostAddress returns the host address where this CouchDB instance is
// available: the host IP and port of the published binding, as recorded by
// Run after inspecting the started container.
func (c *CouchDB) HostAddress() string {
return c.hostAddress
}
// ContainerAddress returns the container address where this CouchDB instance
// is available: the container's own IP address joined with the container
// port, as recorded by Run after inspecting the started container.
func (c *CouchDB) ContainerAddress() string {
return c.containerAddress
}
// ContainerID returns the container ID of this CouchDB. It is empty until
// Run has created the container.
func (c *CouchDB) ContainerID() string {
return c.containerID
}
// Start launches the CouchDB container through an ifrit process and blocks
// until it either reports ready (nil is returned) or exits (its error is
// returned). The caller's stack trace is captured first so that Run can tag
// the container with it.
func (c *CouchDB) Start() error {
	c.creator = string(debug.Stack())

	process := ifrit.Invoke(c)
	select {
	case err := <-process.Wait():
		return err
	case <-process.Ready():
		return nil
	}
}
// Stop stops the CouchDB container with a zero-second grace period. Only the
// first call reaches the Docker client; every later call returns an
// "already stopped" error without contacting Docker.
func (c *CouchDB) Stop() error {
	// Flip the flag under the lock, but keep the Docker call outside it.
	c.mutex.Lock()
	alreadyStopped := c.stopped
	c.stopped = true
	c.mutex.Unlock()

	if alreadyStopped {
		return errors.Errorf("container %s already stopped", c.containerID)
	}
	return c.Client.StopContainer(c.containerID, 0)
}