Throwing bricks and attracting jade: the combination of Redis and interface automation testing framework

Directory

Redis Basic Operations

Operate Redis common commands

Application of Redis in interface automation

Common application scenarios in interface automation testing

?More dry goods

How to download the full version of the document:

Interface automated testing has become an important means to ensure software quality and stability. As a high-performance cache database, Redis has the characteristics of fast reading and writing and multiple data structures, which provides strong support for interface automation testing. Brother Yong here briefly introduces how to combine Python to operate Redis and apply it to the interface automation testing framework to improve test efficiency and data management capabilities.

Redis basic operations

(1) Redis installation and configuration
Before starting, you first need to install Redis and configure it accordingly.
Redis official website: Redis
Redis Chinese Network: Redis Chinese Network
After the installation is complete, make sure that the Redis service has been successfully started, and the connection information (such as host address, port number, password, etc.) has been correctly configured. This information will not be introduced too much!

(2) Integration of Redis and interface automation testing framework
To use Python to operate Redis, you need to import the corresponding client library, for example:

pip install redis
import redis

(3) Initialize the Redis connection
During the initialization process of the interface automation test framework, you can add the code to connect to Redis to ensure that the connection to Redis can be established during the test.

class TestFramework:
    def __init__(self):
        self.redis = redis.Redis(host='localhost', port=6379, password='your_password')
 

Operation Redis common commands

(4) String operation

# Set the string value of the key "key1" to "Hello, Redis!"
r.set('key1', 'Hello, Redis!')
 
# Get the string value whose key is "key1"
value = r.get('key1')
print(value) # Output: b'Hello, Redis!'

(5) List operation

# Insert elements to the left of the list named "list1"
r.lpush('list1', 'item1')
r.lpush('list1', 'item2')
r.lpush('list1', 'item3')
 
# Get all elements of the list named "list1"
items = r.lrange('list1', 0, -1)
print(items) # output: [b'item3', b'item2', b'item1']
 

(6) Hash table operation

# Set the hash table field and value named "hash1"
r.hset('hash1', 'field1', 'value1')
r.hset('hash1', 'field2', 'value2')
 
# Get the hash table field and value named "hash1"
value1 = r.hget('hash1', 'field1')
value2 = r.hget('hash1', 'field2')
print(value1, value2) # output: b'value1' b'value2'
 

(7) Collection operations

# Add elements to the collection named "set1"
r. sadd('set1', 'item1')
r. sadd('set1', 'item2')
r. sadd('set1', 'item3')
 
# Get all elements of the collection named "set1"
items = r. smembers('set1')
print(items) # output: {b'item1', b'item2', b'item3'}
 

The above are the common operations of redis, is it simpler than writing `sql` statements, isn’t it `so easy!! `

Application of Redis in Interface Automation

Encapsulate Redis operation method

In order to facilitate the use of the interface automation testing framework, it is necessary to start encapsulation again. The simple encapsulation code is as follows:

class RedisClient:
    def __init__(self):
        self.redis = redis.Redis(host='localhost', port=6379, password='your_password')
 
    def set_data(self, key, value, expire_time=None):
        self.redis.set(key, value)
        if expire_time is not None:
            self.redis.expire(key, expire_time)
 
    def get_data(self, key):
        return self. redis. get(key)
 
    def delete_data(self, key):
        self.redis.delete(key)
 
    def hash_set_field(self, key, field, value):
        self.redis.hset(key, field, value)
 
    def hash_get_field(self, key, field):
        return self.redis.hget(key, field)
 
    def hash_delete_field(self, key, field):
        self.redis.hdel(key, field)

Strings are commonly used in interface automation. In order to meet the needs of more scenarios, we encapsulate the operation method of the price hash data structure.

Common application scenarios in interface automation testing

(1) Test data management
In interface automation testing, test data is stored in Redis, such as user information, configuration parameters, etc. By using the encapsulated Redis operation method, data can be added, deleted, modified, and checked conveniently.

redis_client= RedisClient()
redis_client.set_data('user:1', '{"name": "kira", "age": 18}')
user = redis_client.get_data('user:1')
print(user.decode()) # Output: {"name": "kira", "age": 18}

(2) Processing interface dependent data

The general steps are as follows:

  • First clarify the dependencies of the interface: Who should call who or whom before calling whom?
  • Set data to redis: that is, after the successful execution of interface B, store the key data in redis, you can use our encapsulated set, the key is generally an identifier, and the value is the return value of the interface
  • Get data from redis: For example, before interface A is executed, get B data and save it in Redis, and then call redis to get data to A or B, C, etc.

Above code:

redis_client = RedisClient()
# The first interface, set dependent data
def first_api():
    response = requests. get('https://api.example.com/first')
    data = response.json()
    redis_client.set_data('key', data['value'])
def second_api():
    # Get dependent data
    dependency_data = redis_client. get_data('key')
    response = requests. post('https://api.example.com/second', data={'data': dependency_data})
    result = response.json()
    # Process the interface response result
if __name__ == '__main__':
    first_api()
    second_api()
 

(3) Cache management

What if you encounter an interface that requires frequent access?
In order to reduce the overhead of interface calls and improve test efficiency, Redis can be used as a cache tool to cache the response results of the interface so that subsequent test cases can be reused.

redis_client= RedisClient()
def get_user_info(user_id):
    cache_key = f'user:{user_id}'
    user_info = redis_client.get_data(cache_key)
    if not user_info:
        # Call the interface to get user information
        user_info = api. get_user_info(user_id)
        redis_client.set_data(cache_key, user_info, expire_time=3600)
    return user_info

We first check whether the corresponding user information already exists in the Redis cache, if not, call the interface to obtain the user information and store it in the Redis cache for subsequent use. At the same time, by setting the expire_time parameter, you can set the expiration time for the cached data to avoid the use of expired data.

(4) Concurrency test

In automated testing, testing for concurrent scenarios is very important. We can simulate some actual scenarios concurrently, for example: use the atomicity and distributed locks of redis to create a unique identifier for each user and store it in redis, so that different user requests can simulate concurrent access by checking and comparing the results of redis, for example:

# Create Redis client
redis_client = RedisClient()
 
def get_user_info(user_id):
    cache_key = f'user:{user_id}'
    user_info = redis_client.get_data(cache_key)
    if not user_info:
        # Call the interface to get user information
        response = requests.get(f'http://127.0.0.1:5000/?user_id={user_id}')
        if response.status_code == 200:
            user_info = response.text
            print(user_info)
            redis_client.set_data(cache_key, user_info, expire_time=3600)
        else:
            print(f"Failed to retrieve user info for user_id: {user_id}. Status code: {response. status_code}")
    return user_info
 
# concurrent test function
def run_concurrent_test(user_ids):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Submit tasks to the thread pool
        future_to_user_id = {executor. submit(get_user_info, user_id): user_id for user_id in user_ids}
        # process the returned result
        for future in concurrent.futures.as_completed(future_to_user_id):
            user_id = future_to_user_id[future]
            try:
                user_info = future. result()
                print(f"user_id: {user_id}; user_info: {user_info}")
            except Exception as e:
                print(f"Error occurred for user_id: {user_id}, Error: {str(e)}")
 
if __name__ == '__main__':
    u_ids = [i for i in range(10, 99)]
    run_concurrent_test(u_ids)

We create a thread pool and use submit to submit tasks (get_user_info) to the thread pool. Each task has a user_id. Here, each user id is simply printed. For the information, by executing multiple tasks concurrently, multiple user information can be obtained at the same time to improve test efficiency.

?More dry goods

How to download the full version of the document:

These materials should be the most comprehensive and complete preparation warehouse for friends who are engaged in [software testing] and other related work. This warehouse has also accompanied me through the most difficult journey, and I hope it can help you too! All of the above can be shared.

Interact with me in the comment area or privately? I [Software Testing and Learning] can get it, and you are welcome to take it away.

If my blog is helpful to you, if you like my blog content, please “Like”, “Comment”, “Favorite” with one click!