Commit 781640ec authored by Brad Davidson's avatar Brad Davidson Committed by Brad Davidson

Fix syncing empty list of apiserver addresses during initial startup

Also add more debug logging to the sync process. Signed-off-by: 's avatarBrad Davidson <brad.davidson@rancher.com>
parent 3ce7ca75
...@@ -38,7 +38,11 @@ import ( ...@@ -38,7 +38,11 @@ import (
) )
var ( var (
endpointDebounceDelay = time.Second // endpointDebounceDelay sets how long we wait before updating apiserver
// addresses when the kubernetes endpoint list changes. When the apiserver is
// starting up it adds then removes then re-adds itself a few times in quick
// succession, and we want to avoid closing connections unnecessarily.
endpointDebounceDelay = 3 * time.Second
defaultDialer = net.Dialer{} defaultDialer = net.Dialer{}
) )
...@@ -335,14 +339,9 @@ func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan ...@@ -335,14 +339,9 @@ func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan
<-done <-done
}() }()
var cancelUpdate context.CancelFunc
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
if cancelUpdate != nil {
cancelUpdate()
}
return return
case ev, ok := <-watch.ResultChan(): case ev, ok := <-watch.ResultChan():
endpoint, ok := ev.Object.(*v1.Endpoints) endpoint, ok := ev.Object.(*v1.Endpoints)
...@@ -351,20 +350,15 @@ func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan ...@@ -351,20 +350,15 @@ func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan
continue continue
} }
if cancelUpdate != nil {
cancelUpdate()
}
var debounceCtx context.Context
debounceCtx, cancelUpdate = context.WithCancel(ctx)
// When joining the cluster, the apiserver adds, removes, and then re-adds itself to // When joining the cluster, the apiserver adds, removes, and then re-adds itself to
// the endpoint list several times. This causes a bit of thrashing if we react to // the endpoint list several times. This causes a bit of thrashing if we react to
// endpoint changes immediately. Instead, perform the endpoint update in a // endpoint changes immediately. Instead, perform the endpoint update in a
// goroutine that sleeps for a short period before checking for changes and updating // goroutine that sleeps for a short period before checking for changes and updating
// the proxy addresses. If another update occurs, the previous update operation // the proxy addresses. If another update occurs, the previous update operation
// will be cancelled and a new one queued. // will be cancelled and a new one queued.
go syncProxyAddresses(debounceCtx, util.GetAddresses(endpoint)) addresses := util.GetAddresses(endpoint)
logrus.Debugf("Syncing apiserver addresses from tunnel watch: %v", addresses)
syncProxyAddresses(addresses)
} }
} }
} }
...@@ -478,15 +472,15 @@ func (a *agentTunnel) dialContext(ctx context.Context, network, address string) ...@@ -478,15 +472,15 @@ func (a *agentTunnel) dialContext(ctx context.Context, network, address string)
} }
// proxySyncer is a common signature for functions that sync the proxy address list with a context // proxySyncer is a common signature for functions that sync the proxy address list with a context
type proxySyncer func(ctx context.Context, addresses []string) type proxySyncer func(addresses []string)
// getProxySyncer returns a function that can be called to update the list of supervisors. // getProxySyncer returns a function that can be called to update the list of supervisors.
// This function is responsible for connecting to or disconnecting websocket tunnels, // This function is responsible for connecting to or disconnecting websocket tunnels,
// as well as updating the proxy loadbalancer server list. // as well as updating the proxy loadbalancer server list.
func (a *agentTunnel) getProxySyncer(ctx context.Context, wg *sync.WaitGroup, tlsConfig *tls.Config, proxy proxy.Proxy) proxySyncer { func (a *agentTunnel) getProxySyncer(ctx context.Context, wg *sync.WaitGroup, tlsConfig *tls.Config, proxy proxy.Proxy) proxySyncer {
disconnect := map[string]context.CancelFunc{} disconnect := map[string]context.CancelFunc{}
// Attempt to connect to supervisors, storing their cancellation function for later when we // Attempt to connect to inital list of addresses, storing their cancellation
// need to disconnect. // function for later when we need to disconnect.
for _, address := range proxy.SupervisorAddresses() { for _, address := range proxy.SupervisorAddresses() {
if _, ok := disconnect[address]; !ok { if _, ok := disconnect[address]; !ok {
conn := a.connect(ctx, wg, address, tlsConfig) conn := a.connect(ctx, wg, address, tlsConfig)
...@@ -495,43 +489,72 @@ func (a *agentTunnel) getProxySyncer(ctx context.Context, wg *sync.WaitGroup, tl ...@@ -495,43 +489,72 @@ func (a *agentTunnel) getProxySyncer(ctx context.Context, wg *sync.WaitGroup, tl
} }
} }
// return a function that can be called to update the address list. var cancelUpdate context.CancelFunc
// servers will be connected to or disconnected from as necessary,
// and the proxy addresses updated. // return a function that can be called to update the address list. servers will be
return func(debounceCtx context.Context, addresses []string) { // connected to or disconnected from as necessary, and the proxy addresses updated.
select { // The update is done in a goroutine that waits a short period in order to reduce
case <-time.After(endpointDebounceDelay): // thrashing during apiserver startup. Each time the function is called, the context for
case <-debounceCtx.Done(): // the goroutine started by the previous call is cancelled to prevent it from updating
// if the delay has not yet expired.
return func(addresses []string) {
if len(addresses) == 0 {
logrus.Debugf("Skipping apiserver addresses sync: %v", addresses)
return return
} }
// Compare list of supervisor addresses before and after syncing apiserver if cancelUpdate != nil {
// endpoints into the proxy to figure out which supervisors we need to connect to cancelUpdate()
// or disconnect from. Note that the addresses we were passed will not match
// the supervisor addresses if the supervisor and apiserver are on different ports -
// they must be round-tripped through proxy.Update before comparing.
curAddresses := sets.New(proxy.SupervisorAddresses()...)
proxy.Update(addresses)
newAddresses := sets.New(proxy.SupervisorAddresses()...)
// add new servers
for address := range newAddresses.Difference(curAddresses) {
if _, ok := disconnect[address]; !ok {
conn := a.connect(ctx, nil, address, tlsConfig)
logrus.Infof("Started tunnel to %s", address)
disconnect[address] = conn.cancel
proxy.SetHealthCheck(address, conn.healthCheck)
}
} }
// remove old servers var debounceCtx context.Context
for address := range curAddresses.Difference(newAddresses) { debounceCtx, cancelUpdate = context.WithCancel(ctx)
if cancel, ok := disconnect[address]; ok {
cancel() go func() {
delete(disconnect, address) select {
logrus.Infof("Stopped tunnel to %s", address) case <-time.After(endpointDebounceDelay):
logrus.Debugf("Settled apiserver addresses sync: %v", addresses)
case <-debounceCtx.Done():
logrus.Debugf("Cancelled apiserver addresses sync: %v", addresses)
return
} }
}
// Compare list of supervisor addresses before and after syncing apiserver
// endpoints into the proxy to figure out which supervisors we need to connect to
// or disconnect from. Note that the addresses we were passed will not match
// the supervisor addresses if the supervisor and apiserver are on different ports -
// they must be round-tripped through proxy.Update before comparing.
curAddresses := sets.New(proxy.SupervisorAddresses()...)
proxy.Update(addresses)
newAddresses := sets.New(proxy.SupervisorAddresses()...)
addedAddresses := newAddresses.Difference(curAddresses)
removedAddresses := curAddresses.Difference(newAddresses)
if addedAddresses.Len() == 0 && removedAddresses.Len() == 0 {
return
}
logrus.Debugf("Sync apiserver addresses - connecting: %v, disconnecting: %v", addedAddresses.UnsortedList(), removedAddresses.UnsortedList())
// add new servers
for address := range addedAddresses {
if _, ok := disconnect[address]; !ok {
conn := a.connect(ctx, nil, address, tlsConfig)
logrus.Infof("Started tunnel to %s", address)
disconnect[address] = conn.cancel
proxy.SetHealthCheck(address, conn.healthCheck)
}
}
// remove old servers
for address := range removedAddresses {
if cancel, ok := disconnect[address]; ok {
cancel()
delete(disconnect, address)
logrus.Infof("Stopped tunnel to %s", address)
}
}
}()
} }
} }
...@@ -550,10 +573,11 @@ func getAPIServersRequester(node *daemonconfig.Node, proxy proxy.Proxy, syncProx ...@@ -550,10 +573,11 @@ func getAPIServersRequester(node *daemonconfig.Node, proxy proxy.Proxy, syncProx
} }
} }
if addresses, err := agentconfig.GetAPIServers(ctx, info); err != nil { if addresses, err := agentconfig.GetAPIServers(ctx, info); err != nil || len(addresses) == 0 {
logrus.Warnf("Failed to get apiserver addresses from supervisor: %v", err) logrus.Warnf("Failed to get apiserver addresses from supervisor: %v", err)
} else { } else {
syncProxyAddresses(ctx, addresses) logrus.Debugf("Syncing apiserver addresses from server: %v", addresses)
syncProxyAddresses(addresses)
} }
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment