Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 20 additions & 18 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,20 @@ import (
"github.com/Shopify/toxiproxy/v2/toxics"
)

func stopBrowsersMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.HasPrefix(r.UserAgent(), "Mozilla/") {
http.Error(w, "User agent not allowed", 403)
} else {
next.ServeHTTP(w, r)
}
})
}

func timeoutMiddleware(next http.Handler) http.Handler {
return http.TimeoutHandler(next, 30*time.Second, "")
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Increased timeout from 5 seconds to 30 seconds.

}

type ApiServer struct {
Collection *ProxyCollection
Metrics *metricsContainer
Expand Down Expand Up @@ -46,20 +60,6 @@ func (server *ApiServer) PopulateConfig(filename string) {
}
}

func stopBrowsersMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.HasPrefix(r.UserAgent(), "Mozilla/") {
http.Error(w, "User agent not allowed", 403)
} else {
next.ServeHTTP(w, r)
}
})
}

func timeoutMiddleware(next http.Handler) http.Handler {
return http.TimeoutHandler(next, 5*time.Second, "")
}

func (server *ApiServer) Listen(host string, port string) {
r := mux.NewRouter()
r.Use(hlog.NewHandler(*server.Logger))
Expand Down Expand Up @@ -153,6 +153,7 @@ func (server *ApiServer) ProxyIndex(response http.ResponseWriter, request *http.
}

func (server *ApiServer) ResetState(response http.ResponseWriter, request *http.Request) {
ctx := request.Context()
proxies := server.Collection.Proxies()

for _, proxy := range proxies {
Expand All @@ -161,13 +162,13 @@ func (server *ApiServer) ResetState(response http.ResponseWriter, request *http.
return
}

proxy.Toxics.ResetToxics()
proxy.Toxics.ResetToxics(ctx)
}

response.WriteHeader(http.StatusNoContent)
_, err := response.Write(nil)
if err != nil {
log := zerolog.Ctx(request.Context())
log := zerolog.Ctx(ctx)
log.Warn().Err(err).Msg("ResetState: Failed to write headers to client")
}
}
Expand Down Expand Up @@ -414,21 +415,22 @@ func (server *ApiServer) ToxicUpdate(response http.ResponseWriter, request *http

func (server *ApiServer) ToxicDelete(response http.ResponseWriter, request *http.Request) {
vars := mux.Vars(request)
ctx := request.Context()
log := zerolog.Ctx(ctx)

proxy, err := server.Collection.Get(vars["proxy"])
if server.apiError(response, err) {
return
}

err = proxy.Toxics.RemoveToxic(vars["toxic"])
err = proxy.Toxics.RemoveToxic(ctx, vars["toxic"])
if server.apiError(response, err) {
return
}

response.WriteHeader(http.StatusNoContent)
_, err = response.Write(nil)
if err != nil {
log := zerolog.Ctx(request.Context())
log.Warn().Err(err).Msg("ToxicDelete: Failed to write headers to client")
}
}
Expand Down
49 changes: 32 additions & 17 deletions link.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package toxiproxy

import (
"context"
"fmt"
"io"
"net"

Expand Down Expand Up @@ -183,23 +185,36 @@ func (link *ToxicLink) UpdateToxic(toxic *toxics.ToxicWrapper) {
}

// Remove an existing toxic from the chain.
func (link *ToxicLink) RemoveToxic(toxic *toxics.ToxicWrapper) {
i := toxic.Index
func (link *ToxicLink) RemoveToxic(ctx context.Context, toxic *toxics.ToxicWrapper) {
toxic_index := toxic.Index
log := zerolog.Ctx(ctx).
With().
Str("component", "ToxicLink").
Str("method", "RemoveToxic").
Str("toxic", toxic.Name).
Str("toxic_type", toxic.Type).
Int("toxic_index", toxic.Index).
Str("link_addr", fmt.Sprintf("%p", link)).
Str("toxic_stub_addr", fmt.Sprintf("%p", link.stubs[toxic_index])).
Str("prev_toxic_stub_addr", fmt.Sprintf("%p", link.stubs[toxic_index-1])).
Logger()

if link.stubs[i].InterruptToxic() {
if link.stubs[toxic_index].InterruptToxic() {
cleanup, ok := toxic.Toxic.(toxics.CleanupToxic)
if ok {
cleanup.Cleanup(link.stubs[i])
cleanup.Cleanup(link.stubs[toxic_index])
// Cleanup could have closed the stub.
if link.stubs[i].Closed() {
if link.stubs[toxic_index].Closed() {
log.Trace().Msg("Cleanup closed toxic and removed toxic")
// TODO: Check if cleanup happen would link.stubs recalculated?
return
}
}

log.Trace().Msg("Interrupt the previous toxic to update its output")
stop := make(chan bool)
// Interrupt the previous toxic to update its output
go func() {
stop <- link.stubs[i-1].InterruptToxic()
stop <- link.stubs[toxic_index-1].InterruptToxic()
}()

// Unblock the previous toxic if it is trying to flush
Expand All @@ -210,32 +225,32 @@ func (link *ToxicLink) RemoveToxic(toxic *toxics.ToxicWrapper) {
select {
case interrupted = <-stop:
stopped = true
case tmp := <-link.stubs[i].Input:
case tmp := <-link.stubs[toxic_index].Input:
if tmp == nil {
link.stubs[i].Close()
link.stubs[toxic_index].Close()
if !stopped {
<-stop
}
return
}
link.stubs[i].Output <- tmp
link.stubs[toxic_index].Output <- tmp
}
}

// Empty the toxic's buffer if necessary
for len(link.stubs[i].Input) > 0 {
tmp := <-link.stubs[i].Input
for len(link.stubs[toxic_index].Input) > 0 {
tmp := <-link.stubs[toxic_index].Input
if tmp == nil {
link.stubs[i].Close()
link.stubs[toxic_index].Close()
return
}
link.stubs[i].Output <- tmp
link.stubs[toxic_index].Output <- tmp
}

link.stubs[i-1].Output = link.stubs[i].Output
link.stubs = append(link.stubs[:i], link.stubs[i+1:]...)
link.stubs[toxic_index-1].Output = link.stubs[toxic_index].Output
link.stubs = append(link.stubs[:toxic_index], link.stubs[toxic_index+1:]...)

go link.stubs[i-1].Run(link.toxics.chain[link.direction][i-1])
go link.stubs[toxic_index-1].Run(link.toxics.chain[link.direction][toxic_index-1])
}
}

Expand Down
19 changes: 14 additions & 5 deletions link_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package toxiproxy

import (
"context"
"encoding/binary"
"flag"
"io"
"os"
"testing"
"time"

Expand Down Expand Up @@ -81,6 +84,7 @@ func TestStubInitializaationWithToxics(t *testing.T) {
}

func TestAddRemoveStubs(t *testing.T) {
ctx := context.Background()
collection := NewToxicCollection(nil)
link := NewToxicLink(nil, collection, stream.Downstream, zerolog.Nop())
go link.stubs[0].Run(collection.chain[stream.Downstream][0])
Expand Down Expand Up @@ -117,7 +121,7 @@ func TestAddRemoveStubs(t *testing.T) {
}

// Remove stubs
collection.chainRemoveToxic(toxic)
collection.chainRemoveToxic(ctx, toxic)
if cap(link.stubs[len(link.stubs)-1].Output) != 0 {
t.Fatalf("Link output buffer was not initialized as 0: %d", cap(link.stubs[0].Output))
}
Expand All @@ -134,6 +138,7 @@ func TestAddRemoveStubs(t *testing.T) {
}

func TestNoDataDropped(t *testing.T) {
ctx := context.Background()
collection := NewToxicCollection(nil)
link := NewToxicLink(nil, collection, stream.Downstream, zerolog.Nop())
go link.stubs[0].Run(collection.chain[stream.Downstream][0])
Expand All @@ -159,17 +164,17 @@ func TestNoDataDropped(t *testing.T) {
}
link.input.Close()
}()
go func() {
go func(ctx context.Context) {
for {
select {
case <-done:
return
default:
collection.chainAddToxic(toxic)
collection.chainRemoveToxic(toxic)
collection.chainRemoveToxic(ctx, toxic)
}
}
}()
}(ctx)

buf := make([]byte, 2)
for i := 0; i < 64*1024; i++ {
Expand Down Expand Up @@ -238,7 +243,11 @@ func TestToxicity(t *testing.T) {

func TestStateCreated(t *testing.T) {
collection := NewToxicCollection(nil)
link := NewToxicLink(nil, collection, stream.Downstream, zerolog.Nop())
log := zerolog.Nop()
if flag.Lookup("test.v").DefValue == "true" {
log = zerolog.New(os.Stdout).With().Caller().Timestamp().Logger()
}
link := NewToxicLink(nil, collection, stream.Downstream, log)
go link.stubs[0].Run(collection.chain[stream.Downstream][0])
collection.links["test"] = link

Expand Down
70 changes: 49 additions & 21 deletions toxic_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package toxiproxy

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -43,14 +44,14 @@ func NewToxicCollection(proxy *Proxy) *ToxicCollection {
return collection
}

func (c *ToxicCollection) ResetToxics() {
func (c *ToxicCollection) ResetToxics(ctx context.Context) {
c.Lock()
defer c.Unlock()

// Remove all but the first noop toxic
for dir := range c.chain {
for len(c.chain[dir]) > 1 {
c.chainRemoveToxic(c.chain[dir][1])
c.chainRemoveToxic(ctx, c.chain[dir][1])
}
}
}
Expand Down Expand Up @@ -158,16 +159,28 @@ func (c *ToxicCollection) UpdateToxicJson(
return nil, ErrToxicNotFound
}

func (c *ToxicCollection) RemoveToxic(name string) error {
func (c *ToxicCollection) RemoveToxic(ctx context.Context, name string) error {
log := zerolog.Ctx(ctx).
With().
Str("component", "ToxicCollection").
Str("method", "RemoveToxic").
Str("toxic", name).
Str("proxy", c.proxy.Name).
Logger()
log.Trace().Msg("Acquire locking...")
c.Lock()
defer c.Unlock()

log.Trace().Msg("Getting toxic by name...")
toxic := c.findToxicByName(name)
if toxic != nil {
c.chainRemoveToxic(toxic)
return nil
if toxic == nil {
log.Trace().Msg("Could not find toxic by name")
return ErrToxicNotFound
}
return ErrToxicNotFound

c.chainRemoveToxic(ctx, toxic)
log.Trace().Msg("Finished")
return nil
}

func (c *ToxicCollection) StartLink(
Expand Down Expand Up @@ -217,17 +230,17 @@ func (c *ToxicCollection) chainAddToxic(toxic *toxics.ToxicWrapper) {
c.chain[dir] = append(c.chain[dir], toxic)

// Asynchronously add the toxic to each link
group := sync.WaitGroup{}
wg := sync.WaitGroup{}
for _, link := range c.links {
if link.direction == dir {
group.Add(1)
go func(link *ToxicLink) {
defer group.Done()
wg.Add(1)
go func(link *ToxicLink, wg *sync.WaitGroup) {
defer wg.Done()
link.AddToxic(toxic)
}(link)
}(link, &wg)
}
}
group.Wait()
wg.Wait()
}

func (c *ToxicCollection) chainUpdateToxic(toxic *toxics.ToxicWrapper) {
Expand All @@ -247,25 +260,40 @@ func (c *ToxicCollection) chainUpdateToxic(toxic *toxics.ToxicWrapper) {
group.Wait()
}

func (c *ToxicCollection) chainRemoveToxic(toxic *toxics.ToxicWrapper) {
func (c *ToxicCollection) chainRemoveToxic(ctx context.Context, toxic *toxics.ToxicWrapper) {
log := zerolog.Ctx(ctx).
With().
Str("component", "ToxicCollection").
Str("method", "chainRemoveToxic").
Str("toxic", toxic.Name).
Str("direction", toxic.Direction.String()).
Logger()

dir := toxic.Direction
c.chain[dir] = append(c.chain[dir][:toxic.Index], c.chain[dir][toxic.Index+1:]...)
for i := toxic.Index; i < len(c.chain[dir]); i++ {
c.chain[dir][i].Index = i
}

// Asynchronously remove the toxic from each link
group := sync.WaitGroup{}
wg := sync.WaitGroup{}

event_array := zerolog.Arr()
for _, link := range c.links {
if link.direction == dir {
group.Add(1)
go func(link *ToxicLink) {
defer group.Done()
link.RemoveToxic(toxic)
}(link)
event_array = event_array.Str(fmt.Sprintf("Link[%p] %s", link, link.Direction()))
wg.Add(1)
go func(ctx context.Context, link *ToxicLink, log zerolog.Logger) {
defer wg.Done()
link.RemoveToxic(ctx, toxic)
}(ctx, link, log)
}
}
group.Wait()

log.Trace().
Array("links", event_array).
Msg("Waiting to update links")
wg.Wait()

toxic.Index = -1
}
Loading