#!/usr/bin/env python3 import os import random import secrets import threading import time from flask import Flask, render_template, Response, request, send_from_directory, session, redirect, url_for import requests from requests.exceptions import ConnectionError from openpilot.common.realtime import set_core_affinity import openpilot.selfdrive.frogpilot.fleetmanager.helpers as fleet from openpilot.system.hardware.hw import Paths from openpilot.common.swaglog import cloudlog import traceback app = Flask(__name__) @app.route("/") def home_page(): return render_template("index.html") @app.errorhandler(500) def internal_error(exception): print('500 error caught') tberror = traceback.format_exc() return render_template("error.html", error=tberror) @app.route("/footage/full//") def full(cameratype, route): chunk_size = 1024 * 512 # 5KiB file_name = cameratype + (".ts" if cameratype == "qcamera" else ".hevc") vidlist = "|".join(Paths.log_root() + "/" + segment + "/" + file_name for segment in fleet.segments_in_route(route)) def generate_buffered_stream(): with fleet.ffmpeg_mp4_concat_wrap_process_builder(vidlist, cameratype, chunk_size) as process: for chunk in iter(lambda: process.stdout.read(chunk_size), b""): yield bytes(chunk) return Response(generate_buffered_stream(), status=200, mimetype='video/mp4') @app.route("/footage//") def fcamera(cameratype, segment): if not fleet.is_valid_segment(segment): return render_template("error.html", error="invalid segment") file_name = Paths.log_root() + "/" + segment + "/" + cameratype + (".ts" if cameratype == "qcamera" else ".hevc") return Response(fleet.ffmpeg_mp4_wrap_process_builder(file_name).stdout.read(), status=200, mimetype='video/mp4') @app.route("/footage/") def route(route): if len(route) != 20: return render_template("error.html", error="route not found") if str(request.query_string) == "b''": query_segment = str("0") query_type = "qcamera" else: query_segment = (str(request.query_string).split(","))[0][2:] query_type = (str(request.query_string).split(","))[1][:-1] links = "" segments = "" for segment in fleet.segments_in_route(route): links += ""+segment+"
" segments += "'"+segment+"'," return render_template("route.html", route=route, query_type=query_type, links=links, segments=segments, query_segment=query_segment) @app.route("/footage/") @app.route("/footage") def footage(): return render_template("footage.html", rows=fleet.all_routes()) @app.route("/screenrecords/") @app.route("/screenrecords") def screenrecords(): rows = fleet.list_files(fleet.SCREENRECORD_PATH) if not rows: return render_template("error.html", error="no screenrecords found at:

" + fleet.SCREENRECORD_PATH) return render_template("screenrecords.html", rows=rows, clip=rows[0]) @app.route("/screenrecords/") def screenrecord(clip): return render_template("screenrecords.html", rows=fleet.list_files(fleet.SCREENRECORD_PATH), clip=clip) @app.route("/screenrecords/play/pipe/") def videoscreenrecord(file): file_name = fleet.SCREENRECORD_PATH + file return Response(fleet.ffplay_mp4_wrap_process_builder(file_name).stdout.read(), status=200, mimetype='video/mp4') @app.route("/screenrecords/download/") def download_file(clip): return send_from_directory(fleet.SCREENRECORD_PATH, clip, as_attachment=True) @app.route("/about") def about(): return render_template("about.html") @app.route("/error_logs") def error_logs(): return render_template("error_logs.html", rows=fleet.list_files(fleet.ERROR_LOGS_PATH)) @app.route("/error_logs/") def open_error_log(file_name): f = open(fleet.ERROR_LOGS_PATH + file_name) error = f.read() return render_template("error_log.html", file_name=file_name, file_content=error) @app.route("/addr_input", methods=['GET', 'POST']) def addr_input(): SearchInput = fleet.get_SearchInput() token = fleet.get_public_token() s_token = fleet.get_app_token() gmap_key = fleet.get_gmap_key() PrimeType = fleet.get_PrimeType() lon = float(0.0) lat = float(0.0) if request.method == 'POST': valid_addr = False postvars = request.form.to_dict() addr, lon, lat, valid_addr, token = fleet.parse_addr(postvars, lon, lat, valid_addr, token) if not valid_addr: # If address is not found, try searching postvars = request.form.to_dict() addr = request.form.get('addr_val') addr, lon, lat, valid_addr, token = fleet.search_addr(postvars, lon, lat, valid_addr, token) if valid_addr: # If a valid address is found, redirect to nav_confirmation return redirect(url_for('nav_confirmation', addr=addr, lon=lon, lat=lat)) else: return render_template("error.html") elif PrimeType != 0: return render_template("prime.html") elif fleet.get_nav_active(): if SearchInput == 2: return render_template("nonprime.html", gmap_key=gmap_key, lon=lon, lat=lat) else: return render_template("nonprime.html", gmap_key=None, lon=None, lat=None) elif token == "" or token is None: return redirect(url_for('public_token_input')) elif s_token == "" or s_token is None: return redirect(url_for('app_token_input')) elif SearchInput == 2: lon, lat = fleet.get_last_lon_lat() if gmap_key == "" or gmap_key is None: return redirect(url_for('gmap_key_input')) else: return render_template("addr.html", gmap_key=gmap_key, lon=lon, lat=lat) else: return render_template("addr.html", gmap_key=None, lon=None, lat=None) @app.route("/nav_confirmation", methods=['GET', 'POST']) def nav_confirmation(): token = fleet.get_public_token() lon = request.args.get('lon') lat = request.args.get('lat') addr = request.args.get('addr') if request.method == 'POST': postvars = request.form.to_dict() fleet.nav_confirmed(postvars) return redirect(url_for('addr_input')) else: return render_template("nav_confirmation.html", addr=addr, lon=lon, lat=lat, token=token) @app.route("/public_token_input", methods=['GET', 'POST']) def public_token_input(): if request.method == 'POST': postvars = request.form.to_dict() fleet.public_token_input(postvars) return redirect(url_for('addr_input')) else: return render_template("public_token_input.html") @app.route("/app_token_input", methods=['GET', 'POST']) def app_token_input(): if request.method == 'POST': postvars = request.form.to_dict() fleet.app_token_input(postvars) return redirect(url_for('addr_input')) else: return render_template("app_token_input.html") @app.route("/gmap_key_input", methods=['GET', 'POST']) def gmap_key_input(): if request.method == 'POST': postvars = request.form.to_dict() fleet.gmap_key_input(postvars) return redirect(url_for('addr_input')) else: return render_template("gmap_key_input.html") @app.route("/CurrentStep.json", methods=['GET']) def find_CurrentStep(): directory = "/data/openpilot/selfdrive/manager/" filename = "CurrentStep.json" return send_from_directory(directory, filename, as_attachment=True) @app.route("/navdirections.json", methods=['GET']) def find_nav_directions(): directory = "/data/openpilot/selfdrive/manager/" filename = "navdirections.json" return send_from_directory(directory, filename, as_attachment=True) @app.route("/locations", methods=['GET']) def get_locations(): data = fleet.get_locations() return Response(data, content_type="application/json") @app.route("/set_destination", methods=['POST']) def set_destination(): valid_addr = False postvars = request.get_json() data, valid_addr = fleet.set_destination(postvars, valid_addr) if valid_addr: return Response('{"success": true}', content_type='application/json') else: return Response('{"success": false}', content_type='application/json') @app.route("/navigation/", methods=['GET']) def find_navicon(file_name): directory = "/data/openpilot/selfdrive/assets/navigation/" return send_from_directory(directory, file_name, as_attachment=True) def main(): try: set_core_affinity([0, 1, 2, 3]) except Exception: cloudlog.exception("fleet_manager: failed to set core affinity") app.secret_key = secrets.token_hex(32) app.run(host="0.0.0.0", port=8082) if __name__ == '__main__': main()