18 ifstream file(filename);
19 uint64_t line_count = 0;
21 while (getline(file, line)) line_count++;
32void SplitFiles(
const vector<string>& output_files,
const string& input_file)
34 ifstream input(input_file);
36 vector<ofstream> outputs((
int)output_files.size());
38 filesystem::path path_string(input_file);
39 string filename=path_string.filename();
40 cout << filename << endl;
41 cout << input_file<<endl;
42 for (
int i = 0; i < (int)output_files.size(); i++) {
43 string output_filename = output_files[i]+
"/"+filename;
44 outputs[i].open(output_filename);
46 cerr <<
"Error: Can't create file: " << output_filename << endl;
53 uint64_t chunk_count=(
ConutLines(input_file)+(uint64_t)output_files.size())/(uint64_t)output_files.size();
54 uint64_t file_index = 0;
55 uint64_t line_count = 0;
56 while (getline(input, line)) {
58 outputs[file_index] << line << endl;
60 if (line_count % chunk_count == 0) {
67 for (
auto& output : outputs) {
80void MergeFiles(
const vector<string>& input_files,
const string& output_file)
82 ofstream output(output_file, ios::binary);
84 cerr <<
"Failed to open output file: " << output_file << endl;
88 for (
const auto &input_file : input_files) {
89 ifstream input(input_file, ios::binary);
91 cerr <<
"Failed to open input file: " << input_file << endl;
96 output << input.rdbuf();
105 const string output_folder_path)
112 uint32_t min_length= 5;
113 uint32_t max_length = 5000;
114 set<string> accept_language{
"__label__ja"};
116 bool store_rejected =
true;
117 bool execute_sentence_segment =
false;
118 double language_threshold = 0.3;
119 double perplexity_threshold = 40000;
121 const string blacklist_file_path = output_folder_path+
"/blacklist.txt";
125 LSHDeduplicator deduplicator(
true,blacklist_file_path,
true,1280000000);
134 execute_sentence_segment,
136 perplexity_threshold,
147 const string original_folder_path =
"../../results/dataset/original/";
148 const string base_folder_path =
"../../results/dataset/";
149 const string results_folder_path =
"../../results/dataset/cleaned/";
150 filesystem::create_directories(fs::path(results_folder_path));
153 vector<string> filelist;
157 int32_t num_threads = std::thread::hardware_concurrency() - 4;
158 num_threads = (num_threads>0) ? num_threads : 1;
160 cout <<
"Multi Process Number: "<< num_threads << endl;
163 vector<string> output_files;
164 for(
int i=0;i<num_threads;i++){
165 string temp = base_folder_path+to_string(i)+
"/input/";
166 fs::path new_directory(temp);
167 filesystem::create_directories(new_directory);
168 output_files.push_back(temp);
171 for(
auto file:filelist)
SplitFiles(output_files, original_folder_path+
"/"+file+
".txt");
174 vector<thread> threads;
175 for(
int i=0;i<num_threads;i++){
176 string input_folder_path = base_folder_path+to_string(i)+
"/input/";
177 string output_folder_path = base_folder_path+to_string(i)+
"/output/";
184 for(
auto& process : threads) {
189 for(
auto file:filelist){
190 vector<string> splited_filelist;
191 for(
int i=0;i<num_threads;i++) splited_filelist.push_back(base_folder_path+to_string(i)+
"/output/cleaned/"+file+
".jsonl");
192 MergeFiles(splited_filelist,results_folder_path+file+
".jsonl");
int32_t CleanPipeline(void)
Pipeline that sequentially executes the configured CorpusCleaner methods.
void SplitFiles(const vector< string > &output_files, const string &input_file)
split one file into multiple equal parts based on the number of lines
void MergeFiles(const vector< string > &input_files, const string &output_file)
Merge multiple input files into a single output file, appending them in the order given.
uint64_t ConutLines(const string &filename)
Get the number of lines in the file named by filename.
void MultiProcessCorpusClean(const string input_folder_path, const string output_folder_path)
void GetFileNameListWithoutExtention(const string folder_path, vector< string > *file_list)
Get filename list in folder_path.