• Home
  • Resume
    • Kim Roder photo

      Kim Roder

      A short description of my Software Development skillset.

    • Learn More
    • Email
    • Github
  • Projects
  • Posts

Machine Learning Pipeline (GCP) Part 3: Prediction Microservice

05 Apr 2023

Reading time ~5 minutes

Project description can be found HERE
Part 2 can be found HERE

This post describes the implementation basics required to write a cost-effective, high-uptime data prediction microservice on the Google Cloud as part of a Machine Learning pipeline. The example will implement a weather prediction model, whilst adhering to some generic design priciples such that the service can be reused for any and all model training we wish to perform.

Code for this post can be found HERE


Architecture

The model will be implmented using Pytorch and deployed to the Google Vertex AI Cloud using a Docker container for the custom training process and the automatic inference-deploy feature for our pretrained model.

To generalize the model trainer and allow for it to train a plethera of differnt models we will adhere to following file structure:

├── model
│   │   ├── dataset
│   │   │   ├── weather.py
│   │   │   └── ...
│   │   ├── modules
│   │   │   ├── attention.py
│   │   │   └── ...
│   │   └── repository
│   │   │   ├── model.py
│   │   │   └── ...
│   ├── weather.py
│   └── ...
├── main.py
├── Dockerfile
└── cloudbuild.yaml

Implementation

Deployment: By modifying our cloudbuild.yaml to instead override our latest image, by always giving it the same name, we effectivly achieve pseudo-CI/CD for our model trainer:

steps:
  # Build the container image
  - name: 'gcr.io/cloud-builders/docker'
    args:
    - 'build'
    - '-t'
    - 'gcr.io/$PROJECT_ID/train:$COMMIT_SHA'
    - '.'
    dir: 'train'

  # Push the container image to Container Registry
  - name: 'gcr.io/cloud-builders/docker'
    args:
    - 'push'
    - 'gcr.io/$PROJECT_ID/train:latest'

Dataset: The dataset we extracted from our ingest service in Part 1 contains the following weather features:

  • time
  • latitude
  • longitude
  • temperature
  • apparent temperature
  • pressure
  • humidity
  • cloudiness
  • visibility
  • precipitation probability
  • rain volume
  • snow volume
  • wind speed
  • wind angle
  • wind speed

By extending the torch.utils.data.Dataset class we can create a dataset that our Pytorch library can fully manage and turn into training & validation batches for us:

class Weather (torch.utils.data.Dataset):
    def __init__ (self):
        # connect to bigquery
        client = bigquery.Client()

        # load dataset
        data = client.query("SELECT * from weather")
        data = data.to_dataframe()

        # store dataset and tensor column selection
        self.data = data
        self.tensor_columns = [
            "latitude_sin", "latitude_cos",
            "longitude_sin", "longitude_cos",
            "week_of_the_year_sin", "week_of_the_year_cos",
            "day_of_the_week_sin", "day_of_the_week_cos",
            "temperature", "temperature_feels_like",
            "pressure", "humidity", "cloudiness", "visibility",
            "precipitation_probability", "rain_volume", "snow_volume",
            "wind_speed", "wind_angle", "wind_speed"
        ]

        # override len(data)
        def __len__ (self):
            return len(self.data)

        # override data[idx]
        def __getitem__ (self, idx):

            location = torch.tensor(self.data.iloc[idx][self.tensor_columns[:8]].values)
            history = self.locationHistory(forecast["latitude"], forecast["longitude"], 100, forecast["time"], forecast["time"] - 60*60*24*7)
            forecast = torch.tensor(self.data.iloc[idx][self.tensor_columns[8:]].values)

            return location, history, forecast

        # weather history for a location + radius
        def locationHistory (self, latitude, longitude, radius, time_start, time_stop):
            ...

Model: Though transformers are a bit flavour of the month right now, most of their principlis actually work very well with weather contextualization and prediction. It stands to reason that encoding the previous weather conditions, in the area, would allow a model to make better predictions. We define this history context in the above locationHistory function to be the area around geo location to be predicted, during a certain period. Now each training sample will consist of:

  • location: the encoding for the geo-location and datetime of the forecast
  • history: the weather conditions in a 100km “radius” around the geo-location for the last week
  • forecast: the next forecast for the geo-location (our target)

We want to encode the history data and decode the location request, using the history encoding as context, to predict the forecast. Whilst this sounds very transformer-like, an exact copy of the standard transformer model would not work for us. These models come with something called positional-encoding baked in for sequence prediction. Our data does not come in a chronological oder, it’s location depends on the space-time which we have already encoded during pre-processing. Therefore we will be implementing a simplification of the standard transformer model for our own needs.

For the sake of our weather example, our model uses a simple attention encoder:

class AttentionEncoder (torch.nn.Module):
    def __init__ (self, model_dim,
        nhead=8,
        activation=torch.nn.ReLU,
        dropout=1e-1,
        layer_norm_eps=1e-5
    ):
        self.dropout = torch.nn.Dropout(dropout)
        self.activation = activation

        self.attention = torch.nn.MultiheadAttention(model_dim, nhead, dropout=dropout)
        self.normalize1 = torch.nn.LayerNorm(attention_dim, eps=layer_norm_eps)
        self.linear1 = torch.nn.Linear(model_dim, model_dim)
        self.normalize2 = torch.nn.LayerNorm(model_dim, eps=layer_norm_eps)

    def forward (self, key, query, value):

        attention = self.attention(key, query, value)
        attention = self.dropout(attention)

        output = self.normalize1(input + attention)
        output = self.normalize2(output + self.linear2(output))

        return self.activation(output)

This encoder is used in both the encoding and decoding step to generate our simple weather model:

class Weather (torch.nn.Module):
    def __init__ (self,
        dropout=1e-1,
    ):
        self.dropout = torch.nn.Dropout(dropout)

        self.linear1 = torch.nn.Linear(18, 512)
        self.encoder = modules.AttentionEncoder(512)
        self.linear2 = torch.nn.Linear(8, 512)
        self.decoder = modules.AttentionEncoder(512)
        self.linear3 = torch.nn.Linear(512, 10)

    def forward (self, location, history):

        # encode the historical data using self-attention
        encode = self.linear1(history)
        encode = self.dropout(encode)
        encode = self.encoder(encode, encode, encode)

        # decode the forecast using the historical weather data and the location/time we wish to predict.
        decode = self.linear2(location)
        decode = self.dropout(decode)
        decode = self.decoder(encode, encode, decode)

        return self.linear3(decode)

Training: Because the goal for the microservice is to be able serve as the trainer for all the models we may wish to train on any of the data we are ingesting, we override the torch.nn.Module.train function with all the training steps for our end models. The models semselves are responsible to initiating datasets, training and saving of trained state_dicts:

    def train (self, device,
        batch_size=args.batch_size,
        learning_rate=args.learning_rate,
        learning_rate_decay=args.learning_rate_decay,
        learning_rate_decay_step_size=args.learning_rate_decay_step_size,
        momentum=args.momentum,
        weight_decay=args.weight_decay
    ):
        self.to(device)
        self.training = True
        for module in self.children(): module.train(self.training)

        # set up dataset
        dataset = dataset.Weather()

        training_set, validation_set = torch.utils.data.random_split(dataset, [len(dataset) - len(dataset) / 9, len(dataset) / 9])
        training_loader, validation_loader = torch.utils.data.DataLoader(training_set, batch_size=batch_size, shuffle=True), torch.utils.data.DataLoader(validation_set, batch_size=batch_size)

        # prepare optimizer
        criterion = torch.nn.MSELoss().to(device)
        optimizer = torch.optim.SGD(model.parameters(), learning_rate, momentum=momentum, weight_decay=weight_decay)
        scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=learning_rate_decay_step_size, gamma=learning_rate_decay)

        # remember losses
        losses = []
        best = 1.

        # run training
        for epoch in range(epochs):
            for batch, (location, history, target) in enumerate(training_loader):
                location = location.view(-1, 1, 8)
                target = target.view(-1, 1, 10)

                location = location.to(device=device, dtype=torch.bfloat16, non_blocking=True)
                history = history.to(device=device, dtype=torch.bfloat16, non_blocking=True)
                target = target.to(device=device, dtype=torch.bfloat16, non_blocking=True)

                output = model(location, history)
                loss = criterion(output, target)

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            losses.append(0.)
            for batch, (location, history, target) in enumerate(validation_loader):
                with torch.no_grad():
                    location = location.to(device=device, dtype=torch.bfloat16, non_blocking=True)
                    history = history.to(device=device, dtype=torch.bfloat16, non_blocking=True)
                    target = target.to(device=device, dtype=torch.bfloat16, non_blocking=True)

                    output = model(location, history)
                    losses[-1] += criterion(output, target)

            print(f'epoch: {epoch} loss: {losses[-1]}')

            # save if best checkpoint
            if losses[-1] < best:
                repository.model.save({
                    "epoch": epoch,
                    "model_state_dict": model.model_state_dict(),
                    "loss": loss
                }, "model/weather.pt")
                best = losses[-1]

            scheduler.step()

Prediction: Finally we can deploy our models using the Vertex AI GCP interface and use our google credentials to to make requests to our trained models:

from google.cloud import aiplatform

def predict(project, location, inputs, endpoint):
    aiplatform.init(project=project, location=location)

    endpoint = aiplatform.Endpoint(endpoint)
    prediction = endpoint.predict(instances=inputs)

    return prediction


Share Tweet +1