Skip to content

feat(security): Add package name typosquatting detection #1059

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,4 @@ docs/_build
bin/
requirements.txt
.macaron_env_file
**/.DS_Store
13 changes: 13 additions & 0 deletions src/macaron/config/defaults.ini
Original file line number Diff line number Diff line change
Expand Up @@ -600,3 +600,16 @@ major_threshold = 20
epoch_threshold = 3
# The number of days +/- the day of publish the calendar versioning day may be.
day_publish_error = 4

# The threshold ratio for two packages to be considered similar.
distance_ratio_threshold = 0.95
# The cost for two characters that are close to each other on the keyboard.
keyboard = 0.8
# The scaling factor for the jaro winkler distance.
scaling = 0.15
# The cost for two characters that are not close to each other on the keyboard.
cost = 1.0
# The path to the file that contains the list of popular packages.
popular_packages_path =
# The threshold for the number of repeated spaces in a single line.
repeated_spaces_threshold = 30
12 changes: 12 additions & 0 deletions src/macaron/malware_analyzer/pypi_heuristics/heuristics.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@ class Heuristics(str, Enum):
#: Indicates that the package has an unusually large version number for a single release.
ANOMALOUS_VERSION = "anomalous_version"

#: Indicates that the package name is similar to a popular package.
TYPOSQUATTING_PRESENCE = "typosquatting_presence"

#: Indicates that at least one maintainer has a suspicious email address.
FAKE_EMAIL = "fake_email"

#: Indicates that the package has a lot of whitespace or invisible characters.
WHITE_SPACES = "white_spaces"

#: Indicates that the package and other packages from the same maintainer have a similar folder structure.
SIMILAR_PROJECTS = "similar_projects"


class HeuristicResult(str, Enum):
"""Result type indicating the outcome of a heuristic."""
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""The heuristic analyzer to check the email address of the package maintainers."""

import logging
import re

import dns.resolver as dns_resolver

from macaron.errors import HeuristicAnalyzerValueError
from macaron.json_tools import JsonType
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset

logger: logging.Logger = logging.getLogger(__name__)


class FakeEmailAnalyzer(BaseHeuristicAnalyzer):
    """Analyze the email address of the package maintainers.

    An address is considered fake when it is not of the form ``local@domain.tld`` or
    when its domain has no MX records, i.e. the domain cannot receive email.
    """

    def __init__(self) -> None:
        super().__init__(
            name="fake_email_analyzer",
            heuristic=Heuristics.FAKE_EMAIL,
            depends_on=None,
        )

        # Domains already found to have no MX records. Cached so that the same
        # domain is not looked up again for other maintainers or packages.
        self.suspicious_domains: set[str] = set()

    def is_valid_email(self, email: str) -> bool:
        """Check if the email format is valid and the domain has MX records.

        Parameters
        ----------
        email: str
            The email address to check.

        Returns
        -------
        bool:
            True if the email address is valid, False otherwise.

        Raises
        ------
        HeuristicAnalyzerValueError
            if the DNS lookup itself fails (e.g. timeout or no reachable name server),
            so that no conclusion can be drawn about the domain.
        """
        # Use fullmatch so that the whole string must have the expected shape; with a
        # prefix match, "a@b.c@d" would pass and the wrong domain would be checked below.
        if not re.fullmatch(r"[^@]+@[^@]+\.[^@]+", email):
            return False

        domain = email.split("@")[1]
        if domain in self.suspicious_domains:
            return False

        try:
            records = dns_resolver.resolve(domain, "MX")
        except (dns_resolver.NXDOMAIN, dns_resolver.NoAnswer):
            # The resolver reports "domain does not exist" and "no MX records" by
            # raising. Both are a definitive answer that the domain cannot receive
            # mail, so this is a heuristic failure, not an analysis error.
            self.suspicious_domains.add(domain)
            return False
        except Exception as err:
            # Any other failure (timeout, no name servers, ...) is inconclusive.
            err_message = f"Failed to resolve domain {domain}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err

        if not records:
            self.suspicious_domains.add(domain)
            return False
        return True

    def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
        """Analyze the package.

        Parameters
        ----------
        pypi_package_json: PyPIPackageJsonAsset
            The PyPI package JSON asset object.

        Returns
        -------
        tuple[HeuristicResult, dict[str, JsonType]]:
            The result and related information collected during the analysis.
            On failure, the detail information contains the first invalid address
            under the ``email`` key.

        Raises
        ------
        HeuristicAnalyzerValueError
            if the maintainers cannot be retrieved or a DNS lookup fails.
        """
        package_name = pypi_package_json.component_name
        maintainers = pypi_package_json.pypi_registry.get_maintainers_of_package(package_name)
        if not maintainers:
            err_message = f"Failed to get maintainers for {package_name}"
            raise HeuristicAnalyzerValueError(err_message)

        # Stop at the first invalid address: one fake email is enough to fail the heuristic.
        for email in maintainers:
            if not self.is_valid_email(email):
                return HeuristicResult.FAIL, {"email": email}

        return HeuristicResult.PASS, {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This analyzer checks if the package has a similar structure to other packages maintained by the same user."""

import hashlib
import logging
import tarfile
import typing

import requests
from bs4 import BeautifulSoup

from macaron.errors import HeuristicAnalyzerValueError
from macaron.json_tools import JsonType
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset

logger: logging.Logger = logging.getLogger(__name__)


class SimilarProjectAnalyzer(BaseHeuristicAnalyzer):
    """Check whether the package has a similar structure to other packages maintained by the same user.

    The structure of a package is summarized as a SHA-256 hash over the sorted list of
    file paths in its source distribution, with the top-level directory removed and the
    package name replaced by a placeholder.
    """

    def __init__(self) -> None:
        super().__init__(
            name="similar_project_analyzer",
            heuristic=Heuristics.SIMILAR_PROJECTS,
            depends_on=None,
        )

    def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
        """Analyze the package.

        Parameters
        ----------
        pypi_package_json: PyPIPackageJsonAsset
            The PyPI package JSON asset object.

        Returns
        -------
        tuple[HeuristicResult, dict[str, JsonType]]:
            The result and related information collected during the analysis.
            The heuristic is skipped when the package has no source distribution or
            when its maintainers have no other packages.

        Raises
        ------
        HeuristicAnalyzerValueError
            if the analysis fails.
        """
        package_name = pypi_package_json.component_name

        # get_structure_hash already reports failures as HeuristicAnalyzerValueError.
        target_hash = self.get_structure_hash(package_name)
        if not target_hash:
            # Without a source distribution there is no structure to compare. Comparing
            # empty hashes would flag every other package that also lacks an sdist.
            return HeuristicResult.SKIP, {
                "message": f"No source distribution found for {package_name}",
            }

        try:
            similar_packages = self.get_packages(package_name)
        except requests.exceptions.RequestException as err:
            err_message = f"Failed to get packages maintained by the maintainers of {package_name}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err

        if not similar_packages:
            return HeuristicResult.SKIP, {
                "message": f"No similar packages found for {package_name}",
            }

        for package in similar_packages:
            # target_hash is non-empty here, so a package without an sdist (empty hash)
            # can never be reported as a match.
            if self.get_structure_hash(package) == target_hash:
                return HeuristicResult.FAIL, {
                    "similar_package": package,
                }
        return HeuristicResult.PASS, {}

    def get_maintainers(self, package_name: str) -> list[str]:
        """Get all maintainers of a package.

        The maintainers are scraped from the sidebar of the package's PyPI project page.

        Parameters
        ----------
        package_name: str
            The name of the package.

        Returns
        -------
        list[str]:
            A list of maintainers, or an empty list if the project page is not available.
        """
        url = f"https://pypi.org/project/{package_name}/"
        response = requests.get(url, timeout=10)
        if response.status_code != 200:
            return []

        soup = BeautifulSoup(response.text, "html.parser")
        gravatar_spans = soup.find_all("span", class_="sidebar-section__user-gravatar-text")
        return [span.get_text().strip() for span in gravatar_spans]

    def get_packages_by_user(self, username: str) -> list[str]:
        """Get all packages by a user.

        The packages are scraped from the user's PyPI profile page.

        Parameters
        ----------
        username: str
            The username of the user.

        Returns
        -------
        list[str]:
            A list of package names, or an empty list if the profile page is not available.
        """
        url = f"https://pypi.org/user/{username}/"
        response = requests.get(url, timeout=10)
        if response.status_code != 200:
            return []

        soup = BeautifulSoup(response.text, "html.parser")
        headers = soup.find_all("h3", class_="package-snippet__title")
        return [header.get_text().strip() for header in headers]

    def get_packages(self, package_name: str) -> list[str]:
        """Get packages that are maintained by this package's maintainers.

        Parameters
        ----------
        package_name: str
            The name of the package.

        Returns
        -------
        list[str]:
            The other packages of the maintainers, without duplicates and without the
            target package itself, in sorted order.
        """
        similar_projects: set[str] = set()
        for user in self.get_maintainers(package_name):
            similar_projects.update(self.get_packages_by_user(user))
        # Remove the target package from the set of similar projects.
        similar_projects.discard(package_name)
        # Sort so that the order of comparison, and therefore the reported package,
        # is the same on every run.
        return sorted(similar_projects)

    def fetch_sdist_url(self, package_name: str, version: str | None = None) -> str:
        """Fetch the sdist URL for a package.

        Parameters
        ----------
        package_name: str
            The name of the package.
        version: str | None
            The version of the package. If None, the latest version will be used.

        Returns
        -------
        str:
            The sdist URL, or an empty string if not found.

        Raises
        ------
        HeuristicAnalyzerValueError
            if the PyPI JSON metadata cannot be fetched, decoded or parsed.
        """
        url = f"https://pypi.org/pypi/{package_name}/json"
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            data = response.json()
        except requests.exceptions.RequestException as err:
            err_message = f"Failed to fetch PyPI JSON for {package_name}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err
        except ValueError as err:
            err_message = f"Failed to decode PyPI JSON for {package_name}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err

        actual_version: str
        if version is None:
            try:
                actual_version = typing.cast(str, data["info"]["version"])
            except (KeyError, TypeError) as err:
                err_message = f"Failed to get version for {package_name}: {err}"
                raise HeuristicAnalyzerValueError(err_message) from err
        else:
            actual_version = version

        try:
            for release_file in data.get("releases", {}).get(actual_version, []):
                if isinstance(release_file, dict) and release_file.get("packagetype") == "sdist":
                    sdist_url = release_file.get("url")
                    if isinstance(sdist_url, str):
                        return sdist_url
        except Exception as err:
            err_message = f"Failed to parse releases for {package_name} version {actual_version}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err

        return ""

    def get_structure_hash(self, package_name: str) -> str | None:
        """Calculate a hash based on the project's file structure.

        Parameters
        ----------
        package_name: str
            The name of the package.

        Returns
        -------
        str | None:
            The hex digest of the structure hash, or an empty string if the package
            has no source distribution.

        Raises
        ------
        HeuristicAnalyzerValueError
            if the sdist cannot be downloaded or the package structure cannot be hashed.
        """
        sdist_url = self.fetch_sdist_url(package_name)
        if not sdist_url:
            return ""

        try:
            # Use the response as a context manager so that the streamed connection
            # is released even when reading the archive fails.
            with requests.get(sdist_url, stream=True, timeout=10) as response:
                response.raise_for_status()
                raw_file_obj: typing.IO[bytes] = typing.cast(typing.IO[bytes], response.raw)

                with tarfile.open(fileobj=raw_file_obj, mode="r:gz") as file_archive:
                    paths = []
                    for member in file_archive:
                        if member.isdir():
                            continue
                        # Remove the top-level directory.
                        parts = member.name.split("/", 1)
                        normalized = parts[1] if len(parts) > 1 else parts[0]
                        # Replace the package name so that renamed copies still match.
                        paths.append(normalized.replace(package_name, "<PKG>"))

            paths.sort()
            structure_hash_calculator = hashlib.sha256()
            for path in paths:
                # tarfile decodes member names with "surrogateescape" by default, so
                # encode the same way to round-trip names that are not valid UTF-8.
                structure_hash_calculator.update(path.encode("utf-8", "surrogateescape"))
                structure_hash_calculator.update(b"\n")
            return structure_hash_calculator.hexdigest()
        except requests.exceptions.RequestException as err:
            err_message = f"Failed to download sdist for {package_name} from {sdist_url}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err
        except tarfile.TarError as err:
            err_message = f"Failed to process tarfile for {package_name} from {sdist_url}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err
        except Exception as err:
            err_message = f"Failed to get structure hash for {package_name}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err
Loading
Loading