node.js - kafka-node does not receive messages in real time
I followed the quick start guide (http://kafka.apache.org/documentation.html#quickstart) and wrote a consumer in Node.js. The topic 'test' is created; I can produce messages with kafka-console-producer.sh and receive them with kafka-console-consumer.sh.
I wrote a simple consumer (live.js):
var kafka = require('kafka-node'),
    client = new kafka.Client('localhost:2181/'),
    consumer = new kafka.Consumer(client, [{ topic: 'test', partition: 0 }], { autoCommit: true });

client.on('ready', function () {
    console.log('client ready!');
});

console.log(client);
console.log(consumer);

consumer.on('error', function (err) {
    console.log("kafka error: consumer - " + err);
});

consumer.on('offsetOutOfRange', function (err) {
    console.log("kafka offsetOutOfRange: " + err);
});

consumer.on('message', function (message) {
    console.log(message);
});
When I run node live.js, I receive the messages that were sent earlier. But while live.js is running and I produce a message with the Kafka-supplied script, that message is not received by live.js (the consumer script shipped with Kafka does receive it). Only after restarting live.js do I receive the messages, so nothing arrives in 'real time'. I use the default configuration; here are the logs from live.js:
EventEmitter {
  connectionString: 'localhost:2181/',
  clientId: 'kafka-node-client',
  zkOptions: undefined,
  noAckBatchOptions: undefined,
  brokers: {},
  longpollingBrokers: {},
  topicMetadata: {},
  topicPartitions: {},
  correlationId: 0,
  _socketId: 0,
  cbqueue: {},
  brokerMetadata: {},
  ready: false,
  zk: EventEmitter {
    client: Client {
      domain: null,
      _events: [Object],
      _eventsCount: 3,
      _maxListeners: undefined,
      connectionManager: [Object],
      options: [Object],
      state: [Object] },
    _events: {
      init: [Object],
      brokersChanged: [Function],
      disconnected: [Object],
      error: [Function] },
    _eventsCount: 4 },
  _events: {
    ready: [ [Function], [Function] ],
    error: [Function],
    close: [Function],
    brokersChanged: [Function] },
  _eventsCount: 4 }
EventEmitter {
  fetchCount: 0,
  client: EventEmitter {
    connectionString: 'localhost:2181/',
    clientId: 'kafka-node-client',
    zkOptions: undefined,
    noAckBatchOptions: undefined,
    brokers: {},
    longpollingBrokers: {},
    topicMetadata: {},
    topicPartitions: {},
    correlationId: 0,
    _socketId: 0,
    cbqueue: {},
    brokerMetadata: {},
    ready: false,
    zk: EventEmitter { client: [Object], _events: [Object], _eventsCount: 4 },
    _events: {
      ready: [Object],
      error: [Function],
      close: [Function],
      brokersChanged: [Function] },
    _eventsCount: 4 },
  options: {
    autoCommit: true,
    groupId: 'kafka-node-group',
    autoCommitMsgCount: 100,
    autoCommitIntervalMs: 5000,
    fetchMaxWaitMs: 100,
    fetchMinBytes: 1,
    fetchMaxBytes: 1048576,
    fromOffset: false,
    encoding: 'utf8' },
  ready: false,
  paused: undefined,
  id: 0,
  payloads: [ { topic: 'test', partition: 0, offset: 0, maxBytes: 1048576, metadata: 'm' } ],
  _events: { done: [Function] },
  _eventsCount: 1,
  encoding: 'utf8' }
client ready!
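(As an aside, a message can also be produced from Node with the same library, which may help rule out the console scripts when reproducing this. The sketch below reuses the 'test' topic and ZooKeeper address from above; the message text is made up.)

var kafka = require('kafka-node'),
    client = new kafka.Client('localhost:2181/'),
    producer = new kafka.Producer(client);

producer.on('ready', function () {
    // 'hello from node' is just a sample payload
    producer.send([{ topic: 'test', messages: ['hello from node'] }], function (err, data) {
        console.log(err || data);
    });
});

producer.on('error', function (err) {
    console.log("kafka error: producer - " + err);
});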
--EDIT--
After stopping live.js and starting it again, the ZooKeeper log shows the following:
[2016-01-27 15:53:20,135] INFO Accepted socket connection from /127.0.0.1:38166 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2016-01-27 15:53:20,139] WARN Connection request from old client /127.0.0.1:38166; will be dropped if server is in r-o mode (org.apache.zookeeper.server.ZooKeeperServer)
[2016-01-27 15:53:20,140] INFO Client attempting to establish new session at /127.0.0.1:38166 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-01-27 15:53:20,166] INFO Established session 0x1528384e45e0007 with negotiated timeout 30000 for client /127.0.0.1:38166 (org.apache.zookeeper.server.ZooKeeperServer)
Try using the HighLevelConsumer; it works until you have very specific needs. I am using the following options with the HighLevelConsumer:
{ groupid: "consumer group", // auto commit config autocommit: true, autocommitmsgcount: 100, autocommitintervalms: 5000, // fetch message config fetchmaxwaitms: 100, fetchminbytes: 1, fetchmaxbytes: 1024 * 10, fromoffset: true, frombeginning: false, //to stop reading beggening encoding:'utf8' }