Zalando Tech Blog – Technical Articles Katariina Kari

Introducing Semantic Web Technologies at Zalando

Two years ago, in March 2016, the newly-opened Helsinki office wondered about the expressivity of our current product data. What do attributes like material construction or sport quality really mean, and how can we use them to create meaningful fashion experiences online for our customers?

It was around that time that, after working for five years solely in the art sector and building digital strategies for various classical music organisations, I applied to Zalando in the hope of improving my existing technology skills and perhaps learning some Scala. It quickly became apparent that what the company and the teams in Helsinki actually needed was a technical solution for better fashion understanding. In order to innovate the online shopping experience, we need to be fluent in fashion, and for that we need background information. We need to know, for example, what vegan clothes are. While product data does express whether an item is made of wool or leather, etc., nowhere is it explicitly expressed that the absence of these materials equals “vegan.” So how can we, and the customer, know this?

So, I gave the Scala course a pause and dove into building a fashion knowledge graph for Zalando using Turtle, JSON-LD and Python.

The Benefits of a Knowledge Graph at Zalando

Semantic web technologies use knowledge graphs, technically known as named directed graphs, to provide background data held by humans in a machine-readable form. For example, the knowledge graph knows that:

  1. Silk and wool are animal-based fibers and leather is an animal-based material.
  2. Vegan means refusing to use any animal-based products.

Therefore, an application using the knowledge graph can interpret the product data of wool, silk, or leather as not suitable for vegans, or only offer products that are vegan by excluding those items with materials made from animals.
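As a minimal illustration of that kind of inference, here is a small Python sketch using rdflib; the namespace, class names and materials are illustrative assumptions, not Zalando's actual graph:

from rdflib import Graph, Namespace
from rdflib.namespace import RDF

FASHION = Namespace("https://example.org/fashion/")  # hypothetical namespace
g = Graph()

# background knowledge: which materials are animal-based
for material in ("wool", "silk", "leather"):
    g.add((FASHION[material], RDF.type, FASHION.AnimalBasedMaterial))
g.add((FASHION.cotton, RDF.type, FASHION.PlantBasedMaterial))

def is_vegan(product_materials):
    """A product is suitable for vegans if none of its materials are animal-based."""
    return not any(
        (FASHION[m], RDF.type, FASHION.AnimalBasedMaterial) in g
        for m in product_materials
    )

print(is_vegan(["cotton"]))          # True
print(is_vegan(["wool", "cotton"]))  # False

The same check could of course be expressed as a SPARQL query against the graph; the point is that “vegan” never has to appear in the product data itself.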

In practice, this means that Zalando can:

  1. Understand the word "vegan" in a search without ever expressing it in our product data explicitly,
  2. Offer special values in filters, such as "vegan",
  3. Show a page with knowledge about vegan clothing that includes articles on vegan fashion, outfits for vegans, vegan clothing collections, and vegan-appropriate products from our catalogue.

Not only do we make the implicit human knowledge explicit, we also store what kind of information it is and how it relates to other kinds of knowledge. For example, we know that being vegan is a type of global awareness, as is favoring sustainable clothes. By understanding the underlying structure of these snippets of human understanding, we can do even more. We can:

  1. Intelligently suggest links for further browsing.
  2. Apply business rules. For example, when a customer is browsing a particular brand, we might not suggest competing brands.
  3. Know which attributes are complementary to each other and which ones are opposites of each other.

Katariina presents her work in our Dublin tech hub.

Not all Artificial Intelligence is Machine Learning

The semantic web is often either overlooked or confused with machine learning (ML). Nowadays, artificial intelligence is used mostly as a synonym for machine learning. However, the semantic web is a branch of artificial intelligence that is very different from machine learning.

Typically, when I state my profession as a semantic web practitioner, I hear what most of my colleagues hear: “But Machine Learning does it better!” I agree with the “better” to some extent, but I am not so sure about the “it.” ML does many things better; things semantic web technologies are not really good at. Learning algorithms are great at finding and recognising patterns in large data sets. A knowledge graph does not really do that. What it is good at is providing additional human knowledge to what has been learned or could be learned.

Recently, there have been numerous expert discussions on the explainability of machine learning methods, and more general discussions about making machine learning processes less opaque. Deep-learning methods are statistical models based on neural networks that are fitted to large datasets and learn layers of so-called “weights,” or numeric parameters. They produce highly complex and functional black boxes that work, but cannot really answer why they work. Ongoing research explores how to add ontology information to the learning structure and thus explain what has been learned.

I find this research exciting, because I think we can get the best results by combining both approaches. Years ago I co-wrote a paper on how a classic machine learning application, document classification based on word vectors, could be improved by a few percentage points by adding information from a general-domain knowledge graph. That is why I find the sentence “But Machine Learning does it better!” too dualistic for comparing two very different branches of artificial intelligence. It is like saying “fish tastes better!” when I express my liking for Béarnaise sauce.

I have one more thought I would like to share, one I have developed over the years I have worked with the arts. Much like any artistic endeavour, fashion seeks to find new and unusual combinations. It seeks to break the status quo and to shock us in just the right way. Trends love contradicting each other. At one point, pink and red never go together; at another point in time, the combination is a total trend. Machine learning can only make recommendations based on existing data. With the knowledge graph, however, we can ask fashion experts to give us these glimpses of the future, describe the self-contradicting world that fashion is, and help us present our content to our customers in a meaningful way.

Want to change the fashion landscape like Katariina? Check out our Helsinki job postings.


Zalando Tech Blog – Technical Articles Oleksandr Kukhtin

Red flags to look out for in badly written projects.

Let’s talk about common red flags, or in other words, how to spot a badly-written project.

Many of us have experienced a project which is crying and begging for something drastic to change, or even for it to be put out of its misery altogether, but alas, we don’t have the heart or the resources to “pull the plug” as it were. From year to year, this poor project grows and grows, each day with new fixes and features being added. It can become so painful and cumbersome in the end that it simply isn’t tenable anymore. You fix something in one place, but something crashes in other parts, and so on…

In this post, we’ll help you work out at an early stage whether a project is bad, without checking the source code (beyond the project structure).

So, how do you identify a project that should be rewritten because of its age or quality?

There are some red flags to look out for:

1. Lack of or no documentation
When you open a project for the first time you should be able to:

  • Understand what the project is about.
  • See the technologies which are used in it.
  • Have an overview of the project structure or of the main business workflows.
  • See the team that maintains the project.
  • See how the project relates to other teams/services.
  • See how to run the project.

At the very least you should see which team maintains the project; then, if you have any of the other questions listed, you will be able to contact them and ask about every detail you need. If most of the above points are missing, this is a major red flag.

2. No tests
When you’re finished with the documentation, you have some information to think about. If it is good documentation, you will be able to tell whether the project is covered by tests or not. But if you have no documentation, you should check the project structure.

If a project is good you will see a “tests” folder or “.spec files” within services or components with some code in them.

The red flag here is if there are no tests, or there are folders or files created for tests but they are empty. Even worse is a situation where there are tests, but they are commented out.

3. Repository state and activity
A project should have a remote repository. If a project only exists locally, that is not just your everyday red flag; it is a huge sign that something is dramatically wrong with this project.

A typical example of a remote repository is a GitHub repo.

When you go into the repository, you should check for the following:
a) A huge number of branches:

  • Developers are not cleaning up the branches they deployed to production.
  • Too many features developed at the same time.

b) No README:

  • Without a README (or help from the contributors), you won’t be able to run the project or deploy it correctly. Sometimes, instead of a README, there will be a Wiki page or GitHub Pages; this is OK.

c) Number of issues and how fast they are addressed:

  • You can check the latest issues (about the last 10), their dates, and the flow of comments. If contributors answer fast, that means they are supporting the project and you can more confidently use their code in your own codebase.

d) Last time any changes were made:

  • You can check the latest pull requests or open issues.

Some additional red flags for open-source repositories, when a project has already been in production for some time:

  • A small number of contributors.
  • A small number of stars and forks.

If you follow the above guidelines, you'll find that you can spot red flags more easily and hopefully save yourself some time!

Want to work on great projects? Join our team at Zalando Tech.

Zalando Tech Blog – Technical Articles Mohd Nadeem Akhtar

Apache’s lightning-fast engine for data analysis and machine learning

In recent years, there has been a massive shift in the industry towards data-oriented decision making backed by enormously large data sets. This means that we can serve our customers with more relevant, personalized content.

We in the Digital Experience team are tasked with analysing Big Data in order to gather insights and support the product team in the decision-making process. This includes finding our customers’ top-rated articles. We can then organize outfits related to those items and help customers make choices in the fashion store. Or we can leverage similar customer behaviour and suggest an article they might want in the future.


As data is rapidly growing, we need a tool which can clean and train on the data fast enough. With large datasets, it sometimes takes days to finish the job, which results in some very frustrated data analysts. Let’s have a look at some of the problems:

  • High latency when training on the data
  • Limited performance optimization

Why is Spark good for data science?

Focusing on organizing data and analysing it with the help of Spark, we will first try to understand how Spark behaves “under the hood.” Two properties stand out:

  • Simple APIs
  • Fault tolerance

Fault tolerance makes it possible to analyse large datasets without the fear of failure, for instance when one node out of 1,000 fails and the whole operation would otherwise need to be performed again.

As personalization becomes an ever more important aspect of the Zalando customer journey, we need a tool that enables us to serve the content in approximate real time. Hence, we decided to use Spark as it retains fault tolerance and significantly reduces latency.

Note: Spark keeps all data immutable and in-memory. It achieves fault tolerance using ideas from functional programming: it can replay the functional transformations over the original dataset.

For the sake of comparison, let’s recap the Hadoop way of working:

Hadoop saves intermediate states to disk and communicates over the network. If we consider training a logistic regression (ML) model, then the state of each iteration is saved back to disk. The process is very slow.

In the case of Spark, it works mostly in-memory and tries to minimize data transportation over a network, as seen below:

Spark is powerful for operations like logistic regression, where multiple iterations over the data are required for training.


Spark’s laziness (on transformations) and eagerness (on actions) is how it optimises network communication within the programming model. Hence, Spark defines transformations and actions on Resilient Distributed Datasets (RDDs) to support this. Let’s take a look:

Transformations: They are lazy. Their resultant RDD is not immediately computed, e.g. map, flatMap.

Actions: They are eager. Their result is immediately computed, e.g. collect, take(10).

The execution of filters is deferred until a “take” action is applied. What’s important here is that Spark does not perform the filter on all logs: the filter is only executed when the “take” action is called, and it stops as soon as 10 error log lines have been found.
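As a minimal sketch of this laziness in PySpark (the log path is a hypothetical example), the filter below does no work until take(10) is called:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("lazy-vs-eager").getOrCreate()
sc = spark.sparkContext

logs = sc.textFile("hdfs:///logs/app.log")          # hypothetical path
errors = logs.filter(lambda line: "ERROR" in line)  # transformation: nothing is computed yet

first_errors = errors.take(10)  # action: Spark reads only enough data to find 10 matching lines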

Long story short, we know that latency makes a big difference and wastes a lot of a data analyst’s time. In-memory computation significantly lowers latency, and Spark is smart enough to optimize on the basis of actions.

The figure below shows the hierarchy of how Spark functions:

Spark is organized in a master/worker topology. In the context of Spark, the driver program is the master node, whereas the executor nodes are the workers. Each worker node runs the same task on its own partition of the data and returns the results to the master node. The resource distribution is handled by a cluster manager.

In the Spark programming model, an application is a set of processes running on a cluster.

All these processes are coordinated by a driver program:

  • Runs the code that creates the sparkContext, creates RDDs, and sends off transformations and actions.

The processes that run the computation and store data of your application are executors:

  • Return computed data to the driver.
  • Provide in-memory storage for cached RDDs.

For Big Data processing, the most common form of data is key-value pairs. In fact, the 2004 MapReduce research paper states that key-value pairs were a key design choice for MapReduce. Spark enables us to project such complex data types down to key-value pairs as Pair RDDs.

Useful: A Pair RDD allows you to act on each key in parallel or regroup data across the network. Moreover, it provides additional methods such as groupByKey(), reduceByKey() and join().

The data is distributed over different nodes, and operations like groupByKey shuffle the data over the network.

We know reshuffling the data over a network is bad. But I’ll explain why the data is reshuffled shortly.

Let’s take an example:

Goal: Calculate how many articles are bought and how much money is spent by each individual over the course of a month.

Here, we can see that groupByKey shuffles the data over the network. If it’s not absolutely required, we shouldn’t send it. We can use reduceByKey instead of groupByKey and reduce the data flow over the network.
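A hedged PySpark sketch of this, with made-up purchase data, might look as follows; reduceByKey combines values locally on each node before anything is shuffled:

from pyspark.sql import SparkSession

sc = SparkSession.builder.appName("pair-rdd-example").getOrCreate().sparkContext

# hypothetical (customer_id, amount) pairs for one month of purchases
purchases = sc.parallelize([
    ("c1", 59.95), ("c2", 19.90), ("c1", 25.00), ("c3", 120.00), ("c2", 35.50),
])

# count articles and sum money per customer
per_customer = (purchases
                .mapValues(lambda amount: (1, amount))
                .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])))

print(per_customer.collect())  # e.g. [('c1', (2, 84.95)), ('c2', (2, 55.40)), ('c3', (1, 120.0))]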

Optimizing with Partitioners

There are a few different kinds of partitioner available:

  1. Hash partitioners
  2. Range partitioners

Partitioning can bring enormous performance gains, especially in the shuffling phase.
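As a small, hypothetical PySpark sketch of this idea, pre-partitioning a pair RDD once lets later key-based operations reuse that partitioning instead of reshuffling:

from pyspark.sql import SparkSession

sc = SparkSession.builder.appName("partitioner-example").getOrCreate().sparkContext

pairs = sc.parallelize([("c1", 59.95), ("c2", 19.90), ("c1", 25.00)])  # hypothetical data

# hash-partition the pair RDD up front and cache it
partitioned = pairs.partitionBy(8).cache()

# reduceByKey can now reuse the existing partitioning instead of shuffling the input again
monthly_spend = partitioned.reduceByKey(lambda a, b: a + b)

# range partitioning is what sortByKey uses under the hood
print(monthly_spend.sortByKey().collect())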

Spark SQL for Structured data

SQL is widely used for analytics, but it’s a pain to connect data processing pipelines like Spark or Hadoop to an SQL database. Spark SQL not only contains all the advanced database optimisations, but also seamlessly intermixes SQL queries with Scala.

Spark SQL is a component of the Spark stack. It has three main goals:

  • High performance, achieved by using optimisation techniques from the database world.
  • Support for relational data processing.
  • Support for new data sources like JSON.
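A minimal PySpark sketch of these ideas could look like the following; the file name and column names are assumptions:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-sql-example").getOrCreate()

orders = spark.read.json("orders.json")   # JSON as a data source (hypothetical file)
orders.createOrReplaceTempView("orders")  # expose the DataFrame to SQL

top_articles = spark.sql("""
    SELECT article_id, COUNT(*) AS purchases
    FROM orders
    GROUP BY article_id
    ORDER BY purchases DESC
    LIMIT 10
""")
top_articles.show()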


In this article, we covered how Spark can be optimized for data analysis and machine learning. We discussed how latency becomes the bottleneck for large datasets, as well as the role of in-memory computation, which enables the data scientist to perform real-time analysis.

The highlights of Spark functionality that make life easier:

  • Spark SQL for structured data helps in executing queries either in-memory or persisted on disk.
  • Spark ML for classification of data with different models, such as logistic regression.
  • Spark pair RDDs (key-value pairs) help in data exploration and analysis.
  • Spark pre-optimization through partitioning, which reduces network shuffling.

We believe this will take personalization to a whole new level, thus improving the Zalando user journey.

Discuss Spark in more detail with Nadeem on Twitter. Keep up with all Zalando Tech job openings here.

Zalando Tech Blog – Technical Articles Han Xiao

How We Built the Next Generation Product Search from Scratch using a Deep Neural Network

Product search is one of the key components in an online retail store. A good product search can understand a user’s query in any language, retrieve as many relevant products as possible, and finally present the results as a list in which the preferred products should be at the top, and the less relevant products should be at the bottom.

Unlike text retrieval (e.g. Google web search), products are structured data. A product is often described by a list of key-value pairs, a set of pictures and some free text. In the developers’ world, Apache Solr and Elasticsearch are known as de-facto solutions for full-text search, making them a top contender for building e-commerce product searches.

At the core, Solr/Elasticsearch is a symbolic information retrieval (IR) system. Mapping queries and documents to a common string space is crucial to the search quality. This mapping process is an NLP pipeline implemented with Lucene Analyzer. In this post, I will reveal some drawbacks of such a symbolic-pipeline approach, and then present an end-to-end way of building a product search system from query logs using Tensorflow. This deep learning based system is less prone to spelling errors, leverages underlying semantics better, and scales out to multiple languages much easier.

Recap: Symbolic Approach for Product Search

Let’s first do a short review of the classic approach. Typically, an information retrieval system can be divided into three tasks: indexing, parsing and matching. As an example, the next figure illustrates a simple product search system:

  1. indexing: storing products in a database with attributes as keys, e.g. brand, color, category;
  2. parsing: extracting attribute terms from the input query, e.g. red shirt -> {"color": "red", "category": "shirt"};
  3. matching: filtering the product database by attributes.

Many existing solutions such as Apache Solr and Elasticsearch follow this simple idea. Note that, at the core, they are symbolic IR systems that rely on NLP pipelines to get an effective string representation of the query and product.

Pain points of A Symbolic IR System

1. The NLP pipeline is fragile and doesn’t scale out to multiple languages
The NLP Pipeline in Solr/Elasticsearch is based on the Lucene Analyzer class. A simple analyzer such as StandardAnalyzer would just split the sequence by whitespace and remove some stopwords. Quite often you have to extend it by adding more and more functionalities, which eventually results in a pipeline as illustrated in the figure below.

While it looks legit, my experience is that such NLP pipelines suffer from the following drawbacks:

  • The system is fragile. As the output of every component is the input of the next, a defect in the upstream component can easily break down the whole system. For example, canyourtoken izer split thiscorrectly?
  • Dependencies between components can be complicated. A component can take from and output to multiple components, forming a directed acyclic graph. Consequently, you may have to introduce some asynchronous mechanisms to reduce the overall blocking time.
  • It is not straightforward to improve the overall search quality. An improvement in one or two components does not necessarily improve the end-user search experience.
  • The system doesn’t scale out to multiple languages. To enable cross-lingual search, developers have to rewrite those language-dependent components in the pipeline for every language, which increases the maintenance cost.

2. Symbolic Systems do not Understand Semantics without Hard Coding
A good IR system should understand that trainer means sneaker by using some semantic knowledge. No one likes hard coding this knowledge, especially you machine learning guys. Unfortunately, it is difficult for Solr/Elasticsearch to understand any acronym/synonym unless you implement the SynonymFilter class, which is basically a rule-based filter. This severely restricts the generalizability and scalability of the system, as you need someone to maintain a hard-coded, language-dependent lexicon. If one can represent a query/product by a vector in a space learned from actual data, then synonyms and acronyms could easily be found in the neighborhood without hard coding.

Neural IR System
The next figure illustrates a neural information retrieval framework, which looks pretty much the same as its symbolic counterpart, except that the NLP pipeline is replaced by a deep neural network and the matching job is done in a learned common space.

End-to-End Model Training
There are several ways to train a neural IR system. One of the most straightforward (but not necessarily the most effective) ways is end-to-end learning. Namely, your training data is a set of query-product pairs feeding on the top-right and top-left blocks in the last figure. All the other blocks are learned from data. Depending on the engineering requirements or resource limitations, one can also fix or pre-train some of the components.

Where Do Query-Product Pairs Come From?
To train a neural IR system in an end-to-end manner, you need some associations between query and product such as the query log. This log should contain what products a user interacted with after typing a query. Typically, you can fetch this information from the query/event log of your system. After some work on segmenting, cleaning and aggregating, you can get pretty accurate associations. In fact, any user-generated text can be good association data. This includes comments, product reviews, and crowdsourcing annotations.

Neural Network Architecture
The next figure illustrates the architecture of the neural network. The proposed architecture is composed of multiple encoders, a metric layer, and a loss layer. First, input data is fed to the encoders which generate vector representations. In the metric layer, we compute the similarity of a query vector with an image vector and an attribute vector, respectively. Finally, in the loss layer, we compute the difference of similarities between positive and negative pairs, which is used as the feedback to train encoders via backpropagation.

Query Encoder
Here we need a model that takes in a sequence and outputs a vector. Besides the content of a sequence, the vector representation should also encode language information and be resilient to misspellings. The character-RNN (e.g. LSTM, GRU, SRU) model is a good choice. By feeding RNN character by character, the model becomes resilient to misspelling such as adding/deleting/replacing characters. The misspelled queries would result in a similar vector representation as the genuine one. Moreover, as European languages (e.g. German and English) share some Unicode characters, one can train queries from different languages in one RNN model. To distinguish the words with the same spelling but different meanings in two languages, such as German rot (color red) and English rot, one can prepend a special character to indicate the language of the sequence, e.g. 🇩🇪 rot and 🇬🇧 rot.
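A minimal TensorFlow/Keras sketch of such a character-level query encoder is shown below; the vocabulary size, embedding size and output dimension are assumptions rather than the values used in the actual system:

import tensorflow as tf

NUM_CHARS = 128   # assumed character vocabulary size (shared Unicode chars + language markers)
EMBED_DIM = 64    # assumed character embedding size
QUERY_DIM = 128   # assumed query vector size

query_encoder = tf.keras.Sequential([
    tf.keras.layers.Embedding(input_dim=NUM_CHARS, output_dim=EMBED_DIM, mask_zero=True),
    tf.keras.layers.GRU(QUERY_DIM),  # reads the query character by character
])

# a query is fed as a sequence of character ids (hypothetical ids shown here)
char_ids = tf.constant([[3, 1, 17, 24, 29]])
query_vec = query_encoder(char_ids)  # shape: (1, 128)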

Image Encoder
The image encoder rests on purely visual information. The RGB image data of a product is fed into a multi-layer convolutional neural network based on the ResNet architecture, resulting in an image vector representation in 128 dimensions.

Attribute Encoder
The attributes of a product can be combined into a sparse one-hot encoded vector. This is then fed to a four-layer, fully connected deep neural network with steadily diminishing layer sizes. Activations are made nonlinear with standard ReLUs, and dropout is applied to address overfitting. The output is an attribute vector representation in 20 dimensions.
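A hedged Keras sketch of such an attribute encoder follows; the hidden layer sizes, dropout rate and input width are assumptions, only the 20-dimensional output follows the text:

import tensorflow as tf

attribute_encoder = tf.keras.Sequential([
    tf.keras.layers.Dense(256, activation="relu"),
    tf.keras.layers.Dropout(0.3),
    tf.keras.layers.Dense(128, activation="relu"),
    tf.keras.layers.Dropout(0.3),
    tf.keras.layers.Dense(64, activation="relu"),
    tf.keras.layers.Dense(20),  # 20-dimensional attribute representation
])

one_hot_attrs = tf.zeros((1, 5000))          # hypothetical sparse one-hot input
attr_vec = attribute_encoder(one_hot_attrs)  # shape: (1, 20)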

Metric & Loss Layer
After a query-product pair goes through all three encoders, one obtains a vector representation of the query, and an image representation and an attribute representation of the product. It is now time to squeeze them into a common latent space. In the metric layer, we need a similarity function which gives a higher value to the positive pair than to the negative pair. To understand how a similarity function works, I strongly recommend you read my other blog post, “Optimizing Contrastive/Rank/Triplet Loss in Tensorflow for Neural Information Retrieval”. It also explains the metric and loss layer implementation in detail.
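To illustrate the general idea only, here is a minimal TensorFlow sketch of a margin-based loss over cosine similarities; it is a simplified stand-in, not the exact loss described in the referenced post:

import tensorflow as tf

def cosine_sim(a, b):
    a = tf.math.l2_normalize(a, axis=-1)
    b = tf.math.l2_normalize(b, axis=-1)
    return tf.reduce_sum(a * b, axis=-1)

def margin_loss(query_vec, pos_product_vec, neg_product_vec, margin=0.2):
    # positive pairs should score higher than negative pairs by at least the margin
    pos_sim = cosine_sim(query_vec, pos_product_vec)
    neg_sim = cosine_sim(query_vec, neg_product_vec)
    return tf.reduce_mean(tf.maximum(0.0, margin - (pos_sim - neg_sim)))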

For a neural IR system, doing inference means serving search requests from users. Since products are updated regularly (say once a day), we can pre-compute the image representation and attribute representation for all products and store them. During the inference time, we first represent user input as a vector using query encoder; then iterate over all available products and compute the metric between the query vector and each of them; finally, sort the results. Depending on the stock size, the metric computation part could take a while. Fortunately, this process can be easily parallelized.
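A simple NumPy sketch of that inference step, assuming cosine similarity as the metric and pre-computed product vectors, might be:

import numpy as np

def search(query_vec, product_vecs, product_ids, top_k=20):
    # product_vecs: pre-computed (num_products, dim) matrix of product representations
    q = query_vec / np.linalg.norm(query_vec)
    p = product_vecs / np.linalg.norm(product_vecs, axis=1, keepdims=True)
    scores = p @ q                     # similarity of the query against every product
    top = np.argsort(-scores)[:top_k]  # sort and keep the best matches
    return [(product_ids[i], float(scores[i])) for i in top]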

Qualitative Results
Here, I demonstrate some (cherry-picked) results for different types of queries. It seems that the system goes in the right direction. It is exciting to see that the neural IR system is able to correctly interpret named entities, spelling errors and multilinguality without any NLP pipeline or hard-coded rules. However, one can also notice that some top-ranked products are not relevant to the query, which leaves quite some room for improvement.

Speed-wise, the inference time is about two seconds per query on a quad-core CPU for 300,000 products. One can further improve the efficiency by using model compression techniques.

Query & Top-20 Results

🇩🇪 nike

🇩🇪 schwarz (black)

🇩🇪 nike schwarz

🇩🇪 nike schwarz shirts

🇩🇪 nike schwarz shirts langarm (long-sleeved)

🇬🇧 addidsa (misspelled brand)

🇬🇧 addidsa trosers (misspelled brand and category)

🇬🇧 addidsa trosers blue shorrt (misspelled brand and category and property)

🇬🇧 striped shirts woman

🇬🇧 striped shirts man

🇩🇪 kleider (dress)

🇩🇪 🇬🇧 kleider flowers (mix-language)

🇩🇪 🇬🇧 kleid ofshoulder (mix-language & misspelled off-shoulder)

If you are a search developer who is building a symbolic IR system with Solr/Elasticsearch/Lucene, this post should make you aware of the drawbacks of such a system.

This post should also answer your What?, Why? and How? questions regarding a neural IR system. Compared to the symbolic counterpart, the new system is more resilient to the input noise and requires little domain knowledge about the products and languages. Nonetheless, one should not take it as a “Team Symbol” or “Team Neural” kind of choice. Both systems have their own advantages and can complement each other pretty well. A better solution would be combining these two systems in a way that we can enjoy all advantages from both sides.

Some implementation details and tricks are omitted here but can be found in my other posts. I strongly recommend readers to continue with the following posts:

Last but not least, the open-source project MatchZoo contains many state-of-the-art neural IR algorithms. In addition to product search, one may find its application in conversational chatbot and question-answer systems.

To work with great people like Han, have a look at our jobs page.

Zalando Tech Blog – Technical Articles Kevin Eid

Solving the many small files problem for AVRO

The Fashion Content Platform teams in Zalando Dublin handle large amounts of data on a daily basis. To make sense of it all, we utilise Hadoop (EMR) on AWS. In this post, we discuss a setup in which a real-time system feeds the data. Due to the variance in data volumes and the period over which these systems write to storage, there can be a large number of small files.

While Hadoop is capable of processing large amounts of data, it typically works best with a small number of large files, and not with a large number of small files. A small file is one which is smaller than the Hadoop Distributed File System (HDFS) block size (default 64MB). In MapReduce, every map task handles computation on a single input block. Having many small files means that there will be a lot of map tasks, and each map task will handle only a small amount of data. This creates a larger memory overhead and slows down the job. Additionally, when using HDFS backed by AWS S3, listing objects can take quite a long time, and even longer when lots of objects exist. [3]

Known solutions and why they don’t work for AVRO

This is a well-known problem; there are many utilities and approaches for solving the issue:

  1. s3-dist-cp - This is a utility created by Amazon Web Services (AWS). It is an adaptation of Hadoop’s DistCp utility for HDFS that supports S3. This utility enables you to solve the small file problem by aggregating files together using the --groupBy option and by setting a maximum size using the --targetSize option.
  2. Filecrush - This is a highly configurable tool designed for the sole purpose of “crushing” small files together to solve the small file problem.
  3. Avro-Tools - This supplies many different functions for reading and manipulating AVRO files. One of these functions is “concat” which works perfectly for merging AVRO files together. However, it’s designed to be used on a developer’s machine rather than on a large scale scheduled job.

While these utilities exist, they do not work for our use case. The data produced by our system is stored as AVRO files. These files contain a file header followed by one or more blocks of data. As such, a simple append will not work and doing so results in corrupt data. Additionally, the Filecrush utility doesn’t support reading files from AWS S3.

We decided to roll out our own solution. The idea was straightforward: Use Spark to create a simple job to read the daily directory of the raw AVRO data and re-partition the data using the following equation to determine the number of partitions needed to write back the larger files:

number_of_partitions = input_size / (AVRO_COMPRESSION_RATIO * DEFAULT_HDFS_BLOCK_SIZE)
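As an illustration, a small Python helper for this calculation might look like the following; the compression ratio used here is an assumed value, not our measured one:

AVRO_COMPRESSION_RATIO = 7                  # assumed average compression ratio
DEFAULT_HDFS_BLOCK_SIZE = 64 * 1024 * 1024  # 64MB, the default HDFS block size

def number_of_partitions(input_size_bytes):
    # aim for output files that each fill roughly one HDFS block
    return max(1, input_size_bytes // (AVRO_COMPRESSION_RATIO * DEFAULT_HDFS_BLOCK_SIZE))

# the repartitioned RDD of GenericRecords is then written back as larger AVRO files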

Our initial approach used spark-avro by Databricks to read in the AVRO files and write out the grouped output. However, on validation of the data, we noticed an issue; the schema in the outputted data was completely mangled. With no workaround to be found, we reached out to our resident big data guru Peter Barron who saved the day by introducing us to Spark’s newAPIHadoopFile and saveAsNewAPIHadoopFile methods which allowed us to read and write GenericRecords of AVRO without modifying the schema.


In a nutshell, we were able to solve the many small files problem for AVRO by writing a Spark job leveraging the low-level functionality of the Hadoop fs library. In effect, repartitioning files so that we can work on bigger blocks of data will improve the speed of future jobs by decreasing the number of map tasks needed and reducing the cost of storage.

We're looking for software engineers and other talents. For details, check out our jobs page.


[1] Dealing with Small Files Problem in Hadoop Distributed File System, Sachin Bendea and Rajashree Shedge,
[2] The Small Files Problem, Cloudera Engineering Blog,
[3] Integrating Spark At Petabyte Scale, Netflix,

Zalando Tech Blog – Technical Articles Michal Michalski

Building an aggregated view of data in the event-driven microservice architecture

In the world of microservices, where a domain model gets decomposed into related, but independently handled entities, we often face the challenge of building an aggregate view of the data that brings together different parts of that model. While this can already be interesting with “traditional” designs, the move to event-driven architectures can magnify these difficulties, especially with simplistic event streams.

In this post, I'll describe how we tackled this when building Zalando’s Smart Product Platform (SPP); how we initially got it wrong by trying to solve all problems in one place, and how we fixed it by "distributing" pieces of that complexity. I'll show how stepping back and taking a fresh look at the problem can lead to a much cleaner, simpler, more maintainable and less error-prone solution.

The Challenge

The SPP is the IT backbone of Zalando’s business. It consists of many smaller components focused on the ultimate goal of making articles sellable in Zalando’s online stores. One of the “earliest” stages in the pipeline is the Article ingestion, built as a part of Smart Product Platform. That’s the part that I, together with my colleagues, am responsible for.

Long story short, we built a system that allows for a very flexible Article data model based on a few “core” entities representing the various pieces of data we need in the system. For simplicity, in this blog post, I’m going to limit the core model to:

  • Product - core entity representing products that Zalando sells, which can form a hierarchy itself (Product being a child of another Product); all the entities need to be associated – either directly or indirectly – with a single Product that’s the “root” (topmost entity) of the hierarchy,
  • Media - photos, videos, etc., associated with a Product,
  • Enrichments - additional pieces of data associated with either Product or Media entity.

Sample hierarchy constructed from the building blocks described above may look like this:

Since we decided to adopt a “third generation Microservices” architecture focusing on “event-first development”, we ended up with a bunch of CRUD-like (i) microservices (service per entity) producing ordered streams of events describing the current “state of the world” to their own Kafka topic. Clients would then build the Article (which is an aggregate of Product, Media and Enrichment information) by making subsequent calls to all the services, starting from a Product and then adding other pieces of data as required.

What’s important is that all the services were ensuring the correct ordering of operations. For instance, it’s not possible to create a Media entity associated with a non-existent Product, or a “child” Product whose “parent” Product wasn’t yet created (implementation detail: we make HEAD requests to the referenced entity’s service REST API to ensure it).

Initially, we thought that this was enough; we would expose these streams to the consumers, and they would do The Right Thing™, which is merging the Product, Media and Enrichment data into the aggregated Article view.

i) CRU, rather than CRUD - the “D” part was intentionally left out.

This approach, however, had some significant drawbacks; one being that consumers of our data needed the aggregated Article view. They rarely cared only about the bits of information we exposed. This would mean that the non-trivial logic responsible for combining different streams of data would have to be re-implemented by many teams across Zalando.

We decided to bite the bullet and solve this problem for everyone.

Gozer - the reason to cross the streams

Gozer, as we called it, is an application whose only purpose was “merging” the inbound streams of the data – initially only Products and Media – and use them to build an aggregated Article view that’s exposed to the consumers who need it.

As simple as it sounds, the fact that each service was publishing its own stream, with no ordering guarantees across the different streams, made Gozer’s implementation non-trivial. We knew that entities were created and published in the correct order, but that didn’t guarantee the ordering of consumption. To account for this, whenever we consume an entity that’s associated with another entity not yet received by Gozer (e.g. a Media event for a Product), we fetch the missing data using the service’s REST API.

Once we have the data, we have to sequence all the events correctly; the “root” Product goes first and the rest of the hierarchy follows, in such a way that a node is followed by its children. We use Kafka, so this means making sure that for all the entities in the hierarchy, the ID of the “root” Product is used as the partition key. This will become important later. To do this in a performant way, we need to keep track of the whole hierarchy and its “root” (and some other information, but I’m going to ignore that for simplicity), which added more complexity and a significant performance penalty to the processing.

The sequenced entities are then published in the correct order to an “intermediate” Kafka topic, so that in the next step they can be processed and merged.

This whole logic, extra service calls, local hierarchy tracking and event sequencing, added some complexity to the code, but at the time we were happy with the outcome. We had reasonably simple REST APIs and a single place handling the merge complexity. This looked reasonable and quite clean at the time.

Unfortunately, it didn’t stay like this for too long. Soon we added handling of the Enrichments inbound stream and some other features. This added complexity to the sequencing and merging logic and resulted in even more dependencies on REST APIs for fetching the missing data. Code was becoming more and more convoluted and processing was becoming slower. Making changes to the Gozer codebase was becoming a pain.

To visualise this complexity, below you can see the interaction diagram that my colleague, Conor Clifford, drew, which describes the creation of a 2-level Product hierarchy with a single Media and a single Enrichment for that Media. Don’t worry if you can’t see the details; it’s the number of interactions and services involved that matters, showing the scale of the problem:

Note that we’re dealing with millions of Products with tens of Media and Enrichments. What you see above is the unrealistically simplified version. The real issue was much, much bigger.

But that wasn’t the end. As our data model grew, not only were more inbound streams about to appear, but we also started discussing the need to add outbound streams for other entities in the future.

At this time, a significant amount of Gozer’s code and processing power was dedicated to dealing with issues caused by the lack of ordering across all the inbound streams, which was guaranteed at the entity creation time (remember the HEAD checks I mentioned earlier?), but lost later. The fact that we were not maintaining this ordering all the way down to Gozer because of having a stream per entity was causing us significant pain when we had to deal with out-of-order events.

We realised that we were giving up a very important property of our system (ordering) because it was “convenient” and looked “clean”, only to introduce a lot of complexity and put significant effort into reclaiming it back later.

This was something that we needed to change.

Vigo to the rescue

Following the established naming convention, we decided to rework Gozer by creating Vigo; an application whose purpose was the same as Gozer’s, but the approach we took this time was substantially different.

The main difference was that this time we wanted to ensure that the order of events received by Vigo was guaranteed to be correct. This way, Vigo wouldn’t be responsible for “merging streams” and fetching missing data as before. It would consume entity-related events as they come, and its only purpose would be to produce the “aggregate” event correctly. This design would have two main benefits:

  • Ordered events mean no “missing” data when events are delivered in an incorrect order, so the application architecture (sequencing step, additional Kafka topic) and logic (handling of the “missing entity” case, fetching it via REST API) are simplified,
  • No external calls are required to fetch missing entities; a massive performance gain.

As much as we cared for performance, since we were about to add even more functionality to Gozer, the simplicity argument was the main driving force to make the Vigo project happen.

We knew what we wanted Vigo to be, but we had to figure out how to get there; how to create that single, ordered stream of all the entity-change events.

One could ask, “Why not just make all the services publish to one Kafka topic? This would ensure the ordering and it is simple to do”:

Unfortunately, it’s not that simple in our case. I mentioned earlier that all the entities in our system build a hierarchy and need to be processed in the context of a Product. More precisely, to know what partition key to use for an entity, we need to know its “root” Product entity ID, which is the very top of the whole hierarchy. That’s where this approach gets a bit tricky…

Let's consider a Product that has a Media associated with it. That Media has an Enrichment. This Enrichment only 'knows' which Media it's defined for, but has no 'knowledge' of the Product (and its ID) that the Media belongs to. From the Enrichment's perspective, to get the Product ID we need, we must either:

  • Make the Enrichment service query the Media service for information about the Product that given Media is assigned to (meaning that Media would be a “proxy” for Product API),
  • Make the Enrichment service “understand” this kind of relationship and make it query the Product service directly, asking for a Product ID for a Media that the Enrichment is assigned to.

Both of these solutions sounded bad to us: they break encapsulation and leak the details of the “global” design all over the system. Services would become responsible for things that, we believe, they shouldn’t be. They should only “understand” the concept of a “parent entity”, and they should only interact directly with the services that are responsible for their parents.

This led us to the third option; a bit more complex than the simplistic, ideal approach described above, but still significantly cleaner than what we had before in Gozer.

This complexity wouldn’t completely disappear, of course. We still have to:

  • enforce the ordering of events across different streams,
  • ensure entities are processed in the context of their parents.

To achieve the above, our services needed to become a bit smarter. They would need to first consume the outbound streams of the services responsible for entities that depend on them (e.g. the Product service would consume the Media and Enrichment streams) and then publish the received entities into a single, ordered Kafka topic (partitioned by Product ID, because Product is the “root” entity), after the entity they’re associated with.

This approach can be “decomposed” and presented in a more abstract form as a service X consuming N inbound streams (containing “dependent” entities), and multiplexing the events received with the entities it’s responsible for (X) into a single, ordered outbound topic.

This service’s outbound topic may then become an inbound topic for another service and so on, which means that these small blocks can be composed into a more complex structure maintaining the ordering across all the entities, but still allow them to process all the entities in a context of their parents.
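To make the multiplexing idea more tangible, here is a minimal, hypothetical Python sketch using kafka-python; the topic names, event format and root-ID field are assumptions, and this is not Zalando's actual implementation:

import json
from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer(
    "media-events", "enrichment-events",  # hypothetical streams of dependent entities
    bootstrap_servers="localhost:9092",
    group_id="product-multiplexer",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

for event in consumer:
    # assumption: each dependent entity can be resolved to the ID of the "root"
    # Product of its hierarchy (pretended here to be a field on the event)
    root_id = event.value["root_product_id"]
    # using the root Product ID as the partition key keeps every event of one
    # hierarchy in order on a single partition of the combined outbound topic
    producer.send("product-events-ordered", key=root_id.encode("utf-8"), value=event.value)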

Putting all the building blocks together, the final design looked like this:

with Product service’s outbound queue containing something like:

This queue contains all the entities in the correct order, so they can be consumed and processed by Vigo “as is”, without considering the case of missing data and making any external calls.

While the “big picture” looks more complex right now, it’s important to remember that a single engineer will rarely (almost never) deal with all that complexity at once in their daily development work, as it will usually be contained within the boundaries of a single service. Before Vigo, Gozer was the single place where all this complexity (and more!) was accumulated, sitting and waiting for an unsuspecting engineer to come along and get swallowed.

Also, do you remember the interaction diagram I showed you earlier? This is the same interaction after making the discussed changes:

Again, don’t worry about the details - it’s about the number of interactions and services involved. The difference should be apparent.

Was it worth it?

As I was hopefully able to show you, we removed a lot of complexity from the “aggregating” service, but it came at a price. We had to add some complexity to other services in our platform; this is how we coined the term “simplicity by distributing complexity”. While we think there’s no “silver bullet” solution here, the benefits of the second design (Vigo) make it superior to the original solution (Gozer) for at least a few reasons:

  • It’s easier to understand; fewer cases to handle, fewer special cases and fewer external dependencies make it easier to reason about the service and create a mental model of it.
  • It’s easier to test - there’s less code, fewer possible scenarios to test overall, and no REST services to take into account.
  • It’s easier to reason about and debug - monitoring a couple of consumers with cross-dependencies making external calls is much more challenging than doing the same for a single one.
  • More extensible and composable - adding new data flows and streams becomes a much smaller undertaking.
  • It’s more resilient - again, no external REST services to call means that it’s less likely that a problem with other services will stop us from aggregating the data that’s waiting to be aggregated.

What’s worth noting is that the last point (resiliency) is true for the system as a whole as well. These REST calls weren’t moved anywhere, they simply disappeared: they’re now handled by moving data through Kafka (which we already have a hard dependency on).

What we noticed is that while complexity grouped in a single place tends to “multiply” (ii), the similar amount of complexity spread across many parts of the system is easier to handle and only “adds up”.

This only applies for instances when complexity is spread by design, put where it “logically” belongs; not just randomly (or even worse: accidentally) thrown all over the place!

“Distributing the complexity” is not a free lunch. In the same way that Microservice architecture distributes the monolith’s complexity into smaller, self-contained services at the price of the general operational overhead, our approach massively reduced the pain related to the complexity of a single service, and yet, resulted in adding a few small moving pieces to a couple of other places in the system.

Overall: yes, we think it was worth it.

ii) Of course this mathematical / numerical interpretation assumes that complexity has to be greater than 1

Check out some of the amazing jobs with people like Michal in our Dublin Tech Hub!

Zalando Tech Blog – Technical Articles Ian Duffy

Backing up Apache Kafka and Zookeeper to S3

What is Apache Kafka?

Apache Kafka is a distributed streaming platform used for building real-time data pipelines and streaming applications. It is horizontally scalable, fault-tolerant, and wicked fast. It runs in production in many companies.

Backups are important with any kind of data. Apache Kafka lowers the risk of data loss with replication across brokers. However, it is still necessary to have protection in place in the event of user error.

This post will demo and introduce tools that our small team of three at Zalando uses to backup and restore Apache Kafka and Zookeeper.

Backing up Apache Kafka

Getting started with Kafka Connect
Kafka Connect is a framework for connecting Kafka with external systems. Its purpose is to make it easy to add new systems to scalable and secure stream data pipelines.

By using the Spredfast kafka-connect-s3 connector, backing up and restoring the contents of a topic to S3 becomes a trivial task.

Download the prerequisite

Check out the following repository:

$ git clone 

It contains a docker-compose file for bringing up Zookeeper, Kafka, and Kafka Connect locally.

Kafka Connect will load all jars put in the ./kafka-connect/jars directory. Go ahead and download the kafka-connect-s3.jar

$ wget "" -O kafka-connect-s3.jar

Bring up the stack
To boot the stack, use docker-compose up

Create some data
Using the Kafka command line utilities, create a topic and a console producer:

$ kafka-topics --zookeeper localhost:2181 --create --topic example-topic --replication-factor 1 --partitions 1
Created topic "example-topic".
$ kafka-console-producer --topic example-topic --broker-list localhost:9092
>hello world

Using a console consumer, confirm the data is successfully written:

$ kafka-console-consumer --topic example-topic --bootstrap-server localhost:9092 --from-beginning
hello world

Create a bucket on S3 to store the backups:

$ aws s3api create-bucket --create-bucket-configuration LocationConstraint=eu-west-1 --region eu-west-1 --bucket example-kafka-backup-bucket

Create a sink connector configuration and submit it to Kafka Connect:

$ cat << EOF > example-topic-backup-tasks.json
{
 "name": "example-topic-backup-tasks",
 "config": {
   "connector.class": "com.spredfast.kafka.connect.s3.sink.S3SinkConnector",
   "format.include.keys": "true",
   "topics": "example-topic",
   "tasks.max": "1",
   "format": "binary",
   "s3.bucket": "example-kafka-backup-bucket",
   "value.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
   "key.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
   "local.buffer.dir": "/tmp"
 }
}
EOF
$ curl -X POST -H "Content-Type: application/json" -H "Accept: application/json" -d @example-topic-backup-tasks.json /api/kafka-connect-1/connectors

(Check out the Spredfast documentation for more configuration options.)

After a few moments the backup task will begin. By listing the Kafka Consumer groups, one can identify the consumer group related to the backup task and query for its lag to determine if the backup is finished.

$ kafka-consumer-groups --bootstrap-server localhost:9092 --list
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
connect-example-topic-backup-tasks
$ kafka-consumer-groups --describe --bootstrap-server localhost:9092 --group connect-example-topic-backup-tasks

Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
example-topic                  0          1               1               0          consumer-5-e95f5858-5c2e-4474-bab9-8edfa722db21   /                    consumer-5

The backup is completed when the lag reaches 0. On inspecting the S3 bucket, a folder of the raw backup data will be present.

Let’s destroy all of the containers and start fresh:

$ docker-compose rm -f -v
$ docker-compose up

Re-create the topic:

$ kafka-topics --zookeeper localhost:2181 --create --topic example-topic --replication-factor 1 --partitions 1
Created topic "example-topic".

Create a source connector with Kafka Connect:

$ cat << EOF > example-restore.json
{
 "name": "example-restore",
 "config": {
   "connector.class": "com.spredfast.kafka.connect.s3.source.S3SourceConnector",
   "tasks.max": "1",
   "topics": "example-topic",
   "s3.bucket": "example-kafka-backup-bucket",
   "key.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
   "value.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
   "format": "binary",
   "format.include.keys": "true"
 }
}
EOF
$ curl -X POST -H "Content-Type: application/json" -H "Accept: application/json" -d @example-restore.json /api/kafka-connect-1/connectors

(Check out the Spredfast documentation for more configuration options.)

The restore process should begin, and after some time the ‘hello world’ message will be displayed when you run the Kafka console consumer again:

$ kafka-console-consumer --topic example-topic --bootstrap-server localhost:9092 --from-beginning 
hello world


Backing Up Zookeeper

Zookeeper’s role
Newer versions of Kafka (>= 0.10.x) use ZooKeeper in a small but still important coordination role. When new Kafka brokers join the cluster, they use ZooKeeper to discover and connect to the other brokers. The cluster also uses ZooKeeper to elect the controller and track the controller epoch. Finally, and perhaps most importantly, ZooKeeper stores the Kafka broker topic-partition mappings, which track which information is stored on each broker. The data will still persist without these mappings, but won't be accessible or replicated.

Exhibitor is a popular supervisor system for ZooKeeper. It provides a number of housekeeping facilities for ZooKeeper as well as exposing a nice UI for exploring the stored data. It also provides some backup and restore capabilities for ZooKeeper out of the box. However, we should make sure we understand these features before relying on them.

On AWS, we run our Exhibitor brokers under the same stack. In this setup, where the stack’s auto scaling group is responsible for controlling when any of the Exhibitor instances are removed, it is relatively easy for multiple (or even all) Exhibitor brokers to be terminated at the same time. That’s why, for our tests, we set up an Exhibitor cluster and connected Kafka to it. We then indexed all the Kafka znodes to create an Exhibitor backup. Finally, we tore down the Exhibitor stack and re-deployed it with the same config.

Unfortunately, after the re-deploy, while the backup folder was definitely in S3, the new Exhibitor appliance did not recognise it as an existing index. With a bit of searching we found that this is actually the expected behaviour, and the suggested solution is to read the S3 index and apply the changes by hand.

Creating, deleting and re-assigning topics in Kafka is an uncommon occurrence for us, so we estimated that a daily backup task would be sufficient for our needs.

We came across Burry. Burry is a small tool which allows for snapshotting and restoring a number of system-critical stores, including ZooKeeper. It can save the snapshot dump locally or to various cloud storage options. Its backup dump is also conveniently organized along the znode structure, making it very easy to work with manually if need be. Using this tool, we set up a daily cron job in production to take a full daily ZooKeeper snapshot and upload the resultant dump to an S3 bucket on AWS.

Conveniently, Burry also works as a restore tool using a previous ZooKeeper snapshot. It will try to recreate the full snapshot znode structure and znode data. It also tries to be careful to preserve existing data, so if a znode exists it will not overwrite it.

But there is a catch. Some of the Kafka-created znodes are ephemeral and expected to expire when the Kafka Brokers disconnect. Currently Burry snapshots these as any other znodes, so restoring to a fresh Exhibitor cluster will recreate them. If we were to restore Zookeeper before restarting our Kafka brokers, we'd restore from the snapshot of the ephemeral znodes with information about the Kafka brokers in our previous cluster. If we then bring up our Kafka cluster, our new broker node IDs, which must be unique, would conflict with the IDs restored from our Zookeeper backup. In other words, we'd be unable to start up our Kafka brokers.

We can easily get around this problem by starting our new Zookeeper and new Kafka clusters before restoring the Zookeeper content from our backup. By doing this, the Kafka brokers will create their ephemeral znodes, and the Zookeeper restore will not overwrite these, and will go on to recreate the topic and partition assignment information. After restarting the Kafka brokers, the data stored on their persisted disks will once again be correctly mapped and available, and consumers will be able to resume processing from their last committed position.

Be part of Zalando Tech. We're hiring!

Zalando Tech Blog – Technical Articles Ian Duffy

A closer look at the ingredients needed for ultimate stability

This is part of a series of posts on Kafka. See Ranking Websites in Real-time with Apache Kafka’s Streams API for the first post in the series.

Remora is a small application for monitoring Kafka consumer lag. With many teams deploying it to their production environments, open sourcing this application made sense. Here, I’ll go through some technical pieces of what streaming is, why it is useful, and the drive behind this useful monitoring application.

Streaming architectures have become an important architectural pattern at Zalando. Good streaming infrastructure, and good monitoring of it, is key to building fast, highly available and scalable systems that process data across numerous teams and layers.

Without streaming, a system is not reactive to changes. The older style manages changes through incremental batches. Batches, such as cron jobs, can be hard to manage: you have to keep track of a large number of jobs, each taking care of a shard of the data, and this can also be hard to scale. Having a streaming component, such as Kafka, provides a centralised, scalable resource that makes your architecture reactive.

Some teams use cloud infrastructure such as AWS Kinesis or SQS. Zalando chose Kafka instead, to reach better throughput and to work well with frameworks like AKKA Streams.

Monitoring lag is important; without it we don’t know where the consumer is relative to the size of the queue. An analogy might be piloting a plane without knowing how many more miles are left on your journey. Zalando has trialled:

We needed a simple, independent application that could scrape metrics from a URL and place them into our metrics system. But what should go into this application? I put on my apron and played chef with the range of ingredients available to us.

Scala is big at Zalando; a majority of the developers know AKKA or Play. At the time, our team was designing systems using AKKA HTTP with an actor pattern. It is a very light, stable and asynchronous framework. Could we just wrap the command line tools in a light Scala framework? Potentially, it could take less time and be more stable. Sounds reasonable, we thought, so let’s do that.

The ingredients for ultimate stability were as follows: a handful of Kafka Java command line tools with a pinch of AKKA HTTP and a hint of an actor design. Leave to code for a few days and take out of the oven. Lightly garnish with a performance test to ensure stability, throw in some Docker. Deploy, monitor, alert, and top it off with a beautiful graph to impress. Present Remora to your friends so that everyone may have a piece of the cake, no matter where in the world you are!

Bon app-etit!

Zalando Tech Blog – Technical Articles Ian Duffy

Second in our series about the use of Apache Kafka’s Streams API by Zalando

This is the second in a series about the use of Apache Kafka’s Streams API by Zalando, Europe’s leading online fashion platform. See Ranking Websites in Real-time with Apache Kafka’s Streams API for the first post in the series.

This piece was first published on

Running Kafka Streams applications in AWS

At Zalando, Europe’s leading online fashion platform, we use Apache Kafka for a wide variety of use cases. In this blog post, we share our experiences and lessons learned to run our real-time applications built with Kafka’s Streams API in production on Amazon Web Services (AWS). Our team at Zalando was an early adopter of the Kafka Streams API. We have been using it since its initial release in Kafka 0.10.0 in mid-2016, so we hope you find this hands-on information helpful for running your own use cases in production.

What is Apache Kafka’s Streams API?

The Kafka Streams API is available as a Java library included in Apache Kafka that allows you to build real-time applications and microservices that process data from Kafka. It allows you to perform stateless operations such as filtering (where messages are processed independently from each other), as well as stateful operations like aggregations, joins, windowing, and more. Applications built with the Streams API are elastically scalable, distributed, and fault-tolerant. For example, the Streams API guarantees fault-tolerant data processing with exactly-once semantics, and it processes data based on event-time i.e., when the data was actually generated in the real world (rather than when it happens to be processed). This conveniently covers many of the production needs for mission-critical real-time applications.
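
To make this concrete, here is a minimal sketch of a stateless Streams application, written in Scala against a recent version of the Java Streams API. The application id, broker address and topic names are illustrative placeholders, not values from our setup.

```scala
import java.util.Properties

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.{Consumed, Predicate, Produced}
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}

// A minimal, stateless Streams application: read events from one topic,
// keep only the non-empty values, and write them to another topic.
object FilteringApp extends App {
  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-filter-app")
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

  val strings = Serdes.String()
  val builder = new StreamsBuilder()

  builder
    .stream("input-events", Consumed.`with`(strings, strings))
    .filter(new Predicate[String, String] {
      override def test(key: String, value: String): Boolean =
        value != null && value.nonEmpty
    })
    .to("filtered-events", Produced.`with`(strings, strings))

  val streams = new KafkaStreams(builder.build(), props)
  streams.start()
  sys.addShutdownHook(streams.close())
}
```

A stateful variant would swap the filter for a groupByKey plus an aggregation; a sketch of that shape appears later in this series.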

An example of how we use Kafka Streams at Zalando is the aforementioned use case of ranking websites in real-time to understand fashion trends.

Library Upgrades of Kafka Streams

Largely due to our early adoption of Kafka Streams, we encountered many teething problems in running Streams applications in production. However, we stuck with it because of how easy it was to write Kafka Streams code. In our early days of adoption, we hit various issues around stream consumer groups rebalancing, problems getting locks on the local RocksDB after a rebalance, and more. These eventually settled down and were sorted out in a subsequent release (April 2017) of the Kafka Streams API.


After upgrading, our Kafka Streams applications were mostly stable, but we would still see what appeared to be random crashes every so often. These crashes occurred more frequently on components doing complex stream aggregations. We eventually discovered that the actual culprit was AWS rather than Kafka Streams: on AWS, General Purpose SSD (GP2) EBS volumes operate using I/O credits. The AWS pricing model allocates a baseline read and write IOPS allowance to a volume, based on the volume size. Each volume also has an IOPS burst balance that acts as a buffer if the base limit is exceeded. The burst balance replenishes over time, but as it gets used up, reads and writes to the disk are throttled down to the baseline, leaving the application with an EBS volume that is very unresponsive. This ended up being the root cause of most of our issues with aggregations. When running Kafka Streams applications, we had initially assigned 10 GB disks, as we didn’t foresee much storage occurring on these boxes. However, under the hood, the applications performed lots of read/write operations on their RocksDB stores, which used up the I/O credits; given the size of our disks, the credits were not replenished quickly enough, grinding our applications to a halt. We remediated this by provisioning the Kafka Streams applications with much larger disks. This gave us a higher baseline IOPS, and the burst balance was replenished at a faster rate.
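
For intuition, here is a rough back-of-the-envelope sketch based on AWS’s published gp2 model (a baseline of 3 IOPS per GiB with a 100 IOPS floor, plus a burst-credit bucket that refills at the baseline rate). The volume sizes below are illustrative, not our exact production values.

```scala
// Rough gp2 arithmetic, per AWS's published model: baseline of 3 IOPS per GiB
// with a 100 IOPS floor; smaller volumes can burst above this only while
// their I/O credit bucket lasts.
def gp2BaselineIops(sizeGiB: Int): Int = math.max(100, 3 * sizeGiB)

// Our original small volumes:  gp2BaselineIops(10)  == 100
// A much larger volume:        gp2BaselineIops(500) == 1500
// A bigger baseline both raises the throttle floor and refills the
// burst-credit bucket faster, which is why larger disks fixed the stalls.
```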


EBS Burst Balance

Our monitoring solution polls CloudWatch and pulls back all the AWS-exposed metrics. For the issue outlined above, the most important of these is the EBS burst balance. As mentioned above, applications that use Kafka Streams often rely on heavy utilization of locally persisted RocksDB stores for storage and quick data access. This storage is persisted on the instance’s EBS volumes and generates a high read and write workload on them. GP2 disks were used in preference to provisioned IOPS disks (IO1), since the former were found to be much more cost-effective in our case.

Fine-tuning your application

With upgrades to the Kafka Streams library, the Kafka community also introduced many improvements to the default stream configuration values. In previous, more unstable iterations of the client library, we had spent a lot of time tweaking various configuration values to achieve some level of stability.

With new releases, we found ourselves discarding these custom values and achieving better results. However, timeout issues persisted on some of our services, where a service would frequently get stuck in a rebalancing state. We noticed that reducing the max.poll.records value in the stream configs would sometimes alleviate the issues experienced by these services. From partition lag profiles we also saw that the consuming issue seemed to be confined to only a few partitions, while the others continued processing normally between rebalances. Ultimately we realised that the processing time for a single record in these services could be very long (up to minutes) in some edge cases. Kafka allows a fairly long time (five minutes by default) between polls before a stream consumer is considered dead, but with larger batches of messages this timeout was still being exceeded. By the time the processing of the record was finished, the stream was already marked as failed and so the offset could not be committed. On rebalance, this same record would once again be fetched from Kafka, would again fail to process in a timely manner, and the situation would repeat. Therefore, for any of the affected applications, we introduced a processing timeout, ensuring there was an upper bound on the time taken by any of our edge cases.
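
As an illustration of those two levers, the sketch below lowers max.poll.records via the Streams configuration and wraps slow per-record work in an explicit timeout. The values, names and the thread-pool-based timeout helper are our own illustrative choices, not the exact settings or code used in these services.

```scala
import java.util.Properties
import java.util.concurrent.{Callable, Executors, TimeUnit, TimeoutException}

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.streams.StreamsConfig

object TuningSketch {
  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-streams-app")
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  // Hand fewer records to the application per poll, so a batch of slow
  // records is less likely to exceed the broker-side poll interval.
  props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "50")

  // Bound the time spent on any single record: a record that exceeds the
  // bound is treated as failed instead of stalling the whole stream task.
  private val pool = Executors.newCachedThreadPool()

  def processWithTimeout[A](maxSeconds: Long)(work: => A): Option[A] = {
    val future = pool.submit(new Callable[A] { override def call(): A = work })
    try Some(future.get(maxSeconds, TimeUnit.SECONDS))
    catch { case _: TimeoutException => future.cancel(true); None }
  }
}
```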


Consumer Lag

By looking at the metadata of a Kafka Consumer Group, we can determine a few key metrics. How many messages are being written to the partitions within the group? How many messages are being read from those partitions? The difference between these is called lag; it represents how far the Consumers lag behind the Producers.

The ideal running state is that lag is a near zero value. At Zalando, we wanted a way to monitor and plot this to see if our streams applications are functioning.
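
In code, the calculation is as simple as it sounds; a small sketch with illustrative names:

```scala
// Per partition: lag = newest offset the producers have written (the log
// end offset) minus the offset the consumer group has committed.
final case class PartitionOffsets(logEndOffset: Long, committedOffset: Long)

def totalLag(partitions: Seq[PartitionOffsets]): Long =
  partitions.map(p => math.max(0L, p.logEndOffset - p.committedOffset)).sum
```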

After trying out a number of consumer lag monitoring utilities, such as Burrow, Kafka Lag Monitor and Kafka Manager, we ultimately found these tools either too unstable or a poor fit for our use case. Out of this need, our co-worker, Mark Kelly, built a small utility called Remora. It is a simple HTTP wrapper around the Kafka consumer group “describe” command. By polling the Remora HTTP endpoints from our monitoring system at a set interval, we were able to get good insight into our stream applications.


Our final issue was memory consumption. Initially, we somewhat naively assigned very large heaps to the Java virtual machine (JVM). This was a bad idea, because Kafka Streams applications use a lot of off-heap memory when configured to use RocksDB as their local storage engine, which is the default. With large heaps, there wasn’t much free system memory left, and as a result applications would eventually grind to a halt and crash. For our applications we use m4.large instances and assign 4 GB of RAM to the heap, of which we usually use about 2 GB; this leaves the system with roughly 4 GB of RAM free for off-heap and system usage, and puts overall system memory utilization at around 70%. Additionally, we would recommend reviewing the memory management section of Confluent’s Kafka Streams documentation, as customising the RocksDB configuration was necessary in some of our use cases.
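
As an example of the kind of RocksDB customisation the Confluent documentation describes, here is a hedged sketch of a config setter that caps some of the off-heap allocations. The class name and the numbers are purely illustrative, not the values we run in production.

```scala
import java.util.{Map => JMap}

import org.apache.kafka.streams.state.RocksDBConfigSetter
import org.rocksdb.{BlockBasedTableConfig, Options}

// A sketch of reining in RocksDB's off-heap appetite via a custom config
// setter; cache and write-buffer numbers here are illustrative only.
class BoundedRocksDBConfig extends RocksDBConfigSetter {
  override def setConfig(storeName: String, options: Options, configs: JMap[String, AnyRef]): Unit = {
    val table = new BlockBasedTableConfig()
    table.setBlockCacheSize(16 * 1024 * 1024L)   // 16 MB block cache per store
    options.setTableFormatConfig(table)
    options.setMaxWriteBufferNumber(2)           // fewer in-memory memtables
  }
}

// Registered through the Streams config, e.g.:
// props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, classOf[BoundedRocksDBConfig])
```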


JVM Heap Utilization

We expose JVM heap utilization using Dropwizard metrics via HTTP. This is polled on an interval by our monitoring solution and graphed. Many of our applications are fairly memory intensive, with in-memory caching, so it was important for us to be able to see at a glance how much memory was available to the application. Additionally, due to the relative complexity of many of our applications, we wanted to have easy visibility into garbage collection in the systems. Dropwizard metrics offered a robust, ready-made solution for these problems.

CPU, System Memory Utilization and Disk Usage

We run the Prometheus node exporter on all of our servers; it exports lots of system metrics via HTTP. Again, our monitoring solution polls this on an interval and graphs the results. While the JVM monitoring provided great insight into what was going on inside the JVM, we also needed insight into what was going on in the instance as a whole. In general, most of the applications we ended up writing had much greater network and memory overheads than CPU requirements. However, in many of the failure cases we saw, our instances were ultimately terminated by auto-scaling groups for failing their health checks. These health checks would fail because the endpoints became unresponsive due to high CPU load as other resources were used up. While this was usually not caused by high CPU use in the application processing itself, it was a great symptom to capture and to dig into further to find where the CPU usage was coming from. Disk monitoring also proved very valuable, particularly for Kafka Streams applications consuming from topics with a large partitioning factor and/or doing more complex aggregations. These applications store a fairly large amount of data (around 200 MB per partition) in RocksDB on the host instance, so it is very easy to accidentally run out of space. Finally, it is also good to monitor how much memory the system has available as a whole, since this was frequently directly connected to CPU load saturating on the instances, as briefly outlined above.

Conclusion: The Big Picture of our Journey

As mentioned in the beginning, our team at Zalando has been using the Kafka Streams API since its initial release in Kafka 0.10.0 in mid-2016.  While it wasn’t a smooth journey in the very beginning, we stuck with it and, with its recent versions, we now enjoy many benefits: our productivity as developers has skyrocketed, writing new real-time applications is now quick and easy with the Streams API’s very natural programming model, and horizontal scaling has become trivial.

In the next article of this series, we will discuss how we are backing up Apache Kafka and Zookeeper to Amazon S3 as part of our disaster recovery system.

About Apache Kafka’s Streams API

If you have enjoyed this article, you might want to continue with the following resources to learn more about Apache Kafka’s Streams API:

Join our ace tech team. We’re hiring!

Zalando Tech Blog – Technical Articles Hunter Kelly

Using Apache Kafka and the Kafka Streams API with Scala on AWS for real-time fashion insights

This piece was originally published on

The Fashion Web

Zalando, Europe’s leading online fashion platform, cares deeply about fashion. Our mission statement is to “Reimagine fashion for the good of all”. To reimagine something, first you need to understand it. The Dublin Fashion Insight Centre was created to understand the “Fashion Web” – what is happening in the fashion world beyond the borders of what’s happening within Zalando’s shops.

We continually gather data from fashion-related sites. We bootstrapped this process with a list of relevant sites from our fashion experts, but as we scale our coverage and add support for multiple languages (spoken, not programming), we need to know which sites are the next “best” ones.

Rather than relying on human knowledge and intuition, we needed an automated, data-driven methodology for this. We settled on a modified version of Jon Kleinberg’s HITS algorithm. HITS (Hyperlink-Induced Topic Search) is also sometimes known as Hubs and Authorities, which are the main outputs of the algorithm. In our modified version, we flatten the link graph to the domain level rather than working at the original per-document level.

HITS in a Nutshell

The core concept in HITS is that of Hubs and Authorities. Basically, a Hub is an entity that points to lots of other “good” entities. An Authority is the complement: an entity pointed to by lots of other “good” entities. The entities here, for our purposes, are web sites represented by their domains. Domains have both a Hub and an Authority score, and the two are kept separate (this turns out to be important, as we’ll explain later).

These Hub and Authority scores are computed using an adjacency matrix. For every domain, we mark the other domains that it has links to. This is a directed graph, with the direction being who links to whom.


Once you have the adjacency matrix, you perform some straightforward matrix operations to compute a vector of Hub scores and a vector of Authority scores, as follows:

  • Sum across the columns and normalize; this becomes your Hub vector
  • Multiply the Hub vector element-wise across the adjacency matrix
  • Sum down the rows and normalize; this becomes your Authority vector
  • Multiply the Authority vector element-wise down the adjacency matrix
  • Repeat

An important thing to note is that the algorithm is iterative: you perform the steps above until  eventually you reach convergence—that is, the vectors stop changing—and you’re done. For our purposes, we just pick a set number of iterations, execute them, and then accept the results from that point.  We’re mostly interested in the top entries, and those tend to stabilize pretty quickly.

So why not just use the raw counts from the adjacency matrix? The beauty of the HITS algorithm is that Hubs and Authorities are mutually supporting—the better the sites that something points at, the better a Hub it is; similarly, the better the sites that point to something, the better an Authority it is. That is why the iteration is necessary: it bubbles the good stuff up to the top.

(Technically, you don’t have to iterate. There’s some fancy matrix math you can do instead, calculating the eigenvectors directly. In practice, we found that when working with large, sparse matrices, the results didn’t turn out the way we expected, so we stuck with the straightforward iterative method.)
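
To make the loop concrete, here is a small, self-contained sketch of the iterative update in its textbook form over a dense adjacency matrix. Our production code works on much larger, sparse data, but the update rule is the same.

```scala
// adj(i)(j) = 1.0 if domain i links to domain j, 0.0 otherwise.
def hits(adj: Array[Array[Double]], iterations: Int = 20): (Array[Double], Array[Double]) = {
  val n = adj.length
  var hub  = Array.fill(n)(1.0)
  var auth = Array.fill(n)(1.0)

  def normalize(v: Array[Double]): Array[Double] = {
    val norm = math.sqrt(v.map(x => x * x).sum)
    if (norm == 0.0) v else v.map(_ / norm)
  }

  for (_ <- 1 to iterations) {
    // A domain's authority grows with the hub scores of the domains linking to it...
    auth = normalize(Array.tabulate(n)(j => (0 until n).map(i => adj(i)(j) * hub(i)).sum))
    // ...and its hub score grows with the authority of the domains it links to.
    hub  = normalize(Array.tabulate(n)(i => (0 until n).map(j => adj(i)(j) * auth(j)).sum))
  }
  (hub, auth)
}
```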

Common Questions

What about non-fashion domains? Don’t they clog things up?
Yes, in fact, on the first run of the algorithm, sites like Facebook, Twitter, Instagram, et al. were right up at the top of the list. Our Fashion Librarians then curated that list to get a nice list of fashion-relevant sites to work with.

Why not PageRank?
PageRank needs nearly complete information on the web, along with the resources required to gather and process it. We only have outgoing link data for the domains that are already in our working list. We need an algorithm that is robust in the face of partial information.

This is where the power of the separate Hub and Authority scores comes in. Given the information for our seeds, they become our list of Hubs. We can then calculate the Authorities, filter out our seeds, and have a ranked list of stuff we don’t have. Voilà!  Problem solved, even in the face of partial knowledge.

But wait, you said Kafka Streams?

Kafka was already part of our solution, so it made sense to try to leverage that infrastructure and our experience using it. Here’s some of the thinking behind why we chose to go with Apache Kafka’s® Streams API to perform such real-time ranking of domains as described above:

  • It has all the primitives necessary for MapReduce-style computation: the “Map” step can be done with groupBy & groupByKey, the “Reduce” step can be done with reduce & aggregate.
  • Streaming allows us to have real-time, up-to-date data.
  • The focus stays on the data. We’re not thinking about distributed computing machinery.
  • It fits in naturally with the functional style of the rest of our application.

You may wonder at this point, “Why not use MapReduce? And why not use tools like Apache Hadoop or Apache Spark that provide implementations of MapReduce?” Given that MapReduce was originally invented to solve exactly this type of ranking problem, why not use it for the very similar computation we have here? There are a few reasons we didn’t go with it:

  • We’re a small team, with no previous Hadoop experience, which rules Hadoop out.
  • While we do run Spark jobs occasionally in batch mode, it is a high infrastructure cost to run full-time if you’re not using it for anything else.
  • Initial experience with Spark Streaming, snapshotting, and recovery didn’t go smoothly.

How We Do It

For the rest of this article we are going to assume at least a basic familiarity with the Kafka Streams API and its two core abstractions, KStream and KTable. If not, there are plenty of tutorials, blog posts and examples available.


The real-time ranking is performed by a set of Scala components (groups of related functionality) that use the Kafka Streams API. We deploy them via containers in AWS, where they interact with our Kafka clusters. The Kafka Streams API allows us to run each component, or group of components, in a distributed fashion across multiple containers depending on our scalability needs.

At a conceptual level, there are three main components of the application. The first two, the Domain Link Extractor and the Domain Reducer, are deployed together in a single JVM. The third component, the HITS Calculator and its associated API front end, is deployed as a separate JVM. In the diagram below, the curved bounding boxes represent deployment units; the rectangles represent the components.

Data, in the form of S3 URLs to stored web documents, comes into the system on an input topic. The Domain Link Extractor loads the document, extracts the links, and outputs a mapping from domain to associated domains for that document. We’ll drill into this a little bit more below. At a high level, we use the flatMap KStream operation. We use flatMap rather than map to simplify the error handling—we’re not overly concerned with errors, so for each document we either emit one new stream element on success, or zero if there is an error. Using flatMap makes that straightforward.

These mappings then go to the Domain Reducer, where we use groupByKey and reduce to create a KTable. The key is the domain, and the value in the table is the union of all the domains that the key domain has linked to, across all the documents in that domain. From the KTable, we use toStream to convert back to a KStream and from there to an output topic, which is log-compacted.
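
A sketch of that flatMap / groupByKey / reduce shape, written in Scala against a recent version of the Java Streams API. The topic names, the comma-separated-string encoding of the domain sets, and the stubbed extraction function are simplified stand-ins for our real types and logic.

```scala
import java.lang.{Iterable => JIterable}
import scala.collection.JavaConverters._

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KeyValue, StreamsBuilder}
import org.apache.kafka.streams.kstream.{Consumed, Grouped, KeyValueMapper, Produced, Reducer}

object DomainPipelineSketch {
  // Stand-in for the real extraction: given an S3 URL, emit zero or one
  // (domain -> comma-separated linked domains) pairs. Purely hypothetical.
  def extract(s3Url: String): Option[(String, String)] = None

  val strings = Serdes.String()
  val builder = new StreamsBuilder()

  builder
    .stream("document-urls", Consumed.`with`(strings, strings))
    // One element out on success, zero on failure -- hence flatMap, not map.
    .flatMap[String, String](new KeyValueMapper[String, String, JIterable[KeyValue[String, String]]] {
      override def apply(key: String, s3Url: String): JIterable[KeyValue[String, String]] =
        extract(s3Url).map { case (domain, links) => new KeyValue(domain, links) }.toList.asJava
    })
    .groupByKey(Grouped.`with`(strings, strings))
    // Union the linked-domain sets seen for each domain across documents.
    .reduce(new Reducer[String] {
      override def apply(a: String, b: String): String =
        (a.split(",").toSet ++ b.split(",").toSet).mkString(",")
    })
    .toStream()
    .to("domain-link-sets", Produced.`with`(strings, strings)) // log-compacted output topic
}
```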

The final piece of the puzzle is the HITS Calculator. It reads in the updates to domains, keeps the mappings in a local cache, uses these mappings to create the adjacency matrix, and then performs the actual HITS calculation using the process described above. The ranked Hubs and Authorities are then made available via a REST API.

The flexibility of Kafka’s Streams API

Let’s dive into the Domain Link Extractor for a second, not to focus on the implementation, but as a means of exploring the flexibility that Kafka Streams gives.

The current implementation of the Domain Link Extractor component is a function that calls four more focused functions, tying everything together with a Scala for comprehension.  This all happens in a KStream flatMap call. Interestingly enough, the monadic style of the for comprehension fits in very naturally with the flatMap KStream call.
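
A rough sketch of that shape, with the four focused functions stubbed out (all names and signatures here are hypothetical): each step returns an Option, the for comprehension chains them, and converting the result to a list gives exactly the “one element on success, zero on error” behaviour that the surrounding flatMap expects.

```scala
// Hypothetical helpers, stubbed purely for illustration.
def fetchDocument(s3Url: String): Option[String]    = None
def parseHtml(raw: String): Option[String]          = Some(raw)
def extractLinks(doc: String): Option[List[String]] = Some(Nil)
def sourceDomain(s3Url: String): Option[String]     = None
def toDomain(link: String): String                  = link

def extractDomainLinks(s3Url: String): List[(String, Set[String])] =
  (for {
    raw    <- fetchDocument(s3Url)   // load the stored web document from S3
    doc    <- parseHtml(raw)         // parse it
    links  <- extractLinks(doc)      // pull out the outgoing links
    domain <- sourceDomain(s3Url)    // work out which domain it came from
  } yield domain -> links.map(toDomain).toSet).toList   // Some -> one element, None -> empty
```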

One of the nice things about working with Kafka Streams is the flexibility that it gives us. For example, if we wanted to add information to the extracted external links, it would be very straightforward to capture the intermediate output and further process that data, without interfering with the existing calculation.

In Summary

For us, Apache Kafka is useful for much more than just collecting and sharing data in real-time; we use it to solve important problems in our business domain by building applications on top. Specifically, Kafka’s Streams API enables us to build real-time Scala applications that are easy to implement, elastically scalable, and that fit well into our existing, AWS-based deployment setup.  

The programming style fits in very naturally with our already existing functional approach, allowing us to quickly and easily tackle problems with a natural decomposition of the problem into flexible and scalable microservices.

Given that much of what we’re doing is manipulating and transforming data, we get to stay close to the data. Kafka Streams doesn’t force us to work at too abstract a level or distract us with unnecessary infrastructure concerns.

It’s for these reasons we use Apache Kafka’s Streams API in the Dublin Fashion Insight Centre as one of our go-to tools in building up our understanding of the Fashion Web.

About Apache Kafka’s Streams API

If you have enjoyed this article, you might want to continue with the following resources to learn more about Apache Kafka’s Streams API:

Interested in working in our team? Get in touch: we’re hiring.