"""FastAPI 地上局バックエンド
MQTT からテレメトリ/画像を受信し、WebSocket でフロントエンドへリアルタイム配信。
InfluxDB クエリ API と コマンド送信 API を提供。
"""
import asyncio
import json
import struct
import threading
import time
from contextlib import asynccontextmanager
from datetime import datetime
from pathlib import Path
import numpy as np
import paho.mqtt.client as mqtt
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from influxdb_client import InfluxDBClient
from PIL import Image
from pydantic import BaseModel
from shared.config import (
DATA_DIR,
INFLUXDB_BUCKET,
INFLUXDB_ORG,
INFLUXDB_TOKEN,
INFLUXDB_URL,
MQTT_BROKER,
MQTT_PORT,
TOPIC_COMMAND,
TOPIC_IMAGE,
TOPIC_TELEMETRY,
)
from shared.telemetry_def import ARRAY_FIELDS, FLOAT_FIELDS, INT_FIELDS, FIELD_KEYS
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
# --- WebSocket 接続管理 ---
[docs]
class ConnectionManager:
def __init__(self):
self.active_connections: list[WebSocket] = []
[docs]
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
[docs]
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
[docs]
async def broadcast(self, message: str):
disconnected = []
for connection in self.active_connections:
try:
await connection.send_text(message)
except Exception:
disconnected.append(connection)
for conn in disconnected:
self.active_connections.remove(conn)
manager = ConnectionManager()
loop: asyncio.AbstractEventLoop | None = None
[docs]
def broadcast_threadsafe(message: dict) -> None:
"""MQTT スレッド等イベントループ外からも安全に WebSocket ブロードキャストする。"""
if loop:
asyncio.run_coroutine_threadsafe(manager.broadcast(json.dumps(message)), loop)
# --- InfluxDB 接続 ---
session_id = datetime.now().strftime("%Y%m%d_%H%M%S")
influx_client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG)
write_api = influx_client.write_api(write_options=SYNCHRONOUS)
print(f"[server] InfluxDB connected (session: {session_id})")
# --- パス(コンタクト区間)管理 ---
# テレメトリが PASS_GAP_THRESHOLD_S 秒以上途絶したら、次の受信を新しい「パス」とみなす。
# パスは衛星が可視(通信可能)な 1 区間に相当し、ライブ表示のリセット単位になる。
PASS_GAP_THRESHOLD_S = 30.0
current_pass_id: str | None = None
current_pass_started: float = 0.0
pass_counter: int = 0
[docs]
def start_new_pass(reason: str, ts: float | None = None) -> str:
"""新しいパスを開始してフロントへ通知する。
Args:
reason: 区切りの理由 ("init"=初回 / "gap"=通信途絶検出 / "manual"=手動)
ts: パス開始時刻 (Unix 秒)。省略時は現在時刻。
Returns:
新しい pass_id(開始時刻ベースの文字列)
"""
global current_pass_id, current_pass_started, pass_counter
started = ts if ts is not None else time.time()
pass_counter += 1
current_pass_id = datetime.fromtimestamp(started).strftime("%Y%m%d_%H%M%S")
current_pass_started = started
print(f"[server] New pass: {current_pass_id} (reason={reason}, seq={pass_counter})")
broadcast_threadsafe({
"type": "pass",
"data": {
"pass_id": current_pass_id,
"started_at": started,
"seq": pass_counter,
"reason": reason,
"session": session_id,
},
})
return current_pass_id
# --- MQTT クライアント ---
mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqtt_connected = False
[docs]
def on_connect(client, userdata, flags, rc, properties):
global mqtt_connected
mqtt_connected = True
client.subscribe(TOPIC_TELEMETRY)
client.subscribe(TOPIC_IMAGE)
client.subscribe(TOPIC_IMAGE + "/packet")
print("[server] MQTT subscribed")
[docs]
def on_disconnect(client, userdata, flags, rc, properties):
global mqtt_connected
mqtt_connected = False
print("[server] MQTT disconnected")
[docs]
def on_telemetry_msg(client, userdata, msg):
"""テレメトリ受信 → InfluxDB 保存 & WebSocket ブロードキャスト"""
global last_telemetry_time
try:
data = json.loads(msg.payload.decode())
now = time.time()
# パス区切り判定: 初回、または前回受信から一定時間途絶していたら新しいパス
gap = now - last_telemetry_time if last_telemetry_time > 0 else None
if current_pass_id is None:
start_new_pass("init", ts=now)
elif gap is not None and gap > PASS_GAP_THRESHOLD_S:
start_new_pass("gap", ts=now)
last_telemetry_time = now
# InfluxDB へ書き込み(session と pass の両タグを付与)
point = Point("telemetry").tag("session", session_id).tag("pass", current_pass_id)
for key in INT_FIELDS:
if key in data:
point = point.field(key, int(data[key]))
for key in FLOAT_FIELDS:
if key in data:
point = point.field(key, float(data[key]))
for key in ARRAY_FIELDS:
if key in data:
for i, v in enumerate(data[key]):
point = point.field(f"{key}_{i}", float(v))
# タイムスタンプがあれば使用、なければ現在時刻
ts = data.get("timestamp", time.time())
point = point.time(int(ts * 1e9), WritePrecision.NS)
try:
write_api.write(bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=point)
except Exception as e:
print(f"[server] InfluxDB write error: {e}")
# WebSocket ブロードキャスト
message = json.dumps({"type": "telemetry", "data": data})
if loop:
asyncio.run_coroutine_threadsafe(manager.broadcast(message), loop)
except Exception as e:
print(f"[server] Telemetry parse error: {e}")
[docs]
def on_image_msg(client, userdata, msg):
"""画像受信 → JPG 保存 → WebSocket 通知"""
try:
payload = msg.payload
if len(payload) < 4:
return
meta_len = struct.unpack("<I", payload[:4])[0]
meta = json.loads(payload[4 : 4 + meta_len].decode())
frame_data = payload[4 + meta_len :]
width = meta["width"]
height = meta["height"]
mode = meta.get("mode", "GRAY")
if mode == "RGB565":
raw = np.frombuffer(frame_data, dtype=np.uint16).reshape((height, width))
r = ((raw >> 11) & 0x1F).astype(np.uint8)
g = ((raw >> 5) & 0x3F).astype(np.uint8)
b = (raw & 0x1F).astype(np.uint8)
rgb = np.stack(
[
(r << 3) | (r >> 2),
(g << 2) | (g >> 4),
(b << 3) | (b >> 2),
],
axis=-1,
)
img = Image.fromarray(rgb, mode="RGB")
else:
arr = np.frombuffer(frame_data, dtype=np.uint8).reshape((height, width))
img = Image.fromarray(arr, mode="L")
# JPG として保存
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
filename = f"{timestamp}.jpg"
filepath = DATA_DIR / filename
img.save(filepath, format="JPEG", quality=90)
print(f"[server] Image saved: {filepath}")
# WebSocket でファイルパスを通知
message = json.dumps({
"type": "image",
"data": {
"filename": filename,
"path": f"/data/images/{filename}",
"timestamp": datetime.now().isoformat(),
"width": width,
"height": height,
"mode": mode,
},
})
if loop:
asyncio.run_coroutine_threadsafe(manager.broadcast(message), loop)
except Exception as e:
print(f"[server] Image parse error: {e}")
mqtt_client.on_connect = on_connect
mqtt_client.on_disconnect = on_disconnect
mqtt_client.message_callback_add(TOPIC_TELEMETRY, on_telemetry_msg)
mqtt_client.message_callback_add(TOPIC_IMAGE, on_image_msg)
# テレメトリ最終受信時刻
last_telemetry_time: float = 0.0
[docs]
def start_mqtt():
"""MQTT クライアントを別スレッドで起動"""
try:
mqtt_client.connect(MQTT_BROKER, MQTT_PORT)
mqtt_client.loop_start()
print(f"[server] MQTT connected: {MQTT_BROKER}:{MQTT_PORT}")
except Exception as e:
print(f"[server] MQTT connection failed: {e}")
print(f"[server] Continuing without MQTT. Please check if Docker (Mosquitto) is running.")
# --- FastAPI アプリ ---
[docs]
@asynccontextmanager
async def lifespan(app: FastAPI):
global loop
loop = asyncio.get_event_loop()
start_mqtt()
yield
mqtt_client.loop_stop()
mqtt_client.disconnect()
app = FastAPI(title="CanSat Ground Station API", lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:5173"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 静的ファイル (保存画像) を配信
app.mount("/data/images", StaticFiles(directory=str(DATA_DIR)), name="images")
# --- WebSocket エンドポイント ---
[docs]
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
# クライアントからのメッセージを待つ (keepalive)
await websocket.receive_text()
except WebSocketDisconnect:
manager.disconnect(websocket)
# --- REST API ---
[docs]
class CommandRequest(BaseModel):
command: str
[docs]
def write_command(command: str, status: str, ts: float) -> None:
"""送信コマンドを InfluxDB に記録する(後から履歴として参照可能)。"""
try:
point = (
Point("command")
.tag("session", session_id)
.tag("pass", current_pass_id or "unknown")
.field("command", command)
.field("status", status)
.time(int(ts * 1e9), WritePrecision.NS)
)
write_api.write(bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=point)
except Exception as e:
print(f"[server] Command write error: {e}")
[docs]
@app.get("/api/status")
async def get_status():
"""バックエンド各レイヤーの接続状態と現在のパスを返す"""
return {
"mqtt_connected": mqtt_connected,
"last_telemetry_time": last_telemetry_time,
"server_time": time.time(),
"current_pass_id": current_pass_id,
"pass_started_at": current_pass_started,
"session": session_id,
}
[docs]
@app.post("/api/command")
async def send_command(req: CommandRequest):
"""コマンドを MQTT トピックに publish し、履歴を InfluxDB に記録する"""
mqtt_client.publish(TOPIC_COMMAND, req.command)
ts = time.time()
write_command(req.command, "sent", ts)
broadcast_threadsafe({
"type": "command",
"data": {
"command": req.command,
"status": "sent",
"timestamp": ts,
"pass_id": current_pass_id,
},
})
return {"status": "ok", "command": req.command}
[docs]
@app.post("/api/pass/new")
async def new_pass():
"""手動で新しいパスを開始する(運用者操作)"""
pid = start_new_pass("manual")
return {"status": "ok", "pass_id": pid, "started_at": current_pass_started}
[docs]
@app.get("/api/passes")
async def get_passes():
"""InfluxDB に記録されたパス一覧を取得(新しい順)"""
try:
client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG)
query_api = client.query_api()
query = f"""
import "influxdata/influxdb/schema"
schema.tagValues(bucket: "{INFLUXDB_BUCKET}", tag: "pass")
"""
tables = query_api.query(query, org=INFLUXDB_ORG)
passes = []
for table in tables:
for record in table.records:
val = record.get_value()
if val and val != "unknown":
passes.append(val)
client.close()
return {"passes": sorted(passes, reverse=True), "current": current_pass_id}
except Exception as e:
return {"passes": [], "error": str(e)}
[docs]
@app.get("/api/commands")
async def get_commands(limit: int = 100, pass_id: str = ""):
"""送信済みコマンドの履歴を取得(古い順)"""
try:
query_api = influx_client.query_api()
pass_filter = ""
if pass_id:
pass_filter = f' |> filter(fn: (r) => r["pass"] == "{pass_id}")\n'
query = f"""
from(bucket: "{INFLUXDB_BUCKET}")
|> range(start: -30d)
|> filter(fn: (r) => r["_measurement"] == "command")
{pass_filter} |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> sort(columns: ["_time"], desc: false)
|> tail(n: {limit})
"""
tables = query_api.query(query, org=INFLUXDB_ORG)
results = []
for table in tables:
for record in table.records:
d = record.values
results.append({
"timestamp": record.get_time().timestamp(),
"command": d.get("command", ""),
"status": d.get("status", "sent"),
"pass_id": d.get("pass", ""),
})
return {"data": results}
except Exception as e:
print(f"[server] Commands API error: {e}")
return {"data": [], "error": str(e)}
[docs]
@app.get("/api/history")
async def get_history(field: str = "battery_volt", duration: str = "-5m", session: str = ""):
"""InfluxDB から過去のテレメトリを取得"""
try:
client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG)
query_api = client.query_api()
session_filter = ""
if session:
session_filter = f' |> filter(fn: (r) => r["session"] == "{session}")\n'
query = f"""
from(bucket: "{INFLUXDB_BUCKET}")
|> range(start: {duration})
|> filter(fn: (r) => r["_measurement"] == "telemetry")
|> filter(fn: (r) => r["_field"] == "{field}")
{session_filter} |> aggregateWindow(every: 1s, fn: mean, createEmpty: false)
"""
tables = query_api.query(query, org=INFLUXDB_ORG)
results = []
for table in tables:
for record in table.records:
results.append({
"time": record.get_time().isoformat(),
"value": record.get_value(),
})
client.close()
return {"data": results}
except Exception as e:
return {"data": [], "error": str(e)}
[docs]
@app.get("/api/telemetry/history")
async def get_telemetry_history(limit: int = 0, pass_id: str = "", max_points: int = 0):
"""InfluxDB からテレメトリ全フィールドを取得。
用途別の使い分け:
- ライブ初期ロード: pass_id + limit(直近 limit 点のみ)/ もしくは pass 未指定で直近 limit 点
- リプレイ: pass_id のみ(パス全体)+ max_points(多すぎる場合はサーバ側で間引き)
max_points を指定すると、返却点数がそれを超える場合に等間隔で間引いて軽量化する。
"""
try:
query_api = influx_client.query_api()
if pass_id:
# 指定パスのデータ。limit があれば直近 limit 点に絞る(ライブ用)、無ければ全体(リプレイ用)。
tail_clause = f" |> tail(n: {limit})\n" if limit and limit > 0 else ""
query = f"""
from(bucket: "{INFLUXDB_BUCKET}")
|> range(start: -30d)
|> filter(fn: (r) => r["_measurement"] == "telemetry")
|> filter(fn: (r) => r["pass"] == "{pass_id}")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> sort(columns: ["_time"], desc: false)
{tail_clause}"""
else:
# 直近 limit 点(ライブ初期ロード用、既定 300)
n = limit if limit and limit > 0 else 300
query = f"""
from(bucket: "{INFLUXDB_BUCKET}")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "telemetry")
|> tail(n: {n})
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
"""
tables = query_api.query(query, org=INFLUXDB_ORG)
results = []
# フィールド名のリスト(配列フィールドも含む)
for table in tables:
for record in table.records:
d = record.values
# 不要なメタデータを削除
tlm = {
"timestamp": record.get_time().timestamp(),
}
# スキーマに定義されているフィールドを抽出
for key in FIELD_KEYS:
if key in d:
tlm[key] = d[key]
# 配列フィールドの復元 (pd0, pd1... -> pd: [pd0, pd1...])
for key, length in ARRAY_FIELDS.items():
arr = []
for i in range(length):
field_name = f"{key}_{i}"
if field_name in d:
arr.append(d[field_name])
if arr:
tlm[key] = arr
results.append(tlm)
# 点数が多すぎる場合は等間隔で間引く(先頭・末尾は保持)。ブラウザ側の描画負荷を抑える。
if max_points and max_points > 0 and len(results) > max_points:
step = (len(results) + max_points - 1) // max_points
sampled = results[::step]
if sampled[-1] is not results[-1]:
sampled.append(results[-1])
results = sampled
return {"data": results}
except Exception as e:
print(f"[server] History API error: {e}")
return {"data": [], "error": str(e)}
[docs]
@app.get("/api/sessions")
async def get_sessions():
"""InfluxDB に保存されたセッション一覧を取得"""
try:
client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG)
query_api = client.query_api()
query = f"""
import "influxdata/influxdb/schema"
schema.tagValues(bucket: "{INFLUXDB_BUCKET}", tag: "session")
"""
tables = query_api.query(query, org=INFLUXDB_ORG)
sessions = []
for table in tables:
for record in table.records:
sessions.append(record.get_value())
client.close()
return {"sessions": sorted(sessions, reverse=True)}
except Exception as e:
return {"sessions": [], "error": str(e)}
[docs]
@app.get("/api/images")
async def list_images():
"""保存済み画像一覧を返す(新しい順)"""
images = sorted(DATA_DIR.glob("*.jpg"), reverse=True)
return {
"images": [
{
"filename": img.name,
"path": f"/data/images/{img.name}",
"timestamp": img.stem,
}
for img in images[:50]
]
}
if __name__ == "__main__":
import uvicorn
uvicorn.run("server:app", host="127.0.0.1", port=8000, reload=True)