Demo: Using Flink CEP to detect brute-force login attempts and logins from different IP addresses (off-site logins) based on web logs
Code example
(1) Main program code
import Beans.EventPOJO; import Beans.WaringMsgPOJO; import org.apache.flink.cep.CEP; import org.apache.flink.cep.PatternSelectFunction; import org.apache.flink.cep.PatternStream; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.cep.pattern.conditions.IterativeCondition; import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.windowing.time.Time; import scala.util.parsing.json.JSONObject; import java.net.URL; import java.util.List; import java.util.Map; public class cepTestPro {<!-- --> public static void main(String[] args) throws Exception {<!-- --> // 1. Create a streaming execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 2. Read data from the file, convert it to POJO data object, and configure the water level URL resource = cepTestPro.class.getResource("/loginlog.csv"); DataStream<EventPOJO> eventPOJOStream = env.readTextFile(resource.getPath()) .map(line -> {<!-- --> String[] fields = line.split(","); return new EventPOJO(fields[0].trim(), fields[1].trim(), fields[2].trim(), fields[3].trim(), Long.valueOf(fields[4].trim ())); }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<EventPOJO>(Time.seconds(2)) {<!-- --> @Override public long extractTimestamp(EventPOJO eventPOJO) {<!-- --> return eventPOJO.getTimestamp() * 1000L; } }); // 3. 
Group by user name KeyedStream<EventPOJO, String> eventKeyedStream = eventPOJOStream.keyBy(event -> event.getName()); // eventPOJOStream.print(); // 4.1 Define a cep matching rule, 3 consecutive login failures within 5 seconds Pattern<EventPOJO, EventPOJO> loginFailPattern = Pattern .<EventPOJO>begin("loginFailEvents").where(new SimpleCondition<EventPOJO>() {<!-- --> @Override public boolean filter(EventPOJO value) throws Exception {<!-- --> return "fail".equals(value.getLoginStatus()); } }).times(3).consecutive() .within(Time.seconds(5)); // 4.2 Define a cep matching rule: successful login at different IP addresses within 5 seconds Pattern<EventPOJO, EventPOJO> LoginMorePlacePattern = Pattern .<EventPOJO>begin("firstLogin").where(new SimpleCondition<EventPOJO>() {<!-- --> @Override public boolean filter(EventPOJO eventPOJO) throws Exception {<!-- --> return "success".equals(eventPOJO.getLoginStatus()); } }).next("secondLogin").where(new IterativeCondition<EventPOJO>() {<!-- --> @Override public boolean filter(EventPOJO eventPOJO, Context<EventPOJO> context) throws Exception {<!-- --> EventPOJO firstLogin = context.getEventsForPattern("firstLogin").iterator().next(); return !eventPOJO.getIp().equals(firstLogin.getIp()); } }).within(Time.seconds(5)); // 4.3 Apply the matching pattern to the data stream to obtain patternStream PatternStream<EventPOJO> loginFailPatternStream = CEP.pattern(eventKeyedStream, loginFailPattern); PatternStream<EventPOJO> LoginMorePlacePatternStream = CEP.pattern(eventKeyedStream, LoginMorePlacePattern); // 4.4 Detect complex events that meet the matching rules, perform conversion processing, and obtain alarm information SingleOutputStreamOperator<WaringMsgPOJO> loginMorePlaceWarning = LoginMorePlacePatternStream.select(new LoginMorePlaceWarning()); SingleOutputStreamOperator<WaringMsgPOJO> loginFailWarning = loginFailPatternStream.select(new LoginFailWarning()); // 5. 
Print alarm output loginMorePlaceWarning.print(); loginFailWarning.print(); //Execute flink task env.execute(); } //Alarm output for multiple login events public static class LoginMorePlaceWarning implements PatternSelectFunction<EventPOJO, WaringMsgPOJO> {<!-- --> @Override public WaringMsgPOJO select(Map<String, List<EventPOJO>> pattern) throws Exception {<!-- --> EventPOJO firstLoginEvent = pattern.get("firstLogin").iterator().next();; EventPOJO secondLoginEvent = pattern.get("secondLogin").get(0); return new WaringMsgPOJO(firstLoginEvent.getName(), firstLoginEvent.getIp() + ", " + secondLoginEvent.getIp(), "More place login", firstLoginEvent.getTimestamp(), secondLoginEvent.getTimestamp()); } } //Alarm output for consecutive login failure events public static class LoginFailWarning implements PatternSelectFunction<EventPOJO, WaringMsgPOJO> {<!-- --> @Override public WaringMsgPOJO select(Map<String, List<EventPOJO>> pattern) throws Exception {<!-- --> EventPOJO firstFailEvent = pattern.get("loginFailEvents").get(0); EventPOJO lastFailEvent = pattern.get("loginFailEvents").get(pattern.get("loginFailEvents").size() - 1); return new WaringMsgPOJO(firstFailEvent.getName(), firstFailEvent.getIp(), "Login fail " + pattern.get("loginFailEvents").size() + " times", firstFailEvent.getTimestamp(), lastFailEvent.getTimestamp()); } } }
(2) Bean objects used
package Beans; public class EventPOJO {<!-- --> public String name; public String url; public String ip; public String loginStatus; public Long timestamp; public EventPOJO() {<!-- --> } public EventPOJO(String name, String url, String ip, String loginStatus, Long timestamp) {<!-- --> this.name = name; this.url = url; this.ip = ip; this.loginStatus = loginStatus; this.timestamp = timestamp; } public String getName() {<!-- --> return name; } public void setName(String name) {<!-- --> this.name = name; } public String getUrl() {<!-- --> return url; } public void setUrl(String url) {<!-- --> this.url = url; } public String getIp() {<!-- --> return ip; } public void setIp(String ip) {<!-- --> this.ip = ip; } public String getLoginStatus() {<!-- --> return loginStatus; } public void setLoginStatus(String loginStatus) {<!-- --> this.loginStatus = loginStatus; } public Long getTimestamp() {<!-- --> return timestamp; } public void setTimestamp(Long timestamp) {<!-- --> this.timestamp = timestamp; } @Override public String toString() {<!-- --> return "EventPOJO{" + "name='" + name + '\'' + ", url='" + url + '\'' + ", ip='" + ip + '\'' + ", loginStatus='" + loginStatus + '\'' + ", timestamp=" + timestamp + '}'; } }
package Beans; public class WaringMsgPOJO {<!-- --> public String username; public String ip; public String warningMsg; public Long firstTime; public Long lastTime; public WaringMsgPOJO() {<!-- --> } public WaringMsgPOJO(String username, String ip, String warningMsg, Long firstTime, Long lastTime) {<!-- --> this.username = username; this.ip = ip; this.warningMsg = warningMsg; this.firstTime = firstTime; this.lastTime = lastTime; } public String getUsername() {<!-- --> return username; } public void setUsername(String username) {<!-- --> this.username = username; } public String getIp() {<!-- --> return ip; } public void setIp(String ip) {<!-- --> this.ip = ip; } public String getWarningMsg() {<!-- --> return warningMsg; } public void setWarningMsg(String warningMsg) {<!-- --> this.warningMsg = warningMsg; } public Long getFirstTime() {<!-- --> return firstTime; } public void setFirstTime(Long firstTime) {<!-- --> this.firstTime = firstTime; } public Long getLastTime() {<!-- --> return lastTime; } public void setLastTime(Long lastTime) {<!-- --> this.lastTime = lastTime; } @Override public String toString() {<!-- --> return "WaringMsgPOJO{" + "username='" + username + '\'' + ", ip='" + ip + '\'' + ", warningMsg='" + warningMsg + '\'' + ", firstTime='" + firstTime + '\'' + ", lastTime='" + lastTime + '\'' + '}'; } }
(3) Test data
Bob,./index.html,10.255.82.110,fail,1597184223
Bob,./index.html,10.255.82.110,fail,1597184224
Bob,./index.html,10.255.82.110,fail,1597184226
Bob,./index.html,10.255.82.110,success,1597184225
Bob,./index.html,10.255.82.110,fail,1597184226
Frank,./index.html,10.255.82.110,fail,1597184227
Bob,./index.html,10.255.82.110,fail,1597184227
Frank,./index.html,10.255.82.110,fail,1597184228
Bob,./index.html,10.255.82.110,success,1597184235
Alice,./index.html,10.255.82.110,fail,1597184236
Alice,./index.html,10.255.82.110,fail,1597184237
Frank,./index.html,10.255.82.110,success,15971842238
Frank,./index.html,10.255.82.111,success,15971842239
Alice,./index.html,10.255.82.110,success,1597184246