Exception in flatmap function when deploying to cluster

July 02, 2019, at 02:50 AM

I have a Flink-Ignite application. I receive messages from Kafka, process them, and then cache them to Ignite. When I run the program in the IDE (IntelliJ) or as a stand-alone jar there is no problem, but when I deploy it to the cluster I get the exception below (I have created the table earlier in the code). Note that the connection variables are static in my main class. Thanks in advance.

   Caused by: java.lang.NullPointerException
        at altosis.flinkcompute.compute.Main$2.flatMap(Main.java:95)
        at altosis.flinkcompute.compute.Main$2.flatMap(Main.java:85)
        at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
        ... 22 more
My code:

            StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            environment.getConfig();
            environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            environment.setParallelism(1);
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            props.setProperty("group.id","event-group");
            FlinkKafkaConsumer<EventSalesQuantity> consumer = new FlinkKafkaConsumer<EventSalesQuantity>("EventTopic",new EventSerializationSchema(),props);
            DataStream<EventSalesQuantity> eventDataStream = environment.addSource(consumer);
            KeyedStream<EventSalesQuantity, String> keyedEventStream = eventDataStream
                    .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksImpl())
                    .keyBy(new KeySelector<EventSalesQuantity, String>() {
                        @Override
                        public String getKey(EventSalesQuantity eventSalesQuantity) throws Exception {
                            return eventSalesQuantity.getDealer();
                        }
                    });
            DataStream<Tuple2<EventSalesQuantity,Integer>> eventSinkStream = keyedEventStream.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.DAYS),Time.hours(21))).aggregate(new AggregateImpl());
            ignite = Ignition.start();
            ClientConfiguration cfg = new ClientConfiguration().setAddresses("127.0.0.1:10800");
            igniteClient = Ignition.startClient(cfg);
            System.out.println(">>> Thin client put-get example started.");
            igniteClient.query(
                    new SqlFieldsQuery(String.format(
                            "CREATE TABLE IF NOT EXISTS Eventcache (eventtime VARCHAR PRIMARY KEY, bayi VARCHAR, sales INT ) WITH \"VALUE_TYPE=%s\"",
                            EventSalesQuantity.class.getName()
                    )).setSchema("PUBLIC")
            ).getAll();
            eventSinkStream.addSink(new FlinkKafkaProducer<Tuple2<EventSalesQuantity, Integer>>("localhost:9092","SinkEventTopic",new EventSinkSerializationSchema()));
            Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
            conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1/");
            eventSinkStream.flatMap(new FlatMapFunction<Tuple2<EventSalesQuantity, Integer>, Object>() {
                @Override
                public void flatMap(Tuple2<EventSalesQuantity, Integer> eventSalesQuantityIntegerTuple2, Collector<Object> collector) throws Exception {
                    Ignsql = conn.prepareStatement(
                            "INSERT INTO Eventcache (eventtime, bayi, sales) VALUES (?, ?, ?)");
                    Ignsql.setString(1, eventSalesQuantityIntegerTuple2.f0.getTransactionDate());
                    Ignsql.setString(2, eventSalesQuantityIntegerTuple2.f0.getDealer());
                    Ignsql.setInt(3, eventSalesQuantityIntegerTuple2.f1);
                    Ignsql.execute();
                    Ignsql.close();
                }
            });
           // eventSinkStream.print();
            environment.execute();
Answer 1

I assume when you say "Note that connection variables are static in my main class", you're talking about Ignsql. If so, then your code won't work because that variable isn't available to your map function, which is serialized and distributed by the JobManager before the workflow actually starts running.

You should create a RichFlatMapFunction class, set up the connection variable(s) you need in its open() method, and close them in its close() method. If you need configuration parameters to set up the connection, pass them into the RichFlatMapFunction's constructor, save them in (non-transient) fields, and use them in open().
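For example, here is a minimal sketch of what that could look like, assuming the Ignite thin JDBC URL is passed in through the constructor and that the EventSalesQuantity getters are the ones shown in the question (the class name IgniteJdbcSink is just illustrative):

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;

    public class IgniteJdbcSink extends RichFlatMapFunction<Tuple2<EventSalesQuantity, Integer>, Object> {

        private final String jdbcUrl;               // serialized with the function and shipped to the workers
        private transient Connection conn;          // created on the worker in open(), never serialized
        private transient PreparedStatement insert;

        public IgniteJdbcSink(String jdbcUrl) {
            this.jdbcUrl = jdbcUrl;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            // Runs on the task manager after the function has been deserialized,
            // so the connection is created where flatMap() actually executes.
            Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
            conn = DriverManager.getConnection(jdbcUrl);
            insert = conn.prepareStatement(
                    "INSERT INTO Eventcache (eventtime, bayi, sales) VALUES (?, ?, ?)");
        }

        @Override
        public void flatMap(Tuple2<EventSalesQuantity, Integer> value, Collector<Object> out) throws Exception {
            insert.setString(1, value.f0.getTransactionDate());
            insert.setString(2, value.f0.getDealer());
            insert.setInt(3, value.f1);
            insert.execute();
        }

        @Override
        public void close() throws Exception {
            if (insert != null) {
                insert.close();
            }
            if (conn != null) {
                conn.close();
            }
        }
    }

You would then replace the anonymous FlatMapFunction with something like eventSinkStream.flatMap(new IgniteJdbcSink("jdbc:ignite:thin://127.0.0.1/")); and drop the static conn/Ignsql fields from the main class.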
