import isEqual from "lodash/isEqual";
import { BehaviorSubject, Observable, Unsubscribable } from "rxjs";
import { distinctUntilChanged, map, startWith } from "rxjs/operators";

import {
    ConversationFilter,
    buildConversationFilterFromRuleset,
} from "shared/concierge/conversations/ConversationFilters";
import { Conversation } from "shared/concierge/conversations/types/conversation.types";
import {
    ConversationGroupRuleset,
    ConversationGroupSummary,
} from "shared/concierge/conversations/types/conversationGroup.types";
import { successfulSubscriptionResult } from "shared/concierge/subscription.utils";
import {
    Subscription,
    SubscriptionResult,
} from "shared/concierge/types/subscription.types";

/**
 * Subscription that tracks which conversations belong to a conversation
 * group (as decided by the group's ruleset) and publishes a summary of that
 * group every time the summary changes.
 */
class ConversationGroupSummarySubscription
    implements Subscription<ConversationGroupSummary>
{
    /**
     * Conversations currently accepted into this group, keyed by
     * conversation id.
     */
    private readonly conversations: Map<string, Conversation> = new Map();

    /**
     * Filter function built from the group ruleset. A conversation belongs
     * to this group only when the filter returns a truthy value for it.
     */
    private readonly accepts: ConversationFilter;

    /**
     * Feed of conversation group summary updates that flows to the UI.
     * Starts in the LOADING state until the first summary is published.
     */
    private readonly publicFeed = new BehaviorSubject<
        SubscriptionResult<ConversationGroupSummary>
    >({
        status: "LOADING",
        data: null,
        errorMessage: null,
    });

    /**
     * Handles for every observable subscription created internally. They are
     * collected here so that `teardown` can release all of them in bulk.
     */
    private readonly subscriptionHandles: Unsubscribable[] = [];

    /**
     * @param ruleset - Defines which conversations fit within the group
     * @param onUnsubscribe - A callback that will get called when this subscription is torn down
     * @param initialConversations - A bulk load of conversations used to hydrate the store
     * @param conversationsFeed - An observable of updates to all conversations
     */
    constructor(
        private readonly ruleset: ConversationGroupRuleset,
        private readonly onUnsubscribe: () => void,
        initialConversations: Conversation[],
        conversationsFeed: Observable<Conversation[]>,
    ) {
        this.accepts = buildConversationFilterFromRuleset(ruleset);
        this.listenToConversationFeed(initialConversations, conversationsFeed);
    }

    /**
     * Feed of subscription results that allows the consumer to subscribe to
     * changes. This is the internal subject itself, exposed through the
     * narrower `Observable` type.
     */
    get feed(): Observable<SubscriptionResult<ConversationGroupSummary>> {
        return this.publicFeed;
    }

    /**
     * Tears the subscription down. In order:
     * - alert the parent (via `onUnsubscribe`) so it can dispose of this subscription,
     * - unsubscribe from every observable that was set up internally,
     * - close the public feed so that it stops broadcasting events.
     *
     * The internal cleanup runs even when `onUnsubscribe` throws, so a
     * failing callback cannot leak the conversation feed subscription. The
     * error is still propagated to the caller.
     */
    public teardown(): void {
        try {
            this.onUnsubscribe();
        } finally {
            for (const handle of this.subscriptionHandles) {
                handle.unsubscribe();
            }
            // NOTE(review): unsubscribing a Subject drops its observers
            // without a completion signal, and later subscribe attempts will
            // throw. Confirm that this is preferred over `complete()`.
            this.publicFeed.unsubscribe();
        }
    }

    /**
     * Starts listening to the conversation updates feed.
     * The pipe operates like this:
     * - Seed the stream with the initial bulk load of conversations
     * - Receive an event containing a bulk list of updates
     * - Calculate how those updates affect this group
     * - Convert the update into a success state object
     * - Only broadcast the event if the value has changed (deep equality)
     *
     * The resulting subscription handle is stored so `teardown` can release it.
     */
    private listenToConversationFeed(
        initialConversations: Conversation[],
        conversationsFeed: Observable<Conversation[]>,
    ): void {
        const handle = conversationsFeed
            .pipe(
                startWith(initialConversations),
                map((conversations) => this.processUpdate(conversations)),
                map(successfulSubscriptionResult),
                distinctUntilChanged(isEqual),
            )
            .subscribe(this.publicFeed);

        this.subscriptionHandles.push(handle);
    }

    /**
     * Applies a batch of conversation updates to the local store and
     * recomputes the group summary.
     *
     * @param updates - New or changed conversations; each one is either
     * stored (when accepted by the ruleset filter) or evicted (when not).
     * @returns The summary, where `unreadCount` is the number of stored
     * conversations that have a truthy `unreadCount` and are not stale.
     */
    private processUpdate(
        updates: Iterable<Conversation>,
    ): ConversationGroupSummary {
        for (const conversation of updates) {
            if (this.accepts(conversation)) {
                this.conversations.set(conversation.id, conversation);
            } else {
                // A conversation that no longer matches must leave the
                // group; `delete` is a no-op when the id was never stored.
                this.conversations.delete(conversation.id);
            }
        }

        // Count in a single pass instead of materialising intermediate arrays.
        let unreadCount = 0;
        for (const conversation of this.conversations.values()) {
            if (conversation.unreadCount && !conversation.isStale) {
                unreadCount += 1;
            }
        }

        return { unreadCount };
    }

    /**
     * No-op: listening begins in the constructor. The method is only
     * provided for the `Subscription` interface.
     */
    public async start(): Promise<void> {
        /* don't need to do anything, but do need this method for the interface */
    }
}

/**
 * Factory for conversation group summary subscriptions.
 *
 * Subscriptions are created through this function rather than by calling
 * the class constructor directly, because a plain function is easier to
 * mock in tests than a class. All arguments are forwarded unchanged to the
 * `ConversationGroupSummarySubscription` constructor.
 */
export const createConversationGroupSummarySubscription = (
    ...constructorArgs: ConstructorParameters<
        typeof ConversationGroupSummarySubscription
    >
): Subscription<ConversationGroupSummary> =>
    new ConversationGroupSummarySubscription(...constructorArgs);
