Near Real-Time Synchronization with Kafka Connect and Kafka Streams — Part 2

Stephan Weinwurm
willhaben Tech Blog
9 min read · Jan 27, 2020


This is the second part of our blog series about how we built a near real-time synchronization mechanism for classified ad data here at willhaben.

To recap, at willhaben, we manage more than 7 million active classified ads and we needed a mechanism to reliably sync the ad details among multiple services to avoid a tight coupling between them. We recommend reading Part 1 first, as it explains the project’s background.


Part 1 of this blog post stopped just as we had written all audit table entries as events into Kafka, containing adId, adAuditId, and auditDate. Now, we want to look at how we enrich the events with the ad details and send them onward to the final Kafka topic.

Kafka Streams for Fetching Ad Details

Kafka Streams is part of Apache Kafka and is a client-side library for building distributed processing applications that store their intermediate state in Kafka. Think of it as distributed Java Streams where you can do mapping, filtering, and other stream operations.

Without going into too many details: Kafka Streams is a very powerful tool that does a lot of heavy lifting for you (scalability, fault tolerance, processing guarantees, and so on) and allows you to build real-time processing pipelines quickly.

Having said this, Kafka Streams does have a somewhat steep learning curve, and there are some pitfalls that one needs to be aware of.

For example, the concept of time in a stream is not straightforward, which makes understanding concepts such as windowing rather difficult — but more on that later.

Using the Windowing Mechanism to Reduce Duplicated Events

As mentioned previously, each insertion / update of an ad appends many entries to the audit table in the database; thus, we also get the same number of partially duplicated events in Kafka. However, we really only want to send one update to the final topic, and we want it to contain all the changes that have occurred. Basically, we want to reduce a clustered set of audit events to one final event.

Kafka Streams provides a concept of windowing that can be used to group events together based on some time-based criteria. A nice summary can be found here. In our case, we use a tumbling window, which creates distinct windows of a fixed duration (e.g., 1 second).

Basically, we group the audit events by adId, window them into fixed one-second buckets, and reduce each bucket to its newest event.
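A minimal sketch of that step (topic name, serdes, and the String value type are illustrative assumptions, not our exact production code):

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;

StreamsBuilder builder = new StreamsBuilder();

// Audit events keyed by adId
KStream<String, String> auditEvents = builder.stream(
        "ad-audit", Consumed.with(Serdes.String(), Serdes.String()));

auditEvents
        .groupByKey()                                       // cluster events per adId
        .windowedBy(TimeWindows.of(Duration.ofSeconds(1)))  // tumbling 1-second windows
        .reduce((older, newer) -> newer)                    // keep only the newest event
        .toStream();                                        // back to a stream of events
```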

Our expectation was to have one event per window reach the .toStream() step for further processing.

The Pitfalls of Windowing

The idea of windowing seems pretty tempting and straightforward, but there are a few details worth paying attention to. Looking at the docs and the example above, windowedBy and reduce ultimately return a KTable, meaning the result is no longer a series of events; instead, all the events are materialized into a table view (see stream-table duality). Hence, we need to invoke toStream() to turn the table back into a stream and continue to apply event-based operations such as map, filter, etc.

What we expected was that, after the windowing, we would receive exactly one event in the given time window, since we applied a tumbling window and reduced all the events in the window by retaining the newest one; this wasn’t the case.

Here’s what we expected:

Expected behavior of windowing in Kafka Streams

Here’s what we got:

Actual behavior of windowing in Kafka Streams

After digging into the nitty-gritty details, it turns out that Kafka Streams does cluster events in the window and will eventually perform the reduce step on them. However, contrary to our assumption, this doesn't happen at the end of the window but at each commit, i.e., whenever the internal state of Kafka Streams is persisted to the state store (the interval is configured via commit.interval.ms). This means that, upon a commit, even though the window hasn't closed yet, the events already received in this window are reduced and materialized in the underlying KTable. And, since we call toStream(), Kafka Streams immediately sends this intermediate window state onward to the subsequent steps of the stream. This sort of defeats the intention we initially had when using windowing, and it definitely cost us some time to get to the root of it.
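To illustrate, two settings govern how often such intermediate results are flushed downstream: the commit interval and the record cache size (the cache is flushed at every commit or when it is full). The values below are purely illustrative, not our production settings:

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// State is committed (and the record cache flushed) at this interval,
// emitting whatever intermediate window results have accumulated so far.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000);
// A larger cache buffers more updates per key before forwarding them downstream.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024);
```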

The point is: even though Kafka Streams takes care of many sophisticated features for you, the exact behavior is sometimes hard to grasp, so it's worth diving deep to understand it.

Ultimately, the windowing produced a few events per ad per window instead of exactly one, which meant we would send multiple events per ad to the final Kafka topic. This was acceptable for us, as we could still guarantee that the last event would include all previous changes to the ad.

Before diving into how we used Kafka Streams, we need to look quickly at another interesting restriction we had to work with.

Business Logic in the REST API’s GET Resource

The classified ad entity contains all the details about an ad, including its title, price, description, attributes, category (taxonomy), etc. There's a bit of logic needed to assemble the ad entity when fetching it through the REST API. For example, we use the concept of a category tree, which represents the full path of categories an ad is placed in (e.g., “Sports / Sport Equipment → Bicycles / Cycling → Cycles → Cruiser”); assembling this tree happens in the API when the ad details are fetched.

Example of category structure

The system also contains the possible attributes of ads that vary among subcategories.

Example of attributes in marketplace
Example of available attributes in Auto&Motor

There are also additional fields derived in the REST API’s business logic that are not directly present in the database. In addition, there are some substantial differences between the ads in different verticals (Marketplace, Auto & Motor, etc.).

Options for Fetching Ad Details

Basically, we had three options:

1. Fetch the ad details from the database and send them exactly as returned.

Fetching and forwarding raw ad details directly from database

2. Fetch the ad details from the database, duplicate the logic the API uses to convert an ad into its external form, and send the ad as it would have been returned from the API.

Fetching and converting ad details from database

3. Call the API directly to fetch the ad details.

Fetching and converting ad details from API

We quickly decided that Option 1 was not viable, as we’d simply push the complexity down to our consumers and impede adoption.

Deciding between Options 2 and 3 was significantly more difficult. In short, the advantages of Option 2 were that we would be independent of the REST API and in control of almost all the steps needed to produce the ad stream.

Option 3 would require less effort, since we only needed to integrate with the API and we would not have to try to mirror the logic in the REST API perfectly.

To make a long story short, we decided on Option 3, simply because it was quicker and we would have an MVP in production sooner. Also, we could always switch to Option 2 later on, because there are no backwards-compatibility issues in switching how the ad details are fetched, as long as we stick to the same final message format.

Dealing with Processing Errors

By far, the most common types of errors are intermittent API issues (network problems, timeouts, etc.). We use an exponential back-off with jitter strategy (you should, too) to retry failed API calls up to three times. These retries happen synchronously while the event moves through our Kafka Streams topology and block the execution of subsequent events on that Kafka partition, so they need to happen in relatively quick succession (within a couple of seconds).
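As a rough sketch, the synchronous retry loop looks something like this (the base delay and the generic Supplier wrapper are illustrative assumptions; the three-attempt limit matches our setup):

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;

// Retries an API call up to three times with exponential back-off and
// full jitter; base delay and exception handling are illustrative.
static <T> T callWithRetry(Supplier<T> apiCall) throws InterruptedException {
    final int maxAttempts = 3;
    final long baseDelayMs = 200;
    for (int attempt = 1; ; attempt++) {
        try {
            return apiCall.get();
        } catch (RuntimeException e) { // e.g., timeouts or transient network errors
            if (attempt >= maxAttempts) {
                throw e; // exhausted: the event is handed over to the retry flow
            }
            // Full jitter: sleep a random duration in [0, baseDelay * 2^attempt)
            long capMs = baseDelayMs << attempt;
            Thread.sleep(ThreadLocalRandom.current().nextLong(capMs));
        }
    }
}
```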

But what happens after the three retries are exhausted? We definitely don’t want to extend the retry period much beyond the three attempts, because all other events on the same Kafka partition waiting to be processed will be blocked.

We found a nice summary of how to build a retry architecture using a separate Kafka Streams topology, and we pretty much followed it. The basic idea is that, after the third retry has failed, we move the audit event into a separate Kafka topic (ad-audit-retry) so we can asynchronously apply some additional delay (back-off) and retry later without blocking other events.

We opted to use a separate Kafka Streams topology to process the retry topic, which does little more than read the first event on a partition, wait approx. 5 minutes after the event was last processed, and then move it back into the original ad-audit topic.
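A condensed sketch of that retry topology (eventTimestamp is a hypothetical helper that reads the last-processed time from the payload, and the serdes are assumptions; also note that max.poll.interval.ms must comfortably exceed the delay when a partition is blocked like this):

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;

final Duration retryDelay = Duration.ofMinutes(5);

StreamsBuilder retryBuilder = new StreamsBuilder();
retryBuilder
        .stream("ad-audit-retry", Consumed.with(Serdes.String(), Serdes.String()))
        .peek((adId, event) -> {
            // Block the partition until the event is at least retryDelay old;
            // eventTimestamp() is a hypothetical helper.
            long remainingMs = retryDelay.toMillis()
                    - (System.currentTimeMillis() - eventTimestamp(event));
            if (remainingMs > 0) {
                try {
                    Thread.sleep(remainingMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        })
        .to("ad-audit"); // hand the event back to the main topology
```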

The main difference between the suggestions in the article above and our implementation was that we decided not to fetch the ad details in the retry topology. Doing so would have introduced a race condition whenever the same adId was present in both the audit topic and the retry topic: we couldn’t have guaranteed that the latest version returned from the API would also be the last one produced to the final topic. Consider the following sequence:

  • The retry topology fetches the ad details from the API and is then paused for whatever reason.
  • Next, the ad is changed in the API by the user.
  • Then, the audit topology fetches the new ad details and immediately produces the event to the final topic.
  • Last, the retry topology “wakes up” and produces the old version of the ad.

Hence, we decided to only fetch ad details in the main topology:

Complete architecture of the application
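For illustration, the tail end of the main topology might be wired roughly like this (dedupedAuditEvents is the stream coming out of the windowing step shown earlier; EnrichmentResult and enrichOrMarkFailed are hypothetical helpers wrapping the retrying REST call from above, and ad-events is an assumed name for the final topic):

```java
import org.apache.kafka.streams.kstream.KStream;

// Split the stream after the enrichment step: successfully enriched ads go
// to the final topic, events whose retries were exhausted go to the retry topic.
@SuppressWarnings("unchecked")
KStream<String, EnrichmentResult>[] branches = dedupedAuditEvents
        .mapValues(event -> enrichOrMarkFailed(event)) // REST call incl. three retries
        .branch(
                (adId, result) -> result.succeeded(),  // ad details fetched successfully
                (adId, result) -> true);               // all synchronous retries failed

branches[0].mapValues(EnrichmentResult::ad).to("ad-events");              // final topic
branches[1].mapValues(EnrichmentResult::auditEvent).to("ad-audit-retry"); // async back-off
```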

Dealing with Unrecoverable Errors

Once in a while, we encounter issues that can’t be recovered from, such as deserialization failures within the topology. For these records, we have a Dead-Letter Queue (DLQ), which is just another topic to which we send failed events and which is monitored so that we get notified of new entries. At this first stage, dealing with events in the DLQ is a manual process.

Kafka Streams has out-of-the-box hooks for this: custom exception handlers can be configured via default.production.exception.handler and default.deserialization.exception.handler, and these handlers decide whether to fail or to continue, which is where the DLQ logic plugs in.
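As a sketch, a deserialization handler that forwards poison records to a DLQ topic could look like this (the handler interface and config key come from Kafka Streams, while the topic name, the producer setup, and the DLQ logic itself are our assumptions):

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

public class DlqExceptionHandler implements DeserializationExceptionHandler {

    private KafkaProducer<byte[], byte[]> dlqProducer;

    @Override
    public DeserializationHandlerResponse handle(ProcessorContext context,
            ConsumerRecord<byte[], byte[]> record, Exception exception) {
        // Forward the undecodable record to the DLQ topic and keep processing.
        dlqProducer.send(new ProducerRecord<>("ad-audit-dlq", record.key(), record.value()));
        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Producer setup omitted for brevity (bootstrap servers, byte-array serializers).
    }
}

// Registration in the streams configuration:
// props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
//           DlqExceptionHandler.class);
```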

Summary & Some Numbers

To quickly summarize, we talked about using Kafka Connect and Kafka Streams to sync classified ad data via Kafka to multiple consumers. The main steps included:

  1. Triggers in the database to record the inserted / changed adId in an audit table
  2. Kafka Connect to transfer the audit table to a Kafka topic
  3. Kafka Streams application to:
  • Reduce the audit events that happen in quick succession
  • Fetch the ad details from a REST API
  • Send the full ad event to the output topic
  • Handle various types of errors

Before we wrap this up, here are some facts about the whole setup:

We currently peak at around 280 incoming messages per second, which corresponds to a maximum of 100 API calls per second, with an average of around 30 per second. At the time of this writing, the Kafka Streams topologies run on a single Kubernetes Pod with 1 CPU and 2 GB of RAM, so, all in all, it’s pretty lightweight. The mean processing time is ~1.1 seconds, measured from when the last audit event in a window is written to the database up until the final enriched event is sent to the output topic. Most (95%) of all events are processed in less than ~2.15 seconds.

To our surprise, the effect on database load was marginal, and we couldn’t detect a significant uptick in any of the relevant load metrics.

We hope you’ve enjoyed reading this! Please feel free to post questions / feedback.
