Snyk has a proof-of-concept or detailed explanation of how to exploit this vulnerability.
In a few clicks we can analyze your entire application and see what components are vulnerable in your application, and suggest you quick fixes.
Test your applicationsUpgrade minio
to version 7.2.11 or higher.
minio is a MinIO Python SDK for Amazon S3 Compatible Cloud Storage
Affected versions of this package are vulnerable to Race Condition due to improper handling of shared resources in worker threads via the helpers.py
function. An attacker can exploit this by initiating multiple asynchronous tasks that concurrently access and modify the shared state.
from threading import BoundedSemaphore, Thread
from queue import Queue
import time
class Worker(Thread):
def __init__(self, tasks_queue, results_queue, exceptions_queue):
Thread.__init__(self)
self.tasks_queue = tasks_queue
self.results_queue = results_queue
self.exceptions_queue = exceptions_queue
self.daemon = True
self.start()
def run(self):
while True:
task = self.tasks_queue.get()
if not task:
self.tasks_queue.task_done()
break
if self.exceptions_queue.empty():
func, args, kargs, cleanup_func = task
try:
result = func(*args, **kargs)
self.results_queue.put(result)
except Exception as ex:
print(f"exception occurred: {ex}")
self.exceptions_queue.put(ex)
finally:
cleanup_func()
else:
print("Skipping task due to previous exception")
self.tasks_queue.task_done()
class ThreadPool:
def __init__(self, num_threads):
self.results_queue = Queue()
self.exceptions_queue = Queue()
self.tasks_queue = Queue()
self.sem = BoundedSemaphore(num_threads)
self.num_threads = num_threads
def add_task(self, func, *args, **kargs):
print(f"Trying to acquire semaphore at {time.strftime('%H:%M:%S')}")
self.sem.acquire()
print(f"Semaphore acquired at {time.strftime('%H:%M:%S')}")
cleanup_func = self.sem.release
self.tasks_queue.put((func, args, kargs, cleanup_func))
def start_parallel(self):
for _ in range(self.num_threads):
Worker(self.tasks_queue, self.results_queue, self.exceptions_queue)
def result(self):
for _ in range(self.num_threads):
self.tasks_queue.put(None)
self.tasks_queue.join()
if not self.exceptions_queue.empty():
raise self.exceptions_queue.get()
return self.results_queue
def task_with_exception():
raise ValueError("Task failed!")
def normal_task():
time.sleep(1)
return "Task completed"
if __name__ == "__main__":
pool = ThreadPool(num_threads=1)
pool.start_parallel()
for i in range(3):
if i == 0:
print(f"\nAdding task {i} (will raise exception)...")
pool.add_task(task_with_exception)
else:
print(f"\nAdding task {i} (normal task)...")
pool.add_task(normal_task)
time.sleep(0.1)
try:
results = pool.result()
while not results.empty():
print(results.get())
except Exception as e:
print(f"Final exception: {e}")