290 lines
6.0 KiB
Go
290 lines
6.0 KiB
Go
/*
|
|
Copyright IBM Corp. All Rights Reserved.
|
|
|
|
SPDX-License-Identifier: Apache-2.0
|
|
*/
|
|
|
|
package runner
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"net/http"
|
|
"os"
|
|
"runtime/debug"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
docker "github.com/fsouza/go-dockerclient"
|
|
"github.com/pkg/errors"
|
|
"github.com/tedsuo/ifrit"
|
|
)
|
|
|
|
const (
|
|
CouchDBDefaultImage = "couchdb:3.3.2"
|
|
CouchDBUsername = "admin"
|
|
CouchDBPassword = "adminpw"
|
|
)
|
|
|
|
// CouchDB manages the execution of an instance of a dockerized CounchDB
|
|
// for tests.
|
|
type CouchDB struct {
|
|
Client *docker.Client
|
|
Image string
|
|
HostIP string
|
|
HostPort int
|
|
ContainerPort docker.Port
|
|
Name string
|
|
StartTimeout time.Duration
|
|
Binds []string
|
|
|
|
ErrorStream io.Writer
|
|
OutputStream io.Writer
|
|
|
|
creator string
|
|
containerID string
|
|
hostAddress string
|
|
containerAddress string
|
|
address string
|
|
|
|
mutex sync.Mutex
|
|
stopped bool
|
|
}
|
|
|
|
// Run runs a CouchDB container. It implements the ifrit.Runner interface
|
|
func (c *CouchDB) Run(sigCh <-chan os.Signal, ready chan<- struct{}) error {
|
|
if c.Image == "" {
|
|
c.Image = CouchDBDefaultImage
|
|
}
|
|
|
|
if c.Name == "" {
|
|
c.Name = DefaultNamer()
|
|
}
|
|
|
|
if c.HostIP == "" {
|
|
c.HostIP = "127.0.0.1"
|
|
}
|
|
|
|
if c.ContainerPort == docker.Port("") {
|
|
c.ContainerPort = docker.Port("5984/tcp")
|
|
}
|
|
|
|
if c.StartTimeout == 0 {
|
|
c.StartTimeout = DefaultStartTimeout
|
|
}
|
|
|
|
if c.Client == nil {
|
|
client, err := docker.NewClientFromEnv()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
c.Client = client
|
|
}
|
|
|
|
hostConfig := &docker.HostConfig{
|
|
AutoRemove: true,
|
|
PortBindings: map[docker.Port][]docker.PortBinding{
|
|
c.ContainerPort: {{
|
|
HostIP: c.HostIP,
|
|
HostPort: strconv.Itoa(c.HostPort),
|
|
}},
|
|
},
|
|
Binds: c.Binds,
|
|
}
|
|
|
|
container, err := c.Client.CreateContainer(
|
|
docker.CreateContainerOptions{
|
|
Name: c.Name,
|
|
Config: &docker.Config{
|
|
Image: c.Image,
|
|
Env: []string{
|
|
fmt.Sprintf("_creator=%s", c.creator),
|
|
fmt.Sprintf("COUCHDB_USER=%s", CouchDBUsername),
|
|
fmt.Sprintf("COUCHDB_PASSWORD=%s", CouchDBPassword),
|
|
},
|
|
},
|
|
HostConfig: hostConfig,
|
|
},
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
c.containerID = container.ID
|
|
|
|
err = c.Client.StartContainer(container.ID, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer c.Stop()
|
|
|
|
container, err = c.Client.InspectContainer(container.ID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
c.hostAddress = net.JoinHostPort(
|
|
container.NetworkSettings.Ports[c.ContainerPort][0].HostIP,
|
|
container.NetworkSettings.Ports[c.ContainerPort][0].HostPort,
|
|
)
|
|
c.containerAddress = net.JoinHostPort(
|
|
container.NetworkSettings.IPAddress,
|
|
c.ContainerPort.Port(),
|
|
)
|
|
|
|
streamCtx, streamCancel := context.WithCancel(context.Background())
|
|
defer streamCancel()
|
|
go c.streamLogs(streamCtx)
|
|
|
|
containerExit := c.wait()
|
|
ctx, cancel := context.WithTimeout(context.Background(), c.StartTimeout)
|
|
defer cancel()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return errors.Wrapf(ctx.Err(), "database in container %s did not start", c.containerID)
|
|
case <-containerExit:
|
|
return errors.New("container exited before ready")
|
|
case <-c.ready(ctx, c.hostAddress):
|
|
c.address = c.hostAddress
|
|
case <-c.ready(ctx, c.containerAddress):
|
|
c.address = c.containerAddress
|
|
}
|
|
|
|
cancel()
|
|
close(ready)
|
|
|
|
for {
|
|
select {
|
|
case err := <-containerExit:
|
|
return err
|
|
case <-sigCh:
|
|
if err := c.Stop(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func endpointReady(ctx context.Context, url string) bool {
|
|
ctx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
|
|
defer cancel()
|
|
|
|
req, err := http.NewRequest(http.MethodGet, url, nil)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
|
|
resp, err := http.DefaultClient.Do(req.WithContext(ctx))
|
|
return err == nil && resp.StatusCode == http.StatusOK
|
|
}
|
|
|
|
func (c *CouchDB) ready(ctx context.Context, addr string) <-chan struct{} {
|
|
readyCh := make(chan struct{})
|
|
go func() {
|
|
url := fmt.Sprintf("http://%s:%s@%s/", CouchDBUsername, CouchDBPassword, addr)
|
|
ticker := time.NewTicker(100 * time.Millisecond)
|
|
defer ticker.Stop()
|
|
for {
|
|
if endpointReady(ctx, url) {
|
|
close(readyCh)
|
|
return
|
|
}
|
|
select {
|
|
case <-ticker.C:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
return readyCh
|
|
}
|
|
|
|
func (c *CouchDB) wait() <-chan error {
|
|
exitCh := make(chan error)
|
|
go func() {
|
|
exitCode, err := c.Client.WaitContainer(c.containerID)
|
|
if err == nil {
|
|
err = fmt.Errorf("couchdb: process exited with %d", exitCode)
|
|
}
|
|
exitCh <- err
|
|
}()
|
|
|
|
return exitCh
|
|
}
|
|
|
|
func (c *CouchDB) streamLogs(ctx context.Context) {
|
|
if c.ErrorStream == nil && c.OutputStream == nil {
|
|
return
|
|
}
|
|
|
|
logOptions := docker.LogsOptions{
|
|
Context: ctx,
|
|
Container: c.containerID,
|
|
Follow: true,
|
|
ErrorStream: c.ErrorStream,
|
|
OutputStream: c.OutputStream,
|
|
Stderr: c.ErrorStream != nil,
|
|
Stdout: c.OutputStream != nil,
|
|
}
|
|
|
|
err := c.Client.Logs(logOptions)
|
|
if err != nil {
|
|
fmt.Fprintf(c.ErrorStream, "log stream ended with error: %s", err)
|
|
}
|
|
}
|
|
|
|
// Address returns the address successfully used by the readiness check.
|
|
func (c *CouchDB) Address() string {
|
|
return c.address
|
|
}
|
|
|
|
// HostAddress returns the host address where this CouchDB instance is available.
|
|
func (c *CouchDB) HostAddress() string {
|
|
return c.hostAddress
|
|
}
|
|
|
|
// ContainerAddress returns the container address where this CouchDB instance
|
|
// is available.
|
|
func (c *CouchDB) ContainerAddress() string {
|
|
return c.containerAddress
|
|
}
|
|
|
|
// ContainerID returns the container ID of this CouchDB
|
|
func (c *CouchDB) ContainerID() string {
|
|
return c.containerID
|
|
}
|
|
|
|
// Start starts the CouchDB container using an ifrit runner
|
|
func (c *CouchDB) Start() error {
|
|
c.creator = string(debug.Stack())
|
|
p := ifrit.Invoke(c)
|
|
|
|
select {
|
|
case <-p.Ready():
|
|
return nil
|
|
case err := <-p.Wait():
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Stop stops and removes the CouchDB container
|
|
func (c *CouchDB) Stop() error {
|
|
c.mutex.Lock()
|
|
if c.stopped {
|
|
c.mutex.Unlock()
|
|
return errors.Errorf("container %s already stopped", c.containerID)
|
|
}
|
|
c.stopped = true
|
|
c.mutex.Unlock()
|
|
|
|
err := c.Client.StopContainer(c.containerID, 0)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|