// 149 lines, 4.8 KiB, JavaScript
import { InfluxDB } from '@influxdata/influxdb-client'
|
|
import { cfg } from './config.mjs'
|
|
|
|
// Cached InfluxDB client, plus the URL and token it was built with so that a
// configuration change can be detected on the next getClient() call.
let client, currentUrl, currentToken

/**
 * Return the shared InfluxDB client together with the current configuration.
 *
 * The client is built on the first call and rebuilt whenever INFLUX_URL or
 * INFLUX_TOKEN differ from the values the cached client was created with.
 *
 * @returns {{ client: object, c: object }} the cached client and the object
 *   returned by cfg().
 */
function getClient() {
  const c = cfg()
  // `!client` covers a first call where both settings are undefined: compared
  // with the (also undefined) cached values they look "unchanged", which used
  // to hand back an undefined client. Building it here lets the constructor
  // report the misconfiguration instead (presumably it rejects a missing URL
  // - TODO confirm against the client version in use).
  const needsRebuild =
    !client || c.INFLUX_URL !== currentUrl || c.INFLUX_TOKEN !== currentToken
  if (needsRebuild) {
    client = new InfluxDB({ url: c.INFLUX_URL, token: c.INFLUX_TOKEN })
    currentUrl = c.INFLUX_URL
    currentToken = c.INFLUX_TOKEN
  }
  return { client, c }
}
|
|
|
|
/**
 * Run a Flux query against the configured organisation and collect all rows.
 *
 * @param {string} flux - Flux source text to execute.
 * @returns {Promise<object[]>} resolves with one plain object per result row;
 *   rejects with the client's error if the query fails.
 */
async function query(flux) {
  const { client, c } = getClient()
  // collectRows is the client's own "gather every row" helper. Per the client
  // documentation its default row mapper is the table metadata's toObject(),
  // which is what the previous hand-written queryRows/Promise wrapper did.
  return client.getQueryApi(c.INFLUX_ORG).collectRows(flux)
}
|
|
|
|
/**
 * Format a timestamp as "YYYY-MM-DDTHH:00", truncated to the start of its hour.
 *
 * The components are read with the local-time getters, so the result is
 * expressed in the process's local time zone and carries no offset suffix.
 * NOTE(review): truncateToDay in this file works in UTC (toISOString) -
 * confirm the local/UTC mix is intended.
 *
 * @param {string|number|Date} iso - any value accepted by the Date constructor.
 * @returns {string} the hour-truncated local timestamp.
 */
function truncateToHour(iso) {
  const d = new Date(iso)
  d.setMinutes(0, 0, 0)
  const pad = n => String(n).padStart(2, '0')
  const datePart = [d.getFullYear(), pad(d.getMonth() + 1), pad(d.getDate())].join('-')
  return `${datePart}T${pad(d.getHours())}:00`
}
|
|
|
|
/**
 * Return the UTC calendar date ("YYYY-MM-DD") of a timestamp.
 *
 * @param {string|number|Date} iso - any value accepted by the Date constructor.
 * @returns {string} the first ten characters of the value's ISO-8601 (UTC) form.
 * @throws {RangeError} from toISOString() when the value is not a valid date.
 */
function truncateToDay(iso) {
  const utcStamp = new Date(iso).toISOString()
  return utcStamp.slice(0, 10)
}
|
|
|
|
/**
 * Copy a result row, keeping only its data columns.
 *
 * Dropped keys: anything starting with "_" (e.g. _time, _start) and the
 * "result" and "table" columns.
 *
 * @param {object} row - a row object as returned by query().
 * @returns {object} a new object holding the remaining key/value pairs.
 */
function extractRow(row) {
  const isDataColumn = ([key]) =>
    !key.startsWith('_') && key !== 'result' && key !== 'table'
  return Object.fromEntries(Object.entries(row).filter(isDataColumn))
}
|
|
|
|
/**
 * Fetch the latest value of every field written during the last hour.
 *
 * The query takes last() over range(start: -1h) and pivots fields into
 * columns. All returned rows are then merged into a single flat object; when
 * two rows carry the same key, the row that comes later in the result wins.
 * The measurement filter uses the regex /.*&#47;, i.e. it matches every name.
 *
 * @returns {Promise<object>} merged field/tag values, or {} when no rows match.
 */
export async function queryCurrent() {
  const { c } = getClient()
  const flux = `
    from(bucket: "${c.INFLUX_BUCKET}")
      |> range(start: -1h)
      |> filter(fn: (r) => r._measurement =~ /.*/)
      |> last()
      |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  `
  const rows = await query(flux)
  return rows.reduce((merged, row) => Object.assign(merged, extractRow(row)), {})
}
|
|
|
|
/**
 * Fetch hourly means of every field between two points in time.
 *
 * @param {string} start - Flux time expression for range(start: ...).
 * @param {string} end - Flux time expression for range(stop: ...).
 *   NOTE(review): both values are interpolated into the Flux text verbatim,
 *   without validation or escaping - callers must pass trusted values.
 * @returns {Promise<object[]>} one object per pivoted row: `time` is the row's
 *   _time formatted by truncateToHour, followed by the row's data columns
 *   (a data column named "time" would override it, since it is spread last).
 */
export async function queryHourly(start, end) {
  const { c } = getClient()
  const flux = `
    from(bucket: "${c.INFLUX_BUCKET}")
      |> range(start: ${start}, stop: ${end})
      |> filter(fn: (r) => r._measurement =~ /.*/)
      |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
      |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  `
  const rows = await query(flux)
  return rows.map(row => {
    const time = truncateToHour(row._time)
    return { time, ...extractRow(row) }
  })
}
|
|
|
|
/**
 * Build the Flux text for one daily aggregate of one measurement.
 *
 * @param {string} bucket - bucket name.
 * @param {string} start - Flux time expression for range(start: ...).
 * @param {string} end - Flux time expression for range(stop: ...).
 * @param {string} measurement - value matched against r._measurement.
 * @param {string} fn - aggregate passed to aggregateWindow (mean, min, max).
 * @param {string} [field] - when given, restrict the query to this _field.
 * @returns {string} the Flux query.
 */
function buildDailyFlux(bucket, start, end, measurement, fn, field) {
  const fieldClause = field ? ` and r._field == "${field}"` : ''
  return `
    from(bucket: "${bucket}")
      |> range(start: ${start}, stop: ${end})
      |> filter(fn: (r) => r._measurement == "${measurement}"${fieldClause})
      |> aggregateWindow(every: 1d, fn: ${fn}, createEmpty: false)
      |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  `
}

/**
 * Fetch per-day aggregates for a fixed list of measurements.
 *
 * For each measurement the daily mean of every field is stored under the
 * field's own name. In addition the daily min and max of env_temp_c are stored
 * as env_temp_min_c and env_temp_max_c. Measurements are processed one after
 * another; the three queries of a measurement run concurrently.
 *
 * @param {string} start - Flux time expression for range(start: ...).
 * @param {string} end - Flux time expression for range(stop: ...).
 *   NOTE(review): both values are interpolated into the Flux text verbatim,
 *   without validation or escaping - callers must pass trusted values.
 * @returns {Promise<object[]>} one object per day ({ time: "YYYY-MM-DD", ... }),
 *   sorted ascending by `time`.
 */
export async function queryDaily(start, end) {
  const { c } = getClient()
  const measurements = ['environment', 'light', 'seismic', 'compass', 'ground', 'accumulation', 'lightning', 'gps']
  const results = {}

  // Look up (creating on first use) the accumulator object for a row's day.
  const dayEntry = iso => {
    const time = truncateToDay(iso)
    if (!results[time]) results[time] = { time }
    return results[time]
  }

  for (const m of measurements) {
    // Only env_temp_c is read from the min/max rows below, so those two
    // queries are limited to that field instead of aggregating every field.
    const [means, mins, maxs] = await Promise.all([
      query(buildDailyFlux(c.INFLUX_BUCKET, start, end, m, 'mean')),
      query(buildDailyFlux(c.INFLUX_BUCKET, start, end, m, 'min', 'env_temp_c')),
      query(buildDailyFlux(c.INFLUX_BUCKET, start, end, m, 'max', 'env_temp_c')),
    ])

    // Mean values are the base: flat field names, no suffix.
    for (const row of means) {
      Object.assign(dayEntry(row._time), extractRow(row))
    }

    // Only the temperature min/max are promoted to named fields.
    for (const row of mins) {
      const entry = dayEntry(row._time)
      if (row.env_temp_c != null) entry.env_temp_min_c = row.env_temp_c
    }
    for (const row of maxs) {
      const entry = dayEntry(row._time)
      if (row.env_temp_c != null) entry.env_temp_max_c = row.env_temp_c
    }
  }

  return Object.values(results).sort((a, b) => a.time.localeCompare(b.time))
}
|
|
|
|
/**
 * Return the station coordinates: the latest GPS reading from the last 24
 * hours if the first result row has one, otherwise the configured defaults.
 *
 * A latitude or longitude of 0 is a real value and is accepted. Previously
 * the truthiness checks treated either one being 0 as "no data" and replaced
 * an altitude of 0 with the default.
 *
 * @returns {Promise<{ lat: *, lon: *, alt: * }>} gps_lat / gps_lon /
 *   gps_alt_m from the first row, with DEFAULT_ALT substituted when the
 *   altitude is missing; or DEFAULT_LAT / DEFAULT_LON / DEFAULT_ALT.
 */
export async function getCoords() {
  const { c } = getClient()
  const flux = `
    from(bucket: "${c.INFLUX_BUCKET}")
      |> range(start: -24h)
      |> filter(fn: (r) => r._measurement == "gps")
      |> last()
      |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  `
  const [reading] = await query(flux)
  const fallback = { lat: c.DEFAULT_LAT, lon: c.DEFAULT_LON, alt: c.DEFAULT_ALT }
  if (!reading) return fallback

  // Same as the old truthiness test, except that numeric zero counts as data.
  const hasValue = v => v === 0 || Boolean(v)
  const { gps_lat: lat, gps_lon: lon, gps_alt_m: alt } = reading
  if (!hasValue(lat) || !hasValue(lon)) return fallback

  // The exact pair (0, 0) was rejected before and still is - presumably that
  // is what a receiver without a fix reports; TODO confirm.
  if (lat === 0 && lon === 0) return fallback

  return { lat, lon, alt: hasValue(alt) ? alt : c.DEFAULT_ALT }
}
|