import { createClient } from "@connectrpc/connect";

import { NotificationService } from "../../gen/proto/notifications/notifications_connect";
import * as notifications_pb from "../../gen/proto/notifications/notifications_pb";

import { streamHandler } from "@/api/stream";
import transport from "@/api/transport";
import { pbNotifId, pbUserId } from "@/api/util";
import * as d from "@/domain/domain";
import { NotificationOrDismissedSchema } from "@/domain/notifications";
import { parseMessageWithSchema } from "./parser";

const service = createClient(NotificationService, transport);
export default service;

export async function dismissNotification(userId: d.UserId, id: d.NotificationId) {
    const req = new notifications_pb.DismissNotificationRequest({
        userId: pbUserId(userId),
        notifId: pbNotifId(id),
    });

    try {
        await service.dismissNotification(req);
    }
    catch (e) {
        throw new Error(`Error dismissing notification ${id}: ${e}, ${JSON.stringify(e)}`);
    }
}

export async function ackNotification(userId: d.UserId, id: d.NotificationId) {
    const req = new notifications_pb.AckNotificationRequest({
        userId: pbUserId(userId),
        notifId: pbNotifId(id),
    });

    try {
        await service.ackNotification(req);
    }
    catch (e) {
        throw new Error(`Error acking notification ${id}: ${e}, ${JSON.stringify(e)}`);
    }
}

export async function* streamNotifications(
    userId: d.UserId,
    exclusive: boolean,
    signal: AbortSignal,
) {
    const req = new notifications_pb.SubWebNotificationChangesRequest({
        userId: pbUserId(userId),
        exclusive,
    });

    const logPrefix = `${streamNotifications.name} ${userId}`;

    const resp = service.subWebNotificationChanges(req, { signal });

    yield* streamHandler(
        resp,
        parseMessageWithSchema(NotificationOrDismissedSchema, "notificationOrDismissed"),
        logPrefix,
    );
}
