import { lastValueFrom } from "rxjs";
import { isPage } from "../page";
import Sorting from "./sorting";
/**
 * This class handles streaming based data access.
 * Creating a new stream allows users to interact with that stream
 * either by subscribing to receive result updates or by setting/clearing
 * filters and paging of the data.
 */
class Stream extends Sorting {
    /**
     * @param handler Handler the stream interacts with. This class uses its
     *                `id`, `find(query, sort)` and `parseFilter(str)` members.
     * @param subject Subject that data results will be broadcasted on
     *                TODO: Allow no subject and create it on the fly
     * @param defaultFilters Forwarded unchanged to the Sorting constructor
     * @param defaultSort Forwarded unchanged to the Sorting constructor
     */
    constructor(handler, subject, defaultFilters, defaultSort) {
        super(defaultFilters, defaultSort);
        this.subject = subject;
        this.handler = handler;
    }
    /** The id of the underlying handler for this stream. If none was provided when the handler was built, defaults to 'observable'.  */
    get id() {
        return this.handler.id;
    }
    /**
     * Allows a client to subscribe to the subject of this stream.
     * @param next Callback that fires whenever subject.next is called
     * @returns The subscription returned by the subject, so the caller can unsubscribe.
     */
    subscribe(next) {
        // Nobody is listening yet, so no query has been run on their behalf:
        // kick one off so that the first subscriber receives data.
        if (!this.subject.observed) {
            this.executeQuery();
        }
        return this.subject.subscribe(next);
    }
    /**
     * Re-sends the current value of the subject to all subscribers.
     */
    resend() {
        this.subject.next(this.subject.value);
    }
    /**
     * Executes a find() on the handler using the current filters, fields, sort
     * and paging, and pushes every result it emits to the subject.
     *
     * When the handler emits a page (as decided by isPage), the paging state
     * (count, offset, token) is updated from it and only the page's data is
     * pushed to the subject. Any other value is pushed as-is.
     */
    executeQuery() {
        const { token, size, offset } = this.paging;
        const paging = { token, size, offset };
        const filters = this.filters.get("filter");
        const fields = this.filters.fields.get();
        const sort = this.getSort();
        // A new query supersedes the previous one, so stop listening to the
        // old query before subscribing to the new one.
        this.querySubscription?.unsubscribe();
        this.querySubscription = this.handler
            .find({ ...filters, paging, fields }, sort)
            .subscribe((next) => {
                // Arrays are checked first and are never treated as a page.
                if (!Array.isArray(next) && isPage(next)) {
                    this.paging.count = next.page.count;
                    this.paging.offset = next.page.offset;
                    this.paging.token = next.page.token;
                    this.subject.next(next.data);
                    return;
                }
                this.subject.next(next);
            });
    }
    /**
     * Simple way to get all data associated with the current stream via a
     * standard promise. Paging is ignored, but the current filters, fields and
     * sort are used.
     *
     * @returns A promise resolving to an array: the result itself when it is an
     *          array, the page's data when it is a page, otherwise the single
     *          result wrapped in an array. The promise rejects if the
     *          underlying find() fails, so callers are never left waiting on a
     *          promise that cannot settle.
     */
    async all() {
        const filters = this.filters.get("filter");
        const fields = this.filters.fields.get();
        const sort = this.getSort();
        const result = await lastValueFrom(this.handler.find({ ...filters, fields }, sort));
        if (Array.isArray(result)) {
            return result;
        }
        if (isPage(result)) {
            return result.data;
        }
        return [result];
    }
    /**
     * Overrides the Paging class's handlePagingUpdate to executeQuery when
     * paging changes.
     */
    handlePagingUpdate() {
        this.executeQuery();
    }
    /**
     * Overrides the Filter class's handleFilterUpdate to fire when filters are
     * changed. It causes a paging reset, which in turn causes handlePagingUpdate
     * to fire, which is why this.executeQuery is not called here.
     */
    handleFilterUpdate() {
        // it's possible that `handleFilterUpdate` is called during construction,
        // which causes a crash without this optionality check
        this.paging?.reset();
    }
    /**
     * Overrides the Sort class's handleSortingUpdate to fire when sorts are
     * changed. It causes a paging reset, which in turn causes handlePagingUpdate
     * to fire, which is why this.executeQuery is not called here.
     */
    handleSortingUpdate() {
        // Same guard as handleFilterUpdate: paging may not exist yet.
        this.paging?.reset();
    }
    /**
     * Overrides the Filter class's parser to call the stream handler's parseFilter.
     * @param str Map of the string filters
     * @returns Filter for the current handler.
     */
    parser(str) {
        return this.handler.parseFilter(str);
    }
}
export default Stream;
