The easiest way to set up logging for AI experiments is to use MLflow, a ready-made Python package.

installation #

To get started, we add mlflow to our project using a reasonable package manager like poetry or uv:

$ poetry add mlflow
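
or, with uv:

$ uv add mlflow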

and then inside our environment we can run

$ mlflow server --host 127.0.0.1

This starts a web server on localhost:5000, which is only accessible from the machine itself (for local monitoring). If you want to make it accessible to other computers (locally via LAN, or over the internet), use --host 0.0.0.0. Just make sure you open the proper port in the firewall (by default, port 5000).
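
On Ubuntu with ufw (assuming that is your firewall), opening the port looks like

$ sudo ufw allow 5000/tcp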

For example, to serve publicly on port 8889, we run

$ mlflow server --host 0.0.0.0 --port 8889

docker-compose #

To run MLflow under Docker and easily manage updates, we can create a docker-compose.yaml:

services:
  mlflow:
    image: ghcr.io/mlflow/mlflow
    container_name: mlflow
    ports:
      - '5000:5000'
    environment:
      MLFLOW_TRACKING_URI: http://0.0.0.0:5000
    volumes:
      - ./mlflow:/mlflow/mlruns
    restart: always
    command: ["mlflow", "server", "--host", "0.0.0.0", "--port", "5000"]

This pulls the latest MLflow image from the GitHub Container Registry and sets the container to always restart, so the service stays reachable on port 5000.
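
To start the service, and later to pull an updated image, the standard Compose commands apply:

$ docker compose up -d
$ docker compose pull && docker compose up -d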

If we want to serve it on port 8889 instead, we change the ports mapping:
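
ports:
  - '8889:5000'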

demo #

To get a ready-made demo, we will do a basic MNIST setup:

import mlflow

import torch as T
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from torch.utils.data import DataLoader

from torchvision import datasets
from torchvision.transforms import ToTensor

train_data = datasets.MNIST(
    root='data',
    train=True,
    transform=ToTensor(),
    download=True
    )

test_data = datasets.MNIST(
    root='data',
    train=False,
    transform=ToTensor(),
    download=True
    )

# note: params is defined below, in the "parameter logging" section;
# it needs to be in scope before this block runs
loaders = {
    'train': DataLoader(
        train_data,
        batch_size=params['batch_size'],
        shuffle=True,
        num_workers=1
    ),
    'test': DataLoader(
        test_data,
        batch_size=params['batch_size'],
        shuffle=False,  # no need to shuffle the evaluation set
        num_workers=1
    )
}

and set up an ImageClassifier

class ImageClassifier(nn.Module):

    def __init__(self):
        super(ImageClassifier, self).__init__()

        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.max_pool2d(x, 2)
        x = F.relu(x)
        x = self.conv2(x)
        x = self.conv2_drop(x)
        x = F.max_pool2d(x, 2)
        x = F.relu(x)
        x = x.view(-1, 320)
        x = self.fc1(x)
        x = F.relu(x)
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        # return raw logits: nn.CrossEntropyLoss applies log-softmax itself,
        # so an explicit softmax here would be wrong
        return x

device = T.device('cuda' if T.cuda.is_available() else 'cpu')

model = ImageClassifier().to(device)
optimizer = optim.Adam(model.parameters(), lr=params['learning_rate'])
loss_func = nn.CrossEntropyLoss()

train/test functions #

Following the official documentation, we can build a tracking experiment.

We will need two functions, train and test:

def train(epoch):
    """
    Train the model on a single pass of the dataloader, and send the metrics to mlflow
    """
    model.train()
    for batch_idx, (data, target) in enumerate(loaders['train']):

        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)

        loss = loss_func(output, target)
        loss.backward()
        optimizer.step()

        # fraction of correctly classified samples in this batch
        accuracy = (output.argmax(dim=1) == target).float().mean().item()

        if batch_idx % 20 == 0:
            seen = batch_idx * len(data)
            total = len(loaders['train'].dataset)
            print(
                f"Train Epoch: {epoch} "
                f"[{seen}/{total} ({100 * batch_idx / len(loaders['train']):.0f}%)] "
                f"Loss: {loss.item():.6f}"
            )

            # a globally increasing step so the curves line up across epochs
            step = epoch * len(loaders['train']) + batch_idx
            mlflow.log_metric("loss", loss.item(), step=step)
            mlflow.log_metric("accuracy", accuracy, step=step)

def test(epoch):
    """
    Evaluate the model, and log results with mlflow
    """
    model.eval()

    loss = 0
    correct = 0

    with T.no_grad():
        for data, target in loaders['test']:
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss += loss_func(output, target).item()
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()

    # loss_func returns the mean loss per batch, so average over batches
    loss /= len(loaders['test'])
    accuracy = correct / len(loaders['test'].dataset)

    print(
        f"\nTest set: Average Loss: {loss:.4f}, "
        f"Accuracy: {correct}/{len(loaders['test'].dataset)} ({100 * accuracy:.0f}%)\n"
    )

    mlflow.log_metric("eval_loss", loss, step=epoch)
    mlflow.log_metric("eval_accuracy", accuracy, step=epoch)

parameter logging #

To log the hyperparameters so we can reference them during finetuning, we first need to tell the script where our MLflow instance lives:

mlflow.set_tracking_uri(uri="http://localhost:5000")

mlflow.set_experiment("MNIST mlflow demo")

set_tracking_uri points to the URL we run MLflow at. This means that if we run it on 127.0.0.1, we use localhost or 127.0.0.1. If we set it up as 0.0.0.0 and the experiment runs outside the MLflow server (i.e. on another computer), we use the IP that points to the server: either the LAN IP assigned by the router (if we are on a LAN), or the public IP of the server.
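
For example, if the server sits at 192.168.1.50 on the LAN (a hypothetical address), a client on the same network would use

mlflow.set_tracking_uri(uri="http://192.168.1.50:5000")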

set_experiment is the name of the experiment inside the mlflow instance, and is used for experiment grouping and comparisons.

Now we can define the hyperparameters. (mlflow.log_params needs an active run, so we will log them at the top of the training loop below.)

params = {
    "batch_size": 64,       # example values, tune to taste
    "learning_rate": 1e-3,
    "num_epochs": 3
}

the loop #

We are now ready to let the experiment run.

The main training loop needs to run inside the mlflow context:

with mlflow.start_run():
    mlflow.log_params(params)
    for epoch in range(params['num_epochs']):
        train(epoch)
        test(epoch)

and wait. Once the run finishes, the metrics and parameters show up in the MLflow UI at the tracking URI (http://localhost:5000 in this setup).