DolphinDB stream computing application: monitoring abnormal access control status with cascaded engines

The development of the Internet of Things has brought greater convenience to intelligent security and automated monitoring. At the same time, new town construction, smart cities, and smart communities place higher demands on the intelligence of security functions such as access control management. In this context, access control is no longer just door control: it is a comprehensive, convenient security application that also integrates functions such as visitor management, attendance, payment, patrol, and elevator control. Today, access control systems are mainly used for entrance and exit management. As the construction of smart cities, smart construction sites, and smart communities accelerates in China, the intelligent upgrade of access control systems has become inevitable, and their penetration and utilization will continue to grow. With more and more devices connected to access control systems, processing the massive data they generate quickly and in real time has become an increasingly important issue.

DolphinDB provides stream tables and stream computing engines for real-time data processing, offering strong support for smart security. This tutorial shows how to cascade several stream computing engines to monitor the abnormal status of access control equipment in real time.

1. Background introduction

1.1 Industry background

The continued development of new urbanization, smart cities, and smart communities in China has opened up a large market for the smart security industry. Intelligent security systems are spreading to security, military, transportation, government, electric power, communications, energy, finance, museums, warehouses, villas, factories, and many other sectors. They cover a wide range of intelligent products and solutions, mainly including intelligent security video analysis systems, intelligent traffic video surveillance systems, smart city surveillance systems, customized systems based on abnormal behavior detection, and forward-looking research in computer vision analysis.

Driven by growing demand, intelligent security systems are evolving toward better performance, stronger environmental adaptability, more reliable and stable operation, and compatibility with more technologies. A typical intelligent security system has four main functions: monitoring, alarm, access control, and remote control, which can run independently or be managed in a unified way. Access control is the most basic application in the whole system, and within the Internet of Things it is also tied to public security, urban management, smart homes, and other areas.

1.2 Real-world scenario

At present, a typical access control system works in client/server mode and includes the following components, which together provide event monitoring and alarm linkage.

① Access control and alarm management server: provides centralized management and monitoring, output, and linkage functions.

② Access control workstation: provides function configuration and event monitoring, and can be connected to a card issuing device to serve as a card issuing workstation.

③ Alarm input: each control box has an independent alarm input interface, which connects to alarm input devices such as infrared alarms.

④ Alarm output: each control box has an independent alarm output interface, which connects to alarm output devices such as sound and light alarms.

⑤ Access controller: the core of the access control management system. It directly manages the system's cards and controls related equipment. It has storage and can hold cardholder information and event records.

⑥ Card reader: works in radio frequency mode, reads data from the proximity card, and transmits it to the access controller so that the controller can perform management and control.

⑦ Electric lock: an electronic switch that opens and locks the door, directly controlled by the access controller.

⑧ Door-open button: provides a convenient way to open the door.

⑨ Door sensor: detects the status of the door and transmits it to the controller.

⑩ Alarm input and output devices: to strengthen security, these devices can be connected to the input and output interfaces of the access controller to implement system alarms and linkage.

Fig. 1 Structure of the industrial center access control management system

The figure above shows a common access control management system structure. The alarm system is an important functional component of a security and access control system, and it can also work together with other monitoring equipment for joint prevention and control. As more and more terminals connect to smart access control systems, computing over massive data efficiently in real time and returning alarm messages promptly has become a key issue in building smart access control and smart communities.

1.3 Advantages of DolphinDB

DolphinDB is a high-performance distributed time-series database that integrates a powerful programming language with a high-capacity, high-speed stream data analysis system. It provides a one-stop solution for fast storage, retrieval, analysis, and computation of massive structured data, and is well suited to the industrial Internet of Things. DolphinDB's stream computing framework offers high-performance real-time stream processing, supports computation at millisecond or even microsecond latency, and is a good fit for processing and analyzing access control security data.

2. Requirements

Assume a monitoring system that collects data from all access control devices every 5 seconds. In addition, door opening and closing events are reported actively as they occur. The collected data is written to an MQTT server in JSON format. The sample data used in this article is as follows:

| recordType | doorEventCode | eventDate | readerType | sn | doorNum | card |
|------------|---------------|---------------------|------------|-------|---------|----------|
| 0 | 11 | 2022.12.01 00:00:00 | false | a1008 | 1 | ic100000 |
| 1 | 65 | 2022.12.01 00:00:00 | false | a1010 | 2 | ic100000 |
| 3 | 61 | 2022.12.01 00:00:53 | true | a1004 | 1 | ic100044 |
| 2 | 66 | 2022.12.01 00:00:53 | true | a1002 | 2 | ic100020 |
| 2 | 60 | 2022.12.01 00:19:54 | false | a1008 | 1 | ic100000 |
| 3 | 11 | 2022.12.01 00:19:54 | true | a1000 | 2 | ic100000 |
| 2 | 66 | 2022.12.01 00:23:21 | true | a1009 | 1 | ic100082 |
| 2 | 61 | 2022.12.01 00:23:21 | false | a1006 | 2 | ic100068 |
| 3 | 12 | 2022.12.01 00:45:26 | true | a1003 | 1 | ic100000 |
| 1 | 11 | 2022.12.01 00:45:26 | false | a1004 | 2 | ic100000 |

The fields this tutorial uses to detect abnormal access control states are described as follows:

| Field name | Description |
|---------------|-------------|
| doorEventCode | Event code. 11: open the door legally; 12: open the door by password; 56: open the door by button; 60: open the door; 61: close the door; 65: open the door by software; 66: close the door by software |
| eventDate | Event time |
| doorNum | Door number, 0-4 |

Keeping doors normally closed is one of the basic requirements for the safety of residents in a community or building. The abnormal state to detect in this case is therefore: raise an alarm when a door stays open continuously for more than 5 minutes.

3. Experimental environment

The configuration of the experimental environment is as follows:

  • Server environment:

      • CPU type: Intel(R) Core(TM) i5-11500 @ 2.70GHz 2.71 GHz

      • Total logical CPUs: 12

      • Memory: 16 GB

      • OS: 64-bit Windows

  • DolphinDB server deployment:

      • Server version: 2.00.8 Windows 64, Community Edition

      • Deployment mode: single node mode

  • DolphinDB GUI: version 1.30.14

  • MQTT server: mosquitto-2.0.15

4. Design ideas

DolphinDB’s stream computing framework currently provides more than 10 computing engines, such as the time-series aggregation engine, cross-sectional aggregation engine, anomaly detection engine, session window engine, and reactive state engine, to handle different computing scenarios. This article shows how to use the reactive state engine and the session window engine to monitor the abnormal state of access control in real time.

4.1 Use DolphinDB’s built-in stream computing engine to monitor the abnormal state of access control

  • Reactive State Engine (createReactiveStateEngine)

The factors calculated by DolphinDB stream engines can be divided into stateless factors and stateful factors. A stateless factor can be computed from the latest record alone: it needs no earlier data and does not depend on earlier results. A stateful factor needs, in addition to the latest record, historical data or intermediate results from earlier calculations, which are collectively called "state". Stateful factor calculation therefore has to store state for later calculations, and every calculation updates that state. Each record fed into the reactive state engine triggers one output record, so the input and output row counts are the same. The operators of the reactive state engine can only contain vector functions. DolphinDB has optimized the computational efficiency of common state operators (sliding window functions, cumulative functions, order-sensitive functions, topN functions, and so on) in the reactive state engine.
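The difference between the two kinds of factor can be sketched in a few lines of Python. This is an illustration only, not DolphinDB code: the names `is_open_event`, `SecondsSincePrev`, and `run` are hypothetical, and the set of door-open codes is copied from the filter used in section 5.4. The stateless factor looks only at the current record, while the stateful factor keeps the previous timestamp of each door as its state.

```python
from typing import Dict, List, Optional, Tuple

# Door-open event codes, copied from the filter in section 5.4 (assumption for this sketch).
OPEN_CODES = {11, 12, 56, 60, 65, 67}


def is_open_event(code: int) -> bool:
    """Stateless factor: depends only on the current record."""
    return code in OPEN_CODES


class SecondsSincePrev:
    """Stateful factor: needs the previous timestamp of the same door."""

    def __init__(self) -> None:
        self._last_ts: Dict[int, int] = {}  # state, keyed by door number

    def update(self, door: int, ts: int) -> Optional[int]:
        prev = self._last_ts.get(door)
        self._last_ts[door] = ts  # every input record updates the state
        return None if prev is None else ts - prev


def run(records: List[Tuple[int, int, int]]) -> List[Tuple[int, bool, Optional[int]]]:
    """Emit exactly one output row per input row (door, ts, code)."""
    delta = SecondsSincePrev()
    return [(door, is_open_event(code), delta.update(door, ts))
            for door, ts, code in records]


rows = run([(1, 0, 11), (2, 0, 61), (1, 5, 11), (1, 400, 61)])
for row in rows:
    print(row)
```

Four records go in and four rows come out, which mirrors the one-output-per-input behavior described above; the state is kept separately for each door, much as a grouping column would do.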

  • Session Window Engine (createSessionWindowEngine)

A session window can be understood as an active phase (data is being generated) followed by an inactive phase (no data is generated). The session window engine is very similar to the time-series engine: both apply the same calculation rules and trigger calculations in the same way. The difference is that the time-series engine has a fixed window length and sliding step, whereas the windows of the session window engine are not generated at a fixed frequency and their length is not fixed. The session window engine uses the timestamp of the first record it receives as the start time of the first session window. After the window receives a record, if no new record arrives within the specified waiting time, then (timestamp of that record + waiting time) is the end time of the window. The timestamp of the first record received after the window ends is the start time of the next session window.

In Internet of Things scenarios, devices are online at different times, so some periods produce a large amount of data while others produce none at all. If sliding windows are computed over data like this, the empty windows add unnecessary computational overhead. DolphinDB developed the session window engine to solve this problem.
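The window rule described above can be worked through with a short Python sketch. It is not DolphinDB code, and `session_windows` is a hypothetical helper. Two simplifications are assumed: a window is treated as closed when the next record arrives at or after (last timestamp + gap), and the final window is reported as if it had already closed, whereas a streaming engine would still be waiting for more data.

```python
from typing import List, Tuple


def session_windows(timestamps: List[int], gap: int) -> List[Tuple[int, int]]:
    """Return (start, end) for each session window.

    `timestamps` are sorted times in seconds; `gap` is the waiting time.
    A window ends at (timestamp of its last record + gap).
    """
    windows: List[Tuple[int, int]] = []
    if not timestamps:
        return windows
    start = last = timestamps[0]
    for ts in timestamps[1:]:
        if ts - last >= gap:
            # No record arrived within the waiting time: close the window
            # and let the current record open the next one.
            windows.append((start, last + gap))
            start = ts
        last = ts
    windows.append((start, last + gap))  # final window, shown as closed
    return windows


print(session_windows([0, 5, 10, 400, 405], 300))
```

With a 300-second gap, the records at 0, 5, and 10 form one window that ends at 310, and the records at 400 and 405 form a second window that ends at 705. No window is produced for the quiet period in between.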

4.2 Design ideas and solutions

In this case, the access control monitoring equipment polls every 5 seconds, so duplicate records are produced whenever no new event is reported. The collected data must therefore be deduplicated first, and then the records whose state persists beyond the timeout must be detected. At that point the result includes every state, open or closed, that lasts longer than 5 minutes, so the data has to pass through one more engine that removes the door-closing alarms and keeps only the door-open timeout alarms. Given the characteristics of DolphinDB's engines, a reactive state engine handles the first and third tasks (deduplicating and filtering), and a session window engine detects the timeouts. Cascading the three engines forms a pipeline that detects the abnormal state in which a door stays open for more than 5 minutes.
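The three-stage design can be simulated end to end in Python to check the reasoning. The sketch below is not DolphinDB code and does not call any DolphinDB API: the function names are hypothetical, times are plain seconds, and the second stage is a simplification that keeps only the last record per door, which is enough when the window metric is the last event code.

```python
from typing import Dict, Iterable, Iterator, List, Tuple

Event = Tuple[int, int, int]  # (doorNum, eventDate in seconds, doorEventCode)
OPEN_CODES = {11, 12, 56, 60, 65, 67}  # filter list used in section 5.4


def drop_repeats(events: Iterable[Event]) -> Iterator[Event]:
    """Stage 1: pass a record only if its code differs from the previous
    code seen for the same door (the first record of a door always passes)."""
    prev_code: Dict[int, int] = {}
    for door, ts, code in events:
        if prev_code.get(door) != code:
            yield door, ts, code
        prev_code[door] = code


def state_timeouts(events: Iterable[Event], gap: int) -> Iterator[Event]:
    """Stage 2: when a door's next state change arrives at least `gap`
    seconds after the previous one, emit (door, previous time + gap,
    previous code), i.e. the state that lasted too long."""
    last: Dict[int, Tuple[int, int]] = {}
    for door, ts, code in events:
        if door in last and ts - last[door][0] >= gap:
            yield door, last[door][0] + gap, last[door][1]
        last[door] = (ts, code)


def open_only(events: Iterable[Event]) -> Iterator[Event]:
    """Stage 3: keep only timeouts whose state is a door-open code."""
    return (e for e in events if e[2] in OPEN_CODES)


def detect(events: Iterable[Event], gap: int = 300) -> List[Event]:
    return list(open_only(state_timeouts(drop_repeats(events), gap)))


polled = [(1, 0, 11), (1, 5, 11), (1, 10, 11),   # door opened, then polled
          (1, 400, 61), (1, 405, 61),            # closed after 400 s
          (1, 1000, 60), (1, 1005, 60),          # opened again
          (1, 1100, 61)]                         # closed after 100 s
print(detect(polled))
```

In this example the first stage keeps four state changes, the second stage reports two states that lasted at least 300 seconds (open from 0 and closed from 400), and the third stage keeps only the open one, giving a single alarm stamped at 300.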

The processing flow in DolphinDB is shown in the figure below:

Figure 2 Access control abnormal state data processing flow

5. Implementation steps

5.1 Define and share input and output stream data tables

First, define a stream table that receives data from the access control monitoring devices in real time. The table has seven columns. Use the enableTableShareAndPersistence function to share the stream table and persist it to disk, and use the cacheSize parameter to limit the data kept in memory to 100,000 rows. The code is as follows:

st = streamTable(
    array(INT,0) as recordType,      //record type
    array(INT,0) as doorEventCode,   //event code
    array(DATETIME,0) as eventDate,  //event time
    array(BOOL,0) as readerType,     //in/out type, 1: in, 0: out
    array(SYMBOL,0) as sn,           //device SN number
    array(INT,0) as doorNum,         //door number
    array(SYMBOL,0) as card          //card number
)
enableTableShareAndPersistence(st, `doorRecord, false, true, 100000, 100, 0);

Next, define the abnormal-state stream table outputSt, which receives the output of the reactive state engine, and persist it to disk. createReactiveStateEngine has strict requirements on the format of its output table: the leading columns must be the grouping columns, matching the columns set in keyColumn and their order. In this example, the grouping column is the door number doorNum, of type INT. The next two columns, of type DATETIME and INT, record the time and the event code. The code for creating and sharing the stream table is as follows:

out1 =streamTable(10000:0,`doorNum`eventDate`doorEventCode,[INT,DATETIME,INT])
enableTableShareAndPersistence(out1,`outputSt,false,true,100000)

For detailed descriptions of functions and parameters, refer to the DolphinDB User Manual

5.2 Create a reactive state engine to filter duplicate data

The reactive state engine calculates on every input message and produces one record as a result. By default all results are written to the result table, that is, n input messages produce n output records. To output only some of the results, set a filter condition, and only the results that meet it are output.

The following example checks whether the recorded data has changed, and outputs only the records whose event code has changed. The grouping column is set to doorNum, so the columns of the output are the grouping column followed by the calculation result columns. The schema of the next-level engine's dummyTable must follow this order. filter is set to prev(doorEventCode)!=doorEventCode. The filter condition is written as metacode, and only results that satisfy it, that is, records whose event code differs from the previous one, are written to the table set by outputTable. The two metrics are eventDate and doorEventCode, which are output unchanged.

DolphinDB’s built-in stream computing engines all implement the data table (table) interface, so building a pipeline of several engines is simple: use the downstream engine as the output table of the upstream engine. Pipeline processing makes it possible to solve more complex calculation problems. In this example, the output is connected to the next-level session window engine through the getStreamEngine() function. The code that creates the engine is as follows:

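// swEngine (section 5.3) must already exist when this statement runs,
// because getStreamEngine("swEngine") looks the engine up by name.
reactivEngine1 = createReactiveStateEngine(name=`reactivEngine1, metrics=<[eventDate,doorEventCode]>,
    dummyTable=objByName(`doorRecord), outputTable=getStreamEngine("swEngine"), keyColumn=`doorNum,
    filter=<prev(doorEventCode)!=doorEventCode>)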

5.3 Detect status timeout data through cascading session window engine

First, create an in-memory table that provides the input table structure for the session window engine. Its structure must match the output of the upper-level engine. In the session window engine, set the grouping column keyColumn to the door number doorNum and the time column timeColumn to eventDate. The requirement is to raise an alarm when no new data arrives within five minutes, so sessionGap is 300 (in seconds, the same unit as the eventDate column): if no new data arrives within this time after a record is received, the current window ends. metrics is set to last(doorEventCode), which returns the last record in the window. useSessionStartTime is set to false, so the time in the output is the end time of the window, that is, the time of the last record in the window plus sessionGap. Once the stream table is subscribed, the input of the session window engine is the output of the upper-level reactive state engine, and its output is the input of the next-level reactive state engine. For the other parameters, refer to the createSessionWindowEngine page in the DolphinDB User Manual. The code is as follows:

// reactivEngine (section 5.4) must already exist when this statement runs.
swOut2 = table(1:0, `doorNum`eventDate`doorEventCode, [INT,DATETIME,INT])
// dummyTable is swOut2: the input here is the three-column output of reactivEngine1
swEngine = createSessionWindowEngine(name="swEngine", sessionGap=300, metrics=<last(doorEventCode)>,
    dummyTable=swOut2, outputTable=getStreamEngine("reactivEngine"),
    timeColumn=`eventDate, keyColumn=`doorNum, useSessionStartTime=false)

5.4 Filter door-closing alarms with a reactive state engine

The output of the upper-level session window engine contains both open and closed states that lasted more than 5 minutes, so a reactive state engine is needed to filter out the timeouts of the closed state and keep only the door-open alarms. As with the upper-level engine, first create an in-memory table that provides the input table structure for the reactive state engine. In the engine, set the grouping column keyColumn to the door number doorNum, and set the two metrics to eventDate and doorEventCode, which are output unchanged. The filter parameter is set to doorEventCode in [11,12,56,60,65,67], so only records whose event code is a door-open event are output. For the other parameters, refer to the createReactiveStateEngine page in the DolphinDB User Manual. The code is as follows:

swOut1 =table(1:0,`eventDate`doorNum`doorEventCode,[DATETIME,INT,INT])
reactivEngine = createReactiveStateEngine(name=`reactivEngine, metrics=<[eventDate,doorEventCode]>,
    dummyTable=swOut1, outputTable= objByName(`outputSt), keyColumn= "doorNum",
    filter=<doorEventCode in [11,12,56,60,65,67]>)

5.5 Subscribe to streaming data

Once the engines are created, subscribe to the stream table doorRecord and set the handler to append!{reactivEngine1}, which writes the received stream data into the first-level reactive state engine. msgAsTable is set to true, so the subscribed data is passed to the handler as a table rather than as a tuple of columns. The code is as follows:

subscribeTable(tableName="doorRecord", actionName="monitor", offset=0,
               handler=append!{reactivEngine1}, msgAsTable=true)

5.6 Receive data from MQTT server

DolphinDB provides an MQTT plugin for subscribing to data from an MQTT server. The DolphinDB server 2.00.8 linux 64 JIT version already includes the MQTT plugin in the server/plugins/mqtt directory, so it can be loaded directly without downloading it. Use mqtt::subscribe to subscribe to data from the MQTT server. A parsing function for the data format is required when subscribing. The plugin currently provides parsers for the JSON and CSV formats. In this example, mqtt::createJsonParser is used to parse JSON data. The sample code is as follows:

loadPlugin(getHomeDir() + "/plugins/mqtt/PluginMQTTClient.txt")
// host, port and topic must be defined beforehand to match your MQTT server
sp = mqtt::createJsonParser([INT,INT,DATETIME,BOOL,SYMBOL,INT,SYMBOL],
    `recordType`doorEventCode`eventDate`readerType`sn`doorNum`card)
mqtt::subscribe(host, port, topic, sp, objByName(`doorRecord))
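To make the role of the JSON parser concrete, the following Python sketch converts one JSON message into a typed row in the column order of doorRecord. It does not reproduce the behavior of the MQTT plugin, and the wire format is an assumption: the tutorial does not show a raw message, so the key names are taken from the column names and the date string format is taken from the sample table in section 2.

```python
import json
from datetime import datetime
from typing import Any, Dict, List

# Column order of the doorRecord stream table.
COLUMNS = ["recordType", "doorEventCode", "eventDate",
           "readerType", "sn", "doorNum", "card"]

# One converter per column; the date format is an assumption for this sketch.
CONVERTERS = [int, int,
              lambda s: datetime.strptime(s, "%Y.%m.%d %H:%M:%S"),
              bool, str, int, str]


def parse_message(payload: str) -> List[Any]:
    """Turn one JSON object into a row ordered like the doorRecord table."""
    obj: Dict[str, Any] = json.loads(payload)
    return [conv(obj[name]) for name, conv in zip(COLUMNS, CONVERTERS)]


# Hypothetical message built from the first row of the sample table.
sample = json.dumps({"recordType": 0, "doorEventCode": 11,
                     "eventDate": "2022.12.01 00:00:00", "readerType": False,
                     "sn": "a1008", "doorNum": 1, "card": "ic100000"})
row = parse_message(sample)
print(row)
```

Fixing the column order and types up front is what lets the parsed rows be appended to a table with a matching schema.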

6. Simulate writing and verification

6.1 Simulate access control devices writing data

The following code simulates the door opening and closing events written by an access control device that is polled every five seconds. A total of 350 access control records are generated, of which 7 are non-repeated. Three states time out, and 2 of those are door-open timeouts. The code that simulates writing data is as follows:

def duplicateData(mutable st, num, doorCode, time){
    for(i in 0:num){
        // eventTime is reset to `time` on every pass, so all num records
        // written by one call carry the same timestamp
        eventTime = time
        st.append!(table(rand(0..5,1) as recordType, doorCode as doorEventCode, eventTime as eventDate, rand([true,false],1) as readerType, rand(`a + string(1000..1010),1) as sn, 1 as doorNum, rand(`ic + string(100000..100000),1) as card))
        eventTime = datetimeAdd(eventTime, 5, `s)
    }
}
startEventDate = 2022.12.01T00:00:00
duplicateData(st, 75, 11, startEventDate)
startEventDate=datetimeAdd(startEventDate, 375, `s)
duplicateData(st, 25, 56, startEventDate)
startEventDate=datetimeAdd(startEventDate, 125, `s)
duplicateData(st, 100, 61, startEventDate)
startEventDate=datetimeAdd(startEventDate, 500, `s)
duplicateData(st, 25, 66, startEventDate)
startEventDate=datetimeAdd(startEventDate, 125, `s)
duplicateData(st, 70, 12, startEventDate)
startEventDate=datetimeAdd(startEventDate, 350, `s)
duplicateData(st, 30, 60, startEventDate)
startEventDate=datetimeAdd(startEventDate, 150, `s)
duplicateData(st, 25, 67, startEventDate)
startEventDate=datetimeAdd(startEventDate , 125, `s)
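The record counts stated above can be checked with a little arithmetic. The Python sketch below is not DolphinDB code: it copies the (record count, event code) pairs from the calls above, derives when each state starts, and applies the 300-second rule. The set of door-open codes is taken from the filter in section 5.4, and the last state is assumed to produce no timeout because no later record arrives to close it.

```python
from typing import List, Tuple

# (number of records, event code) for each duplicateData call above.
SEGMENTS: List[Tuple[int, int]] = [(75, 11), (25, 56), (100, 61), (25, 66),
                                   (70, 12), (30, 60), (25, 67)]
OPEN_CODES = {11, 12, 56, 60, 65, 67}  # filter list from section 5.4
POLL_SECONDS, GAP = 5, 300

total_records = sum(n for n, _ in SEGMENTS)

# Start of each state, in seconds after 2022.12.01T00:00:00.
starts: List[int] = []
offset = 0
for n, _ in SEGMENTS:
    starts.append(offset)
    offset += n * POLL_SECONDS

# A state times out when the next state change comes GAP seconds or more later;
# the alarm time is the start of the state plus GAP.
timeouts = [(starts[i] + GAP, SEGMENTS[i][1])
            for i in range(len(SEGMENTS) - 1)
            if starts[i + 1] - starts[i] >= GAP]
open_timeouts = [t for t in timeouts if t[1] in OPEN_CODES]

print(total_records, len(SEGMENTS), timeouts, open_timeouts)
```

The states start at 0, 375, 500, 1000, 1125, 1475, and 1625 seconds. Codes 11, 61, and 12 each last at least 300 seconds, and two of them (11 and 12) are door-open states, which agrees with the 350 records, 7 non-repeated records, 3 timeouts, and 2 door-open timeouts stated above.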

6.2 Verify the accuracy of monitoring results

From the simulated data, query the records that timed out and meet the filter conditions, then use eqObj() to check whether the abnormal data produced by the stream computing engines equals the true abnormal data, thereby verifying the accuracy of the monitoring results.

t = select *, deltas(eventDate), prev(doorNum), prev(eventDate), prev(doorEventCode)
    from doorRecord context by doorNum
resultTable = select prev_doorNum as doorNum, prev_eventDate + 300 as eventDate,
              prev_doorEventCode as doorEventCode from t
              where deltas_eventDate>= 300 and prev_doorEventCode in [11,12,56,60,65,67]
              and (prev(eventDate)!=eventDate or prev(doorEventCode)!=doorEventCode)
              order by eventDate
eqObj(resultTable.values(), outputSt.values())

7. Summary

With the rapid development of network and digital technology, the access control system is no longer simple doorway and key management: it has gradually developed into a complete access control security management system. Today’s access control management systems combine automatic identification technology with modern security management measures and draw on many technologies, including electronics, machinery, optics, acoustics, computing, communications, and biometrics. They provide effective measures for entrance and exit security management in confidential departments of all kinds.

Based on the DolphinDB stream processing framework, this tutorial provides a low-latency solution for real-time monitoring of the abnormal status of access control equipment, which improves real-time computing efficiency on massive data and meets the intelligent computing needs of access control systems. Using the pipeline processing of engines in the DolphinDB stream processing framework, the session window engine and the reactive state engine are cascaded, which greatly reduces development difficulty. This tutorial aims to help developers work more efficiently when using DolphinDB’s built-in stream framework for stream computing scenarios in the IoT field, and to make better use of DolphinDB in complex real-time stream computing scenarios.

Appendix

Code: streaming_engine_anomaly_alerts
