| |
| """Open-Track-Document-Bassline-Readability-Arabertv2-d3tok-reg.ipynb |
| |
| Automatically generated by Colab. |
| |
| Original file is located at |
| https://colab.research.google.com/drive/1bwUyQ1WscI7jXo6arMRwn4NIswjaqK7Y |
| """ |
|
|
| import pandas as pd |
| import numpy as np |
| import os |
| import torch |
| import zipfile |
| from sklearn.metrics import cohen_kappa_score |
| from torch.utils.data import Dataset as TorchDataset |
| from transformers import ( |
| AutoTokenizer, |
| AutoModelForSequenceClassification, |
| TrainingArguments, |
| Trainer, |
| EarlyStoppingCallback |
| ) |
| from camel_tools.disambig.bert import BERTUnfactoredDisambiguator |
| from camel_tools.tokenizers.word import simple_word_tokenize |
| from camel_tools.utils.dediac import dediac_ar |
|
|
| |
# --- Model / task configuration -------------------------------------------
# Hugging Face Hub id of the checkpoint that gets fine-tuned below.
MODEL_NAME = "CAMeL-Lab/readability-arabertv2-d3tok-reg"
# Passed as num_labels when the model is loaded (single output unit).
NUM_LABELS = 1
# Number of readability levels; predictions are clipped to
# [0, TARGET_CLASSES - 1] before scoring and submission.
TARGET_CLASSES = 19

# --- Directory layout (relative to the working directory) ------------------
BASE_DIR = '.'
DATA_DIR = os.path.join(BASE_DIR, "data")
_MODEL_SHORT_NAME = MODEL_NAME.split('/')[-1]
CHECKPOINT_DIR = os.path.join(BASE_DIR, "results", f"regression_{_MODEL_SHORT_NAME}")
SUBMISSION_DIR = os.path.join(BASE_DIR, "submission")

# Output directories are created eagerly so later writes cannot fail on a
# missing parent directory.
for _output_dir in (CHECKPOINT_DIR, SUBMISSION_DIR):
    os.makedirs(_output_dir, exist_ok=True)

# --- Input / output files --------------------------------------------------
BAREC_TRAIN_PATH = os.path.join(DATA_DIR, 'train_augmented.csv')
BAREC_DEV_PATH = os.path.join(DATA_DIR, 'dev_augmented.csv')
BLIND_TEST_PATH = os.path.join(DATA_DIR, 'blind_test_data.csv')
SUBMISSION_PATH = os.path.join(SUBMISSION_DIR, "submission_regression_big_data.csv")
ZIPPED_SUBMISSION_PATH = os.path.join(SUBMISSION_DIR, "submission_regression_final_op4n.zip")
# NOTE(review): the "preprocessed" paths resolve to the same files as
# BAREC_TRAIN_PATH / BAREC_DEV_PATH above -- confirm this is intended.
TRAIN_PREPROCESSED_PATH = os.path.join(DATA_DIR, 'train_augmented.csv')
DEV_PREPROCESSED_PATH = os.path.join(DATA_DIR, 'dev_augmented.csv')
|
|
| |
|
|
def preprocess_d3tok(text, disambiguator):
    """Convert raw text into a space-joined D3Tok string.

    The text is split with ``simple_word_tokenize`` and handed to
    ``disambiguator.disambiguate``. For each word, the ``'d3tok'`` entry of
    its first analysis is dediacritized and its ``_+`` / ``+_`` markers are
    rewritten as `` +`` / ``+ ``. A word with no analyses, or whose first
    analysis has no ``'d3tok'`` key, is emitted unchanged.

    Args:
        text: Input sentence; anything that is not a non-blank ``str``
            yields ``""``.
        disambiguator: Object exposing ``disambiguate(tokens)``.

    Returns:
        The D3Tok forms of all words joined by single spaces.
    """
    if not (isinstance(text, str) and text.strip()):
        return ""

    def render(word_result):
        # Fall back to the surface word whenever no usable analysis exists.
        analyses = word_result.analyses
        if not analyses:
            return word_result.word
        features = analyses[0][1]
        if 'd3tok' not in features:
            return word_result.word
        segmented = dediac_ar(features['d3tok'])
        return segmented.replace("_+", " +").replace("+_", "+ ")

    words = simple_word_tokenize(text)
    return " ".join(render(entry) for entry in disambiguator.disambiguate(words))
|
|
def _read_cached_split(path):
    """Read an already preprocessed CSV and normalise its ``text`` column."""
    df = pd.read_csv(path)
    # An empty preprocessed text is written as an empty CSV field, which
    # read_csv parses back as NaN; without fillna, astype(str) would turn it
    # into the literal string "nan".
    df['text'] = df['text'].fillna("").astype(str)
    return df


def _read_raw_split(path):
    """Read one raw BAREC CSV and return it with ``text``/``label`` columns."""
    df = pd.read_csv(path)
    df = df[['Sentence', 'Readability_Level_19']].rename(
        columns={'Sentence': 'text', 'Readability_Level_19': 'label'})
    df.dropna(subset=['text', 'label'], inplace=True)
    df['text'] = df['text'].astype(str)
    # Shift the levels down by one and store them as floats (regression targets).
    df['label'] = (df['label'].astype(int) - 1).astype(float)
    return df


def load_or_preprocess_data(disambiguator):
    """Load the train/dev frames, preprocessing the raw CSVs if needed.

    If both TRAIN_PREPROCESSED_PATH and DEV_PREPROCESSED_PATH exist they are
    loaded as-is (only the ``text`` column is normalised to ``str``).
    Otherwise the raw files are read, reduced to ``text``/``label``, converted
    to D3Tok with ``preprocess_d3tok`` and written to the preprocessed paths.

    NOTE(review): in this file the raw and preprocessed paths are currently
    the same, so the preprocessing branch can only end in the
    "file not found" handler -- confirm the intended file layout.

    Args:
        disambiguator: Passed through to ``preprocess_d3tok``.

    Returns:
        ``(train_df, val_df)`` on success, ``(None, None)`` if the raw data
        could not be loaded or processed.
    """
    print("--- Loading BAREC Data ---")
    if os.path.exists(TRAIN_PREPROCESSED_PATH) and os.path.exists(DEV_PREPROCESSED_PATH):
        print("✔ Found preprocessed files. Loading them directly...")
        train_df = _read_cached_split(TRAIN_PREPROCESSED_PATH)
        val_df = _read_cached_split(DEV_PREPROCESSED_PATH)
        print(f"Successfully loaded {len(train_df)} training and {len(val_df)} validation records.")
        return train_df, val_df

    print("Preprocessed files not found. Starting one-time preprocessing...")
    try:
        train_df = _read_raw_split(BAREC_TRAIN_PATH)
        val_df = _read_raw_split(BAREC_DEV_PATH)
        print(f"Successfully loaded raw data: {len(train_df)} training and {len(val_df)} validation records.")

        print("\n--- Preprocessing Text to D3Tok format (this will only run once) ---")
        train_df['text'] = train_df['text'].apply(lambda x: preprocess_d3tok(x, disambiguator))
        val_df['text'] = val_df['text'].apply(lambda x: preprocess_d3tok(x, disambiguator))
        print("✔ Text preprocessing finished.")

        print("\n--- Saving preprocessed data for future use... ---")
        train_df.to_csv(TRAIN_PREPROCESSED_PATH, index=False)
        val_df.to_csv(DEV_PREPROCESSED_PATH, index=False)
        print(f"** Saved preprocessed files to {TRAIN_PREPROCESSED_PATH} and {DEV_PREPROCESSED_PATH} **")
        return train_df, val_df
    except FileNotFoundError:
        # Report the file names that are actually read, not generic ones.
        print(
            f"! ERROR: Raw file not found. Make sure "
            f"'{os.path.basename(BAREC_TRAIN_PATH)}' and "
            f"'{os.path.basename(BAREC_DEV_PATH)}' are in the '{DATA_DIR}' directory."
        )
        return None, None
    except Exception as e:
        # Deliberate catch-all: the caller treats (None, None) as "stop".
        print(f"! ERROR during initial processing: {e}")
        return None, None
|
|
# Load the MSA disambiguator once; it is reused for the training data and,
# later, for the blind test set.
print("Initializing BERT Disambiguator for preprocessing...")
bert_disambiguator = BERTUnfactoredDisambiguator.pretrained('msa')

train_df, val_df = load_or_preprocess_data(bert_disambiguator)

if train_df is None:
    print("Stopping script due to data loading failure.")
    # exit() is a helper injected by the site module for interactive use and
    # reports success (status 0); raise SystemExit directly with a non-zero
    # status so callers can detect the failure.
    raise SystemExit(1)

# The tokenizer is a module-level global used by ReadabilityDataset.
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
|
|
| |
class ReadabilityDataset(TorchDataset):
    """Torch dataset over tokenized texts with optional float labels.

    All texts are encoded once in the constructor with the module-level
    ``tokenizer`` (truncated and padded to 256 tokens). Items are dicts of
    tensors; a ``labels`` tensor is added only when labels were supplied.
    """

    def __init__(self, texts, labels=None):
        self.labels = labels
        self.encodings = tokenizer(
            texts,
            truncation=True,
            padding="max_length",
            max_length=256,
        )

    def __len__(self):
        input_ids = self.encodings.get('input_ids', [])
        return len(input_ids)

    def __getitem__(self, idx):
        sample = {}
        for name, column in self.encodings.items():
            sample[name] = torch.tensor(column[idx])
        if self.labels is None:
            return sample
        sample['labels'] = torch.tensor(self.labels[idx], dtype=torch.float)
        return sample
|
|
def compute_metrics(p):
    """Compute quadratic weighted kappa (QWK) from regression outputs.

    Predictions are flattened, rounded to the nearest integer and clipped to
    ``[0, TARGET_CLASSES - 1]``; gold labels are cast to ``int``.

    Args:
        p: Evaluation output exposing ``predictions`` and ``label_ids``
            arrays (as passed to ``compute_metrics`` by the ``Trainer``).

    Returns:
        ``{"qwk": <score>}``.
    """
    predicted_levels = np.clip(
        np.round(p.predictions.flatten()), 0, TARGET_CLASSES - 1
    ).astype(int)
    true_levels = p.label_ids.astype(int)
    # cohen_kappa_score weights disagreements by the distance between class
    # *indices* in its confusion matrix. Without an explicit label list the
    # matrix only contains levels that occur in the data, so any absent level
    # would shrink the distances between its neighbours. Passing the full
    # range keeps index distance equal to level distance.
    qwk = cohen_kappa_score(
        true_levels,
        predicted_levels,
        labels=np.arange(TARGET_CLASSES),
        weights='quadratic',
    )
    return {"qwk": qwk}
|
|
| |
# First training run: fine-tune MODEL_NAME for up to 10 epochs, evaluating
# and checkpointing once per epoch, keeping the checkpoint with the best QWK.
print("\n===== INITIALIZING REGRESSION MODEL AND TRAINER =====\n")
model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME, num_labels=NUM_LABELS)

# Tokenize both splits up front (ReadabilityDataset encodes in its constructor).
train_dataset = ReadabilityDataset(train_df['text'].tolist(), train_df['label'].tolist())
val_dataset = ReadabilityDataset(val_df['text'].tolist(), val_df['label'].tolist())

_first_run_config = {
    "output_dir": CHECKPOINT_DIR,
    "num_train_epochs": 10,
    "per_device_train_batch_size": 16,
    "per_device_eval_batch_size": 32,
    "learning_rate": 5e-5,
    "warmup_ratio": 0.1,
    "weight_decay": 0.01,
    "logging_steps": 100,
    "evaluation_strategy": "epoch",
    "save_strategy": "epoch",
    "load_best_model_at_end": True,
    # Model selection follows the "qwk" value returned by compute_metrics.
    "metric_for_best_model": "qwk",
    "greater_is_better": True,
    "save_total_limit": 2,
    # Mixed precision only when a CUDA device is present.
    "fp16": torch.cuda.is_available(),
    "report_to": "none",
}
training_args = TrainingArguments(**_first_run_config)

_first_run_early_stopping = EarlyStoppingCallback(early_stopping_patience=4)
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    compute_metrics=compute_metrics,
    callbacks=[_first_run_early_stopping],
)

print("Starting training...")
trainer.train()
print("✔ Training finished.")
|
|
| |
| |
| |
# Second run: rebuild the model, datasets and trainer from scratch (so this
# section does not depend on the state left by the previous run), train for
# up to 20 epochs with a shorter early-stopping patience, and resume from a
# saved checkpoint when one is available.
print("\n===== INITIALIZING REGRESSION MODEL AND TRAINER =====\n")
model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME, num_labels=NUM_LABELS)
train_dataset = ReadabilityDataset(train_df['text'].tolist(), train_df['label'].tolist())
val_dataset = ReadabilityDataset(val_df['text'].tolist(), val_df['label'].tolist())

training_args = TrainingArguments(
    output_dir=CHECKPOINT_DIR,
    num_train_epochs=20,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=32,
    learning_rate=5e-5,
    warmup_ratio=0.1,
    weight_decay=0.01,
    logging_steps=100,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model="qwk",
    greater_is_better=True,
    save_total_limit=2,
    fp16=torch.cuda.is_available(),
    report_to="none",
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    compute_metrics=compute_metrics,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=2)],
)

# Only checkpoint directories whose name starts with this prefix are
# considered when resuming.
RESUME_CHECKPOINT_PREFIX = "checkpoint-40350"


def _checkpoint_step(name):
    """Return the trailing step number of a checkpoint name, or None."""
    suffix = name.split('-')[-1]
    return int(suffix) if suffix.isdecimal() else None


if os.path.isdir(CHECKPOINT_DIR):
    # Keep only real directories with a numeric step suffix: a stray file or
    # a name such as "checkpoint-40350-backup" would otherwise crash the
    # int() conversion or be handed to the trainer as a checkpoint.
    checkpoints = [
        d for d in os.listdir(CHECKPOINT_DIR)
        if d.startswith(RESUME_CHECKPOINT_PREFIX)
        and _checkpoint_step(d) is not None
        and os.path.isdir(os.path.join(CHECKPOINT_DIR, d))
    ]
    if checkpoints:
        # Highest step number wins.
        latest_checkpoint = max(checkpoints, key=_checkpoint_step)
        latest_checkpoint_path = os.path.join(CHECKPOINT_DIR, latest_checkpoint)
        print(f"Resuming training from checkpoint: {latest_checkpoint_path}")
        trainer.train(resume_from_checkpoint=latest_checkpoint_path)
    else:
        print("No checkpoint found. Starting training from the beginning...")
        trainer.train()
else:
    print("No checkpoint directory found. Starting training from the beginning...")
    trainer.train()

print("✔ Training finished.")
|
|
| |
# Predict readability levels for the blind test set with the trained model
# and package the result as a zipped CSV submission.
print("\n===== FINAL PREDICTION AND SUBMISSION =====\n")
try:
    test_df = pd.read_csv(BLIND_TEST_PATH)

    # Fail fast on a malformed test file: both columns are needed, and
    # discovering a missing 'ID' only after preprocessing and prediction
    # would waste the whole inference pass.
    missing_columns = [col for col in ('ID', 'Sentence') if col not in test_df.columns]
    if missing_columns:
        raise KeyError(", ".join(missing_columns))

    # Rows without a sentence are dropped and therefore get no prediction.
    test_df.dropna(subset=['Sentence'], inplace=True)

    print("Preprocessing blind test text to D3Tok format...")
    test_df['processed_text'] = test_df['Sentence'].apply(
        lambda x: preprocess_d3tok(x, bert_disambiguator))

    print("Generating predictions on the test set...")
    test_dataset = ReadabilityDataset(test_df['processed_text'].tolist())
    predictions = trainer.predict(test_dataset)

    # Regression output -> nearest integer level, clipped to the valid range.
    raw_preds = predictions.predictions.flatten()
    rounded_preds = np.round(raw_preds)
    clipped_preds = np.clip(rounded_preds, 0, TARGET_CLASSES - 1)

    # Shift back by one: training used labels reduced by one.
    test_df['Prediction'] = (clipped_preds + 1).astype(int)

    submission_df = test_df[['ID', 'Prediction']]
    submission_df = submission_df.rename(columns={'ID': 'Sentence ID'})

    print(f"Saving prediction file to: {SUBMISSION_PATH}")
    submission_df.to_csv(SUBMISSION_PATH, index=False)

    print(f"\nCompressing {os.path.basename(SUBMISSION_PATH)} into {os.path.basename(ZIPPED_SUBMISSION_PATH)}...")
    with zipfile.ZipFile(ZIPPED_SUBMISSION_PATH, 'w', zipfile.ZIP_DEFLATED) as zipf:
        zipf.write(SUBMISSION_PATH, arcname=os.path.basename(SUBMISSION_PATH))

    print(f"✔ Submission file {os.path.basename(ZIPPED_SUBMISSION_PATH)} created successfully.")

except FileNotFoundError:
    print(f"! ERROR: Test file not found. Make sure 'blind_test_data.csv' is in the '{DATA_DIR}' directory.")
except KeyError as e:
    # Report the column(s) that are actually missing instead of always
    # blaming 'ID'.
    print(
        f"! KEY ERROR: Missing column(s) {e} in the test data. "
        "The file must contain 'ID' and 'Sentence' columns; "
        "please check the blind_test_data.csv file."
    )
except Exception as e:
    print(f"An error occurred during final prediction: {e}")

print("\n--- Script Finished ---")
|
|
|
|
|
|
|
|