527 lines
20 KiB
C#
Raw Normal View History

2025-10-11 18:25:59 +08:00
using MQTTnet;
using MQTTnet.Diagnostics.Logger;
using MQTTnet.Internal;
using MQTTnet.Protocol;
using MQTTnet.Server;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using XiaoZhiSharp.MQTT.Common;
using XiaoZhiSharp.Utils;
namespace XiaoZhiSharp.MQTT
{
/// <summary>
/// MQTT通讯相关的工具类
/// </summary>
public class MqttNetServer
{
#region
/// <summary>
/// MQTT服务
/// </summary>
private MqttServer _MqttServer = null;
/// <summary>
/// 记录日志、输出、保存等操作
/// </summary>
private Action<ResultMqtt> _Callback = null;
#endregion
public static void RunEmptyServer()
{
var mqttServer = new MqttServerFactory().CreateMqttServer(new MqttServerOptions());
mqttServer.StartAsync().GetAwaiter().GetResult();
Console.WriteLine("Press any key to exit.");
Console.ReadLine();
}
public static void RunEmptyServerWithLogging()
{
var logger = new MqttNetEventLogger();
MqttNetConsoleLogger.ForwardToConsole(logger);
var mqttFactory = new MqttServerFactory(logger);
var mqttServer = mqttFactory.CreateMqttServer(new MqttServerOptions());
mqttServer.StartAsync().GetAwaiter().GetResult();
Console.WriteLine("Press any key to exit.");
Console.ReadLine();
}
/// <summary>
/// 创建MQTTServer并运行
/// </summary>
public async Task<ResultMqtt> CreateMQTTServerAndStart(MqttServerOptionsBuilder mqttServerOptionsBuilder, Action<ResultMqtt> callback)
{
ResultMqtt resultData_MQTT = new ResultMqtt();
_Callback = callback;
try
{
MqttServerOptions mqttServerOptions = mqttServerOptionsBuilder.Build();
_MqttServer = new MqttServerFactory().CreateMqttServer(mqttServerOptions); // 创建服务(配置)
RegisterEvents(_MqttServer);
await _MqttServer.StartAsync(); // 开启服务
if (_MqttServer.IsStarted)
{
resultData_MQTT = new ResultMqtt()
{
ResultCode = 1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTServer_成功"
};
}
else
{
resultData_MQTT = new ResultMqtt()
{
ResultCode = -1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTServer_失败"
};
}
}
catch (Exception ex)
{
resultData_MQTT = new ResultMqtt()
{
ResultCode = -1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTServer_失败错误信息" + ex.Message
};
}
_Callback?.Invoke(resultData_MQTT);
return resultData_MQTT;
}
/// <summary>
/// 简易创建MQTTServer并运行-不使用加密
/// </summary>
/// <param name="ip">IP</param>
/// <param name="port">端口</param>
/// <param name="withPersistentSessions">是否保持会话</param>
/// <param name="callback">处理方法</param>
/// <returns></returns>
public async Task<ResultMqtt> CreateMQTTServerAndStart(string ip, int port, bool withPersistentSessions, Action<ResultMqtt> callback)
{
ResultMqtt resultData_MQTT = new ResultMqtt();
_Callback = callback;
try
{
// Extend the timestamp for all messages from clients.
// Protect several topics from being subscribed from every client.
//var certificate = new X509Certificate(@"C:\certs\test\test.cer", "");
//options.TlsEndpointOptions.Certificate = certificate.Export(X509ContentType.Cert);
//options.ConnectionBacklog = 5;
//options.DefaultEndpointOptions.IsEnabled = true;
//options.TlsEndpointOptions.IsEnabled = false;
MqttServerOptionsBuilder mqttServerOptionsBuilder = new MqttServerOptionsBuilder(); // MQTT服务器配置
mqttServerOptionsBuilder.WithDefaultEndpoint();
mqttServerOptionsBuilder.WithDefaultEndpointBoundIPAddress(IPAddress.Parse(ip)); // 设置Server的IP
mqttServerOptionsBuilder.WithDefaultEndpointPort(port); // 设置Server的端口号
//mqttServerOptionsBuilder.WithEncryptedEndpointPort(port); // 使用加密的端点端口
mqttServerOptionsBuilder.WithPersistentSessions(withPersistentSessions); // 持续会话
mqttServerOptionsBuilder.WithConnectionBacklog(2000); // 最大连接数
//mqttServerOptionsBuilder.WithConnectionValidator(c => // 鉴权-方法失效
//{
// if (c.Username != uName || c.Password != uPwd)
// {
// c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
// }
//})
MqttServerOptions mqttServerOptions = mqttServerOptionsBuilder.Build();
_MqttServer = new MqttServerFactory().CreateMqttServer(mqttServerOptions); // 创建服务(配置)
RegisterEvents(_MqttServer);
_MqttServer.ValidatingConnectionAsync += ValidatingConnectionHandle; // 鉴权-未完
await _MqttServer.StartAsync(); // 开启服务
if (_MqttServer.IsStarted)
{
resultData_MQTT = new ResultMqtt()
{
ResultCode = 1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTServer_成功"
};
}
else
{
resultData_MQTT = new ResultMqtt()
{
ResultCode = -1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTServer_失败"
};
}
}
catch (Exception ex)
{
resultData_MQTT = new ResultMqtt()
{
ResultCode = -1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTServer_失败错误信息" + ex.Message
};
}
_Callback?.Invoke(resultData_MQTT);
return resultData_MQTT;
}
private void RegisterEvents(MqttServer mqttServer)
{
mqttServer.StartedAsync += StartedHandle; // 服务器开启事件
mqttServer.StoppedAsync += StoppedHandle; // 服务器关闭事件
mqttServer.ClientConnectedAsync += ClientConnectedHandle; // 设置客户端连接成功后的处理程序
mqttServer.ClientDisconnectedAsync += ClientDisconnectedHandle; // 设置客户端断开后的处理程序
mqttServer.ClientSubscribedTopicAsync += ClientSubscribedTopicHandle; // 设置消息订阅通知
mqttServer.ClientUnsubscribedTopicAsync += ClientUnsubscribedTopicHandle; // 设置消息退订通知
mqttServer.ApplicationMessageNotConsumedAsync += ApplicationMessageNotConsumedHandle; // 设置消息处理程序
/*------------------------------------------*/
mqttServer.RetainedMessageChangedAsync += MqttServer_RetainedMessageChangedAsync;
mqttServer.RetainedMessagesClearedAsync += MqttServer_RetainedMessagesClearedAsync;
mqttServer.LoadingRetainedMessageAsync += MqttServer_LoadingRetainedMessageAsync;
mqttServer.InterceptingPublishAsync += MqttServer_InterceptingPublishAsync;
mqttServer.InterceptingSubscriptionAsync += MqttServer_InterceptingSubscriptionAsync;
//options.ApplicationMessageInterceptor = c =>
//{
// if (c.ApplicationMessage.PayloadSegment == null || c.ApplicationMessage.PayloadSegment.Length == 0)
// {
// return;
// }
// try
// {
// var content = JObject.Parse(Encoding.UTF8.GetString(c.ApplicationMessage.PayloadSegment));
// var timestampProperty = content.Property("timestamp");
// if (timestampProperty != null && timestampProperty.Value.Type == JTokenType.Null)
// {
// timestampProperty.Value = DateTime.Now.ToString("O");
// c.ApplicationMessage.PayloadSegment = Encoding.UTF8.GetBytes(content.ToString());
// }
// }
// catch (Exception)
// {
// }
//};
}
/// <summary>
/// 关闭MQTTServer
/// </summary>
public async Task<ResultMqtt> StopMQTTServer()
{
ResultMqtt resultData_MQTT = new ResultMqtt();
try
{
if (_MqttServer == null)
{
resultData_MQTT = new ResultMqtt()
{
ResultCode = -1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了关闭MQTTServer_出错MQTTServer未在运行。"
};
}
else
{
foreach (var clientStatus in _MqttServer.GetClientsAsync().Result)
{
await clientStatus.DisconnectAsync();
}
await _MqttServer.StopAsync();
_MqttServer = null;
resultData_MQTT = new ResultMqtt()
{
ResultCode = 1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了关闭MQTTServer_成功"
};
}
}
catch (Exception ex)
{
resultData_MQTT = new ResultMqtt()
{
ResultCode = -1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了关闭MQTTServer_失败错误信息" + ex.Message
};
}
_Callback?.Invoke(resultData_MQTT);
return resultData_MQTT;
}
/// <summary>
/// 获取所有的客户端
/// </summary>
public List<MqttClientStatus> GetClientsAsync()
{
return _MqttServer.GetClientsAsync().Result.ToList();
}
/// <summary>
/// 发送消息-未写
/// </summary>
/// <param name="Topic">主题</param>
/// <param name="msg">消息</param>
/// <returns></returns>
public Task SendMessage(string Topic, string msg)
{
try
{
//var clients = _MqttServer.GetClientsAsync().Result;
//foreach (var client in clients)
//{
// var imqttClient = client.GetClient();
// if (imqttClient != null)
// {
// }
//}
}
catch { }
return Task.CompletedTask;
}
#region
/// <summary>
/// 开启Server的处理程序
/// </summary>
private Task StartedHandle(EventArgs arg)
{
_Callback?.Invoke(new()
{
ResultCode = 1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>MQTTServer已开启"
});
return Task.CompletedTask;
}
/// <summary>
/// 关闭Server的处理程序
/// </summary>
private Task StoppedHandle(EventArgs arg)
{
_Callback?.Invoke(new()
{
ResultCode = 1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>MQTTServer已关闭"
});
return Task.CompletedTask;
}
/// <summary>
/// 设置客户端连接成功后的处理程序
/// </summary>
private Task ClientConnectedHandle(ClientConnectedEventArgs arg)
{
var clients = _MqttServer.GetClientsAsync().Result;
_Callback?.Invoke(new()
{
ResultCode = 1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>客户端'{arg.ClientId}'已成功连接!当前客户端连接数:{clients?.Count}个。"
});
return Task.CompletedTask;
}
/// <summary>
/// 设置客户端断开后的处理程序
/// </summary>
private Task ClientDisconnectedHandle(ClientDisconnectedEventArgs arg)
{
var clients = _MqttServer.GetClientsAsync().Result;
_Callback?.Invoke(new ResultMqtt()
{
ResultCode = 1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>客户端'{arg.ClientId}'已断开连接!当前客户端连接数:{clients?.Count}个。"
});
return Task.CompletedTask;
}
/// <summary>
/// 设置消息订阅通知
/// </summary>
private Task ClientSubscribedTopicHandle(ClientSubscribedTopicEventArgs arg)
{
//if (!arg.Equals("admin"))
//{
// var clients = _MqttServer.GetClientsAsync().Result;
// var client = clients.Where(a => a.Id == arg.ClientId).FirstOrDefault();
// var imqttClient = client.GetClient();
// if (imqttClient != null)
// {
// }
// client?.DisconnectAsync();
// return Task.CompletedTask;
//}
_Callback?.Invoke(new ResultMqtt()
{
ResultCode = 1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>客户端'{arg.ClientId}'订阅了主题'{arg.TopicFilter.Topic}',主题服务质量:'{arg.TopicFilter.QualityOfServiceLevel}'"
});
return Task.CompletedTask;
}
/// <summary>
/// 设置消息退订通知
/// </summary>
private Task ClientUnsubscribedTopicHandle(ClientUnsubscribedTopicEventArgs arg)
{
_Callback?.Invoke(new ResultMqtt()
{
ResultCode = 1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>客户端{arg.ClientId}退订了主题{arg.TopicFilter}"
});
return Task.CompletedTask;
}
/// <summary>
/// 鉴权-未写完
/// </summary>
/// <returns></returns>
private Task ValidatingConnectionHandle(ValidatingConnectionEventArgs arg) // 鉴权
{
/*
if (arg.ClientId == "SpecialClient")
{
if (arg.UserName != "USER" || arg.Password != "PASS")
{
arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
}
}*/
if (arg.UserName != "Admin" || arg.Password != "Admin123")
{
}
return Task.CompletedTask;//CompletedTask.Instance;
}
/// <summary>
/// 设置消息处理程序
/// </summary>
private Task ApplicationMessageNotConsumedHandle(ApplicationMessageNotConsumedEventArgs arg)
{
_Callback?.Invoke(new ResultMqtt()
{
ResultCode = -1,
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>客户端:'{arg.SenderId}'发布了消息:主题:'{arg.ApplicationMessage.Topic}'!内容:'{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}';服务质量:{arg.ApplicationMessage.QualityOfServiceLevel};保留:{arg.ApplicationMessage.Retain}"
});
return Task.CompletedTask;
}
#endregion
#region 2
private string Filename
{
get
{
var path = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, @"MQTT\\RetainedMessages.json");
var osType = XiaoZhiSharp.Kernels.OperatingSystem.GetOperatingSystemType();
var formatFileName = path.FormatPath(osType);
return formatFileName;
}
}
private Task MqttServer_RetainedMessageChangedAsync(RetainedMessageChangedEventArgs arg)
{
var directory = Path.GetDirectoryName(Filename);
if (!Directory.Exists(directory))
{
Directory.CreateDirectory(directory);
}
File.WriteAllText(Filename, JsonConvert.SerializeObject(arg.StoredRetainedMessages));
return CompletedTask.Instance;
}
private Task MqttServer_RetainedMessagesClearedAsync(EventArgs arg)
{
File.Delete(Filename);
return CompletedTask.Instance;
}
private Task MqttServer_LoadingRetainedMessageAsync(LoadingRetainedMessagesEventArgs arg)
{
List<MqttApplicationMessage> retainedMessages;
if (File.Exists(Filename))
{
var json = File.ReadAllText(Filename);
retainedMessages = JsonConvert.DeserializeObject<List<MqttApplicationMessage>>(json);
}
else
{
retainedMessages = new List<MqttApplicationMessage>();
}
arg.LoadedRetainedMessages = retainedMessages;
return CompletedTask.Instance;
}
private Task MqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
{
//MqttNetConsoleLogger.PrintToConsole(
// $"'{e.ClientId}' reported '{e.ApplicationMessage.Topic}' > '{Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment ?? new byte[0])}'",
// ConsoleColor.Magenta);
MqttNetConsoleLogger.PrintToConsole(
$"'{arg.ClientId}' reported '{arg.ApplicationMessage.Topic}' > '{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}'",
ConsoleColor.Magenta);
if (MqttTopicFilterComparer.Compare(arg.ApplicationMessage.Topic, "/myTopic/WithTimestamp/#") == MqttTopicFilterCompareResult.IsMatch)
{
// Replace the payload with the timestamp. But also extending a JSON
// based payload with the timestamp is a suitable use case.
arg.ApplicationMessage.PayloadSegment = Encoding.UTF8.GetBytes(DateTime.Now.ToString("O"));
}
if (arg.ApplicationMessage.Topic == "not_allowed_topic")
{
arg.ProcessPublish = false;
arg.CloseConnection = true;
}
return CompletedTask.Instance;
}
private Task MqttServer_InterceptingSubscriptionAsync(InterceptingSubscriptionEventArgs arg)
{
if (arg.TopicFilter.Topic.StartsWith("admin/foo/bar") && arg.ClientId != "theAdmin")
{
arg.Response.ReasonCode = MqttSubscribeReasonCode.ImplementationSpecificError;
}
if (arg.TopicFilter.Topic.StartsWith("the/secret/stuff") && arg.ClientId != "Imperator")
{
arg.Response.ReasonCode = MqttSubscribeReasonCode.ImplementationSpecificError;
arg.CloseConnection = true;
}
return CompletedTask.Instance;
}
#endregion 2
}
}