import { channel, buffers } from 'redux-saga';
import { call, delay, flush, fork, take, put } from 'redux-saga/effects';

import { RECV_MESSAGE } from './actionTypes';

function* subscriptionWorker(worker, processAll, streamChannel) {
    while (true) {
        let data = yield take(streamChannel);

        if (processAll) {
            yield delay(5);

            const next = yield flush(streamChannel);

            data = [data, ...(next || [])];
        }

        yield call(worker, data);

        // Do not block the main thread
        yield delay(10);
    }
}

/**
 * High-level saga creator to subscribe to socket stream.
 *
 * @param worker
 * @param options
 * @param options.stream {string}
 * @param [options.totalWorkers=1] {number}
 * @param [options.processAll=false] {boolean} - Instead of single packet, all packets in queue are sent to worker.
 *     This option is only available `totalWorkers===1`
 * @param [options.filterFn=null] {function}
 * @return {*}
 */
export const subscribeSocket = (worker, options) =>
    fork(function* subscriber() {
        const { stream, totalWorkers = 1, processAll = false, filterFn = null } = options;

        if (!stream) {
            throw new Error('Subscription "stream" is required');
        }

        const actionFilter = (action) => {
            const match =
                action.type === RECV_MESSAGE && action.response.stream === stream;

            if (filterFn) {
                return filterFn(match, action);
            }

            return match;
        };

        const streamChannel = yield call(channel, buffers.expanding());

        if (process.env.NODE_ENV !== 'production') {
            // eslint-disable-next-line no-console
            // console.log(`Starting ${totalWorkers} workers for ${stream} ...`);
        }

        for (let i = 0; i <= totalWorkers; i += 1) {
            yield fork(
                subscriptionWorker,
                worker,
                processAll && totalWorkers === 1,
                streamChannel,
            );
        }

        while (true) {
            // Filter all matching events and send them to forked worker
            const action = yield take(actionFilter);

            yield put(streamChannel, action.response.payload);
        }
    });
