# no-kafka-slim

no-kafka-slim is an Apache Kafka 0.9 client for Node.js with support for the new unified consumer API. This module is a direct fork of oleksiyk/kafka, but with the Snappy dependency removed (due to various build problems on Windows).
All methods will return a promise
## Using

- download and install Kafka
- create your test topic:

  ```shell
  kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic kafka-test-topic --partitions 3 --replication-factor 1
  ```

- install no-kafka-slim:

  ```shell
  npm install no-kafka-slim
  ```
## Producer

Example:

```javascript
var Kafka = require('no-kafka');
var producer = new Kafka.Producer();

return producer.init().then(function () {
    return producer.send({
        topic: 'kafka-test-topic',
        partition: 0,
        message: {
            value: 'Hello!'
        }
    });
});
```
Send and, if the request fails, retry 2 times with a 100ms delay:

```javascript
return producer.send(messages, {
    retries: {
        attempts: 2,
        delay: 100
    }
});
```
Accumulate messages into a single batch until their total size is >= 1024 bytes or a 100ms timeout expires (overriding the Producer constructor options):

```javascript
producer.send(messages, {
    batch: {
        size: 1024,
        maxWait: 100
    }
});

producer.send(messages, {
    batch: {
        size: 1024,
        maxWait: 100
    }
});
```

Please note that if you pass different options to the send() method, these messages will be grouped into separate batches:

```javascript
// will be sent in batch 1
producer.send(messages, {
    batch: {
        size: 1024,
        maxWait: 100
    }
});

// will be sent in batch 2
producer.send(messages, {
    batch: {
        size: 1024,
        maxWait: 400
    }
});
```
### Producer options

- `requiredAcks` - required acknowledgements for a produce request. If it is 0 the server will not send any response. If it is 1 (default), the server will wait until the data is written to the local log before sending a response. If it is -1 the server will block until the message is committed by all in-sync replicas before sending a response. For any number > 1 the server will block waiting for this number of acknowledgements to occur (but the server will never wait for more acknowledgements than there are in-sync replicas).
- `timeout` - timeout in ms for a produce request
- `clientId` - ID of this client, defaults to 'no-kafka-client'
- `connectionString` - comma delimited list of initial brokers, defaults to '127.0.0.1:9092'
- `partitioner` - function used to determine the topic partition for a message. If the message already specifies a partition, the partitioner won't be used. The partitioner function receives 3 arguments: the topic name, an array of topic partitions, and the message (useful to partition by key, etc.). The partitioner can be sync or async (return a Promise).
- `retries` - controls the number of attempts and the delay between them when a produce request fails
  - `attempts` - number of total attempts to send the message, defaults to 3
  - `delay` - delay in ms between retries, defaults to 1000
- `codec` - compression codec, one of Kafka.COMPRESSION_NONE, Kafka.COMPRESSION_GZIP
- `batch` - controls batching (grouping) of requests
  - `size` - group messages together into a single batch until their total size exceeds this value, defaults to 16384 bytes. Set to 0 to disable batching.
  - `maxWait` - send grouped messages after this number of milliseconds even if their total size doesn't yet exceed `batch.size`, defaults to 10ms. Set to 0 to disable batching.
- `asyncCompression` - boolean, use asynchronous compression instead of synchronous, defaults to `false`
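The partitioner contract described above can be illustrated with a small key-hashing partitioner. This is a sketch only: the hashing scheme and the function name are illustrative, not part of no-kafka-slim; the `(topic, partitions, message)` signature is the one documented above.

```javascript
// A partitioner receives (topic, partitions, message) and returns a partition
// (or a Promise resolving to one). Here we hash the message key so that
// equal keys always land on the same partition.
function keyedPartitioner(topic, partitions, message) {
    var key = String(message.key || '');
    var hash = 0;
    for (var i = 0; i < key.length; i++) {
        hash = ((hash << 5) - hash + key.charCodeAt(i)) | 0; // simple 32-bit string hash
    }
    // Assumes partitions are numbered 0..n-1; adapt if your partition
    // objects carry explicit ids.
    return Math.abs(hash) % partitions.length;
}

// Pass it to the producer:
// var producer = new Kafka.Producer({ partitioner: keyedPartitioner });
```

Because the hash depends only on the key, retries and restarts keep the same key-to-partition mapping.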
## SimpleConsumer

Manually specify the topic, partition and offset when subscribing. Suitable for simple use cases.

Example:

```javascript
var consumer = new Kafka.SimpleConsumer();

// data handler function can return a Promise
var dataHandler = function (messageSet, topic, partition) {
    messageSet.forEach(function (m) {
        console.log(topic, partition, m.offset, m.message.value.toString('utf8'));
    });
};

return consumer.init().then(function () {
    return consumer.subscribe('kafka-test-topic', 0, dataHandler);
});
```
Subscribe (or change subscription) to a specific offset and limit the maximum received MessageSet size:

```javascript
consumer.subscribe('kafka-test-topic', 0, {offset: 20, maxBytes: 30}, dataHandler);
```
Subscribe to the latest or earliest offsets in the topic/partition:

```javascript
consumer.subscribe('kafka-test-topic', 0, {time: Kafka.LATEST_OFFSET}, dataHandler);
consumer.subscribe('kafka-test-topic', 0, {time: Kafka.EARLIEST_OFFSET}, dataHandler);
```
Subscribe to all partitions in a topic:

```javascript
consumer.subscribe('kafka-test-topic', dataHandler);
```
Commit offset(s) (in V0, Kafka saves these commits to Zookeeper):

```javascript
consumer.commitOffset({topic: 'kafka-test-topic', partition: 0, offset: 1});
```
Fetch committed offset(s):

```javascript
consumer.fetchOffset([{topic: 'kafka-test-topic', partition: 0}]).then(function (result) {
    // ...
});
```
### SimpleConsumer options

- `groupId` - group ID for committing and fetching offsets. Defaults to 'no-kafka-group-v0'
- `timeout` - timeout for fetch requests, defaults to 100ms
- `idleTimeout` - timeout between fetch calls, defaults to 1000ms
- `minBytes` - minimum number of bytes to wait for from Kafka before returning the fetch call, defaults to 1 byte
- `maxBytes` - maximum size of messages in a fetch response
- `clientId` - ID of this client, defaults to 'no-kafka-client'
- `connectionString` - comma delimited list of initial brokers, defaults to '127.0.0.1:9092'
- `recoveryOffset` - recovery position (time) which will be used to recover the subscription in case of an OffsetOutOfRange error, defaults to Kafka.LATEST_OFFSET
- `asyncCompression` - boolean, use asynchronous decompression instead of synchronous, defaults to `false`
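These options are passed as a plain object to the SimpleConsumer constructor. A minimal sketch focused on the fetch-tuning knobs; the values below are illustrative examples, not recommendations:

```javascript
// Illustrative options object for new Kafka.SimpleConsumer(options);
// values are examples only - see the list above for the actual defaults.
var simpleOptions = {
    groupId: 'no-kafka-group-v0',      // documented default group ID
    timeout: 100,                      // fetch request timeout, ms
    idleTimeout: 1000,                 // delay between fetch calls, ms
    minBytes: 1,                       // return the fetch as soon as 1 byte is available
    maxBytes: 1024 * 1024,             // cap the size of a fetch response
    connectionString: '127.0.0.1:9092'
};
// var consumer = new Kafka.SimpleConsumer(simpleOptions); // requires a running broker
```

Raising `minBytes` trades latency for fewer, larger fetch responses; `maxBytes` bounds memory per fetch.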
## GroupConsumer (new unified consumer API)

Specify an assignment strategy (or use a no-kafka built-in consistent or round robin assignment strategy) and subscribe by specifying only topics. The elected group leader will automatically assign partitions between all group members.

Example:

```javascript
var Promise = require('bluebird');
var consumer = new Kafka.GroupConsumer();

var dataHandler = function (messageSet, topic, partition) {
    return Promise.each(messageSet, function (m) {
        console.log(topic, partition, m.offset, m.message.value.toString('utf8'));
        // commit offset
        return consumer.commitOffset({topic: topic, partition: partition, offset: m.offset, metadata: 'optional'});
    });
};

var strategies = {
    strategy: 'TestStrategy',
    subscriptions: ['kafka-test-topic'],
    handler: dataHandler
};

consumer.init(strategies); // all done, now wait for messages in dataHandler
```
### Assignment strategies

no-kafka provides three built-in strategies:

- `Kafka.WeightedRoundRobinAssignment` - weighted round robin assignment (based on wrr-pool).
- `Kafka.ConsistentAssignment` - based on a consistent hash ring, so it provides consistent assignment across consumers in a group based on the supplied `metadata.id` and `metadata.weight` options.
- `Kafka.RoundRobinAssignment` - simple round robin assignment strategy (default).
Using Kafka.WeightedRoundRobinAssignment:

```javascript
var strategies = {
    strategy: 'TestStrategy',
    subscriptions: ['kafka-test-topic'],
    metadata: {
        weight: 4
    },
    fn: Kafka.WeightedRoundRobinAssignment,
    handler: dataHandler
};
// consumer.init(strategies)....
```
Using Kafka.ConsistentAssignment:

```javascript
var strategies = {
    strategy: 'TestStrategy',
    subscriptions: ['kafka-test-topic'],
    metadata: {
        id: process.argv[2] || 'consumer_1',
        weight: 50
    },
    fn: Kafka.ConsistentAssignment,
    handler: dataHandler
};
// consumer.init(strategies)....
```

Note that each consumer in a group should have its own, consistent `metadata.id`.
Using Kafka.RoundRobinAssignment (default in no-kafka):

```javascript
var strategies = {
    strategy: 'TestStrategy',
    subscriptions: ['kafka-test-topic'],
    handler: dataHandler
};
// consumer.init(strategies)....
```
You can also write your own assignment strategy function and provide it as the `fn` option of the strategy item.
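The core of any custom strategy is deciding which member gets which partition. The sketch below shows only that core as a plain round robin distribution; the `(members, partitions)` signature and return shape here are hypothetical, for illustration — consult no-kafka's built-in strategies in the source for the exact contract a `fn` must satisfy:

```javascript
// HYPOTHETICAL helper, not the no-kafka fn contract: given a list of member
// ids and a list of partition ids for one topic, assign partitions to
// members round robin and return a memberId -> partitions map.
function roundRobinAssign(members, partitions) {
    var assignment = {};
    members.forEach(function (memberId) {
        assignment[memberId] = [];
    });
    partitions.forEach(function (partition, i) {
        // walk the member list cyclically so partitions spread evenly
        assignment[members[i % members.length]].push(partition);
    });
    return assignment;
}
```

A real strategy function would wrap logic like this, reading the group's subscriptions and returning assignments in the format no-kafka expects.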
### GroupConsumer options

- `groupId` - group ID for committing and fetching offsets. Defaults to 'no-kafka-group-v0.9'
- `timeout` - timeout for fetch requests, defaults to 100ms
- `idleTimeout` - timeout between fetch calls, defaults to 1000ms
- `minBytes` - minimum number of bytes to wait for from Kafka before returning the fetch call, defaults to 1 byte
- `maxBytes` - maximum size of messages in a fetch response
- `clientId` - ID of this client, defaults to 'no-kafka-client'
- `connectionString` - comma delimited list of initial brokers, defaults to '127.0.0.1:9092'
- `sessionTimeout` - session timeout in ms, min 6000, max 30000, defaults to 15000
- `heartbeatTimeout` - delay between heartbeat requests in ms, defaults to 1000
- `retentionTime` - offset retention time in ms, defaults to 1 day (24 * 3600 * 1000)
- `startingOffset` - starting position (time) when there is no committed offset, defaults to Kafka.LATEST_OFFSET
- `recoveryOffset` - recovery position (time) which will be used to recover the subscription in case of an OffsetOutOfRange error, defaults to Kafka.LATEST_OFFSET
- `asyncCompression` - boolean, use asynchronous decompression instead of synchronous, defaults to `false`
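As with the other constructors, these options are passed as a plain object. A minimal sketch focused on the group-membership knobs; the values below are illustrative examples within the documented bounds, not recommendations:

```javascript
// Illustrative options for new Kafka.GroupConsumer(options);
// values are examples only - see the list above for defaults and bounds.
var groupOptions = {
    groupId: 'no-kafka-group-v0.9',   // documented default group ID
    sessionTimeout: 15000,            // ms; must be between 6000 and 30000
    heartbeatTimeout: 1000,           // ms between heartbeat requests
    retentionTime: 24 * 3600 * 1000   // offset retention in ms (1 day)
    // startingOffset / recoveryOffset would normally be set to
    // Kafka.LATEST_OFFSET or Kafka.EARLIEST_OFFSET.
};
// var consumer = new Kafka.GroupConsumer(groupOptions); // requires a running broker
```

A shorter `sessionTimeout` detects dead consumers faster but makes slow consumers more likely to be evicted; `heartbeatTimeout` should be well below it.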
## GroupAdmin (consumer groups API)

Offers two methods:

- `listGroups` - list existing consumer groups
- `describeGroup` - describe an existing group by its id

Example:

```javascript
var admin = new Kafka.GroupAdmin();

return admin.init().then(function () {
    return admin.listGroups().then(function (groups) {
        return admin.describeGroup('no-kafka-group-v0.9').then(function (group) {
            // ...
        });
    });
});
```
## Compression

no-kafka supports Gzip compression.

Enable compression in the Producer:

```javascript
var Kafka = require('no-kafka');

var producer = new Kafka.Producer({
    clientId: 'producer',
    codec: Kafka.COMPRESSION_GZIP // Kafka.COMPRESSION_NONE, Kafka.COMPRESSION_GZIP
});
```
Alternatively, just send some messages with a specified compression codec (overrides the codec set in the constructor):

```javascript
return producer.send({
    topic: 'kafka-test-topic',
    partition: 0,
    message: { value: 'Hello!' }
}, { codec: Kafka.COMPRESSION_GZIP });
```
By default no-kafka will use synchronous compression and decompression (synchronous Gzip is not available in Node < 0.11).

Enable async compression/decompression with the asyncCompression option:
Producer:

```javascript
var producer = new Kafka.Producer({
    clientId: 'producer',
    asyncCompression: true,
    codec: Kafka.COMPRESSION_GZIP
});
```
Consumer:

```javascript
var consumer = new Kafka.SimpleConsumer({
    idleTimeout: 100,
    clientId: 'simple-consumer',
    asyncCompression: true
});
```
## Logging

You can differentiate messages from several instances of a producer/consumer by providing a unique clientId in the options:

```javascript
var consumer1 = new Kafka.GroupConsumer({
    clientId: 'group-consumer-1'
});
var consumer2 = new Kafka.GroupConsumer({
    clientId: 'group-consumer-2'
});
```

=>

```
2016-01-12T07:41:57.884Z INFO group-consumer-1 ....
2016-01-12T07:41:57.884Z INFO group-consumer-2 ....
```
Change the logging level:

```javascript
var consumer = new Kafka.GroupConsumer({
    clientId: 'group-consumer',
    logger: {
        logLevel: 1 // 0 - nothing, 1 - just errors, 2 - +warnings, 3 - +info, 4 - +debug, 5 - +trace
    }
});
```
Send log messages to Logstash server(s) via UDP:

```javascript
var consumer = new Kafka.GroupConsumer({
    clientId: 'group-consumer',
    logger: {
        logstash: {
            enabled: true,
            connectionString: '10.0.1.1:9999,10.0.1.2:9999',
            app: 'myApp-kafka-consumer'
        }
    }
});
```
You can overwrite the function that outputs messages to stdout/stderr:

```javascript
var consumer = new Kafka.GroupConsumer({
    clientId: 'group-consumer',
    logger: {
        logFunction: console.log
    }
});
```