public class JobCodeKeyProcessFunction extends KeyedProcessFunction<String,ScanWrapInfo, JobInfo> {
private static Jedis jedis = null;
private static ValueState<Double> weight = null;
private static ValueState<Double> volume = null;
private static ValueState<Double> pcs = null;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
jedis = JedisUtil.getJedis();
//keyState的TTL策略
StateTtlConfig ttlConfig = StateTtlConfig
//keyState的超时时间为1分钟
.newBuilder(Time.seconds(60))
//当创建和更新时,重新计时超时时间
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
//失败时不返回keyState的值
//.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
//失败时返回keyState的值
.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
.build();
//从runtimeContext中获得ck时保存的状态
# ValueStateDescriptor<Double> weightState = new ValueStateDescriptor<>("weight", Double.class);
# ValueStateDescriptor<Double> volumeState = new ValueStateDescriptor<>("volume", Double.class);
# ValueStateDescriptor<Double> pcsState = new ValueStateDescriptor<>("pcs", Double.class);
weightState.enableTimeToLive(ttlConfig);
volumeState.enableTimeToLive(ttlConfig);
pcsState.enableTimeToLive(ttlConfig);
ValueState<Double> weight = getRuntimeContext().getState(weightState);
ValueState<Double> volume = getRuntimeContext().getState(volumeState);
ValueState<Double> pcs = getRuntimeContext().getState(pcsState);
}
}
设置状态的代码如上:
1.运行后异常:
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, backoffTimeMS=0)
Caused by: java.lang.NullPointerException
at operator.JobCodeKeyProcessFunction.processElement(JobCodeKeyProcessFunction.java:146)
at operator.JobCodeKeyProcessFunction.processElement(JobCodeKeyProcessFunction.java:20)
其中: (JobCodeKeyProcessFunction.java:146)代码为:Double curWeight = weight.value();
weight为valueState;
2.debug异常
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, backoffTimeMS=0)
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException: Invocation of public abstract java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time) timed out.
Caused by: java.util.concurrent.TimeoutException: Invocation of public abstract java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time) timed out.
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/rpc/taskmanager_0#-2127188183]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalRpcInvocation]. A typical reason for AskTimeoutException
is that the recipient actor didn't send a reply.
请问是什么导致以上异常的呢