527 lines
20 KiB
C#
527 lines
20 KiB
C#
using MQTTnet;
|
||
using MQTTnet.Diagnostics.Logger;
|
||
using MQTTnet.Internal;
|
||
using MQTTnet.Protocol;
|
||
using MQTTnet.Server;
|
||
using Newtonsoft.Json;
|
||
using System;
|
||
using System.Collections.Generic;
|
||
using System.IO;
|
||
using System.Linq;
|
||
using System.Net;
|
||
using System.Text;
|
||
using System.Threading.Tasks;
|
||
using XiaoZhiSharp.MQTT.Common;
|
||
using XiaoZhiSharp.Utils;
|
||
|
||
namespace XiaoZhiSharp.MQTT
|
||
{
|
||
/// <summary>
|
||
/// MQTT通讯相关的工具类
|
||
/// </summary>
|
||
public class MqttNetServer
|
||
{
|
||
#region 变量
|
||
/// <summary>
|
||
/// MQTT服务
|
||
/// </summary>
|
||
private MqttServer _MqttServer = null;
|
||
|
||
/// <summary>
|
||
/// 记录日志、输出、保存等操作
|
||
/// </summary>
|
||
private Action<ResultMqtt> _Callback = null;
|
||
#endregion 变量
|
||
|
||
public static void RunEmptyServer()
|
||
{
|
||
var mqttServer = new MqttServerFactory().CreateMqttServer(new MqttServerOptions());
|
||
mqttServer.StartAsync().GetAwaiter().GetResult();
|
||
|
||
Console.WriteLine("Press any key to exit.");
|
||
Console.ReadLine();
|
||
}
|
||
|
||
public static void RunEmptyServerWithLogging()
|
||
{
|
||
var logger = new MqttNetEventLogger();
|
||
MqttNetConsoleLogger.ForwardToConsole(logger);
|
||
|
||
var mqttFactory = new MqttServerFactory(logger);
|
||
var mqttServer = mqttFactory.CreateMqttServer(new MqttServerOptions());
|
||
mqttServer.StartAsync().GetAwaiter().GetResult();
|
||
|
||
Console.WriteLine("Press any key to exit.");
|
||
Console.ReadLine();
|
||
}
|
||
|
||
/// <summary>
|
||
/// 创建MQTTServer并运行
|
||
/// </summary>
|
||
public async Task<ResultMqtt> CreateMQTTServerAndStart(MqttServerOptionsBuilder mqttServerOptionsBuilder, Action<ResultMqtt> callback)
|
||
{
|
||
ResultMqtt resultData_MQTT = new ResultMqtt();
|
||
|
||
_Callback = callback;
|
||
try
|
||
{
|
||
MqttServerOptions mqttServerOptions = mqttServerOptionsBuilder.Build();
|
||
_MqttServer = new MqttServerFactory().CreateMqttServer(mqttServerOptions); // 创建服务(配置)
|
||
|
||
RegisterEvents(_MqttServer);
|
||
|
||
await _MqttServer.StartAsync(); // 开启服务
|
||
|
||
if (_MqttServer.IsStarted)
|
||
{
|
||
resultData_MQTT = new ResultMqtt()
|
||
{
|
||
ResultCode = 1,
|
||
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTServer_成功!"
|
||
};
|
||
}
|
||
else
|
||
{
|
||
resultData_MQTT = new ResultMqtt()
|
||
{
|
||
ResultCode = -1,
|
||
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTServer_失败!"
|
||
};
|
||
}
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
resultData_MQTT = new ResultMqtt()
|
||
{
|
||
ResultCode = -1,
|
||
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTServer_失败!错误信息:" + ex.Message
|
||
};
|
||
}
|
||
|
||
_Callback?.Invoke(resultData_MQTT);
|
||
return resultData_MQTT;
|
||
}
|
||
|
||
/// <summary>
|
||
/// 简易创建MQTTServer并运行-不使用加密
|
||
/// </summary>
|
||
/// <param name="ip">IP</param>
|
||
/// <param name="port">端口</param>
|
||
/// <param name="withPersistentSessions">是否保持会话</param>
|
||
/// <param name="callback">处理方法</param>
|
||
/// <returns></returns>
|
||
public async Task<ResultMqtt> CreateMQTTServerAndStart(string ip, int port, bool withPersistentSessions, Action<ResultMqtt> callback)
|
||
{
|
||
ResultMqtt resultData_MQTT = new ResultMqtt();
|
||
_Callback = callback;
|
||
|
||
try
|
||
{
|
||
|
||
// Extend the timestamp for all messages from clients.
|
||
// Protect several topics from being subscribed from every client.
|
||
|
||
//var certificate = new X509Certificate(@"C:\certs\test\test.cer", "");
|
||
//options.TlsEndpointOptions.Certificate = certificate.Export(X509ContentType.Cert);
|
||
//options.ConnectionBacklog = 5;
|
||
//options.DefaultEndpointOptions.IsEnabled = true;
|
||
//options.TlsEndpointOptions.IsEnabled = false;
|
||
|
||
MqttServerOptionsBuilder mqttServerOptionsBuilder = new MqttServerOptionsBuilder(); // MQTT服务器配置
|
||
mqttServerOptionsBuilder.WithDefaultEndpoint();
|
||
mqttServerOptionsBuilder.WithDefaultEndpointBoundIPAddress(IPAddress.Parse(ip)); // 设置Server的IP
|
||
mqttServerOptionsBuilder.WithDefaultEndpointPort(port); // 设置Server的端口号
|
||
//mqttServerOptionsBuilder.WithEncryptedEndpointPort(port); // 使用加密的端点端口
|
||
mqttServerOptionsBuilder.WithPersistentSessions(withPersistentSessions); // 持续会话
|
||
mqttServerOptionsBuilder.WithConnectionBacklog(2000); // 最大连接数
|
||
//mqttServerOptionsBuilder.WithConnectionValidator(c => // 鉴权-方法失效
|
||
//{
|
||
// if (c.Username != uName || c.Password != uPwd)
|
||
// {
|
||
// c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
|
||
// }
|
||
//})
|
||
|
||
MqttServerOptions mqttServerOptions = mqttServerOptionsBuilder.Build();
|
||
_MqttServer = new MqttServerFactory().CreateMqttServer(mqttServerOptions); // 创建服务(配置)
|
||
|
||
RegisterEvents(_MqttServer);
|
||
_MqttServer.ValidatingConnectionAsync += ValidatingConnectionHandle; // 鉴权-未完
|
||
|
||
await _MqttServer.StartAsync(); // 开启服务
|
||
|
||
if (_MqttServer.IsStarted)
|
||
{
|
||
resultData_MQTT = new ResultMqtt()
|
||
{
|
||
ResultCode = 1,
|
||
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTServer_成功!"
|
||
};
|
||
}
|
||
else
|
||
{
|
||
resultData_MQTT = new ResultMqtt()
|
||
{
|
||
ResultCode = -1,
|
||
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTServer_失败!"
|
||
};
|
||
}
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
resultData_MQTT = new ResultMqtt()
|
||
{
|
||
ResultCode = -1,
|
||
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTServer_失败!错误信息:" + ex.Message
|
||
};
|
||
}
|
||
|
||
_Callback?.Invoke(resultData_MQTT);
|
||
return resultData_MQTT;
|
||
}
|
||
|
||
private void RegisterEvents(MqttServer mqttServer)
|
||
{
|
||
mqttServer.StartedAsync += StartedHandle; // 服务器开启事件
|
||
mqttServer.StoppedAsync += StoppedHandle; // 服务器关闭事件
|
||
mqttServer.ClientConnectedAsync += ClientConnectedHandle; // 设置客户端连接成功后的处理程序
|
||
mqttServer.ClientDisconnectedAsync += ClientDisconnectedHandle; // 设置客户端断开后的处理程序
|
||
mqttServer.ClientSubscribedTopicAsync += ClientSubscribedTopicHandle; // 设置消息订阅通知
|
||
mqttServer.ClientUnsubscribedTopicAsync += ClientUnsubscribedTopicHandle; // 设置消息退订通知
|
||
mqttServer.ApplicationMessageNotConsumedAsync += ApplicationMessageNotConsumedHandle; // 设置消息处理程序
|
||
/*------------------------------------------*/
|
||
mqttServer.RetainedMessageChangedAsync += MqttServer_RetainedMessageChangedAsync;
|
||
mqttServer.RetainedMessagesClearedAsync += MqttServer_RetainedMessagesClearedAsync;
|
||
mqttServer.LoadingRetainedMessageAsync += MqttServer_LoadingRetainedMessageAsync;
|
||
mqttServer.InterceptingPublishAsync += MqttServer_InterceptingPublishAsync;
|
||
mqttServer.InterceptingSubscriptionAsync += MqttServer_InterceptingSubscriptionAsync;
|
||
//options.ApplicationMessageInterceptor = c =>
|
||
//{
|
||
// if (c.ApplicationMessage.PayloadSegment == null || c.ApplicationMessage.PayloadSegment.Length == 0)
|
||
// {
|
||
// return;
|
||
// }
|
||
|
||
// try
|
||
// {
|
||
// var content = JObject.Parse(Encoding.UTF8.GetString(c.ApplicationMessage.PayloadSegment));
|
||
// var timestampProperty = content.Property("timestamp");
|
||
// if (timestampProperty != null && timestampProperty.Value.Type == JTokenType.Null)
|
||
// {
|
||
// timestampProperty.Value = DateTime.Now.ToString("O");
|
||
// c.ApplicationMessage.PayloadSegment = Encoding.UTF8.GetBytes(content.ToString());
|
||
// }
|
||
// }
|
||
// catch (Exception)
|
||
// {
|
||
// }
|
||
//};
|
||
}
|
||
|
||
/// <summary>
|
||
/// 关闭MQTTServer
|
||
/// </summary>
|
||
public async Task<ResultMqtt> StopMQTTServer()
|
||
{
|
||
ResultMqtt resultData_MQTT = new ResultMqtt();
|
||
|
||
try
|
||
{
|
||
if (_MqttServer == null)
|
||
{
|
||
resultData_MQTT = new ResultMqtt()
|
||
{
|
||
ResultCode = -1,
|
||
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了关闭MQTTServer_出错!MQTTServer未在运行。"
|
||
};
|
||
}
|
||
else
|
||
{
|
||
foreach (var clientStatus in _MqttServer.GetClientsAsync().Result)
|
||
{
|
||
await clientStatus.DisconnectAsync();
|
||
}
|
||
await _MqttServer.StopAsync();
|
||
_MqttServer = null;
|
||
|
||
resultData_MQTT = new ResultMqtt()
|
||
{
|
||
ResultCode = 1,
|
||
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了关闭MQTTServer_成功!"
|
||
};
|
||
}
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
resultData_MQTT = new ResultMqtt()
|
||
{
|
||
ResultCode = -1,
|
||
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了关闭MQTTServer_失败!错误信息:" + ex.Message
|
||
};
|
||
}
|
||
|
||
_Callback?.Invoke(resultData_MQTT);
|
||
return resultData_MQTT;
|
||
}
|
||
|
||
/// <summary>
|
||
/// 获取所有的客户端
|
||
/// </summary>
|
||
public List<MqttClientStatus> GetClientsAsync()
|
||
{
|
||
return _MqttServer.GetClientsAsync().Result.ToList();
|
||
}
|
||
|
||
/// <summary>
|
||
/// 发送消息-未写
|
||
/// </summary>
|
||
/// <param name="Topic">主题</param>
|
||
/// <param name="msg">消息</param>
|
||
/// <returns></returns>
|
||
public Task SendMessage(string Topic, string msg)
|
||
{
|
||
try
|
||
{
|
||
//var clients = _MqttServer.GetClientsAsync().Result;
|
||
//foreach (var client in clients)
|
||
//{
|
||
// var imqttClient = client.GetClient();
|
||
// if (imqttClient != null)
|
||
// {
|
||
|
||
// }
|
||
//}
|
||
}
|
||
catch { }
|
||
return Task.CompletedTask;
|
||
}
|
||
|
||
#region 处理事件
|
||
/// <summary>
|
||
/// 开启Server的处理程序
|
||
/// </summary>
|
||
private Task StartedHandle(EventArgs arg)
|
||
{
|
||
_Callback?.Invoke(new()
|
||
{
|
||
ResultCode = 1,
|
||
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>MQTTServer已开启!"
|
||
});
|
||
return Task.CompletedTask;
|
||
}
|
||
|
||
/// <summary>
|
||
/// 关闭Server的处理程序
|
||
/// </summary>
|
||
private Task StoppedHandle(EventArgs arg)
|
||
{
|
||
_Callback?.Invoke(new()
|
||
{
|
||
ResultCode = 1,
|
||
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>MQTTServer已关闭!"
|
||
});
|
||
return Task.CompletedTask;
|
||
}
|
||
|
||
/// <summary>
|
||
/// 设置客户端连接成功后的处理程序
|
||
/// </summary>
|
||
private Task ClientConnectedHandle(ClientConnectedEventArgs arg)
|
||
{
|
||
var clients = _MqttServer.GetClientsAsync().Result;
|
||
|
||
_Callback?.Invoke(new()
|
||
{
|
||
ResultCode = 1,
|
||
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>客户端'{arg.ClientId}'已成功连接!当前客户端连接数:{clients?.Count}个。"
|
||
});
|
||
return Task.CompletedTask;
|
||
}
|
||
|
||
/// <summary>
|
||
/// 设置客户端断开后的处理程序
|
||
/// </summary>
|
||
private Task ClientDisconnectedHandle(ClientDisconnectedEventArgs arg)
|
||
{
|
||
var clients = _MqttServer.GetClientsAsync().Result;
|
||
_Callback?.Invoke(new ResultMqtt()
|
||
{
|
||
ResultCode = 1,
|
||
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>客户端'{arg.ClientId}'已断开连接!当前客户端连接数:{clients?.Count}个。"
|
||
});
|
||
|
||
return Task.CompletedTask;
|
||
}
|
||
|
||
/// <summary>
|
||
/// 设置消息订阅通知
|
||
/// </summary>
|
||
private Task ClientSubscribedTopicHandle(ClientSubscribedTopicEventArgs arg)
|
||
{
|
||
//if (!arg.Equals("admin"))
|
||
//{
|
||
// var clients = _MqttServer.GetClientsAsync().Result;
|
||
// var client = clients.Where(a => a.Id == arg.ClientId).FirstOrDefault();
|
||
// var imqttClient = client.GetClient();
|
||
// if (imqttClient != null)
|
||
// {
|
||
|
||
// }
|
||
// client?.DisconnectAsync();
|
||
|
||
// return Task.CompletedTask;
|
||
//}
|
||
|
||
_Callback?.Invoke(new ResultMqtt()
|
||
{
|
||
ResultCode = 1,
|
||
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>客户端'{arg.ClientId}'订阅了主题'{arg.TopicFilter.Topic}',主题服务质量:'{arg.TopicFilter.QualityOfServiceLevel}'!"
|
||
});
|
||
|
||
return Task.CompletedTask;
|
||
}
|
||
|
||
/// <summary>
|
||
/// 设置消息退订通知
|
||
/// </summary>
|
||
private Task ClientUnsubscribedTopicHandle(ClientUnsubscribedTopicEventArgs arg)
|
||
{
|
||
_Callback?.Invoke(new ResultMqtt()
|
||
{
|
||
ResultCode = 1,
|
||
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>客户端{arg.ClientId}退订了主题{arg.TopicFilter}!"
|
||
});
|
||
|
||
return Task.CompletedTask;
|
||
}
|
||
|
||
/// <summary>
|
||
/// 鉴权-未写完
|
||
/// </summary>
|
||
/// <returns></returns>
|
||
private Task ValidatingConnectionHandle(ValidatingConnectionEventArgs arg) // 鉴权
|
||
{
|
||
/*
|
||
if (arg.ClientId == "SpecialClient")
|
||
{
|
||
if (arg.UserName != "USER" || arg.Password != "PASS")
|
||
{
|
||
arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
|
||
}
|
||
}*/
|
||
if (arg.UserName != "Admin" || arg.Password != "Admin123")
|
||
{
|
||
|
||
}
|
||
|
||
return Task.CompletedTask;//CompletedTask.Instance;
|
||
}
|
||
|
||
/// <summary>
|
||
/// 设置消息处理程序
|
||
/// </summary>
|
||
private Task ApplicationMessageNotConsumedHandle(ApplicationMessageNotConsumedEventArgs arg)
|
||
{
|
||
_Callback?.Invoke(new ResultMqtt()
|
||
{
|
||
ResultCode = -1,
|
||
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>客户端:'{arg.SenderId}'发布了消息:主题:'{arg.ApplicationMessage.Topic}'!内容:'{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}';服务质量:{arg.ApplicationMessage.QualityOfServiceLevel};保留:{arg.ApplicationMessage.Retain}"
|
||
});
|
||
|
||
return Task.CompletedTask;
|
||
}
|
||
#endregion 处理事件
|
||
|
||
#region 处理事件2
|
||
private string Filename
|
||
{
|
||
get
|
||
{
|
||
var path = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, @"MQTT\\RetainedMessages.json");
|
||
var osType = XiaoZhiSharp.Kernels.OperatingSystem.GetOperatingSystemType();
|
||
var formatFileName = path.FormatPath(osType);
|
||
return formatFileName;
|
||
}
|
||
}
|
||
|
||
private Task MqttServer_RetainedMessageChangedAsync(RetainedMessageChangedEventArgs arg)
|
||
{
|
||
var directory = Path.GetDirectoryName(Filename);
|
||
if (!Directory.Exists(directory))
|
||
{
|
||
Directory.CreateDirectory(directory);
|
||
}
|
||
|
||
File.WriteAllText(Filename, JsonConvert.SerializeObject(arg.StoredRetainedMessages));
|
||
return CompletedTask.Instance;
|
||
}
|
||
|
||
private Task MqttServer_RetainedMessagesClearedAsync(EventArgs arg)
|
||
{
|
||
File.Delete(Filename);
|
||
return CompletedTask.Instance;
|
||
}
|
||
|
||
private Task MqttServer_LoadingRetainedMessageAsync(LoadingRetainedMessagesEventArgs arg)
|
||
{
|
||
List<MqttApplicationMessage> retainedMessages;
|
||
if (File.Exists(Filename))
|
||
{
|
||
var json = File.ReadAllText(Filename);
|
||
retainedMessages = JsonConvert.DeserializeObject<List<MqttApplicationMessage>>(json);
|
||
}
|
||
else
|
||
{
|
||
retainedMessages = new List<MqttApplicationMessage>();
|
||
}
|
||
|
||
arg.LoadedRetainedMessages = retainedMessages;
|
||
|
||
return CompletedTask.Instance;
|
||
}
|
||
|
||
private Task MqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
|
||
{
|
||
//MqttNetConsoleLogger.PrintToConsole(
|
||
// $"'{e.ClientId}' reported '{e.ApplicationMessage.Topic}' > '{Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment ?? new byte[0])}'",
|
||
// ConsoleColor.Magenta);
|
||
MqttNetConsoleLogger.PrintToConsole(
|
||
$"'{arg.ClientId}' reported '{arg.ApplicationMessage.Topic}' > '{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}'",
|
||
ConsoleColor.Magenta);
|
||
|
||
if (MqttTopicFilterComparer.Compare(arg.ApplicationMessage.Topic, "/myTopic/WithTimestamp/#") == MqttTopicFilterCompareResult.IsMatch)
|
||
{
|
||
// Replace the payload with the timestamp. But also extending a JSON
|
||
// based payload with the timestamp is a suitable use case.
|
||
arg.ApplicationMessage.PayloadSegment = Encoding.UTF8.GetBytes(DateTime.Now.ToString("O"));
|
||
}
|
||
|
||
if (arg.ApplicationMessage.Topic == "not_allowed_topic")
|
||
{
|
||
arg.ProcessPublish = false;
|
||
arg.CloseConnection = true;
|
||
}
|
||
|
||
return CompletedTask.Instance;
|
||
}
|
||
|
||
private Task MqttServer_InterceptingSubscriptionAsync(InterceptingSubscriptionEventArgs arg)
|
||
{
|
||
if (arg.TopicFilter.Topic.StartsWith("admin/foo/bar") && arg.ClientId != "theAdmin")
|
||
{
|
||
arg.Response.ReasonCode = MqttSubscribeReasonCode.ImplementationSpecificError;
|
||
}
|
||
|
||
if (arg.TopicFilter.Topic.StartsWith("the/secret/stuff") && arg.ClientId != "Imperator")
|
||
{
|
||
arg.Response.ReasonCode = MqttSubscribeReasonCode.ImplementationSpecificError;
|
||
arg.CloseConnection = true;
|
||
}
|
||
|
||
return CompletedTask.Instance;
|
||
}
|
||
|
||
#endregion 处理事件2
|
||
}
|
||
}
|