TDengine (TAOS) database: batch downloading hundreds of millions of rows of data into csv, and fixing the error: Query interrupted (Query terminated) 4798749 row(s) in set

Table of Contents

  • Foreword
  • The taos shell command batch download encounters an interruption problem
  • Problem analysis
  • Solution
    • Check the taos.cfg file
    • Use paginated download, then merge the csv files
      • 1. Build .sql files to download in batches
      • 2. Merge sub-csv files into a total csv file
  • Summary
  • Other material downloads

Foreword

TDengine is an open-source, cloud-native time-series database developed by a Chinese team and designed and optimized for scenarios such as the Internet of Things, the Industrial Internet, finance, and IT operations monitoring. It can efficiently process, in real time, the terabytes or even petabytes of data generated every day by large numbers of devices and data collectors, monitor and raise early warnings on business operation status in real time, and mine business value from big data.

It has all the functions needed to process IoT data, including:

  • An SQL-like query language to insert or query data
  • Development interfaces for C/C++, Java (JDBC), Python, Go, RESTful, and Node.js
  • Ad hoc query analysis through the TDengine shell or Python/R/MATLAB
  • Stream computing based on sliding windows, supported through continuous queries
  • Super tables, which make data aggregation across devices simple and flexible through tags
  • An embedded message queue, so applications can subscribe to the latest data
  • A built-in caching mechanism, so the latest status or records of each device can be retrieved quickly
  • No distinction between historical data and real-time data

The taos shell command batch download encounters an interruption problem

When using the taos shell command to export a super table directly to a specified csv file, the query is interrupted once the table holds more than a few million rows:

Query interrupted (Query terminated), 4798749 row(s) in set (357.461000s)

The specific taos shell command is as follows:

SELECT * FROM <super_table_name> >> <***.csv>;

Problem analysis

This kind of problem is most likely caused by the sheer volume of data, insufficient machine cache, or limits imposed by the internal configuration file when the taos database reads a large table.

Solution

Check the taos.cfg file

Look for the max-value restriction parameters in this file and try not to set them; leave them at their defaults.
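For example, on a default Linux installation (the path below is the usual default and may differ on your system), you can list the max-related parameters currently set in the configuration file with:

grep -in "max" /etc/taos/taos.cfg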

Use paginated download, then merge the csv files

1. Build .sql files to download in batches

The basic steps:

  1. Get the names of the taos super tables
  2. For each super table, use its row count and a per-page row limit to generate and print the paginated SQL statements

Note that the paginated taos statement looks like the following; LIMIT sets the page size and OFFSET sets the number of rows to skip, so LIMIT 1000000 OFFSET 1000000 retrieves rows 1,000,001 through 2,000,000:

SELECT * FROM <table_name> LIMIT 1000000 OFFSET 1000000 >> data_<number>.csv;

The detailed code is as follows:

import taos

def get_number(numbers, limit_numbers):
    """Split a total row count into [rows_in_page, offset] pairs for pagination."""
    number_list = []
    mid_number = numbers // limit_numbers  # number of full pages
    mod_number = numbers % limit_numbers   # rows left over for the last page
    for i in range(mid_number + 1):
        if mod_number and i != mid_number:
            number_list.append([limit_numbers, i * limit_numbers])
        elif mod_number and i == mid_number:
            number_list.append([mod_number, i * limit_numbers])
        elif i != mid_number:
            number_list.append([limit_numbers, i * limit_numbers])
        else:
            # the total is an exact multiple of the page size, so no extra page is needed
            pass
    return number_list
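# Quick sanity check of the helper above (hypothetical numbers, not from the original post):
# get_number(2500000, 1000000)
# -> [[1000000, 0], [1000000, 1000000], [500000, 2000000]]
# i.e. two full pages of 1,000,000 rows plus a final page of 500,000 rows.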


class UtilsTao:

    def __init__(self, db_name, port, hostname, username, password):
        self.db_name = db_name
        self.port = port
        self.hostname = hostname
        self.username = username
        self.password = password
        self.conn, self.cursor = self.connect_taos_db()  # connect to the database and get a cursor


    def connect_taos_db(self):
        # Connect to the taos database
        conn = taos.connect(host=self.hostname, port=self.port, user=self.username, password=self.password, database=self.db_name)
        cursor = conn.cursor()
        cursor.execute("USE {}".format(self.db_name))
        return conn, cursor

    def printsql(self, super_tables):
        for table in super_tables:
            # Count the rows in the table
            self.cursor.execute("SELECT COUNT(*) FROM {}".format(table))
            results = self.cursor.fetchall()
            length = int(results[0][0])
            if length >= 1000000:
                # 1 million rows or more: paginate and print one SQL statement per page.
                # The statements are printed rather than executed here, because the
                # ">> file.csv" redirection only works inside the taos shell.
                number_list = get_number(numbers=length, limit_numbers=1000000)
                for i in number_list:
                    if i[1] == 0:
                        sql = "SELECT * FROM {} LIMIT 1000000 >> {}_{}_data_{}.csv;".format(table, self.db_name, table, i[1])
                        print(sql)
                    else:
                        sql = "SELECT * FROM {} LIMIT 1000000 offset {} >> {}_{}_data_{}.csv;".format(table, i[1], self.db_name, table, i[1])
                        print(sql)
            else:
                # Fewer than 1 million rows: download the whole table with a single statement
                sql_content = "SELECT * FROM {} >> {}_{}_data.csv;".format(table, self.db_name, table)
                print(sql_content)

    def close_taos_db(self):
        # Close the cursor and the database connection
        self.cursor.close()
        self.conn.close()

    def get_super_tables_name(self):
        # Get the names of all super tables in the database
        sql = "SHOW STABLES"
        self.cursor.execute(sql)
        results = self.cursor.fetchall()
        super_tables = [row[0] for row in results]
        # Output the list of super table names
        print(super_tables)
        return super_tables
        
        
if __name__ == '__main__':
    db_name, hostname, port, username, password = 'xxx', 'xxxx', 8888, 'xxxx', 'xxxxx'
    tao_ = UtilsTao(db_name, port, hostname, username, password)
    super_tables = tao_.get_super_tables_name()
    tao_.printsql(super_tables)
    tao_.close_taos_db()

The output is as follows:
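For a super table with, say, 2.5 million rows, the printed statements look roughly like this (the database name mydb and table name meters are hypothetical placeholders; your names and offsets will differ):

SELECT * FROM meters LIMIT 1000000 >> mydb_meters_data_0.csv;
SELECT * FROM meters LIMIT 1000000 offset 1000000 >> mydb_meters_data_1000000.csv;
SELECT * FROM meters LIMIT 1000000 offset 2000000 >> mydb_meters_data_2000000.csv;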

  1. Copy the printed SQL statements into a .sql file (for example, sql_batch.sql)
  2. Run the batch download in the taos shell command window:
source sql_batch.sql


You can see that each query completes successfully, which means the data has been downloaded. Note that some tables hold fewer than 1 million rows, so they are exported directly into a single csv file.

2. Merge sub-csv files into a total csv file

Next, merge the paged csv files into a total csv file. The specific code is as follows:

import pandas as pd
import os

class ConcatCsv:
    def __init__(self, dir, db_name):
        self.dir = dir
        self.dbname = db_name # database name
        
    def get_table_list(self):
        file_name = os.listdir(self.dir)  # get the list of file names in the folder
        table_dict = {}
        for file_ in file_name:
            # File names look like <db_name>_<table>_data[_<offset>].csv, so the table name
            # is the first "_"-separated token after the database name
            table_name = file_.split(self.dbname)[1].split("_")[1]
            if table_name in table_dict.keys():
                # the table already has at least one csv file: increment its file count
                number = table_dict[table_name] + 1
                table_dict.update({table_name: number})
            else:
                table_dict.update({table_name: 1})
        return file_name, table_dict
    
    def concat_csv(self):
        file_name, table_dict = self.get_table_list()
        for table_name, number in table_dict.items():
            if number == 1:
                # only one csv file for this table, nothing to merge
                pass
            else:
                list_table = [file for file in file_name if table_name in file]
                # Sort the paginated files by their numeric offset so that the time-series rows
                # are merged back in query order (a plain string sort would put e.g. "10000000"
                # before "2000000")
                list_table.sort(key=lambda x: int(x.split("_")[-1].split(".")[0]))
                print(list_table)
                for i, table_ in enumerate(list_table):
                    if i == 0:
                        c = pd.read_csv(self.dir + os.sep + table_)
                    else:
                        c = pd.concat([c, pd.read_csv(self.dir + os.sep + table_)], axis=0, ignore_index=True)
                c.to_csv(self.dir + os.sep + self.dbname + "_" + table_name + ".csv", index=False)

                for table_ in list_table:  # delete the per-page csv files
                    os.remove(self.dir + os.sep + table_)

        print("concat_csv task is done!")



if __name__ == '__main__':
    dir_path = r'E:\Data\Database'
    ConcatCsv(dir_path, 'xxxx').concat_csv()  # 'xxxx' is the database name

Note: after merging into the total csv file, the per-page sub-csv files are deleted, so only the total csv file is kept at the end.
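As a final sanity check (a minimal sketch; the merged file name below is a placeholder, not something from the original post), you can compare the row count of a merged csv against the COUNT(*) reported in the taos shell:

import pandas as pd

# placeholder path and file name: replace with one of your own merged files, e.g. <db_name>_<table>.csv
merged = pd.read_csv(r'E:\Data\Database\xxxx_tablename.csv')
print(len(merged))  # should equal SELECT COUNT(*) FROM <table> in the taos shell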

Summary

Overall, downloading tables from the taos database is still quite fast: on average, 1 million rows take at most about 200-300 seconds. Using the method in this article, you can download in batches through the taos command window and finally generate the desired total csv file.

Other material downloads

If you want to keep learning about artificial intelligence, including learning routes and knowledge systems, you are welcome to read my other blog post "Heavy | Complete artificial intelligence AI learning - basic knowledge learning route, all materials can be downloaded directly from the network disk without paying attention to routines".
This blog draws on GitHub's well-known open-source platforms, AI technology platforms, and experts in related fields: Datawhale, ApacheCN, AI Youdao, Dr. Huang Haiguang, and others. There are about 100 GB of related materials, which I hope will help all my friends.