Introduction
In this article we will build a simple pipeline to produce and process live streaming data with Apache Kafka and Apache Flink. Before we dive into the details, let's look at the key concepts and technologies that we are going to use.
What is streaming data?
Streaming data is data that is continuously generated, often by many different sources. Usually, such data cannot be ingested in its entirety before further processing, so dedicated stream processing techniques have to be applied.
There are two types of streams:
- Bounded streams – streams with a defined start and end. They can be processed by ingesting all of the data first and only then performing the computations.
- Unbounded streams – streams with a defined start but no defined end. They are not expected to terminate and deliver data continuously as it is generated. Such data must therefore be processed continuously, because it is not possible to wait for all of the data to arrive.
In this article, we will focus on unbounded streams. Our example pipeline will consist of two microservices: a Kafka producer that generates the unbounded streaming data, and a consumer that reads the data from the producer, uses Flink to perform some computations, and streams the processed results into a new aggregated unbounded stream.
Before we start the implementation, let’s introduce the key technologies that we’ll be using.
Apache Kafka
Apache Kafka is an open-source distributed event streaming platform developed by the Apache Software Foundation.
The platform can be used to:
- Publish and subscribe to streams of events.
- Store streams of events with high durability and reliability.
- Process streams of events as they occur.
In short, Kafka uses topics to categorise events/messages. Kafka topics are multi-subscriber (a single topic can have multiple consumers) and are partitioned across the Kafka cluster.
Apache Flink
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded streaming data. It can run on all common cluster environments (like Kubernetes) and it performs computations over streaming data with in-memory speed and at any scale.
Stateful Stream Processing
Flink operations can be stateful – this means that how one message/event is handled can depend on the accumulated effect of all previously processed events. State can be used for simple computations – such as counting how many messages/events arrive per time unit – or for more complex logic, such as computations for fraud detection models.
What can be streamed
- Basic types: String, Long, Integer, Boolean, Array
- Composite types: Tuples, POJOs, and Scala case classes
Streaming windows
Windows are among the most important features when processing infinite streams. A window splits the stream into separate buckets of finite size, over which we can apply computations (e.g., an average value).
The picture below shows examples of the two types of time-based windows – tumbling and sliding windows:
Both window types are time framed, but unlike tumbling windows, sliding windows may overlap each other.
In the next section, we explain the actual implementation and go into more detail about the stream computations provided by Flink.
Streaming data pipeline implementation
The pipeline we will build simulates a data stream from a traffic camera that records each vehicle it captures. A consumer service then produces statistics on how many vehicles of each type (cars, trucks) pass within a given time period.
The producer microservice
First, for demonstration purposes, we need to run Kafka locally. This article uses the Confluent Community Docker image for Apache Kafka from Docker Hub.
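The original article does not show the exact setup, but a minimal single-broker docker-compose.yml for the Confluent images could look roughly like this (the image tags and listener settings are assumptions and may need adjusting for your environment):

# Sketch only: a single local broker reachable from the host at localhost:9092
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.0.1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1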
After we have Kafka up and running, we can proceed with building the Java microservice application that will serve as our producer. We will use Spring Boot, so we do not need to create configuration beans for Kafka; the Spring Boot autoconfiguration creates everything we need. We only need the Maven dependencies and the application properties shown below:
Figure 1: Maven dependencies
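The figure above is an image in the original article; as an approximation, the producer's Maven dependencies likely look along these lines (the Lombok dependency is an assumption, and versions are assumed to be managed by the Spring Boot parent):

<!-- Sketch of the producer's pom.xml dependencies -->
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>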
Figure 2: application.yml
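Similarly, an application.yml relying on Spring Boot's Kafka autoconfiguration could look like this (the JSON value serializer is an assumption based on the Vehicle messages being sent as JSON):

# Sketch: Spring Boot autoconfigures a KafkaTemplate from these properties
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer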
The model
Our message model is a simple POJO class called Vehicle:
Figure 3: Vehicle
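Since the figure is an image, here is a rough sketch of the Vehicle POJO; any fields beyond vehicleType are omitted, as they are not shown in the article text:

// Sketch of the Vehicle message model
public class Vehicle {

    private VehicleType vehicleType;

    public Vehicle() {
    }

    public Vehicle(VehicleType vehicleType) {
        this.vehicleType = vehicleType;
    }

    public VehicleType getVehicleType() {
        return vehicleType;
    }

    public void setVehicleType(VehicleType vehicleType) {
        this.vehicleType = vehicleType;
    }
}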
Figure 4: VehicleType
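And a minimal VehicleType enum, assuming only the two vehicle types mentioned in the article:

// Sketch: only the vehicle types discussed in the article
public enum VehicleType {
    CAR,
    TRUCK
}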
Then the actual producer service simply generates random Vehicle events, with vehicles of type "Truck" being generated less frequently than those of type "Car". To produce messages for Kafka we only need to inject a bean of type KafkaTemplate:
Figure 5: ProducerService
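As an illustrative sketch of such a producer service – the scheduling mechanism, rate, and truck/car ratio are assumptions, and @Scheduled additionally requires @EnableScheduling on the application class:

import java.util.Random;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class ProducerService {

    public static final String TOPIC = "vehicle-topic";

    private final KafkaTemplate<String, Vehicle> kafkaTemplate;
    private final Random random = new Random();

    public ProducerService(KafkaTemplate<String, Vehicle> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Emit a random Vehicle event every second; trucks are generated less often than cars.
    @Scheduled(fixedRate = 1000)
    public void produceVehicleEvent() {
        VehicleType type = random.nextInt(10) < 8 ? VehicleType.CAR : VehicleType.TRUCK;
        kafkaTemplate.send(TOPIC, new Vehicle(type));
    }
}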
When we run the Spring application, it will start sending messages of type Vehicle to the topic "vehicle-topic" on our local cluster. With this, we can consider our producer finished.
The Data Processor Service
Figure 6: Maven Dependencies
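As an approximation of that figure, the Flink-related dependencies could look like this; the Flink and Scala versions shown here are assumptions:

<!-- Sketch of the processor's Flink dependencies -->
<properties>
    <flink.version>1.13.2</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>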
Above are the Maven dependencies. Notice that, besides the Flink version, the artifact names also include the Scala version that Flink was built against.
We again use Spring Boot; however, there is no out-of-the-box Spring Boot integration for Flink like the one we used for Apache Kafka.
The model we are going to use mirrors the model from the producer service, but in addition we introduce one new model class that will hold the result of the streaming data computation. This time we need the @JsonProperty annotation for serialisation purposes, because we no longer rely on Spring to do the work for us and instead expect Flink to do it.
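A minimal sketch of that result class, called VehicleStatistics (as in the processing code below), assumed here to carry only the vehicle type and a count:

import com.fasterxml.jackson.annotation.JsonProperty;

// Sketch of the aggregated result model
public class VehicleStatistics {

    @JsonProperty("vehicleType")
    private VehicleType vehicleType;

    @JsonProperty("count")
    private long count;

    public VehicleType getVehicleType() {
        return vehicleType;
    }

    public void setVehicleType(VehicleType vehicleType) {
        this.vehicleType = vehicleType;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }
}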
To support the deserialisation of the Vehicle model and the serialisation to VehicleStatistics, we must implement the DeserializationSchema and SerializationSchema interfaces from the Flink library, respectively. In the implementation we will use Jackson's ObjectMapper.
Figure 7: VehicleDeserializationSchema
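A sketch of what this deserialisation schema can look like, assuming it is registered as a Spring bean so it can be injected into the processing service:

import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.springframework.stereotype.Component;

@Component
public class VehicleDeserializationSchema implements DeserializationSchema<Vehicle> {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    // Turn the raw Kafka record bytes back into a Vehicle object.
    @Override
    public Vehicle deserialize(byte[] message) throws IOException {
        return objectMapper.readValue(message, Vehicle.class);
    }

    // The stream is unbounded, so there is no end-of-stream element.
    @Override
    public boolean isEndOfStream(Vehicle nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Vehicle> getProducedType() {
        return TypeInformation.of(Vehicle.class);
    }
}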
Figure 8: VehicleStatisticsSerializationSchema
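And a corresponding sketch for the serialisation schema:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.springframework.stereotype.Component;

@Component
public class VehicleStatisticsSerializationSchema implements SerializationSchema<VehicleStatistics> {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    // Serialize the aggregated statistics to JSON bytes before writing them to Kafka.
    @Override
    public byte[] serialize(VehicleStatistics element) {
        try {
            return objectMapper.writeValueAsBytes(element);
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("Could not serialize record: " + element, e);
        }
    }
}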
We have implemented the Flink part in a class called ProcessingService:
@Service
@Log4j2
@RequiredArgsConstructor
public class ProcessingService {

    @Value("${kafka.bootstrap-servers}")
    private String kafkaAddress;

    @Value("${kafka.group-id}")
    private String kafkaGroupId;

    public static final String TOPIC = "vehicle-topic";
    public static final String VEHICLE_STATISTICS_TOPIC = "vehicle-statistics-topic";

    private final VehicleDeserializationSchema vehicleDeserializationSchema;
    private final VehicleStatisticsSerializationSchema vehicleStatisticsSerializationSchema;

    @PostConstruct
    public void startFlinkStreamProcessing() {
        try {
            processVehicleStatistic();
        } catch (Exception e) {
            log.error("Cannot process", e);
        }
    }

    public void processVehicleStatistic() throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        FlinkKafkaConsumer<Vehicle> consumer = createVehicleConsumerForTopic(TOPIC, kafkaAddress, kafkaGroupId);
        consumer.setStartFromLatest();
        consumer.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

        FlinkKafkaProducer<VehicleStatistics> producer = createVehicleStatisticsProducer(VEHICLE_STATISTICS_TOPIC, kafkaAddress);

        DataStream<Vehicle> inputMessagesStream = environment.addSource(consumer);

        inputMessagesStream
                .keyBy(vehicle -> vehicle.getVehicleType().ordinal())
                .window(TumblingEventTimeWindows.of(Time.seconds(20)))
                .aggregate(new VehicleStatisticsAggregator())
                .addSink(producer);

        environment.execute();
    }

    private FlinkKafkaConsumer<Vehicle> createVehicleConsumerForTopic(String topic, String kafkaAddress, String kafkaGroup) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaAddress);
        properties.setProperty("group.id", kafkaGroup);
        return new FlinkKafkaConsumer<>(topic, vehicleDeserializationSchema, properties);
    }

    private FlinkKafkaProducer<VehicleStatistics> createVehicleStatisticsProducer(String topic, String kafkaAddress) {
        return new FlinkKafkaProducer<>(kafkaAddress, topic, vehicleStatisticsSerializationSchema);
    }
}
The methods createVehicleConsumerForTopic and createVehicleStatisticsProducer create the Flink Kafka consumer and producer used in the pipeline. The producer is also called a "sink" in Flink terminology. The real execution happens in the processVehicleStatistic method.
Every Flink application needs a StreamExecutionEnvironment, which is obtained via StreamExecutionEnvironment.getExecutionEnvironment().
Figure 9: Flink Data Stream Processing
The input streaming data comes from the vehicle-topic, i.e. the randomly generated Vehicle messages created by the producer service.
The keyBy transformation is used to partition the data based on the vehicle type.
The window operator is used to assign windows to a keyed stream. If we wanted to assign windows to all the data in the stream without keying it first, we could use windowAll instead.
With the aggregate transformation we assign a specific aggregator which, in our case, sums up the total number of vehicles within the windowed time frame.
If we want to define a custom aggregator, we must implement AggregateFunction.
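A minimal sketch of such an aggregator, consistent with the VehicleStatistics model sketched earlier (the implementation in the linked repository may differ):

import org.apache.flink.api.common.functions.AggregateFunction;

public class VehicleStatisticsAggregator
        implements AggregateFunction<Vehicle, VehicleStatistics, VehicleStatistics> {

    @Override
    public VehicleStatistics createAccumulator() {
        return new VehicleStatistics();
    }

    // Called for every Vehicle in the window: remember the type and increment the count.
    @Override
    public VehicleStatistics add(Vehicle vehicle, VehicleStatistics accumulator) {
        accumulator.setVehicleType(vehicle.getVehicleType());
        accumulator.setCount(accumulator.getCount() + 1);
        return accumulator;
    }

    @Override
    public VehicleStatistics getResult(VehicleStatistics accumulator) {
        return accumulator;
    }

    // Merges partial results, e.g. when window panes are combined.
    @Override
    public VehicleStatistics merge(VehicleStatistics a, VehicleStatistics b) {
        a.setCount(a.getCount() + b.getCount());
        return a;
    }
}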
Finally, we use addSink to specify where the processed output should be directed – in our case another Kafka topic, but there are many other possibilities.
Here are the connectors supported by Flink:
- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- FileSystem (Hadoop included) – Streaming only (sink)
- FileSystem (Hadoop included) – Streaming and Batch (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
- Google PubSub (source/sink)
- JDBC (sink)
We can also use the print method to write the results to System.out for testing and development purposes.
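For example, a sketch that swaps the Kafka sink in the pipeline above for the print sink:

// During development: print the aggregated results to System.out instead of writing to Kafka
inputMessagesStream
        .keyBy(vehicle -> vehicle.getVehicleType().ordinal())
        .window(TumblingEventTimeWindows.of(Time.seconds(20)))
        .aggregate(new VehicleStatisticsAggregator())
        .print();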
If we start both services and use the print sink we can observe how the data processor service is calculating the sum of all the vehicles keyed by vehicle type for a certain time window.
Conclusion
We created a simple pipeline for producing, consuming, and processing endless streaming data using Apache Kafka and Apache Flink.
In doing so, we covered the following topics:
- A general understanding of streaming data.
- An introduction to Apache Kafka and its usage for streaming data processing.
- An introduction to Apache Flink and its stateful streaming data computations.
- How Flink can be integrated with Kafka, using Kafka both as a source and as a sink.
For more information, refer to the official Apache Kafka and Apache Flink documentation pages.
The full source code of this example can be found at: https://github.com/helecloud/blogposts/tree/master/streaming-data-kafka-flink