Kafka

Useful Kafka commands, followed by assorted notes:


#!/bin/bash
export BOOTSTRAP=localhost:9092
# Show partitions, leaders, replicas, and ISR for a topic
kafka-topics --bootstrap-server $BOOTSTRAP --describe --topic my-events
# Read a topic from the earliest retained offset
kafka-console-consumer --bootstrap-server $BOOTSTRAP --topic my-events --from-beginning
# Show members, committed offsets, and lag for a consumer group
kafka-consumer-groups --bootstrap-server $BOOTSTRAP --describe --group my-group

  • When updating Kafka versions, update consumers before producers
  • Consumers can subscribe to topics by regex, so use a hierarchical naming convention like company.service.context.message
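
    A minimal sketch of regex subscription with the Java consumer; the company.billing.* topic family, group id, and bootstrap address are illustrative:

      import java.time.Duration;
      import java.util.Properties;
      import java.util.regex.Pattern;
      import org.apache.kafka.clients.consumer.KafkaConsumer;

      public class RegexSubscribeSketch {
        public static void main(String[] args) {
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");
          props.put("group.id", "my-group");
          props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Matches every topic under company.billing.*, e.g. company.billing.invoice.created
            consumer.subscribe(Pattern.compile("company\\.billing\\..*"));
            consumer.poll(Duration.ofSeconds(1)); // poll as usual; newly created matching topics are picked up automatically
          }
        }
      }
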
  • Single writer per topic
  • In Kafka, unlike traditional messaging systems, you commit offsets instead of acking individual messages. If you fail to process record 30 but succeed in processing record 31, you should not commit offset 31: committing an offset acknowledges every record up to it, including 30.
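
    A minimal sketch of this with the Java consumer; the topic, group id, and process() are placeholders. Note that the committed value is record.offset() + 1, because Kafka stores the next offset to read:

      import java.time.Duration;
      import java.util.Collections;
      import java.util.Map;
      import java.util.Properties;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.clients.consumer.OffsetAndMetadata;
      import org.apache.kafka.common.TopicPartition;

      public class ManualCommitSketch {
        public static void main(String[] args) {
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");
          props.put("group.id", "my-group");
          props.put("enable.auto.commit", "false");
          props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

          try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-events"));
            while (true) {
              ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
              for (ConsumerRecord<String, String> record : records) {
                process(record); // if this throws, stop here: committing a later record would also "ack" this one
                // The committed offset is the next offset to read, hence the +1.
                consumer.commitSync(Map.of(
                    new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1)));
              }
            }
          }
        }

        static void process(ConsumerRecord<String, String> record) {
          // placeholder for real processing
        }
      }
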
  • If offsets back up in a downstream service (consumer lag keeps growing), consider rate limiting clients to shed traffic
  • For maximum reliability (a Java config sketch follows these bullets):
    • Replication factor of 3 (or 5 if you're really paranoid)
    • unclean.leader.election.enable=false
    • min.insync.replicas=2
    • acks=all
    • Retry until either success or the message no longer makes sense
    • Only commit an offset after the record has been completely processed
    • auto.offset.reset=earliest and consume idempotently
    • enable.auto.commit=false
    • Usually, setting the number of retries to zero is not an option in a reliable system, so if guaranteed order is critical, set max.in.flight.requests.per.connection=1 to make sure that while a batch of messages is retrying, additional messages will not be sent (because this has the potential to reverse the correct order). This will severely limit the throughput of the producer, so only use it when order is important.
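
    A minimal sketch of the settings above using the Java AdminClient and client properties; the topic, group id, partition count, and bootstrap address are illustrative:

      import java.util.Collections;
      import java.util.Map;
      import java.util.Properties;
      import org.apache.kafka.clients.admin.AdminClient;
      import org.apache.kafka.clients.admin.NewTopic;

      public class ReliabilitySketch {
        public static void main(String[] args) throws Exception {
          Properties adminProps = new Properties();
          adminProps.put("bootstrap.servers", "localhost:9092");

          // Topic: 3 replicas, no unclean leader election, at least 2 in-sync replicas.
          try (AdminClient admin = AdminClient.create(adminProps)) {
            NewTopic topic = new NewTopic("my-events", 30, (short) 3)
                .configs(Map.of(
                    "unclean.leader.election.enable", "false",
                    "min.insync.replicas", "2"));
            admin.createTopics(Collections.singletonList(topic)).all().get();
          }

          // Producer: wait for all in-sync replicas, retry, keep ordering while retrying.
          Properties producerProps = new Properties();
          producerProps.put("bootstrap.servers", "localhost:9092");
          producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          producerProps.put("acks", "all");
          producerProps.put("retries", Integer.toString(Integer.MAX_VALUE));
          producerProps.put("max.in.flight.requests.per.connection", "1");

          // Consumer: manual commits only after processing, start from earliest when no offset exists.
          Properties consumerProps = new Properties();
          consumerProps.put("bootstrap.servers", "localhost:9092");
          consumerProps.put("group.id", "my-group");
          consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          consumerProps.put("enable.auto.commit", "false");
          consumerProps.put("auto.offset.reset", "earliest");
        }
      }
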
  • Error recovery strategies
    • Fail fast: stop processing stream on error
    • Ignore: commit offset and continue processing
    • Retry: continuously retry and don't update offset until either success or message no longer makes sense
    • Saga: compensating transactions for services upstream of error
    • DLQ: route messages to a monitored dead-letter topic and include headers with details, for example (a producer sketch follows below)

      company.error.topic=my-topic
      company.error.partition=0
      company.error.offset=94
      company.error.task.id=0
      company.error.stage=VALUE_CONVERTER
      company.error.class.name=org.apache.kafka.connect.json.JsonConverter
      company.error.exception.class.name=org.apache.kafka.connect.errors.DataException
      company.error.exception.message=Converting byte[] to Kafka Connect data failed due to serialization error
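
      A minimal sketch of producing a failed record to a dead-letter topic with headers like the above, using the Java producer; the DLQ topic name and header keys are illustrative:

        import java.nio.charset.StandardCharsets;
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.ProducerRecord;

        public class DeadLetterSketch {
          static void sendToDlq(KafkaProducer<byte[], byte[]> producer,
                                ConsumerRecord<byte[], byte[]> failed,
                                Exception error) {
            ProducerRecord<byte[], byte[]> dlqRecord =
                new ProducerRecord<>("my-topic.dlq", failed.key(), failed.value());
            // Attach context so the message can be triaged and replayed later.
            dlqRecord.headers()
                .add("company.error.topic", failed.topic().getBytes(StandardCharsets.UTF_8))
                .add("company.error.partition",
                    Integer.toString(failed.partition()).getBytes(StandardCharsets.UTF_8))
                .add("company.error.offset",
                    Long.toString(failed.offset()).getBytes(StandardCharsets.UTF_8))
                .add("company.error.exception.class.name",
                    error.getClass().getName().getBytes(StandardCharsets.UTF_8))
                .add("company.error.exception.message",
                    String.valueOf(error.getMessage()).getBytes(StandardCharsets.UTF_8));
            producer.send(dlqRecord);
          }
        }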

  • Local transactions via the "Listen to Yourself" pattern as an alternative to Kafka Connect listening to a database log
  • Exactly-once semantics adds roughly 100 ms of latency, but other than that it's great: it saves you from having to do idempotency checks yourself
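
    A minimal sketch of turning this on with the Java transactional producer; the transactional.id and topic are illustrative, and consumers of the output must also set isolation.level=read_committed:

      import java.util.Properties;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerRecord;

      public class ExactlyOnceSketch {
        public static void main(String[] args) {
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");
          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          props.put("enable.idempotence", "true");
          props.put("transactional.id", "my-service-tx-1");

          try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("my-events", "key", "value"));
            producer.commitTransaction(); // or abortTransaction() on failure
          }
        }
      }
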
  • A default of around 30 partitions is reasonable (I've also heard 32 or 64 cited as standard; you rarely need much more):

    It's clear that you want many partitions but not too many. If you have some estimate regarding the target throughput of the topic and the expected throughput of the consumers, you can divide the target throughput by the expected consumer throughput and derive the number of partitions this way. So if I want to be able to write and read 1 GB/sec from a topic, and I know each consumer can only process 50 MB/s, then I know I need at least 20 partitions. This way, I can have 20 consumers reading from the topic and achieve 1 GB/sec. If you don't have this detailed information, our experience suggests that limiting the size of the partition on the disk to less than 6 GB per day of retention often gives satisfactory results. Source

  • For small datasets stored in compacted topics, it's useful to reduce the default segment size (log.segment.bytes, or segment.bytes per topic), as the most recent (active) segment of a compacted topic is never compacted.
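
    A minimal sketch of creating such a topic with a reduced segment size via the Java AdminClient; the topic name, partition and replica counts, and segment size are illustrative:

      import java.util.Collections;
      import java.util.Map;
      import java.util.Properties;
      import org.apache.kafka.clients.admin.AdminClient;
      import org.apache.kafka.clients.admin.NewTopic;

      public class CompactedTopicSketch {
        public static void main(String[] args) throws Exception {
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");
          try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("user-profiles", 6, (short) 3)
                .configs(Map.of(
                    "cleanup.policy", "compact",
                    // Small segments so records roll out of the active (never-compacted) segment sooner.
                    "segment.bytes", String.valueOf(16 * 1024 * 1024)));
            admin.createTopics(Collections.singletonList(topic)).all().get();
          }
        }
      }
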
  • As a rule of thumb, we recommend each broker to have up to 4,000 partitions and each cluster to have up to 200,000 partitions. The main reason for the latter cluster-wide limit is to accommodate for the rare event of a hard failure of the controller. With more partitions, one may need some additional configuration tuning. We plan to make further improvements to support millions of partitions in a Kafka cluster. Source

  • Topics that are produced with keyed messages can be very difficult to add partitions to from a consumer's point of view. This is because the mapping of keys to partitions will change when the number of partitions is changed. For this reason, it is advisable to set the number of partitions for a topic that will contain keyed messages once, when the topic is created, and avoid resizing the topic.
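
    A small sketch of why this happens: for records with a key and no explicit partition, the Java default partitioner works roughly as murmur2(key) % numPartitions, so the mapping changes when the partition count changes. This uses the client library's murmur2 utility; the key and partition counts are illustrative:

      import java.nio.charset.StandardCharsets;
      import org.apache.kafka.common.utils.Utils;

      public class KeyMappingSketch {
        public static void main(String[] args) {
          byte[] key = "customer-42".getBytes(StandardCharsets.UTF_8);
          int hash = Utils.toPositive(Utils.murmur2(key));
          // Same key, different partition once the topic is resized:
          System.out.println("with 12 partitions -> partition " + hash % 12);
          System.out.println("with 24 partitions -> partition " + hash % 24);
        }
      }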

  • Metrics: under-replicated partitions is the most important one. Run a preferred replica election; if that doesn't fix it, look for a broker that is down. If the number of under-replicated partitions is fluctuating, or if the number is steady but there are no brokers offline, this typically indicates a performance issue in the cluster.
  • When to split/combine topics
    1. The most important rule is that any events that need to stay in a fixed order must go in the same topic (and they must also use the same partitioning key). Most commonly, the order of events matters if they are about the same entity. So, as a rule of thumb, we could say that all events about the same entity need to go in the same topic.
    2. As a rule of thumb, if you care about latency, you should probably aim for (order of magnitude) hundreds of topic-partitions per broker node. If you have tens of thousands, or even thousands of partitions per node, your latency will suffer. That performance argument provides some guidance for designing your topic structure: if you're finding yourself with many thousands of topics, it would be advisable to merge some of the fine-grained, low-throughput topics into coarser-grained topics, and thus reduce the proliferation of partitions.
  • Number of partitions for a topic can only be increased, never decreased
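
    A minimal sketch of increasing the partition count with the Java AdminClient; the topic name and target count are illustrative, and the keyed-message caveat above still applies:

      import java.util.Map;
      import java.util.Properties;
      import org.apache.kafka.clients.admin.AdminClient;
      import org.apache.kafka.clients.admin.NewPartitions;

      public class IncreasePartitionsSketch {
        public static void main(String[] args) throws Exception {
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");
          try (AdminClient admin = AdminClient.create(props)) {
            // Raises my-events to 60 partitions; asking for fewer than the current count fails.
            admin.createPartitions(Map.of("my-events", NewPartitions.increaseTo(60))).all().get();
          }
        }
      }
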
  • If you set fetch.max.wait.ms=100 and fetch.min.bytes=1048576 (1 MB), Kafka will receive a fetch request from the consumer and will respond with data either when it has 1 MB of data to return or after 100 ms, whichever happens first.
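
    As a small sketch, those two settings in Java consumer properties (values are the example ones above):

      import java.util.Properties;

      public class FetchTuningSketch {
        public static void main(String[] args) {
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");
          // Broker responds when 1 MB is available or after 100 ms, whichever comes first.
          props.put("fetch.min.bytes", "1048576");
          props.put("fetch.max.wait.ms", "100");
        }
      }
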
  • When the producer receives an error message from the server, the error could be transient (e.g., a lack of leader for a partition). In this case, the value of the retries parameter will control how many times the producer will retry sending the message before giving up and notifying the client of an issue. By default, the producer will wait 100ms between retries, but you can control this using the retry.backoff.ms parameter. We recommend testing how long it takes to recover from a crashed broker (i.e., how long until all partitions get new leaders) and setting the number of retries and delay between them such that the total amount of time spent retrying will be longer than the time it takes the Kafka cluster to recover from the crash—otherwise, the producer will give up too soon. Not all errors will be retried by the producer. Some errors are not transient and will not cause retries (e.g., "message too large" error). In general, because the producer handles retries for you, there is no point in handling retries within your own application logic. You will want to focus your efforts on handling nonretriable errors or cases where retry attempts were exhausted.
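
    A minimal sketch of those producer settings, plus a send callback that only has to deal with non-retriable or exhausted errors; the retry values, topic, and error handling are illustrative:

      import java.util.Properties;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerRecord;

      public class RetryingProducerSketch {
        public static void main(String[] args) {
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");
          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          props.put("retries", "10");
          props.put("retry.backoff.ms", "500"); // total retry window should exceed broker failover time

          try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-events", "key", "value"), (metadata, exception) -> {
              if (exception != null) {
                // Transient errors were already retried by the producer; anything reaching here
                // is non-retriable or exhausted its retries, so alert / dead-letter it.
                System.err.println("Send failed: " + exception);
              }
            });
          }
        }
      }
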
  • The client can get read-your-write semantics from any node by providing the timestamp of a write as part of its query—a serving node receiving such a query will compare the desired timestamp to its own index point and if necessary delay the request until it has indexed up to at least that time to avoid serving stale data.
  • One of the trickier things a distributed system must do is handle restoring failed nodes or moving partitions from node to node. A typical approach would have the log retain only a fixed window of data and combine this with a snapshot of the data stored in the partition. It is equally possible for the log to retain a complete copy of data and garbage collect the log itself. This moves a significant amount of complexity out of the serving layer, which is system-specific, and into the log, which can be general purpose.
  • Many actor systems by default maintain actor state and messages in memory, so they are lost if the machine running the actor crashes. Thus, such actor systems are unsuitable for maintaining derived data.
