隐藏

C# MQTTnet使用心得和C# MQTT库M2Mqtt的使用方法

发布:2024/3/25 16:31:07作者:管理员 来源:本站 浏览次数:564

1.MQTTnet开源库,https://github.com/chkr1011/MQTTnet

MQTTnet.dll 2.8.5 下载

链接:https://pan.baidu.com/s/1BjQ41vOqQ6-JoY06m2uN4Q
提取码:a18s
(此版本示例:https://download.csdn.net/download/uaime/11359429 我尝试升级版本,很多方法不支持,没耐心看文档)

知识点(关于MQTT的详细说明 http://x5zj.com/?p=42)

2.服务器端和客户端(看上实例下载)

服务器端和客户端两个,他们需要保持长连接,主要是通过订阅和发布来进行消息的传递交换。

MQTT 服务端主要用于与多个客户端保持连接,并处理客户端的发布和订阅等逻辑。一般很少直接从服务端发送消息给客户端(可以使用 mqttServer.Publish(appMsg); 直接发送消息),多数情况下服务端都是转发主题匹配的客户端消息,在系统中起到一个中介的作用。

而客户端主要是通过向服务端订阅它感兴趣(主题)的消息,另一些客户端向服务端发布(主题)消息,服务端将订阅和发布的主题进行匹配,并将消息转发给匹配通过的客户端。

3连接,通信

(1)服务器端启动服务:

 public static IMqttServer mqttServer = null; //创建对象

// 配置一个mqtt服务.
var optionsBuilder = new MqttServerOptionsBuilder()
//连接记录数,默认 一般为2000
.WithConnectionBacklog(2000)
//服务器连接端口
.WithDefaultEndpointPort(port)
//连接验证器
.WithConnectionValidator(ValidatingMqttClients())
//持续会话
.WithPersistentSessions()

;
// 建立一个MQTT服务.
mqttServer = new MqttFactory().CreateMqttServer();
mqttServer.ApplicationMessageReceived += MqttServer_ApplicationMessageReceived;
mqttServer.ClientSubscribedTopic += MqttServer_ClientSubscribedTopic;
mqttServer.ClientUnsubscribedTopic += MqttServer_ClientUnsubscribedTopic;
mqttServer.ClientConnected += MqttServer_ClientConnected;
mqttServer.ClientDisconnected += MqttServer_ClientDisconnected;

//启动mqtt服务
await Task.Run(async () => { await mqttServer.StartAsync(optionsBuilder.Build()); });

 

(2)客户端

与服务器端连接

连接语句:

 private IMqttClient mqttClient = null; //客户端对象

//实例化 创建客户端对象
var Factory = new MqttFactory();
mqttClient = Factory.CreateMqttClient();
mqttClient.ApplicationMessageReceived += MqttClient_ApplicationMessageReceived;
mqttClient.Connected += MqttClient_Connected;
mqttClient.Disconnected += MqttClient_Disconnected;

//调用异步方法连接到服务端

await mqttClient.ConnectAsync(option());

option的信息设置:

public IMqttClientOptions option()
{
//连接到服务器前,获取所需要的MqttClientTcpOptions 对象的信息
var options = new MqttClientOptionsBuilder()
.WithClientId(ClientId)                    // clientid是设备id
.WithTcpServer(IP, Port)              //onenet ip:183.230.40.39    port:6002
.WithCredentials(UserName,pwd)      //username为产品id       密码为鉴权信息或者APIkey
//.WithTls()//服务器端没有启用加密协议,这里用tls的会提示协议异常
.WithCleanSession(false)
.WithKeepAlivePeriod(TimeSpan.FromSeconds(2000))
.Build();
return options;
}

服务端支持 ClientConnected、ClientDisconnected 和 ApplicationMessageReceived 事件,分别用来检查客户端连接、客户端断开以及接收客户端发来的消息。

客户端支持 Connected、Disconnected 和 ApplicationMessageReceived 事件,用来处理客户端与服务端连接、客户端从服务端断开以及客户端收到消息的事情。

通过 mqtt接入onenet,其实也就是把onenet 平台当做mqtt broker,连接上稍作改变就可以了。

客户端(接收服务推送)封装类参考:

MQQT服务的源码,下载源码地址:https://github.com/mqtt/mqtt.github.io/wiki/libraries

    public class MqttClientService
        {
     
            private static volatile MqttClientService _instance = null;
     
            private static readonly object LockHelper = new object();
     
            /// <summary>
            /// 创建单例模式
            /// </summary>
            /// <param name="ipAddress"></param>
            /// <returns></returns>
            public static MqttClientService CreateInstance(string ipAddress)
            {
                if (_instance == null)
                {
                    lock (LockHelper)
                    {
                        if (_instance == null)
                            _instance = new MqttClientService(ipAddress);
                    }
                }
                return _instance;
            }
     
            /// <summary>
            /// 实例化订阅客户端
            /// </summary>
            public MqttClient SubscribeClient { get; set; }
     
     
            public Action<Object, MqttMsgPublishEventArgs> ClientPublishReceivedAction { get; set; }
     
            public MqttClientService(string ipAddress)
            {
                // create client instance
                SubscribeClient = new MqttClient(IPAddress.Parse(ipAddress));
     
                // register to message received
                SubscribeClient.MqttMsgPublishReceived += client_MqttMsgPublishReceived;
     
                string clientId = Guid.NewGuid().ToString();
     
                SubscribeClient.Connect(clientId);
                //订阅时修改路径(主题)即可
                // subscribe to the topic "/home/temperature" with QoS 2
                SubscribeClient.Subscribe(new string[] { "avatar/uploaded" }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
            }
     
            void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
            {
                // handle message received
                ClientPublishReceivedAction.Invoke(sender, e);
            }
     
            public void client_MqttMsgPublish(string publishString)
            {
                SubscribeClient.Publish("avatar/signed", Encoding.UTF8.GetBytes(publishString), MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);
            }
        }

C#mqtt库M2Mqtt的使用方法

目前最好用的C#库是 eclipse出的M2Mqtt库,

主页链接: http://www.eclipse.org/paho/clients/dotnet/

项目的地址是 https://github.com/eclipse/paho.mqtt.m2mqtt

使用方式是在vs 的命令中输入 Install-Package M2Mqtt
 

...
 
// create client instance
MqttClient client = new MqttClient(IPAddress.Parse(MQTT_BROKER_ADDRESS)); //注意是域名哦不支持IP
 
string clientId = Guid.NewGuid().ToString();
client.Connect(clientId);
 
string strValue = Convert.ToString(value);
 
// publish a message on "/home/temperature" topic with QoS 2
client.Publish("/home/temperature", Encoding.UTF8.GetBytes(strValue), MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);
然而,却存在一个坑, MQTT_BROKER_ADDRESS 不能用IP地址,而要用域名,用IP地址会出现在Connect这一步连接失败。

 

放一个示例,可以直接使用的。

https://sdk.bce.baidu.com/console-sdk/mqtt-net.rar?responseContentDisposition=attachment

这个是百度云mqtt物接入的一个例子,用的是mqtt协议,下载后,把域名改一下,就可以用了。

mqtt服务器可以采用http://mosquitto.org/download/ 实现的mosquitto服务器。
Source

    mosquitto-1.6.3.tar.gz (319kB) (GPG signature)
    Git source code repository (github.com)

Older downloads are available at https://mosquitto.org/files/
Windows

    mosquitto-1.6.3-install-windows-x64.exe (~1.4 MB) (64-bit build, Windows Vista and up, built with Visual Studio Community 2017)
    mosquitto-1.6.3-install-windows-x32.exe (~1.4 MB) (32-bit build, Windows Vista and up, built with Visual Studio Community 2017)

 

组件下载:M2Mqtt.Net.dll (v4.3.0最新版 2019.7.16本地生成)

链接:https://pan.baidu.com/s/1lacs13v9nde8d2wrj2oU4g
提取码:l8cg


相关代码摘要:

    //创建客户端实例(两种方式选择,否则出错,域名需转换)
    MqttClient client = new MqttClient(IPAddress.Parse(MQTT_BROKER_ADDRESS)); //主机为IP时
    MqttClient client = new MqttClient(MQTT_BROKER_ADDRESS); //当主机地址为域名时
     
    // 注册消息接收处理事件,还可以注册消息订阅成功、取消订阅成功、与服务器断开等事件处理函数
    client.MqttMsgPublishReceived += client_MqttMsgPublishReceived;
     
    //生成客户端ID并连接服务器
    string clientId = Guid.NewGuid().ToString();
    client.Connect(clientId);
     
    // 订阅主题"/home/temperature" 消息质量为 2
    // 三种参数:0 至多1次,1 至少1次,2 只有1次 (好像3保留)
    client.Subscribe(new string[] { "/home/temperature" }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
     
    ...
     
    void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
    {
    //处理接收到的消息
    string msg = System.Text.Encoding.Default.GetString(e.Message);
            textBox1.AppendText("收到消息:" + msg + "\r\n");
    }
     
     
    //**************************************************************
     
    // 发布消息到主题 "/home/temperature" 消息质量为 2,不保留
    client.Publish("/home/temperature", Encoding.UTF8.GetBytes("hello"), MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);

客户端代码参考2

服务端搭建:apache-apollo-***-windows,下载链接:http://activemq.apache.org/apollo/download.html

解压:在 apache-apollo    bin文件夹中,作如下操作:

输入:apollo.cmd create test 创建虚拟test服务器主机,在该文件夹下会生成一个test目录
进入 apache-apollo-1.7.1\bin\test\bin文件,开启服务器。
进入test目录下etc文件夹中,找到apollo.xml
打开浏览器:输入http://127.0.0.1:61680,任意一个都可以,即可进入管理的web界面,账户:admin ,密码:password
测试连接,打开mqtt用户客户端,查看服务器显示情况
 

    using MQTTnet;
    using MQTTnet.Core;
    using MQTTnet.Core.Client;
    using MQTTnet.Core.Packets;
    using MQTTnet.Core.Protocol;
    using System;
    using System.Collections.Generic;
    using System.ComponentModel;
    using System.Data;
    using System.Drawing;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    using System.Windows.Forms;
     
    namespace mqtt_client
    {
        public partial class Form1 : Form
        {
            private MqttClient mqttClient = null;
            public Form1()
            {
                InitializeComponent();
                Task.Run(async () => { await ConnectMqttServerAsync(); });
            }
            private async Task ConnectMqttServerAsync()
            {
                if (mqttClient == null)
                {
                    mqttClient = new MqttClientFactory().CreateMqttClient() as MqttClient;
                    mqttClient.ApplicationMessageReceived += MqttClient_ApplicationMessageReceived;
                    mqttClient.Connected += MqttClient_Connected;
                    mqttClient.Disconnected += MqttClient_Disconnected;
                }
     
                try
                {
                    var options = new MqttClientTcpOptions
                    {
                        Server = "127.0.0.1",
                        Port=61613,
                        ClientId = Guid.NewGuid().ToString(),
                        UserName = "admin",
                        Password = "password",
                        CleanSession = true
                    };
     
                    await mqttClient.ConnectAsync(options);
                }
                catch (Exception ex)
                {
                    Invoke((new Action(() =>
                    {
                        textBox2.AppendText($"连接到MQTT服务器失败!" + Environment.NewLine + ex.Message + Environment.NewLine);
                    })));
                }
            }
            private void MqttClient_Connected(object sender, EventArgs e)
            {
                Invoke((new Action(() =>
                {
                    textBox2.AppendText("已连接到MQTT服务器!" + Environment.NewLine);
                })));
            }
     
            private void MqttClient_Disconnected(object sender, EventArgs e)
            {
                Invoke((new Action(() =>
                {
                    textBox2.AppendText("已断开MQTT连接!" + Environment.NewLine);
                })));
            }
     
            private void MqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
            {
                Invoke((new Action(() =>
                {
                    textBox2.AppendText($">> {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}{Environment.NewLine}");
                })));
            }
     
            private void button1_Click(object sender, EventArgs e)
            {
                string topic = textBox1.Text.Trim();
     
                if (string.IsNullOrEmpty(topic))
                {
                    MessageBox.Show("订阅主题不能为空!");
                    return;
                }
     
                if (!mqttClient.IsConnected)
                {
                    MessageBox.Show("MQTT客户端尚未连接!");
                    return;
                }
     
                mqttClient.SubscribeAsync(new List<TopicFilter> {
                    new TopicFilter(topic, MqttQualityOfServiceLevel.AtMostOnce)
                });
     
                textBox2.AppendText($"已订阅[{topic}]主题" + Environment.NewLine);
                textBox1.Enabled = false;
                button1.Enabled = false;
     
            }
     
            private void button2_Click(object sender, EventArgs e)
            {
                string topic = textBox4.Text.Trim();
     
                if (string.IsNullOrEmpty(topic))
                {
                    MessageBox.Show("发布主题不能为空!");
                    return;
                }
     
                string inputString = textBox3.Text.Trim();
                var appMsg = new MqttApplicationMessage(topic, Encoding.UTF8.GetBytes(inputString), MqttQualityOfServiceLevel.AtMostOnce, false);
                mqttClient.PublishAsync(appMsg);
     
            }
     
            private void Form1_Load(object sender, EventArgs e)
            {
                Sunisoft.IrisSkin.SkinEngine skin = new Sunisoft.IrisSkin.SkinEngine
                {
                    SkinFile = @"./RealOne.ssk"
                };
            }
        }
    }

 

参考资料:

https://github.com/mqtt/mqtt.github.io/wiki/libraries

https://github.com/leytton/m2mqtt (此项目下载后依赖项很多出问题,尝试修改没弄成)

仅部分内容摘自网络整理而成,如有侵权请联系删除。