Parallel programming | A PySpark multi-process programming practice

Recently I ran into a requirement at work whose design called for processing tasks in parallel, so I set out to learn Python's multiprocessing library and run the tasks in multiple processes. Back when I was memorizing stock interview answers, questions about parallelism and concurrency were pure muscle memory and I could blurt out the answers. But when I actually needed that knowledge, I could only sigh: it had never really sunk in! So I took this opportunity to learn how to implement parallel programming in a real project, and I am recording it briefly here.

Problem

The task is to process data with PySpark: given a data source and a conversion condition for one of its columns, modify that column according to the condition, then save and output the result. The input may be either a single data source or multiple data sources.

(Author’s Note: For compliance reasons, this article only describes the technical details, and the work and business related content is hidden…)

Problem Analysis

There are two issues to consider first: how to transform data with PySpark, and how to handle multiple data source inputs. For data processing, PySpark provides a rich API: we can use the when function, which works like the SQL CASE WHEN statement, or convert the DataFrame to an RDD and use its mapPartitions method. The latter is more flexible. Two simple examples show how each is used.

Spark data conversion

First, create a simple DataFrame as a data example, the code is as follows:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("playground").getOrCreate()
data = [("James","M",60000),("Michael","M",70000),
        ("Robert",None,400000),("Maria", "F",500000),
        ("Jen","",None)]
columns = ["name","gender","salary"]
df = spark.createDataFrame(data = data, schema = columns)
df.show()

Running the above code in a tool such as Jupyter Notebook prints the DataFrame:

+-------+------+------+
|   name|gender|salary|
+-------+------+------+
|  James|     M| 60000|
|Michael|     M| 70000|
| Robert|  null|400000|
|  Maria|     F|500000|
|    Jen|      |  null|
+-------+------+------+

Suppose we now want to replace the abbreviated gender values with full words. We can use the when function to get an effect similar to the CASE WHEN statement in SQL:

from pyspark.sql.functions import when


df2 = df.withColumn("new_gender", when(df.gender == "M","Male")
                                 .when(df.gender == "F","Female")
                                 .when(df.gender.isNull() ,"")
                                 .otherwise(df.gender))
df2.show()

Looking at the printout, we get:

+-------+------+------+----------+
|   name|gender|salary|new_gender|
+-------+------+------+----------+
|  James|     M| 60000|      Male|
|Michael|     M| 70000|      Male|
| Robert|  null|400000|          |
|  Maria|     F|500000|    Female|
|    Jen|      |  null|          |
+-------+------+------+----------+

The when function accepts two arguments: the condition to test and the value to use when the condition holds. Conditions are checked in order, and otherwise supplies the value for rows that match none of them. Anyone familiar with SQL will pick up when in Spark very quickly. Another way to convert data is the mapPartitions method of an RDD. In this case, we define a function that describes the conversion rules. For example, the same conversion as above can be done like this:

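from pyspark.sql import Row

def transform(rows):
    mapping = {"M": "Male", "F": "Female"}

    for row in rows:
        # Row objects are immutable, so work on a plain dict copy
        record = dict(row) if isinstance(row, dict) else row.asDict(recursive=True)
        gender = record["gender"]

        if gender is None:
            record["new_gender"] = ""
        else:
            # Expand known abbreviations, keep any other value unchanged
            record["new_gender"] = mapping.get(gender, gender)

        # Yield exactly one output row per input row
        yield Row(**record)

# mapPartitions returns an RDD, so convert it back to a DataFrame to print it
df3 = df.rdd.mapPartitions(transform).toDF()
df3.show()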

The code above produces content consistent with df2. Having seen these two ways of converting data in Spark, let's consider how to deal with multiple data source inputs.
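
As a side note on the mapPartitions approach: the conversion function is ordinary Python, so its rules can be exercised on plain dictionaries before any Spark job is started. The following is a minimal sketch under that assumption. The name label_gender and the sample rows are hypothetical, and the function mirrors the mapping rules described above rather than any production code.

```python
from typing import Dict, Iterable, Iterator, Optional

GENDER_LABELS = {"M": "Male", "F": "Female"}


def label_gender(rows: Iterable[Dict[str, Optional[object]]]) -> Iterator[dict]:
    """Yield a copy of each row with a new_gender field added."""
    for row in rows:
        record = dict(row)  # copy, so the caller's data is not mutated
        gender = record.get("gender")
        if gender is None:
            record["new_gender"] = ""
        else:
            # Known abbreviations are expanded; anything else is kept as-is.
            record["new_gender"] = GENDER_LABELS.get(gender, gender)
        yield record


# Hypothetical sample rows, shaped like the DataFrame used in this article.
sample = [
    {"name": "James", "gender": "M"},
    {"name": "Robert", "gender": None},
    {"name": "Jen", "gender": ""},
]
labelled = list(label_gender(sample))
print([record["new_gender"] for record in labelled])
```

Because the function is a generator over an iterable of rows, it has the shape that mapPartitions expects, which keeps the rule logic easy to check in isolation.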

Processing multiple data sources

The intuitive approach is to define the input as either a single data source name or a list of data source names (assuming there is a function that returns the content of a data source given its name). The input type is then defined as follows:

from typing import List, Union

DataSource = Union[str, List[str]]

To handle one or more data sources uniformly, we first convert the input into a list, then traverse each data source and complete the data conversion, for example:

from pyspark.sql import DataFrame

def convert(data_source: DataSource) -> DataFrame:
    # Wrap a single data source name in a list so both cases share one loop
    sources = data_source if isinstance(data_source, list) else [data_source]

    for source in sources:
        ...  # load and convert this data source (omitted)

    return ...  # the converted result (omitted)

As the code above shows, the data conversion itself is easy to complete, but the fatal weakness is how long the task takes. If there are many data sources and the same conversion has to be applied to each, we have to wait for every task to finish, one after another. This is exactly where parallel programming comes in.
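
The sequential approach described above can be sketched end to end without Spark. This is only an illustration under stated assumptions: normalize_sources and convert_sequentially are hypothetical helper names, and str.upper stands in for the real, much slower, per-source conversion.

```python
from typing import Callable, List, Union

DataSource = Union[str, List[str]]


def normalize_sources(data_source: DataSource) -> List[str]:
    """Return a list of source names, wrapping a single name in a list."""
    if isinstance(data_source, str):
        return [data_source]
    return list(data_source)


def convert_sequentially(
    data_source: DataSource, convert_one: Callable[[str], str]
) -> List[str]:
    """Apply convert_one to every source, strictly one after another."""
    return [convert_one(name) for name in normalize_sources(data_source)]


# str.upper is a placeholder for the real conversion of one data source.
print(convert_sequentially("orders", str.upper))
print(convert_sequentially(["orders", "users"], str.upper))
```

Each call to convert_one only starts after the previous one has returned, which is why the total time grows linearly with the number of data sources.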

Python multiprocessing

When many tasks need to run and each one takes a long time, one word immediately comes to mind: "parallel", usually accompanied by another: "concurrent". The two look similar and are easy to confuse. The essential difference is that parallel tasks run at the same instant, while concurrent tasks take turns making progress. Either can greatly reduce the total time compared with running tasks strictly one by one. But if resources are sufficient for the tasks to truly run at the same time, why not do that?

In Python, we can achieve parallel programming with the built-in multiprocessing library. First, let's take a look at how to start multiple processes:
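
To make the potential gain concrete, here is a back-of-the-envelope calculation. It is an idealized sketch: it assumes every task takes the same time, ignores process start-up and scheduling overhead, and uses made-up numbers (10 tasks of 60 seconds each, 4 workers). The function names are hypothetical.

```python
import math


def sequential_seconds(n_tasks: int, task_seconds: float) -> float:
    """Total time when tasks run strictly one after another."""
    return n_tasks * task_seconds


def ideal_parallel_seconds(n_tasks: int, task_seconds: float, workers: int) -> float:
    """Idealized wall time when tasks run in waves of `workers` at a time."""
    if workers < 1:
        raise ValueError("workers must be at least 1")
    waves = math.ceil(n_tasks / workers)
    return waves * task_seconds


# Made-up numbers: 10 tasks, 60 seconds each, 4 worker processes.
print(sequential_seconds(10, 60.0))         # 600.0
print(ideal_parallel_seconds(10, 60.0, 4))  # 180.0
```

Real speed-ups are usually smaller than this estimate, since the workers compete for CPU, memory, and I/O.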

from multiprocessing import Process

def proc(i: int) -> None:
    print("I'm Process {}".format(i))
    
if __name__ == "__main__":
    for i in range(10):
        Process(target=proc, args=(i,)).start()

As shown above, we create a very small task, proc, that accepts a numeric argument and prints a string. We then create a process by instantiating the Process class with the target and args parameters, which give the function to run in the process and the arguments it needs, and call the start method to start the process.

So the question is, what if we want multiple processes to communicate with each other? The multiprocessing library provides a class for communication between processes, Queue. Let's take a brief look at its usage:
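
The example above starts the processes but does not wait for them. When the main program needs to know that every child has finished, each Process can be joined after it has been started. The following is a minimal sketch of that pattern; greeting and run_and_wait are hypothetical helper names introduced only for this illustration.

```python
from multiprocessing import Process
from typing import List, Optional


def greeting(i: int) -> str:
    """Build the message printed by each child process."""
    return "I'm Process {}".format(i)


def proc(i: int) -> None:
    print(greeting(i))


def run_and_wait(n: int) -> List[Optional[int]]:
    """Start n processes, wait for all of them, and return their exit codes."""
    processes = [Process(target=proc, args=(i,)) for i in range(n)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()  # block until this child has finished
    return [process.exitcode for process in processes]


if __name__ == "__main__":
    print(run_and_wait(3))
```

Starting all processes before joining any of them matters: joining inside the first loop would make the children run one at a time.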

from multiprocessing import Process, Queue

def proc(i: int, queue: Queue) -> None:
    message = f'I am Process {i}'
    queue.put(message)

if __name__ == "__main__":
    queue = Queue()
    
    for i in range(10):
        Process(target=proc, args=(i, queue,)).start()
        
    for i in range(10):
        message = queue.get()
        print(message)

First, we create a Queue instance and pass it to the task proc as an argument; while the task runs, it uses the put method to add a message to the queue. Likewise, the main process uses the get method to take messages out of the queue. Note that the messages are added in other processes, asynchronously with respect to the main process, whereas get blocks until a message is available, so the main process waits for each message instead of missing it.

Everything seems ready, but one piece is still missing. Obviously, we cannot start an unlimited number of processes at the same time without running into problems. Fortunately, the multiprocessing library provides Pool, which keeps a fixed number of worker processes alive (the pool size) and lets us submit as many tasks as we like to them, for example:
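
Because get blocks by default, a consumer that expects more messages than were actually produced would wait forever. One common safeguard is to pass a timeout to get, which raises queue.Empty when nothing arrives in time. The sketch below assumes a hypothetical helper named drain and an arbitrary timeout of 0.5 seconds.

```python
import queue as queue_module
from multiprocessing import Queue
from typing import List


def drain(q, expected: int, timeout: float = 0.5) -> List[str]:
    """Collect up to `expected` messages, stopping early if none arrives in time."""
    messages = []
    for _ in range(expected):
        try:
            messages.append(q.get(timeout=timeout))
        except queue_module.Empty:
            break  # no message within `timeout` seconds: stop waiting
    return messages


if __name__ == "__main__":
    q = Queue()
    q.put("first")
    q.put("second")
    # Asks for three messages although only two were put on the queue.
    print(drain(q, expected=3))
```

The helper only relies on get(timeout=...) and queue.Empty, so it works the same way with a multiprocessing queue or a standard-library queue.Queue.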

from multiprocessing import Pool, Process, Queue

def proc(i: int, queue: Queue) -> None:
    message = f'I am Process {i}'
    queue.put(message)

def consume(i: int, queue: Queue) -> None:
    message = queue.get()
    print(message)
    
if __name__ == "__main__":
    queue = Queue()
    
    for i in range(10):
        Process(target=proc, args=(i, queue,)).start()
        
    pool = Pool(10)
    
    consumers = []
    for i in range(10):
        consumers.append(pool.apply_async(consume, (i, queue,)))
        
    [r.get() for r in consumers]

It looks quite elegant, but if you actually run the above code, you will get an exception:

RuntimeError: Queue objects should only be shared between processes through inheritance.
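
The error comes from pickling: a Pool sends task arguments to its workers by pickling them, and a plain multiprocessing Queue refuses to be pickled outside of process creation. The following sketch reproduces the message without starting any worker; pickling_error is a hypothetical helper name.

```python
import pickle
from multiprocessing import Queue


def pickling_error(obj) -> str:
    """Return the error raised when pickling obj, or '' if pickling succeeds."""
    try:
        pickle.dumps(obj)
    except Exception as exc:
        return f"{type(exc).__name__}: {exc}"
    return ""


if __name__ == "__main__":
    # An ordinary list pickles fine, so nothing is reported for it.
    print(repr(pickling_error([1, 2, 3])))
    # A plain multiprocessing Queue cannot be pickled this way.
    print(pickling_error(Queue()))
```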

That is to say, a plain Queue cannot be passed as an argument to tasks submitted to a Pool, so communication between these processes does not work here. Don't panic: we can introduce another class, Manager, which is designed for creating data shared between different processes, and modify the code slightly:

from multiprocessing import Manager, Pool, Process, Queue

def proc(i: int, queue: Queue) -> None:
    message = f'I am Process {i}'
    queue.put(message)

def consume(i: int, queue: Queue) -> None:
    message = queue.get()
    print(message)
    
if __name__ == "__main__":
    manager = Manager()
    
    queue = manager.Queue()
    for i in range(10):
        Process(target=proc, args=(i, queue,)).start()
        
    pool = Pool(10)
    
    consumers = []
    for i in range(10):
        consumers.append(pool.apply_async(consume, (i, queue,)))
        
    [r.get() for r in consumers]

In this way, we can use multiple processes to run the same task and share task data. Going back to our original problem: the conversion conditions are the same for every data source, so we can start multiple processes to handle the data sources at the same time, which can greatly reduce the total time the tasks take.
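
When the results only need to flow back to the parent process, a simpler alternative to a shared queue is to let the pool return them directly with Pool.map. This is a different technique from the queue-based code above, shown here only as a sketch; describe and run_with_pool are hypothetical names, and the pool size of 4 is arbitrary.

```python
from multiprocessing import Pool
from typing import List


def describe(i: int) -> str:
    """The work done for one task; here it just builds a message."""
    return f"I am task {i}"


def run_with_pool(n_tasks: int, workers: int) -> List[str]:
    """Run describe for each task index in a pool and return results in order."""
    with Pool(processes=workers) as pool:
        return pool.map(describe, range(n_tasks))


if __name__ == "__main__":
    for message in run_with_pool(10, 4):
        print(message)
```

Pool.map blocks until all tasks are done and returns the results in the same order as the inputs, so no explicit queue or consumer function is needed in this simple case.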

PySpark multi-process data processing

Earlier we assumed that the data content can be obtained from the data source name. Here we make that more specific: the data source name gives us the storage location of the data (how the location is looked up is not important here):

def get_datasource_path(datasource_name: str) -> str:
    #...
    return path

Define tasks

Since the data is processed by multiple processes, we first define the task itself clearly: using PySpark, read the data, apply the given conversion conditions, and output the converted data. The conversion logic is omitted here; it is not complicated and can be written flexibly with the methods PySpark provides:

from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.appName("DataProcessing").getOrCreate()

def task(path: str) -> DataFrame:
    df = spark.read.json(path)
    
    #...
    return df

Multiprocessing tasks

Building on the multiprocessing knowledge above, we need to write three functions: one that runs the task in a worker process, one that collects a task result, and one that manages the first two. One more thing to note: since multiple processes run the same task with different parameters, combining all the task results into a single output may require another function that merges them.

from multiprocessing import Manager, Pool, Process, Queue, cpu_count
from pyspark.sql import DataFrame

def dispatch(queue: Queue, path: str) -> None:
    # Run the Spark task for one data source and publish its result
    df = task(path)
    queue.put(df)

def listen(queue: Queue) -> DataFrame:
    # Block until one task result is available
    return queue.get()

def execute() -> DataFrame:
    manager = Manager()
    queue = manager.Queue()

    # datasource_list and merge are assumed to be defined elsewhere
    datasource_path_list = [
        get_datasource_path(datasource) for datasource in datasource_list
    ]
    for path in datasource_path_list:
        process = Process(target=dispatch, args=(queue, path))
        process.start()

    pool = Pool(cpu_count())
    listeners = [
        pool.apply_async(listen, (queue,)) for _ in datasource_path_list
    ]

    try:
        results = [listener.get() for listener in listeners]
        return merge(results)
    except Exception:
        # Stop the workers before propagating the error
        pool.terminate()
        pool.join()
        raise
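
The merge function called in execute is not shown in this article. One possible shape for it, assuming every task result has the same columns, is to fold the results together with the DataFrame method unionByName. This is a sketch, not the actual implementation; ListFrame is a hypothetical stand-in class that exists only so the example can run without a Spark session.

```python
from functools import reduce
from typing import Iterable


def merge(frames: Iterable):
    """Union all frames by column name; expects DataFrame-like objects."""
    frames = list(frames)
    if not frames:
        raise ValueError("merge() needs at least one result")
    return reduce(lambda left, right: left.unionByName(right), frames)


class ListFrame:
    """Tiny stand-in exposing unionByName, used only to try merge without Spark."""

    def __init__(self, rows):
        self.rows = list(rows)

    def unionByName(self, other):
        return ListFrame(self.rows + other.rows)


merged = merge([ListFrame([1]), ListFrame([2, 3])])
print(merged.rows)
```

With real PySpark DataFrames the same reduce call applies unchanged, since merge only relies on the unionByName method.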

Here, we use the cpu_count function provided by multiprocessing to get the number of CPUs as the pool size, making full use of resources in pursuit of task performance. However, running the above code directly gives unsatisfactory results: a large number of Spark errors are reported, and some tasks cannot be executed at all. Frankly, I did not find the real cause of the errors. I can only guess that the Spark tasks cannot all be started at the same moment, so I added a waiting time before each start to stagger the tasks. Having the tasks start one after another and then run almost simultaneously is still a perfectly acceptable result:

import time

def execute() -> DataFrame:
    #...
    for path in datasource_path_list:
        time.sleep(15)
        process = Process(target=dispatch, args=(queue, path))
        process.start()
    #...
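
The staggering idea can also be factored into a small reusable helper so that the delay is easy to tune or to disable in tests. This is a sketch under assumptions: staggered is a hypothetical name, the file names are placeholders, and unlike the loop above it does not pause before the first item.

```python
import time
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")


def staggered(
    items: Iterable[T],
    delay_seconds: float,
    sleep: Callable[[float], None] = time.sleep,
) -> Iterator[T]:
    """Yield items one by one, pausing delay_seconds between consecutive items."""
    for index, item in enumerate(items):
        if index > 0:
            sleep(delay_seconds)
        yield item


# Record the pauses instead of really sleeping, to show what would happen.
pauses = []
started = list(staggered(["a.json", "b.json", "c.json"], 15, sleep=pauses.append))
print(started, pauses)
```

In the real loop, the default time.sleep would be used and each yielded path would be handed to a new Process before the next pause begins.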

That concludes this record of PySpark multi-process programming practice. At the beginning, I was stuck on the problem that some of the Spark tasks started from multiple processes ran while others did not, and I kept searching for how to use Pool to start multiple Spark tasks. In the end, although I did not find the cause, I did find a method that works, and being a pragmatist at heart, that is good enough for me. I hope this helps anyone who needs it. That's all!