feat: live messages via SSE + real client IP
- backend: SSE endpoint /api/messages/stream backed by Redis pub/sub - backend: read real client IP via getConnInfo (fallback for x-forwarded-for) - backend: CORS allow any origin (dev: LAN access from phone) - frontend: useMessages subscribes via EventSource, auto-reconnect, merges new messages/replies live - frontend: vite host:true to expose dev server on LAN Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -9,7 +9,7 @@ app.use("*", logger());
|
||||
app.use(
|
||||
"*",
|
||||
cors({
|
||||
origin: ["http://localhost:5173"],
|
||||
origin: (origin) => origin ?? "*",
|
||||
allowMethods: ["GET", "POST", "OPTIONS"],
|
||||
allowHeaders: ["Content-Type"],
|
||||
})
|
||||
|
||||
18
backend/src/lib/redis.ts
Normal file
18
backend/src/lib/redis.ts
Normal file
@@ -0,0 +1,18 @@
|
||||
import Redis from "ioredis";
|
||||
|
||||
const URL = process.env.REDIS_URL ?? "redis://localhost:6379";
|
||||
|
||||
const globalForRedis = globalThis as unknown as {
|
||||
redisPub?: Redis;
|
||||
redisSub?: Redis;
|
||||
};
|
||||
|
||||
export const redisPub = globalForRedis.redisPub ?? new Redis(URL);
|
||||
export const redisSub = globalForRedis.redisSub ?? new Redis(URL);
|
||||
|
||||
if (process.env.NODE_ENV !== "production") {
|
||||
globalForRedis.redisPub = redisPub;
|
||||
globalForRedis.redisSub = redisSub;
|
||||
}
|
||||
|
||||
export const MESSAGES_CHANNEL = "xip:messages";
|
||||
@@ -1,8 +1,21 @@
|
||||
import { Hono } from "hono";
|
||||
import { Hono, type Context } from "hono";
|
||||
import { streamSSE } from "hono/streaming";
|
||||
import { getConnInfo } from "hono/bun";
|
||||
import { prisma } from "../lib/prisma";
|
||||
import { redisPub, redisSub, MESSAGES_CHANNEL } from "../lib/redis";
|
||||
|
||||
const messages = new Hono();
|
||||
|
||||
function clientIp(c: Context): string {
|
||||
const fwd = c.req.header("x-forwarded-for");
|
||||
if (fwd) return fwd.split(",")[0].trim();
|
||||
try {
|
||||
return getConnInfo(c).remote.address ?? "0.0.0.0";
|
||||
} catch {
|
||||
return "0.0.0.0";
|
||||
}
|
||||
}
|
||||
|
||||
// GET /api/messages — top-level threads with replies
|
||||
messages.get("/", async (c) => {
|
||||
const data = await prisma.message.findMany({
|
||||
@@ -10,19 +23,44 @@ messages.get("/", async (c) => {
|
||||
orderBy: { createdAt: "desc" },
|
||||
take: 50,
|
||||
include: {
|
||||
replies: {
|
||||
orderBy: { createdAt: "asc" },
|
||||
},
|
||||
replies: { orderBy: { createdAt: "asc" } },
|
||||
},
|
||||
});
|
||||
return c.json(data);
|
||||
});
|
||||
|
||||
// GET /api/messages/stream — SSE live feed
|
||||
messages.get("/stream", (c) =>
|
||||
streamSSE(c, async (stream) => {
|
||||
const sub = redisSub.duplicate();
|
||||
await sub.subscribe(MESSAGES_CHANNEL);
|
||||
|
||||
sub.on("message", (channel, payload) => {
|
||||
if (channel !== MESSAGES_CHANNEL) return;
|
||||
stream.writeSSE({ event: "message", data: payload }).catch(() => {});
|
||||
});
|
||||
|
||||
await stream.writeSSE({ event: "ready", data: "ok" });
|
||||
|
||||
const ping = setInterval(() => {
|
||||
stream
|
||||
.writeSSE({ event: "ping", data: String(Date.now()) })
|
||||
.catch(() => {});
|
||||
}, 25_000);
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
stream.onAbort(() => {
|
||||
clearInterval(ping);
|
||||
sub.disconnect();
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
})
|
||||
);
|
||||
|
||||
// POST /api/messages — create a message or reply
|
||||
messages.post("/", async (c) => {
|
||||
const ip =
|
||||
c.req.header("x-forwarded-for")?.split(",")[0].trim() ?? "127.0.0.1";
|
||||
|
||||
const ip = clientIp(c);
|
||||
const body = await c.req.json<{ content: string; parentId?: string }>();
|
||||
|
||||
if (!body.content || body.content.trim().length === 0) {
|
||||
@@ -40,6 +78,7 @@ messages.post("/", async (c) => {
|
||||
},
|
||||
});
|
||||
|
||||
await redisPub.publish(MESSAGES_CHANNEL, JSON.stringify(message));
|
||||
return c.json(message, 201);
|
||||
});
|
||||
|
||||
|
||||
Reference in New Issue
Block a user