Source: data/StaLoader.js

import { DataFrame } from "./DataFrame.js";
import { DataLoader } from "./DataLoader.js";

/**
 * A loader for SensorThings API data.
 * 
 * @author rkoppe <roland.koppe@awi.de>
 * @author rhess <roland.koppe@awi.de>
 */
export class StaLoader extends DataLoader {

    type = 'sta';

    _parameters = null;

    constructor(baseUrl) {
        super();
        this.baseUrl = baseUrl || 'https://ingest.o2a-data.de/sta/v1.1';
    }

    async parameters() {
        if (this._parameters) return this._parameters;

        const parameters = [];

        let url = this.baseUrl + '/Datastreams';
        do {
            const response = await fetch(url, {
                method: 'GET',
                headers: {
                    'Content-Type': 'application/json'
                }
            });

            if (!response.ok) return Promise.reject();

            const json = await response.json();
            for (const p of json.value) {
                parameters.push({
                    id: p['@iot.id'],
                    name: p.name,
                    unit: p.unitOfMeasurement?.name,
                    meta: p.properties
                });
            }

            url = json['@iot.nextLink'];
        } while (url);

        return this._parameters = parameters;
    }

    async data(parameters = [], config = {}) {

        // prepare filters
        const filters = [];
        if (config.beginDate) filters.push(`phenomenonTime ge ${config.beginDate}+00:00`);
        if (config.endDate) filters.push(`phenomenonTime le ${config.endDate}+00:00`);

        // prepare params
        const params = {
            '$orderby': 'phenomenonTime desc'
        };
        if (config.limit) params['$top'] = config.limit;
        if (filters.length > 0) params['$filter'] = filters.join(' and ');

        const promises = [];

        // FIXME: check to use fewer requests with IN syntax
        // Observations?$filter=Datastream/@iot.id in (8757)
        const results = [];

        for (const parameter of parameters) {
            const promise = fetch(this.baseUrl + `/Datastreams(${parameter})/Observations?` + new URLSearchParams(params))
                .then(r => r.json())
                .then(j => results.push({
                    parameter: parameter,
                    data: j
                }));

            promises.push(promise);
        }

        await Promise.allSettled(promises);

        const rowMap = {};
        const columns = ['datetime'];
        const columnMap = { 'datetime': 0 };
        const series = [];

        for (const result of results) {
            const parameter = result.parameter;
            const data = result.data.value;

            columns.push(parameter);
            columnMap[parameter] = columns.length - 1;

            for (const value of data) {
                const idx = value.resultTime || value.phenomenonTime;
                if (!(idx in rowMap)) rowMap[idx] = [idx, ...Array(parameters.length)];
                rowMap[idx][columns.length - 1] = value.result;
            }
        }

        const rows = Object.values(rowMap);

        // prepare series
        for (const col in columns) {
            const serie = [];
            for (const row of rows) {
                serie.push(row[col]);
            }
            series.push(serie);
        }

        const df = new DataFrame();
        df.rows = rows;
        df.series = series;
        df.columns = columns;
        df.columnMap = columnMap;

        return df;
    }
}