Problems with MQTT server sending data #599

Open
Holo-k opened this issue Oct 29, 2022 · 2 comments

Holo-k commented Oct 29, 2022

After my device sends some basic environment information to the server, the data the MQTT server sends back (for example, the ping response) also contains the data the client sent earlier.

Here is the sample code:

 class MqttHandler : SimpleChannelInboundHandler<Packet>
    {
        public MqttHandler() { }

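        /// <summary>
        /// Dispatches a decoded MQTT packet by type and writes the matching acknowledgement, if any.
        /// </summary>
        /// <param name="context">Channel handler context used to write responses.</param>
        /// <param name="msg">Decoded MQTT packet received from the client.</param>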
        private async Task ProcessMqttMsgAsync(IChannelHandlerContext context, Packet msg)
        {
            switch (msg.PacketType)
            {
                case PacketType.CONNECT:
                    await context.WriteAndFlushAsync(new ConnAckPacket { ReturnCode = ConnectReturnCode.Accepted, SessionPresent = true });

                    break;

                case PacketType.PUBACK:
                    if (msg is PubAckPacket pubAckPacket)
                    {
                        var msgId = pubAckPacket.PacketId;
                    }
                    break;
                case PacketType.PUBCOMP:
                    if (msg is PubCompPacket pubCompPacket)
                    {
                        var msgId = pubCompPacket.PacketId;
                    }
                    break;
                case PacketType.PUBREC:
                    if (msg is PubRecPacket pubRecPacket)
                    {
                    }
                    break;
                case PacketType.PUBREL:
                    if (msg is PubRelPacket pubRelPacket)
                    {
                    }
                    break;
                case PacketType.SUBSCRIBE:
                    await context.WriteAndFlushAsync(SubAckPacket.InResponseTo(msg as SubscribePacket, QualityOfService.ExactlyOnce));
                    break;
                case PacketType.UNSUBSCRIBE:
                    await context.WriteAndFlushAsync(UnsubAckPacket.InResponseTo(msg as UnsubscribePacket));
                    break;
                case PacketType.PINGREQ:
                    await context.WriteAndFlushAsync(PingRespPacket.Instance);
                    break;
                case PacketType.DISCONNECT:

                    break;
                default:
                    break;
            }
        }

        private async Task ProcessConnectAsync(IChannelHandlerContext context, ConnectPacket connectPacket)
        {
            await context.WriteAndFlushAsync(CreateMqttConnectionAck(ConnectReturnCode.Accepted, connectPacket));
        }

        private ConnAckPacket CreateMqttConnectionAck(ConnectReturnCode returnCode, ConnectPacket connectPacket)
        {
            if (connectPacket == null)
                return null;
            return new ConnAckPacket { ReturnCode = returnCode, SessionPresent = !connectPacket.CleanSession };
        }
        protected override void ChannelRead0(IChannelHandlerContext context, Packet message)
        {
            if (message is PublishPacket publishPacket)
            {                
                Console.WriteLine(publishPacket.Payload.ToString(encoding: Encoding.UTF8));
                return;
            }
            try
            {
                if (message is Packet msg)
                {
                    // Fire-and-forget: GetAwaiter() alone neither waits for the task nor observes its exceptions.
                    ProcessMqttMsgAsync(context, msg).GetAwaiter();
                }
                else
                {
                    context.CloseAsync();
                }
            }
            finally
            {

            }
        }
    }
        public static void Main() => RunServerAsync().Wait();
       

        public static async Task RunServerAsync()
        {
            IChannel boundChannel;

            var bossGroup = new MultithreadEventLoopGroup(1);
            var workerGroup = new MultithreadEventLoopGroup();

            try
            {                     

                var bootstrap = new ServerBootstrap();
                bootstrap.Group(bossGroup, workerGroup);

                bootstrap.Channel<TcpServerSocketChannel>();

                bootstrap.Option(ChannelOption.SoBacklog, 100)
                        .Option(ChannelOption.SoKeepalive, false)
                        .ChildHandler(new ActionChannelInitializer<ISocketChannel>(channel =>
                        {
                            IChannelPipeline pipeline = channel.Pipeline;
                            pipeline.AddLast(MqttEncoder.Instance, new MqttDecoder(true, 64 * 1024), new MqttHandler());
                        }
                        ));

                boundChannel = await bootstrap.BindAsync(1883);


                Console.WriteLine("mqtt server started");

                Console.ReadLine();

            }
            catch(Exception ex)
            {
                Console.WriteLine("mqtt server failure: " + ex);
            }
            
        }
    }
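
One way to narrow down the symptom described above (the server's outgoing data also containing what the client sent earlier) might be to log the raw bytes on the wire. The sketch below assumes DotNetty's `LoggingHandler` from the DotNetty.Handlers package and places it first in the pipeline, so it sees outbound bytes after encoding and inbound bytes before decoding. `DiagnosticPipeline` and the "MQTT-WIRE" logger name are made up for illustration, `MqttHandler` is the class from the post, and nothing is printed until a logger provider is configured for DotNetty's internal logging, which depends on the DotNetty version in use.

```csharp
using DotNetty.Codecs.Mqtt;
using DotNetty.Handlers.Logging;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;

// Hypothetical helper, not part of the posted code.
static class DiagnosticPipeline
{
    // Builds the same pipeline as the post, plus a byte-level logger at the head.
    public static ActionChannelInitializer<ISocketChannel> Create() =>
        new ActionChannelInitializer<ISocketChannel>(channel =>
        {
            IChannelPipeline pipeline = channel.Pipeline;

            // First handler = closest to the socket: logs exactly what is read and written.
            pipeline.AddLast(new LoggingHandler("MQTT-WIRE"));

            // Unchanged from the post: shared encoder, per-channel decoder and handler.
            pipeline.AddLast(MqttEncoder.Instance, new MqttDecoder(true, 64 * 1024), new MqttHandler());
        });
}
```

It would be wired in with `bootstrap.ChildHandler(DiagnosticPipeline.Create())` in place of the inline initializer. Comparing the logged WRITE events with the packets the handler actually sends should show whether the extra bytes are produced by the server or only appear on the client side.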

linfx commented Nov 23, 2022

https://github.com/linfx/MqttFx

MqttFx is an MQTT v3.1.1 client using DotNetty

Holo-k commented Dec 5, 2022

> https://github.com/linfx/MqttFx
>
> MqttFx is a mqtt v3.1.1 client using DotNetty

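Thanks. However, on my side this runs as an MQTT broker server. Can MqttFx also act as a broker? The broker was previously built on DotNetty by an experienced developer, and it has a lot of pitfalls. Apart from this problem, where the data being sent also includes what the client sent, the most troublesome one is an error about the reference count being 0. So far I have not found where that goes wrong.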
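
Regarding the reference-count-0 error: the cause in this broker is not known from the thread, but one common way to hit it in a DotNetty pipeline is touching a message after it has been released. By default, `SimpleChannelInboundHandler<T>` releases the message once `ChannelRead0` returns, so a `PublishPacket` payload read later from asynchronous code may already have a reference count of 0. A minimal sketch of the usual remedy, assuming that is what happens here: retain before handing the packet to async code, and release when done. `RetainingPublishHandler` and `HandlePublishAsync` are hypothetical names, and `Task.Yield()` only stands in for real asynchronous work.

```csharp
using System;
using System.Text;
using System.Threading.Tasks;
using DotNetty.Codecs.Mqtt.Packets;
using DotNetty.Common.Utilities;
using DotNetty.Transport.Channels;

// Hypothetical handler for illustration; not the handler from the post.
class RetainingPublishHandler : SimpleChannelInboundHandler<Packet>
{
    protected override void ChannelRead0(IChannelHandlerContext context, Packet message)
    {
        if (message is PublishPacket publishPacket)
        {
            // Take an extra reference so the payload survives the automatic
            // release that happens after ChannelRead0 returns.
            ReferenceCountUtil.Retain(publishPacket);
            _ = HandlePublishAsync(publishPacket);
        }
    }

    static async Task HandlePublishAsync(PublishPacket publishPacket)
    {
        try
        {
            await Task.Yield(); // placeholder for real asynchronous work
            Console.WriteLine(publishPacket.Payload.ToString(Encoding.UTF8));
        }
        finally
        {
            // Balance the Retain above; otherwise the buffer is leaked.
            ReferenceCountUtil.Release(publishPacket);
        }
    }
}
```

If the payload is only read synchronously inside `ChannelRead0`, as in the posted handler, no extra retain is needed, and the error would have to come from somewhere else, for example a buffer that is released twice or forwarded to another handler without being retained.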
