"""Statistical analysis of earthquake catalogs."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import timedelta
import numpy as np
from seismoalert.models import EarthquakeCatalog
[docs]
@dataclass
class GutenbergRichterResult:
"""Result of a Gutenberg-Richter law fit.
The G-R law states: log10(N) = a - b * M
where N is the number of events >= magnitude M.
Attributes:
a_value: Productivity parameter (log10 of total event rate).
b_value: Slope of the frequency-magnitude distribution.
mc: Magnitude of completeness used for the fit.
"""
a_value: float
b_value: float
mc: float
[docs]
@dataclass
class AnomalyPeriod:
"""Represents a detected anomalous seismicity period.
Attributes:
start_index: Start index in the time-sorted catalog.
end_index: End index in the time-sorted catalog.
event_count: Number of events in the window.
expected_count: Expected number of events based on the mean rate.
sigma_deviation: Number of standard deviations above the mean.
"""
start_index: int
end_index: int
event_count: int
expected_count: float
sigma_deviation: float
[docs]
def magnitude_of_completeness(catalog: EarthquakeCatalog) -> float:
"""Estimate the magnitude of completeness (Mc) using the max-curvature method.
The magnitude of completeness is the magnitude at which the frequency-magnitude
distribution reaches its maximum, i.e., the most frequently occurring magnitude bin.
Args:
catalog: Earthquake catalog to analyze.
Returns:
Estimated magnitude of completeness.
Raises:
ValueError: If catalog is empty.
"""
if len(catalog) == 0:
raise ValueError("Cannot compute Mc for an empty catalog")
mags = np.array(catalog.magnitudes)
# Round to nearest 0.1 for binning
bin_min = np.floor(mags.min() * 10) / 10
bin_max = np.ceil(mags.max() * 10) / 10 + 0.1
bins = np.arange(bin_min, bin_max, 0.1)
counts, bin_edges = np.histogram(mags, bins=bins)
max_idx = np.argmax(counts)
mc = (bin_edges[max_idx] + bin_edges[max_idx + 1]) / 2
return round(float(mc), 1)
[docs]
def gutenberg_richter(
catalog: EarthquakeCatalog, mc: float | None = None
) -> GutenbergRichterResult:
"""Fit the Gutenberg-Richter frequency-magnitude relation.
Uses the maximum likelihood estimate for b-value:
b = log10(e) / (M_mean - Mc)
Args:
catalog: Earthquake catalog to analyze.
mc: Magnitude of completeness. If None, estimated automatically.
Returns:
GutenbergRichterResult with a-value, b-value, and mc.
Raises:
ValueError: If catalog has insufficient data above Mc.
"""
if len(catalog) == 0:
raise ValueError("Cannot fit G-R law to an empty catalog")
if mc is None:
mc = magnitude_of_completeness(catalog)
filtered = catalog.filter_by_magnitude(min_mag=mc)
if len(filtered) < 2:
raise ValueError(f"Insufficient events (n={len(filtered)}) above Mc={mc}")
mags = np.array(filtered.magnitudes)
mean_mag = np.mean(mags)
# Aki (1965) maximum likelihood b-value estimator
# bin width correction: delta_m = 0.1
delta_m = 0.1
b_value = np.log10(np.e) / (mean_mag - (mc - delta_m / 2))
# a-value: log10(N) = a - b * Mc
a_value = np.log10(len(filtered)) + b_value * mc
return GutenbergRichterResult(
a_value=round(float(a_value), 3),
b_value=round(float(b_value), 3),
mc=mc,
)
[docs]
def interevent_times(catalog: EarthquakeCatalog) -> np.ndarray:
"""Compute inter-event time intervals in seconds.
Args:
catalog: Earthquake catalog (will be sorted by time).
Returns:
Array of time differences in seconds between consecutive events.
Raises:
ValueError: If catalog has fewer than 2 events.
"""
if len(catalog) < 2:
raise ValueError("Need at least 2 events to compute inter-event times")
sorted_cat = catalog.sort_by_time()
times = [eq.time for eq in sorted_cat]
deltas = [(times[i + 1] - times[i]).total_seconds() for i in range(len(times) - 1)]
return np.array(deltas)
[docs]
def detect_anomalies(
catalog: EarthquakeCatalog,
window_days: int = 7,
threshold_sigma: float = 2.0,
) -> list[AnomalyPeriod]:
"""Detect anomalous seismicity rate periods using a sliding window.
Compares event counts in each window to the overall mean rate.
Windows with counts exceeding mean + threshold_sigma * std are flagged.
Args:
catalog: Earthquake catalog to analyze.
window_days: Sliding window size in days.
threshold_sigma: Number of standard deviations for anomaly threshold.
Returns:
List of AnomalyPeriod objects for each detected anomaly.
"""
if len(catalog) < 2:
return []
sorted_cat = catalog.sort_by_time()
events = sorted_cat.earthquakes
window = timedelta(days=window_days)
# Count events in each sliding window
window_counts = []
window_indices = []
for i, eq in enumerate(events):
window_end = eq.time + window
count = sum(1 for e in events[i:] if e.time <= window_end)
window_counts.append(count)
# Find end index
end_idx = i
for j in range(i, len(events)):
if events[j].time <= window_end:
end_idx = j
else:
break
window_indices.append((i, end_idx))
counts_arr = np.array(window_counts, dtype=float)
mean_count = np.mean(counts_arr)
std_count = np.std(counts_arr)
if std_count == 0:
return []
anomalies = []
for i, count in enumerate(window_counts):
sigma_dev = (count - mean_count) / std_count
if sigma_dev >= threshold_sigma:
start_idx, end_idx = window_indices[i]
anomalies.append(
AnomalyPeriod(
start_index=start_idx,
end_index=end_idx,
event_count=count,
expected_count=round(mean_count, 1),
sigma_deviation=round(float(sigma_dev), 2),
)
)
return anomalies
[docs]
def clustering_coefficient(
catalog: EarthquakeCatalog,
radius_km: float = 50.0,
time_window_hours: float = 72.0,
) -> float:
"""Compute a spatio-temporal clustering coefficient.
Measures the fraction of event pairs that fall within a given
spatial radius and time window.
Args:
catalog: Earthquake catalog to analyze.
radius_km: Spatial clustering radius in kilometers.
time_window_hours: Temporal clustering window in hours.
Returns:
Clustering coefficient between 0.0 and 1.0.
"""
if len(catalog) < 2:
return 0.0
events = catalog.earthquakes
n = len(events)
total_pairs = n * (n - 1) / 2
clustered_pairs = 0
time_window = timedelta(hours=time_window_hours)
for i in range(n):
for j in range(i + 1, n):
# Temporal check
time_diff = abs(events[i].time - events[j].time)
if time_diff > time_window:
continue
# Spatial check (approximate distance using Haversine)
dist = _haversine_km(
events[i].latitude,
events[i].longitude,
events[j].latitude,
events[j].longitude,
)
if dist <= radius_km:
clustered_pairs += 1
return clustered_pairs / total_pairs
def _haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Compute the Haversine distance between two points in kilometers."""
r = 6371.0 # Earth radius in km
dlat = np.radians(lat2 - lat1)
dlon = np.radians(lon2 - lon1)
a = np.sin(dlat / 2) ** 2 + np.cos(np.radians(lat1)) * np.cos(
np.radians(lat2)
) * np.sin(dlon / 2) ** 2
c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))
return float(r * c)