前言

k8s dashboard有实现web terminal的源代码,但是是使用sockjs实现的

实现

数据流向大概就是这样浏览器websocket -> k8s-client —> k8s-server -> kubelet -> container

websocket实现

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
func ExecuteBash(k8sClient kubernetes.Interface, namespace, podName, containerName string, cfg *rest.Config,
	conn *websocket.Conn, cols, rows uint16) error {
	req := k8sClient.CoreV1().RESTClient().Post().
		Resource("pods").
		Name(podName).
		Namespace(namespace).
		SubResource("exec")

	req.VersionedParams(&v1.PodExecOptions{
		Container: containerName,
		Command:   []string{"/bin/bash"},
		Stdin:     true,
		Stdout:    true,
		Stderr:    true,
		TTY:       true,
	}, scheme.ParameterCodec)

	exec, err := remotecommand.NewSPDYExecutor(cfg, "POST", req.URL())

	if err != nil {
		return err
	}
	term := NewWebTerminal(conn, cols, rows)
	err = exec.Stream(remotecommand.StreamOptions{
		Stdin:             term,
		Stdout:            term,
		Stderr:            term,
		TerminalSizeQueue: term,
		Tty:               true,
	})

	if err != nil {
		return err
	}
	return nil
}

/*
1 web container terminal
2 20分钟无输入要退出bash进程
3 需要支持心跳,不然websocket过腾讯云lb只能持续2分钟
4 程序ctrl c之后,断开所有容器的bash进程 //未能成功
*/
type WebTerminal struct {
	conn     *websocket.Conn
	size     chan *remotecommand.TerminalSize
	timeout  time.Duration
	readNum  chan int
	canClose bool
	err      error
}

func init() {
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
	go func() {
		<-ch
		{
			fmt.Println(errorProcessinterrupt)
			cancel()
		}
	}()
}

//心跳检查的意义在于生产环境是lb  nginx ,有代理超时设置
//本地调试不会自动断开
const healthCheck = "[%heart_check%]"
const errorTimeOut = "\033[31m20 min 无输入 close\033[0m"
const errorProcessinterrupt = "\033[31m Process interrupt close\033[0m"

//web terminal的实现
func NewWebTerminal(conn *websocket.Conn, w, h uint16) *WebTerminal {
	term := &WebTerminal{
		conn:    conn,
		size:    make(chan *remotecommand.TerminalSize, 1),
		timeout: time.Minute * 20, //20分钟用户无输入发送eof
		readNum: make(chan int, 1),
	}
	term.size <- &remotecommand.TerminalSize{Width: w, Height: h}
	go func() {
		for {
			term.watchRead()
			if term.err != nil {
				fmt.Println("term.watchRead break")
				break
			}
		}
	}()

	return term
}

// 模拟stdout,stderr
func (a *WebTerminal) Write(p []byte) (n int, err error) {
	err = a.conn.WriteMessage(1, p)
	return len(p), err
}

// 模拟stdin
func (a *WebTerminal) Read(p []byte) (n int, err error) {
	if a.canClose {
		a.conn.WriteMessage(1, []byte(errorTimeOut))
		return 0, errors.New(errorTimeOut)
	}
	t, msg, err := a.conn.ReadMessage()
	defer func() {
		a.err = err
		a.readNum <- n
	}()
	//收到前端close信号之后 返回错误
	if t == websocket.CloseMessage {
		return 0, errors.New("websocket CloseMessage 8")
	}
	//前端心跳
	if string(msg) == healthCheck {
		return 0, nil
	}
	//复制k8s的代码,发送exit会eof
	if err != nil {
		n = copy(p, END_OF_TRANSMISSION)
		return n, err
	}
	n = copy(p, msg)
	return
}

func (a *WebTerminal) watchRead() {
	tf := time.After(a.timeout)
	select {
	case <-tf:
		a.canClose = true
	case <-a.readNum:
		return
	}
}

/*
// Next returns the new terminal size after the terminal has been resized. It returns nil when
// monitoring has been stopped.
注意 这里是设置终端大小的实现,终端字符个数是前端计算 传入后端的如果返回nil证明已经设置完成,不然k8s接口会频繁监听设置大小,导致终端卡顿
因此让k8s调用一次 然后关闭chan 返回nil
*/
func (a *WebTerminal) Next() *remotecommand.TerminalSize {
	if v, ok := <-a.size; ok {
		defer close(a.size)
		return v
	} else {
		return nil
	}
}