2.flink cep

教程 阿布都的都 ⋅ 于 2023-01-07 16:46:57 ⋅ 1019 阅读

flink cep

什么是 CEP

非确定有限状态机

复杂事件处理(Complex Event Processing,CEP)

Flink CEP是在 Flink 中实现的复杂事件处理(CEP)库

CEP 允许在无休止的事件流中检测事件模式,让我们有机会掌握数据 中重要的部分

一个或多个由简单事件构成的事件流通过一定的规则匹配,然后输出 用户想得到的数据 —— 满足规则的复杂事件

file

file

目标:从有序的简单事件流中发现一些高阶特征

输入:一个或多个由简单事件构成的事件流

处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件

输出:满足规则的复杂事件

引入依赖

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-cep_2.11</artifactId>
  <version>1.10.3</version>
</dependency>

flink在升级1.10.3的时候需要注意一个问题,keyedState状态管理的时候有一个TTL

.setTimeCharacteristic(StateTtlConfig.TimeCharacteristic.ProcessingTime)

第一个cep程序

DataStream<Event> input = ...

Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
        new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getId() == 42;
            }
        }
    ).next("middle").subtype(SubEvent.class).where(
        new SimpleCondition<SubEvent>() {
            @Override
            public boolean filter(SubEvent subEvent) {
                return subEvent.getVolume() >= 10.0;
            }
        }
    ).followedBy("end").where(
         new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getName().equals("end");
            }
         }
    );

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

DataStream<Alert> result = patternStream.process(
    new PatternProcessFunction<Event, Alert>() {
        @Override
        public void processMatch(
                Map<String, List<Event>> pattern,
                Context ctx,
                Collector<Alert> out) throws Exception {
            out.collect(createAlertFrom(pattern));
        }
    });

第一个案例代码

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        SingleOutputStreamOperator<Tuple3<String, String, String>> ds = env.readTextFile("data/cep_data.txt").map(new MapFunction<String, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(String value) throws Exception {
                String[] split = value.split(" ");
                return Tuple3.of(split[0], split[1], split[2]);
            }
        });

        KeyedStream<Tuple3<String, String, String>, String> ds2 = ds.keyBy(t -> t.f0);
        Pattern<Tuple3<String, String, String>, Tuple3<String, String, String>> pattern = Pattern.<Tuple3<String, String, String>>begin("start").where(new SimpleCondition<Tuple3<String, String, String>>() {
            @Override
            public boolean filter(Tuple3<String, String, String> value) throws Exception {
                return value.f2.equals("regist");
            }
        }).times(5);

        PatternStream<Tuple3<String, String, String>> patternDs = CEP.pattern(ds2, pattern);
        patternDs.select(new PatternSelectFunction<Tuple3<String,String,String>, String>() {
            @Override
            public String select(Map<String, List<Tuple3<String, String, String>>> map) throws Exception {
                return map.get("start").get(0).f1;
            }
        }).print();
        env.execute();

Pattern API

Pattern API允许你定义要从输入流中提取的复杂模式序列。

每个复杂模式序列都是由多个简单模式组成,即寻找具有相同属性的单个事件的模式。我们可以先定义一些简单的模式,然后组合成复杂的模式序列。
可以将模式序列视为此类模式的结构图,基于用户指定的条件从一个模式转换到下一个模式,例如, event.getName().equals(“start”)。
匹配是一系列输入事件,通过一系列有效的模式转换访问复杂模式图中的所有模式。

注意每个模式必须具有唯一的名称,以便后续可以使用该名称来标识匹配的事件。

注意模式名称不能包含字符“:”。

单个模式

Pattern可以是单个,也可以是循环模式。单个模式接受单个事件,而循环模式可以接受多个事件。在模式匹配符号中,模式“a b + c?d”(或“a”,后跟一个或多个“b”,可选地后跟“c”,后跟“d”),a,c ?,和d是单例模式,而b +是循环模式。

默认情况下,模式是单个模式,您可以使用Quantifiers将其转换为循环模式。每个模式可以有一个或多个条件,基于它接受事件。

量词

在FlinkCEP中,您可以使用以下方法指定循环模式:pattern.oneOrMore(),用于期望一个或多个事件发生的模式(例如之前提到的b +);和pattern.times(#ofTimes),
用于期望给定类型事件的特定出现次数的模式,例如4个;和patterntimes(#fromTimes,#toTimes),用于期望给定类型事件的最小出现次数和最大出现次数的模式,例如, 2-4。

您可以使用pattern.greedy()方法使循环模式变得贪婪,但是还不能使组模式变得贪婪。您可以使用pattern.optional()方法使得所有模式,循环与否,变为可选。

// expecting 4 occurrences
 start.times(4);

 // expecting 0 or 4 occurrences
 start.times(4).optional();

 // expecting 2, 3 or 4 occurrences
 start.times(2, 4);

 // expecting 2, 3 or 4 occurrences and repeating as many as possible
 start.times(2, 4).greedy();

 // expecting 0, 2, 3 or 4 occurrences
 start.times(2, 4).optional();

 // expecting 0, 2, 3 or 4 occurrences and repeating as many as possible
 start.times(2, 4).optional().greedy();

 // expecting 1 or more occurrences
 start.oneOrMore();

 // expecting 1 or more occurrences and repeating as many as possible
 start.oneOrMore().greedy();

 // expecting 0 or more occurrences
 start.oneOrMore().optional();

 // expecting 0 or more occurrences and repeating as many as possible
 start.oneOrMore().optional().greedy();

 // expecting 2 or more occurrences
 start.timesOrMore(2);

 // expecting 2 or more occurrences and repeating as many as possible
 start.timesOrMore(2).greedy();

 // expecting 0, 2 or more occurrences and repeating as many as possible

start.timesOrMore(2).optional().greedy();

条件

注意:until方法不能放在非循环的算子后面,必须使用循环算子,必须oneOrMore

对于每个模式,您可以指定传入事件必须满足的条件才能被“接受”到模式中,例如,其值应大于 5,或大于先前接受事件的平均值。您可以通过pattern.where()pattern.or()pattern.until()方法指定事件属性的条件。这些可以是IterativeConditions 或SimpleConditions。

SimpleConditionsiterativeConditions的子类,在IterativeConditions中是可以获取之前步骤数据的内容的

迭代条件:这是最通用的条件类型。这就是您可以根据先前接受的事件的属性或它们的子集的统计数据指定接受后续事件的条件的方式。

下面是一个迭代条件的代码,它接受名为“middle”的模式的下一个事件,如果它的名称以“foo”开头,并且如果该模式的先前接受事件的价格总和加上当前的价格事件不超过 5.0 的值。迭代条件可能很强大,尤其是与循环模式结合使用时,例如oneOrMore().

middle.oneOrMore()
    .subtype(SubEvent.class)
    .where(new IterativeCondition<SubEvent>() {
        @Override
        public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception {
            if (!value.getName().startsWith("foo")) {
                return false;
            }

            double sum = value.getPrice();
            for (Event event : ctx.getEventsForPattern("middle")) {
                sum += event.getPrice();
            }
            return Double.compare(sum, 5.0) < 0;
        }
    });

一下是until案例,匹配一个内容出现五次以上就停止匹配(使用until)

如果是判断次数的条件,那么必须是在正常的条件上面-1

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        SingleOutputStreamOperator<Tuple3<String, String, String>> ds = env.readTextFile("data/cep_data.txt").map(new MapFunction<String, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(String value) throws Exception {
                String[] split = value.split(" ");
                return Tuple3.of(split[0], split[1], split[2]);
            }
        });

        KeyedStream<Tuple3<String, String, String>, String> ds2 = ds.keyBy(t -> t.f0);
        Pattern<Tuple3<String, String, String>, Tuple3<String, String, String>> pattern = Pattern.<Tuple3<String, String, String>>begin("start").where(new SimpleCondition<Tuple3<String, String, String>>() {
            @Override
            public boolean filter(Tuple3<String, String, String> value) throws Exception {
                return value.f2.equals("regist");
            }
        }).oneOrMore().until(new IterativeCondition<Tuple3<String, String, String>>() {
            @Override
            public boolean filter(Tuple3<String, String, String> stringStringStringTuple3, Context<Tuple3<String, String, String>> context) throws Exception {
                Iterator<Tuple3<String, String, String>> start = context.getEventsForPattern("start").iterator();
                int count = 0;
                while(start.hasNext()){
                    count++;
                    start.next();
                }
                System.out.println(count);
                return count>4;
            }
        });

        PatternStream<Tuple3<String, String, String>> patternDs = CEP.pattern(ds2, pattern);
        patternDs.select(new PatternSelectFunction<Tuple3<String,String,String>, String>() {
            @Override
            public String select(Map<String, List<Tuple3<String, String, String>>> map) throws Exception {
                return map.get("start").get(0).f1;
            }
        }).print();
        env.execute();

简单条件:这种类型的条件扩展了上述IterativeCondition类,并根据事件本身的属性来决定是否接受事件。

start.where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) {
        return value.getName().startsWith("foo");
    }
});

最后,您还可以Event通过pattern.subtype(subClass)方法将接受事件的类型限制为初始事件类型(此处)的子类型。

start.subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
    @Override
    public boolean filter(SubEvent value) {
        return ... // some condition
    }
});

组合条件:如上所示,您可以将subtype条件与附加条件组合。这适用于所有条件。您可以通过顺序调用来任意组合条件where()。最终结果将是各个条件结果的逻辑。要使用OR组合条件,您可以使用该or()方法,如下所示。

pattern.where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) {
        return ... // some condition
    }
}).or(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) {
        return ... // or condition
    }
});

为了更好地理解它,请看以下示例。给定的

  • 模式像"(a+ until b)"(一个或多个"a"直到"b"
  • 一系列传入事件 "a1" "c" "a2" "b" "a3"
  • 库将输出结果:{a1 a2} {a1} {a2} {a3}.

如您所见,{a1 a2 a3}{a2 a3}由于停止条件而未返回。

     StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> ds = env.readTextFile("data/cep_data1.txt");
        Pattern<String, String> pattern = Pattern.<String>begin("start").where(new SimpleCondition<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.startsWith("a");
            }
        }).oneOrMore().until(new SimpleCondition<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.startsWith("b");
            }
        });
        CEP.pattern(ds,pattern).flatSelect(new PatternFlatSelectFunction<String, String>() {
            @Override
            public void flatSelect(Map<String, List<String>> map, Collector<String> collector) throws Exception {
                String ss = "";
                for(String s:map.get("start")){
                    ss+=" "+s;
                }
                collector.collect(ss);
            }
        }).print();

在匹配规则中也是一样的,因为flink是流式输出,只要符合规则的都要进行输出,所以until等只是一个中间的间隔符而已,遇见就会跳过,继续进行匹配,但是并不代表终结

条件API

where(条件) 定义当前模式的条件。要匹配模式,事件必须满足条件。多个连续的 where() 子句导致它们的条件被 AND 运算:pattern.where(new IterativeCondition<Event>() { @Override public boolean filter(Event value, Context ctx) throws Exception { return ... // some condition } });
or(条件) 添加一个与现有条件进行 OR 运算的新条件。仅当事件至少通过以下条件之一时,它才能匹配模式:pattern.where(new IterativeCondition<Event>() { @Override public boolean filter(Event value, Context ctx) throws Exception { return ... // some condition } }).or(new IterativeCondition<Event>() { @Override public boolean filter(Event value, Context ctx) throws Exception { return ... // alternative condition } });
until(条件) 指定循环模式的停止条件。这意味着如果发生与给定条件匹配的事件,则模式中将不再接受更多事件。仅适用于与 oneOrMore()注意:它允许在基于事件的条件下清除相应模式的状态。pattern.oneOrMore().until(new IterativeCondition<Event>() { @Override public boolean filter(Event value, Context ctx) throws Exception { return ... // alternative condition } });
subtype(子类) 定义当前模式的子类型条件。如果事件属于此子类型,则事件只能匹配模式:pattern.subtype(SubEvent.class);
oneOrmore() 指定此模式期望匹配事件至少发生一次。默认情况下,使用宽松的内部连续性(在后续事件之间)。有关内部连续性的更多信息,请参见连续.注意:建议使用until()within()启用状态清除pattern.oneOrMore();
timesOrMore(#times) 指定此模式期望匹配事件至少出现#times次。默认情况下,使用宽松的内部连续性(在后续事件之间)。有关内部连续性的更多信息,请参见连续.pattern.timesOrMore(2);
times(#ofTimes) 指定此模式期望匹配事件出现的确切次数。默认情况下,使用宽松的内部连续性(在后续事件之间)。有关内部连续性的更多信息,请参见连续.pattern.times(2);
times(start,end) 指定此模式期望在匹配事件的#fromTimes#toTimes之间发生。默认情况下,使用宽松的内部连续性(在后续事件之间)。有关内部连续性的更多信息,请参见连续.pattern.times(2, 4);
optional() 指定此模式是可选的,即它可能根本不会出现。这适用于所有上述量词。pattern.oneOrMore().optional();
greedy() 指定这个模式是贪婪的,即它会尽可能多地重复。这仅适用于量词,目前不支持组模式。pattern.oneOrMore().greedy();

组合模式

现在您已经了解了单个模式的外观,是时候看看如何将它们组合成一个完整的模式序列了。

模式序列必须以初始模式开始,如下所示:

Pattern<Event, ?> start = Pattern.<Event>begin("start");

接下来,您可以通过指定它们之间所需的连续条件,将更多模式附加到您的模式序列中。FlinkCEP 支持以下事件之间的连续形式:

  1. 严格连续性:期望所有匹配事件严格地一个接一个出现,中间没有任何不匹配的事件。
  2. 宽松连续性:忽略出现在匹配事件之间的非匹配事件。
  3. 非确定性宽松连续性:进一步放松连续性,允许忽略某些匹配事件的额外匹配。

file

接下来,您可以通过指定它们之间所需的连续条件,将更多模式附加到您的模式序列中。FlinkCEP 支持以下事件之间的连续形式:

  1. 严格连续性:期望所有匹配事件严格地一个接一个出现,中间没有任何不匹配的事件。

    – 所有事件按照严格的顺序出现,中间没有任何不匹配的事件,由 .next() 指定

    – 例如对于模式”a next b”,事件序列 [a, c, b1, b2] 没有匹配

  2. 宽松连续性:忽略出现在匹配事件之间的非匹配事件。

    – 允许中间出现不匹配的事件,由 .followedBy() 指定

    – 例如对于模式”a followedBy b”,事件序列 [a, c, b1, b2] 匹配为 {a, b1}

  3. 非确定性宽松连续性:进一步放松连续性,允许忽略某些匹配事件的额外匹配。

    – 进一步放宽条件,之前已经匹配过的事件也可以再次使用,由 .followedByAny() 指定

    – 例如对于模式”a followedByAny b”,事件序列 [a, c, b1, b2] 匹配为 {a, b1},{a, b2}

宽松的连续性意味着只会匹配第一个后续匹配事件,而对于非确定性的宽松连续性,将针对同一个开始发出多个匹配项。例如,"a b"给定事件序列的模式"a", "c", "b1", "b2"将给出以下结果:

  1. "a"和之间的严格连续性"b":({}不匹配),"c""a"导致"a"被丢弃。
  2. "a""b":之间的{a b1}宽松连续性,因为宽松的连续性被视为“跳过不匹配的事件直到下一个匹配的事件”。
  3. "a""b": {a b1},之间的非确定性松弛邻接{a b2},因为这是最一般的形式。

也可以定义模式有效的时间约束。例如,您可以通过该pattern.within()方法定义模式应在 10 秒内发生。处理时间和事件时间都支持时间模式。

组匹配条件

模式操作 描述
begin(#名称) 定义一个起始模式:Pattern<Event, ?> start = Pattern.<Event>begin("start");
begin(#pattern_sequence) 定义一个起始模式:Pattern<Event, ?> start = Pattern.<Event>begin( Pattern.<Event>begin("start").where(...).followedBy("middle").where(...) );
next(#名称) 附加一个新模式。匹配事件必须直接继承前一个匹配事件(严格连续):Pattern<Event, ?> next = start.next("middle");
followedBy(#名称) 附加一个新模式。其他事件可能发生在匹配事件和前一个匹配事件之间(宽松连续性):Pattern<Event, ?> followedBy = start.followedBy("middle");
followedByAny(#name) 附加一个新模式。其他事件可能发生在匹配事件和前一个匹配事件之间,并且将为每个替代匹配事件呈现替代匹配(非确定性宽松连续性):Pattern<Event, ?> followedByAny = start.followedByAny("middle");
not next() 附加一个新的否定模式。匹配(否定)事件必须直接在前一个匹配事件(严格连续)之后才能丢弃部分匹配:Pattern<Event, ?> notNext = start.notNext("not");
notFollowedBy() 附加一个新的否定模式。即使在匹配(否定)事件和前一个匹配事件(宽松连续)之间发生其他事件,部分匹配事件序列也将被丢弃:Pattern<Event, ?> notFollowedBy = start.notFollowedBy("not");
within 定义事件序列与模式匹配的最大时间间隔。如果一个未完成的事件序列超过这个时间,它就会被丢弃:pattern.within(Time.seconds(10));

以下为演示代码 输入文件内容为 a c b1 b2

next演示代码

 StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);
        DataStreamSource<String> ds = env.readTextFile("data/cep_data1.txt");
        Pattern<String, String> pattern = Pattern.<String>begin("start").where(new SimpleCondition<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.startsWith("a");
            }
        }).next("next").where(new SimpleCondition<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.startsWith("b");
            }
        });
        CEP.pattern(ds,pattern).flatSelect(new PatternFlatSelectFunction<String, String>() {
            @Override
            public void flatSelect(Map<String, List<String>> map, Collector<String> collector) throws Exception {
                System.out.println(map);
                String ss = "";
                for(String s:map.get("next")){
                    ss+=" "+s;
                }
                collector.collect(ss);
            }
        }).print();

file

followby代码

public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);
        DataStreamSource<String> ds = env.readTextFile("data/cep_data1.txt");
        Pattern<String, String> pattern = Pattern.<String>begin("start").where(new SimpleCondition<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.startsWith("a");
            }
        }).followedBy("next").where(new SimpleCondition<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.startsWith("b");
            }
        });
        CEP.pattern(ds,pattern).flatSelect(new PatternFlatSelectFunction<String, String>() {
            @Override
            public void flatSelect(Map<String, List<String>> map, Collector<String> collector) throws Exception {
                System.out.println(map);
                String ss = "";
                for(String s:map.get("next")){
                    ss+=" "+s;
                }
                collector.collect(ss);
            }
        }).print();

        env.execute();
    }

file

  public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);
        DataStreamSource<String> ds = env.readTextFile("data/cep_data1.txt");
        Pattern<String, String> pattern = Pattern.<String>begin("start").where(new SimpleCondition<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.startsWith("a");
            }
        }).followedByAny("next").where(new SimpleCondition<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.startsWith("b");
            }
        });
        CEP.pattern(ds,pattern).flatSelect(new PatternFlatSelectFunction<String, String>() {
            @Override
            public void flatSelect(Map<String, List<String>> map, Collector<String> collector) throws Exception {
                System.out.println(map);
                String ss = "";
                for(String s:map.get("next")){
                    ss+=" "+s;
                }
                collector.collect(ss);
            }
        }).print();

        env.execute();
    }

file

notFollowBy代码

不能放在最后一个位置使用

 public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);
        DataStreamSource<String> ds = env.readTextFile("data/cep_data1.txt");
        Pattern<String, String> pattern = Pattern.<String>begin("start").where(new SimpleCondition<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.startsWith("a");
            }
        }).notFollowedBy("not").where(new SimpleCondition<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.startsWith("c");
            }
        }).followedByAny("next").where(new SimpleCondition<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.startsWith("b");
            }
        });
        CEP.pattern(ds,pattern).flatSelect(new PatternFlatSelectFunction<String, String>() {
            @Override
            public void flatSelect(Map<String, List<String>> map, Collector<String> collector) throws Exception {
                System.out.println(map);
                String ss = "";
                for(String s:map.get("next")){
                    ss+=" "+s;
                }
                collector.collect(ss);
            }
        }).print();

        env.execute();
    }

file

within时间匹配代码

package com.hainiu_cep;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.*;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.List;
import java.util.Map;
import java.util.Objects;

public class TestCep06 {
//    4 zhaosi_login 1634993066
//    5 zhangsan_operator 1634993067
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        KeyedStream<TimeoutVo, String> ds = env.readTextFile("data/cep_data2.txt").map(new MapFunction<String, TimeoutVo>() {
            @Override
            public TimeoutVo map(String value) throws Exception {
                String[] split = value.split("[_| ]");
                TimeoutVo timeoutVo = new TimeoutVo(Integer.valueOf(split[0]), split[1], split[2], Long.valueOf(split[3]));
                return timeoutVo;
            }
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<TimeoutVo>(Time.seconds(1)) {
            @Override
            public long extractTimestamp(TimeoutVo element) {
                return element.time * 1000;
            }
        }).keyBy(new KeySelector<TimeoutVo, String>() {
            @Override
            public String getKey(TimeoutVo value) throws Exception {
                return value.name;
            }
        });

        Pattern<TimeoutVo, TimeoutVo> pattern = Pattern.<TimeoutVo>begin("start").where(new SimpleCondition<TimeoutVo>() {
            @Override
            public boolean filter(TimeoutVo value) throws Exception {
                return value.op.equals("login");
            }
        }).next("next").where(new SimpleCondition<TimeoutVo>() {
            @Override
            public boolean filter(TimeoutVo value) throws Exception {
                return value.op.equals("operator");
            }
        }).within(Time.seconds(10));

        PatternStream<TimeoutVo> ps = CEP.pattern(ds, pattern);

        OutputTag<TimeoutVo> tag = new OutputTag<>("late", TypeInformation.of(TimeoutVo.class));
        SingleOutputStreamOperator<String> result = ps.select(tag, new PatternTimeoutFunction<TimeoutVo, TimeoutVo>() {
            @Override
            public TimeoutVo timeout(Map<String, List<TimeoutVo>> pattern, long timeoutTimestamp) throws Exception {
                List<TimeoutVo> next = pattern.get("start");
                if(next!=null && next.size()>0)
                    return next.get(0);
                else
                    return null;
            }
        }, new PatternSelectFunction<TimeoutVo, String>() {
            @Override
            public String select(Map<String, List<TimeoutVo>> pattern) throws Exception {
                List<TimeoutVo> next = pattern.get("next");
                if(next!=null && next.size()>0)
                    return next.get(0).name;
                else
                    return null;
            }
        });

        result.print("onTime:");
        result.getSideOutput(tag).print("late:");

        env.execute();

    }

    public static class TimeoutVo{
        private int id;
        private String name;
        private String op;
        private Long time;

        public TimeoutVo() {
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof TimeoutVo)) return false;
            TimeoutVo timeoutVo = (TimeoutVo) o;
            return getId() == timeoutVo.getId() &&
                    Objects.equals(getName(), timeoutVo.getName()) &&
                    Objects.equals(getOp(), timeoutVo.getOp()) &&
                    Objects.equals(getTime(), timeoutVo.getTime());
        }

        @Override
        public int hashCode() {

            return Objects.hash(getId(), getName(), getOp(), getTime());
        }

        @Override
        public String toString() {
            return "TimeoutVo{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    ", op='" + op + '\'' +
                    ", time=" + time +
                    '}';
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getOp() {
            return op;
        }

        public void setOp(String op) {
            this.op = op;
        }

        public Long getTime() {
            return time;
        }

        public void setTime(Long time) {
            this.time = time;
        }

        public TimeoutVo(int id, String name, String op, Long time) {
            this.id = id;
            this.name = name;
            this.op = op;
            this.time = time;
        }
    }
}

file

输入数据为

1 liuneng_login 1634993062
10 zhangsan_login 1634993063
2 lisi_login 1634993064
3 wangwu_login 1634993065
4 zhaosi_login 1634993066
5 zhangsan_operator 1634993067
6 lisi_operator 1634993068
7 wangwu_operator 1634993069
版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海汼部落-阿布都的都,http://hainiubl.com/topics/76106
成为第一个点赞的人吧 :bowtie:
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter