init
This commit is contained in:
148
server/influx.mjs
Normal file
148
server/influx.mjs
Normal file
@@ -0,0 +1,148 @@
|
||||
import { InfluxDB } from '@influxdata/influxdb-client'
|
||||
import { cfg } from './config.mjs'
|
||||
|
||||
let client, currentUrl, currentToken
|
||||
|
||||
function getClient() {
|
||||
const c = cfg()
|
||||
if (c.INFLUX_URL !== currentUrl || c.INFLUX_TOKEN !== currentToken) {
|
||||
client = new InfluxDB({ url: c.INFLUX_URL, token: c.INFLUX_TOKEN })
|
||||
currentUrl = c.INFLUX_URL
|
||||
currentToken = c.INFLUX_TOKEN
|
||||
}
|
||||
return { client, c }
|
||||
}
|
||||
|
||||
async function query(flux) {
|
||||
const { client, c } = getClient()
|
||||
const api = client.getQueryApi(c.INFLUX_ORG)
|
||||
const rows = []
|
||||
return new Promise((resolve, reject) => {
|
||||
api.queryRows(flux, {
|
||||
next(row, meta) { rows.push(meta.toObject(row)) },
|
||||
error(err) { reject(err) },
|
||||
complete() { resolve(rows) },
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
function truncateToHour(iso) {
|
||||
const d = new Date(iso)
|
||||
d.setMinutes(0, 0, 0)
|
||||
const pad = n => String(n).padStart(2, '0')
|
||||
return `${d.getFullYear()}-${pad(d.getMonth()+1)}-${pad(d.getDate())}T${pad(d.getHours())}:00`
|
||||
}
|
||||
|
||||
function truncateToDay(iso) {
|
||||
return new Date(iso).toISOString().slice(0, 10)
|
||||
}
|
||||
|
||||
function extractRow(row) {
|
||||
const out = {}
|
||||
for (const [k, v] of Object.entries(row)) {
|
||||
if (!k.startsWith('_') && k !== 'result' && k !== 'table') out[k] = v
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
export async function queryCurrent() {
|
||||
const { c } = getClient()
|
||||
const flux = `
|
||||
from(bucket: "${c.INFLUX_BUCKET}")
|
||||
|> range(start: -1h)
|
||||
|> filter(fn: (r) => r._measurement =~ /.*/)
|
||||
|> last()
|
||||
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|
||||
`
|
||||
const rows = await query(flux)
|
||||
const result = {}
|
||||
for (const row of rows) {
|
||||
Object.assign(result, extractRow(row))
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
export async function queryHourly(start, end) {
|
||||
const { c } = getClient()
|
||||
const flux = `
|
||||
from(bucket: "${c.INFLUX_BUCKET}")
|
||||
|> range(start: ${start}, stop: ${end})
|
||||
|> filter(fn: (r) => r._measurement =~ /.*/)
|
||||
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
|
||||
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|
||||
`
|
||||
const rows = await query(flux)
|
||||
return rows.map(row => ({
|
||||
time: truncateToHour(row._time),
|
||||
...extractRow(row),
|
||||
}))
|
||||
}
|
||||
|
||||
export async function queryDaily(start, end) {
|
||||
const { c } = getClient()
|
||||
const measurements = ['environment', 'light', 'seismic', 'compass', 'ground', 'accumulation', 'lightning', 'gps']
|
||||
const results = {}
|
||||
|
||||
for (const m of measurements) {
|
||||
const [means, mins, maxs] = await Promise.all([
|
||||
query(`
|
||||
from(bucket: "${c.INFLUX_BUCKET}")
|
||||
|> range(start: ${start}, stop: ${end})
|
||||
|> filter(fn: (r) => r._measurement == "${m}")
|
||||
|> aggregateWindow(every: 1d, fn: mean, createEmpty: false)
|
||||
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|
||||
`),
|
||||
query(`
|
||||
from(bucket: "${c.INFLUX_BUCKET}")
|
||||
|> range(start: ${start}, stop: ${end})
|
||||
|> filter(fn: (r) => r._measurement == "${m}")
|
||||
|> aggregateWindow(every: 1d, fn: min, createEmpty: false)
|
||||
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|
||||
`),
|
||||
query(`
|
||||
from(bucket: "${c.INFLUX_BUCKET}")
|
||||
|> range(start: ${start}, stop: ${end})
|
||||
|> filter(fn: (r) => r._measurement == "${m}")
|
||||
|> aggregateWindow(every: 1d, fn: max, createEmpty: false)
|
||||
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|
||||
`),
|
||||
])
|
||||
|
||||
// Mean values are the base — flat field names, no suffix
|
||||
for (const row of means) {
|
||||
const time = truncateToDay(row._time)
|
||||
if (!results[time]) results[time] = { time }
|
||||
Object.assign(results[time], extractRow(row))
|
||||
}
|
||||
|
||||
// Only promote temp min/max to named fields, ignore the rest
|
||||
for (const row of mins) {
|
||||
const time = truncateToDay(row._time)
|
||||
if (!results[time]) results[time] = { time }
|
||||
if (row.env_temp_c != null) results[time].env_temp_min_c = row.env_temp_c
|
||||
}
|
||||
for (const row of maxs) {
|
||||
const time = truncateToDay(row._time)
|
||||
if (!results[time]) results[time] = { time }
|
||||
if (row.env_temp_c != null) results[time].env_temp_max_c = row.env_temp_c
|
||||
}
|
||||
}
|
||||
|
||||
return Object.values(results).sort((a, b) => a.time.localeCompare(b.time))
|
||||
}
|
||||
|
||||
export async function getCoords() {
|
||||
const { c } = getClient()
|
||||
const flux = `
|
||||
from(bucket: "${c.INFLUX_BUCKET}")
|
||||
|> range(start: -24h)
|
||||
|> filter(fn: (r) => r._measurement == "gps")
|
||||
|> last()
|
||||
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|
||||
`
|
||||
const rows = await query(flux)
|
||||
if (rows.length && rows[0].gps_lat && rows[0].gps_lon) {
|
||||
return { lat: rows[0].gps_lat, lon: rows[0].gps_lon, alt: rows[0].gps_alt_m || c.DEFAULT_ALT }
|
||||
}
|
||||
return { lat: c.DEFAULT_LAT, lon: c.DEFAULT_LON, alt: c.DEFAULT_ALT }
|
||||
}
|
||||
Reference in New Issue
Block a user