您现在的位置是:首页 > 正文

RabbitMQ 代码

2024-02-01 05:41:31阅读 3

1.生产者代码
<?php
namespace app\controller;

use app\BaseController;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use think\facade\Cache;

class RabbitMq extends BaseController
{
    /**
     * Publishes five persistent text messages straight to a durable queue
     * through the default exchange, pausing one second before each.
     *
     * Every publish is echoed to the response as it happens.
     *
     * @return string Fixed confirmation text returned after the last publish.
     */
    public function send()
    {
        // Target queue; with the default ('') exchange the routing key is the queue name.
        $queueName = "hello_durable_true";

        $conn = new AMQPStreamConnection("localhost", '5672', 'guest', 'guest', '/');
        $ch = $conn->channel();

        // Flags after the name: passive=false, durable=true, exclusive=false, auto_delete=false.
        $ch->queue_declare($queueName, false, true, false, false);

        // Identical for every message: plain text, persistent delivery mode.
        $properties = [
            "content_type" => "text/plain",
            "delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        ];

        foreach (range(0, 4) as $index) {
            sleep(1); // pace the demo: one message per second

            $payload = "hello,努力,Now time:" . date("Y-m-d H:i:s");
            $ch->basic_publish(new AMQPMessage($payload, $properties), '', $queueName);

            echo "send Message{$index}<br>\n";
        }

        // Release the channel first, then the connection that owns it.
        $ch->close();
        $conn->close();

        return "send success";
    }

    /**
     * Publish/subscribe demo: broadcasts five text messages through the
     * "logs" fanout exchange, pausing one second before each.
     *
     * No routing key is passed to basic_publish(); a fanout exchange delivers
     * to every bound queue regardless of the key. Progress is echoed to the
     * response after each publish.
     *
     * @return string Fixed confirmation text returned after the last publish.
     */
    public function send1()
    {
        // Exchange name
        $exchange = "logs";

        // Open the connection
        $connection = new AMQPStreamConnection("localhost", "5672", "guest", "guest", "/");

        // Open a channel
        $channel = $connection->channel();

        // Declare the exchange (passive=false, durable=false, auto_delete=false)
        $channel->exchange_declare($exchange, "fanout", false, false, false);

        for ($i = 0; $i < 5; $i++) {
            sleep(1); // wait one second between messages
            // Message body
            $messageBody = "hello,努力,Now time:" . date("Y-m-d H:i:s");
            // Flag the message as persistent
            $message = new AMQPMessage($messageBody, array(
                "content_type" => "text/plain",
                "delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT
            ));
            // Publish to the exchange
            $channel->basic_publish($message, $exchange);
            echo "Send exchange message:" . $i . "<br>\n";
        }
        // Close the channel
        $channel->close();
        // Close the connection
        $connection->close();
        // Spelling of the confirmation text corrected (was "Send Sueccess").
        return "Send Success";
    }

    /**
     * Direct-exchange demo: publishes one "error" and one "warning" message
     * to the "direct_logs" exchange, each under the routing key that matches
     * its severity.
     *
     * @return string Fixed confirmation text returned after both publishes.
     */
    public function direct()
    {
        $exchangeName = "direct_logs";

        $conn = new AMQPStreamConnection("127.0.0.1","5672","guest","guest","/");
        $ch = $conn->channel();

        // Flags after the type: passive=false, durable=false, auto_delete=false.
        $ch->exchange_declare($exchangeName,"direct",false,false,false);

        // Both messages share the same properties: plain text, persistent delivery mode.
        $properties = [
            'content_type' => 'text/plain',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        ];

        // Simulated error entry, published with routing key "error".
        $errorText = "error,Now Time:".date("Y-m-d H:i:s");
        $ch->basic_publish(new AMQPMessage($errorText, $properties), $exchangeName, "error");

        // Simulated warning entry, published with routing key "warning".
        // Note the timestamp here is time-only on a 12-hour clock.
        $warningText = "warning, Now Time:".date("h:i:s");
        $ch->basic_publish(new AMQPMessage($warningText, $properties), $exchangeName, "warning");

        // Release the channel first, then the connection that owns it.
        $ch->close();
        $conn->close();

        return 'Send Success';
    }

    /**
     * RPC client demo: sends one request to "rpc_queue" and blocks until the
     * reply carrying the same correlation id arrives on a private callback
     * queue, then dumps the reply body.
     *
     * NOTE(review): the wait loop has no timeout, so the request hangs if no
     * RPC server answers — consider passing a timeout to wait().
     *
     * @return void The reply body is written with var_dump(); nothing is returned.
     */
    public function rpc()
    {
        $connection = new AMQPStreamConnection("localhost","5672","guest","guest","/");

        $channel = $connection->channel();

        // Declare the callback queue. An empty name lets the broker generate a
        // unique one, so concurrent requests do not compete for a single
        // fixed-name exclusive queue (the second declare would be refused).
        // Flags: passive=false, durable=false, exclusive=true, auto_delete=false.
        list($callback_queue,,) = $channel->queue_declare('',false,false,true,false);

        // RPC request: correlation_id pairs the reply with this call,
        // reply_to tells the server where to send it.
        $corr_id = uniqid();
        $msg = new  AMQPMessage("rpc client send message",array(
            "correlation_id"=>$corr_id,
            'reply_to' =>$callback_queue
        ));

        // Send the request through the default exchange to the server's queue
        $channel->basic_publish($msg,'','rpc_queue');

        // Reply body from the RPC server; null means "not received yet"
        $response = null;

        // Listen for the reply
        $channel->basic_consume($callback_queue,'',false,false,false,false,function ($reply)use ($corr_id,&$response){
            // Only accept the reply that belongs to this request
            if ($reply->has("correlation_id") && $reply->get("correlation_id") === $corr_id){
                $response = $reply->body;
            }
            // Acknowledge whatever arrived so it does not stay unacked
            $reply->delivery_info['channel']->basic_ack($reply->delivery_info['delivery_tag']);
        });

        // Compare against null explicitly: a legitimate reply body of "" or "0"
        // is falsy and would otherwise keep this loop waiting forever.
        while ($response === null){
            $channel->wait();
        }

        var_dump($response);

        $channel->close();
        $connection->close();


    }


    /**
     * Delay-queue demo built on message TTL plus a dead-letter exchange.
     *
     * A message is published to "delay_queue", which has no consumer here.
     * After the 5 s TTL the broker dead-letters it to "dlx_exchange" with
     * routing key "dlx_routing_key", where it lands in "dlx_queue".
     *
     * @return string Fixed confirmation text returned after the publish.
     */
    public function delay(){
        // Open the connection
        $connection = new AMQPStreamConnection("localhost", "5672", "guest", "guest", "/");
        // Open a channel
        $channel = $connection->channel();

        // Dead-letter exchange and the queue that receives expired messages
        $channel->exchange_declare("dlx_exchange", "direct", false, false, false);
        $channel->queue_declare("dlx_queue",false,true,false,false);
        $channel->queue_bind("dlx_queue","dlx_exchange","dlx_routing_key");

        // Exchange in front of the delay queue
        $channel->exchange_declare("delay_exchange",'direct',false,false,false);

        $args = new AMQPTable();
        // Messages in the delay queue expire after 5 s
        $args->set('x-message-ttl',5000);
        // No x-max-length here. retry() declares the same "delay_queue" without
        // a length limit, and re-declaring a queue with different arguments is
        // refused by the broker (PRECONDITION_FAILED). A limit of 1 would also
        // dead-letter the previous message as soon as a new one arrives,
        // i.e. before its 5 s delay has elapsed.
        // NOTE(review): a "delay_queue" already created with x-max-length must
        // be deleted on the broker once before this declaration succeeds.

        $args->set('x-dead-letter-exchange','dlx_exchange');
        $args->set('x-dead-letter-routing-key',"dlx_routing_key");

        // Flags: passive=false, durable=true, exclusive=false, auto_delete=false, nowait=false
        $channel->queue_declare("delay_queue",false,true,false,false,false,$args);

        $channel->queue_bind("delay_queue","delay_exchange","delay_routing_key");

        // Simulated message body
        $messageBody = "该消息将在5s后发送到延迟队列(".date("h:i:s").")";
        // Flag the message as persistent via delivery_mode
        $message = new AMQPMessage($messageBody, array(
            'content_type' => 'text/plain',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));

        // Publish into the delay queue through its routing key
        $channel->basic_publish($message, "delay_exchange", "delay_routing_key");


        // Close the channel
        $channel->close();
        // Close the connection
        $connection->close();
        return 'Send Success';
    }

    /**
     * Retry-queue demo: publishes one message carrying a "retry_nums" header
     * (initially 0) into the TTL-based delay queue.
     *
     * After the 5 s TTL the broker dead-letters the message to "dlx_queue".
     *
     * @return string Fixed confirmation text returned after the publish.
     */
    public function retry()
    {
        $conn = new AMQPStreamConnection("localhost", "5672", "guest", "guest", "/");
        $ch = $conn->channel();

        // --- Dead-letter side: exchange, durable queue, binding ---
        $ch->exchange_declare("dlx_exchange", "direct", false, false, false);
        $ch->queue_declare("dlx_queue", false, true, false, false);
        $ch->queue_bind("dlx_queue", "dlx_exchange", "dlx_routing_key");

        // --- Delay side: exchange plus a queue whose messages expire into the DLX ---
        $ch->exchange_declare("delay_exchange", "direct", false, false, false);

        $queueArgs = new AMQPTable();
        $queueArgs->set('x-message-ttl', 5000); // messages expire after 5 s
        $queueArgs->set('x-dead-letter-exchange', 'dlx_exchange');
        $queueArgs->set('x-dead-letter-routing-key', 'dlx_routing_key');

        $ch->queue_declare("delay_queue", false, true, false, false, false, $queueArgs);
        $ch->queue_bind("delay_queue", "delay_exchange", "delay_routing_key");

        // --- Message: persistent plain text with the retry counter as a header ---
        $payload = "该消息将在5s后发送到延迟队列(".date("h:i:s").")";
        $outgoing = new AMQPMessage($payload, [
            'content_type' => 'text/plain',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        ]);

        // Retry counter starts at zero.
        $retryHeaders = new AMQPTable(["retry_nums" => 0]);
        $outgoing->set('application_headers', $retryHeaders);

        $ch->basic_publish($outgoing, "delay_exchange", "delay_routing_key");

        // Release the channel first, then the connection that owns it.
        $ch->close();
        $conn->close();

        return 'Send Success';
    }

    /**
     * Idempotent-consumption demo: publishes the same message twice so a
     * consumer can demonstrate duplicate detection.
     *
     * A generated id travels in the message's "correlation_id" application
     * header and is also stored in the cache for one hour before publishing.
     *
     * @return string Fixed confirmation text returned after both publishes.
     */
    public function idempotent()
    {
        $conn = new AMQPStreamConnection("localhost", "5672", "guest", "guest", "/");
        $ch = $conn->channel();

        // --- Dead-letter side: exchange, durable queue, binding ---
        $ch->exchange_declare("dlx_exchange1", "direct", false, false, false);
        $ch->queue_declare("dlx_queue1", false, true, false, false);
        $ch->queue_bind("dlx_queue1", "dlx_exchange1", "dlx_routing_key1");

        // --- Delay side: exchange plus a queue whose messages expire into the DLX ---
        $ch->exchange_declare("delay_exchange1", "direct", false, false, false);

        $queueArgs = new AMQPTable();
        $queueArgs->set('x-message-ttl', 5000); // messages expire after 5 s
        $queueArgs->set('x-dead-letter-exchange', 'dlx_exchange1');
        $queueArgs->set('x-dead-letter-routing-key', 'dlx_routing_key1');

        $ch->queue_declare("delay_queue1", false, true, false, false, false, $queueArgs);
        $ch->queue_bind("delay_queue1", "delay_exchange1", "delay_routing_key1");

        // --- Message: persistent plain text ---
        $payload = "重复消息发送(".date("h:i:s").")";
        $outgoing = new AMQPMessage($payload, [
            'content_type' => 'text/plain',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        ]);

        // Message id used to recognise duplicates; carried as an application header.
        $corr_id = uniqid();
        $outgoing->set('application_headers', new AMQPTable(["correlation_id" => $corr_id]));

        // Record the id in the cache (key and value are both the id) for 3600 s.
        Cache::set($corr_id, $corr_id, 3600);

        // First publish, then an intentional duplicate of the very same message.
        $ch->basic_publish($outgoing, "delay_exchange1", "delay_routing_key1");
        $ch->basic_publish($outgoing, "delay_exchange1", "delay_routing_key1");

        // Release the channel first, then the connection that owns it.
        $ch->close();
        $conn->close();

        return 'Send Success';
    }
}
2.消费者代码
2.1 消费幂等性代码
<?php

namespace app\command;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
use think\facade\Cache;

/**
 * Console command "idempotent": consumes "dlx_queue1" and processes each
 * message id at most once.
 *
 * The id is read from the "correlation_id" application header. A cache entry
 * under that id means "not processed yet"; the entry is deleted once the
 * message has been handled, so a later duplicate is acknowledged and skipped.
 */
class Idempotentnd extends Command
{
    /**
     * Registers the console command name and description.
     */
    protected function configure()
    {
        // Command configuration
        $this->setName('idempotent')
            ->setDescription('the idempotent command');
    }

    /**
     * Declares the exchanges/queues and runs the blocking consume loop.
     *
     * @param Input  $input  Console input (not used).
     * @param Output $output Console output used for progress lines.
     */
    protected function execute(Input $input, Output $output)
    {
        // Dead-letter exchange name
        $exchange = "dlx_exchange1";

        // Open the connection
        $connection = new AMQPStreamConnection("localhost", "5672", "guest", "guest", "/");
        // Open a channel
        $channel = $connection->channel();
        // Declare the dead-letter exchange, its durable queue and the binding
        $channel->exchange_declare($exchange, 'direct', false, false, false);
        $channel->queue_declare("dlx_queue1", false, true, false, false);
        $channel->queue_bind("dlx_queue1", $exchange, "dlx_routing_key1");

        // Declare the delay queue whose expired messages feed the dead-letter exchange
        $channel->exchange_declare("delay_exchange1", "direct", false, false, false);
        $args = new AMQPTable();
        // Messages in the delay queue expire after 5 s
        $args->set('x-message-ttl', 5000);
        $args->set('x-dead-letter-exchange', 'dlx_exchange1');
        $args->set('x-dead-letter-routing-key', 'dlx_routing_key1');
        $channel->queue_declare("delay_queue1", false, true, false, false, false, $args);
        $channel->queue_bind("delay_queue1", "delay_exchange1", "delay_routing_key1");

        // Consume with manual acknowledgements (no_ack=false)
        $channel->basic_consume("dlx_queue1", '', false, false, false, false, function ($msg) use ($output) {
            $deliveryChannel = $msg->delivery_info['channel'];
            $deliveryTag = $msg->delivery_info['delivery_tag'];

            // get() throws when the property is absent, so check first.
            $msg_headers = $msg->has('application_headers')
                ? $msg->get('application_headers')->getNativeData()
                : [];
            $corr_id = $msg_headers['correlation_id'] ?? null;

            // Without a usable id the message cannot be de-duplicated. Reject it
            // without requeue instead of crashing the consumer and leaving the
            // message unacknowledged.
            if (!is_string($corr_id) || $corr_id === '') {
                $output->writeln(date("h:i:s") . " message without correlation_id header rejected" . PHP_EOL);
                $deliveryChannel->basic_reject($deliveryTag, false);
                return;
            }

            // No cache entry for this id: it was already consumed (or the entry expired)
            if (Cache::get($corr_id) === null) {
                $body = "该消息已消费,不再消费";
                $output->writeln(date("h:i:s") . $body .  PHP_EOL);
                // Acknowledge so the duplicate leaves the queue
                $deliveryChannel->basic_ack($deliveryTag);
                return;
            }

            $body =  $msg->body;
            $output->writeln("生产者发送的消息:".date("h:i:s") . " Received " . $body .  PHP_EOL);

            // Acknowledge, then drop the cache entry to mark the id as consumed
            $deliveryChannel->basic_ack($deliveryTag);
            Cache::delete($corr_id);

        });

        // Block while at least one consumer callback is registered
        while (count($channel->callbacks)) {
            $channel->wait();
        }

        // Close the channel
        $channel->close();
        // Close the connection
        $connection->close();
    }
}

2.2 消费者 RPC 代码
<?php
declare (strict_types = 1);

namespace app\command;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;

/**
 * Console command "rpc": RPC server side. Consumes requests from "rpc_queue"
 * and answers each one on the queue named in its reply_to property, echoing
 * back the request's correlation_id.
 */
class RpcCommand extends Command
{
    /**
     * Registers the console command name and description.
     */
    protected function configure()
    {
        // Command configuration
        $this->setName('rpc')
            ->setDescription('the rpc command');
    }

    /**
     * Alternative entry point kept for backward compatibility; nothing in this
     * class calls it. Runs the same server loop as execute() with its own
     * log prefix and reply text.
     *
     * @param Input  $input  Console input (not used).
     * @param Output $output Console output used for progress lines.
     */
    protected function execute1(Input $input, Output $output)
    {
        $this->serveRpcQueue($output, "Received:", "rpc server replay message");
    }

    /**
     * Command entry point: runs the blocking RPC server loop.
     *
     * @param Input  $input  Console input (not used).
     * @param Output $output Console output used for progress lines.
     */
    protected function execute(Input $input, Output $output)
    {
        $this->serveRpcQueue($output, " Received ", "rpc server replay  message");
    }

    /**
     * Declares "rpc_queue", consumes it with manual acknowledgements and
     * replies to every well-formed request.
     *
     * @param Output $output    Console output used for progress lines.
     * @param string $logPrefix Text written in front of each received body.
     * @param string $replyBody Body of the reply sent to the caller.
     */
    private function serveRpcQueue(Output $output, string $logPrefix, string $replyBody): void
    {
        // Queue name
        $queue = "rpc_queue";

        // Open the connection
        $connection = new AMQPStreamConnection("localhost", "5672", "guest", "guest", "/");
        // Open a channel
        $channel = $connection->channel();

        // Declare the request queue: passive=false, durable=false, exclusive=true, auto_delete=false.
        // NOTE(review): exclusive=true ties the queue to this connection, so only
        // one server process can consume it — confirm that is intended.
        $channel->queue_declare($queue, false, false, true, false);

        // Fair dispatch: at most one unacknowledged message at a time
        $channel->basic_qos(null, 1, null);

        // Consume with manual acknowledgements (no_ack=false)
        $channel->basic_consume($queue, '', false, false, false, false, function ($msg) use ($output, $logPrefix, $replyBody) {
            // Log the request body received from the RPC client
            $output->writeln($logPrefix . $msg->body . PHP_EOL);

            $deliveryChannel = $msg->delivery_info['channel'];
            $deliveryTag = $msg->delivery_info['delivery_tag'];

            // get() throws when a property is absent. A request lacking reply_to
            // or correlation_id cannot be answered, so reject it without requeue
            // instead of letting the exception stop the server loop with the
            // message still unacknowledged.
            if (!$msg->has('reply_to') || !$msg->has('correlation_id')) {
                $output->writeln("Request without reply_to/correlation_id rejected" . PHP_EOL);
                $deliveryChannel->basic_reject($deliveryTag, false);
                return;
            }

            // Build the reply, echoing the correlation id so the client can match it
            $reply = new AMQPMessage(
                $replyBody,
                array('correlation_id' => $msg->get('correlation_id'))
            );

            // Send the reply through the default exchange to the caller's callback queue
            $deliveryChannel->basic_publish($reply, '', $msg->get('reply_to'));

            // Acknowledge the request once the reply has been handed to the broker
            $deliveryChannel->basic_ack($deliveryTag);
        });

        // Block while at least one consumer callback is registered
        while (count($channel->callbacks)) {
            $channel->wait();
        }

        // Close the channel
        $channel->close();
        // Close the connection
        $connection->close();
    }
}

2.3 消费者消费重试
<?php
declare (strict_types = 1);

namespace app\command;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;

/**
 * Console command "retry": consumes dead-lettered messages from "dlx_queue"
 * and sends each one back through "delay_exchange" with its "retry_nums"
 * header incremented, until the counter exceeds 3, at which point an alert
 * line is written instead.
 */
class RetryCommand extends Command
{
    /**
     * Registers the console command name and description.
     */
    protected function configure()
    {
        // Command configuration
        $this->setName('retry')
            ->setDescription('the retry command');
    }

    /**
     * Declares the exchanges/queue and runs the blocking consume loop.
     *
     * @param Input  $input  Console input (not used).
     * @param Output $output Console output used for progress lines.
     */
    protected function execute(Input $input, Output $output)
    {
        // Dead-letter exchange name
        $exchange = "dlx_exchange";

        // Open the connection
        $connection = new AMQPStreamConnection("localhost", "5672", "guest", "guest", "/");
        // Open a channel
        $channel = $connection->channel();
        // Declare the dead-letter exchange, its durable queue and the binding
        $channel->exchange_declare($exchange, 'direct', false, false, false);
        $channel->queue_declare("dlx_queue", false, true, false, false);
        $channel->queue_bind("dlx_queue", $exchange, "dlx_routing_key");

        // Declare the exchange used to send messages back for another delayed attempt
        $channel->exchange_declare("delay_exchange", "direct", false, false, false);

        // Consume with manual acknowledgements (no_ack=false)
        $channel->basic_consume("dlx_queue", '', false, false, false, false, function ($msg)use($output,$channel){
            $body =  $msg->body;

            $output->writeln(date("h:i:s") . " Received " . $body .  PHP_EOL);

            // get() throws when the property is absent; a message without the
            // counter is treated as not retried yet.
            $msg_headers = $msg->has('application_headers')
                ? $msg->get('application_headers')->getNativeData()
                : [];
            $retryNums = intval($msg_headers['retry_nums'] ?? 0);

            // Counter above 3: stop retrying and raise the alert
            if ($retryNums > 3) {
                $body = "重试次数超过3次,则入库告警";
                $output->writeln(date("h:i:s") . " Error " . $body .  PHP_EOL);
            } else {

                // Increment the retry counter (this replaces the header table)
                $headers = new AMQPTable([
                    "retry_nums" => $retryNums + 1
                ]);
                $msg->set('application_headers', $headers);

                // Send the message back into the delay queue
                $channel->basic_publish($msg, "delay_exchange", "delay_routing_key");
            }

            // Acknowledge last: if anything above throws, the delivery stays
            // unacknowledged and the broker can redeliver it rather than the
            // message being lost.
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);

        });

        // Block while at least one consumer callback is registered
        while (count($channel->callbacks)) {
            $channel->wait();
        }

        // Close the channel
        $channel->close();
        // Close the connection
        $connection->close();
    }
}

2.4 消费者直接交换机代码
<?php
namespace app\command;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;

/**
 * Console command "directRabbitMQ1": subscribes a temporary queue to the
 * "direct_logs" exchange for the routing keys info, error and warning, and
 * prints every message it receives.
 */
class Direct1 extends Command
{
    /**
     * Registers the console command name and description.
     */
    protected function configure()
    {
        $this->setName('directRabbitMQ1');
        $this->setDescription('the directRabbitMQ1 command');
    }

    /**
     * Declares the exchange and a temporary queue, binds the routing keys and
     * runs the blocking consume loop.
     *
     * @param Input  $input  Console input (not used).
     * @param Output $output Console output used to print received bodies.
     */
    protected function execute(Input $input, Output $output)
    {
        $exchangeName = "direct_logs";

        $conn = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest', '/');
        $ch = $conn->channel();

        $ch->exchange_declare($exchangeName, 'direct', false, false, false);

        // An empty name makes the broker generate a random queue name, returned
        // as the first element. Flags: passive=false, durable=false,
        // exclusive=true, auto_delete=false.
        $declared = $ch->queue_declare('', false, false, true, false);
        $queueName = $declared[0];

        // One binding per routing key this consumer is interested in.
        foreach (['info', 'error', 'warning'] as $routingKey) {
            $ch->queue_bind($queueName, $exchangeName, $routingKey);
        }

        // Handler for each delivery; acknowledgements are manual (no_ack=false).
        $onMessage = function ($delivery) use ($output) {
            sleep(3); // simulate slow processing
            $output->writeln(" Received {$delivery->body}" . PHP_EOL);

            $tag = $delivery->delivery_info['delivery_tag'];
            $delivery->delivery_info['channel']->basic_ack($tag);
        };
        $ch->basic_consume($queueName, '', false, false, false, false, $onMessage);

        // Block while at least one consumer callback is registered.
        while (count($ch->callbacks) > 0) {
            $ch->wait();
        }

        // Release the channel first, then the connection that owns it.
        $ch->close();
        $conn->close();
    }
}

2.5 基础代码
<?php
namespace app\command;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use think\console\Command;
use think\console\Input;
use think\console\Output;

/**
 * Console command "mq": basic consumer for the durable queue
 * "hello_durable_true"; prints every message body it receives.
 */
class Rabbitmq extends Command
{
    /**
     * Registers the console command name and description.
     */
    public function configure()
    {
        $this->setName("mq");
        $this->setDescription("the mq command");
    }

    /**
     * Declares the queue and runs the blocking consume loop.
     *
     * @param Input  $input  Console input (not used).
     * @param Output $output Console output used to print received bodies.
     */
    protected function execute(Input $input, Output $output)
    {
        $queueName = "hello_durable_true";

        $conn = new AMQPStreamConnection("localhost","5672","guest","guest");
        $ch = $conn->channel();

        // The durable flag must match the declaration used by the producer of
        // this queue; a mismatch is refused by the broker with
        // PRECONDITION_FAILED (inequivalent arg 'durable').
        // Flags: passive=false, durable=true, exclusive=false, auto_delete=false.
        $ch->queue_declare($queueName,false,true,false,false);

        // Handler for each delivery; acknowledgements are manual (no_ack=false).
        $onMessage = function ($delivery) use ($output) {
            sleep(3); // simulate slow processing

            // Print the body to the terminal
            $output->writeln("Received{$delivery->body}" . PHP_EOL);

            $tag = $delivery->delivery_info['delivery_tag'];
            $delivery->delivery_info['channel']->basic_ack($tag);
        };
        $ch->basic_consume($queueName,'',false,false,false,false,$onMessage);

        // Block while at least one consumer callback is registered.
        while (count($ch->callbacks) > 0) {
            $ch->wait();
        }

        // Release the channel first, then the connection that owns it.
        $ch->close();
        $conn->close();
    }
}

网站文章