import { Observable, merge } from "rxjs";
import { filter, map, scan } from "rxjs/operators";

import { mergeDeepAllConversations } from "./ConversationData.utils";
import { EpochStatus } from "./types/component.types";
import { Conversation, ConversationUpdate } from "./types/conversation.types";

/**
 * Combines the conversation-update feed and the epoch-status feed into a
 * single stream of processed conversations.
 *
 * Both inputs are wrapped in a common envelope so the reducer can tell which
 * kind of event it received. The reducer (`accumulateFeedState`) holds
 * conversation updates back until an epoch status is known and then hands
 * over the processed conversations for emission.
 *
 * @param trackedConversations - Conversations known so far, keyed by string;
 *   passed through to the reducer on every event.
 * @param currentUserId - Id of the current user; passed through to the reducer.
 * @param conversationUpdatesFeed - Source of batches of conversation updates.
 * @param epochStatusFeed - Source of epoch status changes.
 * @returns A stream that emits only non-empty arrays of processed conversations.
 */
export const createUnifiedConversationFeed = (
    trackedConversations: Map<string, Conversation>,
    currentUserId: string,
    conversationUpdatesFeed: Observable<ConversationUpdate[]>,
    epochStatusFeed: Observable<EpochStatus>,
): Observable<Conversation[]> => {
    // Wrap each conversation batch in the shared envelope.
    const conversationEvents = conversationUpdatesFeed.pipe(
        map(
            (conversations): StandardizeConversationFeedData => ({
                conversations,
            }),
        ),
    );

    // Wrap each epoch status in the same envelope so both feeds can be merged.
    const epochEvents = epochStatusFeed.pipe(
        map(
            (epochStatus): StandardizeConversationFeedData => ({
                epochStatus,
            }),
        ),
    );

    // Nothing buffered, no epoch seen, nothing to emit.
    const initialState: BufferedConversationAccumulator = {
        bufferedConversationUpdates: [],
        currentEpochStatus: undefined,
        processedUpdatesToEmit: undefined,
    };

    // Binds the per-feed arguments so the reducer fits the shape `scan` expects.
    const reduceEvent = (
        accumulated: BufferedConversationAccumulator,
        event: StandardizeConversationFeedData,
    ): BufferedConversationAccumulator =>
        accumulateFeedState(
            accumulated,
            event,
            currentUserId,
            trackedConversations,
        );

    return merge(conversationEvents, epochEvents).pipe(
        // Either buffer incoming conversation updates or process them,
        // depending on whether an epoch status is known yet.
        scan(reduceEvent, initialState),
        // Reduce the accumulator to just the conversations ready to go out.
        map(({ processedUpdatesToEmit }) => processedUpdatesToEmit ?? []),
        // Drop events that produced nothing to emit (e.g. while still buffering).
        filter((readyToEmit) => readyToEmit.length > 0),
    );
};

/**
 * Reducer that folds one feed event into the accumulated feed state.
 *
 * - Epoch event: records the new epoch status and processes whatever
 *   conversation updates were buffered (possibly none) against it.
 * - Conversation event with a known epoch: processes the buffered updates
 *   plus the new ones immediately.
 * - Conversation event without a known epoch: appends the new updates to the
 *   buffer and emits nothing.
 * - Event carrying neither payload: keeps buffer and epoch untouched and
 *   emits nothing.
 *
 * `processedUpdatesToEmit` always describes the current event only; it is
 * cleared on every path that does not process updates so a previous batch is
 * never emitted a second time.
 *
 * @param state - State accumulated from earlier events; not mutated.
 * @param newUpdate - The incoming event envelope.
 * @param currentUserId - Forwarded to `processUpdates`.
 * @param trackedConversations - Forwarded to `processUpdates`.
 * @returns The next accumulator state.
 */
const accumulateFeedState = (
    state: BufferedConversationAccumulator,
    newUpdate: StandardizeConversationFeedData,
    currentUserId: string,
    trackedConversations: Map<string, Conversation>,
): BufferedConversationAccumulator => {
    // Epoch update: record it and flush anything that was waiting for it.
    if (newUpdate.epochStatus) {
        return {
            ...state,
            currentEpochStatus: newUpdate.epochStatus,
            processedUpdatesToEmit: processUpdates(
                state.bufferedConversationUpdates,
                newUpdate.epochStatus,
                currentUserId,
                trackedConversations,
            ),
            bufferedConversationUpdates: [],
        };
    }

    // Conversation update.
    if (newUpdate.conversations) {
        // Merge any buffered updates with the new ones. This may include
        // several updates to the same conversation; processing already has to
        // cope with out-of-order updates, so no de-duplication is done here.
        const allBufferedConversations =
            state.bufferedConversationUpdates.length > 0
                ? state.bufferedConversationUpdates.concat(
                      newUpdate.conversations,
                  )
                : newUpdate.conversations;

        // With a known epoch the updates can be processed and sent on now.
        if (state.currentEpochStatus) {
            return {
                ...state,
                processedUpdatesToEmit: processUpdates(
                    allBufferedConversations,
                    state.currentEpochStatus,
                    currentUserId,
                    trackedConversations,
                ),
                bufferedConversationUpdates: [],
            };
        }

        // No epoch yet: hold the updates until one arrives, emitting nothing.
        return {
            ...state,
            bufferedConversationUpdates: allBufferedConversations,
            processedUpdatesToEmit: undefined,
        };
    }

    // Neither payload present: nothing new to process. Clear the emit slot so
    // the batch produced by an earlier event is not emitted again downstream.
    return state.processedUpdatesToEmit === undefined
        ? state
        : { ...state, processedUpdatesToEmit: undefined };
};

/**
 * Merges a batch of conversation updates via `mergeDeepAllConversations` and
 * stores every resulting conversation back into `trackedConversations`.
 *
 * @param updates - Conversation updates to merge.
 * @param epochStatus - Epoch status whose `epochVersion` is passed to the merge.
 * @param currentUserId - Id of the current user, passed to the merge.
 * @param trackedConversations - Conversation map; read by the merge and then
 *   updated in place with each merged conversation under its `id`.
 * @returns The conversations returned by the merge.
 */
const processUpdates = (
    updates: ConversationUpdate[],
    epochStatus: EpochStatus,
    currentUserId: string,
    trackedConversations: Map<string, Conversation>,
): Conversation[] => {
    const { epochVersion } = epochStatus;
    const mergedConversations = mergeDeepAllConversations(
        currentUserId,
        trackedConversations,
        updates,
        epochVersion,
    );

    // Record the merged results so later merges start from the latest versions.
    for (const mergedConversation of mergedConversations) {
        trackedConversations.set(mergedConversation.id, mergedConversation);
    }

    return mergedConversations;
};

/**
 * Common envelope for events on the merged feed, so that conversation events
 * and epoch events can travel through the same stream and be told apart.
 */
type StandardizeConversationFeedData = {
    /** Set when the event comes from the conversation updates feed. */
    conversations?: ConversationUpdate[];
    /** Set when the event comes from the epoch status feed. */
    epochStatus?: EpochStatus;
};

/** State carried between feed events by the `scan` reducer. */
type BufferedConversationAccumulator = {
    /** Conversation updates received but not yet processed. */
    bufferedConversationUpdates: ConversationUpdate[];
    /** Most recent epoch status seen; `undefined` until the first one arrives. */
    currentEpochStatus: EpochStatus | undefined;

    /**
     * Conversations produced by processing, to be emitted by the feed.
     * `undefined` is treated as "nothing to emit".
     */
    processedUpdatesToEmit: Conversation[] | undefined;
};
