濮阳杆衣贸易有限公司

主頁 > 知識庫 > Laravel中Kafka的使用詳解

Laravel中Kafka的使用詳解

熱門標(biāo)簽:常州地圖標(biāo)注服務(wù)商 衡水外呼系統(tǒng)平臺 新河科技智能外呼系統(tǒng)怎么樣 福州人工外呼系統(tǒng)哪家強 百度商鋪地圖標(biāo)注 地圖標(biāo)注平臺怎么給錢注冊 注冊400電話申請 安裝電銷外呼系統(tǒng) 釘釘打卡地圖標(biāo)注

本文并沒有kafka的安裝教程,本文是針對已經(jīng)安裝kafka及其配置好kafka的php拓展并且使用laravel框架進(jìn)行開發(fā)項目,配置一個可供laravel框架使用的生產(chǎn)及消費者類.

以下代碼修改自本站的YII框架關(guān)于kafka類的代碼,經(jīng)過測試使用在本人的項目中,可正常運行,larvael版本:5.6 代碼放置larvael框架位置:app/Tools/Kafka.php

?php
namespace App\Tools;
 
use Illuminate\Config\Repository;
 
use Illuminate\Support\Facades\DB;
use Monolog\Logger;
use Monolog\Handler\StreamHandler;
 
use Illuminate\Http\Request;
 
class Kafka
{
  public $broker_list = '127.0.0.1';//配置kafka,可以用逗號隔開多個kafka
  public $topic = 'test';//管道名稱
  public $partition = 0;
 
  protected $producer = null;
  protected $consumer = null;
 
  public function __construct()
  {
    if (empty($this->broker_list)) {
      throw new InvalidConfigException("broker not config");
    }
    $rk = new \RdKafka\Producer();
    if (empty($rk)) {
      throw new InvalidConfigException("producer error");
    }
    $rk->setLogLevel(LOG_DEBUG);
    if (!$rk->addBrokers($this->broker_list)) {
      throw new InvalidConfigException("producer error");
    }
    $this->producer = $rk;
  }
 
  /**
   * 生產(chǎn)者
   * @param array $messages
   * @return mixed
   */
  public function send($messages = [],$topic)
  {
    $topic = $this->producer->newTopic($topic);
    return $topic->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode($messages));
  }
 
  /**
   * 消費者
   */
  public function consumer($object, $callback){
    $conf = new \RdKafka\Conf();
    $conf->set('group.id', 0);
    $conf->set('metadata.broker.list', $this->broker_list);
 
    $topicConf = new \RdKafka\TopicConf();
    $topicConf->set('auto.offset.reset', 'smallest');
 
    $conf->setDefaultTopicConf($topicConf);
 
    $consumer = new \RdKafka\KafkaConsumer($conf);
 
    $consumer->subscribe([$this->topic]);
 
    echo "waiting for messages.....\n";
    while(true) {
      $message = $consumer->consume(120*1000);
      switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
          echo "message payload....";
          $object->$callback($message->payload);
          break;
      }
      sleep(1);
    }
  }
}
?>

在控制器中如何使用:

首先再頭部導(dǎo)入這個類:use App\Tools\Kafka;

下面是使用生產(chǎn)者實例:

public function test(){
 
   $topic = 'tool';//輸入使用管道名稱
   $data['shop_id'] = 58;
   $data['bar_code']=586;
   $data['goods_num'] = 1;
   $data['goods_unit'] = '個';
 
$Kafka = new Kafka();
$Error_Msg = $Kafka->send($data,$topic);//傳入數(shù)組會自動轉(zhuǎn)換json
var_dump($Error_Msg);
 
 
  }

下面是消費者實例,消費者我這里使用了的是php腳本進(jìn)行的操作:

?php
 
$conf = new RdKafka\Conf();
 
$conf->set('group.id', 'myConsumerGroup');
 
$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("localhost:9092");
 
$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', 100);
$topicConf->set('offset.store.method', 'file');
$topicConf->set('offset.store.path', sys_get_temp_dir());
$topicConf->set('auto.offset.reset', 'smallest');
 
$topic = $rk->newTopic("tool", $topicConf);//讀取的管道
 
// Start consuming partition 0
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
 
while (true) {
  $message = $topic->consume(0, 120*10000);
  switch ($message->err) {
    case RD_KAFKA_RESP_ERR_NO_ERROR:
    //沒有錯誤打印信息
      $message = json_decode(json_encode($message),true);
      $data = json_decode($message['payload'],true);
      var_dump($data);
      break;
    case RD_KAFKA_RESP_ERR__PARTITION_EOF:
      echo "等待接收信息\n";
      break;
    case RD_KAFKA_RESP_ERR__TIMED_OUT:
      echo "超時\n";
      break;
    default:
      throw new \Exception($message->errstr(), $message->err);
      break;
  }
 sleep(1);
}
 
?>

到此這篇關(guān)于Laravel中Kafka的使用詳解的文章就介紹到這了,更多相關(guān)Laravel中Kafka內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

您可能感興趣的文章:
  • 淺談Laravel中使用Slack進(jìn)行異常通知
  • 如何用Laravel包含你自己的幫助函數(shù)
  • 詳解Laravel框架的依賴注入功能
  • php+laravel 掃碼二維碼簽到功能
  • laravel的數(shù)據(jù)表填充器使用詳解
  • laravel ajax curd 搜索登錄判斷功能的實現(xiàn)
  • laravel使用redis隊列實例講解
  • Laravel的加密解密與哈希實例講解
  • Laravel中10個有用的用法小結(jié)
  • 分析五個Laravel Dusk的使用技巧

標(biāo)簽:克拉瑪依 唐山 遼陽 鶴崗 鷹潭 柳州 白城 六安

巨人網(wǎng)絡(luò)通訊聲明:本文標(biāo)題《Laravel中Kafka的使用詳解》,本文關(guān)鍵詞  Laravel,中,Kafka,的,使用,詳解,;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問題,煩請?zhí)峁┫嚓P(guān)信息告之我們,我們將及時溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡(luò),涉及言論、版權(quán)與本站無關(guān)。
  • 相關(guān)文章
  • 下面列出與本文章《Laravel中Kafka的使用詳解》相關(guān)的同類信息!
  • 本頁收集關(guān)于Laravel中Kafka的使用詳解的相關(guān)信息資訊供網(wǎng)民參考!
  • 推薦文章
    新密市| 浦江县| 遂昌县| 定结县| 芦山县| 谷城县| 隆昌县| 郴州市| 北碚区| 施秉县| 广东省| 惠来县| 渝北区| 安化县| 环江| 平塘县| 四子王旗| 永宁县| 黄梅县| 永年县| 乐平市| 桂东县| 双峰县| 宁晋县| 喀喇沁旗| 六枝特区| 岑巩县| 安岳县| 苗栗县| 高邑县| 景谷| 博白县| 许昌县| 天津市| 天峻县| 临清市| 玛纳斯县| 象州县| 巨鹿县| 新巴尔虎左旗| 吉木萨尔县|