2025-10-11 18:25:59 +08:00

527 lines
20 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using MQTTnet;
using MQTTnet.Diagnostics.Logger;
using MQTTnet.Internal;
using MQTTnet.Protocol;
using MQTTnet.Server;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using XiaoZhiSharp.MQTT.Common;
using XiaoZhiSharp.Utils;
namespace XiaoZhiSharp.MQTT
{
/// <summary>
/// Utility class for MQTT communication: hosts an MQTTnet server and reports its events through a callback.
/// </summary>
public class MqttNetServer
{
#region
/// <summary>
/// The MQTT server instance. Assigned by the CreateMQTTServerAndStart overloads and
/// reset to null by StopMQTTServer once the server has been stopped.
/// </summary>
private MqttServer _MqttServer = null;
/// <summary>
/// Callback that receives every result/notification produced by this class
/// (for logging, display, persistence and similar purposes). May be null.
/// </summary>
private Action<ResultMqtt> _Callback = null;
#endregion
/// <summary>
/// Starts an MQTT server built from default <see cref="MqttServerOptions"/>, blocks until a
/// line is read from standard input, then stops and disposes the server.
/// </summary>
/// <remarks>
/// Blocks the calling thread for the whole lifetime of the server.
/// </remarks>
public static void RunEmptyServer()
{
    // "using" guarantees the server is disposed on every exit path, including a failed start.
    using var mqttServer = new MqttServerFactory().CreateMqttServer(new MqttServerOptions());
    mqttServer.StartAsync().GetAwaiter().GetResult();

    // Console.ReadLine only returns after Enter, so the prompt says so.
    Console.WriteLine("Press Enter to exit.");
    Console.ReadLine();

    // Shut down gracefully instead of abandoning a running server.
    mqttServer.StopAsync().GetAwaiter().GetResult();
}
/// <summary>
/// Same as <c>RunEmptyServer</c>, but creates the server through a factory that was given an
/// <see cref="MqttNetEventLogger"/> which is forwarded to the console.
/// </summary>
/// <remarks>
/// Blocks the calling thread until a line is read from standard input, then stops and
/// disposes the server.
/// </remarks>
public static void RunEmptyServerWithLogging()
{
    var logger = new MqttNetEventLogger();
    MqttNetConsoleLogger.ForwardToConsole(logger);

    var mqttFactory = new MqttServerFactory(logger);

    // "using" guarantees the server is disposed on every exit path, including a failed start.
    using var mqttServer = mqttFactory.CreateMqttServer(new MqttServerOptions());
    mqttServer.StartAsync().GetAwaiter().GetResult();

    // Console.ReadLine only returns after Enter, so the prompt says so.
    Console.WriteLine("Press Enter to exit.");
    Console.ReadLine();

    // Shut down gracefully instead of abandoning a running server.
    mqttServer.StopAsync().GetAwaiter().GetResult();
}
/// <summary>
/// Builds the server options from the given builder, creates the MQTT server,
/// attaches the event handlers and starts it.
/// </summary>
/// <param name="mqttServerOptionsBuilder">Builder whose <c>Build()</c> result configures the server.</param>
/// <param name="callback">
/// Stored in <c>_Callback</c>; receives the start result and every notification raised by
/// the handlers attached in <c>RegisterEvents</c>. May be null.
/// </param>
/// <returns>
/// A result with <c>ResultCode</c> 1 when the server reports it is started, otherwise -1
/// (a thrown exception is caught and its message appended to <c>ResultMsg</c>).
/// </returns>
public async Task<ResultMqtt> CreateMQTTServerAndStart(MqttServerOptionsBuilder mqttServerOptionsBuilder, Action<ResultMqtt> callback)
{
    _Callback = callback;
    ResultMqtt outcome;

    try
    {
        // Create the server from the built options, wire the handlers, then start it.
        _MqttServer = new MqttServerFactory().CreateMqttServer(mqttServerOptionsBuilder.Build());
        RegisterEvents(_MqttServer);
        await _MqttServer.StartAsync();

        bool started = _MqttServer.IsStarted;
        string stamp = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff");
        outcome = started
            ? new ResultMqtt { ResultCode = 1, ResultMsg = stamp + ">>>执行了开启MQTTServer_成功" }
            : new ResultMqtt { ResultCode = -1, ResultMsg = stamp + ">>>执行了开启MQTTServer_失败" };
    }
    catch (Exception ex)
    {
        // Any failure while building, creating or starting is reported as a result, not rethrown.
        string stamp = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff");
        outcome = new ResultMqtt
        {
            ResultCode = -1,
            ResultMsg = stamp + ">>>执行了开启MQTTServer_失败错误信息" + ex.Message
        };
    }

    _Callback?.Invoke(outcome);
    return outcome;
}
/// <summary>
/// Convenience overload: creates and starts an MQTT server on an unencrypted default endpoint.
/// </summary>
/// <param name="ip">IP address the default endpoint is bound to (parsed with <c>IPAddress.Parse</c>).</param>
/// <param name="port">Port of the default endpoint.</param>
/// <param name="withPersistentSessions">Whether persistent sessions are enabled.</param>
/// <param name="callback">
/// Stored in <c>_Callback</c>; receives the start result and every notification raised by
/// the registered handlers. May be null.
/// </param>
/// <returns>
/// A result with <c>ResultCode</c> 1 when the server reports it is started, otherwise -1
/// (a thrown exception, e.g. from an unparsable <paramref name="ip"/>, is caught and its
/// message appended to <c>ResultMsg</c>).
/// </returns>
public async Task<ResultMqtt> CreateMQTTServerAndStart(string ip, int port, bool withPersistentSessions, Action<ResultMqtt> callback)
{
    _Callback = callback;
    ResultMqtt outcome;

    try
    {
        // Plain (non-TLS) default endpoint bound to the requested address and port.
        // NOTE(review): the setting below is named "backlog" — presumably the pending-connection
        // queue size rather than a cap on connected clients; confirm against MQTTnet docs.
        MqttServerOptions options = new MqttServerOptionsBuilder()
            .WithDefaultEndpoint()
            .WithDefaultEndpointBoundIPAddress(IPAddress.Parse(ip))
            .WithDefaultEndpointPort(port)
            .WithPersistentSessions(withPersistentSessions)
            .WithConnectionBacklog(2000)
            .Build();

        _MqttServer = new MqttServerFactory().CreateMqttServer(options);
        RegisterEvents(_MqttServer);

        // Connection validation hook; only this overload attaches it.
        _MqttServer.ValidatingConnectionAsync += ValidatingConnectionHandle;

        await _MqttServer.StartAsync();

        bool started = _MqttServer.IsStarted;
        string stamp = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff");
        outcome = started
            ? new ResultMqtt { ResultCode = 1, ResultMsg = stamp + ">>>执行了开启MQTTServer_成功" }
            : new ResultMqtt { ResultCode = -1, ResultMsg = stamp + ">>>执行了开启MQTTServer_失败" };
    }
    catch (Exception ex)
    {
        // Any failure while configuring, creating or starting is reported as a result, not rethrown.
        string stamp = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff");
        outcome = new ResultMqtt
        {
            ResultCode = -1,
            ResultMsg = stamp + ">>>执行了开启MQTTServer_失败错误信息" + ex.Message
        };
    }

    _Callback?.Invoke(outcome);
    return outcome;
}
/// <summary>
/// Attaches this class's handlers to the events of the given server.
/// </summary>
/// <param name="mqttServer">Server whose events are subscribed to.</param>
/// <remarks>
/// <c>ValidatingConnectionAsync</c> is not attached here; the ip/port overload of
/// <c>CreateMQTTServerAndStart</c> adds it separately.
/// </remarks>
private void RegisterEvents(MqttServer mqttServer)
{
    // Server lifecycle.
    mqttServer.StartedAsync += StartedHandle;
    mqttServer.StoppedAsync += StoppedHandle;

    // Client connections.
    mqttServer.ClientConnectedAsync += ClientConnectedHandle;
    mqttServer.ClientDisconnectedAsync += ClientDisconnectedHandle;

    // Subscriptions.
    mqttServer.ClientSubscribedTopicAsync += ClientSubscribedTopicHandle;
    mqttServer.ClientUnsubscribedTopicAsync += ClientUnsubscribedTopicHandle;

    // Published messages reported by the server as "not consumed".
    mqttServer.ApplicationMessageNotConsumedAsync += ApplicationMessageNotConsumedHandle;

    // Retained-message persistence (JSON file, see Filename).
    mqttServer.RetainedMessageChangedAsync += MqttServer_RetainedMessageChangedAsync;
    mqttServer.RetainedMessagesClearedAsync += MqttServer_RetainedMessagesClearedAsync;
    mqttServer.LoadingRetainedMessageAsync += MqttServer_LoadingRetainedMessageAsync;

    // Interceptors for publishes and subscriptions.
    mqttServer.InterceptingPublishAsync += MqttServer_InterceptingPublishAsync;
    mqttServer.InterceptingSubscriptionAsync += MqttServer_InterceptingSubscriptionAsync;
}
/// <summary>
/// Disconnects all clients, stops the MQTT server and releases it.
/// </summary>
/// <returns>
/// A result with <c>ResultCode</c> 1 on success; -1 when no server is running or when an
/// exception occurred (its message is appended to <c>ResultMsg</c>). The same result is
/// also passed to the stored callback.
/// </returns>
public async Task<ResultMqtt> StopMQTTServer()
{
    ResultMqtt resultData_MQTT;
    try
    {
        // Work on a local copy so the field can be cleared without affecting this call.
        var server = _MqttServer;
        if (server == null)
        {
            resultData_MQTT = new ResultMqtt()
            {
                ResultCode = -1,
                ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了关闭MQTTServer_出错MQTTServer未在运行。"
            };
        }
        else
        {
            // Await instead of blocking on .Result: no thread is tied up and a failure
            // surfaces with its real message rather than an AggregateException wrapper.
            var clients = await server.GetClientsAsync();
            foreach (var clientStatus in clients)
            {
                await clientStatus.DisconnectAsync();
            }

            await server.StopAsync();
            _MqttServer = null;

            // The stopped instance is never reused, so release its resources.
            server.Dispose();

            resultData_MQTT = new ResultMqtt()
            {
                ResultCode = 1,
                ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了关闭MQTTServer_成功"
            };
        }
    }
    catch (Exception ex)
    {
        resultData_MQTT = new ResultMqtt()
        {
            ResultCode = -1,
            ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了关闭MQTTServer_失败错误信息" + ex.Message
        };
    }
    _Callback?.Invoke(resultData_MQTT);
    return resultData_MQTT;
}
/// <summary>
/// Returns a snapshot list of the clients currently known to the server.
/// </summary>
/// <remarks>
/// Despite the "Async" suffix this method is synchronous: it blocks on the server's
/// asynchronous query. The name and return type are kept for existing callers.
/// </remarks>
/// <returns>
/// A new list of client statuses; an empty list when no server has been created or it
/// has already been stopped.
/// </returns>
public List<MqttClientStatus> GetClientsAsync()
{
    // Copy the field so a concurrent StopMQTTServer cannot null it between check and use.
    var server = _MqttServer;
    if (server == null)
    {
        // No running server: report "no clients" instead of a NullReferenceException.
        return new List<MqttClientStatus>();
    }

    return server.GetClientsAsync().Result.ToList();
}
/// <summary>
/// Placeholder for sending a message — not implemented yet.
/// </summary>
/// <remarks>
/// Currently a no-op: nothing is published and both arguments are ignored.
/// </remarks>
/// <param name="Topic">Topic the message is meant for (unused).</param>
/// <param name="msg">Message content (unused).</param>
/// <returns>An already-completed task.</returns>
public Task SendMessage(string Topic, string msg) => Task.CompletedTask;
#region
/// <summary>
/// Handler for the server's <c>StartedAsync</c> event: reports "server started" through the callback.
/// </summary>
/// <param name="arg">Event arguments (unused).</param>
/// <returns>An already-completed task.</returns>
private Task StartedHandle(EventArgs arg)
{
    var notify = _Callback;
    if (notify == null)
    {
        return Task.CompletedTask;
    }

    var notice = new ResultMqtt();
    notice.ResultCode = 1;
    notice.ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>MQTTServer已开启";
    notify(notice);
    return Task.CompletedTask;
}
/// <summary>
/// Handler for the server's <c>StoppedAsync</c> event: reports "server stopped" through the callback.
/// </summary>
/// <param name="arg">Event arguments (unused).</param>
/// <returns>An already-completed task.</returns>
private Task StoppedHandle(EventArgs arg)
{
    var notify = _Callback;
    if (notify == null)
    {
        return Task.CompletedTask;
    }

    var notice = new ResultMqtt();
    notice.ResultCode = 1;
    notice.ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>MQTTServer已关闭";
    notify(notice);
    return Task.CompletedTask;
}
/// <summary>
/// Handler for the server's <c>ClientConnectedAsync</c> event: reports the connected client
/// id and the current number of clients through the callback.
/// </summary>
/// <param name="arg">Event arguments carrying the client id.</param>
/// <returns>A task that completes once the callback has been invoked.</returns>
private async Task ClientConnectedHandle(ClientConnectedEventArgs arg)
{
    var notify = _Callback;
    if (notify == null)
    {
        // Nobody is listening, so skip the client query entirely.
        return;
    }

    // Await rather than block on .Result inside an event handler. The server may already
    // have been cleared by StopMQTTServer, in which case the count is left blank.
    var server = _MqttServer;
    var clients = server == null ? null : await server.GetClientsAsync();

    notify(new ResultMqtt()
    {
        ResultCode = 1,
        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>客户端'{arg.ClientId}'已成功连接!当前客户端连接数:{clients?.Count}个。"
    });
}
/// <summary>
/// Handler for the server's <c>ClientDisconnectedAsync</c> event: reports the disconnected
/// client id and the current number of clients through the callback.
/// </summary>
/// <param name="arg">Event arguments carrying the client id.</param>
/// <returns>A task that completes once the callback has been invoked.</returns>
private async Task ClientDisconnectedHandle(ClientDisconnectedEventArgs arg)
{
    var notify = _Callback;
    if (notify == null)
    {
        // Nobody is listening, so skip the client query entirely.
        return;
    }

    // Await rather than block on .Result inside an event handler. The server may already
    // have been cleared by StopMQTTServer, in which case the count is left blank.
    var server = _MqttServer;
    var clients = server == null ? null : await server.GetClientsAsync();

    notify(new ResultMqtt()
    {
        ResultCode = 1,
        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>客户端'{arg.ClientId}'已断开连接!当前客户端连接数:{clients?.Count}个。"
    });
}
/// <summary>
/// Handler for the server's <c>ClientSubscribedTopicAsync</c> event: reports the client id,
/// the subscribed topic and its quality-of-service level through the callback.
/// </summary>
/// <param name="arg">Event arguments carrying the client id and the topic filter.</param>
/// <returns>An already-completed task.</returns>
private Task ClientSubscribedTopicHandle(ClientSubscribedTopicEventArgs arg)
{
    var notify = _Callback;
    if (notify == null)
    {
        return Task.CompletedTask;
    }

    string stamp = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff");
    var notice = new ResultMqtt();
    notice.ResultCode = 1;
    notice.ResultMsg = stamp + $">>>客户端'{arg.ClientId}'订阅了主题'{arg.TopicFilter.Topic}',主题服务质量:'{arg.TopicFilter.QualityOfServiceLevel}'";
    notify(notice);
    return Task.CompletedTask;
}
/// <summary>
/// Handler for the server's <c>ClientUnsubscribedTopicAsync</c> event: reports the client id
/// and the unsubscribed topic filter through the callback.
/// </summary>
/// <param name="arg">Event arguments carrying the client id and the topic filter.</param>
/// <returns>An already-completed task.</returns>
private Task ClientUnsubscribedTopicHandle(ClientUnsubscribedTopicEventArgs arg)
{
    var notify = _Callback;
    if (notify == null)
    {
        return Task.CompletedTask;
    }

    string stamp = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff");
    var notice = new ResultMqtt();
    notice.ResultCode = 1;
    notice.ResultMsg = stamp + $">>>客户端{arg.ClientId}退订了主题{arg.TopicFilter}";
    notify(notice);
    return Task.CompletedTask;
}
/// <summary>
/// Handler for the server's <c>ValidatingConnectionAsync</c> event — authentication is unfinished.
/// </summary>
/// <remarks>
/// The credentials are compared against fixed values, but a mismatch has no effect:
/// no reason code is set, so this handler never rejects a connection.
/// </remarks>
/// <param name="arg">Event arguments carrying the user name and password of the connecting client.</param>
/// <returns>An already-completed task.</returns>
private Task ValidatingConnectionHandle(ValidatingConnectionEventArgs arg)
{
    // NOTE(review): hard-coded credentials and an empty rejection branch. Decide whether
    // mismatches should set arg.ReasonCode (e.g. BadUserNameOrPassword) and where the
    // expected credentials should come from before relying on this for access control.
    bool credentialsMatch = arg.UserName == "Admin" && arg.Password == "Admin123";
    if (!credentialsMatch)
    {
        // Intentionally empty for now: mismatching clients are still accepted.
    }

    return Task.CompletedTask;
}
/// <summary>
/// Handler for the server's <c>ApplicationMessageNotConsumedAsync</c> event: reports the
/// sender, topic, UTF-8 decoded payload, quality-of-service level and retain flag of the
/// message through the callback, with <c>ResultCode</c> -1.
/// </summary>
/// <param name="arg">Event arguments carrying the sender id and the application message.</param>
/// <returns>An already-completed task.</returns>
private Task ApplicationMessageNotConsumedHandle(ApplicationMessageNotConsumedEventArgs arg)
{
    var notify = _Callback;
    if (notify == null)
    {
        return Task.CompletedTask;
    }

    string stamp = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff");
    var notice = new ResultMqtt();
    notice.ResultCode = -1;
    notice.ResultMsg = stamp + $">>>客户端:'{arg.SenderId}'发布了消息:主题:'{arg.ApplicationMessage.Topic}'!内容:'{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}';服务质量:{arg.ApplicationMessage.QualityOfServiceLevel};保留:{arg.ApplicationMessage.Retain}";
    notify(notice);
    return Task.CompletedTask;
}
#endregion
#region 2
/// <summary>
/// Gets the path of the JSON file used to persist retained messages:
/// <c>MQTT/RetainedMessages.json</c> under the application's base directory,
/// passed through <c>FormatPath</c> for the detected operating system.
/// </summary>
private string Filename
{
    get
    {
        // Combine the segments individually. The previous verbatim literal
        // @"MQTT\\RetainedMessages.json" embedded two literal backslashes, which is not a
        // valid separator outside Windows and only worked there by path normalization.
        var path = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "MQTT", "RetainedMessages.json");

        // FormatPath is a project extension; presumably it normalizes separators for the
        // given OS type — kept so the base-directory part is treated exactly as before.
        var osType = XiaoZhiSharp.Kernels.OperatingSystem.GetOperatingSystemType();
        return path.FormatPath(osType);
    }
}
/// <summary>
/// Handler for the server's <c>RetainedMessageChangedAsync</c> event: serializes the stored
/// retained messages to JSON and writes them to <c>Filename</c>, creating the directory if needed.
/// </summary>
/// <param name="arg">Event arguments carrying the full set of stored retained messages.</param>
/// <returns>A task that completes when the file has been written.</returns>
private async Task MqttServer_RetainedMessageChangedAsync(RetainedMessageChangedEventArgs arg)
{
    // Resolve the path once; the property recomputes it on every access.
    var filename = Filename;

    // CreateDirectory is a no-op when the directory already exists, so no Exists check is
    // needed. Guard against a path without a directory part, which it would reject.
    var directory = Path.GetDirectoryName(filename);
    if (!string.IsNullOrEmpty(directory))
    {
        Directory.CreateDirectory(directory);
    }

    // NOTE(review): confirm MqttApplicationMessage round-trips through Newtonsoft.Json
    // with the MQTTnet version in use.
    var json = JsonConvert.SerializeObject(arg.StoredRetainedMessages);

    // Write asynchronously so the handler does not block its thread on disk I/O.
    await File.WriteAllTextAsync(filename, json);
}
/// <summary>
/// Handler for the server's <c>RetainedMessagesClearedAsync</c> event: removes the
/// persisted retained-messages file, if there is one.
/// </summary>
/// <param name="arg">Event arguments (unused).</param>
/// <returns>An already-completed task.</returns>
private Task MqttServer_RetainedMessagesClearedAsync(EventArgs arg)
{
    // Resolve the path once; the property recomputes it on every access.
    var filename = Filename;

    // File.Delete tolerates a missing file but throws DirectoryNotFoundException when the
    // containing directory does not exist, which is the case until the first retained
    // message has been persisted. Nothing to delete then.
    if (File.Exists(filename))
    {
        File.Delete(filename);
    }

    return CompletedTask.Instance;
}
/// <summary>
/// Handler for the server's <c>LoadingRetainedMessageAsync</c> event: loads the persisted
/// retained messages from <c>Filename</c> and hands them to the server.
/// </summary>
/// <param name="arg">Event arguments whose <c>LoadedRetainedMessages</c> is populated.</param>
/// <returns>A task that completes when the messages have been loaded.</returns>
private async Task MqttServer_LoadingRetainedMessageAsync(LoadingRetainedMessagesEventArgs arg)
{
    // Resolve the path once; the property recomputes it on every access.
    var filename = Filename;

    List<MqttApplicationMessage> retainedMessages = null;
    if (File.Exists(filename))
    {
        // Read asynchronously so the handler does not block its thread on disk I/O.
        var json = await File.ReadAllTextAsync(filename);
        retainedMessages = JsonConvert.DeserializeObject<List<MqttApplicationMessage>>(json);
    }

    // An empty file or a literal "null" deserializes to null; always hand the server a
    // list so it never receives a null collection.
    arg.LoadedRetainedMessages = retainedMessages ?? new List<MqttApplicationMessage>();
}
/// <summary>
/// Handler for the server's <c>InterceptingPublishAsync</c> event. Logs every publish to the
/// console, replaces the payload with the current timestamp for topics matching
/// <c>/myTopic/WithTimestamp/#</c>, and blocks publishes to <c>not_allowed_topic</c>
/// while requesting that the connection be closed.
/// </summary>
/// <param name="arg">Event arguments carrying the client id and the message being published.</param>
/// <returns>An already-completed task.</returns>
private Task MqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
{
    var message = arg.ApplicationMessage;

    // Trace the publish (payload decoded as UTF-8).
    MqttNetConsoleLogger.PrintToConsole(
        $"'{arg.ClientId}' reported '{message.Topic}' > '{Encoding.UTF8.GetString(message.Payload)}'",
        ConsoleColor.Magenta);

    // Timestamp topics: the payload is replaced by the current local time in round-trip ("O") format.
    var timestampMatch = MqttTopicFilterComparer.Compare(message.Topic, "/myTopic/WithTimestamp/#");
    if (timestampMatch == MqttTopicFilterCompareResult.IsMatch)
    {
        message.PayloadSegment = Encoding.UTF8.GetBytes(DateTime.Now.ToString("O"));
    }

    if (message.Topic != "not_allowed_topic")
    {
        return CompletedTask.Instance;
    }

    // Forbidden topic: drop the message and ask for the connection to be closed.
    arg.ProcessPublish = false;
    arg.CloseConnection = true;
    return CompletedTask.Instance;
}
/// <summary>
/// Handler for the server's <c>InterceptingSubscriptionAsync</c> event: rejects subscriptions
/// to protected topic prefixes unless they come from the one client id allowed for that prefix.
/// </summary>
/// <remarks>
/// Filters starting with <c>admin/foo/bar</c> are reserved for client <c>theAdmin</c>;
/// filters starting with <c>the/secret/stuff</c> are reserved for client <c>Imperator</c>,
/// and a violation additionally requests that the connection be closed.
/// </remarks>
/// <param name="arg">Event arguments carrying the client id, the topic filter and the response.</param>
/// <returns>An already-completed task.</returns>
private Task MqttServer_InterceptingSubscriptionAsync(InterceptingSubscriptionEventArgs arg)
{
    var topic = arg.TopicFilter.Topic;

    // Topic names are protocol identifiers, so compare ordinally; the parameterless
    // StartsWith overload is culture-sensitive and can behave differently per locale.
    // NOTE(review): a prefix test does not cover wildcard filters such as "#" or "admin/#"
    // that would also match these topics — confirm whether that needs handling.
    if (topic.StartsWith("admin/foo/bar", StringComparison.Ordinal) && arg.ClientId != "theAdmin")
    {
        arg.Response.ReasonCode = MqttSubscribeReasonCode.ImplementationSpecificError;
    }

    if (topic.StartsWith("the/secret/stuff", StringComparison.Ordinal) && arg.ClientId != "Imperator")
    {
        arg.Response.ReasonCode = MqttSubscribeReasonCode.ImplementationSpecificError;
        arg.CloseConnection = true;
    }

    return CompletedTask.Instance;
}
#endregion 2
}
}