mirror of
https://github.com/inventree/InvenTree.git
synced 2026-04-14 23:38:53 +00:00
Merge commit from fork
* Fix SSRF in remote image download Add IP address validation to prevent Server-Side Request Forgery when downloading images from remote URLs. The resolved IP is now checked against private, loopback, link-local, and reserved ranges before connecting. Redirects are followed manually (up to 5 hops) with SSRF validation at each step, preventing redirect-based bypass of URL format checks. * Style fix --------- Co-authored-by: tikket1 <chrisveres1@gmail.com>
This commit is contained in:
@@ -1,9 +1,11 @@
|
|||||||
"""Provides helper functions used throughout the InvenTree project that access the database."""
|
"""Provides helper functions used throughout the InvenTree project that access the database."""
|
||||||
|
|
||||||
import io
|
import io
|
||||||
|
import ipaddress
|
||||||
|
import socket
|
||||||
from decimal import Decimal
|
from decimal import Decimal
|
||||||
from typing import Optional, cast
|
from typing import Optional, cast
|
||||||
from urllib.parse import urljoin
|
from urllib.parse import urljoin, urlparse
|
||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.core.exceptions import ValidationError
|
from django.core.exceptions import ValidationError
|
||||||
@@ -88,6 +90,36 @@ def construct_absolute_url(*arg, base_url=None, request=None):
|
|||||||
return urljoin(base_url, relative_url)
|
return urljoin(base_url, relative_url)
|
||||||
|
|
||||||
|
|
||||||
|
def validate_url_no_ssrf(url):
|
||||||
|
"""Validate that a URL does not point to a private/internal network address.
|
||||||
|
|
||||||
|
Resolves the hostname to an IP address and checks it against private,
|
||||||
|
loopback, link-local, and reserved IP ranges to prevent SSRF attacks.
|
||||||
|
|
||||||
|
Arguments:
|
||||||
|
url: The URL to validate
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: If the URL resolves to a private or reserved IP address
|
||||||
|
"""
|
||||||
|
parsed = urlparse(url)
|
||||||
|
hostname = parsed.hostname
|
||||||
|
|
||||||
|
if not hostname:
|
||||||
|
raise ValueError(_('Invalid URL: no hostname'))
|
||||||
|
|
||||||
|
try:
|
||||||
|
addrinfo = socket.getaddrinfo(hostname, None)
|
||||||
|
except socket.gaierror:
|
||||||
|
raise ValueError(_('Invalid URL: hostname could not be resolved'))
|
||||||
|
|
||||||
|
for _family, _type, _proto, _canonname, sockaddr in addrinfo:
|
||||||
|
ip = ipaddress.ip_address(sockaddr[0])
|
||||||
|
|
||||||
|
if ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_reserved:
|
||||||
|
raise ValueError(_('URL points to a private or reserved IP address'))
|
||||||
|
|
||||||
|
|
||||||
def download_image_from_url(remote_url, timeout=2.5):
|
def download_image_from_url(remote_url, timeout=2.5):
|
||||||
"""Download an image file from a remote URL.
|
"""Download an image file from a remote URL.
|
||||||
|
|
||||||
@@ -115,6 +147,9 @@ def download_image_from_url(remote_url, timeout=2.5):
|
|||||||
validator = URLValidator()
|
validator = URLValidator()
|
||||||
validator(remote_url)
|
validator(remote_url)
|
||||||
|
|
||||||
|
# SSRF protection: validate the resolved IP is not private/internal
|
||||||
|
validate_url_no_ssrf(remote_url)
|
||||||
|
|
||||||
# Calculate maximum allowable image size (in bytes)
|
# Calculate maximum allowable image size (in bytes)
|
||||||
max_size = (
|
max_size = (
|
||||||
int(get_global_setting('INVENTREE_DOWNLOAD_IMAGE_MAX_SIZE')) * 1024 * 1024
|
int(get_global_setting('INVENTREE_DOWNLOAD_IMAGE_MAX_SIZE')) * 1024 * 1024
|
||||||
@@ -129,10 +164,36 @@ def download_image_from_url(remote_url, timeout=2.5):
|
|||||||
response = requests.get(
|
response = requests.get(
|
||||||
remote_url,
|
remote_url,
|
||||||
timeout=timeout,
|
timeout=timeout,
|
||||||
allow_redirects=True,
|
allow_redirects=False,
|
||||||
stream=True,
|
stream=True,
|
||||||
headers=headers,
|
headers=headers,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Handle redirects manually to validate each destination
|
||||||
|
max_redirects = 5
|
||||||
|
redirect_count = 0
|
||||||
|
|
||||||
|
while response.is_redirect and redirect_count < max_redirects:
|
||||||
|
redirect_url = response.headers.get('Location')
|
||||||
|
if not redirect_url:
|
||||||
|
break
|
||||||
|
|
||||||
|
# Validate the redirect destination against SSRF
|
||||||
|
validator(redirect_url)
|
||||||
|
validate_url_no_ssrf(redirect_url)
|
||||||
|
|
||||||
|
redirect_count += 1
|
||||||
|
response = requests.get(
|
||||||
|
redirect_url,
|
||||||
|
timeout=timeout,
|
||||||
|
allow_redirects=False,
|
||||||
|
stream=True,
|
||||||
|
headers=headers,
|
||||||
|
)
|
||||||
|
|
||||||
|
if redirect_count >= max_redirects:
|
||||||
|
raise ValueError(_('Too many redirects'))
|
||||||
|
|
||||||
# Throw an error if anything goes wrong
|
# Throw an error if anything goes wrong
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
except requests.exceptions.ConnectionError as exc:
|
except requests.exceptions.ConnectionError as exc:
|
||||||
@@ -143,6 +204,8 @@ def download_image_from_url(remote_url, timeout=2.5):
|
|||||||
raise requests.exceptions.HTTPError(
|
raise requests.exceptions.HTTPError(
|
||||||
_('Server responded with invalid status code') + f': {response.status_code}'
|
_('Server responded with invalid status code') + f': {response.status_code}'
|
||||||
)
|
)
|
||||||
|
except ValueError:
|
||||||
|
raise
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
raise Exception(_('Exception occurred') + f': {exc!s}')
|
raise Exception(_('Exception occurred') + f': {exc!s}')
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user