1
0
mirror of synced 2025-12-19 18:05:53 -05:00
Files
sense-installer/pkg/api/clientgo_utils.go
Ashwathi Shiva 2ed59321e4 Preflight checks unit tests and bug fixes (#399)
* - committing changes lost during conflict resolution
- adding more tests
- marking tests to run in parallel to speed things up
- changed case of mongoDbUrl to mongodbUrl
- modified preflight -all output
2020-06-09 13:00:35 -04:00

728 lines
21 KiB
Go

package api
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"time"
"github.com/mitchellh/go-homedir"
appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/retry"
)
var gracePeriod int64 = 0
var waitTimeout = 2 * time.Minute
type ClientGoUtils struct {
Verbose bool
}
func (p *ClientGoUtils) LogVerboseMessage(strMessage string, args ...interface{}) {
if p.Verbose || os.Getenv("QLIKSENSE_DEBUG") == "true" {
fmt.Printf(strMessage, args...)
}
}
func int32Ptr(i int32) *int32 { return &i }
func (p *ClientGoUtils) LoadKubeConfigAndNamespace() (string, []byte, error) {
LogDebugMessage("Reading .kube/config file...")
homeDir, err := homedir.Dir()
if err != nil {
err = fmt.Errorf("Unable to deduce home dir")
return "", nil, err
}
LogDebugMessage("Kube config location: %s\n\n", filepath.Join(homeDir, ".kube", "config"))
kubeConfig := filepath.Join(homeDir, ".kube", "config")
kubeConfigContents, err := ioutil.ReadFile(kubeConfig)
if err != nil {
err = fmt.Errorf("Unable to deduce home dir")
return "", nil, err
}
// retrieve namespace
namespace := GetKubectlNamespace()
// if namespace comes back empty, we will run checks in the default namespace
if namespace == "" {
namespace = "default"
}
return namespace, kubeConfigContents, nil
}
func (p *ClientGoUtils) RetryOnError(mf func() error) error {
return retry.OnError(wait.Backoff{
Duration: 1 * time.Second,
Factor: 1,
Jitter: 0.1,
Steps: 5,
}, func(err error) bool {
return k8serrors.IsConflict(err) || k8serrors.IsGone(err) || k8serrors.IsServerTimeout(err) ||
k8serrors.IsServiceUnavailable(err) || k8serrors.IsTimeout(err) || k8serrors.IsTooManyRequests(err)
}, mf)
}
func (p *ClientGoUtils) GetK8SClientSet(kubeconfig []byte, contextName string) (*kubernetes.Clientset, *rest.Config, error) {
var clientConfig *rest.Config
var err error
if len(kubeconfig) == 0 {
clientConfig, err = rest.InClusterConfig()
if err != nil {
err = fmt.Errorf("Unable to load in-cluster kubeconfig: %w", err)
return nil, nil, err
}
} else {
config, err := clientcmd.Load(kubeconfig)
if err != nil {
err = fmt.Errorf("Unable to load kubeconfig: %w", err)
return nil, nil, err
}
if contextName != "" {
config.CurrentContext = contextName
}
clientConfig, err = clientcmd.NewDefaultClientConfig(*config, &clientcmd.ConfigOverrides{}).ClientConfig()
if err != nil {
err = fmt.Errorf("Unable to create client config from config: %w", err)
return nil, nil, err
}
}
clientset, err := kubernetes.NewForConfig(clientConfig)
if err != nil {
err = fmt.Errorf("Unable to create clientset: %w", err)
return nil, nil, err
}
return clientset, clientConfig, nil
}
func (p *ClientGoUtils) CreatePreflightTestDeployment(clientset kubernetes.Interface, namespace string, depName string, imageName string) (*appsv1.Deployment, error) {
deploymentsClient := clientset.AppsV1().Deployments(namespace)
deployment := &appsv1.Deployment{
ObjectMeta: v1.ObjectMeta{
Name: depName,
},
Spec: appsv1.DeploymentSpec{
Replicas: int32Ptr(1),
Selector: &v1.LabelSelector{
MatchLabels: map[string]string{
"app": "preflight-check",
},
},
Template: apiv1.PodTemplateSpec{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
"app": "preflight-check",
"label": "preflight-check-label",
},
},
Spec: apiv1.PodSpec{
Containers: []apiv1.Container{
{
Name: "dep",
Image: imageName,
Ports: []apiv1.ContainerPort{
{
Name: "http",
Protocol: apiv1.ProtocolTCP,
ContainerPort: 80,
},
},
},
},
},
},
},
}
// Create Deployment
var result *appsv1.Deployment
if err := p.RetryOnError(func() (err error) {
result, err = deploymentsClient.Create(deployment)
return err
}); err != nil {
err = fmt.Errorf("unable to create deployments in the %s namespace: %w", namespace, err)
return nil, err
}
p.LogVerboseMessage("Created deployment %q\n", result.GetObjectMeta().GetName())
return deployment, nil
}
func (p *ClientGoUtils) getDeployment(clientset kubernetes.Interface, namespace, depName string) (*appsv1.Deployment, error) {
deploymentsClient := clientset.AppsV1().Deployments(namespace)
var deployment *appsv1.Deployment
if err := p.RetryOnError(func() (err error) {
deployment, err = deploymentsClient.Get(depName, v1.GetOptions{})
return err
}); err != nil {
err = fmt.Errorf("unable to get deployments in the %s namespace: %w", namespace, err)
LogDebugMessage("%v\n", err)
return nil, err
}
return deployment, nil
}
func (p *ClientGoUtils) DeleteDeployment(clientset kubernetes.Interface, namespace, name string) error {
deploymentsClient := clientset.AppsV1().Deployments(namespace)
// Create Deployment
deletePolicy := v1.DeletePropagationForeground
deleteOptions := v1.DeleteOptions{
PropagationPolicy: &deletePolicy,
GracePeriodSeconds: &gracePeriod,
}
if err := p.RetryOnError(func() (err error) {
return deploymentsClient.Delete(name, &deleteOptions)
}); err != nil {
return err
}
if err := p.WaitForDeploymentToDelete(clientset, namespace, name); err != nil {
return err
}
p.LogVerboseMessage("Deleted deployment: %s\n", name)
return nil
}
func (p *ClientGoUtils) CreatePreflightTestService(clientset kubernetes.Interface, namespace string, svcName string) (*apiv1.Service, error) {
iptr := int32Ptr(80)
servicesClient := clientset.CoreV1().Services(namespace)
service := &apiv1.Service{
ObjectMeta: v1.ObjectMeta{
Name: svcName,
Namespace: namespace,
Labels: map[string]string{
"app": "preflight-check",
},
},
Spec: apiv1.ServiceSpec{
Ports: []apiv1.ServicePort{
{Name: "port1",
Port: *iptr,
},
},
Selector: map[string]string{
"app": "preflight-check",
},
ClusterIP: "",
},
}
var result *apiv1.Service
if err := p.RetryOnError(func() (err error) {
result, err = servicesClient.Create(service)
return err
}); err != nil {
return nil, err
}
p.LogVerboseMessage("Created service %q\n", result.GetObjectMeta().GetName())
return service, nil
}
func (p *ClientGoUtils) GetService(clientset kubernetes.Interface, namespace, svcName string) (*apiv1.Service, error) {
servicesClient := clientset.CoreV1().Services(namespace)
var svc *apiv1.Service
if err := p.RetryOnError(func() (err error) {
svc, err = servicesClient.Get(svcName, v1.GetOptions{})
return err
}); err != nil {
err = fmt.Errorf("unable to get services in the %s namespace: %w", namespace, err)
return nil, err
}
return svc, nil
}
func (p *ClientGoUtils) DeleteService(clientset kubernetes.Interface, namespace, name string) error {
servicesClient := clientset.CoreV1().Services(namespace)
// Create Deployment
deletePolicy := v1.DeletePropagationForeground
deleteOptions := v1.DeleteOptions{
PropagationPolicy: &deletePolicy,
}
if err := p.RetryOnError(func() (err error) {
return servicesClient.Delete(name, &deleteOptions)
}); err != nil {
return err
}
p.LogVerboseMessage("Deleted service: %s\n", name)
return nil
}
func (p *ClientGoUtils) DeletePod(clientset kubernetes.Interface, namespace, name string) error {
podsClient := clientset.CoreV1().Pods(namespace)
deletePolicy := v1.DeletePropagationForeground
deleteOptions := v1.DeleteOptions{
PropagationPolicy: &deletePolicy,
GracePeriodSeconds: &gracePeriod,
}
if err := p.RetryOnError(func() (err error) {
return podsClient.Delete(name, &deleteOptions)
}); err != nil {
return err
}
if err := p.waitForPodToDelete(clientset, namespace, name); err != nil {
return err
}
p.LogVerboseMessage("Deleted pod: %s\n", name)
return nil
}
func (p *ClientGoUtils) CreatePreflightTestPod(clientset kubernetes.Interface, namespace, podName, imageName string, secretNames map[string]string, commandToRun []string) (*apiv1.Pod, error) {
// build the pod definition we want to deploy
pod := &apiv1.Pod{
ObjectMeta: v1.ObjectMeta{
Name: podName,
Namespace: namespace,
Labels: map[string]string{
"app": "preflight",
},
},
Spec: apiv1.PodSpec{
RestartPolicy: apiv1.RestartPolicyNever,
Containers: []apiv1.Container{
{
Name: "cnt",
Image: imageName,
ImagePullPolicy: apiv1.PullIfNotPresent,
Command: commandToRun,
},
},
},
}
if len(secretNames) > 0 {
for secretName, mountPath := range secretNames {
pod.Spec.Volumes = append(pod.Spec.Volumes, apiv1.Volume{
Name: secretName,
VolumeSource: apiv1.VolumeSource{
Secret: &apiv1.SecretVolumeSource{
SecretName: secretName,
Items: []apiv1.KeyToPath{
{
Key: secretName,
Path: filepath.Base(mountPath),
},
},
},
},
})
if len(pod.Spec.Containers) > 0 {
pod.Spec.Containers[0].VolumeMounts = append(pod.Spec.Containers[0].VolumeMounts, apiv1.VolumeMount{
Name: secretName,
MountPath: filepath.Dir(mountPath),
ReadOnly: true,
})
}
}
}
// now create the pod in kubernetes cluster using the clientset
if err := p.RetryOnError(func() (err error) {
pod, err = clientset.CoreV1().Pods(namespace).Create(pod)
return err
}); err != nil {
return nil, err
}
p.LogVerboseMessage("Created pod: %s\n", pod.Name)
return pod, nil
}
func (p *ClientGoUtils) getPod(clientset kubernetes.Interface, namespace, podName string) (*apiv1.Pod, error) {
LogDebugMessage("Fetching pod: %s\n", podName)
var pod *apiv1.Pod
if err := p.RetryOnError(func() (err error) {
pod, err = clientset.CoreV1().Pods(namespace).Get(podName, v1.GetOptions{})
return err
}); err != nil {
LogDebugMessage("%v\n", err)
return nil, err
}
return pod, nil
}
func (p *ClientGoUtils) GetPodLogs(clientset kubernetes.Interface, pod *apiv1.Pod) (string, error) {
return p.GetPodContainerLogs(clientset, pod, "")
}
func (p *ClientGoUtils) GetPodContainerLogs(clientset kubernetes.Interface, pod *apiv1.Pod, container string) (string, error) {
podLogOpts := apiv1.PodLogOptions{}
if container != "" {
podLogOpts.Container = container
}
LogDebugMessage("Retrieving logs for pod: %s namespace: %s\n", pod.GetName(), pod.Namespace)
req := clientset.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &podLogOpts)
podLogs, err := req.Stream()
if err != nil {
return "", err
}
defer podLogs.Close()
buf := new(bytes.Buffer)
_, err = io.Copy(buf, podLogs)
if err != nil {
return "", err
}
LogDebugMessage("Log from pod: %s\n", buf.String())
return buf.String(), nil
}
func (p *ClientGoUtils) waitForResource(checkFunc func() (interface{}, error), validateFunc func(interface{}) bool) error {
timeout := time.NewTicker(waitTimeout)
defer timeout.Stop()
OUT:
for {
r, err := checkFunc()
if err != nil {
return err
}
select {
case <-timeout.C:
break OUT
default:
if validateFunc(r) {
break OUT
}
}
time.Sleep(5 * time.Second)
}
return nil
}
func (p *ClientGoUtils) WaitForDeployment(clientset kubernetes.Interface, namespace string, pfDeployment *appsv1.Deployment) error {
var err error
depName := pfDeployment.GetName()
checkFunc := func() (interface{}, error) {
pfDeployment, err = p.getDeployment(clientset, namespace, depName)
if err != nil {
err = fmt.Errorf("unable to retrieve deployment: %s\n", depName)
return nil, err
}
return pfDeployment, nil
}
validateFunc := func(data interface{}) bool {
d := data.(*appsv1.Deployment)
return int(d.Status.ReadyReplicas) > 0
}
if err := p.waitForResource(checkFunc, validateFunc); err != nil {
return err
}
if int(pfDeployment.Status.ReadyReplicas) == 0 {
err = fmt.Errorf("deployment took longer than expected to spin up pods")
return err
}
return nil
}
func (p *ClientGoUtils) WaitForPod(clientset kubernetes.Interface, namespace string, pod *apiv1.Pod) error {
var err error
if len(pod.Spec.Containers) == 0 {
err = fmt.Errorf("there are no containers in the pod")
return err
}
podName := pod.Name
checkFunc := func() (interface{}, error) {
pod, err = p.getPod(clientset, namespace, podName)
if err != nil {
err = fmt.Errorf("unable to retrieve %s pod by name", podName)
return nil, err
}
return pod, nil
}
validateFunc := func(data interface{}) bool {
po := data.(*apiv1.Pod)
return po.Status.Phase == apiv1.PodRunning || po.Status.Phase == apiv1.PodSucceeded || po.Status.Phase == apiv1.PodFailed
}
if err := p.waitForResource(checkFunc, validateFunc); err != nil {
return err
}
if pod.Status.Phase != apiv1.PodRunning && pod.Status.Phase != apiv1.PodSucceeded && pod.Status.Phase != apiv1.PodFailed {
err = fmt.Errorf("container is taking much longer than expected")
return err
}
return nil
}
func (p *ClientGoUtils) WaitForPodToDie(clientset kubernetes.Interface, namespace string, pod *apiv1.Pod) error {
podName := pod.Name
checkFunc := func() (interface{}, error) {
po, err := p.getPod(clientset, namespace, podName)
if err != nil {
err = fmt.Errorf("unable to retrieve %s pod by name", podName)
return nil, err
}
return po, nil
}
validateFunc := func(r interface{}) bool {
po := r.(*apiv1.Pod)
return po.Status.Phase == apiv1.PodFailed || po.Status.Phase == apiv1.PodSucceeded
}
if err := p.waitForResource(checkFunc, validateFunc); err != nil {
return err
}
return nil
}
func (p *ClientGoUtils) waitForPodToDelete(clientset kubernetes.Interface, namespace, podName string) error {
checkFunc := func() (interface{}, error) {
po, err := p.getPod(clientset, namespace, podName)
if err != nil {
return nil, err
}
return po, nil
}
validateFunc := func(po interface{}) bool {
return false
}
if err := p.waitForResource(checkFunc, validateFunc); err != nil {
return nil
}
err := fmt.Errorf("delete pod is taking unusually long")
return err
}
func (p *ClientGoUtils) WaitForDeploymentToDelete(clientset kubernetes.Interface, namespace, deploymentName string) error {
checkFunc := func() (interface{}, error) {
dep, err := p.getDeployment(clientset, namespace, deploymentName)
if err != nil {
return nil, err
}
return dep, nil
}
validateFunc := func(po interface{}) bool {
return false
}
if err := p.waitForResource(checkFunc, validateFunc); err != nil {
return nil
}
err := fmt.Errorf("delete deployment is taking unusually long")
return err
}
func (p *ClientGoUtils) CreatePreflightTestSecret(clientset kubernetes.Interface, namespace, secretName string, secretData []byte) (*apiv1.Secret, error) {
var secret *apiv1.Secret
var err error
// build the secret defination we want to create
secretSpec := &apiv1.Secret{
ObjectMeta: v1.ObjectMeta{
Name: secretName,
Namespace: namespace,
Labels: map[string]string{
"app": "preflight",
},
},
Data: map[string][]byte{
secretName: secretData,
},
}
// now create the secret in kubernetes cluster using the clientset
if err = p.RetryOnError(func() (err error) {
secret, err = clientset.CoreV1().Secrets(namespace).Create(secretSpec)
return err
}); err != nil {
return nil, err
}
p.LogVerboseMessage("Created Secret: %s\n", secret.Name)
return secret, nil
}
func (p *ClientGoUtils) DeleteK8sSecret(clientset kubernetes.Interface, namespace string, secretName string) error {
secretClient := clientset.CoreV1().Secrets(namespace)
deletePolicy := v1.DeletePropagationForeground
deleteOptions := v1.DeleteOptions{
PropagationPolicy: &deletePolicy,
}
err := secretClient.Delete(secretName, &deleteOptions)
if err != nil {
return err
}
p.LogVerboseMessage("Deleted Secret: %s\n", secretName)
return nil
}
func (p *ClientGoUtils) CreateStatefulSet(clientset kubernetes.Interface, namespace string, statefulSetName string, imageName string) (*appsv1.StatefulSet, error) {
statefulSetsClient := clientset.AppsV1().StatefulSets(namespace)
statefulset := &appsv1.StatefulSet{
ObjectMeta: v1.ObjectMeta{
Name: statefulSetName,
},
Spec: appsv1.StatefulSetSpec{
Replicas: int32Ptr(1),
Selector: &v1.LabelSelector{
MatchLabels: map[string]string{
"app": "postflight-check",
},
},
Template: apiv1.PodTemplateSpec{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
"app": "postflight-check",
"label": "postflight-check-label",
},
},
Spec: apiv1.PodSpec{
InitContainers: []apiv1.Container{
{
Name: "migration",
Image: "ubuntu",
ImagePullPolicy: apiv1.PullIfNotPresent,
// Command: []string{"bash", "-c", "for i in {1..10}; do echo \"from init container...\"; sleep 1; done"},
Command: []string{"bash", "-c", "for i in {1..10}; do echo \"from init container...\"; sleep 1; exit 1; done"},
},
},
Containers: []apiv1.Container{
{
Name: "statefulset",
Image: imageName,
Ports: []apiv1.ContainerPort{
{
Name: "http",
Protocol: apiv1.ProtocolTCP,
ContainerPort: 80,
},
},
},
},
},
},
},
}
// Create Statefulset
var result *appsv1.StatefulSet
if err := p.RetryOnError(func() (err error) {
result, err = statefulSetsClient.Create(statefulset)
return err
}); err != nil {
err = fmt.Errorf("unable to create statefulsets in the %s namespace: %w", namespace, err)
return nil, err
}
LogDebugMessage("Created statefulset %q\n", result.GetObjectMeta().GetName())
return statefulset, nil
}
func (p *ClientGoUtils) waitForStatefulSet(clientset kubernetes.Interface, namespace string, pfStatefulset *appsv1.StatefulSet) error {
var err error
statefulsetName := pfStatefulset.GetName()
checkFunc := func() (interface{}, error) {
pfStatefulset, err = p.getStatefulset(clientset, namespace, statefulsetName)
if err != nil {
err = fmt.Errorf("unable to retrieve stateful set: %s\n", statefulsetName)
return nil, err
}
return pfStatefulset, nil
}
validateFunc := func(data interface{}) bool {
s := data.(*appsv1.StatefulSet)
return int(s.Status.ReadyReplicas) > 0
}
if err := p.waitForResource(checkFunc, validateFunc); err != nil {
return err
}
if int(pfStatefulset.Status.ReadyReplicas) == 0 {
err = fmt.Errorf("deployment took longer than expected to spin up pods")
return err
}
return nil
}
func (p *ClientGoUtils) getStatefulset(clientset kubernetes.Interface, namespace, statefulsetName string) (*appsv1.StatefulSet, error) {
statefulsetsClient := clientset.AppsV1().StatefulSets(namespace)
var statefulset *appsv1.StatefulSet
if err := p.RetryOnError(func() (err error) {
statefulset, err = statefulsetsClient.Get(statefulsetName, v1.GetOptions{})
return err
}); err != nil {
err = fmt.Errorf("unable to get statefulsets in the %s namespace: %w", namespace, err)
fmt.Printf("%v\n", err)
return nil, err
}
return statefulset, nil
}
func (p *ClientGoUtils) deleteStatefulSet(clientset kubernetes.Interface, namespace, name string) error {
statefulsetClient := clientset.AppsV1().StatefulSets(namespace)
deletePolicy := v1.DeletePropagationForeground
deleteOptions := v1.DeleteOptions{
PropagationPolicy: &deletePolicy,
GracePeriodSeconds: &gracePeriod,
}
if err := p.RetryOnError(func() (err error) {
return statefulsetClient.Delete(name, &deleteOptions)
}); err != nil {
return err
}
if err := p.waitForStatefulsetToDelete(clientset, namespace, name); err != nil {
return err
}
LogDebugMessage("Deleted statefulset: %s\n", name)
return nil
}
func (p *ClientGoUtils) waitForStatefulsetToDelete(clientset kubernetes.Interface, namespace, statefulsetName string) error {
checkFunc := func() (interface{}, error) {
statefulset, err := p.getStatefulset(clientset, namespace, statefulsetName)
if err != nil {
return nil, err
}
return statefulset, nil
}
validateFunc := func(po interface{}) bool {
return false
}
if err := p.waitForResource(checkFunc, validateFunc); err != nil {
return nil
}
err := fmt.Errorf("delete statefulset is taking unusually long")
return err
}
func (p *ClientGoUtils) GetPodsAndPodLogsFromFailedInitContainer(clientset kubernetes.Interface, lbls map[string]string, namespace, containerName string) (map[string]string, error) {
set := labels.Set(lbls)
listOptions := v1.ListOptions{LabelSelector: set.AsSelector().String()}
podList, err := clientset.CoreV1().Pods(namespace).List(listOptions)
if err != nil {
err = fmt.Errorf("unable to get podlist: %v", err)
fmt.Printf("%s\n", err)
}
LogDebugMessage("%d Pods retrieved\n ", len(podList.Items))
// var logs map[string]string
logs := map[string]string{}
for _, pod := range podList.Items {
LogDebugMessage("pod: %v\n", pod.GetName())
LogDebugMessage("%d init containers retrieved\n", len(pod.Spec.InitContainers))
for _, cs := range pod.Status.InitContainerStatuses {
if cs.Name == containerName && ((cs.State.Terminated != nil && (cs.State.Terminated.Reason != "Completed" || cs.State.Terminated.ExitCode > 0)) ||
(cs.LastTerminationState.Terminated != nil && (cs.LastTerminationState.Terminated.Reason != "Completed" || cs.LastTerminationState.Terminated.ExitCode > 0))) {
logs[pod.GetName()], err = p.GetPodContainerLogs(clientset, &pod, cs.Name)
if err != nil {
err = fmt.Errorf("unable to get pod logs: %v", err)
fmt.Printf("%s\n", err)
return nil, err
}
}
}
}
return logs, nil
}