import { Interceptor } from "@connectrpc/connect";
import { AwaitableQueue, GoatTransport, Rpc, RpcReadWriter } from "goat-es";
import WebSocket from "isomorphic-ws";
import log from "../misc/log";

export { WebsocketAuthMessage } from "../../gen/proto/clients/clients_pb";
import { tracingInterceptor } from "./trace";
import { isBondTestUk, isDevEnv } from "../misc/environment";

///////////////////////////////////////////////////////////////////////////////
// GOAT!

// Does there really not exist a typescript project that creates this enum?
/**
 * Numeric websocket close codes used by this module.
 *
 * Values of this enum are read from the socket's `close` event and are passed
 * to `WebSocket.close()` when this module closes the connection itself.
 * Because this is a numeric enum, `WebsocketErrorCode[code]` also maps a code
 * back to its name; the close handler uses that reverse lookup when logging.
 */
export enum WebsocketErrorCode {
    Normal = 1000,
    GoingAway = 1001, // client leaving
    ProtocolError = 1002, // endpoint received malformed frame
    Unsupported = 1003, // endpoint received unsupported frame
    Internal = 1004, // reserved
    ClosedNoStatus = 1005, // expected a close status, got none
    Abnormal = 1006, // no close code frame received
    UnsupportedPayload = 1007, // endpoint received inconsistent message
    PolicyViolation = 1008, // generic code, but not Unsupported or TooLarge
    TooLarge = 1009, // endpoint won't process large frame
    MandatoryExtension = 1010, // client wanted extension server did not negotiate
    ServerError = 1011,
    ServiceRestart = 1012,
    TryAgainLater = 1013,
    BadGateway = 1014,
    TLSHandshakeFail = 1015,

    // Our own error codes follow. Application-defined codes exist after 4000, and we try
    // and follow HTTP codes where it makes sense. Hence 4401 ~= HTTP 401.
    BadRequest = 4400,
    Unauthorised = 4401,
    UnsupportedProtocol = 4402,

    // NOTE(review): 5000 is outside the 4000-range described above, and the
    // WHATWG `WebSocket.close()` API is documented to accept only 1000 or
    // 3000-4999 — presumably this value is never sent on the wire; confirm.
    Generic = 5000,
}

/**
 * Describes why a websocket connection failed or was closed: a close code
 * plus a human-readable reason.
 */
export interface WebsocketError {
    /** Close code; either copied from the socket's close event or chosen locally. */
    code: WebsocketErrorCode;
    /** Free-text explanation that accompanies `code`. */
    reason: string;
}

/**
 * A write-once value that any number of callers can wait for.
 *
 * The first call to `complete()` stores the value and wakes every pending
 * `wait()`; later calls are ignored. Completion is tracked with an explicit
 * flag rather than by inspecting the stored value, so `undefined` (and any
 * other falsy value) is a valid completion value.
 */
export class CompletableDeferred<T> {
    private val?: T;
    // Kept separately from `val` so that completing with `undefined` still counts.
    private completed = false;
    private waiters: ((value: T) => void)[] = [];

    /** @returns `true` once `complete()` has been called successfully. */
    isCompleted(): boolean {
        return this.completed;
    }

    /**
     * Stores `val` and resolves every outstanding `wait()` with it.
     *
     * @param val The value to complete with.
     * @returns `true` if this call completed the deferred, `false` if it had
     *          already been completed (in which case `val` is discarded).
     */
    complete(val: T): boolean {
        if (this.completed) {
            return false;
        }
        this.val = val;
        this.completed = true;

        // Detach the list before notifying so the waiters array is already
        // empty by the time any resolver runs.
        const pending = this.waiters;
        this.waiters = [];
        pending.forEach(resolve => resolve(val));
        return true;
    }

    /**
     * @returns A promise for the completion value. Resolves immediately if the
     *          deferred is already completed; otherwise resolves when
     *          `complete()` is called. Never rejects.
     */
    async wait(): Promise<T> {
        if (this.completed) {
            return this.val as T;
        }

        return new Promise<T>(resolve => {
            this.waiters.push(resolve);
        });
    }
}

/**
 * An `RpcReadWriter` that is carried over a websocket and exposes its
 * connection lifecycle as two awaitable values.
 */
export interface WebsocketRpcsInterface extends RpcReadWriter {
    /**
     * Starts connecting to `dest`.
     *
     * @param dest Websocket URL to connect to.
     * @param authCallback Invoked with the socket so the caller can perform
     *        authentication on it; its returned promise signals success
     *        (resolve) or failure (reject).
     */
    connect(dest: URL, authCallback: (ws: WebSocket) => Promise<void>): void;
    /**
     * Outcome of the connection attempt. In `WebsocketRpcs` this completes
     * with `null` on success and with a `WebsocketError` on failure.
     */
    connected: CompletableDeferred<null | WebsocketError>;
    /** Completes with the reason once the connection has ended. */
    disconnected: CompletableDeferred<WebsocketError>;
}
/**
 * `RpcReadWriter` backed by a single websocket connection.
 *
 * Incoming binary frames are decoded into `Rpc` messages and buffered in a
 * queue that `read()` drains; `write()` serialises an `Rpc` and sends it on
 * the socket. Connection lifecycle is published through `connected` and
 * `disconnected`.
 */
export class WebsocketRpcs implements WebsocketRpcsInterface {
    private ws?: WebSocket;
    private queue = new AwaitableQueue<Rpc>();
    // Number of messages pushed onto `queue` that `read()` has not yet popped.
    // Lets `read()` tell "woken by a message" apart from "woken by disconnect".
    private buffered = 0;
    /** Completes with `null` once connected (and authenticated), or with the failure. */
    public connected = new CompletableDeferred<null | WebsocketError>();
    /** Completes with the reason once the connection has ended. */
    public disconnected = new CompletableDeferred<WebsocketError>();

    /** Logs a warning unless the connection has already been marked disconnected. */
    private warnIfNotDisconnected(msg: string, ...other: any[]) {
        if (!this.disconnected.isCompleted()) {
            log.warn(msg, ...other);
        }
    }

    /**
     * Opens the websocket (subprotocol `goat`) and wires up its event handlers.
     *
     * @param dest Websocket URL to connect to.
     * @param authCallback Optional hook run once the socket is open. If it
     *        resolves, `connected` completes with `null`. If it rejects, both
     *        `connected` and `disconnected` complete with an `Unauthorised`
     *        error. Without a callback, `connected` completes with `null` as
     *        soon as the socket opens.
     */
    connect(dest: URL, authCallback?: (ws: WebSocket) => Promise<void>) {
        log.info(`Websocket connecting to ${dest}...`);

        // Keep a local reference so the handlers below always act on the
        // socket they were registered on.
        const ws = new WebSocket(dest, ["goat"]);
        ws.binaryType = "arraybuffer";
        this.ws = ws;

        ws.addEventListener("error", ev => {
            this.warnIfNotDisconnected(
                `Websocket connection to ${dest} error${
                    ev.error === undefined && ev.message === undefined ? ""
                        : `: ${ev.error}, ${ev.message}`
                }`,
                ev,
            );
            // *DON'T* call disconnect here.
            // The spec (https://websockets.spec.whatwg.org/#closeWebSocket) says
            // that an error event is always followed by a close event.
            // We might hit an error if our auth callback fails, and calling
            // `this.disconnect()` here causes us to think that the problem is
            // just a temporary one, rather than the correct behaviour of
            // forcing us to fetch a new token.
        });
        ws.addEventListener("close", ev => {
            this.warnIfNotDisconnected(
                `Websocket connection to ${dest} disconnected: code ${ev.code} (${
                    WebsocketErrorCode[ev.code] || "unknown"
                }) reason ${ev.reason}`,
            );
            // The socket is already closing/closed, so only record the outcome.
            this.disconnect({
                code: ev.code,
                reason: ev.reason,
            }, false);
        });
        ws.addEventListener("open", _ev => {
            log.info(`Websocket connection to ${dest} established`);
            if (!authCallback) {
                this.connected.complete(null);
                return;
            }

            authCallback(ws)
                .then(() => {
                    this.connected.complete(null);
                })
                .catch((reason: unknown) => {
                    // `reason` is an Error if the authCallback throws, but a
                    // promise may be rejected with any value (or none).
                    let err: Error;
                    let msg: string;
                    if (reason instanceof Error) {
                        err = reason;
                        msg = reason.message;
                    }
                    else {
                        err = new Error(`Pre-GOAT failed: ${reason}`);
                        msg = reason == null ? "unauthorised" : String(reason);
                    }

                    const result = {
                        err,
                        code: WebsocketErrorCode.Unauthorised,
                        reason: msg,
                    };
                    this.connected.complete(result);
                    this.disconnected.complete(result);
                });
        });
        ws.addEventListener("message", msgEvent => {
            try {
                const data = msgEvent.data as ArrayBuffer;

                // This is probably quite a bit of extra trace data...
                // this.span?.addEvent("websocket-data", {
                //    [SemanticAttributes.MESSAGE_UNCOMPRESSED_SIZE]: data.byteLength,
                // });

                const rpc = new Rpc({}).fromBinary(new Uint8Array(data));
                this.queue.push(rpc);
                this.buffered++;
            }
            catch (ex: unknown) {
                // A frame we cannot decode is treated as a protocol violation:
                // record the failure and close the socket.
                let msg = "unknown protocol error";
                if (ex instanceof Error) {
                    msg = ex.message;
                }
                this.disconnect({
                    code: WebsocketErrorCode.BadRequest,
                    reason: msg,
                });
            }
        });
    }

    /**
     * Records `err` as the outcome of the connection and optionally closes the socket.
     *
     * Both `connected` and `disconnected` are completed; since each can only be
     * completed once, an earlier outcome is never overwritten.
     *
     * @param err Code and reason to record (and to send in the close frame).
     * @param actuallyClose When `false`, only the outcome is recorded and the
     *        socket is left untouched.
     */
    private disconnect(err: WebsocketError, actuallyClose: boolean = true) {
        this.connected.complete(err);
        this.disconnected.complete(err);
        if (actuallyClose) {
            this.ws?.close(err.code, err.reason);
        }
    }

    /** Closes the connection with a `Normal` close code and reason `"finished"`. */
    done() {
        this.disconnect({
            code: WebsocketErrorCode.Normal,
            reason: "finished",
        });
    }

    /**
     * Waits for and returns the next received `Rpc`.
     *
     * Messages that were already buffered are still delivered after the
     * connection has ended.
     *
     * @throws The `WebsocketError` that `disconnected` completed with (a plain
     *         object, the same value `write()` throws) once the connection has
     *         ended and no buffered message remains.
     */
    async read(): Promise<Rpc> {
        // A failed connection attempt also completes `disconnected`, so the
        // value is not inspected here; the check below reports the failure.
        if (!this.connected.isCompleted()) {
            await this.connected.wait();
        }

        await Promise.race([
            this.disconnected.wait(),
            this.queue.nonEmpty(),
        ]);

        // Nothing buffered means we were woken by the disconnect, not by a message.
        if (this.buffered === 0 && this.disconnected.isCompleted()) {
            throw await this.disconnected.wait();
        }

        if (this.buffered > 0) {
            this.buffered--;
        }
        return this.queue.pop();
    }

    /**
     * Serialises `rpc` and sends it over the websocket.
     *
     * @param rpc The message to send.
     * @throws The `WebsocketError` that `disconnected` completed with if the
     *         connection has ended, or an `Error` if there is no socket or it
     *         is not in the `OPEN` state.
     */
    async write(rpc: Rpc): Promise<void> {
        // A failed connection attempt also completes `disconnected`, so the
        // value is not inspected here; the check below reports the failure.
        if (!this.connected.isCompleted()) {
            await this.connected.wait();
        }

        if (this.disconnected.isCompleted()) {
            throw await this.disconnected.wait();
        }

        const ws = this.ws;
        if (!ws) {
            throw new Error("invalid state, no websocket");
        }

        if (ws.readyState !== WebSocket.OPEN) {
            throw new Error(`websocket is not ready: state=${ws.readyState}`);
        }

        ws.send(rpc.toBinary());
    }
}
/**
 * Placeholder `RpcReadWriter` for when there is no connection.
 *
 * `write()` always fails, and `read()` stays pending until `done()` is
 * called, at which point it fails too. Every failure is an
 * `Error("not connected")`.
 */
export class DisconnectedRpcReadWriter implements RpcReadWriter {
    // Resolvers for every `read()` currently parked; released by `done()`.
    private waiters: (() => void)[] = [];

    /**
     * Never yields a message: waits until `done()` is called and then rejects.
     *
     * @throws Error `"not connected"`, once `done()` has been called.
     */
    async read(): Promise<Rpc> {
        await new Promise<void>(resolve => {
            this.waiters.push(resolve);
        });
        throw new Error("not connected");
    }

    /**
     * @param _rpc Ignored.
     * @throws Error `"not connected"`, always.
     */
    async write(_rpc: Rpc): Promise<void> {
        throw new Error("not connected");
    }

    /** Releases every pending `read()`, each of which then rejects. */
    done() {
        const parked = this.waiters;
        this.waiters = [];
        parked.forEach(release => release());
    }
}

// We need to define the `authInterceptor` early on in order
// to define the transport in order to define the gRPC services.
// This means that we have to have the interceptor pull out
// some value for the token from state/a function. Make a closure
// here for that.
// We want something similar for the tracing context in `trace.ts`.
/** The current bearer token, or `null` when there is none. */
export type BearerToken = string | null;

// Module-private holder for the token. It starts out as `null` so that `get()`
// always returns a value matching `BearerToken`, never `undefined`.
const bearerToken = (() => {
    let token: BearerToken = null;
    return {
        /** @returns The most recently stored token, or `null` if none was set. */
        get: (): BearerToken => token,
        /** Replaces the stored token; pass `null` to clear it. */
        set: (newToken: BearerToken): void => {
            token = newToken;
        },
    };
})();

/** Stores (or, with `null`, clears) the token held in this module. */
export const { set: setBearerToken } = bearerToken;

/**
 * Interceptor that stamps each outgoing request with the current bearer
 * token. The token is looked up per request; when there is none the request
 * is forwarded without an `Authorization` header being set.
 */
const authInterceptor: Interceptor = next => {
    return req => {
        const currentToken = bearerToken.get();
        if (!currentToken) {
            return next(req);
        }

        req.header.set("Authorization", `bearer ${currentToken}`);
        return next(req);
    };
};

// Interceptors handed to the transport below. The array is shared by
// reference, so the dev-tools hook can append to it after module load.
const interceptors = [authInterceptor, tracingInterceptor];

if (isDevEnv || isBondTestUk) {
    // Appends the interceptor published on `window` by the dev tools, if present.
    const registerDevToolsInterceptor = () => {
        const devTools = (window as any).__CONNECT_WEB_DEVTOOLS__;
        if (!devTools) {
            return;
        }
        interceptors.push(devTools);
    };

    // The ready event may have fired before this module loaded, so try once
    // right away (a no-op if nothing is published yet) and also listen for it.
    registerDevToolsInterceptor();
    window.addEventListener("connect-web-dev-tools-ready", registerDevToolsInterceptor);
}

/**
 * The module's transport. It is created with a `DisconnectedRpcReadWriter`
 * as its read/writer.
 */
export const transport = new GoatTransport(new DisconnectedRpcReadWriter(), {
    // Name needs to match that used by the backend in goat.NewServer()
    destinationName: "beyond",
    interceptors,
});

export default transport;
