Add the MySQL2 library as a dependency for the hazard history, and rewrite the module to use MySQL for data storage
This commit is contained in:
parent
5998d2583a
commit
bbaa2cb1a4
7 changed files with 336 additions and 238 deletions
|
|
@ -1,141 +1,24 @@
|
|||
/**
|
||||
* Hazard History persistence module
|
||||
* Tracks the last 7 hazard alerts encountered by this server instance
|
||||
*/
|
||||
import { getPool } from './mysql.mjs';
|
||||
|
||||
import { readFile, writeFile, mkdir } from 'fs/promises';
|
||||
import path from 'path';
|
||||
|
||||
const HISTORY_FILE = path.resolve('./data/hazard-history.json');
|
||||
const MAX_HISTORY_ENTRIES = 7;
|
||||
|
||||
/**
 * Ensure the cache directory for the history file exists.
 * Creation failures are intentionally ignored (the directory may already
 * exist, and a later read/write will surface any real problem).
 */
const ensureCacheDir = async () => {
	const cacheDir = path.dirname(HISTORY_FILE);
	try {
		await mkdir(cacheDir, { recursive: true });
	} catch (error) {
		// Directory may already exist; best-effort only
	}
};
|
||||
/**
 * Convert a timestamp-like value to an ISO-8601 string.
 * Falsy inputs (including 0 and '') are treated as missing.
 * @param {Date|string|number|null|undefined} value - Candidate timestamp
 * @returns {string|null} ISO string, or null for missing/unparseable input
 */
const toIsoString = (value) => {
	if (!value) {
		return null;
	}
	let date;
	if (value instanceof Date) {
		date = value;
	} else {
		date = new Date(value);
	}
	if (Number.isNaN(date.getTime())) {
		return null;
	}
	return date.toISOString();
};
|
||||
|
||||
/**
 * Load hazard history from disk.
 * @returns {Array} Array of hazard history entries; empty when the file is
 * missing, unreadable, corrupt, or does not contain an array
 */
const loadHistory = async () => {
	try {
		await ensureCacheDir();
		const raw = await readFile(HISTORY_FILE, 'utf8');
		const parsed = JSON.parse(raw);
		if (Array.isArray(parsed)) {
			return parsed;
		}
		return [];
	} catch (error) {
		// Missing or corrupted file: treat as no history
		return [];
	}
};
|
||||
|
||||
/**
 * Save hazard history to disk as tab-indented JSON.
 * Failures are logged rather than thrown so a disk problem never breaks
 * the request path.
 * @param {Array} history - Array of hazard history entries
 */
const saveHistory = async (history) => {
	try {
		await ensureCacheDir();
		const serialized = JSON.stringify(history, null, '\t');
		await writeFile(HISTORY_FILE, serialized);
	} catch (error) {
		console.error('Failed to save hazard history:', error.message);
	}
};
|
||||
|
||||
// True when the location key is a raw "lat,lon" coordinate pair string
// (e.g. "25.77,-80.19") rather than a human-readable label.
const isCoordinateLocationKey = (value) => {
	const candidate = value ?? '';
	return /^-?\d+(?:\.\d+)?,-?\d+(?:\.\d+)?$/.test(candidate);
};
|
||||
|
||||
/**
 * Generate a stable identity for a hazard entry.
 * Upstream alert ids are deliberately excluded so revisions of the same
 * alert keep updating the same logical history row.
 * @param {string} locationKey - Stable location key
 * @param {string} hazardType - Hazard/event name
 * @param {string} source - Hazard source
 * @returns {string} Stable identity key
 */
const generateKey = (locationKey, hazardType, source) => {
	return `${locationKey}::${hazardType}::${source}`;
};
|
||||
|
||||
/**
 * Normalize a timestamp-like value to an ISO-8601 string.
 * @param {*} value - Candidate timestamp
 * @param {string} fallback - Returned when `value` cannot be parsed
 * @returns {string} ISO string, or the fallback
 */
const normalizeTimestamp = (value, fallback) => {
	const parsed = new Date(value);
	if (Number.isNaN(parsed.getTime())) {
		return fallback;
	}
	return parsed.toISOString();
};
|
||||
|
||||
// Two entries describe the same logical hazard when display location,
// hazard type, and source all match (upstream ids are ignored on purpose).
const isSameLogicalHazard = (left, right) => {
	if (left.location !== right.location) return false;
	if (left.hazardType !== right.hazardType) return false;
	return left.source === right.source;
};
|
||||
|
||||
// Match an entry against the requested location, preferring the stable
// locationKey and falling back to the display label for legacy entries.
const isSameRequestedLocation = (entry, location, locationKey) => {
	const keyMatches = Boolean(locationKey) && entry.locationKey === locationKey;
	return keyMatches || entry.location === location;
};
|
||||
|
||||
// Rewrite a legacy entry so it carries the newer stable location key (and a
// key derived from it); returns the entry unchanged when no upgrade applies.
const upgradeEntryLocationKey = (entry, location, locationKey) => {
	const alreadyCurrent = !locationKey || entry.locationKey === locationKey;
	if (alreadyCurrent) {
		return entry;
	}
	const upgraded = {
		...entry,
		location,
		locationKey,
		key: generateKey(locationKey, entry.hazardType, entry.source),
	};
	return upgraded;
};
|
||||
|
||||
// Fold two records for the same logical hazard into one, keeping the
// earliest encounter, the latest sighting, and the most useful identifiers.
const mergeEntries = (existing, incoming) => {
	// Normalize all four timestamps, borrowing the other side as fallback.
	const existingEncountered = normalizeTimestamp(existing.encounteredAt, incoming.encounteredAt);
	const incomingEncountered = normalizeTimestamp(incoming.encounteredAt, existing.encounteredAt);
	const existingLastSeen = normalizeTimestamp(existing.lastSeenAt, incoming.lastSeenAt);
	const incomingLastSeen = normalizeTimestamp(incoming.lastSeenAt, existing.lastSeenAt);

	// Prefer a coordinate-style ("lat,lon") location key over a legacy label.
	const preferIncomingKey = isCoordinateLocationKey(incoming.locationKey)
		&& !isCoordinateLocationKey(existing.locationKey);

	// The hazard id follows whichever side was seen most recently
	// (ties go to the incoming record, matching >= semantics).
	const incomingIsNewer = new Date(incomingLastSeen) >= new Date(existingLastSeen);
	let latestHazardId;
	if (incomingIsNewer) {
		latestHazardId = incoming.latestHazardId ?? existing.latestHazardId;
	} else {
		latestHazardId = existing.latestHazardId ?? incoming.latestHazardId;
	}

	const encounteredAt = new Date(existingEncountered) <= new Date(incomingEncountered)
		? existingEncountered
		: incomingEncountered;
	const lastSeenAt = incomingIsNewer ? incomingLastSeen : existingLastSeen;

	return {
		...existing,
		location: preferIncomingKey ? incoming.location : (existing.location || incoming.location),
		locationKey: preferIncomingKey ? incoming.locationKey : (existing.locationKey || incoming.locationKey),
		key: preferIncomingKey ? incoming.key : existing.key,
		encounteredAt,
		lastSeenAt,
		ongoing: Boolean(existing.ongoing || incoming.ongoing),
		severity: incoming.severity || existing.severity,
		source: incoming.source || existing.source,
		latestHazardId,
	};
};
|
||||
|
||||
// Re-derive keys and timestamps for every raw entry and collapse duplicates
// that describe the same logical hazard into a single merged record.
const normalizeHistory = (history = []) => {
	const normalized = [];

	for (const rawEntry of history) {
		// Entries without a hazard type or source have no stable identity.
		if (!rawEntry?.hazardType || !rawEntry?.source) continue;

		// Legacy entries stored only a display label; reuse it as the key.
		const locationKey = rawEntry.locationKey || rawEntry.location;
		const nowIso = new Date().toISOString();
		const entry = {
			...rawEntry,
			locationKey,
			key: generateKey(locationKey, rawEntry.hazardType, rawEntry.source),
			encounteredAt: normalizeTimestamp(rawEntry.encounteredAt, nowIso),
			lastSeenAt: normalizeTimestamp(rawEntry.lastSeenAt ?? rawEntry.encounteredAt, nowIso),
			latestHazardId: rawEntry.latestHazardId ?? rawEntry.hazardId ?? rawEntry.id ?? rawEntry.key,
		};

		const duplicateIndex = normalized.findIndex(
			(candidate) => candidate.key === entry.key || isSameLogicalHazard(candidate, entry),
		);
		if (duplicateIndex === -1) {
			normalized.push(entry);
		} else {
			normalized[duplicateIndex] = mergeEntries(normalized[duplicateIndex], entry);
		}
	}

	return normalized;
};
|
||||
// Convert a hazard_history DB row (snake_case columns) into the camelCase
// entry shape the rest of the module works with.
const mapRowToHistoryEntry = (row) => {
	const entry = {
		location: row.location_label,
		locationKey: row.location_key,
		hazardType: row.hazard_type,
		source: row.source,
		severity: row.severity,
		latestHazardId: row.latest_hazard_id,
		encounteredAt: toIsoString(row.encountered_at),
		lastSeenAt: toIsoString(row.last_seen_at),
		ongoing: Boolean(row.ongoing),
	};
	return entry;
};
|
||||
|
||||
/**
 * Format a display label from weather location parameters.
 * US locations render as "City, State"; everything else as "City, Country".
 * @param {string} city - City name (falls back to 'Unknown')
 * @param {string} state - State/region name
 * @param {string} country - Country name
 * @param {string} countryCode - Country code; 'US'/'USA' selects the state form
 * @returns {string} Human-readable location label
 */
const formatLocation = (city, state, country, countryCode) => {
	const cleanCity = city?.trim() || 'Unknown';
	const isUs = countryCode === 'US' || countryCode === 'USA';

	// Pick the qualifier appropriate for the country, dropping it when blank.
	const suffix = isUs ? state?.trim() : country?.trim();
	return suffix ? `${cleanCity}, ${suffix}` : cleanCity;
};
|
||||
|
||||
/**
 * Get current hazard history from MySQL.
 * Returns the most recently seen entries, newest first, capped at
 * MAX_HISTORY_ENTRIES.
 * NOTE(review): this region of the diff interleaved the removed file-based
 * updateHistory with the new MySQL getHistory; reconstructed here as the
 * coherent MySQL-backed getHistory.
 * @returns {Promise<Array>} Current history entries (camelCase shape)
 */
const getHistory = async () => {
	const [rows] = await getPool().query(
		`SELECT
			location_label,
			location_key,
			hazard_type,
			source,
			severity,
			latest_hazard_id,
			encountered_at,
			last_seen_at,
			ongoing
		FROM hazard_history
		ORDER BY last_seen_at DESC
		LIMIT ?`,
		[MAX_HISTORY_ENTRIES],
	);

	return rows.map(mapRowToHistoryEntry);
};
|
||||
|
||||
/**
 * Update hazard history with the currently active hazards for a location.
 * Within a single transaction: previously ongoing rows for the location are
 * marked ended when no longer active, then each active hazard is upserted.
 * NOTE(review): the diff fused the removed file-based getHistory onto this
 * function; reconstructed here as the coherent MySQL-backed updateHistory.
 * @param {Object} payload - Request payload
 * @param {string} payload.location - Formatted location label (for display)
 * @param {string} payload.locationKey - Stable location key from lat/lon (for matching)
 * @param {Array} payload.hazards - Array of active hazards
 * @returns {Promise<Array>} Updated history (re-read via getHistory)
 */
const updateHistory = async (payload) => {
	const { location, locationKey, hazards = [] } = payload;
	// Hazards need both a type and a source to form a stable identity.
	const validHazards = hazards.filter((hazard) => hazard?.hazardType && hazard?.source);
	const pool = getPool();
	const connection = await pool.getConnection();

	try {
		await connection.beginTransaction();

		if (validHazards.length === 0) {
			// Nothing active: close out every ongoing row for this location.
			await connection.execute(
				`UPDATE hazard_history
					SET ongoing = 0,
						last_seen_at = UTC_TIMESTAMP()
					WHERE location_key = ?
						AND ongoing = 1`,
				[locationKey],
			);
		} else {
			// Close out ongoing rows that are not in the current active set.
			const keepClauses = validHazards.map(() => '(hazard_type = ? AND source = ?)').join(' OR ');
			const keepParams = validHazards.flatMap((hazard) => [hazard.hazardType, hazard.source]);

			await connection.execute(
				`UPDATE hazard_history
					SET ongoing = 0,
						last_seen_at = UTC_TIMESTAMP()
					WHERE location_key = ?
						AND ongoing = 1
						AND NOT (${keepClauses})`,
				[locationKey, ...keepParams],
			);
		}

		// Upsert each active hazard; the table's unique key keeps one row per
		// logical hazard, so revisions update in place. Sequential awaits are
		// intentional: all statements share one transactional connection.
		for (const hazard of validHazards) {
			await connection.execute(
				`INSERT INTO hazard_history (
					location_label,
					location_key,
					hazard_type,
					source,
					severity,
					latest_hazard_id,
					encountered_at,
					last_seen_at,
					ongoing
				) VALUES (?, ?, ?, ?, ?, ?, UTC_TIMESTAMP(), UTC_TIMESTAMP(), 1)
				ON DUPLICATE KEY UPDATE
					location_label = VALUES(location_label),
					severity = VALUES(severity),
					latest_hazard_id = VALUES(latest_hazard_id),
					last_seen_at = UTC_TIMESTAMP(),
					ongoing = 1`,
				[
					location,
					locationKey,
					hazard.hazardType,
					hazard.source,
					hazard.severity ?? null,
					hazard.id ?? null,
				],
			);
		}

		await connection.commit();
	} catch (error) {
		await connection.rollback();
		throw error;
	} finally {
		connection.release();
	}

	return getHistory();
};
|
||||
|
||||
export {
|
||||
loadHistory,
|
||||
saveHistory,
|
||||
updateHistory,
|
||||
getHistory,
|
||||
formatLocation,
|
||||
generateKey,
|
||||
getHistory,
|
||||
MAX_HISTORY_ENTRIES,
|
||||
updateHistory,
|
||||
};
|
||||
|
|
|
|||
66
src/mysql.mjs
Normal file
66
src/mysql.mjs
Normal file
|
|
@ -0,0 +1,66 @@
|
|||
import mysql from 'mysql2/promise';
|
||||
|
||||
// Shared mysql2 connection pool, created lazily on first use by getPool()
let pool;
|
||||
|
||||
/**
 * Build the mysql2 pool configuration from WS4KP_MYSQL_* environment vars.
 * @returns {Object} mysql2 pool options
 * @throws {Error} When user, password, or database is not configured
 */
const getConfig = () => {
	const env = process.env;
	const user = env.WS4KP_MYSQL_USER;
	const password = env.WS4KP_MYSQL_PASSWORD;
	const database = env.WS4KP_MYSQL_DATABASE;

	if (!user || !password || !database) {
		throw new Error('Missing MySQL configuration. Set WS4KP_MYSQL_USER, WS4KP_MYSQL_PASSWORD, and WS4KP_MYSQL_DATABASE.');
	}

	// A unix socket takes precedence over TCP host/port when provided.
	const socketPath = env.WS4KP_MYSQL_SOCKET_PATH;
	const transport = socketPath
		? { socketPath }
		: {
			host: env.WS4KP_MYSQL_HOST ?? '127.0.0.1',
			port: Number(env.WS4KP_MYSQL_PORT ?? '3306'),
		};

	return {
		user,
		password,
		database,
		waitForConnections: true,
		connectionLimit: 10,
		queueLimit: 0,
		...transport,
	};
};
|
||||
|
||||
/**
 * Return the shared connection pool, creating it on first use.
 * Configuration errors surface here (getConfig throws) rather than at import.
 * @returns {Object} Shared mysql2 promise pool
 */
const getPool = () => {
	if (!pool) {
		pool = mysql.createPool(getConfig());
	}
	return pool;
};
|
||||
|
||||
/**
 * Verify that the hazard_history table exists in the configured database.
 * Intended as a startup check so schema problems fail fast with guidance.
 * @returns {Promise<boolean>} true when the table is present
 * @throws {Error} When the table is missing
 */
const checkHazardHistoryTable = async () => {
	const { database } = getConfig();
	const [rows] = await getPool().query(
		`SELECT 1
			FROM information_schema.tables
			WHERE table_schema = ?
				AND table_name = 'hazard_history'
			LIMIT 1`,
		[database],
	);

	if (rows.length === 0) {
		throw new Error(`Hazard history database table 'hazard_history' is missing in database '${database}'. Run the documented CREATE TABLE statement before using Hazard List.`);
	}

	return true;
};
|
||||
|
||||
// Public API: shared pool accessor and startup schema check.
export {
	checkHazardHistoryTable,
	getPool,
};
|
||||
Loading…
Add table
Add a link
Reference in a new issue