Today i encountered a problem in which i need to know what is the batch size for a TransactionalTridentKafkaSpout.
The default output field by KafkaSpout is "str", which can be of variable length. By some simple thought, it is reasonable that kafka spout will set the batch size dynamically based on the actual sizes of the "str" messages it retrieves in a batch. In other words, i suspect that the batch size may be determined by the following pseudo codes:
batchSize = 0;
totalBytesConsumed=0;
while(totalBytesConsumed < maxBytesConsumed)
{
totalBytesConsumed+= getMessageSize("str");
batchSize ++;
}
After quickly read through trident-kafka's source codes on TransactionalTridentKafkaSpout and trace down to the following classes and their methods:
TransactionalTridentKafkaSpout.getEmitter(...)
TridentKafkaEmitter.fetchMessages(...)
KafkaUtils.fetchMessages(...)
by the builder.addFetch(...) line in "KafkaUtils.fetchMessages(...)", it looks like the batch size is determine by a variable defined in KafkaConfig.fetchSizeBytes, which is defaulted to 1024 * 1024.
I also noticed another variable KafkaConfig.bufferSizeBytes, which is used by the SimpleConsumer class, this is also defaulted to 1024 * 1024.
Therefore i suspect that the batch size of the kafka spout depends on both Math.Min(KafkaConfig.bufferSizeBytes, kafkaConfig.fetchSizeBytes).
After some googling, I noticed for handling huge data in Kafka, the following settings have been used:
kafkaConfig.bufferSizeBytes = 1024 * 1024 * 4;
kafkaConfig.fetchSizeBytes = 1024 * 1024 * 4;
No comments:
Post a Comment