PHP使用kafka的操作是什么?

PHP 作者:绿色软件园 2023-03-24 18:56:57

 Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这篇文章主要给大家介绍PHP中使用kafka的操作,文中示例代码介绍的非常详细,对大家学习和理解kafka的使用有一定的帮助,感兴趣的朋友接下来一起跟随小编看看吧。

    本文并没有kafka的安装教程,本文是针对已经安装kafka及其配置好kafka的php拓展并且使用laravel框架进行开发项目,配置一个可供laravel框架使用的生产及消费者类.

    以下代码修改自本站的YII框架关于kafka类的代码,经过测试使用在本人的项目中,可正常运行,larvael版本:5.6 代码放置larvael框架位置:app/Tools/Kafka.php

<?phpnamespace App\Tools; use Illuminate\Config\Repository; use Illuminate\Support\Facades\DB;use Monolog\Logger;use Monolog\Handler\StreamHandler; use Illuminate\Http\Request; class Kafka{  public $broker_list = '127.0.0.1';//配置kafka,可以用逗号隔开多个kafka  public $topic = 'test';//管道名称  public $partition = 0;   protected $producer = null;  protected $consumer = null;   public function __construct()  {    if (empty($this->broker_list)) {      throw new InvalidConfigException("broker not config");    }    $rk = new \RdKafka\Producer();    if (empty($rk)) {      throw new InvalidConfigException("producer error");    }    $rk->setLogLevel(LOG_DEBUG);    if (!$rk->addBrokers($this->broker_list)) {      throw new InvalidConfigException("producer error");    }    $this->producer = $rk;  }   /**   * 生产者   * @param array $messages   * @return mixed   */  public function send($messages = [],$topic)  {    $topic = $this->producer->newTopic($topic);    return $topic->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode($messages));  }   /**   * 消费者   */  public function consumer($object, $callback){    $conf = new \RdKafka\Conf();    $conf->set('group.id', 0);    $conf->set('metadata.broker.list', $this->broker_list);     $topicConf = new \RdKafka\TopicConf();    $topicConf->set('auto.offset.reset', 'smallest');     $conf->setDefaultTopicConf($topicConf);     $consumer = new \RdKafka\KafkaConsumer($conf);     $consumer->subscribe([$this->topic]);     echo "waiting for messages.....\n";    while(true) {      $message = $consumer->consume(120*1000);      switch ($message->err) {        case RD_KAFKA_RESP_ERR_NO_ERROR:          echo "message payload....";          $object->$callback($message->payload);          break;      }      sleep(1);    }  }}?>

    在控制器中如何使用:

    首先再头部导入这个类:use App\Tools\Kafka;

    下面是使用生产者实例:

public function test(){    $topic = 'tool';//输入使用管道名称   $data['shop_id'] = 58;   $data['bar_code']=586;   $data['goods_num'] = 1;   $data['goods_unit'] = '个'; $Kafka = new Kafka();$Error_Msg = $Kafka->send($data,$topic);//传入数组会自动转换jsonvar_dump($Error_Msg);    }

    下面是消费者实例,消费者我这里使用了的是php脚本进行的操作:

<?php $conf = new RdKafka\Conf(); $conf->set('group.id', 'myConsumerGroup'); $rk = new RdKafka\Consumer($conf);$rk->addBrokers("localhost:9092"); $topicConf = new RdKafka\TopicConf();$topicConf->set('auto.commit.interval.ms', 100);$topicConf->set('offset.store.method', 'file');$topicConf->set('offset.store.path', sys_get_temp_dir());$topicConf->set('auto.offset.reset', 'smallest'); $topic = $rk->newTopic("tool", $topicConf);//读取的管道 // Start consuming partition 0$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); while (true) {  $message = $topic->consume(0, 120*10000);  switch ($message->err) {    case RD_KAFKA_RESP_ERR_NO_ERROR:    //没有错误打印信息      $message = json_decode(json_encode($message),true);      $data = json_decode($message['payload'],true);      var_dump($data);      break;    case RD_KAFKA_RESP_ERR__PARTITION_EOF:      echo "等待接收信息\n";      break;    case RD_KAFKA_RESP_ERR__TIMED_OUT:      echo "超时\n";      break;    default:      throw new \Exception($message->errstr(), $message->err);      break;  } sleep(1);} ?>


关注公众号:拾黑(shiheibook)了解更多

友情链接:

下软件就上简单下载站:https://www.jdsec.com/
四季很好,只要有你,文娱排行榜:https://www.yaopaiming.com/
让资讯触达的更精准有趣:https://www.0xu.cn/

公众号 关注网络尖刀微信公众号
随时掌握互联网精彩
赞助链接