mod_ssl: Use sslscan in place of sslyze

This commit is contained in:
devloop 2024-06-28 23:47:30 +02:00
parent 3a0f685e44
commit b591702f1e
8 changed files with 476 additions and 264 deletions

View File

@ -53,7 +53,7 @@ jobs:
- name: Install dependencies
run: |
sudo apt-get update
sudo apt-get install php8.1-cli php8.1-xml -y --no-install-recommends
sudo apt-get install php8.1-cli php8.1-xml sslscan -y --no-install-recommends
python -m pip install --upgrade pip
pip install -U setuptools
pip3 install .[test]

View File

@ -31,8 +31,6 @@ See `INSTALL.md <https://github.com/wapiti-scanner/wapiti/blob/master/INSTALL.md
Running Wapiti on Windows can be accomplished through the use of `WSL <https://learn.microsoft.com/en-us/training/modules/get-started-with-windows-subsystem-for-linux/>`__.
The `ssl` module used to scan TLS/SSL misconfiguration won't work on ARM processors (see `SSLyze documentation <https://nabla-c0d3.github.io/sslyze/documentation/installation.html>`__).
How it works
============
@ -145,7 +143,7 @@ The aforementioned attacks are tied to the following module names :
+ shellshock (Test Shellshock attack, see `Wikipedia <https://en.wikipedia.org/wiki/Shellshock_%28software_bug%29>`__)
+ spring4shell (Detects websites vulnerable to CVE-2020-5398)
+ sql (Error-based and boolean-based SQL injection detection)
+ ssl (Evaluate the security of SSL/TLS certificate configuration, requires `SSLyze <https://github.com/nabla-c0d3/sslyze>`__)
+ ssl (Evaluate the security of SSL/TLS certificate configuration, requires `sslscan <https://github.com/rbsec/sslscan>`__)
+ ssrf (Server Side Request Forgery)
+ takeover (Subdomain takeover)
+ timesql (SQL injection vulnerabilities detected with time-based methodology)

View File

@ -9,6 +9,7 @@ Unrelease
Core : fix headless explorer method
Core : fix max-scan-time and missing timeout
Python : update dependencies and pip configurations
mod_ssl: Move to sslscan for the ssl module instead of sslyze
09/08/2023
Wapiti 3.1.8

View File

@ -36,8 +36,8 @@ If you are really lost, feel free to contact me.
### I have a warning about the ssl module not working ! ###
The `ssl` module requires the Python libraries [SSLyze](https://github.com/nabla-c0d3/sslyze) and [humanize](https://github.com/python-humanize/humanize).
The module is optional therefore you need to install those dependencies to use the `ssl` module.
The `ssl` module requires the [sslscan](binary) to be present in your PATH.
Check if the software is available with your package manager.
### I have some UnicodeDecodeError as soon as I launch Wapiti ! ###

View File

@ -42,6 +42,7 @@ dependencies = [
"httpcore==1.0.4",
"httpx[brotli, socks]==0.27.0",
"httpx-ntlm==1.4.0",
"humanize==4.9.0",
"loguru==0.7.2",
"mako==1.3.2",
"markupsafe==2.1.5",
@ -64,17 +65,12 @@ wapiti = "wapitiCore.main.wapiti:wapiti_asyncio_wrapper"
wapiti-getcookie = "wapitiCore.main.getcookie:getcookie_asyncio_wrapper"
[project.optional-dependencies]
ssl = [
"humanize==4.9.0",
"sslyze==5.2.0"
]
test = [
"humanize==4.9.0",
"pytest==8.0.2",
"pytest-cov==4.1.0",
"pytest-asyncio==0.23.5",
"respx==0.20.2",
"sslyze==5.2.0"
]
[tool.setuptools.packages]

View File

@ -6,14 +6,22 @@ from asyncio import Event
import http.server
import ssl
from unittest.mock import AsyncMock
from datetime import datetime, timedelta
import httpx
import pytest
import respx
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa
from wapitiCore.net.classes import CrawlerConfiguration
from wapitiCore.net import Request
from wapitiCore.language.vulnerability import CRITICAL_LEVEL, HIGH_LEVEL, INFO_LEVEL
from wapitiCore.language.vulnerability import CRITICAL_LEVEL, HIGH_LEVEL, INFO_LEVEL, MEDIUM_LEVEL
from wapitiCore.net.crawler import AsyncCrawler
from wapitiCore.attack.mod_ssl import ModuleSsl, NAME
from wapitiCore.attack.mod_ssl import ModuleSsl, NAME, extract_altnames, match_address, check_ocsp_must_staple, \
check_ev_certificate, process_vulnerabilities, process_bad_protocols
def https_server(cert_directory: str):
@ -88,11 +96,11 @@ async def test_ssl_scanner():
payload_type="vulnerability",
module="ssl",
category=NAME,
level=CRITICAL_LEVEL,
level=HIGH_LEVEL,
request=request,
parameter='',
wstg=["WSTG-CRYP-01"],
info="Certificate is invalid for Mozilla trust store: self-signed certificate",
info="Strict Transport Security (HSTS) is not set",
response=None
)
@ -101,10 +109,120 @@ async def test_ssl_scanner():
payload_type="vulnerability",
module="ssl",
category=NAME,
level=HIGH_LEVEL,
level=MEDIUM_LEVEL,
request=request,
parameter='',
wstg=["WSTG-CRYP-01"],
info="Strict Transport Security (HSTS) is not set",
info="Self-signed certificate detected: The certificate is not signed by a trusted Certificate Authority",
response=None
)
def test_extract_alt_names():
assert ["perdu.com", "test.fr"] == extract_altnames("DNS:perdu.com, DNS:test.fr, whatever, ")
def test_match_address():
assert match_address("sub.domain.com", "domain.com", ["*.domain.com", "yolo"])
assert match_address("sub.domain.com", "*.domain.com", ["yolo"])
assert not match_address("sub.domain.com", "google.com", ["*.truc.com"])
def generate_cert(include_organization_name: bool = True, include_ocsp_must_staple: bool = True):
# Generate a private key
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
# Build the subject name
subject_name = [
x509.NameAttribute(x509.NameOID.COUNTRY_NAME, u"US"),
x509.NameAttribute(x509.NameOID.STATE_OR_PROVINCE_NAME, u"California"),
x509.NameAttribute(x509.NameOID.LOCALITY_NAME, u"San Francisco"),
x509.NameAttribute(x509.NameOID.COMMON_NAME, u"mysite.com"),
]
if include_organization_name:
subject_name.append(x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, u"My Company"))
# Generate a certificate
subject = issuer = x509.Name(subject_name)
cert_builder = x509.CertificateBuilder().subject_name(
subject
).issuer_name(
issuer
).public_key(
private_key.public_key()
).serial_number(
x509.random_serial_number()
).not_valid_before(
datetime.utcnow()
).not_valid_after(
datetime.utcnow() + timedelta(days=10)
).add_extension(
x509.SubjectAlternativeName([x509.DNSName(u"localhost")]),
critical=False,
)
if include_ocsp_must_staple:
cert_builder = cert_builder.add_extension(
x509.TLSFeature([x509.TLSFeatureType.status_request]),
critical=False
)
cert = cert_builder.sign(private_key, hashes.SHA256(), default_backend())
return cert
@pytest.mark.asyncio
@respx.mock
async def test_certificate_transparency():
cert = generate_cert()
respx.get(f'https://crt.sh/?q={cert.serial_number}').mock(
# Method GET that serve as a reference
return_value=httpx.Response(200, text="Success")
)
persister = AsyncMock()
request = Request("https://127.0.0.1:4443/")
request.path_id = 42
crawler_configuration = CrawlerConfiguration(Request("https://127.0.0.1:4443/"), timeout=1)
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 2}
module = ModuleSsl(crawler, persister, options, Event(), crawler_configuration)
assert 1 == await module.check_certificate_transparency(cert)
def test_ocsp():
assert 0 == check_ocsp_must_staple(generate_cert(include_ocsp_must_staple=False))
assert 1 == check_ocsp_must_staple(generate_cert())
def test_extended_validation():
assert 0 == check_ev_certificate(generate_cert(include_organization_name=False))
assert 1 == check_ev_certificate(generate_cert())
@pytest.mark.asyncio
async def test_process_vulnerabilities():
base_dir = os.path.dirname(sys.modules["wapitiCore"].__file__)
xml_file = os.path.join(base_dir, "..", "tests/data/ssl/broken_ssl.xml")
results = [info async for info in process_vulnerabilities(xml_file)]
assert [
(4, 'Server is vulnerable to Heartbleed attack via TLSv1.0'),
(3, 'Server honors client-initiated renegotiations (vulnerable to DoS attacks)')
] == results
@pytest.mark.asyncio
async def test_process_bad_protocols():
base_dir = os.path.dirname(sys.modules["wapitiCore"].__file__)
xml_file = os.path.join(base_dir, "..", "tests/data/ssl/broken_ssl.xml")
results = [info async for info in process_bad_protocols(xml_file)]
assert [
(4, 'The following protocols are deprecated and/or insecure and should be deactivated: SSLv2, TLSv1.0')
] == results

View File

@ -0,0 +1,42 @@
<?xml version="1.0" encoding="UTF-8"?>
<document title="SSLScan Results" version="2.1.3" web="http://github.com/rbsec/sslscan">
<ssltest host="broken.ssl" sniname="broken.ssl" port="443">
<protocol type="ssl" version="2" enabled="1" />
<protocol type="ssl" version="3" enabled="0" />
<protocol type="tls" version="1.0" enabled="1" />
<protocol type="tls" version="1.1" enabled="0" />
<protocol type="tls" version="1.2" enabled="0" />
<protocol type="tls" version="1.3" enabled="0" />
<renegotiation supported="1" secure="0" />
<compression supported="1" />
<heartbleed sslversion="TLSv1.0" vulnerable="1" />
<cipher status="preferred" sslversion="TLSv1.0" bits="256" cipher="TLS_DHE_RSA_WITH_AES_256_CBC_SHA" id="0x0039" strength="acceptable" dhebits="1024" />
<cipher status="accepted" sslversion="TLSv1.0" bits="128" cipher="TLS_DHE_RSA_WITH_AES_128_CBC_SHA" id="0x0033" strength="acceptable" dhebits="1024" />
<cipher status="accepted" sslversion="TLSv1.0" bits="256" cipher="TLS_RSA_WITH_AES_256_CBC_SHA" id="0x0035" strength="acceptable" />
<cipher status="accepted" sslversion="TLSv1.0" bits="128" cipher="TLS_RSA_WITH_AES_128_CBC_SHA" id="0x002F" strength="acceptable" />
<cipher status="accepted" sslversion="TLSv1.0" bits="40" cipher="TLS_RSA_EXPORT_WITH_RC4_40_MD5" id="0x0003" strength="weak" />
<cipher status="accepted" sslversion="TLSv1.0" bits="128" cipher="TLS_RSA_WITH_RC4_128_MD5" id="0x0004" strength="medium" />
<cipher status="accepted" sslversion="TLSv1.0" bits="128" cipher="TLS_RSA_WITH_RC4_128_SHA" id="0x0005" strength="medium" />
<cipher status="accepted" sslversion="TLSv1.0" bits="40" cipher="TLS_RSA_EXPORT_WITH_RC2_CBC_40_MD5" id="0x0006" strength="weak" />
<cipher status="accepted" sslversion="TLSv1.0" bits="40" cipher="TLS_RSA_EXPORT_WITH_DES40_CBC_SHA" id="0x0008" strength="weak" />
<cipher status="accepted" sslversion="TLSv1.0" bits="56" cipher="TLS_RSA_WITH_DES_CBC_SHA" id="0x0009" strength="medium" />
<cipher status="accepted" sslversion="TLSv1.0" bits="112" cipher="TLS_RSA_WITH_3DES_EDE_CBC_SHA" id="0x000A" strength="medium" />
<cipher status="accepted" sslversion="TLSv1.0" bits="40" cipher="TLS_DHE_RSA_EXPORT_WITH_DES40_CBC_SHA" id="0x0014" strength="weak" />
<cipher status="accepted" sslversion="TLSv1.0" bits="56" cipher="TLS_DHE_RSA_WITH_DES_CBC_SHA" id="0x0015" strength="medium" />
<cipher status="accepted" sslversion="TLSv1.0" bits="112" cipher="TLS_DHE_RSA_WITH_3DES_EDE_CBC_SHA" id="0x0016" strength="medium" />
<certificates>
<certificate type="short">
<signature-algorithm>sha256WithRSAEncryption</signature-algorithm>
<pk error="false" type="RSA" bits="2048" />
<subject><![CDATA[broken.ssl]]></subject>
<altnames><![CDATA[DNS:whatever.ssl]]></altnames>
<issuer><![CDATA[Thawte TLS RSA CA G1]]></issuer>
<self-signed>false</self-signed>
<not-valid-before>Jan 19 00:00:00 2024 GMT</not-valid-before>
<not-yet-valid>false</not-yet-valid>
<not-valid-after>Feb 22 23:59:59 2024 GMT</not-valid-after>
<expired>true</expired>
</certificate>
</certificates>
</ssltest>
</document>

View File

@ -15,23 +15,28 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
from datetime import datetime
# https://badssl.com/ can help to test the module
import fnmatch
import os
import socket
import ssl
import subprocess
from datetime import datetime, timezone
import json
import asyncio
from os.path import join as path_join
from typing import List, Tuple, Optional
from os.path import join as path_join, exists
from typing import List, Tuple, Optional, AsyncIterator
from collections import defaultdict
from itertools import chain
import xml.etree.ElementTree as ET
import re
import tempfile
import shutil
from httpx import RequestError
import humanize
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
from cryptography.hazmat._oid import NameOID
from cryptography.hazmat.backends import default_backend
from cryptography import x509
from sslyze.plugins.certificate_info._certificate_utils import get_common_names, \
parse_subject_alternative_name_extension
from sslyze.plugins.robot.implementation import RobotScanResultEnum
from sslyze import ServerNetworkLocation, ServerNetworkConfiguration, ScanCommand, Scanner, ServerScanRequest, \
ScanCommandAttemptStatusEnum
from sslyze.errors import ServerHostnameCouldNotBeResolved
from wapitiCore.attack.attack import Attack
from wapitiCore.net import Request, Response
@ -40,135 +45,230 @@ from wapitiCore.main.log import log_red, log_blue, log_green, log_orange, loggin
from wapitiCore.definitions.ssl import NAME, WSTG_CODE
def get_common_name(name_field: x509.Name) -> str:
try:
return get_common_names(name_field)[0]
except IndexError:
return name_field.rfc4514_string()
def cipher_level_to_color(security_level: str) -> str:
if security_level == "Insecure":
def sslscan_level_to_color(security_level: str) -> str:
if security_level == "weak":
return "RED"
if security_level == "Weak":
if security_level == "acceptable":
return "ORANGE"
# Secure / Recommended / Unknown
return "GREEN"
def cipher_level_to_wapiti_level(security_level: str) -> str:
if security_level == "Insecure":
def sslscan_level_to_wapiti_level(security_level: str) -> str:
if security_level == "weak":
return CRITICAL_LEVEL
if security_level == "Weak":
return HIGH_LEVEL
if security_level == "acceptable":
return MEDIUM_LEVEL
# Secure / Recommended / Unknown
return INFO_LEVEL
def process_certificate_info(certinfo_result):
for cert_deployment in certinfo_result.certificate_deployments:
def check_ev_certificate(cert: x509.Certificate) -> bool:
"""
Checks if the certificate is an EV (Extended Validation) certificate.
"""
for attribute in cert.subject:
if attribute.oid == NameOID.ORGANIZATION_NAME:
return True
return False
leaf_certificate = cert_deployment.received_certificate_chain[0]
message = f"Certificate subject: {get_common_name(leaf_certificate.subject)}"
def get_certificate(hostname: str, port: int = 443) -> x509.Certificate:
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
conn = context.wrap_socket(
socket.socket(socket.AF_INET),
server_hostname=hostname,
)
conn.connect((hostname, port))
cert_bin = conn.getpeercert(True)
return x509.load_der_x509_certificate(cert_bin, default_backend())
def check_ocsp_must_staple(cert: x509.Certificate) -> bool:
try:
extension = cert.extensions.get_extension_for_oid(x509.ObjectIdentifier("1.3.6.1.5.5.7.1.24"))
return extension is not None
except x509.ExtensionNotFound:
return False
def extract_altnames(altnames: str) -> List[str]:
return [name.split(":", 1)[1] for name in re.split(r',\s*', altnames) if ":" in name]
def match_address(target: str, subject: str, alt_names: List[str]) -> bool:
# Check against subject
if fnmatch.fnmatch(target, subject):
return True
# Check against each alternative name
for alt_name in alt_names:
if fnmatch.fnmatch(target, alt_name):
return True
return False
def sslscan_date_to_utc(date_str: str) -> datetime:
dt = datetime.strptime(date_str, "%b %d %H:%M:%S %Y GMT")
return dt.replace(tzinfo=timezone.utc)
async def process_cert_info(xml_file: str) -> AsyncIterator[Tuple[int, str]]:
tree = ET.parse(xml_file)
root = tree.getroot()
target = root.find(".//ssltest").get("sniname")
# Extract certificate information
for cert in root.findall(".//certificate"):
subject = cert.find("subject").text
message = f"Certificate subject: {subject}"
log_blue(message)
yield INFO_LEVEL, message
alt_names = parse_subject_alternative_name_extension(leaf_certificate)
alt_names = ", ".join(chain(alt_names.dns_names, alt_names.ip_addresses))
message = f"Alt. names: {alt_names}"
alt_names_tag = cert.find("altnames")
if alt_names_tag is not None and alt_names_tag.text.strip():
alt_names = extract_altnames(alt_names_tag.text)
message = f"Alt. names: {', '.join(alt_names)}"
log_blue(message)
yield INFO_LEVEL, message
else:
alt_names = []
message = f"Issuer: {cert.find('issuer').text}"
log_blue(message)
yield INFO_LEVEL, message
message = f"Issuer: {get_common_name(leaf_certificate.issuer)}"
log_blue(message)
yield INFO_LEVEL, message
if not cert_deployment.leaf_certificate_subject_matches_hostname:
if not match_address(target, subject, alt_names):
message = "Requested hostname doesn't match those in the certificate"
log_red(message)
yield CRITICAL_LEVEL, message
if not cert_deployment.received_chain_has_valid_order:
message = "Certificate chain is in invalid order"
key = cert.find("pk")
message = f"Key: {key.get('type')} {key.get('bits')} bits"
log_blue(message)
yield INFO_LEVEL, message
message = f"Signature Algorithm: {cert.find('signature-algorithm').text}"
log_blue(message)
yield INFO_LEVEL, message
if cert.find("self-signed").text == "true":
message = (
"Self-signed certificate detected: The certificate is not signed by a trusted Certificate Authority"
)
log_orange(message)
yield MEDIUM_LEVEL, message
public_key = leaf_certificate.public_key()
if isinstance(public_key, EllipticCurvePublicKey):
key_size = public_key.curve.key_size
else:
key_size = public_key.key_size
if public_key.__class__.__name__ == "_RSAPublicKey":
algorithm = "RSA"
elif public_key.__class__.__name__ == "_EllipticCurvePublicKey":
algorithm = "ECC"
else:
algorithm = public_key.__class__.__name__
message = f"Key: {algorithm} {key_size} bits"
log_blue(message)
yield INFO_LEVEL, message
message = f"Signature Algorithm: {leaf_certificate.signature_hash_algorithm.name}"
log_blue(message)
yield INFO_LEVEL, message
if leaf_certificate.not_valid_after > datetime.utcnow():
message = "Certificate expires in " + \
humanize.precisedelta(leaf_certificate.not_valid_after - datetime.utcnow())
not_valid_after = sslscan_date_to_utc(cert.find("not-valid-after").text)
utcnow = datetime.utcnow().replace(tzinfo=timezone.utc)
if not_valid_after > utcnow:
message = "Certificate expires in " + humanize.precisedelta(not_valid_after - utcnow)
log_green(message)
yield INFO_LEVEL, message
else:
message = f"Certificate has expired at {leaf_certificate.not_valid_after}"
message = f"Certificate has expired at {not_valid_after}"
log_red(message)
yield CRITICAL_LEVEL, message
if not cert_deployment.leaf_certificate_is_ev:
message = "Certificate doesn't use Extended Validation"
log_orange(message)
yield MEDIUM_LEVEL, message
# https://en.wikipedia.org/wiki/OCSP_stapling
if not cert_deployment.leaf_certificate_has_must_staple_extension:
message = "OCSP Must-Staple extension is missing"
log_orange(message)
yield MEDIUM_LEVEL, message
async def process_cipher_suites2(xml_file: str) -> AsyncIterator[Tuple[int, str]]:
tree = ET.parse(xml_file)
root = tree.getroot()
if cert_deployment.leaf_certificate_signed_certificate_timestamps_count is None:
message = "Certificate transparency: Unknown (OpenSSL version is not recent enough)"
log_orange(message)
yield MEDIUM_LEVEL, message
elif cert_deployment.leaf_certificate_signed_certificate_timestamps_count:
message = (
"Certificate transparency: Yes "
f"({cert_deployment.leaf_certificate_signed_certificate_timestamps_count} SCT)"
protocol_versions = set()
# Enumerate supported protocols first
for cipher in root.findall(".//cipher"):
protocol_versions.add(cipher.get("sslversion"))
# For each protocol, group ciphers by severity then raise a warning per severity
for protocol in protocol_versions:
log_blue(f"\nAccepted cipher suites for {protocol}:")
group_by_severity = defaultdict(list)
for cipher in root.findall(f".//cipher[@sslversion='{protocol}']"):
name = cipher.get("cipher")
group_by_severity[cipher.get("strength")].append(name)
logging.log(
sslscan_level_to_color(cipher.get("strength")),
f"* {name} {cipher.get('strength')}"
)
log_green(message)
yield INFO_LEVEL, message
else:
message = "Certificate transparency: No"
log_red(message)
yield HIGH_LEVEL, message
if cert_deployment.verified_chain_has_sha1_signature:
message = "One of the certificate in the chain is signed using SHA-1"
log_red(message)
yield HIGH_LEVEL, message
for security_level, ciphers in group_by_severity.items():
# We are using sslscan level in the report to be consistent with output from the tool
message = f"The following ciphers are {security_level.lower()} for {protocol}: {', '.join(sorted(ciphers))}"
yield sslscan_level_to_wapiti_level(security_level), message
for validation_result in cert_deployment.path_validation_results:
if not validation_result.was_validation_successful:
message = (
f"Certificate is invalid for {validation_result.trust_store.name} "
f"trust store: {validation_result.openssl_error_string}"
)
log_red(message)
yield CRITICAL_LEVEL, message
# Currently we stop at the first certificate of the server, maybe improve later
# Right now several certificates generates too much confusion in report
break
async def process_bad_protocols(xml_file: str) -> AsyncIterator[Tuple[int, str]]:
# https://blog.mozilla.org/security/2014/10/14/the-poodle-attack-and-the-end-of-ssl-3-0/
# https://blog.qualys.com/product-tech/2018/11/19/grade-change-for-tls-1-0-and-tls-1-1-protocols
known_bad_protocols = {"SSLv2", "SSLv3", "TLSv1.0", "TLSv1.1"}
tree = ET.parse(xml_file)
root = tree.getroot()
bad_protocols = set()
for protocol in root.findall(".//protocol[@enabled='1']"):
name = f"{protocol.get('type').upper()}v{protocol.get('version')}"
if name in known_bad_protocols:
bad_protocols.add(name)
if bad_protocols:
message = "The following protocols are deprecated and/or insecure and should be deactivated: " + \
", ".join(sorted(bad_protocols))
log_red(message)
yield CRITICAL_LEVEL, message
def process_error(xml_file: str) -> str:
if not exists(xml_file):
return "sslscan did not generate the expected XML file"
try:
tree = ET.parse(xml_file)
root = tree.getroot()
error = root.find(".//error")
if error:
return error.text
return ""
except (ET.ParseError, FileNotFoundError, OSError, IOError) as exception:
return "Error parsing sslscan XML output:" + str(exception)
async def process_vulnerabilities(xml_file: str) -> AsyncIterator[Tuple[int, str]]:
tree = ET.parse(xml_file)
root = tree.getroot()
vulnerable_protocols = set()
for protocol in root.findall(".//heartbleed[@vulnerable='1']"):
vulnerable_protocols.add(protocol.get("sslversion"))
if vulnerable_protocols:
message = f"Server is vulnerable to Heartbleed attack via {', '.join(vulnerable_protocols)}"
log_red(message)
yield CRITICAL_LEVEL, message
if root.find(".//compression[@supported='1']"):
message = "Server is vulnerable to CRIME attack (compression is supported)"
log_red(message)
yield CRITICAL_LEVEL, message
if root.find(".//fallback[@supported='1']"):
message = "Server is vulnerable to OpenSSL CCS (CVE-2014-0224)"
log_red(message)
yield CRITICAL_LEVEL, message
renegotiation = root.find(".//renegotiation")
if int(renegotiation.get("supported")) == 0:
message = "Server doesn't support secure renegotiations"
log_orange(message)
yield MEDIUM_LEVEL, message
elif int(renegotiation.get("secure")) == 0:
message = "Server honors client-initiated renegotiations (vulnerable to DoS attacks)"
log_red(message)
yield HIGH_LEVEL, message
def process_cipher_suites(results, version: str):
@ -189,7 +289,7 @@ def process_cipher_suites(results, version: str):
continue
logging.log(
cipher_level_to_color(security_level),
sslscan_level_to_color(security_level),
f"* {accepted_cipher_suite.cipher_suite.name} "
f"{accepted_cipher_suite.cipher_suite.openssl_name} "
# f"{accepted_cipher_suite.cipher_suite.key_size} "
@ -200,146 +300,7 @@ def process_cipher_suites(results, version: str):
for security_level, ciphers in group_by_severity.items():
message = f"The following ciphers are {security_level.lower()} for {version}: {', '.join(sorted(ciphers))}"
yield cipher_level_to_wapiti_level(security_level), message
def analyze(hostname: str, port: int) -> List[Tuple[int, str]]:
results = []
# Define the server that you want to scan
try:
server_location = ServerNetworkLocation(hostname, port)
except ServerHostnameCouldNotBeResolved:
log_red(f"Could not resolve {hostname}")
return results
# Then queue some scan commands for the server
scanner = Scanner()
server_scan_req = ServerScanRequest(
server_location=server_location,
scan_commands={
ScanCommand.CERTIFICATE_INFO,
ScanCommand.SSL_2_0_CIPHER_SUITES,
ScanCommand.SSL_3_0_CIPHER_SUITES,
ScanCommand.TLS_1_0_CIPHER_SUITES,
ScanCommand.TLS_1_1_CIPHER_SUITES,
ScanCommand.TLS_1_2_CIPHER_SUITES,
ScanCommand.TLS_1_3_CIPHER_SUITES,
ScanCommand.ROBOT,
ScanCommand.HEARTBLEED,
ScanCommand.TLS_COMPRESSION,
ScanCommand.TLS_FALLBACK_SCSV,
ScanCommand.TLS_1_3_EARLY_DATA,
ScanCommand.OPENSSL_CCS_INJECTION,
ScanCommand.SESSION_RENEGOTIATION,
ScanCommand.HTTP_HEADERS
},
network_configuration=ServerNetworkConfiguration(
tls_server_name_indication=server_location.hostname,
network_timeout=5,
network_max_retries=2
)
)
scanner.queue_scans([server_scan_req])
# TLS 1.2 / 1.3 results
good_protocols = {
ScanCommand.TLS_1_2_CIPHER_SUITES: "TLS v1.2",
ScanCommand.TLS_1_3_CIPHER_SUITES: "TLS v1.3"
}
# https://blog.mozilla.org/security/2014/10/14/the-poodle-attack-and-the-end-of-ssl-3-0/
# https://blog.qualys.com/product-tech/2018/11/19/grade-change-for-tls-1-0-and-tls-1-1-protocols
bad_protocols = {
ScanCommand.SSL_2_0_CIPHER_SUITES: "SSL v2",
ScanCommand.SSL_3_0_CIPHER_SUITES: "SSL v3",
ScanCommand.TLS_1_0_CIPHER_SUITES: "TLS v1.0",
ScanCommand.TLS_1_1_CIPHER_SUITES: "TLS v1.1"
}
# Then retrieve the results
for result in scanner.get_results():
log_blue(f"\nResults for {result.server_location.hostname}:")
deprecated_protocols = []
if result.connectivity_error_trace:
# Stuff like connection timeout
log_red(result.connectivity_error_trace)
continue
for scan_command in result.scan_result.__annotations__:
scan_results = getattr(result.scan_result, scan_command)
if scan_results.error_reason:
log_red(scan_results.error_reason)
continue
if scan_results.status != ScanCommandAttemptStatusEnum.COMPLETED:
continue
if scan_command == ScanCommand.CERTIFICATE_INFO:
for level, message in process_certificate_info(scan_results.result):
results.append((level, message))
elif scan_command in bad_protocols:
if scan_results.result.accepted_cipher_suites:
deprecated_protocols.append(bad_protocols[scan_command])
elif scan_command == ScanCommand.ROBOT:
if scan_results.result.robot_result in (
RobotScanResultEnum.VULNERABLE_WEAK_ORACLE, RobotScanResultEnum.VULNERABLE_STRONG_ORACLE
):
message = "Server is vulnerable to ROBOT attack"
log_red(message)
results.append((CRITICAL_LEVEL, message))
elif scan_command == ScanCommand.HEARTBLEED:
if scan_results.result.is_vulnerable_to_heartbleed:
message = "Server is vulnerable to Heartbleed attack"
log_red(message)
results.append((CRITICAL_LEVEL, message))
elif scan_command == ScanCommand.TLS_COMPRESSION:
if scan_results.result.supports_compression:
message = "Server is vulnerable to CRIME attack (compression is supported)"
log_red(message)
results.append((CRITICAL_LEVEL, message))
elif scan_command == ScanCommand.TLS_FALLBACK_SCSV:
if not scan_results.result.supports_fallback_scsv:
message = "Server is vulnerable to downgrade attacks (support for TLS_FALLBACK_SCSV is missing)"
log_red(message)
results.append((CRITICAL_LEVEL, message))
elif scan_command == ScanCommand.TLS_1_3_EARLY_DATA:
# https://blog.trailofbits.com/2019/03/25/what-application-developers-need-to-know-about-tls-early-data-0rtt/
if scan_results.result.supports_early_data:
message = "TLS 1.3 Early Data (0RTT) is vulnerable to replay attacks"
log_orange(message)
results.append((MEDIUM_LEVEL, message))
elif scan_command == ScanCommand.OPENSSL_CCS_INJECTION:
if scan_results.result.is_vulnerable_to_ccs_injection:
message = "Server is vulnerable to OpenSSL CCS (CVE-2014-0224)"
log_red(message)
results.append((CRITICAL_LEVEL, message))
elif scan_command == ScanCommand.SESSION_RENEGOTIATION:
if scan_results.result.is_vulnerable_to_client_renegotiation_dos:
message = "Server honors client-initiated renegotiations (vulnerable to DoS attacks)"
log_red(message)
results.append((HIGH_LEVEL, message))
if not scan_results.result.supports_secure_renegotiation:
message = "Server doesn't support secure renegotiations"
log_orange(message)
results.append((MEDIUM_LEVEL, message))
elif scan_command == ScanCommand.HTTP_HEADERS:
if scan_results.result.strict_transport_security_header is None:
message = "Strict Transport Security (HSTS) is not set"
log_red(message)
results.append((HIGH_LEVEL, message))
elif scan_command in good_protocols:
for level, message in process_cipher_suites(scan_results.result, good_protocols[scan_command]):
results.append((level, message))
if deprecated_protocols:
message = "The following protocols are deprecated and/or insecure and should be deactivated: " + \
", ".join(deprecated_protocols)
log_red(message)
results.append((CRITICAL_LEVEL, message))
return results
yield sslscan_level_to_wapiti_level(security_level), message
class ModuleSsl(Attack):
@ -350,8 +311,19 @@ class ModuleSsl(Attack):
Attack.__init__(self, crawler, persister, attack_options, stop_event, crawler_configuration)
# list to ensure only one occurrence per (vulnerable url/post_keys) tuple
self.tested_targets = set()
self.has_sslcan = None
async def must_attack(self, request: Request, response: Optional[Response] = None):
if self.has_sslcan is None:
if shutil.which("sslscan"):
self.has_sslcan = True
else:
log_red("sslscan is not installed or not found in PATH, module will be skipped")
self.has_sslcan = False
if not self.has_sslcan:
return False
if request.scheme != "https":
return False
@ -366,11 +338,96 @@ class ModuleSsl(Attack):
async def attack(self, request: Request, response: Optional[Response] = None):
loop = asyncio.get_running_loop()
# sslyze use threads to launch scanners concurrently, so we put those inside an asyncio executor
scan_results = await loop.run_in_executor(None, analyze, request.hostname, request.port)
scan_results = await loop.run_in_executor(None, self.process_sslscan, request.hostname, request.port)
for level, message in scan_results:
async for level, message in scan_results:
if level == INFO_LEVEL:
await self.add_addition(category=NAME, request=request, info=message, wstg=WSTG_CODE)
else:
await self.add_vuln(level=level, category=NAME, request=request, info=message, wstg=WSTG_CODE)
async def check_hsts(self, hostname: str, port: int) -> int:
"""
Checks if the given hostname supports HSTS.
"""
try:
response = await self.crawler.async_send(Request(f'https://{hostname}:{port}'))
except RequestError:
return -1
return int('strict-transport-security' in response.headers)
async def check_certificate_transparency(self, cert: x509.Certificate) -> int:
"""
Returns 1 if at least one CST exists, 0 otherwise. -1 if an error occurs.
"""
serial_number = cert.serial_number
try:
# crt.sh should be able to provide JSON output, but currently it doesn't work
# moved to that simple check instead
response = await self.crawler.async_send(Request(f'https://crt.sh/?q={serial_number}'))
if response.status != 200:
return -1
if "None found" in response.content:
return 0
return 1
except RequestError:
return -1
async def process_cert_features(self, target: str, port: int) -> AsyncIterator[Tuple[int, str]]:
cert = get_certificate(target, port)
if not check_ev_certificate(cert):
message = "Certificate doesn't use Extended Validation"
log_orange(message)
yield MEDIUM_LEVEL, message
# https://en.wikipedia.org/wiki/OCSP_stapling
if not check_ocsp_must_staple(cert):
message = "OCSP Must-Staple extension is missing"
log_orange(message)
yield MEDIUM_LEVEL, message
has_sct = await self.check_certificate_transparency(cert)
if has_sct > 0:
message = "Certificate transparency: Yes"
log_green(message)
yield INFO_LEVEL, message
elif has_sct == 0:
message = "Certificate transparency: No"
log_red(message)
yield HIGH_LEVEL, message
if await self.check_hsts(target, port) == 0:
message = "Strict Transport Security (HSTS) is not set"
log_red(message)
yield HIGH_LEVEL, message
async def process_sslscan(self, hostname: str, port: int) -> AsyncIterator[Tuple[int, str]]:
with tempfile.NamedTemporaryFile("r", suffix=".xml", delete=False) as temp_file:
sslscan_command = ["sslscan", "--iana-names", "--ocsp", f"--xml={temp_file.name}", f"{hostname}:{port}"]
try:
subprocess.run(sslscan_command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
except subprocess.CalledProcessError as exception:
log_red("Error running sslscan: " + str(exception))
return
error = process_error(temp_file.name)
if error:
log_red(error)
else:
async for info in process_cert_info(temp_file.name):
yield info
try:
async for info in self.process_cert_features(hostname, port):
yield info
except ssl.SSLError:
log_red("Could not get extra information about the certificate due to SSL errors")
except (socket.timeout, socket.gaierror):
log_red("Could not get extra information about the certificate due to network errors")
async for info in process_vulnerabilities(temp_file.name):
yield info
async for info in process_cipher_suites2(temp_file.name):
yield info
async for info in process_bad_protocols(temp_file.name):
yield info
os.unlink(temp_file.name)