node.js - kafka-node does not receive messages in real time -


i followed quick start guide: http://kafka.apache.org/documentation.html#quickstart , write consumer in node.js. topic 'test' created, can use kafka-console-produce.sh , receive messages kafka-console-consumer.sh

i wrote simple consumer (live.js):

var kafka = require('kafka-node'),     client = new kafka.client('localhost:2181/'),     consumer = new kafka.consumer(client,                                   [{'topic': 'test', partition: 0}],                                   {autocommit: true});  client.on('ready', function(){   console.log('client ready!'); });  console.log(client); console.log(consumer);  consumer.on('error', function (err) {   console.log("kafka error: consumer - " + err); });  consumer.on('offsetoutofrange', function (err){   console.log("kafka offsetoutofrange: " + err); });  consumer.on('message', function(message){   console.log(message); }); 

while running node live.js receive sent messages. when live.js running , produce message kafka-supplied script, message not received live.js (but consumer script shipped kafka). after restarting live.js, receive messages, in 'real time'. use default configuration, here logs live.js:

eventemitter {   connectionstring: 'localhost:2181/',   clientid: 'kafka-node-client',   zkoptions: undefined,   noackbatchoptions: undefined,   brokers: {},   longpollingbrokers: {},   topicmetadata: {},   topicpartitions: {},   correlationid: 0,   _socketid: 0,   cbqueue: {},   brokermetadata: {},   ready: false,   zk:     eventemitter {      client:        client {         domain: null,         _events: [object],         _eventscount: 3,         _maxlisteners: undefined,         connectionmanager: [object],         options: [object],         state: [object] },      _events:        { init: [object],         brokerschanged: [function],         disconnected: [object],         error: [function] },      _eventscount: 4 },   _events:     { ready: [ [function], [function] ],      error: [function],      close: [function],      brokerschanged: [function] },   _eventscount: 4 } eventemitter {   fetchcount: 0,   client:     eventemitter {      connectionstring: 'localhost:2181/',      clientid: 'kafka-node-client',      zkoptions: undefined,      noackbatchoptions: undefined,      brokers: {},      longpollingbrokers: {},      topicmetadata: {},      topicpartitions: {},      correlationid: 0,      _socketid: 0,      cbqueue: {},      brokermetadata: {},      ready: false,      zk: eventemitter { client: [object], _events: [object], _eventscount: 4 },      _events:        { ready: [object],         error: [function],         close: [function],         brokerschanged: [function] },      _eventscount: 4 },   options:     { autocommit: true,      groupid: 'kafka-node-group',      autocommitmsgcount: 100,      autocommitintervalms: 5000,      fetchmaxwaitms: 100,      fetchminbytes: 1,      fetchmaxbytes: 1048576,      fromoffset: false,      encoding: 'utf8' },   ready: false,   paused: undefined,   id: 0,   payloads:     [ { topic: 'test',        partition: 0,        offset: 0,        maxbytes: 1048576,        metadata: 'm' } ],   _events: { done: [function] },   _eventscount: 1,   encoding: 'utf8' } client ready! 

--edit--

after stopping live.js , starting again zookeeper log shows following:

[2016-01-27 15:53:20,135] info accepted socket connection /127.0.0.1:38166 (org.apache.zookeeper.server.nioservercnxnfactory) [2016-01-27 15:53:20,139] warn connection request old client /127.0.0.1:38166; dropped if server in r-o mode (org.apache.zookeeper.server.zookeeperserver) [2016-01-27 15:53:20,140] info client attempting establish new session @ /127.0.0.1:38166 (org.apache.zookeeper.server.zookeeperserver) [2016-01-27 15:53:20,166] info established session 0x1528384e45e0007 negotiated timeout 30000 client /127.0.0.1:38166 (org.apache.zookeeper.server.zookeeperserver) 

try use highlevel consumer works until have specific needs. using following options highlevel consumer

            {                 groupid: "consumer group",             // auto commit config                 autocommit: true,                 autocommitmsgcount: 100,                 autocommitintervalms: 5000,             // fetch message config                 fetchmaxwaitms: 100,                 fetchminbytes: 1,                 fetchmaxbytes: 1024 * 10,                 fromoffset: true,                 frombeginning: false, //to stop reading beggening                 encoding:'utf8'             } 

Comments

Popular posts from this blog

php - Wordpress website dashboard page or post editor content is not showing but front end data is showing properly -

How to get the ip address of VM and use it to configure SSH connection dynamically in Ansible -

javascript - Get parameter of GET request -