Apache Beam Streaming from Kafka into Files over Fixed Window not working

April 15, 2022, at 4:10 PM

In Python, I am trying to build a streaming pipeline that ingests data from Kafka and writes that data to a file at the end of each 30-second interval. I am using the FlinkRunner.

Here is a snippet of the code:

import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.io.kafka import ReadFromKafka

loaded_windowed_data = (
    pipeline
    | 'Read from kafka' >> ReadFromKafka(
        consumer_config={'bootstrap.servers': bootstrap_servers,
                         'auto.offset.reset': 'earliest'},
        topics=[input_topic])
    | 'Decode Kafka' >> beam.Map(lambda record: decode_kafka(record))
    | 'Fixed 30sec windows' >> beam.WindowInto(beam.window.FixedWindows(30))
    | 'Json Load' >> beam.ParDo(JsonLoad())
    | 'PrintOutput' >> beam.ParDo(lambda c: print(c, type(c)))
    # | 'Write' >> WriteToText(output_path)
)

The above code correctly prints the decoded JSON data. As I understand it, a fixed window of 30 seconds should trigger my write/print step at the end of each window, and with the print step the code does produce output. However, when I swap in the WriteToText transform instead, I get no errors, but no files are written either.
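To make my understanding of fixed windows concrete, here is a minimal sketch in plain Python (not Beam itself) of how I expect each element's timestamp to be mapped to a 30-second window. The function name `fixed_window_bounds` and the sample timestamps are hypothetical and only for illustration.

```python
def fixed_window_bounds(timestamp, size=30, offset=0):
    """Return the [start, end) bounds of the fixed window containing timestamp.

    Hypothetical helper for illustration only; it mirrors the usual
    fixed-window arithmetic, not any specific Beam internals.
    """
    start = timestamp - (timestamp - offset) % size
    return start, start + size


# Sample event timestamps in seconds (made up for this illustration).
events = [0, 12, 29, 30, 61]
for ts in events:
    start, end = fixed_window_bounds(ts)
    print(f"event at {ts}s falls in window [{start}, {end})")
```

As far as I can tell, this only describes which window an element belongs to. It says nothing about when the runner decides a window is complete and emits its contents, which may be the part I am missing.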

I've also tried using a sink, as I've seen suggested by various posts:

import json
from apache_beam.io.fileio import TextSink, WriteToFiles

class JsonSink(TextSink):
    # Write each record as one JSON object per line.
    def write(self, record):
        self._fh.write(json.dumps(record).encode('utf8'))
        self._fh.write('\n'.encode('utf8'))

loaded_windowed_data = (
    pipeline
    | 'Read from kafka' >> ReadFromKafka(
        consumer_config={'bootstrap.servers': bootstrap_servers,
                         'auto.offset.reset': 'earliest'},
        topics=[input_topic])
    | 'Decode Kafka' >> beam.Map(lambda record: decode_kafka(record))
    | 'Fixed 30sec windows' >> beam.WindowInto(beam.window.FixedWindows(30))
    | 'Json Load' >> beam.ParDo(JsonLoad())
    | 'Write Files' >> WriteToFiles(
        path='./output',
        # destination=lambda record: hash(record),
        sink=lambda dest: JsonSink())
)

This snippet doesn't write anything either, unless I move the 30-second fixed window step so that it comes after the JSON load step, as follows:

loaded_windowed_data = (
    pipeline
    | 'Read from kafka' >> ReadFromKafka(
        consumer_config={'bootstrap.servers': bootstrap_servers,
                         'auto.offset.reset': 'earliest'},
        topics=[input_topic])
    | 'Decode Kafka' >> beam.Map(lambda record: decode_kafka(record))
    | 'Json Load' >> beam.ParDo(JsonLoad())
    | 'Fixed 30sec windows' >> beam.WindowInto(beam.window.FixedWindows(30))
    | 'Write Files' >> WriteToFiles(
        path='./output',
        # destination=lambda record: hash(record),
        sink=lambda dest: JsonSink())
)

Although this version does write output, it writes about 60 text files to the output folder and then stops.

My question is: what is the proper way to write files from a Kafka stream? In particular, why doesn't the simple fixed window write every 30 seconds as desired? Do I need any particular window or trigger settings to get this to work?

My code is largely based on this example provided by GCP: https://cloud.google.com/pubsub/docs/samples/pubsub-to-gcs
