Near Real-Time Synchronization with Kafka Connect and Kafka Streams — Part 1

Stephan Weinwurm
willhaben Tech Blog
6 min readDec 18, 2019

--

This is part one of two blog posts about how we built a near real-time synchronization mechanism for classified ad data here at willhaben. You will need some background on Apache Kafka, so be sure to at least read an introduction to the topic, like this one. Part 2 of the blog can be found here.

Photo by Joao Branco on Unsplash

At willhaben, we have over 7 million active classified ads, items private or professional users are trying to sell to other private users. Every day, around 200,000 classified ads are newly published, and the number is growing year after year. For the remainder of the article, we’ll simply call classified ads “ads.”

All the ads are managed by a RESTful backend which, in turn, is backed by a relational database. As willhaben grows and we add more features to the platform, we have to start thinking about how we could extract functionality into separate services, each serving a subset of willhaben’s functionality, in order to

  • Avoid having too many people work on the same code base,
  • Allow clear team responsibilities, and
  • Better scale the service.

Since almost everything at willhaben revolves around ads, almost every part of the willhaben environment requires at least some part of the ads’ data, such as a title, price, etc., to serve our purposes.

To give you an example, at willhaben, users can bookmark ads to keep a concise list of those they find interesting. This functionality could live in a separate service, but it would need at least a subset of the ads’ details to display a list of all bookmarked ads to the user. Instead of fetching the whole ad from the backend every time a user opens her bookmark list, the bookmark service could store the relevant subset of data itself and keep its own database in sync.

Example of an event-driven architecture

First Attempts and the Issue of Dual Writes

The first initiative sent ad details events directly to Kafka from the backend application. The flow called for both writing to the database and the Kafka topic as the ad was inserted / updated. However, it couldn’t be guaranteed that the stream reflected the exact same state as the database, as either one of the writes could fail. For example, imagine an API request to update an ad where the write to the database action succeeds but producing the event in Kafka fails due to a network issue.

Example of the dual write problem

This illustrates a classic problem in distributed systems and is the reason why complicated algorithms like consensus algorithms, two / three phase commit protocols, and so on exist. A good introduction to this can be found here.

It’s probably no surprise that we weren’t the first to run into this problem. It turns out, other smart people have already given this a lot of thought. One very promising approach on how to solve this goes by the name of “Change Data Capture.” Good introductions by the folks from Confluent can be found here and here. The bottom line is, you should have one source of truth (i.e., the database) where the data is changed and a stream of changes is produced directly from that source of truth.

Solutions for Dual Write Issues with Change Data Capture

There are a few different approaches for avoiding dual writes, but most of them rely on the fact that, during the API operation, only one write occurs. Since the backend already writes to the database, we need to treat the database as the source of truth and produce a stream of changes based on the state in the database.

Outbox Patterns (aka Audit Tables)

The outbox pattern is an interesting approach, where the events, which should be sent to Kafka, are stored in a dedicated table (called an “audit table” going forward) in the database containing the event to be pushed to Kafka. Again, this is not something we’ve invented, so I’ll refer you to this excellent article from Debezium for more details. At willhaben, we’ve considered two options:

  1. The application explicitly writes the event to both the main table and the audit table in a transactional fashion (note, both writes need to happen within a transaction, otherwise the dual write issue still exists).
  2. Triggers in the database reliably populate the audit table when a new record is inserted in the main table or altered.

Ultimately, we opted for option 2, since the backend doesn’t consistently use SQL transactions and we wouldn’t be able to guarantee that inserting the change event into the audit table would happen reliably. For instance, one API request our backend to insert a new ad into the database results in tens of stored procedure calls, again resulting in up to 60 isolated INSERTs and UPDATEs from a database perspective. So, we needed to cope with a series of isolated writes to the database.

Background and Requirements in the Context of willhaben

Before we get into further details, let’s summarize the two main constraints we had to work with:

  • Additional load on the database needs to be as small as possible
  • Each insertion / update of an ad results in tens of isolated INSERTs and UPDATEs on the database

Fast forward a few design sessions, and we had two interesting outcomes:

  1. Given our current database load, appending the partial ad that has changed on every trigger invocation would incur significant I/O costs and disk space, and would probably push our current database over the scaling cliff.
  2. Do we really need every single version of an ad, since an ad update is split into multiple database writes anyway? Since the main goal was to produce a sync mechanism for ad details changes, ultimately only the final version of the ad matters. If we can guarantee that the final version, including all previous changes, is eventually written to the stream, it is acceptable to not send every intermittent update as an event to Kafka.

That meant that guaranteeing we would eventually write the final version of the ad as an event to Kafka should be sufficient.

Keeping Track of Changed Ads

To minimize the amount of data that needs to be written to the audit table, we decided to keep it as simple and efficient as possible and to record only the ID of the inserted / changed ad, along with an auditID serving as the primary key, and an auditDate. We configured triggers on all the relevant tables to populate the audit table.

The audit table basically looks like this:

Example of the adAudit table in our database

Now we have a guaranteed audit log of all the ads that have changed.

Kafka Connect for Syncing Changed Ads

Once we had a complete list of IDs for the ads that have changed in the audit table, we wanted to get each entry as an event into Kafka. Luckily, the Confluence platform has a tool, Kafka Connect, for exactly this purpose. Kafka Connect is a relatively simple tool that periodically executes a configured SELECT statement in a database, transforms the result set, and writes each row as an event into a Kafka topic. In addition, it tracks an offset to be able to continue from the last successful database record, in the case of a failure. In our case, Kafka Connect executes the following statement every 300ms and converts each event to Avro, a compact, binary schema for serializing data. Here’s the SQL SELECT it executes:

select adAuditId, adId, auditDate where adAuditId > {offset}

Kafka Connect supports custom transformations to map the database row to the right fields in the Avro schema. In our case, we extract two Avro types, AdAuditKey and AdAudit. AdAuditKey only contains the adId and is used as the partition key of the Kafka event, and AdAudit contains all the fields described above.

We now have all the raw audit events in a Kafka topic partitioned by the adId. We still need to fetch the content of the ad and send the events to the final Kafka topic, which consumers can then subscribe to. Next, we need to look at how we fetch the ad details, as they are not present in the audit table.

This will be the topic of part 2 of this blog post, which is coming soon.

--

--