import {
    APPLICATION_JSON,
    BufferEncoders,
    encodeBearerAuthMetadata,
    encodeCompositeMetadata,
    encodeRoute,
    IdentitySerializer,
    JsonSerializer,
    MESSAGE_RSOCKET_AUTHENTICATION,
    MESSAGE_RSOCKET_COMPOSITE_METADATA,
    MESSAGE_RSOCKET_ROUTING,
    RSocketClient,
    toBuffer
} from 'rsocket-core';
import RSocketWebsocketClient from 'rsocket-websocket-client';
import {ISubscription, ReactiveSocket} from "rsocket-types";
import {auth0Plugin} from "~/utils/auth0";
import {defineStore} from "pinia";
    import {useUiNotifications} from "~/stores/ui-notifications";
import {generateId} from "~/utils/id";
import * as Sentry from "@sentry/browser";

/**
 * Serializers handed to the `RSocketClient` constructed in `setupClient`.
 *
 * Outgoing data is passed through untouched (`IdentitySerializer.serialize`) because `withRoute` already encodes
 * the payload itself via `toBuffer(JSON.stringify(payload))`. Incoming data is decoded with
 * `JsonSerializer.deserialize`, which is why responses can be read as plain objects (e.g. `value.data.initial`).
 * Metadata is passed through untouched in both directions; `withRoute` builds it with `encodeCompositeMetadata`.
 */
const serializers = {
    data: {
        serialize: IdentitySerializer.serialize,
        deserialize: JsonSerializer.deserialize,
    },
    metadata: IdentitySerializer,
}

/**
 * Tuning knobs for stream back-pressure ("permits").
 * A permit is a request for one more element from the server on a stream; RSocket has this flow control
 * built into the protocol, so the client decides how many elements the server may send.
 */
const PERMIT_SETTINGS: {
    INITIAL_COUNT: number
    REFRESH_COUNT: number
    REFRESH_THRESHOLD: number
} = {
    // Number of elements requested as soon as a stream is subscribed
    INITIAL_COUNT: 500,
    // Number of additional elements requested on every top-up
    REFRESH_COUNT: 400,
    // A top-up is requested once the remaining permits drop to this value or below
    REFRESH_THRESHOLD: 100,
}

/**
 * Connection-independent part of the RSocket setup frame. It is spread into the `setup` option of the
 * `RSocketClient` in `setupClient`, which adds the per-connection payload (route + bearer token).
 */
const setupBase = {
    // NOTE(review): presumably the keep-alive interval in milliseconds, per the rsocket-js setup options - confirm
    keepAlive: 10000,
    // NOTE(review): presumably the maximum time in milliseconds without a keep-alive answer - confirm
    lifetime: 30000,
    // Payload data is JSON; metadata is composite metadata (see `withRoute`)
    dataMimeType: APPLICATION_JSON.string,
    metadataMimeType: MESSAGE_RSOCKET_COMPOSITE_METADATA.string,
}

/**
 * The connection states tracked by this module. The values are assigned from the `kind` of the status objects
 * emitted by `connectionStatus()` on the RSocket connection.
 */
type ConnectionStatus = "NOT_CONNECTED" | "CONNECTING" | "CONNECTED" | "CLOSED" | "ERROR"

/**
 * Callbacks a consumer can supply when subscribing to a persistent stream.
 * Streams accept a `Partial` of this interface, so every callback is optional in practice.
 *
 * @param I The type of the initial data
 * @param U The type of the update data
 */
export interface PersistentRSocketStreamSubscription<I, U> {
    /** Invoked with the `initial` field of a stream message that carries one. */
    onConnected: (initialData: I) => void
    /** Invoked with the `update` field of a stream message that carries one. */
    onUpdate: (update: U) => void
    /** Invoked when the stream itself fails while the underlying connection still reports `CONNECTED`. */
    onError: (error: Error) => void
    /** NOTE(review): declared here, but nothing in this file invokes it - confirm whether that is intended. */
    onDisconnected: () => void
}

/**
 * Outcome of a request-response call made through `execute`.
 */
export interface ActionResult {
    /** Whether the action succeeded; `execute` treats `false` as a failure. */
    success: boolean
    /** NOTE(review): presumably the value produced by the action; it is not read in this file. */
    result?: any
    /** NOTE(review): presumably server-side type information for `result`; it is not read in this file. */
    _resultClass?: any
    /** Failure description; shown in the error notification and filled from `error.message` on call failures. */
    message?: string
}

/**
 * A persistent RSocket stream that can be reconnected when the connection is lost.
 * Can be constructed via the `useConnectionStore` store. Should not be constructed directly.
 *
 * Lifecycle: `acceptNewConnection` binds the stream to a connection and (re)issues the `requestStream` call once
 * that connection reports `CONNECTED`; `disconnect` unbinds it again; `discard` permanently retires the instance.
 * Callbacks that belong to a connection the stream is no longer bound to are ignored, so a late status event or a
 * pending retry timer can never re-open a stream that was disconnected or discarded.
 *
 * NOTE(review): `onDisconnected` of the handler is never invoked by this class - confirm whether that is intended.
 *
 * @typeParam I The type of the initial data
 * @typeParam U The type of the update data
 */
class PersistentRSocketStream<I, U> {
    private readonly _id: string;
    private readonly _route: string;
    private readonly _payload: any;
    private readonly _removeCallback: () => void;
    private readonly _handler: Partial<PersistentRSocketStreamSubscription<I, U>>;

    // The connection this stream is currently bound to; null while disconnected
    private _connection: ReactiveSocket<any, any> | null = null
    // The subscription of the currently open stream; null while no stream is open
    private _activeSubscription: ISubscription | null = null
    private _initialLoadCompleted: boolean = false
    // Number of elements the server is still allowed to send before a top-up is requested
    private _permits: number = 0
    private _connected = false
    // Last status seen on the bound connection, used to ignore repeated status events
    private _status: ConnectionStatus = "NOT_CONNECTED"
    // Set once by `discard`; a discarded stream never connects again
    private _discarded = false

    /**
     * @param id Identifier of this stream, used in log messages
     * @param route The route of the stream on the server
     * @param payload The payload sent along with the stream request
     * @param handler Callbacks that receive the data and errors of the stream
     * @param removeCallback Invoked by `discard` so the owner can drop its reference to this stream
     */
    constructor(
        id: string,
        route: string,
        payload: any = null,
        handler: Partial<PersistentRSocketStreamSubscription<I, U>>,
        removeCallback: () => void
    ) {
        this._id = id
        this._route = route
        this._payload = payload
        this._handler = handler
        this._removeCallback = removeCallback
        console.log(`[RSOC][${id}] Created new persistent stream with route ${route} and payload ${JSON.stringify(payload)})`)
    }

    /**
     * Whether callbacks originating from `connection` should still be acted upon.
     */
    private isBoundTo(connection: ReactiveSocket<any, any>): boolean {
        return !this._discarded && this._connection === connection
    }

    /**
     * Opens the stream on the given connection and wires the handler callbacks.
     * Does nothing when the stream is no longer bound to that connection.
     */
    private setup(connection: ReactiveSocket<any, any>) {
        if (!this.isBoundTo(connection)) {
            return
        }

        // Never keep two streams open for the same subscription
        this._activeSubscription?.cancel()
        this._activeSubscription = null

        // The subscription created by this particular call, so late callbacks only clear their own state
        let ownSubscription: ISubscription | null = null

        connection.requestStream(withRoute(this._route, this._payload))
            .subscribe({
                onNext: (value) => {
                    if (value.data.initial) {
                        this._handler.onConnected?.(value.data.initial)
                        this._initialLoadCompleted = true
                    } else if (value.data.update) {
                        this._handler.onUpdate?.(value.data.update)
                    } else {
                        console.error(`[RSOC][${this._id}] Unknown message on stream `, value)
                    }
                    // One permit is used per element; top up before the server runs out of permits
                    this._permits--
                    if (this._permits <= PERMIT_SETTINGS.REFRESH_THRESHOLD && this._activeSubscription) {
                        this._activeSubscription.request(PERMIT_SETTINGS.REFRESH_COUNT)
                        this._permits += PERMIT_SETTINGS.REFRESH_COUNT
                    }
                },
                onSubscribe: (subscription) => {
                    if (!this.isBoundTo(connection)) {
                        // Disconnected or discarded in the meantime: do not keep the stream open
                        subscription.cancel()
                        return
                    }
                    ownSubscription = subscription
                    this._activeSubscription = subscription
                    subscription.request(PERMIT_SETTINGS.INITIAL_COUNT)
                    this._permits = PERMIT_SETTINGS.INITIAL_COUNT
                },
                onComplete: () => {
                    console.info(`[RSOC][${this._id}] Stream completed`)
                    if (this._activeSubscription === ownSubscription) {
                        this._activeSubscription = null
                    }
                },
                onError: (error) => {
                    // Clear Rsocket subscription
                    if (this._activeSubscription === ownSubscription) {
                        this._activeSubscription = null
                    }
                    if (!this.isBoundTo(connection)) {
                        return
                    }

                    // If we're still connected on this connection, re-do the call
                    connection.connectionStatus().take(1).subscribe((status) => {
                        if (status.kind === "CONNECTED") {
                            // We're still connected, so this is a real error on the call itself
                            this._handler.onError?.(error)
                            Sentry.captureException(error)
                            // `setup` re-checks the binding, so a disconnect within the delay cancels the retry
                            setTimeout(() => {
                                this.setup(connection)
                            }, 500)
                        }
                    })
                }

            })
    }

    /**
     * Binds the stream to a (new) connection and opens the stream as soon as that connection is `CONNECTED`.
     * Accepting the connection the stream is already bound to is a no-op; accepting a different one replaces the
     * previous binding and closes the stream that was open on it.
     */
    acceptNewConnection(connection: ReactiveSocket<any, any>) {
        if (this._discarded || this._connection === connection) {
            return
        }
        console.debug(`[RSOC][${this._id}] Setting connection`)
        this._activeSubscription?.cancel()
        this._activeSubscription = null
        this._connection = connection
        this._connected = false
        this._status = "NOT_CONNECTED"

        connection.connectionStatus().subscribe((status) => {
            if (!this.isBoundTo(connection) || status.kind === this._status) {
                return
            }
            this._status = status.kind
            if (status.kind === "CONNECTED") {
                this._connected = true
                this.setup(connection)
            }
        })
    }

    /**
     * Closes the open stream (if any) and unbinds from the current connection.
     * The stream can be revived later with `acceptNewConnection`.
     */
    disconnect() {
        console.log(`Disconnecting ${this._route} (${JSON.stringify(this._payload)}) with id ${this._id}`)
        this._activeSubscription?.cancel()
        this._activeSubscription = null
        this._connection = null
        this._connected = false
    }

    /**
     * Disconnects the stream for good and notifies the owner through the remove callback.
     */
    discard() {
        this._discarded = true
        this.disconnect()
        this._removeCallback()
    }
}

/**
 * Pinia store that owns the single RSocket connection to the backend.
 * It establishes the connection, reconnects after a connection loss (until `disconnect` is called),
 * and offers request-response calls (`query`, `execute`) and persistent streams (`subscribe`, `cancel`).
 */
export const useConnectionStore = defineStore('connection', () => {
    const {showUiNotification} = useUiNotifications()

    // Whether the store should be holding a connection: set by `connect`, cleared by `disconnect`
    const started = ref(false)
    const connected = ref(false)
    const connectionStatus = ref<ConnectionStatus>("NOT_CONNECTED")
    const connection = ref<ReactiveSocket<any, any> | null>(null)

    // All streams that have to be (re-)established whenever a connection becomes available, by subscription ID
    const activeSubscriptions: { [id: string]: PersistentRSocketStream<any, any> } = {};

    // Delay before another connection attempt is made
    const RECONNECT_DELAY_MS = 1000

    /**
     * Private utility function to get the connection, or retry if it's not available.
     * This ensures a smooth user experience in the case of race conditions of up to about one second
     * (10 tries, 100ms apart by default), which should be plenty.
     * The promise is rejected (and the error reported to Sentry) when no connection shows up in time.
     */
    const getConnection = (triesRemaining: number = 10) => {
        return new Promise<ReactiveSocket<any, any>>((resolve, reject) => {
            if (connection.value) {
                resolve(connection.value)
            } else if (triesRemaining > 0) {
                setTimeout(() => {
                    getConnection(triesRemaining - 1).then(resolve, reject)
                }, 100)
            } else {
                const error = new Error("Could not stablish a conection to the AxonIQ backend.");
                Sentry.captureException(error)
                reject(error)
            }
        })
    }

    /**
     * Query the server with a request-response call. This will send a payload to the server and wait for a response.
     * Errors will be propagated and expected to be handled by the caller. You can suppress errors by setting
     * `suppressNotification` to true.
     * @param route The route of the request
     * @param payload The payload to send to the server
     * @param suppressNotification If true, the store will not show a notification when an error occurs. The promise will always be rejected, no matter this setting
     * @returns The data of the response
     */
    const query = async <T>(route: string, payload?: any, suppressNotification: boolean = false): Promise<T> => {
        const c = await getConnection()

        // Stringify explicitly; interpolating an object would only log "[object Object]"
        const payloadText = JSON.stringify(payload)
        console.debug(`[RSOC] Executing call for route ${route} with payload ${payloadText}`)
        return new Promise<T>((resolve, reject) => {
            c.requestResponse(withRoute(route, payload)).then((data) => {
                resolve(data.data)
            }, error => {
                const message = error?.source?.message ?? error?.message
                if (!suppressNotification) {
                    showUiNotification({
                        type: "error",
                        message: message,
                        timeout: 5000
                    })
                }
                Sentry.captureException(error)
                console.error(`[RSOC] Error executing call for route ${route} with payload ${payloadText}: ${message}`)
                reject(error)
            })
        })
    }

    /**
     * Execute a request-response call to the server. This will send a payload to the server and wait for a response.
     * @param route The route of the request
     * @param payload The payload to send to the server
     * @param autoHandleErrors  If true, the store will automatically show a notification when an error occurs. If false, the error will be thrown. Setting this to true will always make the promise resolve and prevent rejection
     * @returns The result reported by the server, or a failed result built from the error when the call itself fails
     */
    const execute = async (route: string, payload?: any, autoHandleErrors: boolean = true): Promise<ActionResult> => {
        const c = await getConnection()

        return new Promise<ActionResult>((resolve, reject) => {
            const handleResult = (result: ActionResult) => {
                if (result.success) {
                    resolve(result)
                    return
                }
                if (!autoHandleErrors) {
                    reject(result)
                    return
                }
                showUiNotification({
                    type: "error",
                    message: result.message ?? JSON.stringify(result),
                    timeout: 5000
                })
                resolve(result)
            }
            c.requestResponse(withRoute(route, payload)).then((data) => {
                handleResult(data.data)
            }, error => {
                console.log(`[RSOC] Error executing call for route ${route} with payload ${JSON.stringify(payload)}`, error)
                Sentry.captureException(error)
                // A failed call is reported in the same shape as a failed action
                handleResult({
                    success: false,
                    message: error?.message
                })
            })
        })
    }

    /**
     * Subscribe to a stream with a route and a payload.
     * @param route The route of the stream
     * @param payload The payload to send to the stream (Optional)
     * @param handler The handler for handling the incoming data
     * @returns The ID of the subscription. You can use {@link cancel} to cancel the subscription with this ID.
     */
    const subscribe = <I, U>(route: string, payload: any, handler: Partial<PersistentRSocketStreamSubscription<I, U>>) => {
        const id = generateId()
        activeSubscriptions[id] = new PersistentRSocketStream(id, route, payload, handler, () => {
            // Streams are registered by subscription ID, not by route
            delete activeSubscriptions[id]
        });

        // If we are connected, we can immediately accept the new connection
        const connectionValue = connection.value;
        if (connectionValue) {
            activeSubscriptions[id].acceptNewConnection(connectionValue)
        }
        return id as string
    }

    /**
     * Cancel a subscription with the given ID.
     * @param id The ID of the subscription to cancel. This ID is returned by {@link subscribe}
     */
    const cancel = (id: string) => {
        const stream = activeSubscriptions[id]
        if (stream) {
            stream.discard()
            delete activeSubscriptions[id]
        }
    }

    /**
     * Determines the websocket URL of the backend based on the host the application is served from.
     */
    const determineConnectionUrl = () => {
        if (window.location.host.includes("localhost")) {
            // There's a bug in the devproxy in the version we use, so we need to use the direct URL
            return `ws://localhost:8080/rsocket`
        }
        return `wss://${window.location.host}/api/rsocket`
    }

    /**
     * Schedules another connection attempt, unless `disconnect` has been called by the time it is due.
     * A failure while preparing the attempt (e.g. fetching the access token) leads to a further attempt
     * instead of an unhandled rejection that would end the reconnect loop.
     */
    const scheduleReconnect = () => {
        setTimeout(() => {
            if (!started.value) {
                return
            }
            setupClient().catch((error) => {
                console.log("[RSOC] Error connecting", error)
                scheduleReconnect()
            })
        }, RECONNECT_DELAY_MS)
    }

    /**
     * Creates a client with a fresh access token and starts connecting. The returned promise settles once the
     * connection attempt has been started, not once the connection is established; it is rejected when the
     * access token cannot be obtained.
     */
    async function setupClient() {
        const accessToken = await auth0Plugin.getAccessTokenSilently()

        const transport = new RSocketWebsocketClient({
            url: determineConnectionUrl(),
        }, BufferEncoders);
        const client = new RSocketClient({
            serializers,
            setup: {
                ...setupBase,
                payload: {
                    metadata: encodeCompositeMetadata([
                        [MESSAGE_RSOCKET_ROUTING, encodeRoute("setup")],
                        [MESSAGE_RSOCKET_AUTHENTICATION, encodeBearerAuthMetadata(accessToken)],
                    ]),
                }
            },
            transport
        });

        client.connect().then(c => {
            c.connectionStatus().subscribe((status) => {
                connectionStatus.value = status.kind as ConnectionStatus
                if (status.kind === 'CONNECTED') {
                    if (!connected.value) {
                        console.log("[RSOC] Connection established")
                        // Hand the new connection to every stream so it can (re-)establish itself
                        for (const key in activeSubscriptions) {
                            activeSubscriptions[key].acceptNewConnection(c)
                        }
                    }
                    connected.value = true;
                }
                if (status.kind !== "CONNECTED" && status.kind !== 'CONNECTING') {
                    connected.value = false
                    if (status.kind === "ERROR") {
                        console.log("[RSOC] Connection closed due to error", status["error"])
                    }
                    connection.value = null
                    for (const key in activeSubscriptions) {
                        activeSubscriptions[key].disconnect()
                    }
                    if (started.value) {
                        console.log("[RSOC] Connection closed, trying to reconnect")
                        scheduleReconnect()
                    } else {
                        // Closed on purpose through `disconnect`; stay closed until `connect` is called again
                        console.log("[RSOC] Connection closed")
                    }
                }
            })
            connection.value = c
        }, error => {
            console.log("[RSOC] Error connecting", error)
            scheduleReconnect()
        })
    }

    /**
     * Connect to the RSocket server. This will establish a connection and start the streams.
     * @returns The established connection
     * @throws When the connection attempt cannot be started, or when no connection is available in time
     */
    const connect = async () => {
        if (!started.value) {
            started.value = true
            try {
                await setupClient()
            } catch (error) {
                // Nothing was started, so allow the next call to try again
                started.value = false
                throw error
            }
        }
        return await getConnection(10)
    }

    /**
     * Disconnect from the RSocket server. This will close the connection and stop the streams.
     * Note that this will not cancel the subscriptions. These will resume when the connection is re-established
     * by calling `connect` again; no automatic reconnect happens after a deliberate disconnect.
     */
    const disconnect = () => {
        const current = connection.value
        if (started.value && current) {
            // Clear the flag first: closing may report the new status right away, and that must not reconnect
            started.value = false
            current.close()
        }
    }

    const isConnected = computed(() => {
        return connected.value
    })

    return {
        started,
        connect,
        isConnected,
        disconnect,
        query,
        subscribe,
        execute,
        cancel,
        connected,
        connectionStatus,
    }
})

/**
 * Builds the RSocket payload for a call to the given route.
 * The route is encoded as composite metadata; the payload, when present, is sent as JSON in the data part.
 *
 * @param route The route of the call on the server
 * @param payload The value to send as JSON. Only `null` and `undefined` mean "no payload", so falsy values
 *                such as `0`, `false` or `""` are sent as well
 * @returns An object with the encoded `metadata` and, when a payload is given, the encoded `data`
 */
export const withRoute = (route: string, payload: any = null) => {
    const result: {
        metadata: ReturnType<typeof encodeCompositeMetadata>
        data?: ReturnType<typeof toBuffer>
    } = {
        metadata: encodeCompositeMetadata([
            [MESSAGE_RSOCKET_ROUTING, encodeRoute(route)],
        ]),
    }
    if (payload !== null && payload !== undefined) {
        result.data = toBuffer(JSON.stringify(payload))
    }
    return result
}