c语言实现mqtt (c语言mqtt实例讲解)

什么是MQTT?

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是 IBM 开发的一个即时通讯协议,有可能成为物联网的重要组成部分。MQTT 是基于二进制消息的发布/订阅编程模式的消息协议,如今已经成为 OASIS 规范,由于规范很简单,非常适合需要低功耗和网络带宽有限的 IoT 场景。

目前很多工厂都架设了MQTT服务器,用于接收设备上传上来的实时数据

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于 发布/订阅 publish/subscribe )模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。

划重点的地方就是 1.发布、订阅模式,2.轻量级,3.TCP/IP 协议;基于上面3个特点,他很方便地运用于物联网(IOT)。可以通过MQTT 协议将各个设备连接起来;

c语言实现mqtt,c语言开发mqtt

发布订阅模式; 基于topic, 通过MQTT的Broker 的模式

上图名词解析:Broker [经纪人]

从上图可以看到,又3个部分组成: MQTT 服务器端,发布端,订阅端;我们在工厂自动化的项目中,能用到的基本也是 编写发布端,订阅端的代码;

C#中github排名比较高的第三库mqttnet;下面用这个来简单实现一个mqtt的服务器跟客户端。

新建服务端代码:

新建一个控制台应用程序 MqttServerTest,安装mqttnet包

 Install-package mqttnet
private static async Task RunServer()
{
  var mqttFactory = new MqttFactory();
  //默认mqtt端口为1833
  var mqttServerOptions = new MqttServerOptionsBuilder().WithDefaultEndpoint().Build();

  using (var mqttServer = mqttFactory.CreateMqttServer(mqttServerOptions))
  {
    //监听客户端的连接,断开连接
    mqttServer.ClientConnectedAsync += MqttServer_ClientConnectedAsync;
    mqttServer.ClientDisconnectedAsync += MqttServer_ClientDisconnectedAsync;
    //开启服务器
    await mqttServer.StartAsync();
    await Console.Out.WriteLineAsync("Mqtt服务器启动成功:127.0.0.1:1883");
    Console.WriteLine("Press Enter to Exit");
    Console.ReadLine();
    await mqttServer.StopAsync();
  }
}

private static Task MqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
{
  Console.Out.WriteLineAsync("==============");
  return Console.Out.WriteLineAsync("客户端断开连接:" + arg.ClientId);
}

private static Task MqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg)
{
  Console.Out.WriteLineAsync("==============");
  return Console.Out.WriteLineAsync("客户端连接:" + arg.ClientId);
}

static void Main(string[] args)
{
  _ = RunServer();
}

建立一个Mqtt 发布消息客户端MqttPublishTest,

代码如下:

//创建发布消息客户端
private static async Task CreateMqttClient()
{
  try
  {
    var factory = new MqttFactory();
    var mqttClient = factory.CreateMqttClient();
    var options = new MqttClientOptionsBuilder()
    .WithClientId("Client-publish")
    .WithTcpServer("127.0.0.1", 1883)
    .WithUserProperty("admin", "admin")
    .WithCleanSession().Build();
    mqttClient.ConnectAsync(options).Wait();
    Console.WriteLine("连接MqttServer成功...");
    while (true)
    {
      Console.WriteLine("请输入mqtt top[t1/t2]:");
      string topic = Console.ReadLine();
      Console.WriteLine("请输入Mqtt-Playload");
      var playload = Console.ReadLine();
      var message = new MqttApplicationMessageBuilder()
      .WithTopic(topic)
      .WithPayload(playload)
      .WithRetainFlag(true)
      .Build();
      await mqttClient.PublishAsync(message, CancellationToken.None);
      Console.WriteLine("==发布消息成功==");
    }

  }catch(Exception ex)
  {
    Console.WriteLine(ex.Message);
  }
}
/// <summary>
/// 发布消息客户端
/// </summary>
/// <param name="args"></param>
static  void Main(string[] args)
{
  Task task = CreateMqttClient();
  Console.ReadLine();
}

建立一个Mqtt 订阅消息客户端MqttSubTest,

代码如下:

public static async Task CreateClientPublish()
{
  try
  {
    var factory = new MqttFactory();
    var mqttClient = factory.CreateMqttClient();
    var options = new MqttClientOptionsBuilder()
    .WithClientId("Client-Sub")
    .WithTcpServer("127.0.0.1", 1883)
    .WithUserProperty("admin", "admin")
    .WithCleanSession().Build();
    mqttClient.ConnectAsync(options).Wait();
    await Console.Out.WriteLineAsync("连接MqttServer成功");
    await mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("t1").Build());
    await mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("t2").Build());
    await Console.Out.WriteLineAsync("订阅消息成功,订阅的Topic为:t1,t2");
    mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync;
  }catch(Exception ex)
  {
    Console.WriteLine(ex.ToString());
  }
}

private static Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
  Console.WriteLine("收到消息:");
  Console.WriteLine(#34;topic :{arg.ApplicationMessage.Topic}");
  Console.WriteLine(#34;playload:{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}");
  Console.WriteLine(#34;Qos:{arg.ApplicationMessage.QualityOfServiceLevel}");
  Console.WriteLine(#34;Retain:{arg.ApplicationMessage.Retain}");
  return Task.CompletedTask;
}

static void Main(string[] args)
{
  Task task = CreateClientPublish();
  Console.ReadKey();
}

依次运行:Server, Publish,Subscribe3个程序,结果如下:

c语言实现mqtt,c语言开发mqtt

在发布客户端,输入发布的topic, playload即可看到订阅的客户端收到了发布的消息;

到此,我们用Mqttnet完成了一个简单的测试;是不是很简单呢?