Skip to content
Snippets Groups Projects
Commit 16f64fbe authored by joeld's avatar joeld
Browse files

added pipeline performance measurement logs

parent defc1823
No related branches found
No related tags found
No related merge requests found
......@@ -2,6 +2,7 @@ import os
import gzip
import json
import glob
import time
import multiprocessing
from pathlib import Path
......@@ -47,12 +48,16 @@ class ShardProcessor:
self.batch_size = batch_size
def process(self):
lines_count = 0
tokens_count = 0
with open(self.shard_file, "r", encoding="utf-8") as infile, open(self.output_file, "w", encoding="utf-8") as outfile:
batch = []
for line in infile:
data = json.loads(line)
text = data.get("text", "")
tokens = self.preprocessor.preprocess(text)
lines_count += 1
tokens_count += len(tokens)
batch.append({"tokens": tokens})
if len(batch) >= self.batch_size:
for item in batch:
......@@ -61,30 +66,53 @@ class ShardProcessor:
if batch:
for item in batch:
outfile.write(json.dumps(item) + "\n")
return lines_count, tokens_count
# Input corpus (C4 realnewslike shards) and destination for processed output.
# NOTE(review): Windows-specific absolute paths — adjust per machine / move to config.
INPUT_DIR = r"C:\Dev_Projects\HTYLLM-PG\data\c4_realnewslike"
OUTPUT_DIR = r"C:\Dev_Projects\HTYLLM-PG\data\processed"
# Number of worker processes (one shard per process); overridable via the
# TOTAL_PROCS environment variable.
TOTAL_PROCS = int(os.getenv("TOTAL_PROCS", "20"))
def test_pipeline(proc_rank, return_queue):
    """Run the partition + preprocessing pipeline for one worker process.

    Partitions the input data for this rank, preprocesses the resulting
    shard, prints per-stage timings, and pushes the metrics tuple
    (proc_rank, partition_time, processing_time, total_time,
    lines_count, tokens_count) onto *return_queue* for the parent to
    aggregate.

    Fix: removed stale diff remnants — the superseded signature
    ``test_pipeline(proc_rank)`` and the duplicate ``processor.process()``
    call, which would have processed the shard twice and made the span
    syntactically invalid.
    """
    start_total = time.time()
    loader = DataLoader(input_dir=INPUT_DIR, output_dir=OUTPUT_DIR, total_procs=TOTAL_PROCS, proc_rank=proc_rank)

    # Stage 1: split the raw corpus into this rank's shard file.
    start_partition = time.time()
    loader.partition_data()
    partition_time = time.time() - start_partition

    shard_file = os.path.join(OUTPUT_DIR, f"shard_{proc_rank}.jsonl")
    output_file = os.path.join(OUTPUT_DIR, f"processed_{proc_rank}.jsonl")
    preprocessor = Preprocessor()
    processor = ShardProcessor(shard_file, output_file, preprocessor)

    # Stage 2: tokenize/preprocess the shard; process() reports its counts.
    start_processing = time.time()
    lines_count, tokens_count = processor.process()
    processing_time = time.time() - start_processing

    total_time = time.time() - start_total
    # Guard against an empty shard to avoid ZeroDivisionError.
    avg_tokens = tokens_count / lines_count if lines_count else 0
    print(f"Proc {proc_rank}: Partition {partition_time:.2f}s, Processing {processing_time:.2f}s, Total {total_time:.2f}s, Lines {lines_count}, Tokens {tokens_count}, Avg Tokens/Line {avg_tokens:.2f}")
    return_queue.put((proc_rank, partition_time, processing_time, total_time, lines_count, tokens_count))
def main():
    """Fan out one worker process per shard and aggregate their metrics.

    Spawns TOTAL_PROCS processes running test_pipeline, collects each
    worker's (rank, partition_time, processing_time, total_time, lines,
    tokens) tuple from a managed queue, and prints aggregate statistics.

    Fixes: removed the stale diff remnant that created each Process a
    second time with the old one-argument ``args=(proc_rank,)``; added a
    guard so an empty result set (e.g. every worker died before putting
    its metrics) does not raise ZeroDivisionError.
    """
    manager = multiprocessing.Manager()
    return_queue = manager.Queue()
    processes = []
    for proc_rank in range(TOTAL_PROCS):
        p = multiprocessing.Process(target=test_pipeline, args=(proc_rank, return_queue))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print("All processes completed.")
    # All workers have joined, so draining the queue here is race-free.
    results = []
    while not return_queue.empty():
        results.append(return_queue.get())
    if not results:
        print("No results collected.")
        return
    total_lines = sum(r[4] for r in results)
    total_tokens = sum(r[5] for r in results)
    avg_tokens_all = total_tokens / total_lines if total_lines else 0
    avg_partition = sum(r[1] for r in results) / len(results)
    avg_processing = sum(r[2] for r in results) / len(results)
    avg_total = sum(r[3] for r in results) / len(results)
    print(f"Avg Partition Time: {avg_partition:.2f}s")
    print(f"Avg Processing Time: {avg_processing:.2f}s")
    print(f"Avg Total Time: {avg_total:.2f}s")
    print(f"Total Lines: {total_lines}, Total Tokens: {total_tokens}, Avg Tokens per Line: {avg_tokens_all:.2f}")
# Entry-point guard — required when using multiprocessing with the
# "spawn" start method (the default on Windows), so child interpreters
# can re-import this module without re-running main().
if __name__ == "__main__":
    main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment