const net = require('net');
const fs = require('fs');
const http = require('http');
const url = require('url');
const path = require('path');
const { exec } = require('child_process');
const WebSocket = require('ws');

const socketPath = '/tmp/oscar_watcher.sock';
const logPath = '/data/openpilot/logs/watcher.log';

//// UNIXSOCKET SERVER ////

// Last known value of every watched variable, keyed by variable name.
// Created without a prototype so that externally supplied names such as
// "__proto__" or "constructor" are stored as ordinary data keys.
const progvars = Object.create(null);

// Remove a stale socket file left over from a previous run. A missing file is
// fine; any other failure means we cannot bind, so bail out.
try {
  fs.unlinkSync(socketPath);
} catch (err) {
  if (err.code !== 'ENOENT') {
    console.error('Error removing existing socket file:', err);
    process.exit(1);
  }
}

// Accepts newline-delimited JSON messages and feeds each complete line to
// processMessage().
const unix_socket_server = net.createServer((connection) => {
  console.log('UnixSocket Client connected.');

  // Let the stream decode UTF-8 so a multi-byte character that is split across
  // two chunks is not corrupted (per-chunk Buffer#toString() would mangle it).
  connection.setEncoding('utf8');

  let buffer = '';

  connection.on('data', (data) => {
    buffer += data;
    let boundary = buffer.indexOf('\n');
    while (boundary !== -1) {
      const message = buffer.substring(0, boundary);
      buffer = buffer.substring(boundary + 1);
      processMessage(message);
      boundary = buffer.indexOf('\n');
    }
  });

  connection.on('end', () => {
    console.log('UnixSocket Client disconnected.');
  });

  // Without a listener, a socket 'error' (e.g. ECONNRESET) is thrown as an
  // uncaught exception and takes the whole watcher down.
  connection.on('error', (err) => {
    console.error('UnixSocket connection error:', err);
  });
});

unix_socket_server.listen(socketPath, () => {
  console.log(`UnixSocket Server listening on ${socketPath}`);
});

/**
 * Handle one line received on the unix socket.
 *
 * The line is parsed as JSON and read as `[variable, value]`. The diff against
 * the previously stored value is broadcast to all WebSocket clients and
 * appended to the log file, then the stored value is updated.
 *
 * @param {string} message - One newline-delimited message (without the newline).
 */
function processMessage(message) {
  console.log(message);
  try {
    const msg = JSON.parse(message);
    console.log(msg);

    const variable = msg[0];
    const value = msg[1];
    console.log({var: variable, val: value});

    const diff = calculateDiff(variable, progvars[variable], value);
    console.log({diff: diff});
    send_ws_message(diff);

    progvars[variable] = value;

    const timestamp = new Date().toISOString();
    const logEntry = `log_time="${timestamp}";\n${diff}\n`;
    fs.writeFile(logPath, logEntry, { flag: 'a' }, (err) => {
      if (err) {
        console.error('Error writing to log file:', err);
      }
    });
  } catch (err) {
    console.error('Error processing message:', err);
  }
}

/**
 * Describe how a watched variable changed as a list of assignment statements.
 *
 * Each statement has the form `progvars[<name>]['key']... = <json>;`. When both
 * values are objects the comparison recurses; a node where more than a third of
 * its (old) properties changed is emitted as a single whole-node assignment
 * instead of per-property assignments.
 *
 * NOTE(review): the top-level variable name is emitted unquoted
 * (`progvars[name]`) while nested keys are quoted (`['key']`). The format is
 * kept as-is because the consumer of these statements is not visible here.
 *
 * @param {string} provVarsVariable - Name of the variable inside `progvars`.
 * @param {*} oldValue - Previously stored value (may be undefined).
 * @param {*} newValue - Newly received value.
 * @returns {string} Newline-joined statements; empty string when nothing changed.
 */
function calculateDiff(provVarsVariable, oldValue, newValue) {
  const changes = [];
  const isObject = (candidate) => typeof candidate === 'object' && candidate !== null;
  const hasOwn = (obj, key) => Object.prototype.hasOwnProperty.call(obj, key);

  // Recursively diff two objects. Returns the number of changed properties that
  // were emitted individually, or 0 when the node was unchanged or has been
  // replaced wholesale.
  function findChanges(path, oldVal, newVal) {
    // Remember where this node's statements start so a wholesale replacement
    // can discard exactly what was emitted for this subtree (including
    // replacements already emitted for child nodes).
    const mark = changes.length;

    const oldKeys = Object.keys(oldVal);
    const allKeys = new Set([...oldKeys, ...Object.keys(newVal)]);
    let changedCount = 0;

    allKeys.forEach((key) => {
      const oldKeyValue = oldVal[key];
      const newKeyValue = newVal[key];
      const currentPath = path ? `${path}['${key}']` : `['${key}']`;

      if (!hasOwn(oldVal, key)) {
        // New key added
        changes.push(`${currentPath} = ${JSON.stringify(newKeyValue)};`);
        changedCount++;
      } else if (isObject(oldKeyValue) && isObject(newKeyValue)) {
        // Recursive diff for objects
        changedCount += findChanges(currentPath, oldKeyValue, newKeyValue);
      } else if (oldKeyValue !== newKeyValue) {
        // Direct value change (a removed key is emitted as `= undefined`)
        changes.push(`${currentPath} = ${JSON.stringify(newKeyValue)};`);
        changedCount++;
      }
    });

    // If more than 1/3 of the properties have been changed, replace the entire node
    if (changedCount > 0 && changedCount > oldKeys.length / 3) {
      changes.length = mark;
      changes.push(`${path} = ${JSON.stringify(newVal)};`);
      return 0;
    }

    return changedCount;
  }

  if (isObject(oldValue) && isObject(newValue)) {
    findChanges(`progvars[${provVarsVariable}]`, oldValue, newValue);
  } else if (oldValue !== newValue) {
    changes.push(`progvars[${provVarsVariable}] = ${JSON.stringify(newValue)};`);
  }

  return changes.join('\n');
}

/**
 * Render the complete current state as assignment statements, one per watched
 * variable, in the same format calculateDiff() uses for incremental updates.
 *
 * @returns {string} Newline-joined statements; empty string when no variable
 *   has been received yet.
 */
function snapshotState() {
  return Object.entries(progvars)
    .map(([variable, value]) => calculateDiff(variable, undefined, value))
    .filter((statement) => statement !== '')
    .join('\n');
}

//// HTTP SERVER ////

const hostname = '0.0.0.0';
const port = 1024;

// Serves watcher.html at "/" and 404 for everything else.
const server = http.createServer((req, res) => {
  if (req.url === '/') {
    // Serve the HTML file
    fs.readFile(path.join(__dirname, 'watcher.html'), (err, data) => {
      if (err) {
        res.writeHead(500);
        res.end('Error loading watcher.html');
        return;
      }
      res.writeHead(200, {'Content-Type': 'text/html'});
      res.end(data);
    });
  } else {
    // Handle 404
    res.writeHead(404);
    res.end('Not found');
  }
});

server.listen(port, hostname, () => {
  console.log(`Web Server running at http://${hostname}:${port}/`);
});

//// WEBSOCKET SERVER ////

const ws_port = port + 1;

// Standalone WebSocket server on its own port (it is not attached to the HTTP
// server above).
const wss = new WebSocket.Server({ port: ws_port });
console.log(`WebSocket Server running at http://${hostname}:${ws_port}/`);

wss.on('connection', function connection(ws) {
  console.log('WebSocket client connected');

  // Send initial state: everything received so far, so a client that connects
  // late starts from the current values rather than from nothing.
  const initialState = snapshotState();
  if (initialState !== '') {
    ws.send(initialState);
  }

  ws.on('message', function incoming(message) {
    console.log('received: %s', message);
  });

  ws.on('close', function close() {
    console.log('WebSocket client disconnected');
  });
});

// NOTE(review): a `ws` server created with the `port` option handles HTTP
// upgrades internally and does not appear to emit 'upgrade' itself, so this
// handler looks unreachable. Kept unchanged; confirm and remove, or move it to
// an http.Server if path-based routing ('/ws') is actually wanted.
wss.on('upgrade', function upgrade(request, socket, head) {
  const { pathname } = url.parse(request.url);

  if (pathname === '/ws') {
    wss.handleUpgrade(request, socket, head, function done(ws) {
      wss.emit('connection', ws, request);
    });
  } else {
    socket.destroy();
  }
});

/**
 * Send a message to all connected WebSocket clients.
 *
 * @param {string} message - Payload to broadcast; clients that are not in the
 *   OPEN state are skipped.
 */
function send_ws_message(message) {
  wss.clients.forEach(function each(client) {
    if (client.readyState === WebSocket.OPEN) {
      client.send(message);
    }
  });
}

//// Exit when low disk space /////

const minFreeSpaceGB = 9;
const diskCheckIntervalMs = 60000;

/**
 * Check free space on /data via `df` and exit the process when it drops below
 * the minimum, so the log file cannot fill the disk.
 */
function checkDiskSpace() {
  exec('df -BG /data | tail -n 1 | awk \'{print $4}\'', (error, stdout, stderr) => {
    if (error) {
      console.error(`exec error: ${error}`);
      return;
    }
    if (stderr) {
      console.error(`stderr: ${stderr}`);
      return;
    }

    // Extract the number of GBs available (df prints e.g. "12G")
    const freeSpaceGB = Number.parseInt(stdout.trim().replace('G', ''), 10);
    if (Number.isNaN(freeSpaceGB)) {
      // Comparisons with NaN are always false, so without this the check
      // would silently never trigger.
      console.error(`Could not parse free disk space from df output: ${JSON.stringify(stdout)}`);
      return;
    }

    // Check if free space is less than 9GB
    if (freeSpaceGB < minFreeSpaceGB) {
      console.log('Less than 9GB of free space on /data. Exiting.');
      process.exit(1);
    }
  });
}

// Check disk space every 1 minute
setInterval(checkDiskSpace, diskCheckIntervalMs);

// Perform an initial check
checkDiskSpace();