Commit 0f68664f authored by joeld

initial preprocessing logic for cc_news: multi-process preprocessing with sharding + lock

parent a963b60a
.venv/
data/
\ No newline at end of file
from datasets import load_dataset
import os
import json

dataset_path = "./data/cc_news"
os.makedirs(dataset_path, exist_ok=True)

# Download the first 10% of the CC-News training split.
dataset = load_dataset("cc_news", split="train[:10%]")

# Write one JSON object per line, keeping only the article text.
output_file = os.path.join(dataset_path, "train.jsonl")
with open(output_file, "w", encoding="utf-8") as f:
    for item in dataset:
        json.dump({"text": item["text"]}, f)
        f.write("\n")

print(f"Dataset saved at: {output_file}")
import os
import json
import time
import multiprocessing
import logging
import torch
from datasets import load_dataset
from megatron.tokenizer import build_tokenizer
from megatron.data import indexed_dataset
from transformers import AutoTokenizer
from argparse import Namespace

logging.basicConfig(level=logging.WARNING, format="%(asctime)s - %(levelname)s - %(message)s")

# Each worker learns its shard from environment variables set by the launcher.
PROC_RANK = int(os.getenv("PROC_RANK", "0"))
TOTAL_PROCS = int(os.getenv("TOTAL_PROCS", "1"))

input_file = "C:/Dev_Projects/HTYLLM-PG/data/cc_news/train.jsonl"
output_dir = "C:/Dev_Projects/HTYLLM-PG/data/processed"

# args = Namespace(tokenizer_type="GPT2BPETokenizer", rank=0)
tokenizer = AutoTokenizer.from_pretrained("gpt2")


def acquire_lock(file_path):
    """Create <file_path>.lock atomically; return False if another process already holds it."""
    lock_file = file_path + ".lock"
    try:
        fd = os.open(lock_file, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        os.close(fd)
        return True
    except FileExistsError:
        return False


def release_lock(file_path):
    lock_file = file_path + ".lock"
    if os.path.exists(lock_file):
        os.remove(lock_file)


def preprocess_line(line):
    """Tokenizes a single JSONL line into GPT-2 token IDs."""
    data = json.loads(line)
    text = data.get("text", "").strip()
    if not text:
        return None
    # Encode to token IDs (the indexed dataset builder stores integer IDs, not token strings).
    token_ids = tokenizer.encode(text)
    return {"tokens": token_ids}


def process_shard(input_path, output_path):
    if not acquire_lock(output_path):
        logging.warning(f"Skipping {input_path} (already being processed).")
        return
    dataset_builder = indexed_dataset.make_builder(
        output_path + ".bin",
        impl="mmap",
        vocab_size=tokenizer.vocab_size
    )
    with open(input_path, "r", encoding="utf-8") as infile:
        for line in infile:
            processed = preprocess_line(line)
            if processed:
                dataset_builder.add_doc(processed["tokens"], len(processed["tokens"]))
    dataset_builder.finalize(output_path + ".idx")
    release_lock(output_path)
    logging.warning(f"Processed {input_path} -> {output_path}")


def main():
    logging.warning(f"Process {PROC_RANK} started. Total processes: {TOTAL_PROCS}")
    with open(input_file, "r", encoding="utf-8") as f:
        lines = f.readlines()

    # Split the input evenly across processes; the last rank takes any remainder.
    num_lines = len(lines)
    lines_per_proc = num_lines // TOTAL_PROCS
    start_idx = PROC_RANK * lines_per_proc
    end_idx = num_lines if PROC_RANK == TOTAL_PROCS - 1 else (PROC_RANK + 1) * lines_per_proc
    shard_lines = lines[start_idx:end_idx]

    # Materialize this rank's shard, then tokenize it into a Megatron indexed dataset.
    os.makedirs(output_dir, exist_ok=True)
    shard_input = os.path.join(output_dir, f"shard_{PROC_RANK}.jsonl")
    shard_output = os.path.join(output_dir, f"processed_{PROC_RANK}")
    with open(shard_input, "w", encoding="utf-8") as f:
        f.writelines(shard_lines)
    process_shard(shard_input, shard_output)


if __name__ == "__main__":
    main()
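One way to drive the script above across several workers is to spawn one process per shard and pass PROC_RANK / TOTAL_PROCS through the environment. A minimal launcher sketch follows; the script name preprocess_cc_news.py and the worker count of 4 are assumptions for illustration, not part of this commit:

import os
import subprocess
import sys

TOTAL_PROCS = 4  # assumed worker count; pick whatever fits the machine
SCRIPT = "preprocess_cc_news.py"  # hypothetical filename for the preprocessing script above

procs = []
for rank in range(TOTAL_PROCS):
    # Each worker reads PROC_RANK / TOTAL_PROCS and processes only its own slice of lines.
    env = dict(os.environ, PROC_RANK=str(rank), TOTAL_PROCS=str(TOTAL_PROCS))
    procs.append(subprocess.Popen([sys.executable, SCRIPT], env=env))

for p in procs:
    p.wait()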