I have the following code to preprocess a large text file. To speed up the processing, I try to use multiple processes. Before launching the pool, I split my file into smaller data files, one for each process.
Each process takes one data file as input, preprocesses it, and writes the result into a "preprocessed" JSON file.
import json
from multiprocessing import Pool

# preprocess(), Config and prepare_inputs_file() are defined elsewhere in my project.

PROCESS_NB = 8

def task(task_data):
    # task_data is a pair: [file name without extension, config object]
    input_file_path = "data/input/{}.tsv".format(task_data[0])
    output_file_path = "data/preprocessed_files/preprocessed_{}.json".format(task_data[0])

    with open(input_file_path, "r") as input_file:
        content = input_file.read()

    preprocessed_collection = preprocess(text_collection=content, stopwords=task_data[1].stopwords)

    with open(output_file_path, "w") as preprocessed_file:
        preprocessed_file.write(json.dumps(preprocessed_collection))

    input_file.close()
    preprocessed_file.close()

def run():
    config = Config()
    prepare_inputs_file()

    filename_list = []
    for i in range(1, PROCESS_NB):
        filename_list.append(["data_{}".format(str(i)), config])

    pool = Pool(processes=PROCESS_NB)
    pool.map(task, filename_list)
As a result, I expect to get 8 data files and 8 preprocessed files. But I can't figure out why I end up with 8 data files and only 7 preprocessed files. It seems like the eighth process can't finish its job.
I tried printing when each process starts and ends, and also catching possible exceptions, but only 7 processes seem to run.
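For reference, the logging I tried was roughly the following (a simplified sketch, not my exact code; it just wraps the existing task to print when each task starts and ends and to report any exception):

import traceback

def task_with_logging(task_data):
    # Wrapper around task() to see which tasks start, finish, or raise.
    name = task_data[0]
    print("start {}".format(name), flush=True)
    try:
        task(task_data)
    except Exception:
        print("exception in {}".format(name), flush=True)
        traceback.print_exc()
    print("end {}".format(name), flush=True)

# and in run(): pool.map(task_with_logging, filename_list)

With this in place I still only see 7 tasks starting and finishing.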