
Streaming Data Processing with Apache Kafka and Apache Flink

Introduction

In this article we will build a simple pipeline to produce and process live streaming data with Apache Kafka and Apache Flink. Before we dive into the details, let’s look at the key concepts and technologies that we are going to use.

What is streaming data?

Streaming data is data that is continuously generated, often by many different sources. Usually, such data cannot be fully ingested before processing starts, so dedicated stream-processing techniques have to be applied.

There are two types of streams:

  • Bounded streams – streams with a defined start and end. They can be ingested in full before any further computation is performed.
  • Unbounded streams – streams with a defined start but no defined end. They are not expected to terminate and provide data continuously as it is generated. Such data must therefore be processed continuously, because it is not possible to wait for all of the data to arrive.

In this article, we will focus on unbounded streams. Our example pipeline will consist of two microservices: a Kafka producer that generates the unbounded streaming data, and a consumer that reads the data from the producer, uses Flink to perform some computations, and streams the processed result into a new aggregated unbounded stream.


Before we start the implementation, let’s introduce the key technologies that we’ll be using.

Apache Kafka

Apache Kafka is an open-source distributed event streaming platform developed by the Apache Software Foundation.

The platform can be used to:

  1. Publish and subscribe to streams of events.
  2. Store streams of events with high durability and reliability.
  3. Process streams of events as they occur.

In short, Kafka uses topics to categorise events/messages. Kafka topics are multi-subscriber (there can be multiple consumers of a single topic) and are partitioned across the Kafka cluster.


Apache Flink

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. It can run on all common cluster environments (such as Kubernetes) and performs computations over streaming data at in-memory speed and at any scale.

Stateful Stream Processing

Flink operations can be stateful. This means that how one message/event is handled can depend on the accumulated effect of all the events processed before it. State can be used for simple computations, such as counting how many messages/events arrive per time unit, or for more complex logic, such as computations for fraud-detection models.

What can be streamed

  • Basic types: String, Long, Integer, Boolean, Array
  • Composite types: Tuples, POJOs, and Scala case classes

Streaming windows

Windows are one of the most important features when processing infinite streams. Windows split the stream into separate buckets of finite size, over which we can apply computations (e.g., an average value).

In the picture below there are examples of the two types of time windows – tumbling and sliding windows:


Both types of window are time-based, but unlike tumbling windows, sliding windows can overlap each other.

In the next section, we walk through the actual implementation and go into more detail about the stream computations provided by Flink.

Streaming data pipeline implementation

The pipeline that we will build simulates a data stream from a traffic camera that records each vehicle it captures. A consumer service will then produce statistics about how many vehicles of each type (cars, trucks) pass within a certain time window.

The producer microservice

First, for demonstration purposes, we need to run Kafka locally. This example uses the Confluent Community Docker Image for Apache Kafka, available on Docker Hub.

Once we have Kafka up and running, we can proceed with building the Java microservice that will serve as our producer. We will use Spring Boot, so we do not need to create configuration beans for Kafka; the Spring Boot autoconfiguration creates everything we need, and we only have to supply the Maven dependencies and the application properties as follows:

Figure 1: Maven dependencies

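A minimal sketch of what these dependencies could look like, assuming Spring Boot with spring-kafka and Lombok (versions are managed by the Spring Boot parent):

<dependencies>
    <!-- Core Spring Boot runtime -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- Spring for Apache Kafka: auto-configures KafkaTemplate -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Lombok for boilerplate-free model classes -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>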

Figure 2: application.yml

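A minimal application.yml sketch, assuming a local Kafka broker on port 9092 and JSON-serialised message values:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer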

The model

Our message model is a simple POJO class called Vehicle:

Figure 3: Vehicle

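A sketch of the Vehicle class, assuming just a vehicle type and a capture timestamp (Lombok generates the constructors, getters and setters):

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Vehicle {

    // Type of the captured vehicle (CAR, TRUCK)
    private VehicleType vehicleType;

    // Moment the camera recorded the vehicle, in epoch milliseconds
    private long timestamp;
}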

Figure 4: VehicleType

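VehicleType is assumed to be a simple enum covering the two vehicle kinds used in the example:

public enum VehicleType {
    CAR,
    TRUCK
}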

The producer service itself simply generates random Vehicle events; vehicles of type “Truck” occur less frequently than the “Car” type. To produce messages for Kafka we only need to inject a bean of type KafkaTemplate:

Figure 5: ProducerService

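A sketch of such a producer, assuming a scheduled method and the auto-configured KafkaTemplate (the emission rate and the car/truck ratio are illustrative):

@Service
@RequiredArgsConstructor
public class ProducerService {

    public static final String TOPIC = "vehicle-topic";

    private final KafkaTemplate<String, Vehicle> kafkaTemplate;

    private final Random random = new Random();

    // Emit a new Vehicle event every 500 ms; roughly one in five is a truck
    @Scheduled(fixedRate = 500)
    public void sendVehicleEvent() {
        VehicleType type = random.nextInt(5) == 0 ? VehicleType.TRUCK : VehicleType.CAR;
        kafkaTemplate.send(TOPIC, new Vehicle(type, System.currentTimeMillis()));
    }
}

Note that @Scheduled requires @EnableScheduling on the Spring Boot application class.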

When we run the Spring application, it starts sending messages of type Vehicle to the topic “vehicle-topic” on our local cluster. With this, we can consider our producer finished.

The Data Processor Service

Figure 6: Maven Dependencies

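A sketch of what these dependencies could look like, assuming a Flink 1.13.x release built against Scala 2.12 (the version numbers are illustrative):

<properties>
    <flink.version>1.13.2</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>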

Above are the Maven dependencies. Notice that, besides the Flink version, the artifact names also carry the Scala version that Flink is built against.

We again use Spring Boot; however, there is no equivalent out-of-the-box integration for Flink like the one we saw for Apache Kafka.

The model we are going to use mirrors the model from the producer service, but we introduce one new class, VehicleStatistics, which will hold the result of the streaming data computation. This time we need the @JsonProperty annotation for serialisation purposes, because we are no longer counting on Spring to do the work for us; instead we expect Flink to do it.

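A sketch of VehicleStatistics, assuming it simply carries the vehicle type and the number of vehicles counted in a window:

@Data
@NoArgsConstructor
@AllArgsConstructor
public class VehicleStatistics {

    // Vehicle type this statistic refers to
    @JsonProperty("vehicleType")
    private VehicleType vehicleType;

    // Number of vehicles of this type observed in the window
    @JsonProperty("count")
    private long count;
}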

To support deserialisation of the Vehicle model and serialisation to VehicleStatistics, we must implement the DeserializationSchema and SerializationSchema interfaces from the Flink library, respectively. In the implementations we will use Jackson’s ObjectMapper.

Figure 7: VehicleDeserializationSchema

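A sketch of the deserialisation schema, registered as a Spring bean so it can be injected into the processing service (the @Component annotation is our assumption):

@Component
public class VehicleDeserializationSchema implements DeserializationSchema<Vehicle> {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public Vehicle deserialize(byte[] bytes) throws IOException {
        // Parse the JSON payload produced by the Spring Boot producer
        return objectMapper.readValue(bytes, Vehicle.class);
    }

    @Override
    public boolean isEndOfStream(Vehicle vehicle) {
        // The vehicle stream is unbounded, so there is no end-of-stream marker
        return false;
    }

    @Override
    public TypeInformation<Vehicle> getProducedType() {
        return TypeInformation.of(Vehicle.class);
    }
}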

Figure 8: VehicleStatisticsSerializationSchema

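And a corresponding sketch for the serialisation side:

@Component
public class VehicleStatisticsSerializationSchema implements SerializationSchema<VehicleStatistics> {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(VehicleStatistics statistics) {
        try {
            // Write the aggregated statistics as JSON for the output topic
            return objectMapper.writeValueAsBytes(statistics);
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("Could not serialize " + statistics, e);
        }
    }
}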

We have implemented the Flink part in a class called ProcessingService:

@Service
@Log4j2
@RequiredArgsConstructor
public class ProcessingService {

  @Value("${kafka.bootstrap-servers}")
  private String kafkaAddress;

  @Value("${kafka.group-id}")
  private String kafkaGroupId;

  public static final String TOPIC = "vehicle-topic";
  public static final String VEHICLE_STATISTICS_TOPIC = "vehicle-statistics-topic";

  private final VehicleDeserializationSchema vehicleDeserializationSchema;
  private final VehicleStatisticsSerializationSchema vehicleStatisticsSerializationSchema;

  @PostConstruct
  public void startFlinkStreamProcessing() {
    try {
      processVehicleStatistic();
    } catch (Exception e) {
      log.error("Cannot process", e);
    }
  }

  public void processVehicleStatistic() throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

    FlinkKafkaConsumer<Vehicle> consumer = createVehicleConsumerForTopic(TOPIC, kafkaAddress, kafkaGroupId);
    consumer.setStartFromLatest();
    consumer.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

    FlinkKafkaProducer<VehicleStatistics> producer = createVehicleStatisticsProducer(VEHICLE_STATISTICS_TOPIC, kafkaAddress);

    DataStream<Vehicle> inputMessagesStream = environment.addSource(consumer);

    inputMessagesStream
        .keyBy(vehicle -> vehicle.getVehicleType().ordinal())
        .window(TumblingEventTimeWindows.of(Time.seconds(20)))
        .aggregate(new VehicleStatisticsAggregator())
        .addSink(producer);

    environment.execute();
  }

  private FlinkKafkaConsumer<Vehicle> createVehicleConsumerForTopic(String topic, String kafkaAddress, String kafkaGroup) {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", kafkaAddress);
    properties.setProperty("group.id", kafkaGroup);

    return new FlinkKafkaConsumer<>(topic, vehicleDeserializationSchema, properties);
  }

  private FlinkKafkaProducer<VehicleStatistics> createVehicleStatisticsProducer(String topic, String kafkaAddress) {
    return new FlinkKafkaProducer<>(kafkaAddress, topic, vehicleStatisticsSerializationSchema);
  }
}

The methods createVehicleConsumerForTopic and createVehicleStatisticsProducer create the Flink-Kafka consumer and producer used in the pipeline. The producer is also called “sink” in the Flink terminology. The real execution happens in the method processVehicleStatistic.

Every Flink application needs a StreamExecutionEnvironment.

Figure 9: Flink Data Stream Processing


The input streaming data is the data coming from the vehicle-topic, i.e. the randomly generated Vehicle messages created by the producer service.

The keyBy transformation is used to partition the data based on the vehicle type.

The window operator is used to assign windows to a keyed stream. If we wanted to assign windows to all the data in the stream without keying it, we could use windowAll instead, as in the sketch below.
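A sketch of that non-keyed variant (note that windowAll effectively runs with a parallelism of 1):

inputMessagesStream
    .windowAll(TumblingEventTimeWindows.of(Time.seconds(20)))
    .aggregate(new VehicleStatisticsAggregator())
    .addSink(producer);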


With the aggregate transformation we assign a specific aggregator, which in our case sums the total number of vehicles within each window.


If we want to define a custom aggregator, we must implement Flink’s AggregateFunction interface.
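A sketch of such an aggregator, assuming the VehicleStatistics class shown earlier (the accumulator doubles as the result type):

public class VehicleStatisticsAggregator
        implements AggregateFunction<Vehicle, VehicleStatistics, VehicleStatistics> {

    @Override
    public VehicleStatistics createAccumulator() {
        // Start every window with an empty count
        return new VehicleStatistics(null, 0L);
    }

    @Override
    public VehicleStatistics add(Vehicle vehicle, VehicleStatistics accumulator) {
        // Count one more vehicle of this type in the current window
        accumulator.setVehicleType(vehicle.getVehicleType());
        accumulator.setCount(accumulator.getCount() + 1);
        return accumulator;
    }

    @Override
    public VehicleStatistics getResult(VehicleStatistics accumulator) {
        return accumulator;
    }

    @Override
    public VehicleStatistics merge(VehicleStatistics a, VehicleStatistics b) {
        // Combine partial counts when Flink merges accumulators
        return new VehicleStatistics(a.getVehicleType(), a.getCount() + b.getCount());
    }
}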

Finally, we use addSink to specify where the processed output should be directed – in our case another Kafka topic, but there are multiple other possibilities.

Flink provides the following bundled connectors:

  • Apache Kafka (source/sink)
  • Apache Cassandra (sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • FileSystem (Hadoop included) – Streaming only (sink)
  • FileSystem (Hadoop included) – Streaming and Batch (sink)
  • RabbitMQ (source/sink)
  • Apache NiFi (source/sink)
  • Twitter Streaming API (source)
  • Google PubSub (source/sink)
  • JDBC (sink)

We can also use the print method to write the results to System.out for testing and development purposes.
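For example, during development we could replace the Kafka sink with print:

inputMessagesStream
    .keyBy(vehicle -> vehicle.getVehicleType().ordinal())
    .window(TumblingEventTimeWindows.of(Time.seconds(20)))
    .aggregate(new VehicleStatisticsAggregator())
    .print();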

If we start both services and use the print sink, we can observe how the data processor service calculates the sum of all vehicles, keyed by vehicle type, for each time window.

Conclusion

We created a simple pipeline for producing, consuming, and processing endless streaming data using Apache Kafka and Apache Flink.

In doing so, we covered the following topics:

  • A general understanding of streaming data.
  • An introduction to Apache Kafka and its usage for streaming data processing.
  • An introduction to Apache Flink and its stateful streaming data computations.
  • How Flink can be integrated with Kafka and used both as a source and a sink.

For more information, see the official documentation pages referenced below.

The full source code of this example can be found at: https://github.com/helecloud/blogposts/tree/master/streaming-data-kafka-flink

References

https://kafka.apache.org/

https://flink.apache.org/