using MQTTnet;
using MQTTnet.Diagnostics.Logger;
using MQTTnet.Internal;
using MQTTnet.Protocol;
using MQTTnet.Server;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using XiaoZhiSharp.MQTT.Common;
using XiaoZhiSharp.Utils;

namespace XiaoZhiSharp.MQTT
{
    /// <summary>
    /// Helper that hosts an MQTTnet broker and reports lifecycle and client
    /// events to a caller-supplied callback as <see cref="ResultMqtt"/> values.
    /// </summary>
    public class MqttNetServer
    {
        #region Fields

        /// <summary>
        /// Timestamp format prefixed to every message handed to the callback.
        /// </summary>
        private const string TimestampFormat = "yyyy-MM-dd HH:mm:ss.fff";

        /// <summary>
        /// The hosted MQTT server; null until a start method has run and again
        /// after <see cref="StopMQTTServer"/> succeeds.
        /// </summary>
        private MqttServer _MqttServer = null;

        /// <summary>
        /// Sink used for logging / output / persistence of status messages.
        /// </summary>
        private Action<ResultMqtt> _Callback = null;

        #endregion Fields

        /// <summary>
        /// Starts a server with default options and blocks until a line is read
        /// from the console.
        /// </summary>
        public static void RunEmptyServer()
        {
            var mqttServer = new MqttServerFactory().CreateMqttServer(new MqttServerOptions());
            mqttServer.StartAsync().GetAwaiter().GetResult();

            Console.WriteLine("Press any key to exit.");
            Console.ReadLine();
        }

        /// <summary>
        /// Same as <see cref="RunEmptyServer"/>, but forwards MQTTnet log events
        /// to the console.
        /// </summary>
        public static void RunEmptyServerWithLogging()
        {
            var logger = new MqttNetEventLogger();
            MqttNetConsoleLogger.ForwardToConsole(logger);

            var mqttFactory = new MqttServerFactory(logger);
            var mqttServer = mqttFactory.CreateMqttServer(new MqttServerOptions());
            mqttServer.StartAsync().GetAwaiter().GetResult();

            Console.WriteLine("Press any key to exit.");
            Console.ReadLine();
        }

        /// <summary>
        /// Creates the MQTT server from a caller-supplied options builder and starts it.
        /// </summary>
        /// <param name="mqttServerOptionsBuilder">Builder whose <c>Build()</c> result configures the server.</param>
        /// <param name="callback">Receives the start result and all later event notifications.</param>
        /// <returns>ResultCode 1 when the server reports started, -1 otherwise (including on exceptions).</returns>
        public async Task<ResultMqtt> CreateMQTTServerAndStart(MqttServerOptionsBuilder mqttServerOptionsBuilder, Action<ResultMqtt> callback)
        {
            // Build() runs inside the helper's try block so that a null builder
            // is reported as a failed start, not thrown to the caller.
            return await StartServerAsync(() => mqttServerOptionsBuilder.Build(), false, callback);
        }

        /// <summary>
        /// Convenience overload: creates and starts an unencrypted MQTT server.
        /// </summary>
        /// <param name="ip">IP address the default endpoint binds to.</param>
        /// <param name="port">Port of the default endpoint.</param>
        /// <param name="withPersistentSessions">Whether persistent sessions are enabled.</param>
        /// <param name="callback">Receives the start result and all later event notifications.</param>
        /// <returns>ResultCode 1 when the server reports started, -1 otherwise (including on exceptions).</returns>
        public async Task<ResultMqtt> CreateMQTTServerAndStart(string ip, int port, bool withPersistentSessions, Action<ResultMqtt> callback)
        {
            return await StartServerAsync(
                () =>
                {
                    var builder = new MqttServerOptionsBuilder();
                    builder.WithDefaultEndpoint();
                    // IPAddress.Parse throws on malformed input; the helper turns
                    // that into a failed-start result.
                    builder.WithDefaultEndpointBoundIPAddress(IPAddress.Parse(ip));
                    builder.WithDefaultEndpointPort(port);
                    // For TLS, an encrypted endpoint (WithEncryptedEndpointPort plus
                    // a certificate) would be configured here instead.
                    builder.WithPersistentSessions(withPersistentSessions);
                    // NOTE(review): this is the listener backlog (pending-connection
                    // queue), which is not the same as a cap on connected clients.
                    builder.WithConnectionBacklog(2000);
                    return builder.Build();
                },
                true,
                callback);
        }

        /// <summary>
        /// Shared start sequence for both public overloads: build options, create
        /// the server, wire events, start, and report the outcome.
        /// </summary>
        /// <param name="buildOptions">Produces the server options; invoked inside the try block.</param>
        /// <param name="registerConnectionValidator">When true, also attaches <see cref="ValidatingConnectionHandle"/>.</param>
        /// <param name="callback">Stored as the instance callback before anything else happens.</param>
        private async Task<ResultMqtt> StartServerAsync(Func<MqttServerOptions> buildOptions, bool registerConnectionValidator, Action<ResultMqtt> callback)
        {
            ResultMqtt result;
            _Callback = callback;

            try
            {
                MqttServerOptions options = buildOptions();
                _MqttServer = new MqttServerFactory().CreateMqttServer(options);
                RegisterEvents(_MqttServer);

                if (registerConnectionValidator)
                {
                    _MqttServer.ValidatingConnectionAsync += ValidatingConnectionHandle;
                }

                await _MqttServer.StartAsync();

                if (_MqttServer.IsStarted)
                {
                    result = new ResultMqtt() { ResultCode = 1, ResultMsg = Stamp(">>>执行了开启MQTTServer_成功!") };
                }
                else
                {
                    result = new ResultMqtt() { ResultCode = -1, ResultMsg = Stamp(">>>执行了开启MQTTServer_失败!") };
                }
            }
            catch (Exception ex)
            {
                result = new ResultMqtt() { ResultCode = -1, ResultMsg = Stamp(">>>执行了开启MQTTServer_失败!错误信息:" + ex.Message) };
            }

            _Callback?.Invoke(result);
            return result;
        }

        /// <summary>
        /// Attaches every server event handler defined in this class, except the
        /// connection validator.
        /// </summary>
        private void RegisterEvents(MqttServer mqttServer)
        {
            mqttServer.StartedAsync += StartedHandle;                                             // server started
            mqttServer.StoppedAsync += StoppedHandle;                                             // server stopped
            mqttServer.ClientConnectedAsync += ClientConnectedHandle;                             // client connected
            mqttServer.ClientDisconnectedAsync += ClientDisconnectedHandle;                       // client disconnected
            mqttServer.ClientSubscribedTopicAsync += ClientSubscribedTopicHandle;                 // subscription notice
            mqttServer.ClientUnsubscribedTopicAsync += ClientUnsubscribedTopicHandle;             // unsubscription notice
            mqttServer.ApplicationMessageNotConsumedAsync += ApplicationMessageNotConsumedHandle; // message nobody consumed

            mqttServer.RetainedMessageChangedAsync += MqttServer_RetainedMessageChangedAsync;
            mqttServer.RetainedMessagesClearedAsync += MqttServer_RetainedMessagesClearedAsync;
            mqttServer.LoadingRetainedMessageAsync += MqttServer_LoadingRetainedMessageAsync;
            mqttServer.InterceptingPublishAsync += MqttServer_InterceptingPublishAsync;
            mqttServer.InterceptingSubscriptionAsync += MqttServer_InterceptingSubscriptionAsync;
        }

        /// <summary>
        /// Disconnects every client, stops the server and clears the server reference.
        /// </summary>
        /// <returns>ResultCode 1 on success; -1 when no server exists or an exception occurred.</returns>
        public async Task<ResultMqtt> StopMQTTServer()
        {
            ResultMqtt result;

            try
            {
                if (_MqttServer == null)
                {
                    result = new ResultMqtt() { ResultCode = -1, ResultMsg = Stamp(">>>执行了关闭MQTTServer_出错!MQTTServer未在运行。") };
                }
                else
                {
                    // Awaited instead of blocking on .Result: this method is already async.
                    var clients = await _MqttServer.GetClientsAsync();
                    foreach (var clientStatus in clients)
                    {
                        await clientStatus.DisconnectAsync();
                    }

                    await _MqttServer.StopAsync();
                    _MqttServer = null;

                    result = new ResultMqtt() { ResultCode = 1, ResultMsg = Stamp(">>>执行了关闭MQTTServer_成功!") };
                }
            }
            catch (Exception ex)
            {
                result = new ResultMqtt() { ResultCode = -1, ResultMsg = Stamp(">>>执行了关闭MQTTServer_失败!错误信息:" + ex.Message) };
            }

            _Callback?.Invoke(result);
            return result;
        }

        /// <summary>
        /// Returns a snapshot of the client statuses known to the server.
        /// Despite the name, this method is synchronous and blocks on the server call.
        /// </summary>
        /// <returns>The client list, or an empty list when no server has been created.</returns>
        public List<MqttClientStatus> GetClientsAsync()
        {
            var server = _MqttServer;
            if (server == null)
            {
                return new List<MqttClientStatus>();
            }

            return server.GetClientsAsync().Result.ToList();
        }

        /// <summary>
        /// Send a message - not implemented yet; currently performs no work.
        /// </summary>
        /// <param name="Topic">Topic (unused).</param>
        /// <param name="msg">Message (unused).</param>
        /// <returns>An already-completed task.</returns>
        public Task SendMessage(string Topic, string msg)
        {
            // TODO: publishing from the server side is not implemented.
            return Task.CompletedTask;
        }

        /// <summary>
        /// Prefixes <paramref name="message"/> with the current local time.
        /// </summary>
        private static string Stamp(string message)
        {
            return DateTime.Now.ToString(TimestampFormat) + message;
        }

        /// <summary>
        /// Number of clients currently reported by the server, or null when the
        /// server reference is not set.
        /// </summary>
        private async Task<int?> GetClientCountAsync()
        {
            var server = _MqttServer;
            if (server == null)
            {
                return null;
            }

            var clients = await server.GetClientsAsync();
            return clients?.Count;
        }

        #region Event handlers

        /// <summary>
        /// Reports that the server has started.
        /// </summary>
        private Task StartedHandle(EventArgs arg)
        {
            _Callback?.Invoke(new ResultMqtt() { ResultCode = 1, ResultMsg = Stamp(">>>MQTTServer已开启!") });
            return Task.CompletedTask;
        }

        /// <summary>
        /// Reports that the server has stopped.
        /// </summary>
        private Task StoppedHandle(EventArgs arg)
        {
            _Callback?.Invoke(new ResultMqtt() { ResultCode = 1, ResultMsg = Stamp(">>>MQTTServer已关闭!") });
            return Task.CompletedTask;
        }

        /// <summary>
        /// Reports a newly connected client together with the current client count.
        /// </summary>
        private async Task ClientConnectedHandle(ClientConnectedEventArgs arg)
        {
            int? clientCount = await GetClientCountAsync();
            _Callback?.Invoke(new ResultMqtt() { ResultCode = 1, ResultMsg = Stamp($">>>客户端'{arg.ClientId}'已成功连接!当前客户端连接数:{clientCount}个。") });
        }

        /// <summary>
        /// Reports a disconnected client together with the current client count.
        /// </summary>
        private async Task ClientDisconnectedHandle(ClientDisconnectedEventArgs arg)
        {
            int? clientCount = await GetClientCountAsync();
            _Callback?.Invoke(new ResultMqtt() { ResultCode = 1, ResultMsg = Stamp($">>>客户端'{arg.ClientId}'已断开连接!当前客户端连接数:{clientCount}个。") });
        }

        /// <summary>
        /// Reports a topic subscription.
        /// </summary>
        private Task ClientSubscribedTopicHandle(ClientSubscribedTopicEventArgs arg)
        {
            _Callback?.Invoke(new ResultMqtt() { ResultCode = 1, ResultMsg = Stamp($">>>客户端'{arg.ClientId}'订阅了主题'{arg.TopicFilter.Topic}',主题服务质量:'{arg.TopicFilter.QualityOfServiceLevel}'!") });
            return Task.CompletedTask;
        }

        /// <summary>
        /// Reports a topic unsubscription.
        /// </summary>
        private Task ClientUnsubscribedTopicHandle(ClientUnsubscribedTopicEventArgs arg)
        {
            _Callback?.Invoke(new ResultMqtt() { ResultCode = 1, ResultMsg = Stamp($">>>客户端{arg.ClientId}退订了主题{arg.TopicFilter}!") });
            return Task.CompletedTask;
        }

        /// <summary>
        /// Connection validation - unfinished. Every connection is accepted.
        /// </summary>
        private Task ValidatingConnectionHandle(ValidatingConnectionEventArgs arg)
        {
            // NOTE(review): the previous credential comparison had an empty body, so
            // nothing was ever rejected. To enforce authentication, compare
            // arg.UserName / arg.Password against configured (not hard-coded)
            // credentials and set
            // arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword on mismatch.
            return Task.CompletedTask;
        }

        /// <summary>
        /// Reports a published message that was not consumed, with topic,
        /// UTF-8 decoded payload, QoS and retain flag. Uses ResultCode -1.
        /// </summary>
        private Task ApplicationMessageNotConsumedHandle(ApplicationMessageNotConsumedEventArgs arg)
        {
            _Callback?.Invoke(new ResultMqtt() { ResultCode = -1, ResultMsg = Stamp($">>>客户端:'{arg.SenderId}'发布了消息:主题:'{arg.ApplicationMessage.Topic}'!内容:'{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}';服务质量:{arg.ApplicationMessage.QualityOfServiceLevel};保留:{arg.ApplicationMessage.Retain}") });
            return Task.CompletedTask;
        }

        #endregion Event handlers

        #region Retained messages and interceptors

        /// <summary>
        /// Path of the JSON file holding retained messages, located under the
        /// application base directory and passed through the project's
        /// <c>FormatPath</c> helper for the detected operating system.
        /// </summary>
        private string Filename
        {
            get
            {
                var path = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, @"MQTT\\RetainedMessages.json");
                var osType = XiaoZhiSharp.Kernels.OperatingSystem.GetOperatingSystemType();
                var formatFileName = path.FormatPath(osType);
                return formatFileName;
            }
        }

        /// <summary>
        /// Persists the full set of retained messages as JSON, creating the
        /// target directory when needed.
        /// </summary>
        private Task MqttServer_RetainedMessageChangedAsync(RetainedMessageChangedEventArgs arg)
        {
            var filename = Filename;
            var directory = Path.GetDirectoryName(filename);
            if (!Directory.Exists(directory))
            {
                Directory.CreateDirectory(directory);
            }

            File.WriteAllText(filename, JsonConvert.SerializeObject(arg.StoredRetainedMessages));
            return CompletedTask.Instance;
        }

        /// <summary>
        /// Removes the retained-message file, if there is one.
        /// </summary>
        private Task MqttServer_RetainedMessagesClearedAsync(EventArgs arg)
        {
            var filename = Filename;

            // File.Delete throws when the directory does not exist (for example
            // when nothing was ever persisted), so only delete an existing file.
            if (File.Exists(filename))
            {
                File.Delete(filename);
            }

            return CompletedTask.Instance;
        }

        /// <summary>
        /// Loads retained messages from the JSON file; supplies an empty list
        /// when the file is missing or deserializes to null.
        /// </summary>
        private Task MqttServer_LoadingRetainedMessageAsync(LoadingRetainedMessagesEventArgs arg)
        {
            List<MqttApplicationMessage> retainedMessages = null;
            var filename = Filename;

            if (File.Exists(filename))
            {
                var json = File.ReadAllText(filename);
                // Yields null for empty content or a literal "null".
                retainedMessages = JsonConvert.DeserializeObject<List<MqttApplicationMessage>>(json);
            }

            arg.LoadedRetainedMessages = retainedMessages ?? new List<MqttApplicationMessage>();
            return CompletedTask.Instance;
        }

        /// <summary>
        /// Logs each publish to the console, replaces the payload with a timestamp
        /// for topics matching "/myTopic/WithTimestamp/#", and rejects the topic
        /// "not_allowed_topic" while closing the sender's connection.
        /// </summary>
        private Task MqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
        {
            MqttNetConsoleLogger.PrintToConsole(
                $"'{arg.ClientId}' reported '{arg.ApplicationMessage.Topic}' > '{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}'",
                ConsoleColor.Magenta);

            if (MqttTopicFilterComparer.Compare(arg.ApplicationMessage.Topic, "/myTopic/WithTimestamp/#") == MqttTopicFilterCompareResult.IsMatch)
            {
                // Replace the payload with the timestamp. Extending a JSON based
                // payload with the timestamp would be another suitable use case.
                arg.ApplicationMessage.PayloadSegment = Encoding.UTF8.GetBytes(DateTime.Now.ToString("O"));
            }

            if (arg.ApplicationMessage.Topic == "not_allowed_topic")
            {
                arg.ProcessPublish = false;
                arg.CloseConnection = true;
            }

            return CompletedTask.Instance;
        }

        /// <summary>
        /// Restricts two topic prefixes to specific client ids; a violation on
        /// "the/secret/stuff" also closes the connection.
        /// </summary>
        private Task MqttServer_InterceptingSubscriptionAsync(InterceptingSubscriptionEventArgs arg)
        {
            // Topic names are identifiers, so compare ordinally rather than by culture.
            if (arg.TopicFilter.Topic.StartsWith("admin/foo/bar", StringComparison.Ordinal) && arg.ClientId != "theAdmin")
            {
                arg.Response.ReasonCode = MqttSubscribeReasonCode.ImplementationSpecificError;
            }

            if (arg.TopicFilter.Topic.StartsWith("the/secret/stuff", StringComparison.Ordinal) && arg.ClientId != "Imperator")
            {
                arg.Response.ReasonCode = MqttSubscribeReasonCode.ImplementationSpecificError;
                arg.CloseConnection = true;
            }

            return CompletedTask.Instance;
        }

        #endregion Retained messages and interceptors
    }
}