reverseproxy: weighted_round_robin load balancing policy (#5579)

* added weighted round robin algorithm to load balancer

* added an adapt integration test for wrr and fixed a typo

* changed args format to Caddyfile args convention

* added provisioner and validator for wrr

* simplified the code and improved doc
This commit is contained in:
Saber Haj Rabiee 2023-06-20 10:42:58 -07:00 committed by GitHub
parent 424ae0f420
commit 361946eb0c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 218 additions and 1 deletions

View file

@ -0,0 +1,71 @@
:8884
reverse_proxy 127.0.0.1:65535 127.0.0.1:35535 {
lb_policy weighted_round_robin 10 1
lb_retries 5
lb_try_duration 10s
lb_try_interval 500ms
lb_retry_match {
path /foo*
method POST
}
lb_retry_match path /bar*
}
----------
{
"apps": {
"http": {
"servers": {
"srv0": {
"listen": [
":8884"
],
"routes": [
{
"handle": [
{
"handler": "reverse_proxy",
"load_balancing": {
"retries": 5,
"retry_match": [
{
"method": [
"POST"
],
"path": [
"/foo*"
]
},
{
"path": [
"/bar*"
]
}
],
"selection_policy": {
"policy": "weighted_round_robin",
"weights": [
10,
1
]
},
"try_duration": 10000000000,
"try_interval": 500000000
},
"upstreams": [
{
"dial": "127.0.0.1:65535"
},
{
"dial": "127.0.0.1:35535"
}
]
}
]
}
]
}
}
}
}
}

View file

@ -40,6 +40,7 @@ func init() {
caddy.RegisterModule(RandomChoiceSelection{}) caddy.RegisterModule(RandomChoiceSelection{})
caddy.RegisterModule(LeastConnSelection{}) caddy.RegisterModule(LeastConnSelection{})
caddy.RegisterModule(RoundRobinSelection{}) caddy.RegisterModule(RoundRobinSelection{})
caddy.RegisterModule(WeightedRoundRobinSelection{})
caddy.RegisterModule(FirstSelection{}) caddy.RegisterModule(FirstSelection{})
caddy.RegisterModule(IPHashSelection{}) caddy.RegisterModule(IPHashSelection{})
caddy.RegisterModule(ClientIPHashSelection{}) caddy.RegisterModule(ClientIPHashSelection{})
@ -78,6 +79,90 @@ func (r *RandomSelection) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
return nil return nil
} }
// WeightedRoundRobinSelection is a policy that selects
// a host based on weighted round-robin ordering.
type WeightedRoundRobinSelection struct {
// The weight of each upstream in order,
// corresponding with the list of upstreams configured.
Weights []int `json:"weights,omitempty"`
index uint32
totalWeight int
}
// CaddyModule returns the Caddy module information.
func (WeightedRoundRobinSelection) CaddyModule() caddy.ModuleInfo {
return caddy.ModuleInfo{
ID: "http.reverse_proxy.selection_policies.weighted_round_robin",
New: func() caddy.Module {
return new(WeightedRoundRobinSelection)
},
}
}
// UnmarshalCaddyfile sets up the module from Caddyfile tokens.
func (r *WeightedRoundRobinSelection) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
for d.Next() {
args := d.RemainingArgs()
if len(args) == 0 {
return d.ArgErr()
}
for _, weight := range args {
weightInt, err := strconv.Atoi(weight)
if err != nil {
return d.Errf("invalid weight value '%s': %v", weight, err)
}
if weightInt < 1 {
return d.Errf("invalid weight value '%s': weight should be non-zero and positive", weight)
}
r.Weights = append(r.Weights, weightInt)
}
}
return nil
}
// Provision sets up r.
func (r *WeightedRoundRobinSelection) Provision(ctx caddy.Context) error {
for _, weight := range r.Weights {
r.totalWeight += weight
}
return nil
}
// Select returns an available host, if any.
func (r *WeightedRoundRobinSelection) Select(pool UpstreamPool, _ *http.Request, _ http.ResponseWriter) *Upstream {
if len(pool) == 0 {
return nil
}
if len(r.Weights) < 2 {
return pool[0]
}
var index, totalWeight int
currentWeight := int(atomic.AddUint32(&r.index, 1)) % r.totalWeight
for i, weight := range r.Weights {
totalWeight += weight
if currentWeight < totalWeight {
index = i
break
}
}
upstreams := make([]*Upstream, 0, len(r.Weights))
for _, upstream := range pool {
if !upstream.Available() {
continue
}
upstreams = append(upstreams, upstream)
if len(upstreams) == cap(upstreams) {
break
}
}
if len(upstreams) == 0 {
return nil
}
return upstreams[index%len(upstreams)]
}
// RandomChoiceSelection is a policy that selects // RandomChoiceSelection is a policy that selects
// two or more available hosts at random, then // two or more available hosts at random, then
// chooses the one with the least load. // chooses the one with the least load.
@ -762,6 +847,7 @@ var (
_ Selector = (*RandomChoiceSelection)(nil) _ Selector = (*RandomChoiceSelection)(nil)
_ Selector = (*LeastConnSelection)(nil) _ Selector = (*LeastConnSelection)(nil)
_ Selector = (*RoundRobinSelection)(nil) _ Selector = (*RoundRobinSelection)(nil)
_ Selector = (*WeightedRoundRobinSelection)(nil)
_ Selector = (*FirstSelection)(nil) _ Selector = (*FirstSelection)(nil)
_ Selector = (*IPHashSelection)(nil) _ Selector = (*IPHashSelection)(nil)
_ Selector = (*ClientIPHashSelection)(nil) _ Selector = (*ClientIPHashSelection)(nil)
@ -770,8 +856,11 @@ var (
_ Selector = (*HeaderHashSelection)(nil) _ Selector = (*HeaderHashSelection)(nil)
_ Selector = (*CookieHashSelection)(nil) _ Selector = (*CookieHashSelection)(nil)
_ caddy.Validator = (*RandomChoiceSelection)(nil) _ caddy.Validator = (*RandomChoiceSelection)(nil)
_ caddy.Provisioner = (*RandomChoiceSelection)(nil) _ caddy.Provisioner = (*RandomChoiceSelection)(nil)
_ caddy.Provisioner = (*WeightedRoundRobinSelection)(nil)
_ caddyfile.Unmarshaler = (*RandomChoiceSelection)(nil) _ caddyfile.Unmarshaler = (*RandomChoiceSelection)(nil)
_ caddyfile.Unmarshaler = (*WeightedRoundRobinSelection)(nil)
) )

View file

@ -74,6 +74,63 @@ func TestRoundRobinPolicy(t *testing.T) {
} }
} }
func TestWeightedRoundRobinPolicy(t *testing.T) {
pool := testPool()
wrrPolicy := WeightedRoundRobinSelection{
Weights: []int{3, 2, 1},
totalWeight: 6,
}
req, _ := http.NewRequest("GET", "/", nil)
h := wrrPolicy.Select(pool, req, nil)
if h != pool[0] {
t.Error("Expected first weighted round robin host to be first host in the pool.")
}
h = wrrPolicy.Select(pool, req, nil)
if h != pool[0] {
t.Error("Expected second weighted round robin host to be first host in the pool.")
}
// Third selected host is 1, because counter starts at 0
// and increments before host is selected
h = wrrPolicy.Select(pool, req, nil)
if h != pool[1] {
t.Error("Expected third weighted round robin host to be second host in the pool.")
}
h = wrrPolicy.Select(pool, req, nil)
if h != pool[1] {
t.Error("Expected fourth weighted round robin host to be second host in the pool.")
}
h = wrrPolicy.Select(pool, req, nil)
if h != pool[2] {
t.Error("Expected fifth weighted round robin host to be third host in the pool.")
}
h = wrrPolicy.Select(pool, req, nil)
if h != pool[0] {
t.Error("Expected sixth weighted round robin host to be first host in the pool.")
}
// mark host as down
pool[0].setHealthy(false)
h = wrrPolicy.Select(pool, req, nil)
if h != pool[1] {
t.Error("Expected to skip down host.")
}
// mark host as up
pool[0].setHealthy(true)
h = wrrPolicy.Select(pool, req, nil)
if h != pool[0] {
t.Error("Expected to select first host on availablity.")
}
// mark host as full
pool[1].countRequest(1)
pool[1].MaxRequests = 1
h = wrrPolicy.Select(pool, req, nil)
if h != pool[2] {
t.Error("Expected to skip full host.")
}
}
func TestLeastConnPolicy(t *testing.T) { func TestLeastConnPolicy(t *testing.T) {
pool := testPool() pool := testPool()
lcPolicy := LeastConnSelection{} lcPolicy := LeastConnSelection{}