This code shows how to generate a map of wildfire hotspots in the United States. The map uses data from the NASA Fire Information for Resource Management System (FIRMS) API and updates every minute.
To get a free API key ("MAP_KEY"), sign up here: https://firms.modaps.eosdis.nasa.gov/api/map_key
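The MAP_KEY limit is 5000 transactions per 10-minute interval. Store the key in a `.env` file
as `FIRMS_API_KEY=<your MAP_KEY>`; the code below reads it from the `FIRMS_API_KEY` environment variable.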
import os
import pandas as pd
import geopandas as gpd
import atexit
from dotenv import load_dotenv
from ipyleaflet import Map, GeoData, CircleMarker, MarkerCluster
from apscheduler.schedulers.background import BackgroundScheduler
from IPython.display import display, clear_output
# Pull any settings stored in a local .env file into the process environment
load_dotenv()

# FIRMS API key, read from the FIRMS_API_KEY environment variable (None when unset)
API_KEY = os.environ.get('FIRMS_API_KEY')

# Country-level CSV endpoint of the FIRMS API. The key is interpolated into the
# path; the trailing segments presumably select data source / country / day
# range -- confirm against the FIRMS API documentation before changing them.
FIRMS_URL = (
    "https://firms.modaps.eosdis.nasa.gov/api/country/csv/"
    f"{API_KEY}"
    "/VIIRS_NOAA21_NRT/USA/2"
)
Intel MKL WARNING: Support of Intel(R) Streaming SIMD Extensions 4.2 (Intel(R) SSE4.2) enabled only processors has been deprecated. Intel oneAPI Math Kernel Library 2025.0 will require Intel(R) Advanced Vector Extensions (Intel(R) AVX) instructions. Intel MKL WARNING: Support of Intel(R) Streaming SIMD Extensions 4.2 (Intel(R) SSE4.2) enabled only processors has been deprecated. Intel oneAPI Math Kernel Library 2025.0 will require Intel(R) Advanced Vector Extensions (Intel(R) AVX) instructions.
def get_color(frp):
    """
    Map a Fire Radiative Power (FRP) value to a marker color (hex string).

    FRP below 1.64            -> '#ffff00' (yellow)
    FRP from 1.64 to < 3.77   -> '#ffa500' (orange)
    FRP from 3.77 to < 11.77  -> '#ff4500' (red-orange)
    anything else             -> '#ff0000' (red)
    """
    # Exclusive upper bounds paired with their colors, in ascending order
    bands = (
        (1.64, '#ffff00'),   # Yellow
        (3.77, '#ffa500'),   # Orange
        (11.77, '#ff4500'),  # Red-Orange
    )
    for upper_bound, hex_color in bands:
        if frp < upper_bound:
            return hex_color
    # No band matched: the value is not below the last bound
    return '#ff0000'  # Red
def fetch_and_process_fire_data():
    """Fetch fire detections from the FIRMS API and return them as a GeoDataFrame.

    The CSV at FIRMS_URL is read, reduced to the columns used by the map,
    filtered to drop low-confidence detections (confidence == 'l') and rows
    without coordinates or FRP, and given a 'color' column computed with
    get_color().

    Returns:
        A GeoDataFrame with point geometry built from longitude/latitude
        (CRS set to EPSG:4326), or None if anything goes wrong while
        fetching or processing; the error is printed in that case.
    """
    required_columns = [
        'latitude', 'longitude', 'bright_ti4', 'acq_date', 'acq_time',
        'confidence', 'frp',
    ]
    try:
        df = pd.read_csv(FIRMS_URL)

        # A non-CSV reply (for example an error message from the API) parses
        # into a frame without the expected columns. Report what came back
        # instead of letting pandas raise an opaque KeyError.
        missing = [col for col in required_columns if col not in df.columns]
        if missing:
            raise ValueError(
                f"unexpected FIRMS response, missing columns {missing}; "
                f"received columns {list(df.columns)}"
            )

        # Keep only the needed columns and filter out low confidence data
        filtered_df = df.loc[df['confidence'] != 'l', required_columns]
        # Filter out rows with NaN values in critical columns
        filtered_df = filtered_df.dropna(subset=['latitude', 'longitude', 'frp']).copy()
        # Add color column based on FRP
        filtered_df['color'] = filtered_df['frp'].apply(get_color)

        # Coordinates are longitude/latitude in degrees, so tag the geometry
        # with the geographic CRS rather than leaving it undefined.
        return gpd.GeoDataFrame(
            filtered_df,
            geometry=gpd.points_from_xy(filtered_df.longitude, filtered_df.latitude),
            crs="EPSG:4326",
        )
    except Exception as e:
        # Deliberate catch-all: this runs unattended from the scheduler, so a
        # failed refresh is reported and skipped rather than raised.
        print(f"Error fetching data: {e}")
        return None
# Map widget centered on the USA; the fire markers are added to it later
MAP_CENTER = (37.0902, -95.7129)  # (latitude, longitude)
MAP_ZOOM = 4
wildfire_map = Map(center=MAP_CENTER, zoom=MAP_ZOOM)
def scheduled_task():
    """Refresh the fire markers on wildfire_map with freshly fetched data.

    The data is fetched before the map is touched. If the fetch fails
    (fetch_and_process_fire_data() returns None) the markers already on the
    map are left in place instead of being cleared. On success, every layer
    above the base layer is replaced by a single MarkerCluster holding one
    CircleMarker per detection, colored by the row's 'color' value.
    """
    # Fetch and process fire data first so a failure cannot blank the map
    gdf = fetch_and_process_fire_data()
    if gdf is None:
        return

    # Build the markers from plain column lists; this avoids creating a
    # Series per row as iterrows() does.
    markers = [
        CircleMarker(
            location=(lat, lon),
            radius=8,
            color=color,
            fill_color=color,
            fill_opacity=0.7,
            stroke=True,
            weight=1,
        )
        for lat, lon, color in zip(
            gdf['latitude'].tolist(),
            gdf['longitude'].tolist(),
            gdf['color'].tolist(),
        )
    ]
    cluster = MarkerCluster(markers=markers)

    # Keep only the base layer and add the new cluster in one assignment, so
    # the map never passes through an empty intermediate state.
    wildfire_map.layers = tuple(wildfire_map.layers[:1]) + (cluster,)
# Populate the map once before the scheduler starts, so this initial refresh
# cannot overlap with a scheduled run on the background thread
scheduled_task()
# Set up a background scheduler to update the map every minute
scheduler = BackgroundScheduler()
scheduler.add_job(
    scheduled_task,
    'interval',
    minutes=1,  # Fetch every minute
    # A run that starts late is skipped ("was missed by ...") once it exceeds
    # the grace time, and the default grace time is very short. Allow up to
    # one full interval of delay, and collapse several overdue runs into one
    # refresh instead of replaying each of them.
    misfire_grace_time=60,
    coalesce=True,
)
scheduler.start()
Run time of job "scheduled_task (trigger: interval[0:01:00], next run at: 2025-01-20 09:26:18 PST)" was missed by 0:00:21.741856 Run time of job "scheduled_task (trigger: interval[0:01:00], next run at: 2025-01-20 09:28:18 PST)" was missed by 0:00:15.745688 Run time of job "scheduled_task (trigger: interval[0:01:00], next run at: 2025-01-20 09:29:18 PST)" was missed by 0:00:51.825884 Run time of job "scheduled_task (trigger: interval[0:01:00], next run at: 2025-01-20 09:36:18 PST)" was missed by 0:00:36.374801 Run time of job "scheduled_task (trigger: interval[0:01:00], next run at: 2025-01-20 09:45:18 PST)" was missed by 0:00:08.867419
# Shut the scheduler down at interpreter exit so its background thread is not
# left running. The bound method is registered instead of a lambda: a lambda
# would look up the global name `scheduler` only at exit time, so after
# re-running this cell every handler would target the newest scheduler and
# the earlier ones would never be stopped.
atexit.register(scheduler.shutdown)
# Display the map (the scheduled job refreshes its markers every minute)
display(wildfire_map)
Map(center=[37.0902, -95.7129], controls=(ZoomControl(options=['position', 'zoom_in_text', 'zoom_in_title', 'z…