527 lines
20 KiB
C#
527 lines
20 KiB
C#
|
using MQTTnet;
|
|||
|
using MQTTnet.Diagnostics.Logger;
|
|||
|
using MQTTnet.Internal;
|
|||
|
using MQTTnet.Protocol;
|
|||
|
using MQTTnet.Server;
|
|||
|
using Newtonsoft.Json;
|
|||
|
using System;
|
|||
|
using System.Collections.Generic;
|
|||
|
using System.IO;
|
|||
|
using System.Linq;
|
|||
|
using System.Net;
|
|||
|
using System.Text;
|
|||
|
using System.Threading.Tasks;
|
|||
|
using XiaoZhiSharp.MQTT.Common;
|
|||
|
using XiaoZhiSharp.Utils;
|
|||
|
|
|||
|
namespace XiaoZhiSharp.MQTT
|
|||
|
{
|
|||
|
/// <summary>
|
|||
|
/// MQTT通讯相关的工具类
|
|||
|
/// </summary>
|
|||
|
public class MqttNetServer
|
|||
|
{
|
|||
|
#region 变量
|
|||
|
/// <summary>
|
|||
|
/// MQTT服务
|
|||
|
/// </summary>
|
|||
|
private MqttServer _MqttServer = null;
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 记录日志、输出、保存等操作
|
|||
|
/// </summary>
|
|||
|
private Action<ResultMqtt> _Callback = null;
|
|||
|
#endregion 变量
|
|||
|
|
|||
|
public static void RunEmptyServer()
|
|||
|
{
|
|||
|
var mqttServer = new MqttServerFactory().CreateMqttServer(new MqttServerOptions());
|
|||
|
mqttServer.StartAsync().GetAwaiter().GetResult();
|
|||
|
|
|||
|
Console.WriteLine("Press any key to exit.");
|
|||
|
Console.ReadLine();
|
|||
|
}
|
|||
|
|
|||
|
public static void RunEmptyServerWithLogging()
|
|||
|
{
|
|||
|
var logger = new MqttNetEventLogger();
|
|||
|
MqttNetConsoleLogger.ForwardToConsole(logger);
|
|||
|
|
|||
|
var mqttFactory = new MqttServerFactory(logger);
|
|||
|
var mqttServer = mqttFactory.CreateMqttServer(new MqttServerOptions());
|
|||
|
mqttServer.StartAsync().GetAwaiter().GetResult();
|
|||
|
|
|||
|
Console.WriteLine("Press any key to exit.");
|
|||
|
Console.ReadLine();
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 创建MQTTServer并运行
|
|||
|
/// </summary>
|
|||
|
public async Task<ResultMqtt> CreateMQTTServerAndStart(MqttServerOptionsBuilder mqttServerOptionsBuilder, Action<ResultMqtt> callback)
|
|||
|
{
|
|||
|
ResultMqtt resultData_MQTT = new ResultMqtt();
|
|||
|
|
|||
|
_Callback = callback;
|
|||
|
try
|
|||
|
{
|
|||
|
MqttServerOptions mqttServerOptions = mqttServerOptionsBuilder.Build();
|
|||
|
_MqttServer = new MqttServerFactory().CreateMqttServer(mqttServerOptions); // 创建服务(配置)
|
|||
|
|
|||
|
RegisterEvents(_MqttServer);
|
|||
|
|
|||
|
await _MqttServer.StartAsync(); // 开启服务
|
|||
|
|
|||
|
if (_MqttServer.IsStarted)
|
|||
|
{
|
|||
|
resultData_MQTT = new ResultMqtt()
|
|||
|
{
|
|||
|
ResultCode = 1,
|
|||
|
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTServer_成功!"
|
|||
|
};
|
|||
|
}
|
|||
|
else
|
|||
|
{
|
|||
|
resultData_MQTT = new ResultMqtt()
|
|||
|
{
|
|||
|
ResultCode = -1,
|
|||
|
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTServer_失败!"
|
|||
|
};
|
|||
|
}
|
|||
|
}
|
|||
|
catch (Exception ex)
|
|||
|
{
|
|||
|
resultData_MQTT = new ResultMqtt()
|
|||
|
{
|
|||
|
ResultCode = -1,
|
|||
|
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTServer_失败!错误信息:" + ex.Message
|
|||
|
};
|
|||
|
}
|
|||
|
|
|||
|
_Callback?.Invoke(resultData_MQTT);
|
|||
|
return resultData_MQTT;
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 简易创建MQTTServer并运行-不使用加密
|
|||
|
/// </summary>
|
|||
|
/// <param name="ip">IP</param>
|
|||
|
/// <param name="port">端口</param>
|
|||
|
/// <param name="withPersistentSessions">是否保持会话</param>
|
|||
|
/// <param name="callback">处理方法</param>
|
|||
|
/// <returns></returns>
|
|||
|
public async Task<ResultMqtt> CreateMQTTServerAndStart(string ip, int port, bool withPersistentSessions, Action<ResultMqtt> callback)
|
|||
|
{
|
|||
|
ResultMqtt resultData_MQTT = new ResultMqtt();
|
|||
|
_Callback = callback;
|
|||
|
|
|||
|
try
|
|||
|
{
|
|||
|
|
|||
|
// Extend the timestamp for all messages from clients.
|
|||
|
// Protect several topics from being subscribed from every client.
|
|||
|
|
|||
|
//var certificate = new X509Certificate(@"C:\certs\test\test.cer", "");
|
|||
|
//options.TlsEndpointOptions.Certificate = certificate.Export(X509ContentType.Cert);
|
|||
|
//options.ConnectionBacklog = 5;
|
|||
|
//options.DefaultEndpointOptions.IsEnabled = true;
|
|||
|
//options.TlsEndpointOptions.IsEnabled = false;
|
|||
|
|
|||
|
MqttServerOptionsBuilder mqttServerOptionsBuilder = new MqttServerOptionsBuilder(); // MQTT服务器配置
|
|||
|
mqttServerOptionsBuilder.WithDefaultEndpoint();
|
|||
|
mqttServerOptionsBuilder.WithDefaultEndpointBoundIPAddress(IPAddress.Parse(ip)); // 设置Server的IP
|
|||
|
mqttServerOptionsBuilder.WithDefaultEndpointPort(port); // 设置Server的端口号
|
|||
|
//mqttServerOptionsBuilder.WithEncryptedEndpointPort(port); // 使用加密的端点端口
|
|||
|
mqttServerOptionsBuilder.WithPersistentSessions(withPersistentSessions); // 持续会话
|
|||
|
mqttServerOptionsBuilder.WithConnectionBacklog(2000); // 最大连接数
|
|||
|
//mqttServerOptionsBuilder.WithConnectionValidator(c => // 鉴权-方法失效
|
|||
|
//{
|
|||
|
// if (c.Username != uName || c.Password != uPwd)
|
|||
|
// {
|
|||
|
// c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
|
|||
|
// }
|
|||
|
//})
|
|||
|
|
|||
|
MqttServerOptions mqttServerOptions = mqttServerOptionsBuilder.Build();
|
|||
|
_MqttServer = new MqttServerFactory().CreateMqttServer(mqttServerOptions); // 创建服务(配置)
|
|||
|
|
|||
|
RegisterEvents(_MqttServer);
|
|||
|
_MqttServer.ValidatingConnectionAsync += ValidatingConnectionHandle; // 鉴权-未完
|
|||
|
|
|||
|
await _MqttServer.StartAsync(); // 开启服务
|
|||
|
|
|||
|
if (_MqttServer.IsStarted)
|
|||
|
{
|
|||
|
resultData_MQTT = new ResultMqtt()
|
|||
|
{
|
|||
|
ResultCode = 1,
|
|||
|
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTServer_成功!"
|
|||
|
};
|
|||
|
}
|
|||
|
else
|
|||
|
{
|
|||
|
resultData_MQTT = new ResultMqtt()
|
|||
|
{
|
|||
|
ResultCode = -1,
|
|||
|
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTServer_失败!"
|
|||
|
};
|
|||
|
}
|
|||
|
}
|
|||
|
catch (Exception ex)
|
|||
|
{
|
|||
|
resultData_MQTT = new ResultMqtt()
|
|||
|
{
|
|||
|
ResultCode = -1,
|
|||
|
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTServer_失败!错误信息:" + ex.Message
|
|||
|
};
|
|||
|
}
|
|||
|
|
|||
|
_Callback?.Invoke(resultData_MQTT);
|
|||
|
return resultData_MQTT;
|
|||
|
}
|
|||
|
|
|||
|
private void RegisterEvents(MqttServer mqttServer)
|
|||
|
{
|
|||
|
mqttServer.StartedAsync += StartedHandle; // 服务器开启事件
|
|||
|
mqttServer.StoppedAsync += StoppedHandle; // 服务器关闭事件
|
|||
|
mqttServer.ClientConnectedAsync += ClientConnectedHandle; // 设置客户端连接成功后的处理程序
|
|||
|
mqttServer.ClientDisconnectedAsync += ClientDisconnectedHandle; // 设置客户端断开后的处理程序
|
|||
|
mqttServer.ClientSubscribedTopicAsync += ClientSubscribedTopicHandle; // 设置消息订阅通知
|
|||
|
mqttServer.ClientUnsubscribedTopicAsync += ClientUnsubscribedTopicHandle; // 设置消息退订通知
|
|||
|
mqttServer.ApplicationMessageNotConsumedAsync += ApplicationMessageNotConsumedHandle; // 设置消息处理程序
|
|||
|
/*------------------------------------------*/
|
|||
|
mqttServer.RetainedMessageChangedAsync += MqttServer_RetainedMessageChangedAsync;
|
|||
|
mqttServer.RetainedMessagesClearedAsync += MqttServer_RetainedMessagesClearedAsync;
|
|||
|
mqttServer.LoadingRetainedMessageAsync += MqttServer_LoadingRetainedMessageAsync;
|
|||
|
mqttServer.InterceptingPublishAsync += MqttServer_InterceptingPublishAsync;
|
|||
|
mqttServer.InterceptingSubscriptionAsync += MqttServer_InterceptingSubscriptionAsync;
|
|||
|
//options.ApplicationMessageInterceptor = c =>
|
|||
|
//{
|
|||
|
// if (c.ApplicationMessage.PayloadSegment == null || c.ApplicationMessage.PayloadSegment.Length == 0)
|
|||
|
// {
|
|||
|
// return;
|
|||
|
// }
|
|||
|
|
|||
|
// try
|
|||
|
// {
|
|||
|
// var content = JObject.Parse(Encoding.UTF8.GetString(c.ApplicationMessage.PayloadSegment));
|
|||
|
// var timestampProperty = content.Property("timestamp");
|
|||
|
// if (timestampProperty != null && timestampProperty.Value.Type == JTokenType.Null)
|
|||
|
// {
|
|||
|
// timestampProperty.Value = DateTime.Now.ToString("O");
|
|||
|
// c.ApplicationMessage.PayloadSegment = Encoding.UTF8.GetBytes(content.ToString());
|
|||
|
// }
|
|||
|
// }
|
|||
|
// catch (Exception)
|
|||
|
// {
|
|||
|
// }
|
|||
|
//};
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 关闭MQTTServer
|
|||
|
/// </summary>
|
|||
|
public async Task<ResultMqtt> StopMQTTServer()
|
|||
|
{
|
|||
|
ResultMqtt resultData_MQTT = new ResultMqtt();
|
|||
|
|
|||
|
try
|
|||
|
{
|
|||
|
if (_MqttServer == null)
|
|||
|
{
|
|||
|
resultData_MQTT = new ResultMqtt()
|
|||
|
{
|
|||
|
ResultCode = -1,
|
|||
|
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了关闭MQTTServer_出错!MQTTServer未在运行。"
|
|||
|
};
|
|||
|
}
|
|||
|
else
|
|||
|
{
|
|||
|
foreach (var clientStatus in _MqttServer.GetClientsAsync().Result)
|
|||
|
{
|
|||
|
await clientStatus.DisconnectAsync();
|
|||
|
}
|
|||
|
await _MqttServer.StopAsync();
|
|||
|
_MqttServer = null;
|
|||
|
|
|||
|
resultData_MQTT = new ResultMqtt()
|
|||
|
{
|
|||
|
ResultCode = 1,
|
|||
|
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了关闭MQTTServer_成功!"
|
|||
|
};
|
|||
|
}
|
|||
|
}
|
|||
|
catch (Exception ex)
|
|||
|
{
|
|||
|
resultData_MQTT = new ResultMqtt()
|
|||
|
{
|
|||
|
ResultCode = -1,
|
|||
|
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了关闭MQTTServer_失败!错误信息:" + ex.Message
|
|||
|
};
|
|||
|
}
|
|||
|
|
|||
|
_Callback?.Invoke(resultData_MQTT);
|
|||
|
return resultData_MQTT;
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 获取所有的客户端
|
|||
|
/// </summary>
|
|||
|
public List<MqttClientStatus> GetClientsAsync()
|
|||
|
{
|
|||
|
return _MqttServer.GetClientsAsync().Result.ToList();
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 发送消息-未写
|
|||
|
/// </summary>
|
|||
|
/// <param name="Topic">主题</param>
|
|||
|
/// <param name="msg">消息</param>
|
|||
|
/// <returns></returns>
|
|||
|
public Task SendMessage(string Topic, string msg)
|
|||
|
{
|
|||
|
try
|
|||
|
{
|
|||
|
//var clients = _MqttServer.GetClientsAsync().Result;
|
|||
|
//foreach (var client in clients)
|
|||
|
//{
|
|||
|
// var imqttClient = client.GetClient();
|
|||
|
// if (imqttClient != null)
|
|||
|
// {
|
|||
|
|
|||
|
// }
|
|||
|
//}
|
|||
|
}
|
|||
|
catch { }
|
|||
|
return Task.CompletedTask;
|
|||
|
}
|
|||
|
|
|||
|
#region 处理事件
|
|||
|
/// <summary>
|
|||
|
/// 开启Server的处理程序
|
|||
|
/// </summary>
|
|||
|
private Task StartedHandle(EventArgs arg)
|
|||
|
{
|
|||
|
_Callback?.Invoke(new()
|
|||
|
{
|
|||
|
ResultCode = 1,
|
|||
|
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>MQTTServer已开启!"
|
|||
|
});
|
|||
|
return Task.CompletedTask;
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 关闭Server的处理程序
|
|||
|
/// </summary>
|
|||
|
private Task StoppedHandle(EventArgs arg)
|
|||
|
{
|
|||
|
_Callback?.Invoke(new()
|
|||
|
{
|
|||
|
ResultCode = 1,
|
|||
|
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>MQTTServer已关闭!"
|
|||
|
});
|
|||
|
return Task.CompletedTask;
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 设置客户端连接成功后的处理程序
|
|||
|
/// </summary>
|
|||
|
private Task ClientConnectedHandle(ClientConnectedEventArgs arg)
|
|||
|
{
|
|||
|
var clients = _MqttServer.GetClientsAsync().Result;
|
|||
|
|
|||
|
_Callback?.Invoke(new()
|
|||
|
{
|
|||
|
ResultCode = 1,
|
|||
|
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>客户端'{arg.ClientId}'已成功连接!当前客户端连接数:{clients?.Count}个。"
|
|||
|
});
|
|||
|
return Task.CompletedTask;
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 设置客户端断开后的处理程序
|
|||
|
/// </summary>
|
|||
|
private Task ClientDisconnectedHandle(ClientDisconnectedEventArgs arg)
|
|||
|
{
|
|||
|
var clients = _MqttServer.GetClientsAsync().Result;
|
|||
|
_Callback?.Invoke(new ResultMqtt()
|
|||
|
{
|
|||
|
ResultCode = 1,
|
|||
|
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>客户端'{arg.ClientId}'已断开连接!当前客户端连接数:{clients?.Count}个。"
|
|||
|
});
|
|||
|
|
|||
|
return Task.CompletedTask;
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 设置消息订阅通知
|
|||
|
/// </summary>
|
|||
|
private Task ClientSubscribedTopicHandle(ClientSubscribedTopicEventArgs arg)
|
|||
|
{
|
|||
|
//if (!arg.Equals("admin"))
|
|||
|
//{
|
|||
|
// var clients = _MqttServer.GetClientsAsync().Result;
|
|||
|
// var client = clients.Where(a => a.Id == arg.ClientId).FirstOrDefault();
|
|||
|
// var imqttClient = client.GetClient();
|
|||
|
// if (imqttClient != null)
|
|||
|
// {
|
|||
|
|
|||
|
// }
|
|||
|
// client?.DisconnectAsync();
|
|||
|
|
|||
|
// return Task.CompletedTask;
|
|||
|
//}
|
|||
|
|
|||
|
_Callback?.Invoke(new ResultMqtt()
|
|||
|
{
|
|||
|
ResultCode = 1,
|
|||
|
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>客户端'{arg.ClientId}'订阅了主题'{arg.TopicFilter.Topic}',主题服务质量:'{arg.TopicFilter.QualityOfServiceLevel}'!"
|
|||
|
});
|
|||
|
|
|||
|
return Task.CompletedTask;
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 设置消息退订通知
|
|||
|
/// </summary>
|
|||
|
private Task ClientUnsubscribedTopicHandle(ClientUnsubscribedTopicEventArgs arg)
|
|||
|
{
|
|||
|
_Callback?.Invoke(new ResultMqtt()
|
|||
|
{
|
|||
|
ResultCode = 1,
|
|||
|
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>客户端{arg.ClientId}退订了主题{arg.TopicFilter}!"
|
|||
|
});
|
|||
|
|
|||
|
return Task.CompletedTask;
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 鉴权-未写完
|
|||
|
/// </summary>
|
|||
|
/// <returns></returns>
|
|||
|
private Task ValidatingConnectionHandle(ValidatingConnectionEventArgs arg) // 鉴权
|
|||
|
{
|
|||
|
/*
|
|||
|
if (arg.ClientId == "SpecialClient")
|
|||
|
{
|
|||
|
if (arg.UserName != "USER" || arg.Password != "PASS")
|
|||
|
{
|
|||
|
arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
|
|||
|
}
|
|||
|
}*/
|
|||
|
if (arg.UserName != "Admin" || arg.Password != "Admin123")
|
|||
|
{
|
|||
|
|
|||
|
}
|
|||
|
|
|||
|
return Task.CompletedTask;//CompletedTask.Instance;
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 设置消息处理程序
|
|||
|
/// </summary>
|
|||
|
private Task ApplicationMessageNotConsumedHandle(ApplicationMessageNotConsumedEventArgs arg)
|
|||
|
{
|
|||
|
_Callback?.Invoke(new ResultMqtt()
|
|||
|
{
|
|||
|
ResultCode = -1,
|
|||
|
ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>客户端:'{arg.SenderId}'发布了消息:主题:'{arg.ApplicationMessage.Topic}'!内容:'{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}';服务质量:{arg.ApplicationMessage.QualityOfServiceLevel};保留:{arg.ApplicationMessage.Retain}"
|
|||
|
});
|
|||
|
|
|||
|
return Task.CompletedTask;
|
|||
|
}
|
|||
|
#endregion 处理事件
|
|||
|
|
|||
|
#region 处理事件2
|
|||
|
private string Filename
|
|||
|
{
|
|||
|
get
|
|||
|
{
|
|||
|
var path = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, @"MQTT\\RetainedMessages.json");
|
|||
|
var osType = XiaoZhiSharp.Kernels.OperatingSystem.GetOperatingSystemType();
|
|||
|
var formatFileName = path.FormatPath(osType);
|
|||
|
return formatFileName;
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
private Task MqttServer_RetainedMessageChangedAsync(RetainedMessageChangedEventArgs arg)
|
|||
|
{
|
|||
|
var directory = Path.GetDirectoryName(Filename);
|
|||
|
if (!Directory.Exists(directory))
|
|||
|
{
|
|||
|
Directory.CreateDirectory(directory);
|
|||
|
}
|
|||
|
|
|||
|
File.WriteAllText(Filename, JsonConvert.SerializeObject(arg.StoredRetainedMessages));
|
|||
|
return CompletedTask.Instance;
|
|||
|
}
|
|||
|
|
|||
|
private Task MqttServer_RetainedMessagesClearedAsync(EventArgs arg)
|
|||
|
{
|
|||
|
File.Delete(Filename);
|
|||
|
return CompletedTask.Instance;
|
|||
|
}
|
|||
|
|
|||
|
private Task MqttServer_LoadingRetainedMessageAsync(LoadingRetainedMessagesEventArgs arg)
|
|||
|
{
|
|||
|
List<MqttApplicationMessage> retainedMessages;
|
|||
|
if (File.Exists(Filename))
|
|||
|
{
|
|||
|
var json = File.ReadAllText(Filename);
|
|||
|
retainedMessages = JsonConvert.DeserializeObject<List<MqttApplicationMessage>>(json);
|
|||
|
}
|
|||
|
else
|
|||
|
{
|
|||
|
retainedMessages = new List<MqttApplicationMessage>();
|
|||
|
}
|
|||
|
|
|||
|
arg.LoadedRetainedMessages = retainedMessages;
|
|||
|
|
|||
|
return CompletedTask.Instance;
|
|||
|
}
|
|||
|
|
|||
|
private Task MqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
|
|||
|
{
|
|||
|
//MqttNetConsoleLogger.PrintToConsole(
|
|||
|
// $"'{e.ClientId}' reported '{e.ApplicationMessage.Topic}' > '{Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment ?? new byte[0])}'",
|
|||
|
// ConsoleColor.Magenta);
|
|||
|
MqttNetConsoleLogger.PrintToConsole(
|
|||
|
$"'{arg.ClientId}' reported '{arg.ApplicationMessage.Topic}' > '{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}'",
|
|||
|
ConsoleColor.Magenta);
|
|||
|
|
|||
|
if (MqttTopicFilterComparer.Compare(arg.ApplicationMessage.Topic, "/myTopic/WithTimestamp/#") == MqttTopicFilterCompareResult.IsMatch)
|
|||
|
{
|
|||
|
// Replace the payload with the timestamp. But also extending a JSON
|
|||
|
// based payload with the timestamp is a suitable use case.
|
|||
|
arg.ApplicationMessage.PayloadSegment = Encoding.UTF8.GetBytes(DateTime.Now.ToString("O"));
|
|||
|
}
|
|||
|
|
|||
|
if (arg.ApplicationMessage.Topic == "not_allowed_topic")
|
|||
|
{
|
|||
|
arg.ProcessPublish = false;
|
|||
|
arg.CloseConnection = true;
|
|||
|
}
|
|||
|
|
|||
|
return CompletedTask.Instance;
|
|||
|
}
|
|||
|
|
|||
|
private Task MqttServer_InterceptingSubscriptionAsync(InterceptingSubscriptionEventArgs arg)
|
|||
|
{
|
|||
|
if (arg.TopicFilter.Topic.StartsWith("admin/foo/bar") && arg.ClientId != "theAdmin")
|
|||
|
{
|
|||
|
arg.Response.ReasonCode = MqttSubscribeReasonCode.ImplementationSpecificError;
|
|||
|
}
|
|||
|
|
|||
|
if (arg.TopicFilter.Topic.StartsWith("the/secret/stuff") && arg.ClientId != "Imperator")
|
|||
|
{
|
|||
|
arg.Response.ReasonCode = MqttSubscribeReasonCode.ImplementationSpecificError;
|
|||
|
arg.CloseConnection = true;
|
|||
|
}
|
|||
|
|
|||
|
return CompletedTask.Instance;
|
|||
|
}
|
|||
|
|
|||
|
#endregion 处理事件2
|
|||
|
}
|
|||
|
}
|