
Fine-tuning Large Language Models (LLMs) is a powerful way to adapt them to specific tasks, but it often exposes a major bottleneck in AI workflows: the data pipeline. Typically, data engineering and model training are two separate, disconnected worlds. Data teams use tools like Spark to query and preprocess data from a data lake or warehouse (like Iceberg or Delta Lake), save the results to intermediate files (e.g., JSON on S3), and then the ML team spins up a separate cluster to load those files and begin training.
This two-step process is inefficient and poorly suited to modern AI workflows, where data and training need to live side by side and iteration needs to be fast. It creates data silos, introduces I/O latency, and complicates the entire workflow, requiring two different systems to be managed and scaled. The intermediate files also lose data warehouse benefits such as strong schemas and version control, which further complicates data loading.
Using Bodo, we can close this gap. With Bodo, the entire pipeline, from raw data in your warehouse to a fine-tuned model, exists in a single, unified application. Bodo DataFrames provides a distributed, high-performance engine for data loading and preprocessing, while Bodo AI Toolkit seamlessly hands that data off to a distributed PyTorch training job, all using familiar Python APIs and scaling to large clusters efficiently.
In this post, we’ll show how to use Bodo DataFrames to load and preprocess data directly from an Apache Iceberg table and feed it seamlessly into Bodo AI Toolkit to fine-tune a Llama 3.1 8B model using LoRA. We'll be training a chatbot on its own "liked" feedback to improve its responses.
Bodo is an open-source, high-performance DataFrame library for Python that is a drop-in replacement for Pandas. Bodo simplifies accelerating and scaling Python workloads from laptops to clusters without code rewrites. Under the hood, Bodo relies on MPI-based high-performance computing (HPC) technology and an innovative auto-parallelizing just-in-time (JIT) compiler. This makes it both easier to use and often orders of magnitude faster than tools like Spark or Dask.
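For example, swapping the import is often all it takes to run existing Pandas code in parallel. Here is a minimal sketch of the drop-in usage (the S3 path and column names are hypothetical):

```python
import bodo.pandas as pd  # drop-in replacement for `import pandas as pd`

# Hypothetical example: read a Parquet dataset and aggregate it.
# Bodo parallelizes the read and the groupby across cores and nodes.
df = pd.read_parquet("s3://my-bucket/events.parquet")
totals = df.groupby("user_id")["amount"].sum()
print(totals.head())
```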
First, let’s start by installing the packages we need:
```bash
conda install bodo-ai torch transformers peft
```
Next, open a new notebook or file, import the required packages, and set up the configuration.
```python
import bodo.pandas as pd
import bodo.ai
import torch
import torch.distributed as dist
import torch.distributed.checkpoint
import tqdm
from torch.optim import AdamW
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    get_cosine_schedule_with_warmup,
)
from peft import (
    LoraConfig,
    get_peft_model,
    get_peft_model_state_dict,
)

# --- Configuration for the filter ---
CUTOFF_DATE = pd.Timestamp("2025-10-29 00:00:00")

# --- Configuration for data loading ---
USER_TABLE = "chat_analytics.user_messages"
BOT_TABLE = "chat_analytics.bot_messages"
S3_LOCATION = "s3://bodo-iceberg-training-demo"

# --- Model Configuration ---
MODEL_NAME = "meta-llama/Meta-Llama-3.1-8B"
LR = 2e-5
EPOCHS = 1
BATCH_SIZE = 2
CHECKPOINT_DIR = "./llama3_lora_checkpoint_dir"
```
Our goal is to create a seamless flow from data preprocessing to model training, eliminating the need to save intermediate files.
First, we define our load_data function. This function uses Bodo DataFrames to read directly from our Iceberg tables stored in S3.
Note: To make it easy to share a public, read-only Iceberg table, this example uses a “directory catalog,” which consists only of metadata files in S3. Directory catalogs should not be used in a production environment.
```python
def load_data():
    # Load data from the Iceberg tables in S3
    print(f"--- Loading data from {S3_LOCATION} ---")
    print(f"Filtering for 'liked' messages since {CUTOFF_DATE}...")
    user_df = pd.read_iceberg(USER_TABLE, location=S3_LOCATION)
    bot_df = pd.read_iceberg(BOT_TABLE, location=S3_LOCATION)

    # Filter bot messages for "liked" feedback and recent timestamps
    liked_bot_messages_df = bot_df[
        (bot_df["feedback_status"] == "liked")
        & (bot_df["response_timestamp"] >= CUTOFF_DATE)
    ]

    # Use pd.merge() to join the two dataframes
    joined_df = pd.merge(
        user_df, liked_bot_messages_df, on=["conversation_id", "message_number"]
    )

    # Select just the relevant columns
    output_columns = ["message_text", "response_text"]
    final_df = joined_df[output_columns]
    return final_df
```

This code looks like standard Pandas, but Bodo executes it as a high-performance, distributed query. Instead of a separate Spark job, our Python application itself is handling the large-scale ETL. The resulting final_df is a BodoDataFrame, a distributed object ready for the next step.
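As a quick, optional sanity check (not part of the training pipeline itself), you can inspect the result with ordinary Pandas calls before kicking off training, for example when running interactively:

```python
# Optional: peek at the joined training pairs before training.
preview_df = load_data()
print(preview_df.head())
print(f"Number of training pairs: {len(preview_df)}")
```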
Next, we define our PyTorch Dataset. This is a standard class, but it’s designed to work directly with the Bodo DataFrame that load_data will provide.
```python
class LlamaDataset(torch.utils.data.Dataset):
    def __init__(self, df: pd.DataFrame, tokenizer):
        self.df = df
        self.tokenizer = tokenizer
        tokenizer.pad_token = tokenizer.eos_token
        self.template = "User: {user_message}\nBot: {bot_response}"

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        user_message = row["message_text"]
        bot_response = row["response_text"]

        # Create the prompt for the model
        prompt = self.template.format(
            user_message=user_message, bot_response=bot_response
        )

        # Tokenize the prompt
        encoding = self.tokenizer(
            prompt,
            truncation=True,
            padding="longest",
            return_tensors="pt",
        )
        example = encoding["input_ids"].squeeze(0)
        attention_mask = encoding["attention_mask"].squeeze(0)

        # For causal LM, labels are the same as input_ids
        labels = example.clone()
        return example, labels, attention_mask

    def __getitems__(self, idxs):
        # Batched version: tokenize all prompts together so padding is
        # applied across the whole batch.
        prompts = []
        for idx in idxs:
            row = self.df.iloc[idx]
            user_message = row["message_text"]
            bot_response = row["response_text"]
            prompt = self.template.format(
                user_message=user_message, bot_response=bot_response
            )
            prompts.append(prompt)
        encoding = self.tokenizer(
            prompts,
            truncation=True,
            padding="longest",
            return_tensors="pt",
        )
        input_ids = encoding["input_ids"]
        attention_mask = encoding["attention_mask"]
        labels = input_ids.clone()
        return input_ids, labels, attention_mask
```
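To see what the dataset produces, here is a small, optional local check. It uses a tiny in-memory pandas DataFrame with made-up rows in place of the real Bodo DataFrame, and assumes you have access to the gated Llama 3.1 tokenizer on Hugging Face (it reuses MODEL_NAME and AutoTokenizer from above):

```python
# Optional local check with hypothetical sample rows.
import pandas

sample_df = pandas.DataFrame({
    "message_text": ["How do I reset my password?", "What are your hours?"],
    "response_text": ["Click 'Forgot password' on the login page.", "We're open 9am-5pm."],
})
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)  # gated model; requires HF access
ds = LlamaDataset(sample_df, tokenizer)

# The batched path returns padded tensors for the whole batch.
input_ids, labels, attention_mask = ds.__getitems__([0, 1])
print(input_ids.shape, labels.shape, attention_mask.shape)
```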
Now, we set up our main training function. We load the Llama 3.1 model and apply a LoRA configuration using PEFT. The key line is model = bodo.ai.prepare_model(model), which prepares our model for distributed training.
```python
def train_main(train_df):
    # Load the tokenizer here to get pad_token_id
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    # Llama models don't have a default pad token. Set it to EOS.
    tokenizer.pad_token = tokenizer.eos_token

    model = AutoModelForCausalLM.from_pretrained(
        MODEL_NAME,
        dtype=torch.bfloat16,  # Use bfloat16 for memory efficiency
        pad_token_id=tokenizer.pad_token_id,  # Set pad token ID in model config
    )

    print("Applying LoRA configuration...")
    peft_config = LoraConfig(
        task_type="CAUSAL_LM",  # Causal language modeling task
        r=16,  # Rank of the LoRA matrices (default 8 or 16)
        lora_alpha=32,  # Alpha scaling factor (often 2x rank)
        lora_dropout=0.1,  # Dropout
        target_modules=["q_proj", "k_proj", "v_proj"],
    )
    model = get_peft_model(model, peft_config)
    if bodo.get_rank() == 0:
        model.print_trainable_parameters()

    model = bodo.ai.prepare_model(model)
    if model is not None:
        device = next(model.parameters()).device
    else:
        device = None

    dataset_func = lambda df: LlamaDataset(df, tokenizer)
    train_loader = bodo.ai.prepare_dataset(
        train_df, BATCH_SIZE, dataset_func=dataset_func, pin_memory=True
    )

    # Ranks that were not assigned a model are done after helping
    # rebalance the dataset.
    if model is None:
        return

    pytorch_rank = dist.get_rank()
    total_steps = EPOCHS * len(train_loader)
    optimizer = AdamW(model.parameters(), lr=LR)
    scheduler = get_cosine_schedule_with_warmup(
        optimizer, num_warmup_steps=10, num_training_steps=total_steps
    )

    for epoch in range(EPOCHS):
        if pytorch_rank == 0:
            print(f"Train Epoch: \t{epoch}")
        train_one_epoch(model, train_loader, optimizer, scheduler)

        # Checkpoint only the LoRA adapter weights using a distributed checkpoint
        # for each epoch
        base_model = (
            model.module
            if isinstance(model, torch.nn.parallel.DistributedDataParallel)
            else model
        )
        # Get only the trainable (adapter) parameters
        adapter_state_dict = get_peft_model_state_dict(base_model)
        torch.distributed.checkpoint.save(
            {"model_state_dict": adapter_state_dict},  # Save only the adapter
            checkpoint_id=CHECKPOINT_DIR,
        )
        # Save in peft-preferred format on rank 0 to allow easy loading later
        if pytorch_rank == 0:
            base_model.save_pretrained(CHECKPOINT_DIR)  # Saves adapter_config.json etc.
```
Notice what's happening:
- train_df (our Bodo DataFrame) is passed directly into train_main.
- bodo.ai.prepare_model takes the model and automatically distributes it across all available GPUs in the node(s) in the cluster.
- bodo.ai.prepare_dataset rebalances the DataFrame onto workers assigned to GPUs and then loads it into LlamaDataset to create a high-performance, distributed-aware data loader.
- The data flows directly from the Iceberg query (ETL) to the DataFrame (preprocessing) to the GPU workers (training) without ever being written to disk as an intermediate file.
Finally, we tie it all together with:
```python
if __name__ == "__main__":
    train_df = load_data()
    bodo.ai.torch_train(train_main, train_df)
```

The train_df created by load_data is passed directly to bodo.ai.torch_train, which orchestrates the entire distributed training run defined in train_main. The full example is available on our GitHub.
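Once training finishes, the adapter that save_pretrained wrote on rank 0 can be loaded back onto the base model with standard PEFT APIs. Here is a minimal inference sketch using the MODEL_NAME and CHECKPOINT_DIR constants from above (the prompt is just an illustration):

```python
# Minimal inference sketch: load the base model and attach the trained LoRA adapter.
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from peft import PeftModel

base = AutoModelForCausalLM.from_pretrained(MODEL_NAME, dtype=torch.bfloat16)
model = PeftModel.from_pretrained(base, CHECKPOINT_DIR)  # adapter saved by save_pretrained
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
tokenizer.pad_token = tokenizer.eos_token

inputs = tokenizer("User: How do I reset my password?\nBot:", return_tensors="pt")
outputs = model.generate(**inputs, max_new_tokens=64)
print(tokenizer.decode(outputs[0], skip_special_tokens=True))
```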
This example demonstrates a unified pipeline for AI. We went from raw data in an Iceberg warehouse to a fine-tuned LoRA adapter for Llama 3.1, all within a single Python application.
By leveraging Bodo, we eliminate the traditional barrier between data engineering and machine learning. There is no separate Spark ETL job, no intermediate JSON files, and no complex data hand-off. Bodo’s ability to use the Pandas API for distributed data processing and feed it directly into a distributed PyTorch job brings HPC-grade performance and scalability to the entire workflow, not just one piece of it.
To get started using Bodo yourself: