发布:2024/3/25 16:31:07作者:管理员 来源:本站 浏览次数:564
1.MQTTnet开源库,https://github.com/chkr1011/MQTTnet
MQTTnet.dll 2.8.5 下载
链接:https://pan.baidu.com/s/1BjQ41vOqQ6-JoY06m2uN4Q
提取码:a18s
(此版本示例:https://download.csdn.net/download/uaime/11359429 我尝试升级版本,很多方法不支持,没耐心看文档)
知识点(关于MQTT的详细说明 http://x5zj.com/?p=42)
2.服务器端和客户端(看上实例下载)
服务器端和客户端两个,他们需要保持长连接,主要是通过订阅和发布来进行消息的传递交换。
MQTT 服务端主要用于与多个客户端保持连接,并处理客户端的发布和订阅等逻辑。一般很少直接从服务端发送消息给客户端(可以使用 mqttServer.Publish(appMsg); 直接发送消息),多数情况下服务端都是转发主题匹配的客户端消息,在系统中起到一个中介的作用。
而客户端主要是通过向服务端订阅它感兴趣(主题)的消息,另一些客户端向服务端发布(主题)消息,服务端将订阅和发布的主题进行匹配,并将消息转发给匹配通过的客户端。
3连接,通信
(1)服务器端启动服务:
public static IMqttServer mqttServer = null; //创建对象
// 配置一个mqtt服务.
var optionsBuilder = new MqttServerOptionsBuilder()
//连接记录数,默认 一般为2000
.WithConnectionBacklog(2000)
//服务器连接端口
.WithDefaultEndpointPort(port)
//连接验证器
.WithConnectionValidator(ValidatingMqttClients())
//持续会话
.WithPersistentSessions()
;
// 建立一个MQTT服务.
mqttServer = new MqttFactory().CreateMqttServer();
mqttServer.ApplicationMessageReceived += MqttServer_ApplicationMessageReceived;
mqttServer.ClientSubscribedTopic += MqttServer_ClientSubscribedTopic;
mqttServer.ClientUnsubscribedTopic += MqttServer_ClientUnsubscribedTopic;
mqttServer.ClientConnected += MqttServer_ClientConnected;
mqttServer.ClientDisconnected += MqttServer_ClientDisconnected;
//启动mqtt服务
await Task.Run(async () => { await mqttServer.StartAsync(optionsBuilder.Build()); });
(2)客户端
与服务器端连接
连接语句:
private IMqttClient mqttClient = null; //客户端对象
//实例化 创建客户端对象
var Factory = new MqttFactory();
mqttClient = Factory.CreateMqttClient();
mqttClient.ApplicationMessageReceived += MqttClient_ApplicationMessageReceived;
mqttClient.Connected += MqttClient_Connected;
mqttClient.Disconnected += MqttClient_Disconnected;
//调用异步方法连接到服务端
await mqttClient.ConnectAsync(option());
option的信息设置:
public IMqttClientOptions option()
{
//连接到服务器前,获取所需要的MqttClientTcpOptions 对象的信息
var options = new MqttClientOptionsBuilder()
.WithClientId(ClientId) // clientid是设备id
.WithTcpServer(IP, Port) //onenet ip:183.230.40.39 port:6002
.WithCredentials(UserName,pwd) //username为产品id 密码为鉴权信息或者APIkey
//.WithTls()//服务器端没有启用加密协议,这里用tls的会提示协议异常
.WithCleanSession(false)
.WithKeepAlivePeriod(TimeSpan.FromSeconds(2000))
.Build();
return options;
}
服务端支持 ClientConnected、ClientDisconnected 和 ApplicationMessageReceived 事件,分别用来检查客户端连接、客户端断开以及接收客户端发来的消息。
客户端支持 Connected、Disconnected 和 ApplicationMessageReceived 事件,用来处理客户端与服务端连接、客户端从服务端断开以及客户端收到消息的事情。
通过 mqtt接入onenet,其实也就是把onenet 平台当做mqtt broker,连接上稍作改变就可以了。
客户端(接收服务推送)封装类参考:
MQQT服务的源码,下载源码地址:https://github.com/mqtt/mqtt.github.io/wiki/libraries
public class MqttClientService
{
private static volatile MqttClientService _instance = null;
private static readonly object LockHelper = new object();
/// <summary>
/// 创建单例模式
/// </summary>
/// <param name="ipAddress"></param>
/// <returns></returns>
public static MqttClientService CreateInstance(string ipAddress)
{
if (_instance == null)
{
lock (LockHelper)
{
if (_instance == null)
_instance = new MqttClientService(ipAddress);
}
}
return _instance;
}
/// <summary>
/// 实例化订阅客户端
/// </summary>
public MqttClient SubscribeClient { get; set; }
public Action<Object, MqttMsgPublishEventArgs> ClientPublishReceivedAction { get; set; }
public MqttClientService(string ipAddress)
{
// create client instance
SubscribeClient = new MqttClient(IPAddress.Parse(ipAddress));
// register to message received
SubscribeClient.MqttMsgPublishReceived += client_MqttMsgPublishReceived;
string clientId = Guid.NewGuid().ToString();
SubscribeClient.Connect(clientId);
//订阅时修改路径(主题)即可
// subscribe to the topic "/home/temperature" with QoS 2
SubscribeClient.Subscribe(new string[] { "avatar/uploaded" }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
}
void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
{
// handle message received
ClientPublishReceivedAction.Invoke(sender, e);
}
public void client_MqttMsgPublish(string publishString)
{
SubscribeClient.Publish("avatar/signed", Encoding.UTF8.GetBytes(publishString), MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);
}
}
C#mqtt库M2Mqtt的使用方法
目前最好用的C#库是 eclipse出的M2Mqtt库,
主页链接: http://www.eclipse.org/paho/clients/dotnet/
项目的地址是 https://github.com/eclipse/paho.mqtt.m2mqtt
使用方式是在vs 的命令中输入 Install-Package M2Mqtt
...
// create client instance
MqttClient client = new MqttClient(IPAddress.Parse(MQTT_BROKER_ADDRESS)); //注意是域名哦不支持IP
string clientId = Guid.NewGuid().ToString();
client.Connect(clientId);
string strValue = Convert.ToString(value);
// publish a message on "/home/temperature" topic with QoS 2
client.Publish("/home/temperature", Encoding.UTF8.GetBytes(strValue), MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);
然而,却存在一个坑, MQTT_BROKER_ADDRESS 不能用IP地址,而要用域名,用IP地址会出现在Connect这一步连接失败。
放一个示例,可以直接使用的。
https://sdk.bce.baidu.com/console-sdk/mqtt-net.rar?responseContentDisposition=attachment
这个是百度云mqtt物接入的一个例子,用的是mqtt协议,下载后,把域名改一下,就可以用了。
mqtt服务器可以采用http://mosquitto.org/download/ 实现的mosquitto服务器。
Source
mosquitto-1.6.3.tar.gz (319kB) (GPG signature)
Git source code repository (github.com)
Older downloads are available at https://mosquitto.org/files/
Windows
mosquitto-1.6.3-install-windows-x64.exe (~1.4 MB) (64-bit build, Windows Vista and up, built with Visual Studio Community 2017)
mosquitto-1.6.3-install-windows-x32.exe (~1.4 MB) (32-bit build, Windows Vista and up, built with Visual Studio Community 2017)
组件下载:M2Mqtt.Net.dll (v4.3.0最新版 2019.7.16本地生成)
链接:https://pan.baidu.com/s/1lacs13v9nde8d2wrj2oU4g
提取码:l8cg
相关代码摘要:
//创建客户端实例(两种方式选择,否则出错,域名需转换)
MqttClient client = new MqttClient(IPAddress.Parse(MQTT_BROKER_ADDRESS)); //主机为IP时
MqttClient client = new MqttClient(MQTT_BROKER_ADDRESS); //当主机地址为域名时
// 注册消息接收处理事件,还可以注册消息订阅成功、取消订阅成功、与服务器断开等事件处理函数
client.MqttMsgPublishReceived += client_MqttMsgPublishReceived;
//生成客户端ID并连接服务器
string clientId = Guid.NewGuid().ToString();
client.Connect(clientId);
// 订阅主题"/home/temperature" 消息质量为 2
// 三种参数:0 至多1次,1 至少1次,2 只有1次 (好像3保留)
client.Subscribe(new string[] { "/home/temperature" }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
...
void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
{
//处理接收到的消息
string msg = System.Text.Encoding.Default.GetString(e.Message);
textBox1.AppendText("收到消息:" + msg + "\r\n");
}
//**************************************************************
// 发布消息到主题 "/home/temperature" 消息质量为 2,不保留
client.Publish("/home/temperature", Encoding.UTF8.GetBytes("hello"), MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);
客户端代码参考2
服务端搭建:apache-apollo-***-windows,下载链接:http://activemq.apache.org/apollo/download.html
解压:在 apache-apollo bin文件夹中,作如下操作:
输入:apollo.cmd create test 创建虚拟test服务器主机,在该文件夹下会生成一个test目录
进入 apache-apollo-1.7.1\bin\test\bin文件,开启服务器。
进入test目录下etc文件夹中,找到apollo.xml
打开浏览器:输入http://127.0.0.1:61680,任意一个都可以,即可进入管理的web界面,账户:admin ,密码:password
测试连接,打开mqtt用户客户端,查看服务器显示情况
using MQTTnet;
using MQTTnet.Core;
using MQTTnet.Core.Client;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms;
namespace mqtt_client
{
public partial class Form1 : Form
{
private MqttClient mqttClient = null;
public Form1()
{
InitializeComponent();
Task.Run(async () => { await ConnectMqttServerAsync(); });
}
private async Task ConnectMqttServerAsync()
{
if (mqttClient == null)
{
mqttClient = new MqttClientFactory().CreateMqttClient() as MqttClient;
mqttClient.ApplicationMessageReceived += MqttClient_ApplicationMessageReceived;
mqttClient.Connected += MqttClient_Connected;
mqttClient.Disconnected += MqttClient_Disconnected;
}
try
{
var options = new MqttClientTcpOptions
{
Server = "127.0.0.1",
Port=61613,
ClientId = Guid.NewGuid().ToString(),
UserName = "admin",
Password = "password",
CleanSession = true
};
await mqttClient.ConnectAsync(options);
}
catch (Exception ex)
{
Invoke((new Action(() =>
{
textBox2.AppendText($"连接到MQTT服务器失败!" + Environment.NewLine + ex.Message + Environment.NewLine);
})));
}
}
private void MqttClient_Connected(object sender, EventArgs e)
{
Invoke((new Action(() =>
{
textBox2.AppendText("已连接到MQTT服务器!" + Environment.NewLine);
})));
}
private void MqttClient_Disconnected(object sender, EventArgs e)
{
Invoke((new Action(() =>
{
textBox2.AppendText("已断开MQTT连接!" + Environment.NewLine);
})));
}
private void MqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
{
Invoke((new Action(() =>
{
textBox2.AppendText($">> {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}{Environment.NewLine}");
})));
}
private void button1_Click(object sender, EventArgs e)
{
string topic = textBox1.Text.Trim();
if (string.IsNullOrEmpty(topic))
{
MessageBox.Show("订阅主题不能为空!");
return;
}
if (!mqttClient.IsConnected)
{
MessageBox.Show("MQTT客户端尚未连接!");
return;
}
mqttClient.SubscribeAsync(new List<TopicFilter> {
new TopicFilter(topic, MqttQualityOfServiceLevel.AtMostOnce)
});
textBox2.AppendText($"已订阅[{topic}]主题" + Environment.NewLine);
textBox1.Enabled = false;
button1.Enabled = false;
}
private void button2_Click(object sender, EventArgs e)
{
string topic = textBox4.Text.Trim();
if (string.IsNullOrEmpty(topic))
{
MessageBox.Show("发布主题不能为空!");
return;
}
string inputString = textBox3.Text.Trim();
var appMsg = new MqttApplicationMessage(topic, Encoding.UTF8.GetBytes(inputString), MqttQualityOfServiceLevel.AtMostOnce, false);
mqttClient.PublishAsync(appMsg);
}
private void Form1_Load(object sender, EventArgs e)
{
Sunisoft.IrisSkin.SkinEngine skin = new Sunisoft.IrisSkin.SkinEngine
{
SkinFile = @"./RealOne.ssk"
};
}
}
}
参考资料:
https://github.com/mqtt/mqtt.github.io/wiki/libraries
https://github.com/leytton/m2mqtt (此项目下载后依赖项很多出问题,尝试修改没弄成)
仅部分内容摘自网络整理而成,如有侵权请联系删除。