Source code for server

"""FastAPI 地上局バックエンド

MQTT からテレメトリ/画像を受信し、WebSocket でフロントエンドへリアルタイム配信。
InfluxDB クエリ API と コマンド送信 API を提供。
"""

import asyncio
import json
import struct
import threading
import time
from contextlib import asynccontextmanager
from datetime import datetime
from pathlib import Path

import numpy as np
import paho.mqtt.client as mqtt
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from influxdb_client import InfluxDBClient
from PIL import Image
from pydantic import BaseModel

from shared.config import (
    DATA_DIR,
    INFLUXDB_BUCKET,
    INFLUXDB_ORG,
    INFLUXDB_TOKEN,
    INFLUXDB_URL,
    MQTT_BROKER,
    MQTT_PORT,
    TOPIC_COMMAND,
    TOPIC_IMAGE,
    TOPIC_TELEMETRY,
)
from shared.telemetry_def import ARRAY_FIELDS, FLOAT_FIELDS, INT_FIELDS, FIELD_KEYS
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS


# --- WebSocket 接続管理 ---
class ConnectionManager:
    """Registry of active WebSocket clients with fan-out broadcasting.

    Sockets are added by ``connect`` and removed either explicitly through
    ``disconnect`` or implicitly by ``broadcast`` when a send fails.  Because
    both paths can try to remove the same socket, removal is idempotent.
    """

    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the WebSocket handshake and register the socket."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Unregister ``websocket``; do nothing if it is not registered.

        ``broadcast`` may already have dropped a dead socket before the
        endpoint notices the disconnect, so a missing entry is not an error.
        """
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        """Send ``message`` to every registered socket, dropping dead ones.

        Args:
            message: Text frame to send to each client.
        """
        # Iterate over a snapshot: each ``await`` yields to the event loop, and
        # connect()/disconnect() may mutate the live list in the meantime.
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception:
                # Best effort: a failed send means the peer is gone.
                self.disconnect(connection)
# Single shared registry of connected WebSocket clients.
manager = ConnectionManager()
# Event loop used for cross-thread scheduling; stays None until lifespan() assigns it.
loop: asyncio.AbstractEventLoop | None = None
def broadcast_threadsafe(message: dict) -> None:
    """Broadcast ``message`` over WebSocket from any thread.

    The dict is serialized to JSON and ``manager.broadcast`` is scheduled on
    the module-level event loop.  Nothing happens while no loop has been
    registered yet.

    Args:
        message: JSON-serializable payload to send to all clients.
    """
    if not loop:
        return
    encoded = json.dumps(message)
    asyncio.run_coroutine_threadsafe(manager.broadcast(encoded), loop)
# --- InfluxDB 接続 --- session_id = datetime.now().strftime("%Y%m%d_%H%M%S") influx_client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG) write_api = influx_client.write_api(write_options=SYNCHRONOUS) print(f"[server] InfluxDB connected (session: {session_id})") # --- パス(コンタクト区間)管理 --- # テレメトリが PASS_GAP_THRESHOLD_S 秒以上途絶したら、次の受信を新しい「パス」とみなす。 # パスは衛星が可視(通信可能)な 1 区間に相当し、ライブ表示のリセット単位になる。 PASS_GAP_THRESHOLD_S = 30.0 current_pass_id: str | None = None current_pass_started: float = 0.0 pass_counter: int = 0
def start_new_pass(reason: str, ts: float | None = None) -> str:
    """Begin a new pass and announce it to the frontend.

    Updates the module-level pass state, logs the change and broadcasts a
    ``"pass"`` message over WebSocket.

    Args:
        reason: Why the pass boundary was created ("init" = first frame,
            "gap" = link outage detected, "manual" = operator request).
        ts: Pass start time in Unix seconds. Defaults to the current time.

    Returns:
        The new pass id, a string derived from the start time.
    """
    global current_pass_id, current_pass_started, pass_counter

    if ts is None:
        started = time.time()
    else:
        started = ts

    pass_counter += 1
    current_pass_id = datetime.fromtimestamp(started).strftime("%Y%m%d_%H%M%S")
    current_pass_started = started
    print(f"[server] New pass: {current_pass_id} (reason={reason}, seq={pass_counter})")

    details = {
        "pass_id": current_pass_id,
        "started_at": started,
        "seq": pass_counter,
        "reason": reason,
        "session": session_id,
    }
    broadcast_threadsafe({"type": "pass", "data": details})
    return current_pass_id
# --- MQTT クライアント --- mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) mqtt_connected = False
def on_connect(client, userdata, flags, rc, properties):
    """Handle the broker's CONNACK: record link state and subscribe.

    The client is created with ``CallbackAPIVersion.VERSION2``, so ``rc`` is a
    paho ``ReasonCode``.  This callback also fires when the broker refuses the
    connection; in that case the link must not be reported as up and
    subscribing is pointless.

    Args:
        client: The paho client that received the CONNACK.
        userdata: User data set on the client (unused).
        flags: Connect flags from the broker (unused).
        rc: Reason code of the connection attempt.
        properties: MQTT v5 properties (unused).
    """
    global mqtt_connected
    if rc.is_failure:
        mqtt_connected = False
        print(f"[server] MQTT connection refused: {rc}")
        return

    mqtt_connected = True
    client.subscribe(TOPIC_TELEMETRY)
    client.subscribe(TOPIC_IMAGE)
    client.subscribe(TOPIC_IMAGE + "/packet")
    print("[server] MQTT subscribed")
def on_disconnect(client, userdata, flags, rc, properties):
    """Record that the MQTT link is down and log it.

    All arguments are supplied by paho and are not used here.
    """
    global mqtt_connected

    mqtt_connected = False
    print("[server] MQTT disconnected")
def on_telemetry_msg(client, userdata, msg):
    """Handle a telemetry frame: store it in InfluxDB and broadcast it.

    Also performs pass-boundary detection: the first frame starts the initial
    pass, and a frame arriving after a silence longer than
    ``PASS_GAP_THRESHOLD_S`` starts a new one.  Any error while processing the
    frame is logged and the frame is dropped.
    """
    global last_telemetry_time
    try:
        data = json.loads(msg.payload.decode())
        now = time.time()

        # Pass boundary: first frame ever, or link was silent for too long.
        if current_pass_id is None:
            start_new_pass("init", ts=now)
        elif last_telemetry_time > 0 and (now - last_telemetry_time) > PASS_GAP_THRESHOLD_S:
            start_new_pass("gap", ts=now)
        last_telemetry_time = now

        # Build the InfluxDB point, tagged with both session and pass.
        point = Point("telemetry").tag("session", session_id).tag("pass", current_pass_id)
        for names, convert in ((INT_FIELDS, int), (FLOAT_FIELDS, float)):
            for name in names:
                if name in data:
                    point = point.field(name, convert(data[name]))
        for name in ARRAY_FIELDS:
            if name not in data:
                continue
            # Array fields are flattened into one scalar field per element.
            for index, element in enumerate(data[name]):
                point = point.field(f"{name}_{index}", float(element))

        # Use the frame's own timestamp when present, otherwise the current time.
        ts = data.get("timestamp", time.time())
        point = point.time(int(ts * 1e9), WritePrecision.NS)

        try:
            write_api.write(bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=point)
        except Exception as write_err:
            # A storage failure must not prevent the live broadcast below.
            print(f"[server] InfluxDB write error: {write_err}")

        # WebSocket broadcast
        message = json.dumps({"type": "telemetry", "data": data})
        if loop:
            asyncio.run_coroutine_threadsafe(manager.broadcast(message), loop)
    except Exception as err:
        print(f"[server] Telemetry parse error: {err}")
def on_image_msg(client, userdata, msg):
    """Handle an image frame: decode it, save it as JPEG, notify clients.

    Payload layout as parsed here: a 4-byte little-endian unsigned length,
    that many bytes of JSON metadata, then the raw pixel data.  Metadata must
    carry ``width`` and ``height``; ``mode`` defaults to ``"GRAY"``.  Payloads
    shorter than 4 bytes are ignored, and any other failure is logged.
    """
    try:
        payload = msg.payload
        if len(payload) < 4:
            return

        (meta_len,) = struct.unpack("<I", payload[:4])
        pixels_start = 4 + meta_len
        meta = json.loads(payload[4:pixels_start].decode())
        frame_data = payload[pixels_start:]

        width, height = meta["width"], meta["height"]
        mode = meta.get("mode", "GRAY")

        if mode != "RGB565":
            # Anything else is treated as 8-bit grayscale.
            gray = np.frombuffer(frame_data, dtype=np.uint8).reshape((height, width))
            img = Image.fromarray(gray, mode="L")
        else:
            packed = np.frombuffer(frame_data, dtype=np.uint16).reshape((height, width))
            red = ((packed >> 11) & 0x1F).astype(np.uint8)
            green = ((packed >> 5) & 0x3F).astype(np.uint8)
            blue = (packed & 0x1F).astype(np.uint8)
            # Expand 5/6/5-bit channels to 8 bits by replicating the top bits
            # into the low bits.
            channels = [
                (red << 3) | (red >> 2),
                (green << 2) | (green >> 4),
                (blue << 3) | (blue >> 2),
            ]
            img = Image.fromarray(np.stack(channels, axis=-1), mode="RGB")

        # Save as JPEG
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        filename = f"{timestamp}.jpg"
        filepath = DATA_DIR / filename
        img.save(filepath, format="JPEG", quality=90)
        print(f"[server] Image saved: {filepath}")

        # Notify clients of the file location over WebSocket
        info = {
            "filename": filename,
            "path": f"/data/images/{filename}",
            "timestamp": datetime.now().isoformat(),
            "width": width,
            "height": height,
            "mode": mode,
        }
        message = json.dumps({"type": "image", "data": info})
        if loop:
            asyncio.run_coroutine_threadsafe(manager.broadcast(message), loop)
    except Exception as err:
        print(f"[server] Image parse error: {err}")
# Wire the connection-state callbacks and route each topic to its handler.
mqtt_client.on_connect = on_connect
mqtt_client.on_disconnect = on_disconnect
mqtt_client.message_callback_add(TOPIC_TELEMETRY, on_telemetry_msg)
mqtt_client.message_callback_add(TOPIC_IMAGE, on_image_msg)

# Time the last telemetry frame was received (Unix seconds); 0.0 means none yet.
last_telemetry_time: float = 0.0
def start_mqtt():
    """Connect to the MQTT broker and start paho's background network loop.

    Failures are logged and swallowed, so the caller continues without MQTT.
    """
    try:
        mqtt_client.connect(MQTT_BROKER, MQTT_PORT)
        mqtt_client.loop_start()
        broker = f"{MQTT_BROKER}:{MQTT_PORT}"
        print(f"[server] MQTT connected: {broker}")
    except Exception as err:
        print(
            f"[server] MQTT connection failed: {err}",
            "[server] Continuing without MQTT. Please check if Docker (Mosquitto) is running.",
            sep="\n",
        )
# --- FastAPI アプリ ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: start MQTT on startup, stop it on shutdown.

    On startup the running event loop is stored in the module-level ``loop``
    so that other threads can schedule WebSocket broadcasts onto it, then the
    MQTT client is started.  On shutdown the MQTT network loop is stopped and
    the client disconnected.

    Args:
        app: The FastAPI application (unused).
    """
    global loop
    # Inside a coroutine the running loop is the one to capture;
    # get_running_loop() never creates a new loop implicitly.
    loop = asyncio.get_running_loop()
    start_mqtt()
    try:
        yield
    finally:
        # Always stop the MQTT thread, even if an exception or cancellation
        # is thrown into the generator at the yield point.
        mqtt_client.loop_stop()
        mqtt_client.disconnect()
app = FastAPI(title="CanSat Ground Station API", lifespan=lifespan)

# CORS: allow browser requests from the single origin listed below.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Serve static files (saved images)
app.mount("/data/images", StaticFiles(directory=str(DATA_DIR)), name="images")

# --- WebSocket endpoint ---
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Register a client for broadcasts and hold the connection open.

    Incoming text frames are read and discarded; they serve only as a
    keepalive.  The socket is unregistered whenever the receive loop ends.
    A normal client disconnect is handled silently; any other error still
    propagates after cleanup.
    """
    await manager.connect(websocket)
    try:
        while True:
            # Wait for a client message (keepalive)
            await websocket.receive_text()
    except WebSocketDisconnect:
        pass
    finally:
        # broadcast() may already have dropped this socket after a failed
        # send, so only unregister it if it is still listed.
        if websocket in manager.active_connections:
            manager.disconnect(websocket)
# --- REST API ---
# Request body for POST /api/command.
class CommandRequest(BaseModel):
    # Command text; send_command() publishes it unchanged to TOPIC_COMMAND.
    command: str
def write_command(command: str, status: str, ts: float) -> None:
    """Record a sent command in InfluxDB so it can be listed as history later.

    The point is tagged with the current session and pass ("unknown" when no
    pass has started).  Write failures are logged, never raised.

    Args:
        command: The command text that was sent.
        status: Status label stored with the command.
        ts: Send time in Unix seconds.
    """
    try:
        point = Point("command")
        point = point.tag("session", session_id)
        point = point.tag("pass", current_pass_id or "unknown")
        point = point.field("command", command)
        point = point.field("status", status)
        point = point.time(int(ts * 1e9), WritePrecision.NS)
        write_api.write(bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=point)
    except Exception as err:
        print(f"[server] Command write error: {err}")
@app.get("/api/status")
async def get_status():
    """Return backend connection state and the current pass."""
    status = dict(
        mqtt_connected=mqtt_connected,
        last_telemetry_time=last_telemetry_time,
        server_time=time.time(),
        current_pass_id=current_pass_id,
        pass_started_at=current_pass_started,
        session=session_id,
    )
    return status
@app.post("/api/command")
async def send_command(req: CommandRequest):
    """Publish a command to the MQTT command topic and record it.

    The outcome of ``publish()`` is checked: a command the MQTT client could
    not accept (for example because it is not connected) is recorded and
    broadcast with status ``"failed"`` instead of ``"sent"``, and the
    response reports the error.

    Returns:
        ``{"status": "ok", "command": ...}`` on success, or
        ``{"status": "error", "command": ..., "error": ...}`` when the
        publish was rejected by the client.
    """
    info = mqtt_client.publish(TOPIC_COMMAND, req.command)
    ts = time.time()

    # rc reports whether the client accepted the message for sending;
    # it is not a delivery confirmation from the remote side.
    accepted = info.rc == mqtt.MQTT_ERR_SUCCESS
    status = "sent" if accepted else "failed"

    write_command(req.command, status, ts)
    broadcast_threadsafe({
        "type": "command",
        "data": {
            "command": req.command,
            "status": status,
            "timestamp": ts,
            "pass_id": current_pass_id,
        },
    })

    if not accepted:
        return {
            "status": "error",
            "command": req.command,
            "error": mqtt.error_string(info.rc),
        }
    return {"status": "ok", "command": req.command}
@app.post("/api/pass/new")
async def new_pass():
    """Start a new pass on operator request."""
    new_id = start_new_pass("manual")
    response = {
        "status": "ok",
        "pass_id": new_id,
        "started_at": current_pass_started,
    }
    return response
@app.get("/api/passes")
async def get_passes():
    """List the pass ids recorded in InfluxDB, newest first.

    Uses the shared module-level ``influx_client`` like the other query
    endpoints, so no per-request client is created (and none can be left
    unclosed when the query raises).

    Returns:
        ``{"passes": [...], "current": <current pass id>}``; on failure the
        list is empty and an ``"error"`` message is included.
    """
    try:
        query_api = influx_client.query_api()
        query = f"""
import "influxdata/influxdb/schema"
schema.tagValues(bucket: "{INFLUXDB_BUCKET}", tag: "pass")
"""
        tables = query_api.query(query, org=INFLUXDB_ORG)
        passes = []
        for table in tables:
            for record in table.records:
                value = record.get_value()
                # "unknown" is the placeholder tag written when no pass was active.
                if value and value != "unknown":
                    passes.append(value)
        return {"passes": sorted(passes, reverse=True), "current": current_pass_id}
    except Exception as e:
        print(f"[server] Passes API error: {e}")
        return {"passes": [], "current": current_pass_id, "error": str(e)}
@app.get("/api/commands")
async def get_commands(limit: int = 100, pass_id: str = ""):
    """Return the history of sent commands, oldest first.

    Args:
        limit: Maximum number of rows kept by the query's ``tail``.
        pass_id: If given, only commands tagged with this pass are returned.
            Only letters, digits, ``_`` and ``-`` are accepted.

    Returns:
        ``{"data": [...]}``; on failure the list is empty and an ``"error"``
        message is included.
    """
    # pass_id comes from the query string and is spliced into Flux source
    # text, so reject anything that could end the string literal or start
    # an interpolation (quotes, backslashes, "${", newlines, ...).
    if pass_id and not all(ch.isalnum() or ch in "_-" for ch in pass_id):
        return {"data": [], "error": "invalid pass_id"}

    try:
        query_api = influx_client.query_api()
        pass_filter = ""
        if pass_id:
            pass_filter = f'  |> filter(fn: (r) => r["pass"] == "{pass_id}")\n'
        query = f"""
from(bucket: "{INFLUXDB_BUCKET}")
  |> range(start: -30d)
  |> filter(fn: (r) => r["_measurement"] == "command")
{pass_filter}
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> sort(columns: ["_time"], desc: false)
  |> tail(n: {limit})
"""
        tables = query_api.query(query, org=INFLUXDB_ORG)
        results = []
        for table in tables:
            for record in table.records:
                row = record.values
                results.append({
                    "timestamp": record.get_time().timestamp(),
                    "command": row.get("command", ""),
                    "status": row.get("status", "sent"),
                    "pass_id": row.get("pass", ""),
                })
        return {"data": results}
    except Exception as e:
        print(f"[server] Commands API error: {e}")
        return {"data": [], "error": str(e)}
@app.get("/api/history")
async def get_history(field: str = "battery_volt", duration: str = "-5m", session: str = ""):
    """Return one telemetry field from InfluxDB as 1-second means.

    Args:
        field: Field name to fetch (letters, digits and ``_`` only).
        duration: Start of the query range, e.g. ``-5m`` or an RFC3339
            timestamp (letters, digits and ``- + : .`` only).
        session: If given, restrict the result to this session tag
            (letters, digits, ``_`` and ``-`` only).

    Returns:
        ``{"data": [{"time": ..., "value": ...}, ...]}``; on failure or
        invalid input the list is empty and an ``"error"`` message is
        included.
    """

    def _is_safe(value: str, extra: str) -> bool:
        # True when value is non-empty and holds only alphanumerics or `extra`.
        return bool(value) and all(ch.isalnum() or ch in extra for ch in value)

    # All three values come from the query string and are spliced into Flux
    # source text (duration even unquoted), so allow-list their characters.
    if not _is_safe(field, "_"):
        return {"data": [], "error": "invalid field"}
    if not _is_safe(duration, "-+:."):
        return {"data": [], "error": "invalid duration"}
    if session and not _is_safe(session, "_-"):
        return {"data": [], "error": "invalid session"}

    try:
        # Reuse the shared client: a per-request client was previously left
        # unclosed whenever the query raised.
        query_api = influx_client.query_api()
        session_filter = ""
        if session:
            session_filter = f'  |> filter(fn: (r) => r["session"] == "{session}")\n'
        query = f"""
from(bucket: "{INFLUXDB_BUCKET}")
  |> range(start: {duration})
  |> filter(fn: (r) => r["_measurement"] == "telemetry")
  |> filter(fn: (r) => r["_field"] == "{field}")
{session_filter}
  |> aggregateWindow(every: 1s, fn: mean, createEmpty: false)
"""
        tables = query_api.query(query, org=INFLUXDB_ORG)
        results = [
            {"time": record.get_time().isoformat(), "value": record.get_value()}
            for table in tables
            for record in table.records
        ]
        return {"data": results}
    except Exception as e:
        print(f"[server] History API error: {e}")
        return {"data": [], "error": str(e)}
@app.get("/api/telemetry/history")
async def get_telemetry_history(limit: int = 0, pass_id: str = "", max_points: int = 0):
    """Return full telemetry rows (all fields) from InfluxDB.

    Intended usage:
      - Live initial load: ``pass_id`` + ``limit`` (latest ``limit`` rows of
        that pass), or no ``pass_id`` for the latest ``limit`` rows overall
        (default 300, searched within the last hour).
      - Replay: ``pass_id`` only (the whole pass) + ``max_points`` so the
        server thins the result when it is too large.

    Args:
        limit: Keep only the latest ``limit`` rows; 0 means no limit when a
            pass is given and 300 otherwise.
        pass_id: Pass to fetch (letters, digits, ``_`` and ``-`` only).
        max_points: If > 0 and more rows are available, rows are sampled at
            an even stride; the last row is always kept.

    Returns:
        ``{"data": [...]}`` sorted by timestamp, oldest first; on failure or
        invalid input the list is empty and an ``"error"`` message is
        included.
    """
    # pass_id comes from the query string and is spliced into Flux source
    # text; reject characters that could break out of the string literal.
    if pass_id and not all(ch.isalnum() or ch in "_-" for ch in pass_id):
        return {"data": [], "error": "invalid pass_id"}

    try:
        query_api = influx_client.query_api()
        if pass_id:
            # Rows of one pass: the latest `limit` rows when a limit is given
            # (live view), otherwise the whole pass (replay).
            row_cap = limit if limit and limit > 0 else 0
            tail_clause = f"  |> tail(n: {row_cap})\n" if row_cap else ""
            query = f"""
from(bucket: "{INFLUXDB_BUCKET}")
  |> range(start: -30d)
  |> filter(fn: (r) => r["_measurement"] == "telemetry")
  |> filter(fn: (r) => r["pass"] == "{pass_id}")
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> sort(columns: ["_time"], desc: false)
{tail_clause}"""
        else:
            # Latest `limit` rows (live initial load, default 300)
            row_cap = limit if limit and limit > 0 else 300
            query = f"""
from(bucket: "{INFLUXDB_BUCKET}")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "telemetry")
  |> tail(n: {row_cap})
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
"""
        tables = query_api.query(query, org=INFLUXDB_ORG)

        results = []
        for table in tables:
            for record in table.records:
                row = record.values
                # Start from the timestamp only; Flux metadata columns are dropped.
                tlm = {"timestamp": record.get_time().timestamp()}

                # Copy the fields defined in the telemetry schema.
                for key in FIELD_KEYS:
                    if key in row:
                        tlm[key] = row[key]

                # Rebuild array fields (key_0, key_1, ... -> key: [v0, v1, ...])
                for key, length in ARRAY_FIELDS.items():
                    elements = [
                        row[f"{key}_{i}"] for i in range(length) if f"{key}_{i}" in row
                    ]
                    if elements:
                        tlm[key] = elements

                results.append(tlm)

        # tail() applies to each result table separately, and the query can
        # return several tables (e.g. one per session/pass tag set).  Merge
        # them into one timeline and enforce the row cap across all tables.
        results.sort(key=lambda item: item["timestamp"])
        if row_cap:
            results = results[-row_cap:]

        # Thin out overly large results at an even stride (the first and last
        # rows are kept) to limit rendering load in the browser.
        if max_points and max_points > 0 and len(results) > max_points:
            step = (len(results) + max_points - 1) // max_points
            sampled = results[::step]
            if sampled[-1] is not results[-1]:
                sampled.append(results[-1])
            results = sampled

        return {"data": results}
    except Exception as e:
        print(f"[server] History API error: {e}")
        return {"data": [], "error": str(e)}
@app.get("/api/sessions")
async def get_sessions():
    """List the session ids stored in InfluxDB, newest first.

    Uses the shared module-level ``influx_client`` like the other query
    endpoints, so no per-request client is created (and none can be left
    unclosed when the query raises).

    Returns:
        ``{"sessions": [...]}``; on failure the list is empty and an
        ``"error"`` message is included.
    """
    try:
        query_api = influx_client.query_api()
        query = f"""
import "influxdata/influxdb/schema"
schema.tagValues(bucket: "{INFLUXDB_BUCKET}", tag: "session")
"""
        tables = query_api.query(query, org=INFLUXDB_ORG)
        sessions = [
            record.get_value()
            for table in tables
            for record in table.records
        ]
        return {"sessions": sorted(sessions, reverse=True)}
    except Exception as e:
        print(f"[server] Sessions API error: {e}")
        return {"sessions": [], "error": str(e)}
@app.get("/api/images")
async def list_images():
    """Return the saved images, newest first (at most 50 entries)."""
    newest_first = sorted(DATA_DIR.glob("*.jpg"), reverse=True)

    listing = []
    for image_path in newest_first[:50]:
        name = image_path.name
        listing.append({
            "filename": name,
            "path": f"/data/images/{name}",
            "timestamp": image_path.stem,
        })
    return {"images": listing}
# Start a uvicorn server when this file is executed directly.
if __name__ == "__main__":
    import uvicorn

    # Binds to 127.0.0.1:8000 with reload enabled.
    uvicorn.run("server:app", host="127.0.0.1", port=8000, reload=True)