From Iceberg to LoRA: A Unified LLM Fine-Tuning Pipeline with Bodo

Date
November 6, 2025
Author
Isaac Warren

Fine-tuning Large Language Models (LLMs) is a powerful way to adapt them to specific tasks, but it often exposes a major bottleneck in AI workflows: the data pipeline. Typically, data engineering and model training are two separate, disconnected worlds. Data teams use tools like Spark to query and preprocess data from a data lake or warehouse (like Iceberg or Delta Lake), save the results to intermediate files (e.g., JSON on S3), and then the ML team spins up a separate cluster to load those files and begin training.

This two-step process is inefficient and ill-suited to modern AI workflows, where these components need to live side by side and support fast iteration. It creates data silos, introduces I/O latency, and complicates the entire workflow by requiring two different systems to be managed and scaled. The intermediate files also lose the benefits of data warehousing, such as strong schemas and version control, which further complicates data loading.

Using Bodo, we can close this gap. With Bodo, the entire pipeline, from raw data in your warehouse to a fine-tuned model, exists in a single, unified application. Bodo DataFrames provides a distributed, high-performance engine for data loading and preprocessing, while Bodo AI Toolkit seamlessly hands that data off to a distributed PyTorch training job, all using familiar Python APIs and scaling to large clusters efficiently.

In this post, we’ll show how to use Bodo DataFrames to load and preprocess data directly from an Apache Iceberg table and feed it seamlessly into Bodo AI Toolkit to fine-tune a Llama 3.1 8B model using LoRA. We'll train a chatbot on its own "liked" feedback to improve its responses.

What is Bodo?

Bodo is an open-source, high-performance DataFrame library for Python that is a drop-in replacement for Pandas. Bodo simplifies accelerating and scaling Python workloads from laptops to clusters without code rewrites. Under the hood, Bodo relies on MPI-based high-performance computing (HPC) technology and an innovative auto-parallelizing just-in-time (JIT) compiler. This makes it both easier to use and often orders of magnitude faster than tools like Spark or Dask.
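
To get a feel for the drop-in experience, here is a minimal sketch; the file path and column names below are hypothetical, purely for illustration:

# Swap the import and keep writing ordinary Pandas code; Bodo executes it
# as a parallel, distributed computation under the hood.
import bodo.pandas as pd

df = pd.read_parquet("s3://my-bucket/events.parquet")  # hypothetical dataset
daily_users = df.groupby("event_date")["user_id"].nunique()
print(daily_users.head())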

Getting Started

First, let’s start by installing the packages we need:

conda install bodo-ai torch transformers peft

Next, open a new notebook or file, import the required packages, and set up our configuration.

import bodo.pandas as pd
import bodo.ai
import torch
import torch.distributed as dist
import torch.distributed.checkpoint
import tqdm
from torch.optim import AdamW
from transformers import (
	AutoTokenizer,
	AutoModelForCausalLM,
	get_cosine_schedule_with_warmup
)
from peft import (
	LoraConfig,
	get_peft_model,
	get_peft_model_state_dict
)
    
# --- Configuration for the filter ---
CUTOFF_DATE = pd.Timestamp("2025-10-29 00:00:00")

# --- Configuration for data loading ---
USER_TABLE = "chat_analytics.user_messages"
BOT_TABLE = "chat_analytics.bot_messages"
S3_LOCATION = "s3://bodo-iceberg-training-demo"
# --- Model Configuration ---
MODEL_NAME = "meta-llama/Meta-Llama-3.1-8B"
LR = 2e-5
EPOCHS = 1
BATCH_SIZE = 2
CHECKPOINT_DIR = "./llama3_lora_checkpoint_dir"

The Unified Data-to-Model Pipeline

Our goal is to create a seamless flow from data preprocessing to model training, eliminating the need to save intermediate files.

1. Data Loading and Preprocessing with Bodo DataFrames

First, we define our load_data function. This function uses Bodo DataFrames to read directly from our Iceberg tables stored in S3. 

Note: In this example we use a “directory catalog” that consists only of metadata files in S3, to ease deployment of a public, read-only Iceberg table. Directory catalogs should not be used in a production environment.

def load_data():
    # Load data from the Iceberg tables in S3
    print(f"--- Loading data from {S3_LOCATION} ---")
    print(f"Filtering for 'liked' messages since {CUTOFF_DATE}...")

    user_df = pd.read_iceberg(USER_TABLE, location=S3_LOCATION)
    bot_df = pd.read_iceberg(BOT_TABLE, location=S3_LOCATION)

    # Filter bot messages for "liked" feedback and recent timestamps
    liked_bot_messages_df = bot_df[
        (bot_df["feedback_status"] == "liked")
        & (bot_df["response_timestamp"] >= CUTOFF_DATE)
    ]

    # Use pd.merge() to join the two dataframes
    joined_df = pd.merge(
        user_df, liked_bot_messages_df, on=["conversation_id", "message_number"]
    )

    # Select just the relevant columns and return the result
    output_columns = ["message_text", "response_text"]
    final_df = joined_df[output_columns]
    return final_df

This code looks like standard Pandas, but Bodo executes it as a high-performance, distributed query. Instead of a separate Spark job, our Python application itself is handling the large-scale ETL. The resulting final_df is a BodoDataFrame, a distributed object ready for the next step.
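
If you want to spot-check the output before wiring it into training, you can inspect it like a regular Pandas DataFrame. A small, optional sketch, assuming the tables above are reachable from your environment:

# Bodo DataFrames evaluate lazily, so inspecting the result here triggers
# the distributed Iceberg read, filter, and join.
sample_df = load_data()
print(sample_df.head())
print(f"Training pairs: {len(sample_df)}")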

2. Seamless Hand-off to PyTorch

Next, we define our PyTorch Dataset. This is a standard class, but it’s designed to work directly with the Bodo DataFrame that load_data will provide.

class LlamaDataset(torch.utils.data.Dataset):
    def __init__(self, df: pd.DataFrame, tokenizer):
        self.df = df
        self.tokenizer = tokenizer
        tokenizer.pad_token = tokenizer.eos_token
        self.template = "User: {user_message}\nBot: {bot_response}"

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        user_message = row["message_text"]
        bot_response = row["response_text"]
        # Create the prompt for the model
        prompt = self.template.format(
            user_message=user_message, bot_response=bot_response
        )
        # Tokenize the prompt
        encoding = self.tokenizer(
            prompt,
            truncation=True,
            padding="longest",
            return_tensors="pt",
        )
        example = encoding["input_ids"].squeeze(0)
        attention_mask = encoding["attention_mask"].squeeze(0)
        # For causal LM, labels are the same as input_ids
        labels = example.clone()

        return example, labels, attention_mask

    def __getitems__(self, idxs):
        prompts = []
        for idx in idxs:
            row = self.df.iloc[idx]
            user_message = row["message_text"]
            bot_response = row["response_text"]
            prompt = self.template.format(
                user_message=user_message, bot_response=bot_response
            )
            prompts.append(prompt)
        encoding = self.tokenizer(
            prompts,
            truncation=True,
            padding="longest",
            return_tensors="pt",
        )
        input_ids = encoding["input_ids"]
        attention_mask = encoding["attention_mask"]
        labels = input_ids.clone()
        return input_ids, labels, attention_mask
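
Since the class only relies on .iloc and the two text columns, you can sanity-check the tokenization locally on a tiny regular Pandas DataFrame before scaling up. A hypothetical example (loading the Llama 3.1 tokenizer requires access to the gated model on Hugging Face):

import pandas

# Tiny in-memory sample, purely for illustration
sample = pandas.DataFrame({
    "message_text": ["How do I reset my password?"],
    "response_text": ["Go to Settings > Security and choose 'Reset password'."],
})
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
dataset = LlamaDataset(sample, tokenizer)
input_ids, labels, attention_mask = dataset[0]
print(input_ids.shape, attention_mask.shape)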

3. Distributed Training with Bodo AI

Now, we set up our main training function. We load the Llama 3.1 model and apply a LoRa configuration using PEFT. The key line is model = bodo.ai.prepare_model(model), which prepares our model for distributed training.

def train_main(train_df):

    # Load the tokenizer here to get pad_token_id
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    # Llama models don't have a default pad token. Set it to EOS.
    tokenizer.pad_token = tokenizer.eos_token

    model = AutoModelForCausalLM.from_pretrained(
        MODEL_NAME,
        dtype=torch.bfloat16, # Use bfloat16 for memory efficiency
        pad_token_id=tokenizer.pad_token_id, # Set pad token ID in model config
    )

    print("Applying LoRA configuration...")
    peft_config = LoraConfig(
        task_type="CAUSAL_LM", # Specify the causal language modeling task
        r=16,                # Rank of the LoRA matrices (default 8 or 16)
        lora_alpha=32,       # Alpha scaling factor (often 2x rank)
        lora_dropout=0.1,    # Dropout
        target_modules=["q_proj", "k_proj", "v_proj"]
    )
    model = get_peft_model(model, peft_config)
    if bodo.get_rank() == 0: 
        model.print_trainable_parameters()

    model = bodo.ai.prepare_model(model)

    # prepare_model may return None on ranks that are not assigned a GPU
    if model is not None:
        device = next(model.parameters()).device
    else:
        device = None


    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    dataset_func = lambda df: LlamaDataset(df, tokenizer)
    train_loader = bodo.ai.prepare_dataset(train_df, BATCH_SIZE, dataset_func=dataset_func, pin_memory=True)


    if model is None:
        return
    pytorch_rank = dist.get_rank()
    total_steps = EPOCHS * len(train_loader)
    optimizer = AdamW(model.parameters(), lr=LR)
    scheduler = get_cosine_schedule_with_warmup(optimizer,
                                                num_warmup_steps=10,
                                                num_training_steps=total_steps)

    for epoch in range(EPOCHS):

        if pytorch_rank == 0:
            print(f"Train Epoch: \t{epoch}")
        train_one_epoch(model, train_loader, optimizer, scheduler)


        # Checkpoint only the LoRA adapter weights using a distributed checkpoint
        # for each epoch
        base_model = (model.module
            if isinstance(model, torch.nn.parallel.DistributedDataParallel) else model)
        
        # Get only the trainable (adapter) parameters
        adapter_state_dict = get_peft_model_state_dict(base_model)
        
        torch.distributed.checkpoint.save(
            {"model_state_dict": adapter_state_dict}, # Save only the adapter
            checkpoint_id=CHECKPOINT_DIR
        )
        
    # Save in peft-preferred format on rank 0 to allow easy loading later
    if pytorch_rank == 0:
        base_model.save_pretrained(CHECKPOINT_DIR) # Saves adapter_config.json etc.

Notice what's happening:

  1. The train_df (our Bodo DataFrame) is passed directly into train_main.
  2. bodo.ai.prepare_model takes the model and automatically distributes it across the available GPUs on the node(s) in the cluster.
  3. bodo.ai.prepare_dataset rebalances the DataFrame onto workers assigned to GPUs and then loads it into LlamaDataset to create a high-performance, distributed-aware data loader.

The data flows directly from the Iceberg query (ETL) to the DataFrame (preprocessing) to the GPU workers (training) without ever being written to disk as an intermediate file.
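
One helper we haven't shown is train_one_epoch, which the epoch loop above calls. Bodo doesn't prescribe what goes inside it; here is a minimal sketch of what it could look like, assuming the (input_ids, labels, attention_mask) batches produced by LlamaDataset:

def train_one_epoch(model, train_loader, optimizer, scheduler):
    model.train()
    device = next(model.parameters()).device
    for input_ids, labels, attention_mask in tqdm.tqdm(train_loader):
        input_ids = input_ids.to(device)
        labels = labels.to(device)
        attention_mask = attention_mask.to(device)

        # Forward pass; the model computes the causal LM loss internally
        outputs = model(
            input_ids=input_ids, attention_mask=attention_mask, labels=labels
        )
        loss = outputs.loss

        # Backward pass, optimizer step, and LR schedule step
        loss.backward()
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()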

Finally, we tie it all together with:

if __name__ == "__main__":

    train_df = load_data()
    bodo.ai.torch_train(train_main, train_df)

The train_df created by load_data is passed directly to bodo.ai.torch_train, which orchestrates the entire distributed training run defined in train_main. The full example is available on our GitHub.

Wrapping Up

This example demonstrates a unified pipeline for AI. We went from raw data in an Iceberg warehouse to a fine-tuned LoRA adapter for Llama 3.1, all within a single Python application.

By leveraging Bodo, we eliminate the traditional barrier between data engineering and machine learning. There is no separate Spark ETL job, no intermediate JSON files, and no complex data hand-off. Bodo’s ability to use the Pandas API for distributed data processing and feed it directly into a distributed PyTorch job brings HPC-grade performance and scalability to the entire workflow, not just one piece of it.

To get started using Bodo yourself, check out the full example on GitHub and try it on your own data.
