Exception in flatmap function when deploying to cluster

July 02, 2019, at 02:50 AM

I have a flink-ignite application. I receive messages from kafka and process the mesages and then cache to ignite. when i run program in ide(intellij) and stand-alone jar there is no problem but when i deploy to cluster i got this exception (I have created the table earlier in the code.). Thanks in advance. Note that connection variables are static in my main class.

   Caused by: java.lang.NullPointerException
        at altosis.flinkcompute.compute.Main$2.flatMap(Main.java:95)
        at altosis.flinkcompute.compute.Main$2.flatMap(Main.java:85)
        at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
        ... 22 more
            StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            FlinkKafkaConsumer<EventSalesQuantity> consumer = new FlinkKafkaConsumer<EventSalesQuantity>("EventTopic",new EventSerializationSchema(),props);
            DataStream<EventSalesQuantity> eventDataStream = environment.addSource(consumer);
            KeyedStream<EventSalesQuantity, String> keyedEventStream = eventDataStream.assignTimestampsAndWatermarks(
                    new AssignerWithPeriodicWatermarksImpl()
                    keyBy(new KeySelector<EventSalesQuantity, String>() {
                        public String getKey(EventSalesQuantity eventSalesQuantity) throws Exception {
                            return  eventSalesQuantity.getDealer();
            DataStream<Tuple2<EventSalesQuantity,Integer>> eventSinkStream = keyedEventStream.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.DAYS),Time.hours(21))).aggregate(new AggregateImpl());
            ignite = Ignition.start();
            ClientConfiguration cfg = new ClientConfiguration().setAddresses("");
            igniteClient = Ignition.startClient(cfg);
            System.out.println(">>> Thin client put-get example started.");
                    new SqlFieldsQuery(String.format(
                            "CREATE TABLE IF NOT EXISTS Eventcache (eventtime VARCHAR PRIMARY KEY, bayi VARCHAR, sales INT ) WITH \"VALUE_TYPE=%s\"",
            eventSinkStream.addSink(new FlinkKafkaProducer<Tuple2<EventSalesQuantity, Integer>>("localhost:9092","SinkEventTopic",new EventSinkSerializationSchema()));
            conn = DriverManager.getConnection("jdbc:ignite:thin://");
            eventSinkStream.flatMap(new FlatMapFunction<Tuple2<EventSalesQuantity, Integer>, Object>() {
                public void flatMap(Tuple2<EventSalesQuantity, Integer> eventSalesQuantityIntegerTuple2, Collector<Object> collector) throws Exception {
                    Ignsql= conn.prepareStatement(
                            "INSERT INTO Eventcache (eventtime, bayi, sales) VALUES (?, ?, ?)");
                    Ignsql.setString(1, eventSalesQuantityIntegerTuple2.f0.getTransactionDate());
                    Ignsql.setString(2, eventSalesQuantityIntegerTuple2.f0.getDealer());
                    Ignsql.setInt(3, eventSalesQuantityIntegerTuple2.f1);
           // eventSinkStream.print();
Answer 1

I assume when you say "Note that connection variables are static in my main class", you're talking about Ignsql. If so, then your code won't work because that variable isn't available to your map function, which is serialized and distributed by the JobManager before the workflow actually starts running.

You should create a RichFlatMapFunction class, and in the open() method you set up the connection variable(s) that you need, and then close them in the close() method. If you have configuration parameters required to set up the connection variable, you'd pass those into the RichFlatMapFunction's constructor and save them in (non-transient) variables, and then use them in the open() method.

