Linx's Solution

Use Mqtt With Golang and Angular6

使用go作为MQTT服务端,angular作为客户端,进行消息交互

GO服务端这边

  • 需要能够监听MQTT Broker的tcp端口(默认:1883)
  • 需要能够监听MQTT Websocket的 ws 端口
  • 提供一个广播或者指定发送到某个客户端的方法

直接上代码


package utils

import (
	"encoding/json"
	"fmt"
	"io"
	"net"
	"net/http"
	"net/url"

	log "github.com/soesoftcn/process-manage/pkg/soelog"
	"github.com/surgemq/message"
	s "github.com/surgemq/surgemq/service"
	"golang.org/x/net/websocket"
)

type ServerMessage struct {
	IsBroadCast bool        `json:"isBroadCast"`
	ClientId    string      `json:"clientId"`
	MsgType     int         `json:"msgType"`
	Msg         interface{} `json:"msg"`
}

var MqttSvr *s.Server

func InitMQTTServer() {
	MqttSvr = &s.Server{
		KeepAlive:        300,           // seconds
		ConnectTimeout:   2,             // seconds
		SessionsProvider: "mem",         // keeps sessions in memory
		Authenticator:    "mockSuccess", // always succeed
		TopicsProvider:   "mem",         // keeps topic subscriptions in memory
	}

	mqttaddr := "tcp://:1883"
	addr := "tcp://127.0.0.1:1883"
	AddWebsocketHandler("/mqtt", addr)
	wsAddr := ":1882"
	go ListenAndServeWebsocket(wsAddr)

	err := MqttSvr.ListenAndServe(mqttaddr)
	if err != nil {
		log.Logger.Sugar().Errorf("surgemq/main: %v", err)
	}
}

/* copy from reader to websocket, this copies the binary frames as is */
func io_ws_copy(src io.Reader, dst *websocket.Conn) (int, error) {
	buffer := make([]byte, 2048)
	count := 0
	for {
		n, err := src.Read(buffer)
		if err != nil || n < 1 {
			return count, err
		}
		count += n
		err = websocket.Message.Send(dst, buffer[0:n])
		if err != nil {
			return count, err
		}
	}
	return count, nil
}

/* copy from websocket to writer, this copies the binary frames as is */
func io_copy_ws(src *websocket.Conn, dst io.Writer) (int, error) {
	var buffer []byte
	count := 0
	for {
		err := websocket.Message.Receive(src, &buffer)
		if err != nil {
			return count, err
		}
		n := len(buffer)
		count += n
		i, err := dst.Write(buffer)
		if err != nil || i < 1 {
			return count, err
		}
	}
	return count, nil
}

func WebsocketTcpProxy(ws *websocket.Conn, nettype string, host string) error {
	client, err := net.Dial(nettype, host)
	if err != nil {
		return err
	}
	defer client.Close()
	defer ws.Close()
	chDone := make(chan bool)

	go func() {
		io_ws_copy(client, ws)
		chDone <- true
	}()
	go func() {
		io_copy_ws(ws, client)
		chDone <- true
	}()
	<-chDone
	return nil
}

func AddWebsocketHandler(urlPattern string, uri string) error {
	sugar := log.Logger.Sugar()

	sugar.Infof("AddWebsocketHandler urlPattern=%s, uri=%s", urlPattern, uri)
	u, err := url.Parse(uri)
	if err != nil {
		sugar.Errorf("surgemq/main: %v", err)
		return err
	}

	h := func(ws *websocket.Conn) {
		WebsocketTcpProxy(ws, u.Scheme, u.Host)
	}
	http.Handle(urlPattern, websocket.Handler(h))
	return nil
}

func ListenAndServeWebsocket(addr string) error {
	return http.ListenAndServe(addr, nil)
}

func MessageFromServer(i ServerMessage) {
	msg := message.NewPublishMessage()
	payload, err := json.Marshal(i)
	if err != nil {
		fmt.Println(err)
	}
	msg.SetTopic([]byte("/superserver/notify"))
	msg.SetPayload(payload)

	MqttSvr.Publish(msg, nil)
}