1、首先安装kafka扩展
创新互联公司是一家集网站建设,江阴企业网站建设,江阴品牌网站建设,网站定制,江阴网站建设报价,网络营销,网络优化,江阴网站推广为一体的创新建站企业,帮助传统企业提升企业形象加强企业竞争力。可充分满足这一群体相比中小企业更为丰富、高端、多元的互联网需求。同时我们时刻保持专业、时尚、前沿,时刻以成就客户成长自我,坚持不断学习、思考、沉淀、净化自己,让我们为更多的企业打造出实用型网站。#安装librdkafka: 版本: https://github.com/edenhill/librdkafka/releases/tag/v0.9.2 $ git clone https://github.com/edenhill/librdkafka.git $ ./configure $ make $ sudo make install #安装 rdkafka.so 版本:https://github.com/arnaud-lb/php-rdkafka/releases/tag/3.0.1 $ git clone https://github.com/arnaud-lb/php-rdkafka.git $ cd php-rdkafka $ phpize $ ./configure $ make all -j 5 $ sudo make install2、生产者代码示例
$rcf = new RdKafka\Conf();
$rcf->set('group.id', 'test'); //topicname
$cf = new RdKafka\TopicConf();
$cf->set('offset.store.method', 'broker');
$cf->set('auto.offset.reset', 'smallest');
$rk = new RdKafka\Producer($rcf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1"); //brokeraddr
$topic = $rk->newTopic("test", $cf); //topicname
for($i = 0; $i < 10; $i++) {
$topic->produce(0,0,'test' . $i);
}
3、消费者代码示例
$rcf = new RdKafka\Conf();
$rcf->set('group.id', 'test');
$rcf->set('broker.version.fallback', '0.8.2'); //brokername,kafkaversion
$cf = new RdKafka\TopicConf();
$cf->set('auto.offset.reset', 'smallest');
$cf->set('auto.commit.enable', true);
$rk = new RdKafka\Consumer($rcf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1"); //brokeraddr
$topic = $rk->newTopic("test", $cf); //topicname,topicobject
$topic->consumeStart(0,10); //partition,offset
$msg = $topic->consume(0, 1000); //partition,timeout
var_dump($msg);
另外有需要云服务器可以了解下创新互联cdcxhl.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。