Zalando Tech Blog – Technical Articles Holger Schmeisky

Sharing successful large scale agile experiences


Zalando has been known for radical approaches to agility since 2015. In order to keep growing and stay successful, we took the next step in 2017 and formed around 30 business units. Each business unit is built around one particular business problem, topic, or product, with end-to-end responsibility. All the disciplines needed are inside the business unit, from commercial roles to tech teams.

Challenges in large scale product groups

Looking at this setup, we experience challenges. You’re probably familiar with them if you work in a similar setup or if your company is around the size of one of our business units (<100 people).

  • Who makes product decisions at this size, with several teams and product people?
  • How to keep the focus on the actual product with so many technical components and intermediate steps?
  • How to enable 50 engineers to understand how their everyday tasks contribute to the overall quarterly goals?
  • How to do sprint planning with 20 people?
  • How to handle cross-cutting concerns like 24/7 and platform work in a feature team setup?

By far the biggest question, however, was: How can this work inside Zalando?


Our Solution Approach

How do we support these 30+ business units in reaching their business goals through agile working? Rome was not built in a day. We knew we had to work through process and collaboration.

We used the power of our network and collected successful solutions from all units. The first and most important observation was that no solution can be mechanically copied; it always has to be adapted to the specific needs of the unit (“There are no best practices, only practices that work in a specific context”). To enable this adaptation and learning, in addition to the bare facts we collected:

  1. the story and motivation around the solutions
  2. the details of how they are adopted
  3. the (contact details of the) people who created them

For the first factor, we invited people from these teams to teach-back sessions, open to everyone, where they shared their experiences in a try/avoid format.

Second, from these we created a 20-page guide on how to structure large teams, with background details. Finally, we connected the people we talked to who face similar challenges with the pioneers, because all advice needs to be adapted to the specific needs of each business unit.

Concrete Examples

For example, the Fashion Store Apps group (5 teams) struggled with their narrow product definition: Each platform and the API were treated as separate products, with separate teams, backlogs, product owners, etc. These needed to be managed, synchronized, and aligned, and code needed to be integrated. As you can imagine, somewhere along the way the focus on the customer gets hard to find. To address this, the team redefined the product as “Fashion Store Apps,” reorganized the teams to reflect this, and merged all backlogs into one.

Another example is how Personalization (6 teams) increased the understanding of its goals and unlocked new possibilities. As is typical in a large organization, goals and concrete products were difficult to define for this department, and usually the understanding did not carry over to the engineering and data science teams. To tackle this, everyone (including engineers) took responsibility for creating or refining the press releases that underlie the epics for the upcoming quarter. Ideas to achieve goals are as likely to come from Product* as they are to come from delivery teams. The concrete outcome is an aligned and commonly understood overview of the next quarter’s sprints. This led to much higher involvement and identification during the quarter, and to more motivated teams.

A LeSS introduction backwards

These are only two examples from the many instances of how we scale agile at Zalando. The whole approach is, in a way, a LeSS introduction backwards. We take note of which trials work, and we find a high similarity to the LeSS framework without ever using the word or introducing the whole framework. The practices emerged on their own because they made sense to the people inside the organization. As one engineering lead put it after reading a LeSS book, “It’s nice to see that we were not the only ones with these ideas.”

Our key learning, directed at all fellow Agile Coaches and Agile Change Agents, is to not implement frameworks, but to source from working solutions and share the successes.

Eventually we will end up with a form of LeSS organization without anybody inside Zalando connecting emotionally to the framework itself.

If you would like to learn more, feel free to reach out to agility-coaching@zalando.de or have a look at our open position.

Many thanks for the input and support of our colleagues Samir Hanna, Tobias Leonhard and Frank Ewert.

Zalando Tech Blog – Technical Articles Oleksandr Volynets

How we migrated the Zalando Logistics Operating Services to Java 8

“Never touch working code!” goes the old saying. How often do you disregard this message and touch a big monolithic system? This article tells you why you should ignore common wisdom and, in fact, do it even more often.


Preface

Various kinds of migration are a natural part of software development. Do you remember the case when the current database didn’t scale well enough? Or when a new tech stack was needed because the existing one no longer met changing requirements? Or perhaps a hard migration from a monolithic application to a microservice architecture? There are also smaller-scale migrations, like upgrading to a newer version of a dependency, e.g. Spring, or of the Java Runtime Environment (JRE). This is the story of how a relatively simple task, migrating from Java 7 to Java 8, was performed on a large-scale monolithic application that is critical to the business.

Zalos as the service for Logistics Operations

Zalos (Zalando Logistics System) is a set of Java services, backend and frontend, that contains submodules to operate most functions inside the warehouses operated by Zalando. The scale of Zalos can be summarized by the following statistics:

  • more than 80,000 git commits,
  • more than 70 active developers in 2017,
  • almost 500 maven submodules,
  • around 13,000 Java classes with 1.3m lines of code, plus numerous production and test resource files,
  • operates with around 600 PostgreSQL tables and more than 3,000 stored procedures.

Zalos 2, denoted as just Zalos below, is the second generation of the system, and has grown to this size over the past five years. Patterns that were easy to adopt at the time for scaling up architectural functionality quickly became a bottleneck as the number of teams maintaining the system grew. It is deployed to all Zalando warehouses every second week, and every week there is a special procedure to create a new release branch. Each deployment takes about five hours, and branching takes about the same time. Together with urgent patches, regular deployment and maintenance operations take a significant portion of each team’s time.

Now, what happens if the system is left unmaintained for a while? The package dependencies and Java libraries become obsolete and, as a consequence, the security risk grows. Then, one day, one of the core infrastructure systems has to change its SSL certificate, and this causes downtime in all relevant legacy systems operating a deprecated Java version. For the logistics services these problems might become a big disaster, and you start thinking: “What does it take to migrate Zalos from Java 7 to Java 8?”


Migration? Easy!

With some basic experience with Java 9, the option to go even further was rejected pretty fast: a combination of Java 9 modularity and 500 submodules doesn’t look very promising. Well, bad luck. What else do you need to keep in mind for Java 8 support? Spring? Sure. GWT? Maybe. Guava? Oh yes. Generics? This too.

This is a good time to talk about the tech stack of Zalos. It contains backend as well as frontend parts, both running Spring 3. The backend uses PostgreSQL databases via the awesome sprocwrapper library. Both backend and frontend rely on Zalando-internal parent packages to take care of dependency management. The frontend engine is GWT 2.4 with some SmartGWT widgets. And, to mention one more challenge, it uses Maven overlays with JavaScript, but more on this later.

Our first strategy was to bump as many package dependencies as we could: Spring 4, which fully supports Java 8; GWT 2.8.2, which already has support for Java 9; Guava 23.0; etc. We were on GWT 2.4, so that alone would be a jump of over five years development-wise. A hard dependency on our internal Zalando parent packages ruled out the major Spring upgrade too. Guava 23 has deprecated some methods and we would need to change quite an amount of code: again, a failure.

Let’s try another strategy then: bump as little as we can. This strategy worked much better. We only needed Spring 3.2.13 and Guava 20.0, plus required upgrades like javassist and org.reflections. The matrix of compatible versions is shown in the appendix. The GWT dependency was left untouched, although it limits our client code to Java 7. A compromise, but not a blocker: there is little active development of new GWT code anyway.

Now, overlays, or in our case Dependency Hell, are a feature of Maven to include dependencies from a WAR or a ZIP file; an overlay “inlines” the complete package as is, with all its dependencies. As an example, this means that should an overlay have a different version of spring-core, you get two versions of spring-core in the final WAR artifact. When the application starts, it will get confused about which version to use for which parts of the application, and various ClassNotFound exceptions will pop up. Bad luck: republishing all WAR overlays with updated dependencies is required.


Go-live or don’t rush?

It took just two weeks of highly motivated and self-driven work by two people to crack the problem and run the 500-module monolith on a laptop with Java 8. It took two more weeks to deploy it to the staging environment after fixing multiple issues. After that, it took two more months to finally deploy it to the production environment. Why so long? Because we are dealing with a highly critical system that has several serious constraints:

  1. Deployments. Deployment to production lasts up to five hours and must not interfere with any other deployment, due to internal limitations of the deployment system. With absolute priority given to production deployments, there isn’t much time for experimenting with the migration. Solution? Tweaking the deployment service helped reduce deployment time by about one third, leaving some freedom for experimenting on a staging environment.
  2. Development. There are still about 25 commits per day in the main branch. Breaking it would have a significant impact on feature development, and it isn’t easy to experiment with JDK versions from the feature branch. This isn’t good, but still there is a more serious constraint.
  3. Warehouse operations. They are the backbone of an e-commerce company and should not be interrupted by the migration. The risk of any bug has to be carefully minimized to maintain service liveness.

To address at least two of these constraints, we created a concrete three-step plan for executing the migration in a safe manner, with the ability to roll back at any time:

  1. Upgrade all packages to versions compatible with both Java 7 and 8, without changing the runtime version. This ensured that there were no changes for deployment.
  2. Switch to the Java 8 runtime (JRE), keeping the source code in Java 7 mode. This step ensured that we could safely change the deployment settings without touching the code and dependencies.
  3. Switch to Java 8 development mode to fully support Java 8 features. No major deployment changes were needed for this step.

In addition to the staging environment, every step was carefully tested on a so-called beta environment, which operates on production data.


Outlook

The migration was completed despite some failed attempts a few years ago. Several things have happened since: The service has become a little more stable and secure. The code can now be written with lambdas, method references, etc. The deployment service has been improved too. But most importantly, the legacy system got attention. Even though we had one camp of people who said, “We tried that before, why do you want to try again?”, there was also a second camp saying, “You are crazy, but yeah, do it”. No matter what was tried before and in what manner, it is never too late to try again.

Keep your legacy code under careful supervision: add code quality metrics, minimize maintenance efforts, optimize release cycles. With this you will stop having “legacy nightmares” and instead have a maintained piece of code.

Appendix

Here is a list of the Maven dependencies and related changes that finally made everything work together:


In addition, the following compilation and runtime settings were required:

  • <source> and <target> properties for maven-compiler-plugin set to 1.8
  • Tomcat 7, i.e. run services with “mvn tomcat7:run-war” and not “mvn tomcat:run-war”, which uses Tomcat 6 by default.

Come work with us! Have a look at our jobs page.

Zalando Tech Blog – Technical Articles Rohit Sharma

Using Akka cluster-sharding and Akka HTTP on Kubernetes

This article captures the implementation of an application that serves data over HTTP, stores it in cluster-sharded actors, and is deployed on Kubernetes.

Use case: an application serving data over HTTP with a high request rate, latency on the order of 10 ms, and limited database IOPS available.

My initial idea was to cache the data in memory, which worked pretty well for some time. But this meant larger instances, due to duplication of the cached data across the instances behind the load balancer. As an alternative, I wanted to use Kubernetes for this problem and do a proof of concept (PoC) of a distributed cache with Akka cluster-sharding and Akka HTTP on Kubernetes.

This article is by no means a complete tutorial on Akka cluster sharding or Kubernetes. It outlines the knowledge I gained while doing this PoC. The code for this PoC can be found here.

Let’s dig into the details of this implementation.

To form an Akka Cluster, there needs to be a pre-defined, ordered set of contact points, often called seed nodes. Each Akka node will try to register itself with the first node from the list of seed nodes. Once all the seed nodes have joined the cluster, any new node can join the cluster programmatically.

The ordered part is important here, because if the first seed node changes frequently, the chances of split-brain increase. More info about Akka Clustering can be found here.

So, the challenge with Kubernetes was the ordered set of predefined nodes, and here StatefulSets and Headless Services come to the rescue.

A StatefulSet guarantees stable and ordered pod creation, which satisfies the requirement for our seed nodes, and a Headless Service is responsible for their deterministic discovery in the network. So, the first node will be “<application>-0”, the second will be “<application>-1”, and so on.

  • <application> is replaced by the actual name of the application

The DNS for the seed nodes will be of the form:

<application-name>-<ordinal>.<service-name>.<namespace>.svc.cluster.local
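
To make this concrete, here is a minimal sketch (my own, not the PoC’s actual code; the object name is hypothetical, and the environment variable names follow the StatefulSet manifest in step 2 below) of how each pod could turn these DNS names into Akka cluster configuration at startup:

import akka.actor.ActorSystem
import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
import scala.collection.JavaConverters._

object DistributedCacheApp extends App {
  // Environment variables as defined in the StatefulSet manifest (step 2 below).
  val systemName = sys.env.getOrElse("AKKA_ACTOR_SYSTEM_NAME", "distributed-cache-system")
  val port       = sys.env.getOrElse("AKKA_REMOTING_BIND_PORT", "2551")
  val host       = s"${sys.env("POD_NAME")}.${sys.env("AKKA_REMOTING_BIND_DOMAIN")}"

  // "host-0:2551,host-1:2551,..." -> List("akka.tcp://<system>@host-0:2551", ...)
  val seedNodes = sys.env("AKKA_SEED_NODES").split(",").toList
    .map(node => s"akka.tcp://$systemName@$node")

  val config = ConfigFactory.parseString(
    s"""
       |akka.actor.provider = cluster
       |akka.remote.netty.tcp.hostname = "$host"
       |akka.remote.netty.tcp.port = $port
     """.stripMargin)
    .withValue("akka.cluster.seed-nodes", ConfigValueFactory.fromIterable(seedNodes.asJava))
    .withFallback(ConfigFactory.load())

  val system = ActorSystem(systemName, config)
}

Because both the seed node list and the pod-specific hostname come from the environment, the same container image works for every pod in the StatefulSet.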

Steps:

  1. Start with creating the Kubernetes resources. First, the Headless Service, which is responsible for the deterministic discovery of the seed nodes (Pods), can be created using the following manifest:
kind: Service
apiVersion: v1
metadata:
  name: distributed-cache
  labels:
    app: distributed-cache
spec:
  clusterIP: None
  selector:
    app: distributed-cache
  ports:
    - port: 2551
      targetPort: 2551
      protocol: TCP

Note that “clusterIP” is set to “None”, which indicates that this is a Headless Service.

2. Create a StatefulSet, which is a manifest for ordered pod creation:

apiVersion: "apps/v1beta2"
kind: StatefulSet
metadata:
  name: distributed-cache
spec:
  selector:
    matchLabels:
      app: distributed-cache
  serviceName: distributed-cache
  replicas: 3
  template:
    metadata:
      labels:
        app: distributed-cache
    spec:
      containers:
        - name: distributed-cache
          image: "localhost:5000/distributed-cache-on-k8s-poc:1.0"
          env:
            - name: AKKA_ACTOR_SYSTEM_NAME
              value: "distributed-cache-system"
            - name: AKKA_REMOTING_BIND_PORT
              value: "2551"
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: AKKA_REMOTING_BIND_DOMAIN
              value: "distributed-cache.default.svc.cluster.local"
            - name: AKKA_SEED_NODES
              value: "distributed-cache-0.distributed-cache.default.svc.cluster.local:2551,distributed-cache-1.distributed-cache.default.svc.cluster.local:2551,distributed-cache-2.distributed-cache.default.svc.cluster.local:2551"
          ports:
            - containerPort: 2551
          readinessProbe:
            httpGet:
              port: 9000
              path: /health

3. Create a service, which will be responsible for redirecting outside internet traffic to pods:

apiVersion: v1
kind: Service
metadata:
  labels:
    app: distributed-cache
  name: distributed-cache-service
spec:
  selector:
    app: distributed-cache
  type: ClusterIP
  ports:
    - port: 80
      protocol: TCP
      # this needs to match your container port
      targetPort: 9000

4. Create an Ingress, which is responsible for defining a set of rules to route traffic from the outside internet to services.

apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: distributed-cache-ingress
spec:
  rules:
    # DNS name your application should be exposed on
    - host: "distributed-cache.com"
      http:
        paths:
          - backend:
              serviceName: distributed-cache-service
              servicePort: 80

And the distributed cache is ready to use:
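
On the application side, the cache itself relies on standard Akka cluster sharding. Below is a rough sketch (my own, with a hypothetical entity and message type rather than the PoC’s actual protocol) of how the shard region could be started once the cluster has formed:

import akka.actor.{ActorRef, ActorSystem, Props}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}

// Hypothetical message for a cache entity actor; the real PoC defines its own protocol.
final case class Get(entityId: String)

object CacheEntity {
  // Route each message to its entity by id, and spread entities over a fixed number of shards.
  val extractEntityId: ShardRegion.ExtractEntityId = {
    case msg @ Get(id) => (id, msg)
  }

  val extractShardId: ShardRegion.ExtractShardId = {
    case Get(id) => (math.abs(id.hashCode) % 100).toString
  }

  def startRegion(system: ActorSystem, entityProps: Props): ActorRef =
    ClusterSharding(system).start(
      typeName        = "CacheEntity",
      entityProps     = entityProps,
      settings        = ClusterShardingSettings(system),
      extractEntityId = extractEntityId,
      extractShardId  = extractShardId
    )
}

An Akka HTTP route can then ask the returned shard region for an entity and complete the request with the result; the data stays partitioned across the pods instead of being duplicated in each one.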

Summary
This article covered Akka cluster sharding on Kubernetes, with its prerequisites of an ordered set of seed nodes and their deterministic discovery in the network, and how these can be met with StatefulSets and Headless Services.

This approach of caching data in a distributed fashion offered the following advantages:

  • Fewer database lookups, saving database IOPS
  • Efficient usage of resources; fewer instances as a result of no duplication of data
  • Lower latencies to serve data

This PoC opens up new doors to think about how we cache data in-memory. Give it a try (all steps to run it locally are mentioned in the Readme).

Interested in working at Zalando Tech? Our job openings are here.

Zalando Tech Blog – Technical Articles Dmytro Zharkov

Insights on making node APIs great

NodeJS is getting more and more popular these days. It has gone through a long and painful history of mistakes and learning. By being a “window” into the “world of back-end” for front-end developers, it has improved the overall tech knowledge of both groups of engineers by giving them the opportunity to write actual end-to-end solutions themselves using familiar approaches. It is still JavaScript, however, and that makes most back-end engineers nauseous when they see it. With this article and a number of suggestions, I would like to make NodeJS APIs look a bit better.

If you prefer looking at code over reading an article, jump to the sample project directly.

As a superset of JavaScript, TypeScript (TS) enhances ES6 inheritance with interfaces, access modifiers, abstract classes and methods (yep, you read that correctly... abstract classes in JS), static properties, and brings strong typing. All of these can help us a lot. So, let’s walk through these cool features and check out how we can use them in NodeJS applications.

I split this post into two parts: an overview and actual code samples. If you know TS pretty well, you can jump to part two.

PART 1. OVERVIEW

INTERFACES, CLASSES, ABSTRACT CLASSES, AND TYPE ALIASES
When I first tried TS, I sometimes felt like it went nuts checking and applying types. It’s technically possible to define a variable’s type with type aliases, interfaces, classes, and abstract classes, so they really look pretty similar (kind of twins, or quadruplets in this case), but as I looked into TypeScript more, I found that, just like siblings, they are actually quite individual.


Interfaces are “virtual structures” that are never transpiled into JS. Interfaces play a double role in TS: they can be used to check whether a class implements a certain pattern, and also as type definitions (so-called “structural subtyping”).

I really like how TS allows us to extend interfaces so we can always modify already existing ones to our own needs.

Say we have a middleware function that performs some checks on the request and adds an additional property named “superheroName” to it. The TS compiler will not allow you to add this property to a standard express Request, so we can extend the interface with the needed property.

import { Request, Response } from "express";

interface SuperHeroRequest extends Request {
  superheroName: string;
}

And then use it in a route:

app.router.get("/heroes", (req: SuperHeroRequest, res: Response) => {
  if (req.superheroName) {
    res.send("I'm Batman");
  }
});

Of course, let’s not forget about the main function of interfaces: enforcing that classes meet a particular contract.

interface Villain {
  name: string;
  crimes: string[];
  performCrime(crimeName: string): void;
}

/* The compiler will ensure that all properties of the Villain interface are specified in the implementing class, and will throw errors at compile time if something is missing. */
class SuperVillain implements Villain {
  public name: string;
  public crimes: string[] = [];

  public performCrime(crimeName: string) {
    this.crimes.push(crimeName);
  }
}

Abstract classes are usually used to define base level classes from which other classes may be derived.

abstract class Hero {
  constructor(public name: string, public _feats: string[]) {
  }

  // Similar to interfaces, we can specify a method signature that must be defined in derived classes.
  abstract performFeat(feat: string): void;

  // Unlike interfaces, abstract classes can provide an implementation along with the method signature.
  getFeatsList() {
    return this._feats.join("\n");
  }
}

class SuperHero extends Hero {
  constructor(name: string, _feats: string[] = []) {
    super(name, _feats);
  }

  performFeat(feat: string) {
    this._feats.push(feat);
    console.log(`I have just: ${feat}`);
  }
}

const Thor: SuperHero = new SuperHero("Thor", ["Stop Loki"]);
Thor.performFeat("Save the world");
console.log(Thor.getFeatsList());

// Abstract classes can be used as a type as well.
const Hulk: Hero = new SuperHero("Bruce Banner");
Hulk.performFeat("Smash aliens");
console.log(Hulk.getFeatsList());

// An attempt to instantiate an abstract class will not compile:
const Loki: Hero = new Hero("Thor", ["Stop Loki"]);

As you can see, we can potentially use all of those by specifying a variable type. So what should be used and when? Let's sum it up.


Type aliases can be used to define primitive and reference types: string, number, boolean, object. You can’t extend type aliases.
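
As a small illustration (my own example, not from the sample project), a type alias can name a primitive as well as an object shape, but unlike an interface it cannot be extended or merged later:

// A type alias for a primitive type and for an object shape.
type Id = string;
type Point = { x: number; y: number };

const origin: Point = { x: 0, y: 0 };
const articleId: Id = "some-article";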

Interfaces can define only reference (object) types. The TS documentation recommends that we use interfaces for object type literals. Interfaces can be extended and can have multiple merged declarations, so users of your APIs may benefit from this. An interface is a “virtual” structure that never appears in the compiled JavaScript.

Classes, as opposed to interfaces, not only check how an object looks but ensure concrete implementation as well.

Classes allow us to specify the access modifiers of their members.

The TS compiler always transpiles classes to actual JS code, so they should be used when an actual instance of the class is created. ECMAScript native classes can also be used as type definitions.

let numbersOnly: RegExp = /[0-9]/g;
let name: String = "Jack";

Abstract classes are really a mix of the previous two, but as it’s not possible to instantiate them directly, you can only use them as a type if the instance is created from a derived class that doesn’t provide any additional methods or properties.

ACCESS MODIFIERS
Unfortunately, JS doesn’t provide access modifiers, so you can’t create, for example, a real private property. It’s possible to mock private property behaviour with closures and additional libraries, but such code looks a bit fuzzy and rather long. TS solves this issue just like any other object-oriented programming language. There are three access modifiers available in TS: public, private and protected.
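
A minimal sketch (my own example, not from the sample project) of how the three modifiers behave:

class Account {
  public owner: string;        // accessible from anywhere
  protected balance: number;   // accessible inside Account and its subclasses
  private pin: string;         // accessible only inside Account

  constructor(owner: string, balance: number, pin: string) {
    this.owner = owner;
    this.balance = balance;
    this.pin = pin;
  }
}

class SavingsAccount extends Account {
  public addInterest(rate: number): void {
    this.balance += this.balance * rate; // OK: protected member
    // this.pin;                         // compile-time error: private member
  }
}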


PART 2. THE APPLICATION OR A DIVE INTO THE CODE.

So now that we know and have all the tooling we need, we can build something great. For example, I would like to build the backend part of a MEAN (MongoDB, ExpressJS, Angular, NodeJS) stack; a simple RESTful service that will allow us to perform CRUD operations on some articles. As including all the code would make this post too long, I’ll skip some parts, but you can always check the full version in the GitHub repository.

For project structure, see below:

To make code more declarative, easier to maintain and reusable, I’ll take advantage of ES6 classes and split the application into logical parts. I’m leaving most of the explanation in the comments.

./classes/Server.ts

import * as express from "express";
import * as http from "http";
import * as bodyParser from "body-parser";
import * as mongoose from "mongoose";
import * as dotenv from "dotenv";
import * as logger from "morgan";

/* Create a reusable server class that will bootstrap basic express application. */

export class Server {

/* Most of the core properties below have their types defined by already existing interfaces. IDE users can jump directly to an interface definition by clicking on its name. */

/* A protected member will be accessible from derived classes. */
protected app: express.Application;

/* And here we are using http module Server class as a type. */
protected server: http.Server;

private db: mongoose.Connection;

/* restrict member scope to Server class only */
private routes: express.Router[] = [];
/* This could also be written using a generics-like syntax. You can choose whichever looks better to you:
private routes: Array<express.Router> = [];
*/

/* public is the default modifier and could be omitted. I prefer to always set it explicitly, so the code style is more consistent. */
public port: number;

constructor(port: number = 3000) {
  this.app = express();
  this.port = port;
  this.app.set("port", port);
  this.config();
  this.database();
}

private config() {
  // set bodyParser middleware to get form data
  this.app.use(bodyParser.json());
  this.app.use(bodyParser.urlencoded({ extended: true }));
  // HTTP requests logger
  this.app.use(logger("dev"));
  this.server = http.createServer(this.app);

  if (!process.env.PRODUCTION) {
    dotenv.config({ path: ".env.dev" });
  }
}

/* A simple public method to add routes to the application. */
public addRoute(routeUrl: string, routerHandler: express.Router): void {
  if (this.routes.indexOf(routerHandler) === -1) {
    this.routes.push(routerHandler);
    this.app.use(routeUrl, routerHandler);
  }
}

private database(): void {
  mongoose.connect(process.env.MONGODB_URI);
  this.db = mongoose.connection;
  this.db.once("open", () => {
    console.log("Database started");
  });
  mongoose.connection.on("error", () => {
    console.log("MongoDB connection error. Please make sure MongoDB is running.");
    process.exit();
  });
}

public start(): void {
  this.app.listen(this.app.get("port"), () => {
    console.log(("  App is running at http://localhost:%d in %s mode"), this.app.get("port"), this.app.get("env"));
    console.log("  Press CTRL-C to stop\n");
  });
}
}

export default Server;

I have set the server and app properties to “protected” as I want to keep them hidden from the outside, so it’s not possible to override or access them directly, while they remain reachable from derived classes. For example, if we want to add web socket support to our server, we can extend it with a new class and use the “server” or “app” properties as we need.

./classes/SocketServer.ts

import Server from "./Server";
import * as io from "socket.io";

class SocketServer extends Server {

/* this.server of a parent Server class is protected property, so we can access it to add a socket.  */
private socketServer = io(this.server);

constructor(public port: number) {
  super(port);
  this.socketServer.on('connection', (client) => {
    console.log("New connection established");
  });

}
}
export default SocketServer;

Going back to the application.

./app.ts

import Server from "./classes/Server";
import ArticlesRoute from "./routes/Articles.route";

const app = new Server(8080);
const articles = new ArticlesRoute();
app.addRoute("/articles", articles.router);
app.start();

As we can have multiple kinds of articles (products), e.g. electronics, fashion, digital, etc., and they might have rather different sets of properties, I’ll create a base abstract class with a number of default properties that are common to all types of articles. All other properties can be defined in derived classes.

./classes/AbstractArticle.ts

// put basic properties into abstract class.

import ArticleType from "../enums/ArticleType";
import BaseArticle from "../interfaces/BaseArticle";
import * as uuid from "uuid";
import Price from "../interfaces/Price";

abstract class AbstractArticle implements BaseArticle {
  public SKU: string;

  constructor(public name: string, public type: ArticleType, public price: Price, SKU: string) {
    this.SKU = SKU ? SKU : uuid.v4();
  }
}

export default AbstractArticle;

For this example, I’ll create a Shoe class that will derive from an AbstractArticle class and set its own properties.

./classes/Shoe.ts

import AbstractArticle from "./AbstractArticle";
import ArticleType from "../enums/ArticleType";
import Colors from "../enums/Colors";
import FashionArticle from "../interfaces/FashionArticle";
import Price from "../interfaces/Price";
import Sizes from "../enums/Sizes";

class Shoe extends AbstractArticle implements FashionArticle {
  constructor(public name: string,
              public type: ArticleType,
              public size: Sizes,
              public color: Colors,
              public price: Price,
              SKU: string = "") {
    super(name, type, price, SKU);
  }
}

export default Shoe;

You might have noticed that the Shoe class implements the FashionArticle interface. Let’s take a look at it and see how we can benefit from interfaces and the possibility to extend them.

./interfaces/BaseArticle.ts

import ArticleType from "../enums/ArticleType";
import Price from "./Price";

interface BaseArticle {
  SKU: string;
  name: string;
  type: ArticleType;
  price: Price;
}

export default BaseArticle;

Extension of interfaces allows us to extend our own interfaces with additional properties.

./interfaces/FashionArticle.ts

import Colors from "../enums/Colors";
import BaseArticle from "./BaseArticle";
import Sizes from "../enums/Sizes";

interface FashionArticle extends BaseArticle {
  size: Sizes;
  color: Colors;
}

export default FashionArticle;

We can also extend already existing interfaces. As an example, I’ll create a FashionArticleModel interface that extends the Document interface from Mongoose and our FashionArticle interface, so we can use it when creating the database schema.

./interfaces/FashionArticleModel.ts

import { Document } from "mongoose";
import FashionArticle from "./FashionArticle";

interface FashionArticleModel extends FashionArticle, Document {};
export default FashionArticleModel;

Using the FashionArticleModel interface in the schema allows us to create a model with properties from both the Mongoose Document and FashionArticle interfaces.

./schemas/FashionArticle.schema.ts

import { Schema, Model, model} from "mongoose";
import FashionArticleModel from "../interfaces/FashionArticleModel";

const ArticleSchema: Schema = new Schema({
  name: String,
  type: Number,
  size: String,
  color: Number,
  price: {
    price: Number,
    basePrice: Number
  },
  SKU: String
});

// Use Model generic from mongoose to create a model of FashionArticle type.
const ArticleModel: Model<FashionArticleModel> = model<FashionArticleModel>("Article", ArticleSchema);
export {ArticleModel};

I hope this example application already shows how TypeScript can make your code more declarative, self-documenting and potentially easier to maintain. Using TS is also a good exercise for frontend developers to learn and apply OOP paradigms in real-life projects, and backend developers should find many familiar practices and code constructs.

Finally, I would suggest jumping into the Articles route class and checking out the CRUD functionality of the application.

./routes/Articles.route.ts

import { Request, Response, Router } from "express";
import ArticleType from "../enums/ArticleType";
import Colors from "../enums/Colors";
import Shoe from "../classes/Shoe";
import Sizes from "../enums/Sizes";
import { ArticleModel } from "../schemas/FashionArticle.schema";
import FashionArticleModel from "../interfaces/FashionArticleModel";

class ArticlesRoute {
public router: Router;

constructor() {
  this.router = Router();
  this.init();
}

// Putting all routes into one place makes it easy to search for specific functionality.
// As these methods will be called in the context of a different class, we need to bind them to the current instance.
public init() {
  this.router.route("/")
    .get(this.getArticles.bind(this))
    .post(this.createArticle.bind(this));

  this.router.route("/:id")
    .get(this.getArticleById.bind(this))
    .put(this.updateArticle.bind(this))
    .delete(this.deleteArticle.bind(this));
}
// I'm not a huge fan of JavaScript callback hell, especially in NodeJS, so I'll use promises instead.
public getArticles(request: Request, response: Response): void {
  ArticleModel.find()
    .then((articles: FashionArticleModel[]) => {
      return response.json(articles);
    })
    .catch((error: Error) => {
      console.error(error);
    })
}

public getArticleById(request: Request, response: Response): void {
  const id = request.params.id;
  ArticleModel
    .findById(id)
    .then((article: FashionArticleModel) => {
    return response.json(article);
  })
    .catch((error: Error) => {
      console.error(error);
      return response.status(400).json({ error: error });
  });
}

public createArticle(request: Request, response: Response): void {
  const requestBody = request.body;
  const article = new Shoe(requestBody.name, requestBody.type, requestBody.size, requestBody.color, requestBody.price);

  const articleModel = new ArticleModel({
    name:  article.name,
    type:  article.type,
    size:  article.size,
    color: article.color,
    price: article.price,
    SKU:   article.SKU
  });

  articleModel
    .save()
    .then((createdArticle: FashionArticleModel) => {
      return response.json(createdArticle);
    })
    .catch((error: Error) => {
      console.error(error);
      return response.status(400).json({ error: error });
    });
}

public updateArticle(request: Request, response: Response): void {
  const id = request.params.id;
  const requestBody = request.body;
  const article = new Shoe(requestBody.name, requestBody.type, requestBody.size, requestBody.color, requestBody.price, requestBody.SKU);

  ArticleModel.findByIdAndUpdate(id, article)
    .then((updatedArticle: FashionArticleModel) => {
      return response.json(updatedArticle);
    })
    .catch((error: Error) => {
      console.error(error);
      return response.json({ err: error });
    })
}

public deleteArticle(request: Request, response: Response): void {
  const articleId = request.params.id;
   ArticleModel.findByIdAndRemove(articleId)
    .then((res: any) => {
      return response.status(204).end();
    })
    .catch((error: Error) => {
      console.error(error);
      return response.json({ error: error });
    });
}
}
export default ArticlesRoute;

In conclusion, TypeScript is a powerful tool that brings a really flexible, rich type-checking system to your code. It also introduces well-known patterns like interfaces, abstract classes and access modifiers.

Of course, the application is not ready for production use, as we have to cover everything with tests and set up a proper development environment, but we can cover that in the future.

Work with engineers like Dmytro. Have a look at our jobs page.

Zalando Tech Blog – Technical Articles Eugen Kiss

Removing the burden of state management

State management and change propagation are arguably some of the hardest challenges in GUI programming. Many tools promised to save us from their burden. Only a few remained. Among them is MobX and its flavor of Transparent Reactive Programming.

To understand the appeal of MobX, it is helpful to first understand how React revolutionized GUI programming. Traditional approaches allow description of the initial state of the GUI. Further GUI state transitions must be accomplished with references to GUI elements and piece-wise mutations. This is error-prone as edge cases are easily missed. With React you describe the GUI at any given point in time. Put differently, taking care of GUI state transitions, e.g. manipulating the DOM, is a thing of the past: Your GUI code has become declarative.

React’s crucial advantage is making the “how” of updating the GUI transparent. That idea is reapplied by MobX. Not for GUI manipulation code, but instead for state management and change propagation. In fact, combining both React and MobX is synergistic since even though React nicely takes care of how to update the GUI, when to update the GUI remains cumbersome without MobX. Cross communication between components is the biggest pain point.

Let us explore an example in React and JavaScript illustrating MobX’s advantages:

import { observable } from 'mobx'
import { observer } from 'mobx-react'

const store = observable({
  itemCount: 0,
  lastItem: null
})

const handleBuy = (name) => () => {
  store.itemCount += 1
  store.lastItem = name
}

const handleClearCart = () => {
  store.itemCount = 0
  store.lastItem = null
}

const Cart = observer(() =>
  <div>
    <span>Items in cart: {store.itemCount}</span>
    <button onClick={handleClearCart}>Clear cart</button>
  </div>
)

const Header = observer(({children}) =>
  <div>
    <span>Header</span>
    {children}
  </div>
)

const LastBought = observer(() =>
  <span>Last bought item: {store.lastItem}.</span>
)

const Main = observer(() =>
  <div>
    <Header><Cart/></Header>
    <button onClick={handleBuy("shoe")}>Buy shoes</button>
    <button onClick={handleBuy("shirt")}>Buy shirt</button>
    <LastBought/>
  </div>
)

The example represents a very simplified e-commerce page. Any resemblance to Zalando’s fashion store is, of course, coincidental. Here is a demo and its live-editable source code. The behavior is as follows: Initially, your cart is empty. When you click on “Buy shoes”, your cart item count increases by one and the recently bought component shows “shoe”. A click on “Buy shirt” does the same for “shirt”. You can also clear your cart. In the live example you will see that only the cart in the header is re-rendered, but not the header itself. You will also see that the recently bought component does not re-render when you buy the same product in succession.

Notice that the dependency between the observable variables itemCount and lastItem, and the components is not explicitly specified. Yet, the components correctly and efficiently rerender on changes. You may wonder how this is accomplished. The answer is that MobX implicitly builds up a dependency graph during execution of the components’ render functions that tracks which components to rerender when an observable variable changes. A way to think of MobX is in terms of a spreadsheet where your components are formulas of observable variables. Regardless of how the “magic” works underneath, and which analogy to use, the result is clear: You are freed from the burden of explicitly managing change propagation!
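
The same tracking works outside of React components too. Here is a tiny sketch of my own (not part of the example above) using MobX’s autorun, which makes the spreadsheet analogy concrete:

import { observable, autorun } from 'mobx'

const cart = observable({ itemCount: 0 })

// Like a spreadsheet formula: runs once, then re-runs whenever an observable it read changes.
autorun(() => {
  console.log(`Items in cart: ${cart.itemCount}`)
})

cart.itemCount += 1 // triggers the autorun again and logs "Items in cart: 1"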

To sum up, MobX is a pragmatic, non-ceremonial, and efficient solution to the challenge of state management and change propagation. It works by building up a run-time dependency graph between observable variables and components. Using both React and MobX together is synergistic. Currently, MobX is used in just a few projects inside Zalando. Seeing as MobX offers a great deal to anyone writing GUI programs, I am confident that its adoption will rise in the future.

Keep in touch with fresh Zalando Tech news. Follow us on Twitter.

Zalando Tech Blog – Technical Articles Han Xiao

How We Built the Next Generation Product Search from Scratch using a Deep Neural Network

Product search is one of the key components in an online retail store. A good product search can understand a user’s query in any language, retrieve as many relevant products as possible, and finally present the results as a list in which the preferred products should be at the top, and the less relevant products should be at the bottom.

Unlike text retrieval (e.g. Google web search), products are structured data. A product is often described by a list of key-value pairs, a set of pictures and some free text. In the developers’ world, Apache Solr and Elasticsearch are known as de-facto solutions for full-text search, making them a top contender for building e-commerce product searches.

At the core, Solr/Elasticsearch is a symbolic information retrieval (IR) system. Mapping queries and documents to a common string space is crucial to the search quality. This mapping process is an NLP pipeline implemented with Lucene Analyzer. In this post, I will reveal some drawbacks of such a symbolic-pipeline approach, and then present an end-to-end way of building a product search system from query logs using Tensorflow. This deep learning based system is less prone to spelling errors, leverages underlying semantics better, and scales out to multiple languages much easier.

Recap: Symbolic Approach for Product Search

Let’s first do a short review of the classic approach. Typically, an information retrieval system can be divided into three tasks: indexing, parsing and matching. As an example, the next figure illustrates a simple product search system:

  1. indexing: storing products in a database with attributes as keys, e.g. brand, color, category;
  2. parsing: extracting attribute terms from the input query, e.g. red shirt -> {"color": "red", "category": "shirt"};
  3. matching: filtering the product database by attributes.

Many existing solutions such as Apache Solr and Elasticsearch follow this simple idea. Note that, at the core, they are symbolic IR systems that rely on NLP pipelines to get an effective string representation of the query and product.

Pain points of A Symbolic IR System

1. The NLP pipeline is fragile and doesn’t scale out to multiple languages
The NLP Pipeline in Solr/Elasticsearch is based on the Lucene Analyzer class. A simple analyzer such as StandardAnalyzer would just split the sequence by whitespace and remove some stopwords. Quite often you have to extend it by adding more and more functionalities, which eventually results in a pipeline as illustrated in the figure below.

While it looks legit, my experience is that such NLP pipelines suffer from the following drawbacks:

  • The system is fragile. As the output of every component is the input of the next, a defect in the upstream component can easily break down the whole system. For example, canyourtoken izer split thiscorrectly?
  • Dependencies between components can be complicated. A component can take from and output to multiple components, forming a directed acyclic graph. Consequently, you may have to introduce some asynchronous mechanisms to reduce the overall blocking time.
  • It is not straightforward to improve the overall search quality. An improvement in one or two components does not necessarily improve the end-user search experience.
  • The system doesn’t scale out to multiple languages. To enable cross-lingual search, developers have to rewrite those language-dependent components in the pipeline for every language, which increases the maintenance cost.

2. Symbolic Systems do not Understand Semantics without Hard Coding
A good IR system should understand that trainer means sneaker by using some semantic knowledge. No one likes hard coding this knowledge, especially you machine learning guys. Unfortunately, it is difficult for Solr/Elasticsearch to understand any acronym/synonym unless you implement the SynonymFilter class, which is basically a rule-based filter. This severely restricts the generalizability and scalability of the system, as you need someone to maintain a hard-coded, language-dependent lexicon. If one can represent a query/product by a vector in a space learned from actual data, then synonyms and acronyms could easily be found in the neighborhood without hard coding.

Neural IR System
The next figure illustrates a neural information retrieval framework, which looks pretty much the same as its symbolic counterpart, except that the NLP pipeline is replaced by a deep neural network and the matching job is done in a learned common space.

End-to-End Model Training
There are several ways to train a neural IR system. One of the most straightforward (but not necessarily the most effective) ways is end-to-end learning. Namely, your training data is a set of query-product pairs feeding on the top-right and top-left blocks in the last figure. All the other blocks are learned from data. Depending on the engineering requirements or resource limitations, one can also fix or pre-train some of the components.

Where Do Query-Product Pairs Come From?
To train a neural IR system in an end-to-end manner, you need some associations between query and product such as the query log. This log should contain what products a user interacted with after typing a query. Typically, you can fetch this information from the query/event log of your system. After some work on segmenting, cleaning and aggregating, you can get pretty accurate associations. In fact, any user-generated text can be good association data. This includes comments, product reviews, and crowdsourcing annotations.

Neural Network Architecture
The next figure illustrates the architecture of the neural network. The proposed architecture is composed of multiple encoders, a metric layer, and a loss layer. First, input data is fed to the encoders which generate vector representations. In the metric layer, we compute the similarity of a query vector with an image vector and an attribute vector, respectively. Finally, in the loss layer, we compute the difference of similarities between positive and negative pairs, which is used as the feedback to train encoders via backpropagation.

Query Encoder
Here we need a model that takes in a sequence and outputs a vector. Besides the content of a sequence, the vector representation should also encode language information and be resilient to misspellings. The character-RNN (e.g. LSTM, GRU, SRU) model is a good choice. By feeding RNN character by character, the model becomes resilient to misspelling such as adding/deleting/replacing characters. The misspelled queries would result in a similar vector representation as the genuine one. Moreover, as European languages (e.g. German and English) share some Unicode characters, one can train queries from different languages in one RNN model. To distinguish the words with the same spelling but different meanings in two languages, such as German rot (color red) and English rot, one can prepend a special character to indicate the language of the sequence, e.g. 🇩🇪 rot and 🇬🇧 rot.

Image Encoder
The image encoder rests on purely visual information. The RGB image data of a product is fed into a multi-layer convolutional neural network based on the ResNet architecture, resulting in an image vector representation in 128-dimensions.

Attribute Encoder
The attributes of a product can be combined into a sparse one-hot encoded vector. It is then supplied to a four-layer, fully connected deep neural network with steadily diminishing layer size. Activation was rendered nonlinear by standard ReLUs, and drop-out is applied to address overfitting. The output yields attribute vector representation in 20 dimensions.

Metric & Loss Layer
After a query-product pair goes through all three encoders, one can obtain a vector representation of the query, an image representation and an attribute representation of the product. It is now the time to squeeze them into a common latent space. In the metric layer, we need a similarity function which gives higher value to the positive pair than the negative pair. To understand how a similarity function works, I strongly recommend you read my other blog post on “Optimizing Contrastive/Rank/Triplet Loss in Tensorflow for Neural Information Retrieval”. It also explains the metric and loss layer implementation in detail.

Inference
For a neural IR system, doing inference means serving search requests from users. Since products are updated regularly (say once a day), we can pre-compute the image representation and attribute representation for all products and store them. During the inference time, we first represent user input as a vector using query encoder; then iterate over all available products and compute the metric between the query vector and each of them; finally, sort the results. Depending on the stock size, the metric computation part could take a while. Fortunately, this process can be easily parallelized.

Qualitative Results
Here, I demonstrated (cherry-picked) some results for different types of query. It seems that the system goes in the right direction. It is exciting to see that the neural IR system is able to correctly interpret named-entity, spelling errors and multilinguality without any NLP pipeline or hard-coded rule. However, one can also notice that some top ranked products are not relevant to the query, which leaves quite some room for improvement.

Speed-wise, the inference time is about two seconds per query on a quad-core CPU for 300,000 products. One can further improve the efficiency by using model compression techniques.

Query & Top-20 Results


🇩🇪 nike

🇩🇪 schwarz (black)

🇩🇪 nike schwarz

🇩🇪 nike schwarz shirts

🇩🇪 nike schwarz shirts langarm (long-sleeved)

🇬🇧 addidsa (misspelled brand)

🇬🇧 addidsa trosers (misspelled brand and category)

🇬🇧 addidsa trosers blue shorrt (misspelled brand and category and property)

🇬🇧 striped shirts woman

🇬🇧 striped shirts man

🇩🇪 kleider (dress)

🇩🇪 🇬🇧 kleider flowers (mix-language)

🇩🇪 🇬🇧 kleid ofshoulder (mix-language & misspelled off-shoulder)

Summary
If you are a search developer who is building a symbolic IR system with Solr/Elasticsearch/Lucene, this post should make you aware of the drawbacks of such a system.

This post should also answer your What?, Why? and How? questions regarding a neural IR system. Compared to the symbolic counterpart, the new system is more resilient to the input noise and requires little domain knowledge about the products and languages. Nonetheless, one should not take it as a “Team Symbol” or “Team Neural” kind of choice. Both systems have their own advantages and can complement each other pretty well. A better solution would be combining these two systems in a way that we can enjoy all advantages from both sides.

Some implementation details and tricks are omitted here but can be found in my other posts. I strongly recommend readers to continue with the following posts:

Last but not least, the open-source project MatchZoo contains many state-of-the-art neural IR algorithms. In addition to product search, one may find its application in conversational chatbot and question-answer systems.

To work with great people like Han, have a look at our jobs page.

Zalando Tech Blog – Technical Articles Javier Arrieta

Leveraging the full power of a functional programming language

In Zalando Dublin, you will find that most engineering teams are writing their applications using Scala. We will try to explain why that is the case and the reasons we love Scala.

This content comes both from my own experience and from the team I’m working with to build the new Zalando Customer Data Platform.

How I came to use Scala

I have been working with the JVM for the last 18 years. I find there is a lot of good work making the Java Virtual Machine very efficient and very fast, utilizing the underlying infrastructure well.

I feel comfortable debugging complex issues, such as identifying those caused by garbage collection, and improving our code to alleviate the pauses (see Martin Thompson’s blog post or Aleksey Shipilёv’s JVM Anatomy Park).

I liked Java. I didn’t mind the boilerplate code too much if it didn’t get in the way of expressing the intent of the code. However, what bugged me was the amount of code required to encourage immutability and not having lambdas to transform collections.

At the end of 2012, I had to design a service whose only mission was to back up files from customer mobile devices (think a cloud backup service). It was a simple enough service, accepting bytes from the customer device (using a REST API) and writing them to disk. We were using the Servlet API, and the system was working well. However, as the devices were mobile phones and the upload bandwidth wasn’t very high, the machines were mostly idle, waiting for the buffers to fill up. Unfortunately, we couldn't scale up. When the system reached a few hundred workers, it would start to quickly degrade due to excessive context switching.

We were using Netty in other components, but the programming model of callbacks wasn’t something I wanted to introduce, as it becomes very complex very quickly to compose the callbacks.

Introducing Scala

I had been looking at Scala for some years and started to look into async HTTP frameworks. I liked spray because it allowed us to use it at a high level or low level depending on our requirements. It also wasn’t a framework that forced us into adapting everything to it. I created a quick proof of concept and was amazed at the conciseness of the code and how efficient it was (spray is optimised for throughput, not latency), being able to handle thousands of concurrent uploads with a single core. Previously we were bound by CPU because of all the context switching, but with spray, we managed to overcome this and become limited mostly by IO.

From that point on I decided I wanted to learn Scala and Functional Programming. I finished the Coursera Functional Programming in Scala course, started writing my side projects in Scala and tried to find a position in a company that worked with Scala.

I evaluated other FP languages like Clojure, but I like strongly typed languages as my experience is that the systems written with them are easier to maintain in the long term. I also looked at Haskell, but I felt more confident with a JVM compiled language that could use all the existing Java libraries.

The first thing I fell in love with was Monad composition to define the program (or subprogram) as a series of stages composed in a for-comprehension. It is a very convenient way to model asynchronous computations using Future as a Monad (I know that Future is not strictly a Monad, but from our code’s point of view we can assume it is; see https://stackoverflow.com/questions/27454798/is-future-in-scala-a-monad).

We will see some examples below; including a snippet here:

for {
  customer <- findCustomer(address)
  segment  <- getCustomerSegment(customer.id)
  email     = promotionEmail(customer, segment)
  result   <- sendEmail(address, email)
} yield result

It becomes natural and straightforward to define the different stages of computation as a pipeline in a for-comprehension, layering your program in different components, each responsible for their steps inside a for-comprehension.

(We could also run them in parallel using an Applicative instead of a Monad.)

Now in Zalando

Zalando is a big company, currently with over 1,900 engineers working here. As we have mentioned in previous blog posts, we are empowered to use the technologies we choose to build our systems, so the teams pick the language, libraries, components and tools. As you can see on our public Tech Radar, Scala is one of our core languages, with several Scala libraries like Akka and Play!.

So I cannot speak for how all Zalando teams work with Scala. Some teams are deep into the Functional Programming side, while others use the language mostly as a “better Java”, adopting lambdas, case classes and pattern matching to make the code more concise and understandable.

But I can talk about how people are using the language in the Dublin office where the services and data pipelines are written mostly in Scala: How our team that is developing the Customer Data Platform is using Scala, what libraries we are using and what we like about Scala.

Things we love about Scala

Types

We love types. Types help us understand what we are dealing with. A bare String or Int can often be meaningless; we don't want to mix up a customer's password with their name or email address. We want to know what a given value is.

For this we are currently using two different approaches:

  • Tagged types: Using shapeless @@ we decorate the primitive type with the tag we want to attach.
  • Value classes: Using a single-attribute case class that extends AnyVal, so that the compiler tries to remove the boxing/unboxing whenever it can. This is also useful when we want to override toString; for example, we may want to redact sensitive customer data before it goes into logs.

Here is a simple example of both (full code here):

import java.util.UUID
import shapeless.tag, tag.@@
import cats.data.Validated, Validated._

object model {

  final case class Password private (value: String) extends AnyVal {
    override def toString = "***"
  }

  object Password {
    def create(s: String): Validated[String, Password] = s match {
      case candidate if candidate.isEmpty || candidate.length < 8 => Invalid("Minimum password length has to be 8")
      case valid => Valid(Password(valid))
    }
  }

  sealed trait UserIdTag

  type UserId = UUID @@ UserIdTag

  object UserId {
    def apply(v: UUID) = tag[UserIdTag](v)
  }

  sealed trait EmailAddressTag

  type EmailAddress = String @@ EmailAddressTag

  object EmailAddress {
    def apply(s: String): Validated[String, EmailAddress] = s match {
      case invalid if !invalid.contains("@") => Invalid(s"$invalid is not a valid email address")
      case valid =>
        val tagged = tag[EmailAddressTag](valid)
        Valid(tagged)
    }
  }
}

Function Composition

Monads/applicatives

One of my favourite features of Scala is how easy and elegant it is to compose functions to create more complex ones. The most common way of doing this is using Monads inside a for-comprehension.

This way we can run several operations sequentially and obtain a result. As soon as any of the operations fail, the comprehension will exit with that failure.

def promotionEmail(customer: Customer, segment: CustomerSegment): Email = ???

def sendEmail(address: EmailAddress, message: Email): Future[Unit] = ???

def findCustomer(address: EmailAddress): Future[Customer] = ???

def getCustomerSegment(id: CustomerId): Future[CustomerSegment] = ???

def sendPromotionalEmail(address: EmailAddress)(implicit ec: ExecutionContext): Future[Unit] = {
  for {
    customer <- findCustomer(address)
    segment  <- getCustomerSegment(customer.id)
    email     = promotionEmail(customer, segment)
    result   <- sendEmail(address, email)
  } yield result
}

For a full example see here.

If what you want to do is evaluate several functions in parallel and collect all the errors or the successful results, you can use an Applicative Functor. This is very common when doing validations of a complex entity, where we can present all the detected errors in one go to the client.

type ValidatedNel[A] = Validated[NonEmptyList[String], A]

final case class Customer(name: Name, email: EmailAddress, password: Password)

object Customer {
  def apply(name: String, email: String, password: String): ValidatedNel[Customer] = {
    Apply[ValidatedNel]
      .map3(Name(name), EmailAddress(email), Password(password))(Customer.apply)
  }
}

For a full example see here.

Another option is simple function composition using compose or andThen or, if the arguments don't match up exactly, anonymous functions to combine them.

final case class Customer(id: CustomerId, address: EmailAddress, name: Name)
val findCustomer: EmailAddress => Customer = ???
val sendEmail: Customer => Either[String, Unit] = ???
val sendCustomerEmail: EmailAddress => Either[String, Unit] = findCustomer andThen sendEmail

For a full example see here.

Referential Transparency

We like being able to reason about a computation by using the substitution model, i.e., in a referentially transparent computation you can always substitute a function application with the result of executing that function with those parameters.

This enormously simplifies understanding a complex system: you understand the components (functions) that together compose the system.

def sq(x: Int): Int = x * x
assert(sq(5) == 5 * 5)

The previous example might be too basic, but I hope it suffices to make the point. You can always replace calling the sq function with the result, and there is no difference between both. This is also very helpful when testing your program. For more detail, you can look at the Wikipedia article here.

One of the most common things that stops people from writing referentially transparent code, apart from global mutable state, is throwing exceptions to blow up the stack. Throwing exceptions across the call stack can be seen as very powerful, but it makes it difficult to reason about your program when composing functions.
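As a small illustration, with made-up parsing functions: only the second version can always be substituted by its result and composed safely.

// Throws: the call cannot be replaced by a value without also reproducing the
// possible exception, so substitution (and composition) breaks down.
def parseAge(s: String): Int =
  if (s.nonEmpty && s.forall(_.isDigit)) s.toInt
  else throw new IllegalArgumentException(s"not a number: $s")

// Referentially transparent: the failure is part of the return type, so the
// call can always be substituted by its result and folded or composed further.
def parseAgeRT(s: String): Either[String, Int] =
  if (s.nonEmpty && s.forall(_.isDigit)) Right(s.toInt)
  else Left(s"not a number: $s")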

Monad Transformers

One of the caveats of using effects (effects are orthogonal to the type you get: for instance, Future is the effect of asynchrony, Option the effect of optionality, Iterable of repeatability...) is that they become extremely cumbersome to compose when more than two operations are involved, or when they are nested.

One of the most popular solutions is using a Monad Transformer that allows us to stack two Monads in one and use them as if they were a standard Monad. You can visit this blog post for more detail.

Another option, which we are not going to explore in this post, is to use extensible effects; see eff for more detail.

The reader can try this without using a Monad Transformer to see how complex it becomes, even if only composing two functions.

import scala.concurrent.{ExecutionContext, Future}
import cats.data._
import cats.instances.future._

final case class Customer(id: CustomerId, email: EmailAddress, fullName: String)

def findCustomer(email: EmailAddress): EitherT[Future, Throwable, Customer] =
  EitherT[Future, Throwable, Customer](Future.successful(Right(Customer("id", "me@example.com", "John Doe"))))

def sendEmail(recipient: EmailAddress,
              subject: String,
              content: String): EitherT[Future, Throwable, Unit] =
  EitherT[Future, Throwable, Unit](Future.successful {
    println(s"Sending promotional email to $recipient, subject: '$subject', content: '$content'")
    Right(())
  })

def promotionSubject(fullName: String): String = s"Amazing promotion $fullName, only for you"

def promotionContent(fullName: String): String = s"Click this link for your personalised promotion $fullName..."

def sendPromotionEmailToCustomer(email: EmailAddress)(implicit ec: ExecutionContext): EitherT[Future, Throwable, Unit] = {
  for {
    customer <- findCustomer(email)
    subject   = promotionSubject(customer.fullName)
    content   = promotionContent(customer.fullName)
    result   <- sendEmail(customer.email, subject, content)
  } yield result
}

import ExecutionContext.Implicits.global
sendPromotionEmailToCustomer("me@example.com")

For full example see here

Typeclasses

We like decoupling structure from behaviour. Some structures might encompass some business functionality, but we should not, as we did in OO, try to define everything we think a class can do as methods on that class.

For this, we adopt the Typeclass pattern where we define laws for a given behaviour and then implement the particular functionality for a given class. Classic examples are Semigroup, Monoid, Applicative, Functor, Monad, etc.

As a simple example, we wanted to be able to serialise/deserialise data types on the wire. For this we defined two simple Typeclasses:

trait BytesEncoder[A] {
  def apply(a: A): Array[Byte]
}

trait BytesDecoder[A] {
  def apply(arr: Array[Byte]): Either[Throwable, A]
}

For every type we want to serialise, all we need to do is implement those interfaces. When we need a serialiser for A, we require an implicit BytesEncoder[A] and provide the implicit instance wherever it is used.

For example, if we have a CustomerEntity and we want to write it to the wire using protobuf, we provide:

implicit def customerEntityProtoEncoder: BytesEncoder[CustomerEntity] = new BytesEncoder[CustomerEntity] {
  override def apply(ce: CustomerEntity) = ProtoMapper.toProto(ce).toByteArray
}
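To show how such an instance is consumed, here is a minimal, hypothetical sketch of a generic function that requires an encoder for whatever type it is given:

// A hypothetical generic publisher: it knows nothing about A except that an
// encoder instance exists for it.
def publish[A](a: A)(implicit encoder: BytesEncoder[A]): Array[Byte] =
  encoder(a)

// With an implicit BytesEncoder[CustomerEntity] like the one above in scope:
// val bytes: Array[Byte] = publish(customerEntity)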

Folds/merges

We use Either extensively, so we need to fold both cases to get a uniform result. We can use a fold to merge both cases into a common response to the client, for instance an HttpResponse.

We also want to consolidate all the errors from a Validated[NonEmptyList[String], T] into a common response by folding the NonEmptyList into a String or another adequate type, as sketched below.
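A minimal sketch of both folds, using a simplified stand-in for a real HttpResponse type:

import cats.data.{NonEmptyList, Validated}

// Simplified stand-in for a real HTTP response type.
final case class SimpleResponse(status: Int, body: String)

// Fold an Either into a single response type.
def toResponse[A](result: Either[String, A]): SimpleResponse =
  result.fold(
    error => SimpleResponse(400, error),
    value => SimpleResponse(200, value.toString)
  )

// Fold a Validated, consolidating all accumulated errors into one String.
def toResponseValidated[A](result: Validated[NonEmptyList[String], A]): SimpleResponse =
  result.fold(
    errors => SimpleResponse(400, errors.toList.mkString("; ")),
    value  => SimpleResponse(200, value.toString)
  )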

Conclusion

As you can see, we are very happy using Scala and are finding our way into more advanced topics on the functional programming side of it. And there is nothing stopping you from adopting these useful features step by step, to get up to speed and experience the benefits of using an FP language.

Learn more about opportunities at Zalando by visiting our jobs page.

Zalando Tech Blog – Technical Articles Ian Duffy

Backing up Apache Kafka and Zookeeper to S3

What is Apache Kafka?

Apache Kafka is a distributed streaming platform used for building real-time data pipelines and streaming applications. It is horizontally scalable, fault-tolerant, and wicked fast. It runs in production in many companies.

Backups are important with any kind of data. Apache Kafka lowers the risk of data loss with replication across brokers. However, it is still necessary to have protection in place in the event of user error.

This post will demo and introduce the tools that our small team of three at Zalando uses to back up and restore Apache Kafka and Zookeeper.

Backing up Apache Kafka

Getting started with Kafka Connect
Kafka Connect is a framework for connecting Kafka with external systems. Its purpose is to make it easy to add new systems to scalable and secure stream data pipelines.

By using a connector by Spredfast.com, backing up and restoring the contents of a topic to S3 becomes a trivial task.

Demo
Download the prerequisite

Check out the following repository:

$ git clone https://github.com/imduffy15/kafka-env.git 


It contains a docker-compose file for bringing up Zookeeper, Kafka, and Kafka Connect locally.

Kafka Connect will load all jars placed in the ./kafka-connect/jars directory. Go ahead and download the Spredfast.com kafka-connect-s3.jar:

$ wget "http://dl.bintray.com/iduffy/maven/com/spredfast/kafka/connect/s3/kafka-connect-s3/0.4.2-zBuild/kafka-connect-s3-0.4.2-zBuild-shadow.jar" -O kafka-connect-s3.jar


Bring up the stack
To boot the stack, use docker-compose up

Create some data
Using the Kafka command line utilities, create a topic and a console producer:

$ kafka-topics --zookeeper localhost:2181 --create --topic example-topic --replication-factor 1 --partitions 1
Created topic "example-topic".

$ kafka-console-producer --topic example-topic --broker-list localhost:9092
>hello world


Using a console consumer, confirm the data is successfully written:

$ kafka-console-consumer --topic example-topic --bootstrap-server localhost:9092 --from-beginning
hello world


Backing-up
Create a bucket on S3 to store the backups:

$ aws s3api create-bucket --create-bucket-configuration LocationConstraint=eu-west-1 --region eu-west-1 --bucket example-kafka-backup-bucket



Create a Kafka Connect sink to back up the topic to S3:


$ cat << EOF > example-topic-backup-tasks.json
{
  "name": "example-topic-backup-tasks",
  "config": {
    "connector.class": "com.spredfast.kafka.connect.s3.sink.S3SinkConnector",
    "format.include.keys": "true",
    "topics": "example-topic",
    "tasks.max": "1",
    "format": "binary",
    "s3.bucket": "example-kafka-backup-bucket",
    "value.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
    "key.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
    "local.buffer.dir": "/tmp"
  }
}
EOF
curl -X POST -H "Content-Type: application/json" -H "Accept: application/json" -d @example-topic-backup-tasks.json /api/kafka-connect-1/connectors

(Check out the Spredfast documentation for more configuration options.)

After a few moments the backup task will begin. By listing the Kafka Consumer groups, one can identify the consumer group related to the backup task and query for its lag to determine if the backup is finished.

$ kafka-consumer-groups --bootstrap-server localhost:9092 --list
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
connect-example-topic-backup-tasks

$ kafka-consumer-groups --describe --bootstrap-server localhost:9092 --group connect-example-topic-backup-tasks

Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
example-topic                  0          1               1               0          consumer-5-e95f5858-5c2e-4474-bab9-8edfa722db21   /172.22.0.4                    consumer-5

The backup is completed when the lag reaches 0. On inspecting the S3 bucket, a folder of the raw backup data will be present.



Restoring
Let’s destroy all of the containers and start fresh:

$ docker-compose rm -f -v
$ docker-compose up

Re-create the topic:

$ kafka-topics --zookeeper localhost:2181 --create --topic example-topic --replication-factor 1 --partitions 1
Created topic "example-topic".

Create a source with Kafka Connect:

$ cat << EOF > example-restore.json
{
  "name": "example-restore",
  "config": {
    "connector.class": "com.spredfast.kafka.connect.s3.source.S3SourceConnector",
    "tasks.max": "1",
    "topics": "example-topic",
    "s3.bucket": "example-kafka-backup-bucket",
    "key.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
    "value.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
    "format": "binary",
    "format.include.keys": "true"
  }
}
EOF
curl -X POST -H "Content-Type: application/json" -H "Accept: application/json" -d @example-restore.json /api/kafka-connect-1/connectors

(Check out the Spredfast documentation for more configuration options.)

The restore process should begin; after some time, the 'hello world' message will appear when you run the Kafka console consumer again:

$ kafka-console-consumer --topic example-topic --bootstrap-server localhost:9092 --from-beginning 
hello world

Backing Up Zookeeper

Zookeeper’s role
Newer versions of Kafka (>= 0.10.x) use ZooKeeper in a small but still important coordination role. When new Kafka Brokers join the cluster, they use ZooKeeper to discover and connect to the other brokers. The cluster also uses ZooKeeper to elect the controller and track the controller epoch. Finally and perhaps most importantly, ZooKeeper stores the Kafka Broker topic partition mappings, which track the information stored on each broker. The data will still persist without these mappings, but won't be accessible or replicated.

Exhibitor
Exhibitor is a popular supervisor system for ZooKeeper. It provides a number of housekeeping facilities for ZooKeeper as well as exposing a nice UI for exploring the stored data. It also provides some backup and restore capabilities out of the box. However, we should make sure we understand these features before relying on them.

On AWS, we run our Exhibitor brokers under the same stack. In this setup, where the stack's auto scaling group controls when Exhibitor instances are removed, it is relatively easy for multiple (or even all) Exhibitor brokers to be terminated at the same time. That's why, for our tests, we set up an Exhibitor cluster and connected Kafka to it. We then indexed all the Kafka znodes to create an Exhibitor backup. Finally, we tore down the Exhibitor stack and re-deployed it with the same config.

Unfortunately, after the re-deploy, while the backup folder was definitely in S3, the new Exhibitor appliance did not recognise it as an existing index. With a bit of searching we found that this is actually the expected behaviour, and the suggested solution is to read the S3 index and apply the changes by hand.

Backing-up
Creating, deleting and re-assigning topics in Kafka is an uncommon occurrence for us, so we estimated that a daily backup task would be sufficient for our needs.

We came across Burry. Burry is a small tool which allows for snapshotting and restoring of a number of system critical stores, including ZooKeeper. It can save the snapshot dump locally or to various cloud storage options. Its backup dump is also conveniently organized along the znode structure making it very easy to work with manually if need be. Using this tool we set up a daily cron job on our production to get a full daily ZooKeeper snapshot and upload the resultant dump to an S3 bucket on AWS.

Restoring
Conveniently, Burry also works as a restore tool using a previous ZooKeeper snapshot. It will try to recreate the full snapshot znode structure and znode data. It also tries to be careful to preserve existing data, so if a znode exists it will not overwrite it.

But there is a catch. Some of the Kafka-created znodes are ephemeral and expected to expire when the Kafka Brokers disconnect. Currently Burry snapshots these as any other znodes, so restoring to a fresh Exhibitor cluster will recreate them. If we were to restore Zookeeper before restarting our Kafka brokers, we'd restore from the snapshot of the ephemeral znodes with information about the Kafka brokers in our previous cluster. If we then bring up our Kafka cluster, our new broker node IDs, which must be unique, would conflict with the IDs restored from our Zookeeper backup. In other words, we'd be unable to start up our Kafka brokers.

We can easily get around this problem by starting our new Zookeeper and new Kafka clusters before restoring the Zookeeper content from our backup. By doing this, the Kafka brokers will create their ephemeral znodes, and the Zookeeper restore will not overwrite these, and will go on to recreate the topic and partition assignment information. After restarting the Kafka brokers, the data stored on their persisted disks will once again be correctly mapped and available, and consumers will be able to resume processing from their last committed position.

Be part of Zalando Tech. We're hiring!

Zalando Tech Blog – Technical Articles Ian Duffy

A closer look at the ingredients needed for ultimate stability

This is part of a series of posts on Kafka. See Ranking Websites in Real-time with Apache Kafka’s Streams API for the first post in the series.

Remora is a small application for monitoring Kafka. Because many teams deploy it to their production environments, open sourcing it made sense. Here, I'll go through some technical pieces: what streaming is, why it is useful, and the motivation behind this monitoring application.

Streaming architectures have become an important pattern at Zalando. To have fast, highly available and scalable systems that process data across numerous teams and layers, good streaming infrastructure and monitoring are key.

Without streaming, the system is not reactive to changes. An older style would manage changes through incremental batches. Batches, such as cron jobs, can be hard to manage as you have to keep track of a large number of jobs, each taking care of a shard of data; this can also be hard to scale. Having a streaming component such as Kafka allows for a centralised, scalable resource that makes your architecture reactive.

Some teams use cloud infrastructure such as AWS Kinesis or SQS. At Zalando, Kafka was chosen to reach better throughput and to work with frameworks like Akka Streams.

Monitoring lag is important; without it we don't know where the consumer is relative to the size of the queue. An analogy might be piloting a plane without knowing how many more miles are left on your journey. Zalando has trialled a number of consumer lag monitoring tools.

We needed a simple, independent application that could scrape metrics from a URL and place them into our metrics system. But what to include in this application? I put on my apron and played chef with the range of ingredients available to us.

Scala is big at Zalando; a majority of the developers know Akka or Play. At the time, our team was designing systems using Akka HTTP with an actor pattern. It is a very light, stable and asynchronous framework. Could we just wrap the command line tools in a light Scala service? Potentially, it could take less time and be more stable. Sounds reasonable, we thought, let's do that.

The ingredients for ultimate stability were as follows: a handful of Kafka java command line tools with a pinch of AKKA Http and a hint of an actor design. Leave to code for a few days and take out of the oven. Lightly garnish with a performance test to ensure stability, throw in some docker. Deploy, monitor, alert, and top it off with a beautiful graph to impress. Present Remora to your friends so that everyone may have a piece of the cake, no matter where in the world you are!

Bon app-etit!

Zalando Tech Blog – Technical Articles Ian Duffy

Second in our series about the use of Apache Kafka’s Streams API by Zalando

This is the second in a series about the use of Apache Kafka’s Streams API by Zalando, Europe’s leading online fashion platform. See Ranking Websites in Real-time with Apache Kafka’s Streams API for the first post in the series.

This piece was first published on confluent.io

Running Kafka Streams applications in AWS

At Zalando, Europe’s leading online fashion platform, we use Apache Kafka for a wide variety of use cases. In this blog post, we share our experiences and lessons learned to run our real-time applications built with Kafka’s Streams API in production on Amazon Web Services (AWS). Our team at Zalando was an early adopter of the Kafka Streams API. We have been using it since its initial release in Kafka 0.10.0 in mid-2016, so we hope you find this hands-on information helpful for running your own use cases in production.

What is Apache Kafka’s Streams API?

The Kafka Streams API is available as a Java library included in Apache Kafka that allows you to build real-time applications and microservices that process data from Kafka. It allows you to perform stateless operations such as filtering (where messages are processed independently from each other), as well as stateful operations like aggregations, joins, windowing, and more. Applications built with the Streams API are elastically scalable, distributed, and fault-tolerant. For example, the Streams API guarantees fault-tolerant data processing with exactly-once semantics, and it processes data based on event-time i.e., when the data was actually generated in the real world (rather than when it happens to be processed). This conveniently covers many of the production needs for mission-critical real-time applications.
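To give a feel for the programming model, here is a minimal sketch of a stateless filtering application, written in Scala against the Java Streams API of a recent Kafka client; the topic names and the filter predicate are made up for illustration.

import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}
import org.apache.kafka.streams.kstream.{KStream, Predicate}

object FashionViewsApp extends App {
  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fashion-views-filter")   // also used as the consumer group id
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)

  val builder = new StreamsBuilder()
  val views: KStream[String, String] = builder.stream[String, String]("page-views")

  // Keep only the events we care about and write them to an output topic.
  val isFashion = new Predicate[String, String] {
    override def test(key: String, value: String): Boolean = value.contains("fashion")
  }
  views.filter(isFashion).to("fashion-page-views")

  val streams = new KafkaStreams(builder.build(), props)
  streams.start()
  sys.addShutdownHook(streams.close())
}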

An example of how we use Kafka Streams at Zalando is the aforementioned use case of ranking websites in real-time to understand fashion trends.

Library Upgrades of Kafka Streams

Largely due to our early adoption of Kafka Streams, we encountered many teething problems in running Streams applications in production. However, we stuck with it due to how easy it was to write Kafka Streams code. In our early days of adoption, we hit various issues around stream consumer groups rebalancing, issues with getting locks on the local RocksDB after a rebalance, and more. These eventually settled down and sorted themselves out in the 0.10.2.1 release (April 2017) of the Kafka Streams API.

I/O

After upgrading to 0.10.2.1, our Kafka Streams applications were mostly stable, but we would still see what appeared to be random crashes every so often. These crashes occurred more frequently on components doing complex stream aggregations. We eventually discovered that the actual culprit was AWS rather than Kafka Streams: on AWS, General Purpose SSD (GP2) EBS volumes operate using I/O credits. The AWS pricing model allocates a baseline read and write IOPS allowance to a volume, based on the volume size. Each volume also has an IOPS burst balance, to act as a buffer if the base limit is exceeded. Burst balance replenishes over time, but as it gets used up, reads and writes to disk are throttled down to the baseline, leaving the application with an EBS volume that is very unresponsive. This ended up being the root cause of most of our issues with aggregations. When running Kafka Streams applications, we had initially assigned 10 GB disks as we didn't foresee much storage occurring on these boxes. However, under the hood, the applications performed lots of read/write operations on the RocksDBs, which resulted in I/O credits being used up; given the size of our disks, the I/O credits were not replenished quickly enough, grinding our application to a halt. We remediated this issue by provisioning Kafka Streams applications with much larger disks. This gave us more baseline IOPS, and the burst balance was replenished at a faster rate.
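As a rough rule of thumb based on our reading of the AWS documentation at the time (check the current docs before relying on these numbers), GP2 baseline IOPS scale linearly with volume size, which is why a small volume is throttled so hard once its burst balance runs out:

// GP2 volumes earn roughly 3 IOPS per GiB of size, floored at 100 IOPS; small
// volumes can burst to around 3000 IOPS only while burst credits remain.
def gp2BaselineIops(volumeSizeGiB: Int): Int =
  math.max(volumeSizeGiB * 3, 100)

// gp2BaselineIops(10)  ==  100  -> our original, easily starved volumes
// gp2BaselineIops(500) == 1500  -> larger disks buy a much higher baseline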

Monitoring

EBS Burst Balance

Our monitoring solution polls CloudWatch metrics and pulls back all AWS exposed metrics. For the issue outlined, the most important of these is EBS burst balance. As mentioned above, in many cases applications that use Kafka Streams rely on heavy utilization of locally persisted RocksDBs for storage and quick data access. This storage is persisted on the instance’s EBS volumes and generates a high read and write workload on the volumes. GP2 disks were used in preference to provisioned IOPS disks (IO1) since these were found to be much more cost-effective in our case.

Fine-tuning your application

With upgrades in the underlying Kafka Streams library, the Kafka community introduced many improvements to the default stream configurations. In previous, more unstable iterations of the client library, we had spent a lot of time tweaking config values such as session.timeout.ms, max.poll.interval.ms, and request.timeout.ms to achieve some level of stability.

With new releases we found ourselves discarding these custom values and achieving better results. However, some timeout issues persisted on some of our services, where a service would frequently get stuck in a rebalancing state. We noticed that reducing the max.poll.records value for the stream configs would sometimes alleviate issues experienced by these services. From partition lag profiles we also saw that the consuming issue seemed to be confined to only a few partitions, while the others would continue processing normally between re-balances. Ultimately we realised that the processing time for a record in these services could be very long (up to minutes) in some edge cases. Kafka has a fairly large maximum offset commit time before a stream consumer is considered dead (five minutes), but with larger message batches of data, this timeout was still being exceeded. By the time the processing of the record was finished, the stream was already marked as failed and so the offset could not be committed. On rebalance, this same record would once again be fetched from Kafka, would fail to process in a timely manner and the situation would repeat. Therefore for any of the affected applications, we introduced a processing timeout, ensuring there was an upper bound on the time taken by any of our edge cases.
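A minimal sketch of that idea, with illustrative names rather than our production code: wrap the per-record work in a Future and bound how long we wait for it, so a pathological record cannot push the consumer past Kafka's commit timeouts.

import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Run `process` for a single record, but give up waiting after `limit` so the
// stream thread can move on (for example, routing the record to a dead-letter
// topic) instead of blowing past the consumer's poll/commit timeouts.
def processWithTimeout[A](record: A, limit: FiniteDuration)(process: A => Unit)
                         (implicit ec: ExecutionContext): Either[String, Unit] =
  try Right(Await.result(Future(process(record)), limit))
  catch {
    case _: TimeoutException => Left(s"gave up processing record after $limit")
  }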

Monitoring

Consumer Lag

By looking at the metadata of a Kafka Consumer Group, we can determine a few key metrics. How many messages are being written to the partitions within the group? How many messages are being read from those partitions? The difference between these is called lag; it represents how far the Consumers lag behind the Producers.
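In code, the per-partition calculation is simply the following (names are illustrative):

// Lag for one partition: how far the consumer's committed offset trails the
// end of the log. Summing over partitions gives the group's total lag.
final case class PartitionProgress(logEndOffset: Long, committedOffset: Long) {
  def lag: Long = logEndOffset - committedOffset
}

val totalLag = List(PartitionProgress(120, 100), PartitionProgress(80, 80)).map(_.lag).sum // 20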

The ideal running state is that lag is a near zero value. At Zalando, we wanted a way to monitor and plot this to see if our streams applications are functioning.

After trying out a number of consumer lag monitoring utilities, such as Burrow, Kafka Lag Monitor and Kafka Manager, we ultimately found these tools either too unstable or a poor fit for our use case. From this need, our co-worker, Mark Kelly, built a small utility called Remora. It is a simple HTTP wrapper around the Kafka consumer group "describe" command. By polling the Remora HTTP endpoints from our monitoring system at a set interval, we were able to get good insights into our stream applications.

Memory

Our final issue was memory consumption. Initially we somewhat naively assigned very large heaps to the Java virtual machine (JVM). This was a bad idea because Kafka Streams applications utilize a lot of off-heap memory when configured to use RocksDB as their local storage engine, which is the default. By assigning large heaps, there wasn't much free system memory left. As a result, applications would eventually come to a halt and crash. For our applications we use m4.large instances; we assign 4 GB of RAM to the heap and usually utilize about 2 GB of it, leaving the remaining 4 GB of RAM free for off-heap and system usage, with overall system memory utilization at around 70%. Additionally, we would recommend reviewing the memory management section of Confluent's Kafka Streams documentation, as customising the RocksDB configuration was necessary in some of our use cases.

Monitoring

JVM Heap Utilization

We expose JVM heap utilization using Dropwizard metrics via HTTP. This is polled on an interval by our monitoring solution and graphed. Many of our applications are fairly memory intensive, with in-memory caching, so it was important for us to be able to see at a glance how much memory was available to the application. Additionally, due to the relative complexity of many of our applications, we wanted to have easy visibility into garbage collection in the systems. Dropwizard metrics offered a robust, ready-made solution for these problems.

CPU, System Memory Utilization and Disk Usage

We run Prometheus node exporter on all of our servers; this exports lots of system metrics via HTTP. Again, our monitoring solution polls this on an interval and graphs them. While the JVM monitoring provided great insight into what was going on in the JVM, we also needed insight into what was going on in the instance. In general, most of the applications we ended up writing had much greater network and memory overheads than CPU requirements. However, in many failure cases we saw, our instances were ultimately terminated by auto-scaling groups on failing their health checks. These health checks would fail because the endpoints became unresponsive due to high CPU loads as other resources were used up. While this was usually not due to high CPU use in the application processing itself, it was a great symptom to capture and dig further into to find where the CPU usage was coming from. Disk monitoring also proved very valuable, particularly for Kafka Streams applications consuming from topics with a large partitioning factor and/or doing more complex aggregations. These applications store a fairly large amount of data (200MB per partition) in RocksDB on the host instance, so it is very easy to accidentally run out of space. Finally, it is also good to monitor how much memory the system has available as a whole, since this was frequently directly connected to CPU loads saturating on the instances, as briefly outlined above.

Conclusion: The Big Picture of our Journey

As mentioned in the beginning, our team at Zalando has been using the Kafka Streams API since its initial release in Kafka 0.10.0 in mid-2016.  While it wasn’t a smooth journey in the very beginning, we stuck with it and, with its recent versions, we now enjoy many benefits: our productivity as developers has skyrocketed, writing new real-time applications is now quick and easy with the Streams API’s very natural programming model, and horizontal scaling has become trivial.

In the next article of this series, we will discuss how we are backing up Apache Kafka and Zookeeper to Amazon S3 as part of our disaster recovery system.

About Apache Kafka’s Streams API

If you have enjoyed this article, you might want to continue with the following resources to learn more about Apache Kafka’s Streams API:

Join our ace tech team. We’re hiring!