Building a Project – Real-Time Weather Forecasting

Introduction

In this article, I will build a project that serves two purposes:

  1. Fetching and loading weather data.
  2. Forecasting the next day's weather.

The product includes a data pipeline that populates Google BigQuery and Cloud Storage. It also exposes an API through which users can send a GET request and receive a next-day weather forecast produced by a machine learning model.

Project steps:

  1. Deploy Apache Airflow.
  2. Create the WeatherPipeline class.
  3. Build the data pipeline.
  4. Build an API for retrieving weather forecasts.

Step #1 – Deploying Apache Airflow

To follow this article end to end, deploy Airflow in whatever way works for you. I used a Dockerfile and Docker Compose for this task; you can find both in my GitHub repository.

Step #2 – Creating the WeatherPipeline Class

In this step, we will create the WeatherPipeline class. It can extract weather data, store it in Cloud Storage and BigQuery, train a machine learning model, and make predictions on new data. Let's walk through its methods:

1) extract_weather_data
The first method sends a GET request to https://weatherapi-com.p.rapidapi.com, which provides a free API for weather data.

This method creates a new folder, fetches the current weather for several capital cities around the world, and saves the responses as text files.

import requests
import datetime
import json
import os
from dotenv import load_dotenv
from google.cloud.storage.client import Client as storage_client 
from google.cloud.bigquery.client import Client as bigquery_client 
from google.cloud.exceptions import Conflict, NotFound
import pandas as pd
from xgboost import XGBRegressor
from sklearn.model_selection import train_test_split

## Global Variables and Configs ##
load_dotenv("/opt/airflow/.env")
WORK_DIR = os.getenv("WORK_DIR") 
DATA_DIR = os.getenv("DATA_DIR")
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = f'{WORK_DIR}/airflow/config/ServiceKey_GoogleCloud.json'
STORAGE_CLIENT = storage_client()
BIGQUERY_CLIENT = bigquery_client()
CURRENT_DATE = datetime.datetime.now().strftime("%Y-%m-%d").replace("-","_")

## Weather Class ##
class WeatherPipeline:

    def __init__(self, locations = None) -> None:
        self.locations = locations

    def extract_weather_data(self):
        '''
        Description: 
            - Extracts weather data from weatherapi.com
            - Stores data as txt files at {WORK_DIR}/airflow/data/{CURRENT_DATE}
        Args:
            None
        Returns:
            None 
        '''
        # Create a folder for the data (exist_ok avoids a race on re-runs)
        os.makedirs(f"{WORK_DIR}/airflow/data/{CURRENT_DATE}", exist_ok=True)

        # Grab data for each location
        for location in self.locations:
            url = "https://weatherapi-com.p.rapidapi.com/current.json?q=" + location + "&lang=en"

            headers = {
                # The RapidAPI key is read from the .env file; avoid hard-coding credentials
                "X-RapidAPI-Key": os.getenv("RAPIDAPI_KEY"),
                "X-RapidAPI-Host": "weatherapi-com.p.rapidapi.com"
            }

            response = requests.get(url, headers=headers)

            print(response.json())

            # Generate text files for each location
            with open(f"{WORK_DIR}/airflow/data/{CURRENT_DATE}/{location}.txt", "w") as write_file:
                write_file.write(json.dumps(response.json()))

Example:

if __name__ == "__main__":
    locations = ["London", "Tokyo", "Sydney", "Paris", "Berlin", "Moscow", "Madrid", "Rome", "Cairo"]
    weather = WeatherPipeline(locations)
    weather.extract_weather_data()

This creates a folder named “2023_07_21” containing a text file for each location:

airflow@40d14d7f4b70:/opt/airflow/data/2023_07_21$ ls
Berlin.txt  Cairo.txt  London.txt  Madrid.txt  Moscow.txt  Paris.txt  Rome.txt  Sydney.txt  Tokyo.txt

2) getOrCreate_bucket & load_to_cloudStorage
These two methods get or create a bucket in Google Cloud Storage and upload the text files to it. (In the listings below, the imports and previously shown methods are elided.)

The bucket will serve as the data lake that stores the raw data.

## Weather Class (continued) ##
class WeatherPipeline:

    # ... imports, globals, __init__ and extract_weather_data are unchanged (see above) ...

    def getOrCreate_bucket(self, bucket_name = f'weather_bucket_{CURRENT_DATE}'):
        '''
        Description:
            - Create the bucket if it doesn't exist
            - If it already exists, return it
        Args:
            bucket_name(str)
        Returns:
            bucket(Google Cloud Storage bucket)
        '''
        try:
            bucket = STORAGE_CLIENT.create_bucket(bucket_or_name=bucket_name)
        except Conflict:
            # The bucket already exists; fetch it instead
            bucket = STORAGE_CLIENT.get_bucket(bucket_or_name=bucket_name)
        return bucket

    def load_to_cloudStorage(self, bucket_name = f'weather_bucket_{CURRENT_DATE}', overwrite = False):
        '''
        Description:
            - getOrCreate a Cloud Storage bucket
            - Load today's text files
            - For cleanup, set overwrite = True
        Args:
            bucket_name(str)
            overwrite(bool)
        Returns:
            None
        '''

        # Delete bucket
        if overwrite:
            bucket = STORAGE_CLIENT.get_bucket(bucket_name)
            bucket.delete(force=True)

        # Load text files to bucket
        bucket = self.getOrCreate_bucket(bucket_name)
        os.chdir(f"{WORK_DIR}/airflow/data/{CURRENT_DATE}")
        for file in os.listdir():
            # Upload each file under a unique, timestamped object name
            blob = bucket.blob(file + f"_{CURRENT_DATE}_{datetime.datetime.now().strftime('%H:%M:%S')}")
            blob.upload_from_filename(file)

Example:

if __name__ == "__main__":
    locations = ["London", "Tokyo", "Sydney", "Paris", "Berlin", "Moscow", "Madrid", "Rome", "Cairo"]
    weather = WeatherPipeline(locations)
    #weather.extract_weather_data()
    weather.load_to_cloudStorage()

This creates a bucket named “weather_bucket_2023_07_21” containing all the raw data as text files.

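As a quick check (a minimal sketch reusing the STORAGE_CLIENT object and the default bucket name from the code above), we can list the uploaded objects:

# List today's blobs to confirm the upload
for blob in STORAGE_CLIENT.list_blobs(f"weather_bucket_{CURRENT_DATE}"):
    print(blob.name)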

3) process_data & getOrCreate_dataset & getOrCreate_table & load_to_bigquery
Next, these four methods process the text files into a DataFrame, get or create a dataset and a table in BigQuery, and load the weather data into it.

BigQuery will serve as the data warehouse for this data product.

## Weather Class (continued) ##
class WeatherPipeline:

    # ... imports, globals and the methods shown earlier are unchanged ...

    def process_data(self):
        '''
        Process raw weather data into a Pandas DataFrame
        Args:
            None
        Returns:
            - df(pandas.DataFrame)
        '''
        # Change directory to today's data
        os.chdir(f"{DATA_DIR}/{CURRENT_DATE}")
        files = os.listdir()
        df = pd.DataFrame()
        current_index = 0

        # Read each file and append to a DataFrame
        for file in files:
            with open(file, 'r') as read_file:
                data = json.loads(read_file.read())

                # Extract data
                location_data = data.get("location")  
                current_data = data.get("current")

                # Create DataFrames
                location_df = pd.DataFrame(location_data, index=[current_index])
                current_df = pd.DataFrame(current_data, index=[current_index])
                current_index += 1
                current_df['condition'] = current_data.get('condition').get('text')

                # Concatenate DataFrames and append to main DataFrame
                temp_df = pd.concat([location_df, current_df],axis=1)
                df = pd.concat([df, temp_df])


        # Return main DataFrame
        df = df.rename(columns={'name':'city'})
        df['localtime'] = pd.to_datetime(df['localtime'])
        return df

    def getOrCreate_dataset(self, dataset_name :str = "weather"):
        '''
        Get a dataset. If the dataset does not exist, create it.
        Args:
            - dataset_name(str) = Name of the new/existing dataset.
        Returns:
            - dataset(google.cloud.bigquery.dataset.Dataset) = Google BigQuery Dataset
        '''
        print('Fetching Dataset...')
        try:
            # Get the dataset if it exists
            dataset = BIGQUERY_CLIENT.get_dataset(dataset_name)
        except NotFound:
            # If not, create it first
            print('Dataset does not exist. Creating a new one.')
            BIGQUERY_CLIENT.create_dataset(dataset_name)
            dataset = BIGQUERY_CLIENT.get_dataset(dataset_name)
        print('Done')
        print(dataset.self_link)
        return dataset

    def getOrCreate_table(self, dataset_name:str = "weather", table_name:str = "weather"):
        '''
        Get a table. If the table does not exist, create it.
        Args:
            - table_name(str) = Name of the new/existing table.
            - dataset_name(str) = Name of the new/existing dataset.
        Returns:
            - table(google.cloud.bigquery.table.Table) = Google BigQuery table
        '''
        # Grab prerequisites for creating a table
        dataset = self.getOrCreate_dataset(dataset_name)
        table_id = f"{dataset.project}.{dataset.dataset_id}.{table_name}"

        print('\nFetching Table...')

        try:
            # Get the table if it exists
            table = BIGQUERY_CLIENT.get_table(table_id)
        except NotFound:
            # If not, create it first
            print('Table does not exist. Creating a new one.')
            BIGQUERY_CLIENT.create_table(table_id)
            table = BIGQUERY_CLIENT.get_table(table_id)
        print('Done')
        print(table.self_link)
        return table

    def load_to_bigquery(self, dataframe, dataset_name, table_name):
        '''
        Description:
            - Get or Create a dataset
            - Get or Create a table
            - Load data from a DataFrame to BigQuery
        Args:
            dataframe(pandas.DataFrame)
            dataset_name(str)
            table_name(str)
        Returns:
            None
        '''
        table = self.getOrCreate_table(dataset_name=dataset_name, table_name=table_name)
        # Wait for the load job to complete before returning
        BIGQUERY_CLIENT.load_table_from_dataframe(dataframe=dataframe, destination=table).result()

Example:

if __name__ == "__main__":
    locations = ["London", "Tokyo", "Sydney", "Paris", "Berlin", "Moscow", "Madrid", "Rome", "Cairo"]
    weather = WeatherPipeline(locations)
    #weather.extract_weather_data()
    #weather.load_to_cloudStorage()
    df = weather.process_data()
    weather.load_to_bigquery(dataframe = df, dataset_name='weather', table_name='weather')

As a result, the current weather data is loaded into BigQuery.

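To confirm the load, we can pull back a few of the most recent rows (an illustrative query, reusing BIGQUERY_CLIENT and the default weather.weather dataset and table names from above):

# Fetch the latest rows from the warehouse
query = "SELECT city, temp_c, `localtime` FROM weather.weather ORDER BY `localtime` DESC LIMIT 9"
print(BIGQUERY_CLIENT.query(query).to_dataframe())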

4) train_model & predict_next_day_weather
The last two methods pull all the data from BigQuery and train an XGBRegressor model. Once the model is trained, we test it on the newest data to predict the temperature at each of the listed locations exactly 24 hours ahead.

## Weather Class (continued) ##
class WeatherPipeline:

    # ... imports, globals and the methods shown earlier are unchanged ...

    def train_model(self, dataset_name, table_name):
        '''
        Description:
            - Get or Create a dataset
            - Get or Create a table
            - Load data from BigQuery to a DataFrame
            - Train a model
        Args:
            dataset_name(str)
            table_name(str)
        Returns:
            model(XGBRegressor)
        '''
        
        # Grab all the data from BigQuery, using the fully qualified table id
        table = self.getOrCreate_table(dataset_name=dataset_name, table_name=table_name)
        table_id = f"{table.project}.{table.dataset_id}.{table.table_id}"
        df = BIGQUERY_CLIENT.query(f"SELECT * FROM `{table_id}`").to_dataframe()

        # Pre processing
        df = df.drop(columns = ['region', 'country', 'tz_id', 'localtime','last_updated_epoch', 'last_updated', 'wind_dir', 'condition'])
        city_map = {
                'London':0,
                'Moscow':1,
                'Berlin':2,
                'Paris':3,
                'Rome':4,
                'Madrid':5,
                'Cairo':6,
                'Tokyo':7,
                'Sydney':8}
        df['city'] = df['city'].map(city_map)

        # divide to train test data
        x = df.drop(columns = ['temp_c'])
        y = df['temp_c']
        x_train, x_test, y_train, y_test = train_test_split(x, y, train_size=0.9, random_state=365)

        # Train an XGBoost regressor
        model = XGBRegressor()
        model.fit(x_train, y_train)

        # Predict test data and generate accuracy score
        predictions = model.predict(x_test)
        model_score = model.score(x_test, y_test)

        # Generate and print a predictions dataframe
        cities = []
        for city_number in x_test.city.to_list():
            for city, num in city_map.items():
                if city_number == num:
                    cities.append(city)
        predictions_df = pd.DataFrame([*zip(cities, y_test, predictions, abs(y_test-predictions), [model_score]*len(cities))], columns=['city', 'actual_temp(Celcius)', 'predicted_temp(Celcius)', 'diff(Celcius)','score'])
        print(f"Test Data Predictions:\n {predictions_df}")
        
        # Return trained model
        return model
    
    def predict_next_day_weather(self, model, dataset_name, table_name):
        '''
        Description:
            - Use an already-trained model to predict new data
        Args:
            model(XGBRegressor)
            dataset_name(str)
            table_name(str)
        Returns:
            predictions_df(pandas.DataFrame)
        '''
        # Build the fully qualified table id
        table = self.getOrCreate_table(dataset_name=dataset_name, table_name=table_name)
        table_id = f"{table.project}.{table.dataset_id}.{table.table_id}"

        # Query BigQuery for the latest weather data
        query = f"""WITH RankedWeather AS (
                    SELECT
                        *,
                        ROW_NUMBER() OVER (PARTITION BY city ORDER BY localtime DESC) AS rn
                    FROM
                        `{table_id}`
                    )
                    SELECT
                    *
                    FROM
                    RankedWeather
                    WHERE
                    rn = 1;"""
        new_data = BIGQUERY_CLIENT.query(query).to_dataframe()

        # Pre processing
        new_data = new_data.drop(columns = ['temp_c','rn', 'region', 'country', 'tz_id', 'localtime','last_updated_epoch', 'last_updated', 'wind_dir', 'condition'])
        city_map = {
                'London':0,
                'Moscow':1,
                'Berlin':2,
                'Paris':3,
                'Rome':4,
                'Madrid':5,
                'Cairo':6,
                'Tokyo':7,
                'Sydney':8}
        new_data['city'] = new_data['city'].map(city_map)

        # Add 24 hours to date related columns
        new_data['localtime_epoch'] = new_data['localtime_epoch'] + 86400

        # Predict next day weather
        predictions = model.predict(new_data)

        # Map the numeric city codes back to names, preserving the query's row order
        inverse_city_map = {num: city for city, num in city_map.items()}
        cities = new_data['city'].map(inverse_city_map)

        # Generate and print a predictions dataframe
        predictions_df = pd.DataFrame([*zip(cities, predictions)], columns=['city', 'predicted_temp(Celcius)'])
        predictions_df['at_date(UTC+0)'] = new_data['localtime_epoch']

        # translate epoch to datetime
        predictions_df['at_date(UTC+0)'] = pd.to_datetime(predictions_df['at_date(UTC+0)'], unit='s')
        print(f"Next Day Predictions:\n {predictions_df}")

        return predictions_df

Example:

if __name__ == "__main__":
    locations = ["London", "Tokyo", "Sydney", "Paris", "Berlin", "Moscow", "Madrid", "Rome", "Cairo"]
    weather = WeatherPipeline(locations)
    #weather.extract_weather_data()
    #weather.load_to_cloudStorage()
    #df = weather.process_data()
    #weather.load_to_bigquery(dataframe = df, dataset_name='weather', table_name='weather')
    model = weather.train_model(dataset_name='weather', table_name='weather')
    predictions_df = weather.predict_next_day_weather(model, dataset_name='weather', table_name='weather')

The result is a predictions DataFrame containing the forecast temperature (in degrees Celsius) exactly 24 hours ahead:

print(predictions_df)
Next Day Predictions:
      city  predicted_temp(Celcius)      at_date(UTC+0)
0  London                20.000006 2023-07-22 11:42:59
1  Moscow                22.000299 2023-07-22 11:42:59
2  Berlin                18.999817 2023-07-22 11:42:58
3   Paris                26.000587 2023-07-22 11:42:58
4    Rome                27.997335 2023-07-22 11:43:00
5  Madrid                34.999870 2023-07-22 11:43:00
6   Cairo                22.000074 2023-07-22 11:42:59
7   Tokyo                35.000057 2023-07-22 11:43:00
8  Sydney                13.000155 2023-07-22 11:42:58

Step #3 – Building the Data Pipeline

With the WeatherPipeline class complete, I will build a data pipeline in Airflow that extracts weather data and loads it into Cloud Storage and BigQuery using the methods of this class.

This pipeline will run every hour, feeding Cloud Storage and BigQuery a fresh batch of weather data.

from airflow import DAG
from airflow.decorators import task
from dotenv import load_dotenv
from datetime import datetime, timedelta
import os
import sys
load_dotenv()
WORK_DIR = os.environ["WORK_DIR"]
sys.path.append(f"{WORK_DIR}/airflow")
from helper.weather import WeatherPipeline

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = f'{WORK_DIR}/config/ServiceKey_GoogleCloud.json'

locations = ["London", "Tokyo", "Sydney", "Paris", "Berlin", "Moscow", "Madrid", "Rome", "Cairo"]
weather = WeatherPipeline(locations)

default_args = {
    'owner': 'airflow',
    'retries': 5,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    dag_id = 'load_weather_data',
    default_args=default_args, 
    start_date = datetime(2023,7,20), 
    schedule_interval='@hourly', 
    catchup=False
) as dag:

    # Task #1 - Extract data
    @task
    def extract_weather_data():
        weather.extract_weather_data()

    # Task #2 - load_to_cloudStorage
    @task
    def load_to_cloudStorage():
        weather.load_to_cloudStorage()

    # Task #3 - load_to_bigquery
    @task
    def load_to_bigquery(dataset_name, table_name):
        df = weather.process_data()
        weather.load_to_bigquery(df, dataset_name, table_name)


    # Dependencies
    extract_weather_data() >> load_to_cloudStorage() >> load_to_bigquery("weather", "weather")
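
To smoke-test the DAG without a running scheduler, we can use dag.test() (a minimal sketch; it assumes Airflow 2.5+, where DAG.test() was introduced):

if __name__ == "__main__":
    # Executes a single local DAG run for debugging; no scheduler needed
    dag.test()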

The result is a three-task data pipeline, with extract_weather_data, load_to_cloudStorage, and load_to_bigquery chained to run in order every hour.


Step #4 – Building an API for Weather Forecasts

Finally, I will build an API that lets users interact with the service and request next-day weather forecasts by sending a GET request. The GET request triggers the train_model and predict_next_day_weather methods and returns the requested data to the user.

from flask import Flask
import os
import sys
from dotenv import load_dotenv

load_dotenv()
WORK_DIR = os.getenv("WORK_DIR")
sys.path.append(f"{WORK_DIR}/airflow")
from helper.weather import WeatherPipeline

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = f'{WORK_DIR}/airflow/config/ServiceKey_GoogleCloud.json'

# Creating Flask app
app = Flask(__name__)

# Creating WeatherPipline object
locations = ["London", "Tokyo", "Sydney", "Paris", "Berlin", "Moscow", "Madrid", "Rome", "Cairo"]
weather = WeatherPipeline(locations)

@app.route("/city=<city>")
def get_data(city):

    # Train model and predict next day weather
    model = weather.train_model('weather', 'weather')
    predictions_df = weather.predict_next_day_weather(model, 'weather', 'weather')
    predictions_df['at_date(UTC+0)'] = predictions_df['at_date(UTC+0)'].astype(str)

    # Return all predictions or predictions for a specific city
    if city == 'All':
        return predictions_df.to_json(orient='records')
    else:
        return predictions_df[predictions_df['city'] == city].to_json(orient='records')

if __name__ == "__main__":
    app.run(debug=True)

Example:

First, start the app:

(streaming_real_time_weather_data) root@DESKTOP-3U7IV4I:/projects/streaming_real_time_weather_data# cd app
(streaming_real_time_weather_data) root@DESKTOP-3U7IV4I:/projects/streaming_real_time_weather_data/app# flask run

Next, we can send a GET request to the app's URL and ask for the model's predictions. For example, we can request the temperature forecast for London or Berlin 24 hours from now.


Or we can request the predictions for every available city by passing All; a sketch of both calls follows.

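Here is a minimal sketch of both calls using the requests library, assuming the Flask development server is running locally on its default port:

import requests

BASE_URL = "http://127.0.0.1:5000"  # Flask dev server default

# Forecast for a single city
print(requests.get(f"{BASE_URL}/city=London").json())

# Forecasts for all available cities
print(requests.get(f"{BASE_URL}/city=All").json())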

With that, users can now simply send a GET request to our service, parse the results, and use them however they see fit.

Moreover, the model will keep getting more accurate as new data arrives, which happens automatically thanks to the ingestion pipeline collecting more and more data.

That wraps up this article; if you have any questions, feel free to ask them in the comments section.
