Kafka Connect Custom Single Message Transform using JSLT

Stefan Fuchs
willhaben Tech Blog
Mar 22, 2022


This blog post describes the implementation of a custom Kafka Connect Single Message Transform (SMT). It offers plenty of links to resources that might help you implement your own custom transformation. The source code is available on willhaben's GitHub.

What is Kafka Connect?

There are already many resources on Kafka Connect, especially the documentation page, blog posts, and tutorials offered by Confluent itself. However, to provide some context, we repeat here the part relevant to understanding the Kafka Connect API that allows us to hook into the data processing and write a custom transformation.

There are two basic kinds of Kafka Connectors:

  1. Kafka Connect Source Connectors read data from a data source, like a relational database, and write the data as events into a Kafka Topic.
  2. Kafka Connect Sink Connectors write data from a Kafka Topic to some kind of storage, such as S3.

Connectors exist for a wide variety of APIs and storage options. At the time of writing, Confluent lists over 40 connector implementations on its Connectors Page.

Most Source Connector implementations provide a structured representation of the data as a Struct object, and Sink Connectors in turn expect a Struct object. The idea is to separate the implementation of a connector for a specific storage from the serialization format of the events in the Kafka Topic. This also allows data transformations to be performed between the Connector that fetches or produces the data and the Converter that serializes or deserializes it.

A Kafka Connector configuration may involve three key parts:

  1. the Connector itself, which reads data from or writes data to an external system,
  2. optional Transformations (SMTs), which modify each record on its way through, and
  3. the Converter, which serializes or deserializes the events to or from the Kafka Topic.

Each part can be configured independently by providing the name of a class that implements the corresponding interface. The only prerequisite is that the Jar containing the implementation is located on a path configured in the `plugin.path` property of the Worker Instance.
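For illustration, a connector configuration wiring these parts together might look like the following sketch; the connector name, class choices, and the renamed field are placeholders, not taken from the article's setup:

```json
{
  "name": "customers-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "transforms": "rename",
    "transforms.rename.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.rename.renames": "zip:zip_code"
  }
}
```

The `connector.class`, `value.converter`, and `transforms.*` properties correspond exactly to the three parts listed above.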

The following image shows the relation between the parts:

The Kafka Connect Core Concepts and Architecture documentation provides more details on how the process works.

What is JSLT?

The first line of the README file on the JSLT Github repository puts it nicely:

“JSLT is a complete query and transformation language for JSON. The language design is inspired by jq, XPath, and XQuery.”

JSLT was developed at Schibsted to perform transformations on the millions of events routed daily through its data platform. The library was made open source in 2018 and is well documented with a function reference, a language tutorial, and examples.

It allows you to take any JSON as input and transform it into a new JSON document with a completely different structure, if needed. The approximately 50 built-in functions and support for conditions, loops, and function declarations let you perform quite complex transformations. The code looks much like JSON itself, which makes it easy to read and understand, even by people who are not programmers in the classical sense.

The library is written in Java with Jackson as its only dependency beyond the Java Standard Library.
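To make the API concrete, here is a minimal standalone sketch of using the JSLT library from Kotlin, outside of Kafka Connect (assuming the `com.schibsted.spt.data:jslt` dependency is on the classpath; the field names are made up):

```kotlin
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import com.schibsted.spt.data.jslt.Parser

fun main() {
    val mapper = ObjectMapper()
    val input: JsonNode = mapper.readTree("""{"name": "world", "ignored": 42}""")

    // Compile once and reuse: compiled Expression objects are thread-safe.
    val expr = Parser.compileString("""{ "greeting": "hello " + .name }""")

    val output: JsonNode = expr.apply(input)
    println(output) // {"greeting":"hello world"}
}
```

This compile-once-apply-many pattern is exactly what the SMT below does: the expression is compiled in `configure()` and applied per record.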

The following example shows an input JSON, the applied JSLT, and the resulting output, demonstrating some of the language features:

  • Defining a simple function that can be used in the script
  • Adding a new constant field
  • Replacing a value with a sha256 hex representation
  • Adding an array
  • Flattening some of the fields

Input Record

{
  "customer_id": 123456,
  "customer_status": 1,
  "invoice_address_street": "McDuck Manor",
  "invoice_address_zip_code": "1312",
  "invoice_address_city": "Duckburg",
  "delivery_address_street": "Webfoot Walk",
  "delivery_address_zip_code": "1313",
  "delivery_address_city": "Duckburg",
  "attributes": {
    "customer_type": "C2C",
    "customer_class": "A",
    "last_order": "2022-03-10T18:25:43.511Z"
  }
}

JSLT

def map_status(status)
  if ($status == 0)
    "ACTIVE"
  else if ($status == 1)
    "INACTIVE"
  else
    "UNDEFINED"

{
  "customer_id": sha256-hex(.customer_id),
  "customer_status_code": map_status(.customer_status),
  "locations": [
    {
      "zip_code": .invoice_address_zip_code,
      "city": .invoice_address_city
    },
    {
      "zip_code": .delivery_address_zip_code,
      "city": .delivery_address_city
    }
  ],
  "customer_type": .attributes.customer_type,
  "customer_class": .attributes.customer_class,
  "producer_team": "us.california.burbank.disney"
}

Output Record

{
  "customer_id": "8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92",
  "customer_status_code": "INACTIVE",
  "locations": [
    {
      "zip_code": "1312",
      "city": "Duckburg"
    },
    {
      "zip_code": "1313",
      "city": "Duckburg"
    }
  ],
  "customer_type": "C2C",
  "customer_class": "A",
  "producer_team": "us.california.burbank.disney"
}

Why a custom SMT implementation?

Confluent provides a number of included transforms, and more implementations are available as open source. Most fulfill a very specific use case, like Flatten, ReplaceField, and InsertField.

The transforms can be chained to allow complex transformations within a single connector configuration. Even so, configuring more complex transformations this way quickly becomes cumbersome and therefore error-prone. At some point, chaining the available transformations is no longer sufficient, and yet another specialized transformation would be required.
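As a sketch of how quickly such a chain grows, the following hypothetical configuration fragment combines the three built-in transforms named above just to flatten a nested field, rename one result, and add a single constant field (field names are invented for illustration):

```json
{
  "transforms": "flatten,rename,addTeam",
  "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
  "transforms.flatten.delimiter": "_",
  "transforms.rename.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.rename.renames": "attributes_customer_type:customer_type",
  "transforms.addTeam.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.addTeam.static.field": "producer_team",
  "transforms.addTeam.static.value": "us.california.burbank.disney"
}
```

A single JSLT expression, as shown earlier, can express all of this (and transformations the built-in SMTs cannot, such as building nested objects) in one place.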

By contrast, JSLT is a simple but very flexible and powerful language for transforming structured data. Our specific use case is transforming flat rows, produced into a Kafka Topic from a Postgres database by a Kafka Connect JDBC Source, into a nested object with some additional (constant) fields, so the data can be sent to a REST endpoint using a Kafka Connect REST Sink.

Implementation of the Custom SMT

To write a custom SMT, you need to implement the Transformation interface that comes with the org.apache.kafka:connect-transforms dependency. This means implementing the parts listed below.

Override the configure() function

This function receives the properties that Kafka Connect reads from the Connector configuration. To resolve all variables in the configuration and validate the expected settings, the Kafka Connect library provides the ConfigDef class for defining the expected configuration properties and the SimpleConfig class, an implementation of the AbstractConfig class, for parsing them.

With this function, you can perform all required initializations and validate the provided configuration. In our example case, we expect only the single attribute `jslt`:

...
private const val JSLT_CONFIG = "jslt"

val CONFIG_DEF: ConfigDef = ConfigDef()
    .define(
        JSLT_CONFIG,
        ConfigDef.Type.STRING,
        ConfigDef.Importance.MEDIUM,
        "JSLT expression that returns the transformed object"
    )
...
override fun configure(props: Map<String?, *>?) {
    val config = SimpleConfig(CONFIG_DEF, props)
    jslt = config.getString(JSLT_CONFIG)
    jsltExpression = Parser.compileString(jslt)
}

Override the apply() function

This is the core of the implementation; it contains the logic that actually performs the transformation. The function takes a ConnectRecord as a parameter and returns a new ConnectRecord.

It is important to note that the function must not mutate the incoming record; instead, a new record should be created and returned!

Create subclasses based on our Transformation implementation for Key and Value

As our transformer does not transform the whole record but only a (structured) value, we need to provide separate implementations for the record key and value, which can then be referenced in the Kafka Connect configuration.

To have common code for the actual transformation logic but also avoid conditions in the logic of our base class, we followed the same approach as used in the Confluent SMT Tutorial.

The JsltTransform class defines three abstract methods that are used in the apply() method and are overridden in the JsltTransform.Key and JsltTransform.Value classes.

The following snippet shows an extract of the parts mentioned above, with the full code available here:

abstract class JsltTransform<R : ConnectRecord<R>?> : Transformation<R> {
    ...
    val CONFIG_DEF: ConfigDef = ConfigDef()
        .define(
            JSLT_CONFIG,
            ConfigDef.Type.STRING,
            ConfigDef.Importance.MEDIUM,
            "JSLT expression that returns the transformed object"
        )
    ...
    override fun configure(props: Map<String?, *>?) {
        val config = SimpleConfig(CONFIG_DEF, props)
        jslt = config.getString(JSLT_CONFIG)
        jsltExpression = Parser.compileString(jslt)
    }

    override fun apply(record: R): R {
        val (configMap, isKeyFlag) = getJsonConverterConfig(record)
        jsonConverter.configure(configMap, isKeyFlag)
        return when {
            operatingValue(record) == null -> record
            operatingSchema(record) == null -> applySchemaless(record)
            else -> applyWithSchema(record)
        }
    }

    protected abstract fun operatingSchema(record: R?): Schema?

    protected abstract fun operatingValue(record: R?): Any?

    protected abstract fun newRecord(record: R?, updatedSchema: Schema?, updatedValue: Any?): R

    ...

    class Key<R : ConnectRecord<R>?> : JsltTransform<R>() {
        ...
        override fun operatingValue(record: R?): Any? = record?.key()
        ...
    }

    class Value<R : ConnectRecord<R>?> : JsltTransform<R>() {
        ...
        override fun operatingValue(record: R?): Any? = record?.value()
        ...
    }
}

As the interface only works with a record object, no explicit Kafka infrastructure is needed to test the logic.

@Test
fun handlesNullValue() {
    configureNonTransformJslt(xformValue)
    val given = SourceRecord(
        null,
        null,
        "topic",
        0,
        null,
        null
    )
    val expected = null
    val actual: Any? = xformValue.apply(given).value()
    assertEquals(expected, actual)
}

Some basic unit tests for the JsltTransform implementation are also available in the Git repository in the test folder.

The interface is pretty simple, with all the complexity in the transformation logic itself.

You can also look at the Confluent Single Message Transformation Tutorial for another example and the tests and implementations of the standard transformations in the Confluent Git repository.

How to use the JSLT Single Message Transformation

To be able to configure a Connector with our new Transformation, we first need to install it on the worker nodes. To do this:

  1. Clone the git repository:
    git clone git@github.com:willhaben/wh-kafka-connect-jslt-transform.git
  2. Compile the project using Gradle:
    gradle build
  3. Copy the Jar from `<local-repo-folder>/build/libs/` to every (!) worker node into a directory that is configured under the `plugin.path` property.

A more detailed step-by-step guide can be found in the Kafka Connect Self-Managed Userguide.

The Transformer can then be used in both Source and Sink Connectors. To apply a JSLT transformation on the value of a Kafka Event, add the following lines to the Connector config:

{
  ...
  "transforms": "jsltTransform",
  "transforms.jsltTransform.type": "at.willhaben.kafka.connect.transforms.jslt.JsltTransform$Value",
  "transforms.jsltTransform.jslt": "{ \"foo\": .bar }"
}

The JSLT example above is deliberately trivial: it produces an object with a single field `foo` that contains the value of the field `bar` from the input. In practice, the expression can use every language feature described in the JSLT documentation and become arbitrarily complex.

Outlook

Using JSLT makes it even easier to apply complex transformation logic to Kafka events. It might also be helpful to combine the transform with Kafka Connect Predicates to filter and route events.
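For example, a hypothetical configuration could restrict the transform to matching topics using the built-in TopicNameMatches predicate (predicate name and topic pattern are placeholders):

```json
{
  ...
  "transforms": "jsltTransform",
  "transforms.jsltTransform.type": "at.willhaben.kafka.connect.transforms.jslt.JsltTransform$Value",
  "transforms.jsltTransform.jslt": "{ \"foo\": .bar }",
  "transforms.jsltTransform.predicate": "onCustomerTopic",
  "predicates": "onCustomerTopic",
  "predicates.onCustomerTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.onCustomerTopic.pattern": "customer\\..*"
}
```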

JSLT also allows registering user-defined functions and passing in variables before execution. We might add support for configuring these through the Connector config in the future; so far, the built-in functions fulfill our needs.

As the JSLT library uses JsonNode to work with the object fields, the resulting data types are determined by the field values of the output object. This might lead to problems in some cases. To deal with that, an additional configuration property that specifies a schema, like Avro or Json-Schema, could offer a solution.
