2025-10-11 18:25:59 +08:00

522 lines
21 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using MQTTnet;
using MQTTnet.Diagnostics.Logger;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using XiaoZhiSharp.MQTT.Common;
namespace XiaoZhiSharp.MQTT
{
/// <summary>
/// 连接成功事件
/// </summary>
/// <param name="source"></param>
/// <param name="e"></param>
public delegate void ConnectedHandler(object source, MqttClientConnectedEventArgs arg);
/// <summary>
/// 断开连接成功事件
/// </summary>
/// <param name="source"></param>
/// <param name="e"></param>
public delegate void DisconnectedHandler(object source, MqttClientDisconnectedEventArgs arg);
/// <summary>
/// 数据接收事件
/// </summary>
/// <param name="source"></param>
/// <param name="e"></param>
public delegate void ReceivedHandler(object source, MqttApplicationMessageReceivedEventArgs arg);
/// <summary>
/// MQTT通讯相关的工具类
/// </summary>
public class MqttNetClient
{
#region
private MqttClientFactory mqttFactory = null;
/// <summary>
/// 客户端
/// </summary>
private IMqttClient _MqttClient = null;
private string mqttServerUrl = "127.0.0.1";
private int port = 8085;
/// <summary>
/// 记录日志、输出、保存等操作
/// </summary>
private Action<ResultMqtt> _Callback = null;
#endregion
public bool IsConnected { get { return _MqttClient != null ? _MqttClient.IsConnected : false; } }
public MqttNetClient(string mqttServerUrl, int port)
{
var logger = new MqttNetEventLogger();
MqttNetConsoleLogger.ForwardToConsole(logger);
mqttFactory = new MqttClientFactory(logger);
this.mqttServerUrl = mqttServerUrl;
this.port = port;
}
/// <summary>
/// 创建MQTTClient并运行
/// </summary>
/// <param name="mqttClientOptionsBuilder">MQTTClient连接配置</param>
/// <param name="callback">信息处理逻辑</param>
/// <returns></returns>
public async Task<ResultMqtt> CreateMQTTClientAndStart(MqttClientOptionsBuilder mqttClientOptionsBuilder, Action<ResultMqtt> callback)
{
ResultMqtt resultData_MQTT = new ResultMqtt();
_Callback = callback;
try
{
MqttClientOptions options = mqttClientOptionsBuilder.Build();
_MqttClient = mqttFactory.CreateMqttClient();
RegisterEvents(_MqttClient);
await _MqttClient.ConnectAsync(options); // 连接
if (_MqttClient.IsConnected)
{
resultData_MQTT = new ResultMqtt()
{
ResultCode = 1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTClient_成功"
};
}
else
{
resultData_MQTT = new ResultMqtt()
{
ResultCode = -1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTClient_失败"
};
}
}
catch (Exception ex)
{
resultData_MQTT = new ResultMqtt()
{
ResultCode = -1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTClient_失败错误信息" + ex.Message
};
}
_Callback?.Invoke(resultData_MQTT);
return resultData_MQTT;
}
/// <summary>
/// 简易创建MQTTClient并运行
/// </summary>
/// <param name="mqttServerUrl">mqttServer的Url</param>
/// <param name="port">mqttServer的端口</param>
/// <param name="userName">认证用用户名</param>
/// <param name="userPassword">认证用密码</param>
/// <param name="callback">信息处理逻辑</param>
/// <returns></returns>
public async Task<ResultMqtt> CreateMQTTClientAndStart(string mqttServerUrl, int port, string userName, string userPassword, Action<ResultMqtt> callback)
{
ResultMqtt resultData_MQTT = new ResultMqtt();
_Callback = callback;
try
{
MqttClientOptionsBuilder mqttClientOptionsBuilder = new MqttClientOptionsBuilder();
if (!string.IsNullOrEmpty(userName))
{
mqttClientOptionsBuilder.WithCredentials(userName, userPassword);// 设置鉴权参数
}
mqttClientOptionsBuilder.WithClientId(Guid.NewGuid().ToString("N"));// 设置客户端序列号
mqttClientOptionsBuilder.WithCleanSession(true);
mqttClientOptionsBuilder.WithKeepAlivePeriod(TimeSpan.FromSeconds(120.0));//默认15秒
mqttClientOptionsBuilder.WithTimeout(TimeSpan.FromSeconds(2));//默认100秒
var isWebSocket = false;
if (isWebSocket)
{
mqttClientOptionsBuilder.WithWebSocketServer(a => a.WithUri(mqttServerUrl));// 设置MQTT服务器地址(带port时,或者启动Server时,不能在emqx里显示信息)
}
else
{
mqttClientOptionsBuilder.WithTcpServer(mqttServerUrl, port);// 设置MQTT服务器地址(带port时,或者启动Server时,不能在emqx里显示信息)
}
MqttClientOptions options = mqttClientOptionsBuilder.Build();
_MqttClient = mqttFactory.CreateMqttClient();
RegisterEvents(_MqttClient);
await _MqttClient.ConnectAsync(options); // 连接
if (_MqttClient.IsConnected)
{
resultData_MQTT = new ResultMqtt()
{
ResultCode = 1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTClient_成功"
};
}
else
{
resultData_MQTT = new ResultMqtt()
{
ResultCode = -1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTClient_失败"
};
}
}
catch (Exception ex)
{
resultData_MQTT = new ResultMqtt()
{
ResultCode = -1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTClient_失败错误信息" + ex.Message
};
}
_Callback?.Invoke(resultData_MQTT);
return resultData_MQTT;
}
private void RegisterEvents(IMqttClient mqttClient)
{
mqttClient.ConnectedAsync += ConnectedHandle; // 服务器连接事件
mqttClient.DisconnectedAsync += DisconnectedHandle; // 服务器断开事件(可以写入重连事件)
mqttClient.ApplicationMessageReceivedAsync += ApplicationMessageReceivedHandle; // 发送消息事件
}
/// <summary>
/// 关闭MQTTClient
/// </summary>
public async Task DisconnectAsync()
{
ResultMqtt resultData_MQTT = new ResultMqtt();
try
{
if (_MqttClient != null && _MqttClient.IsConnected)
{
await _MqttClient.DisconnectAsync();
_MqttClient.Dispose();
_MqttClient = null;
resultData_MQTT = new ResultMqtt()
{
ResultCode = 1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了关闭MQTTClient_成功"
};
}
else
{
resultData_MQTT = new ResultMqtt()
{
ResultCode = -1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了关闭MQTTClient_失败MQTTClient未开启连接"
};
}
}
catch (Exception ex)
{
resultData_MQTT = new ResultMqtt()
{
ResultCode = -1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了关闭MQTTClient_失败错误信息" + ex.Message
};
}
_Callback?.Invoke(resultData_MQTT);
}
/// <summary>
/// 重连
/// </summary>
/// <returns></returns>
public async Task ReconnectAsync()
{
ResultMqtt resultData_MQTT = new ResultMqtt();
try
{
if (_MqttClient != null)
{
await _MqttClient.ReconnectAsync();
resultData_MQTT = new ResultMqtt()
{
ResultCode = 1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了MQTTClient重连_成功"
};
}
else
{
resultData_MQTT = new ResultMqtt()
{
ResultCode = -1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了MQTTClient重连_失败未设置MQTTClient连接"
};
}
}
catch (Exception ex)
{
resultData_MQTT = new ResultMqtt()
{
ResultCode = -1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了MQTTClient重连_失败错误信息" + ex.Message
};
}
_Callback?.Invoke(resultData_MQTT);
}
/// <summary>
/// 订阅
/// </summary>
/// <param name="topic">主题</param>
public async void SubscribeAsync(string topic)
{
/*
Console.WriteLine("### CONNECTED WITH SERVER ###");
await _MqttClient.SubscribeAsync("#");
Console.WriteLine("### SUBSCRIBED ###");
*/
ResultMqtt resultData_MQTT = new ResultMqtt();
try
{
/*
var mqttClientSubscribeOptionsBuilder = mqttFactory.CreateSubscribeOptionsBuilder();
mqttClientSubscribeOptionsBuilder = mqttClientSubscribeOptionsBuilder.WithTopicFilter(
f =>
{
f.WithTopic(topic);
});
MqttClientSubscribeOptions mqttClientSubscribeOptions = mqttClientSubscribeOptionsBuilder.Build();
await _MqttClient.SubscribeAsync(mqttClientSubscribeOptions, CancellationToken.None);
*/
MqttTopicFilter topicFilter = new MqttTopicFilterBuilder()
.WithTopic(topic)
.WithAtLeastOnceQoS()
.Build();
await _MqttClient.SubscribeAsync(topicFilter, CancellationToken.None);
resultData_MQTT = new ResultMqtt()
{
ResultCode = 1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>MQTTClient执行了订阅'{topic}'_成功"
};
}
catch (Exception ex)
{
resultData_MQTT = new ResultMqtt()
{
ResultCode = -1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>MQTTClient执行了订阅'{topic}'_失败错误信息" + ex.Message
};
}
_Callback?.Invoke(resultData_MQTT);
}
/// <summary>
/// 退订阅
/// </summary>
/// <param name="topic">主题</param>
public async void UnsubscribeAsync(string topic)
{
ResultMqtt resultData_MQTT = new ResultMqtt();
try
{
await _MqttClient.UnsubscribeAsync(topic, CancellationToken.None);
resultData_MQTT = new ResultMqtt()
{
ResultCode = 1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>MQTTClient执行了退订'{topic}'_成功"
};
}
catch (Exception ex)
{
resultData_MQTT = new ResultMqtt()
{
ResultCode = -1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>MQTTClient执行退订'{topic}'_失败错误信息" + ex.Message
};
}
_Callback?.Invoke(resultData_MQTT);
}
/// <summary>
/// 发布消息( 必须在成功连接以后才生效 )
/// </summary>
/// <param name="topic">主题</param>
/// <param name="msg">信息</param>
/// <param name="retained">是否保留</param>
/// <returns></returns>
public async Task PublishAsync(string topic, string msg, bool retained)
{
ResultMqtt resultData_MQTT = new ResultMqtt();
try
{
/* https://zhuanlan.zhihu.com/p/671565499
* MQTT 定义了三个 服务质量QoSQuality of Service等级分别为QoS 0默认最多交付一次、QoS 1至少交付一次、QoS 2只交付一次。
QoS 0 最多交付一次:
QoS 0 是最低的QoS等级。QoS 0消息即发即弃不需要等待确认不需要存储和重传
因此对于接收方来说,永远都不需要担心收到重复的消息。
缺点是当我们使用QoS 0传递消息时消息的可靠性完全依赖于底层的TCP协议。而TCP只能保证在连接稳定不关闭的情况下消息的可靠到达
一旦出现连接关闭、重置仍有可能丢失当前处于网络链路或操作系统底层缓冲区中的消息这也是QoS 0消息最主要的丢失场景。
QoS 1 至少交付一次:
可以保证消息到达,所以适合传输一些较为重要的数据,比如下达关键指令、更新重要的有实时性要求的状态等。
缺点是可能会导致消息重复所以当我们选择使用QoS 1时还需要能够处理消息的重复或者能够允许消息的重复。
QoS 2 恰好交付一次:
既可以保证消息到达,也可以保证消息不会重复,但传输成本最高。
如果我们不愿意自行实现去重方案并且能够接受QoS 2带来的额外开销那么QoS 2将是一个合适的选择。
通常我们会在金融、航空等行业场景下会更多地见到QoS 2的使用。
*/
/*
var applicationMessage = new MqttApplicationMessageBuilder()
.WithTopic("A/B/C")
.WithPayload("Hello World")
.WithRetainFlag(retained)
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.Build();
*/
MqttApplicationMessageBuilder mqttAppMsgBuilder = new MqttApplicationMessageBuilder();
mqttAppMsgBuilder.WithTopic(topic); // 主题
mqttAppMsgBuilder.WithPayload(msg); // 信息
mqttAppMsgBuilder.WithRetainFlag(retained); // 保留
mqttAppMsgBuilder.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce);
MqttApplicationMessage messageObj = mqttAppMsgBuilder.Build();
if (_MqttClient.IsConnected)
{
await _MqttClient.PublishAsync(messageObj, CancellationToken.None);
resultData_MQTT = new ResultMqtt()
{
ResultCode = 1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>执行了发布信息_成功主题:'{topic}',信息:'{msg}',是否保留:'{retained}'"
};
}
else
{
// 未连接
resultData_MQTT = new ResultMqtt()
{
ResultCode = -1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了发布信息_失败MQTTClient未开启连接"
};
}
}
catch (Exception ex)
{
resultData_MQTT = new ResultMqtt()
{
ResultCode = -1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了发布信息_失败错误信息" + ex.Message
};
}
_Callback?.Invoke(resultData_MQTT);
}
#region
/// <summary>
/// 服务器连接事件
/// </summary>
private Task ConnectedHandle(MqttClientConnectedEventArgs arg)
{
_Callback?.Invoke(new ResultMqtt()
{
ResultCode = 1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>已连接到MQTT服务器"
});
OnConnected(arg);
return Task.CompletedTask;//CompletedTask.Instance;
}
/// <summary>
/// 服务器断开事件(可以写入重连事件)
/// </summary>
private Task DisconnectedHandle(MqttClientDisconnectedEventArgs arg)
{
/*
Console.WriteLine("### DISCONNECTED FROM SERVER ###");
await Task.Delay(TimeSpan.FromSeconds(5));
try
{
await _MqttClient.ConnectAsync(clientOptions);//重新连接
}
catch
{
Console.WriteLine("### RECONNECTING FAILED ###");
}
*/
_Callback?.Invoke(new ResultMqtt()
{
ResultCode = 1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>已断开与MQTT服务器连接"
});
OnDisconnected(arg);
return Task.CompletedTask;//CompletedTask.Instance;
}
/// <summary>
/// 接收消息事件
/// </summary>
private Task ApplicationMessageReceivedHandle(MqttApplicationMessageReceivedEventArgs arg)
{
Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
Console.WriteLine($"+ Topic = {arg.ApplicationMessage.Topic}");
Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}");
Console.WriteLine($"+ QoS = {arg.ApplicationMessage.QualityOfServiceLevel}");
Console.WriteLine($"+ Retain = {arg.ApplicationMessage.Retain}");
Console.WriteLine();
_Callback?.Invoke(new ResultMqtt()
{
ResultCode = 1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>MQTTClient'{arg.ClientId}'内容:'{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}';主题:'{arg.ApplicationMessage.Topic}'消息等级Qos[{arg.ApplicationMessage.QualityOfServiceLevel}],是否保留:[{arg.ApplicationMessage.Retain}]",
ResultObject1 = arg.ApplicationMessage.Topic,
ResultObject2 = Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)
});
OnReceived(arg);
return Task.CompletedTask;//CompletedTask.Instance;
}
#endregion
#region Session Handles
public event ConnectedHandler Connected;
public event DisconnectedHandler Disconnected;
public event ReceivedHandler Received;
protected virtual void OnConnected(MqttClientConnectedEventArgs arg)
{
if (this.Connected != null)
{
this.Connected(this, arg);
}
}
protected virtual void OnDisconnected(MqttClientDisconnectedEventArgs arg)
{
if (this.Disconnected != null)
{
this.Disconnected(this, arg);
}
}
protected virtual void OnReceived(MqttApplicationMessageReceivedEventArgs arg)
{
if (this.Received != null)
{
this.Received(this, arg);
}
}
#endregion Session Handles
}
}